52
52
#include " flow/Trace.h"
53
53
54
54
#include < cinttypes>
55
+ #include < cstdint>
55
56
#include < ctime>
56
57
#include < climits>
57
58
#include " flow/IAsyncFile.h"
@@ -5084,6 +5085,7 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5084
5085
static TaskParam<int64_t > maxTagID () { return __FUNCTION__sr; }
5085
5086
static TaskParam<Version> beginVersion () { return __FUNCTION__sr; }
5086
5087
static TaskParam<Version> endVersion () { return __FUNCTION__sr; }
5088
+ static TaskParam<int64_t > bytesWritten () { return __FUNCTION__sr; }
5087
5089
static TaskParam<std::vector<RestoreConfig::RestoreFile>> logs () { return __FUNCTION__sr; }
5088
5090
} Params;
5089
5091
@@ -5173,7 +5175,8 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5173
5175
// Writes backup mutations to the database
5174
5176
ACTOR static Future<Void> writeMutations (Database cx,
5175
5177
std::vector<Standalone<VectorRef<KeyValueRef>>> mutations,
5176
- Key mutationLogPrefix) {
5178
+ Key mutationLogPrefix,
5179
+ Reference<Task> task) {
5177
5180
state Reference<ReadYourWritesTransaction> tr (new ReadYourWritesTransaction (cx));
5178
5181
state Standalone<VectorRef<KeyValueRef>> oldFormatMutations;
5179
5182
state int mutationIndex = 0 ;
@@ -5210,6 +5213,14 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5210
5213
++mutationCount;
5211
5214
}
5212
5215
wait (tr->commit ());
5216
+ int64_t oldBytes = Params.bytesWritten ().get (task);
5217
+ Params.bytesWritten ().set (task, oldBytes + txBytes);
5218
+ DisabledTraceEvent (" FileRestorePartitionedLogCommittData" )
5219
+ .detail (" TaskInstance" , THIS_ADDR)
5220
+ .detail (" MutationIndex" , mutationIndex)
5221
+ .detail (" MutationCount" , mutationCount)
5222
+ .detail (" TotalMutation" , totalMutation)
5223
+ .detail (" Bytes" , txBytes);
5213
5224
mutationIndex += mutationCount; // update mutationIndex after the commit succeeds
5214
5225
} catch (Error& e) {
5215
5226
if (e.code () == error_code_transaction_too_large) {
@@ -5291,7 +5302,7 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5291
5302
makeReference<PartitionedLogIteratorTwoBuffers>(bc, k, filesByTag[k], fileEndVersionByTag[k]);
5292
5303
}
5293
5304
5294
- DisabledTraceEvent (" RestoredPartitionedLogDataExeStart " )
5305
+ DisabledTraceEvent (" FileRestorePartitionedLogDataExeStart " )
5295
5306
.detail (" BeginVersion" , begin)
5296
5307
.detail (" EndVersion" , end)
5297
5308
.detail (" Files" , logs.size ())
@@ -5312,7 +5323,7 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5312
5323
// batching mutations from multiple versions together before writing to the database
5313
5324
state int64_t bytes = oneVersionData.expectedSize ();
5314
5325
if (totalBytes + bytes > CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE ) {
5315
- wait (writeMutations (cx, mutations, restore.mutationLogPrefix ()));
5326
+ wait (writeMutations (cx, mutations, restore.mutationLogPrefix (), task ));
5316
5327
mutations.clear ();
5317
5328
totalBytes = 0 ;
5318
5329
}
@@ -5321,15 +5332,15 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5321
5332
} catch (Error& e) {
5322
5333
if (e.code () == error_code_end_of_stream) {
5323
5334
if (mutations.size () > 0 ) {
5324
- wait (writeMutations (cx, mutations, restore.mutationLogPrefix ()));
5335
+ wait (writeMutations (cx, mutations, restore.mutationLogPrefix (), task ));
5325
5336
}
5326
5337
break ;
5327
5338
} else {
5328
5339
throw ;
5329
5340
}
5330
5341
}
5331
5342
}
5332
- DisabledTraceEvent (" RestoredPartitionedLogDataExeDone " )
5343
+ DisabledTraceEvent (" FileRestorePartitionedLogDataExeDone " )
5333
5344
.detail (" BeginVersion" , begin)
5334
5345
.detail (" EndVersion" , end)
5335
5346
.detail (" Files" , logs.size ())
@@ -5342,7 +5353,19 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5342
5353
Reference<TaskBucket> taskBucket,
5343
5354
Reference<FutureBucket> futureBucket,
5344
5355
Reference<Task> task) {
5345
- RestoreConfig (task).fileBlocksFinished ().atomicOp (tr, 1 , MutationRef::Type::AddValue);
5356
+ state int64_t logBytesWritten = Params.bytesWritten ().get (task);
5357
+ RestoreConfig (task).bytesWritten ().atomicOp (tr, logBytesWritten, MutationRef::Type::AddValue);
5358
+
5359
+ int64_t blocks =
5360
+ (logBytesWritten + CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE - 1 ) / CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE ;
5361
+ // When dispatching, we don't know how many blocks are there, so we have to do it here
5362
+ RestoreConfig (task).filesBlocksDispatched ().atomicOp (tr, blocks, MutationRef::Type::AddValue);
5363
+ RestoreConfig (task).fileBlocksFinished ().atomicOp (tr, blocks, MutationRef::Type::AddValue);
5364
+
5365
+ DisabledTraceEvent (" FileRestorePartitionedLogCommittedData" )
5366
+ .detail (" TaskInstance" , THIS_ADDR)
5367
+ .detail (" Blocks" , blocks)
5368
+ .detail (" LogBytes" , logBytesWritten);
5346
5369
5347
5370
state Reference<TaskFuture> taskFuture = futureBucket->unpack (task->params [Task::reservedTaskParamKeyDone]);
5348
5371
wait (taskFuture->set (tr, taskBucket) && taskBucket->finish (tr, task));
@@ -5370,6 +5393,7 @@ struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase {
5370
5393
Params.beginVersion ().set (task, begin);
5371
5394
Params.endVersion ().set (task, end);
5372
5395
Params.logs ().set (task, logs);
5396
+ Params.bytesWritten ().set (task, 0 );
5373
5397
5374
5398
if (!waitFor) {
5375
5399
return taskBucket->addTask (tr, task);
@@ -5450,7 +5474,7 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
5450
5474
wait (success (RestoreDispatchPartitionedTaskFunc::addTask (
5451
5475
tr, taskBucket, task, firstVersion, beginVersion, endVersion)));
5452
5476
5453
- TraceEvent (" RestorePartitionDispatch " )
5477
+ TraceEvent (" FileRestorePartitionDispatch " )
5454
5478
.detail (" RestoreUID" , restore.getUid ())
5455
5479
.detail (" BeginVersion" , beginVersion)
5456
5480
.detail (" ApplyLag" , applyLag)
@@ -5519,7 +5543,7 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
5519
5543
// If apply lag is 0 then we are done so create the completion task
5520
5544
wait (success (RestoreCompleteTaskFunc::addTask (tr, taskBucket, task, TaskCompletionKey::noSignal ())));
5521
5545
5522
- TraceEvent (" RestorePartitionDispatch " )
5546
+ TraceEvent (" FileRestorePartitionDispatch " )
5523
5547
.detail (" RestoreUID" , restore.getUid ())
5524
5548
.detail (" BeginVersion" , beginVersion)
5525
5549
.detail (" ApplyLag" , applyLag)
@@ -5535,7 +5559,7 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
5535
5559
wait (success (RestoreDispatchPartitionedTaskFunc::addTask (
5536
5560
tr, taskBucket, task, firstVersion, beginVersion, endVersion)));
5537
5561
5538
- TraceEvent (" RestorePartitionDispatch " )
5562
+ TraceEvent (" FileRestorePartitionDispatch " )
5539
5563
.detail (" RestoreUID" , restore.getUid ())
5540
5564
.detail (" BeginVersion" , beginVersion)
5541
5565
.detail (" ApplyLag" , applyLag)
@@ -5589,7 +5613,7 @@ struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase {
5589
5613
wait (waitForAll (addTaskFutures));
5590
5614
wait (taskBucket->finish (tr, task));
5591
5615
5592
- TraceEvent (" RestorePartitionDispatch " )
5616
+ TraceEvent (" FileRestorePartitionDispatch " )
5593
5617
.detail (" RestoreUID" , restore.getUid ())
5594
5618
.detail (" BeginVersion" , beginVersion)
5595
5619
.detail (" EndVersion" , endVersion)
0 commit comments