@@ -1051,7 +1051,16 @@ void VersionSet::InstallVersionSnapshot(Version* v, VersionEdit* edit) {
1051
1051
}
1052
1052
}
1053
1053
1054
- // dump version and snapshot
1054
+ // multi thread safe
1055
+ // Information kept for every waiting manifest writer
1056
+ struct VersionSet ::ManifestWriter {
1057
+ Status status;
1058
+ VersionEdit* edit;
1059
+ bool done;
1060
+ port::CondVar cv;
1061
+
1062
+ explicit ManifestWriter (port::Mutex* mu) : done(false ), cv(mu) { }
1063
+ };
1055
1064
Status VersionSet::LogAndApply (VersionEdit* edit, port::Mutex* mu) {
1056
1065
if (edit->has_log_number_ ) {
1057
1066
assert (edit->log_number_ >= log_number_);
@@ -1074,6 +1083,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
1074
1083
edit->SetLastSequence (last_sequence_);
1075
1084
}
1076
1085
1086
+ mu->AssertHeld ();
1087
+ // multi write control, do not batch edit write, but multi thread safety
1088
+ ManifestWriter w (mu);
1089
+ w.edit = edit;
1090
+ manifest_writers_.push_back (&w);
1091
+ while (!w.done && &w != manifest_writers_.front ()) {
1092
+ w.cv .Wait ();
1093
+ }
1094
+ assert (manifest_writers_.front () == &w);
1095
+
1096
+ // first manifest writer, batch edit
1077
1097
Version* v = new Version (this );
1078
1098
{
1079
1099
VersionSetBuilder builder (this , current_);
@@ -1189,6 +1209,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
1189
1209
Log (options_->info_log , " [%s][dfs error] set force_switch_manifest" , dbname_.c_str ());
1190
1210
}
1191
1211
1212
+ // TODO: batch write manifest finish, no need
1213
+ manifest_writers_.pop_front ();
1214
+ if (!manifest_writers_.empty ()) {
1215
+ manifest_writers_.front ()->cv .Signal ();
1216
+ }
1192
1217
return s;
1193
1218
}
1194
1219
0 commit comments