Skip to content

Commit 59732c0

Browse files
authored
Generate the unique id for vineyard objects/blobs. (#1988)
Fixes #1987 Signed-off-by: Ye Cao <[email protected]>
1 parent ee11896 commit 59732c0

File tree

5 files changed

+171
-58
lines changed

5 files changed

+171
-58
lines changed

src/common/util/uuid.h

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ limitations under the License.
2323
#include <mach/mach.h>
2424
#endif
2525

26+
#include <atomic>
27+
#include <chrono>
2628
#include <cstdint>
2729
#include <cstdlib>
2830
#include <ctime>
@@ -141,31 +143,67 @@ using SessionID = int64_t;
141143
*/
142144
using PlasmaID = std::string;
143145

146+
class IDGenerator {
147+
public:
148+
static IDGenerator& getInstance() {
149+
static IDGenerator instance;
150+
return instance;
151+
}
152+
153+
ObjectID GenerateID(InstanceID id = 0) {
154+
auto timestamp = GetCurrentTimestamp();
155+
auto instance_id = id & 0x3FFUL;
156+
uint64_t sequence = sequence_.fetch_add(1) & sequence_mask;
157+
158+
return ((timestamp << timestamp_shift) |
159+
(instance_id << instance_id_shift) | sequence);
160+
}
161+
162+
private:
163+
const uint64_t timestamp_shift = 22; // 41 bits for timestamp
164+
const uint64_t instance_id_shift = 12; // 10 bits for instance id
165+
const uint64_t sequence_mask = 0xFFFUL; // 12 bits for sequence number
166+
167+
std::atomic<uint64_t> sequence_{0};
168+
169+
IDGenerator() = default;
170+
171+
uint64_t GetCurrentTimestamp() {
172+
auto now = std::chrono::high_resolution_clock::now();
173+
auto ts = std::chrono::duration_cast<std::chrono::milliseconds>(
174+
now.time_since_epoch())
175+
.count();
176+
return (ts & 0x1FFFFFFFFFF);
177+
}
178+
};
179+
144180
/*
145181
* @brief Make empty blob and preallocate blob always mapping to the same place
146182
* Others will be mapped randomly between
147183
* (0x8000000000000000UL,0xFFFFFFFFFFFFFFFFUL) exclusively.
148184
*/
149185
inline ObjectID GenerateBlobID(const uintptr_t ptr) {
186+
static IDGenerator& idGenerator = IDGenerator::getInstance();
150187
if (ptr == 0x8000000000000000UL ||
151188
ptr == std::numeric_limits<uintptr_t>::max()) {
152189
return static_cast<uint64_t>(ptr) | 0x8000000000000000UL;
153190
}
154-
auto ts = detail::cycleclock::now() % (0x7FFFFFFFFFFFFFFFUL - 2) + 1;
155-
return (0x7FFFFFFFFFFFFFFFUL & static_cast<uint64_t>(ts)) |
156-
0x8000000000000000UL;
191+
return (idGenerator.GenerateID() | 0x8000000000000000UL);
157192
}
158193

159194
inline SessionID GenerateSessionID() {
160-
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
195+
static IDGenerator& idGenerator = IDGenerator::getInstance();
196+
return 0x7FFFFFFFFFFFFFFFUL & idGenerator.GenerateID();
161197
}
162198

163-
inline ObjectID GenerateObjectID() {
164-
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
199+
inline ObjectID GenerateObjectID(InstanceID instance_id = 0) {
200+
static IDGenerator& idGenerator = IDGenerator::getInstance();
201+
return 0x7FFFFFFFFFFFFFFFUL & idGenerator.GenerateID(instance_id);
165202
}
166203

167-
inline ObjectID GenerateSignature() {
168-
return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now();
204+
inline ObjectID GenerateSignature(InstanceID instance_id = 0) {
205+
static IDGenerator& idGenerator = IDGenerator::getInstance();
206+
return 0x7FFFFFFFFFFFFFFFUL & idGenerator.GenerateID(instance_id);
169207
}
170208

171209
const std::string ObjectIDToString(const ObjectID id);

src/server/server/vineyard_server.cc

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,8 @@ Status VineyardServer::ListName(
454454

455455
namespace detail {
456456

457-
Status validate_metadata(const json& tree, json& result, Signature& signature) {
457+
Status validate_metadata(const json& tree, json& result, Signature& signature,
458+
InstanceID instance_id) {
458459
// validate typename
459460
auto type_name_node = tree.value("typename", json(nullptr));
460461
if (type_name_node.is_null() || !type_name_node.is_string()) {
@@ -468,7 +469,7 @@ Status validate_metadata(const json& tree, json& result, Signature& signature) {
468469
RETURN_ON_ASSERT(tree.contains("instance_id"),
469470
"The instance_id filed must be presented");
470471
result = tree;
471-
signature = GenerateSignature();
472+
signature = GenerateSignature(instance_id);
472473
if (result.find("signature") != result.end()) {
473474
signature = result["signature"].get<Signature>();
474475
} else {
@@ -485,20 +486,21 @@ Status validate_metadata(const json& tree, json& result, Signature& signature) {
485486

486487
Status put_members_recursively(
487488
std::shared_ptr<IMetaService> metadata_service_ptr, const json& meta,
488-
json& tree, std::string const& instance_name) {
489+
json& tree, std::string const& instance_name, InstanceID instance_id) {
489490
for (auto& item : tree.items()) {
490491
if (item.value().is_object()) {
491492
auto& sub_tree = item.value();
492493
if (!sub_tree.contains("id")) {
493494
Signature signature;
494-
RETURN_ON_ERROR(validate_metadata(sub_tree, sub_tree, signature));
495+
RETURN_ON_ERROR(
496+
validate_metadata(sub_tree, sub_tree, signature, instance_id));
495497

496498
// recursively create members
497-
RETURN_ON_ERROR(put_members_recursively(metadata_service_ptr, meta,
498-
sub_tree, instance_name));
499+
RETURN_ON_ERROR(put_members_recursively(
500+
metadata_service_ptr, meta, sub_tree, instance_name, instance_id));
499501

500502
Status s;
501-
ObjectID id = GenerateObjectID();
503+
ObjectID id = GenerateObjectID(instance_id);
502504
InstanceID computed_instance_id = 0;
503505
std::vector<meta_tree::op_t> ops;
504506
VCATCH_JSON_ERROR(
@@ -545,18 +547,18 @@ Status VineyardServer::CreateData(
545547
InstanceID& computed_instance_id) {
546548
if (status.ok()) {
547549
auto decorated_tree = json::object();
548-
RETURN_ON_ERROR(
549-
detail::validate_metadata(tree, decorated_tree, signature));
550+
RETURN_ON_ERROR(detail::validate_metadata(
551+
tree, decorated_tree, signature, self->instance_id()));
550552

551553
// expand trees: for putting many metadatas in a single call
552554
if (recursive) {
553555
RETURN_ON_ERROR(detail::put_members_recursively(
554556
self->meta_service_ptr_, meta, decorated_tree,
555-
self->instance_name_));
557+
self->instance_name_, self->instance_id()));
556558
}
557559

558560
Status s;
559-
id = GenerateObjectID();
561+
id = GenerateObjectID(self->instance_id());
560562
VCATCH_JSON_ERROR(
561563
meta, s,
562564
meta_tree::PutDataOps(meta, self->instance_name(), id,
@@ -590,8 +592,8 @@ Status VineyardServer::CreateData(
590592
for (auto const& tree : trees) {
591593
Signature signature;
592594
auto decorated_tree = json::object();
593-
RETURN_ON_ERROR(
594-
detail::validate_metadata(tree, decorated_tree, signature));
595+
RETURN_ON_ERROR(detail::validate_metadata(
596+
tree, decorated_tree, signature, self->instance_id()));
595597
signatures.emplace_back(signature);
596598
decorated_trees.emplace_back(decorated_tree);
597599
}
@@ -601,12 +603,12 @@ Status VineyardServer::CreateData(
601603
for (auto& decorated_tree : decorated_trees) {
602604
RETURN_ON_ERROR(detail::put_members_recursively(
603605
self->meta_service_ptr_, meta, decorated_tree,
604-
self->instance_name_));
606+
self->instance_name_, self->instance_id()));
605607
}
606608
}
607609

608610
for (auto& decorated_tree : decorated_trees) {
609-
ObjectID id = GenerateObjectID();
611+
ObjectID id = GenerateObjectID(self->instance_id());
610612
InstanceID computed_instance_id = UnspecifiedInstanceID();
611613
Status s;
612614
VCATCH_JSON_ERROR(meta, s,
@@ -726,7 +728,7 @@ Status VineyardServer::ShallowCopy(const ObjectID id,
726728
ENSURE_VINEYARDD_READY();
727729
auto self(shared_from_this());
728730
RETURN_ON_ASSERT(!IsBlob(id), "The blobs cannot be shallow copied");
729-
ObjectID target_id = GenerateObjectID();
731+
ObjectID target_id = GenerateObjectID(self->instance_id());
730732
meta_service_ptr_->RequestToShallowCopy(
731733
[id, extra_metadata, target_id](const Status& status, const json& meta,
732734
std::vector<meta_tree::op_t>& ops,

test/concurrent_id_test.cc

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/** Copyright 2020-2023 Alibaba Group Holding Limited.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
#include <cstdint>
17+
#include <iostream>
18+
#include <mutex>
19+
#include <thread>
20+
#include <unordered_set>
21+
#include <vector>
22+
23+
#include "common/util/logging.h"
24+
#include "common/util/status.h"
25+
26+
#include "client/client.h"
27+
#include "client/ds/blob.h"
28+
29+
using namespace vineyard; // NOLINT(build/namespaces)
30+
31+
const int num_threads = 16;
32+
const int ids_per_thread = 10000;
33+
34+
void testGenerateBlobID(std::string ipc_socket) {
35+
std::unordered_set<uint64_t> blob_ids;
36+
std::mutex mtx;
37+
38+
auto generate_blob_id = [&]() {
39+
auto client = std::make_shared<Client>();
40+
VINEYARD_CHECK_OK(client->Connect(ipc_socket));
41+
for (int i = 0; i < ids_per_thread; ++i) {
42+
std::unique_ptr<BlobWriter> blob_writer;
43+
44+
VINEYARD_CHECK_OK(client->CreateBlob(1, blob_writer));
45+
auto blob_id = blob_writer->id();
46+
std::lock_guard<std::mutex> lock(mtx);
47+
auto result = blob_ids.insert(blob_id);
48+
if (!result.second) {
49+
LOG(ERROR) << "Duplicated blob id: " << blob_id;
50+
}
51+
CHECK(result.second == true);
52+
}
53+
VINEYARD_CHECK_OK(client->Clear());
54+
};
55+
56+
std::vector<std::thread> threads;
57+
for (int i = 0; i < num_threads; ++i) {
58+
threads.emplace_back(generate_blob_id);
59+
}
60+
for (auto& thread : threads) {
61+
thread.join();
62+
}
63+
}
64+
65+
void testGenerateObjectID(std::string ipc_socket) {
66+
std::unordered_set<uint64_t> object_ids;
67+
std::mutex mtx;
68+
69+
auto generate_object_id = [&]() {
70+
auto client = std::make_shared<Client>();
71+
VINEYARD_CHECK_OK(client->Connect(ipc_socket));
72+
for (int i = 0; i < ids_per_thread; ++i) {
73+
std::unique_ptr<BlobWriter> blob_writer;
74+
VINEYARD_CHECK_OK(client->CreateBlob(1, blob_writer));
75+
std::shared_ptr<Object> object = blob_writer->Seal(*client.get());
76+
auto object_id = object->id();
77+
std::lock_guard<std::mutex> lock(mtx);
78+
auto result = object_ids.insert(object_id);
79+
if (!result.second) {
80+
LOG(ERROR) << "Duplicated object id: " << object_id;
81+
}
82+
CHECK(result.second == true);
83+
}
84+
VINEYARD_CHECK_OK(client->Clear());
85+
};
86+
87+
std::vector<std::thread> threads;
88+
for (int i = 0; i < num_threads; ++i) {
89+
threads.emplace_back(generate_object_id);
90+
}
91+
for (auto& thread : threads) {
92+
thread.join();
93+
}
94+
}
95+
96+
int main(int argc, char** argv) {
97+
if (argc < 2) {
98+
printf("usage ./concurrent_id_test <ipc_socket>");
99+
return 1;
100+
}
101+
std::string ipc_socket = std::string(argv[1]);
102+
103+
testGenerateBlobID(ipc_socket);
104+
testGenerateObjectID(ipc_socket);
105+
return 0;
106+
}

test/id_test.cc

Lines changed: 0 additions & 33 deletions
This file was deleted.

test/runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ def run_vineyard_cpp_tests(meta, allocator, endpoints, tests):
483483
run_test(tests, 'hashmap_test')
484484
run_test(tests, 'hashmap_mvcc_test')
485485
# run_test(tests, 'hosseinmoein_dataframe_test')
486-
run_test(tests, 'id_test')
486+
run_test(tests, 'concurrent_id_test')
487487
run_test(tests, 'invalid_connect_test', '127.0.0.1:%d' % rpc_socket_port)
488488
run_test(tests, 'large_meta_test')
489489
run_test(tests, 'list_object_test')

0 commit comments

Comments
 (0)