@@ -52,7 +52,6 @@ public class BlobUploadProcessor
52
52
private readonly BlobContainerClient testRunsContainerClient ;
53
53
private readonly BlobContainerClient buildDefinitionsContainerClient ;
54
54
private readonly BlobContainerClient buildFailuresContainerClient ;
55
- private readonly QueueClient queueClient ;
56
55
private readonly IOptions < PipelineWitnessSettings > options ;
57
56
private readonly Dictionary < string , int ? > cachedDefinitionRevisions = new ( ) ;
58
57
private readonly IFailureAnalyzer failureAnalyzer ;
@@ -61,7 +60,6 @@ public BlobUploadProcessor(
61
60
ILogger < BlobUploadProcessor > logger ,
62
61
BuildLogProvider logProvider ,
63
62
BlobServiceClient blobServiceClient ,
64
- QueueServiceClient queueServiceClient ,
65
63
BuildHttpClient buildClient ,
66
64
TestResultsHttpClient testResultsClient ,
67
65
IOptions < PipelineWitnessSettings > options ,
@@ -72,11 +70,6 @@ public BlobUploadProcessor(
72
70
throw new ArgumentNullException ( nameof ( blobServiceClient ) ) ;
73
71
}
74
72
75
- if ( queueServiceClient == null )
76
- {
77
- throw new ArgumentNullException ( nameof ( queueServiceClient ) ) ;
78
- }
79
-
80
73
this . logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
81
74
this . options = options ?? throw new ArgumentNullException ( nameof ( options ) ) ;
82
75
this . logProvider = logProvider ?? throw new ArgumentNullException ( nameof ( logProvider ) ) ;
@@ -89,8 +82,6 @@ public BlobUploadProcessor(
89
82
this . buildFailuresContainerClient = blobServiceClient . GetBlobContainerClient ( BuildFailuresContainerName ) ;
90
83
this . testRunsContainerClient = blobServiceClient . GetBlobContainerClient ( TestRunsContainerName ) ;
91
84
this . buildDefinitionsContainerClient = blobServiceClient . GetBlobContainerClient ( BuildDefinitionsContainerName ) ;
92
- this . queueClient = queueServiceClient . GetQueueClient ( this . options . Value . BuildLogBundlesQueueName ) ;
93
- this . queueClient . CreateIfNotExists ( ) ;
94
85
this . failureAnalyzer = failureAnalyzer ;
95
86
}
96
87
@@ -178,15 +169,12 @@ public async Task UploadBuildBlobsAsync(string account, Guid projectId, int buil
178
169
logger . LogWarning ( "No logs available for build {Project}: {BuildId}" , build . Project . Name , build . Id ) ;
179
170
return ;
180
171
}
181
-
182
- var bundles = BuildLogBundles ( account , build , timeline , logs ) ;
183
172
184
- // We no longer process log bundles on separate messages.
185
- // During zero downtime upgrade phase, process all the bundles sequentially but allow for processing message in the log bundle queue
186
- // After the upgrade phase, this should be rewritten to remove bundling but keep the log -> timeline record association
187
- foreach ( var bundle in bundles )
173
+ var buildLogInfos = GetBuildLogInfos ( account , build , timeline , logs ) ;
174
+
175
+ foreach ( var log in buildLogInfos )
188
176
{
189
- await ProcessBuildLogBundleAsync ( bundle ) ;
177
+ await UploadLogLinesBlobAsync ( account , build , log ) ;
190
178
}
191
179
}
192
180
@@ -246,14 +234,6 @@ private async Task UploadBuildFailureBlobAsync(string account, Build build, Time
246
234
}
247
235
}
248
236
249
- public async Task ProcessBuildLogBundleAsync ( BuildLogBundle buildLogBundle )
250
- {
251
- foreach ( var log in buildLogBundle . TimelineLogs )
252
- {
253
- await UploadLogLinesBlobAsync ( buildLogBundle , log ) ;
254
- }
255
- }
256
-
257
237
public async Task UploadBuildDefinitionBlobsAsync ( string account , string projectName )
258
238
{
259
239
var definitions = await buildClient . GetFullDefinitionsAsync2 ( project : projectName ) ;
@@ -361,35 +341,14 @@ private async Task UploadBuildDefinitionBlobAsync(string account, BuildDefinitio
361
341
}
362
342
}
363
343
364
- private List < BuildLogBundle > BuildLogBundles ( string account , Build build , Timeline timeline , List < BuildLog > logs )
344
+ private List < BuildLogInfo > GetBuildLogInfos ( string account , Build build , Timeline timeline , List < BuildLog > logs )
365
345
{
366
- BuildLogBundle CreateBundle ( ) => new BuildLogBundle
367
- {
368
- Account = account ,
369
- BuildId = build . Id ,
370
- ProjectId = build . Project . Id ,
371
- ProjectName = build . Project . Name ,
372
- QueueTime = build . QueueTime . Value ,
373
- StartTime = build . StartTime . Value ,
374
- FinishTime = build . FinishTime . Value ,
375
- DefinitionId = build . Definition . Id ,
376
- DefinitionName = build . Definition . Name ,
377
- DefinitionPath = build . Definition . Path
378
- } ;
379
-
380
- BuildLogBundle currentBundle ;
381
- var logBundles = new List < BuildLogBundle > ( ) ;
382
- logBundles . Add ( currentBundle = CreateBundle ( ) ) ;
383
-
384
346
var logsById = logs . ToDictionary ( l => l . Id ) ;
385
347
348
+ var buildLogInfos = new List < BuildLogInfo > ( ) ;
349
+
386
350
foreach ( var log in logs )
387
351
{
388
- if ( currentBundle . TimelineLogs . Count >= this . options . Value . BuildLogBundleSize )
389
- {
390
- logBundles . Add ( currentBundle = CreateBundle ( ) ) ;
391
- }
392
-
393
352
var logRecords = timeline . Records . Where ( x => x . Log ? . Id == log . Id ) . ToArray ( ) ;
394
353
395
354
if ( logRecords . Length > 1 )
@@ -419,7 +378,7 @@ private List<BuildLogBundle> BuildLogBundles(string account, Build build, Timeli
419
378
}
420
379
}
421
380
422
- currentBundle . TimelineLogs . Add ( new BuildLogInfo
381
+ buildLogInfos . Add ( new BuildLogInfo
423
382
{
424
383
LogId = log . Id ,
425
384
LineCount = log . LineCount ,
@@ -430,7 +389,7 @@ private List<BuildLogBundle> BuildLogBundles(string account, Build build, Timeli
430
389
} ) ;
431
390
}
432
391
433
- return logBundles ;
392
+ return buildLogInfos ;
434
393
}
435
394
436
395
private async Task UploadBuildBlobAsync ( string account , Build build )
@@ -599,29 +558,29 @@ private async Task UploadTimelineBlobAsync(string account, Build build, Timeline
599
558
}
600
559
}
601
560
602
- private async Task UploadLogLinesBlobAsync ( BuildLogBundle build , BuildLogInfo log )
561
+ private async Task UploadLogLinesBlobAsync ( string account , Build build , BuildLogInfo log )
603
562
{
604
563
try
605
564
{
606
565
// we don't use FinishTime in the logs blob path to prevent duplicating logs when processing retries.
607
566
// i.e. logs with a given buildid/logid are immutable and retries only add new logs.
608
- var blobPath = $ "{ build . ProjectName } /{ build . QueueTime : yyyy/MM/dd} /{ build . BuildId } -{ log . LogId } .jsonl";
567
+ var blobPath = $ "{ build . Project . Name } /{ build . QueueTime : yyyy/MM/dd} /{ build . Id } -{ log . LogId } .jsonl";
609
568
var blobClient = this . buildLogLinesContainerClient . GetBlobClient ( blobPath ) ;
610
569
611
570
if ( await blobClient . ExistsAsync ( ) )
612
571
{
613
- this . logger . LogInformation ( "Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}" , build . BuildId , log . RecordId , log . LogId ) ;
572
+ this . logger . LogInformation ( "Skipping existing log for build {BuildId}, record {RecordId}, log {LogId}" , build . Id , log . RecordId , log . LogId ) ;
614
573
return ;
615
574
}
616
575
617
- this . logger . LogInformation ( "Processing log for build {BuildId}, record {RecordId}, log {LogId}" , build . BuildId , log . RecordId , log . LogId ) ;
576
+ this . logger . LogInformation ( "Processing log for build {BuildId}, record {RecordId}, log {LogId}" , build . Id , log . RecordId , log . LogId ) ;
618
577
619
578
var lineNumber = 0 ;
620
579
var characterCount = 0 ;
621
580
622
581
// Over an open read stream and an open write stream, one line at a time, read, process, and write to
623
582
// blob storage
624
- using ( var logStream = await this . logProvider . GetLogStreamAsync ( build . ProjectName , build . BuildId , log . LogId ) )
583
+ using ( var logStream = await this . logProvider . GetLogStreamAsync ( build . Project . Name , build . Id , log . LogId ) )
625
584
using ( var logReader = new StreamReader ( logStream ) )
626
585
using ( var blobStream = await blobClient . OpenWriteAsync ( overwrite : true , new BlobOpenWriteOptions ( ) ) )
627
586
using ( var blobWriter = new StreamWriter ( blobStream ) )
@@ -657,13 +616,13 @@ private async Task UploadLogLinesBlobAsync(BuildLogBundle build, BuildLogInfo lo
657
616
658
617
await blobWriter . WriteLineAsync ( JsonConvert . SerializeObject ( new
659
618
{
660
- OrganizationName = build . Account ,
661
- ProjectId = build . ProjectId ,
662
- ProjectName = build . ProjectName ,
663
- BuildDefinitionId = build . DefinitionId ,
664
- BuildDefinitionPath = build . DefinitionPath ,
665
- BuildDefinitionName = build . DefinitionName ,
666
- BuildId = build . BuildId ,
619
+ OrganizationName = account ,
620
+ ProjectId = build . Project . Id ,
621
+ ProjectName = build . Project . Name ,
622
+ BuildDefinitionId = build . Definition . Id ,
623
+ BuildDefinitionPath = build . Definition . Path ,
624
+ BuildDefinitionName = build . Definition . Name ,
625
+ BuildId = build . Id ,
667
626
LogId = log . LogId ,
668
627
LineNumber = lineNumber ,
669
628
Length = message . Length ,
@@ -674,15 +633,15 @@ await blobWriter.WriteLineAsync(JsonConvert.SerializeObject(new
674
633
}
675
634
}
676
635
677
- logger . LogInformation ( "Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}" , characterCount , lineNumber , build . BuildId , log . RecordId , log . LogId ) ;
636
+ logger . LogInformation ( "Processed {CharacterCount} characters and {LineCount} lines for build {BuildId}, record {RecordId}, log {LogId}" , characterCount , lineNumber , build . Id , log . RecordId , log . LogId ) ;
678
637
}
679
638
catch ( RequestFailedException ex ) when ( ex . Status == ( int ) HttpStatusCode . Conflict )
680
639
{
681
- this . logger . LogInformation ( "Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}" , build . BuildId , log . RecordId , log . LogId ) ;
640
+ this . logger . LogInformation ( "Ignoring existing blob exception for build {BuildId}, record {RecordId}, log {LogId}" , build . Id , log . RecordId , log . LogId ) ;
682
641
}
683
642
catch ( Exception ex )
684
643
{
685
- this . logger . LogError ( ex , "Error processing build {BuildId}, record {RecordId}, log {LogId}" , build . BuildId , log . RecordId , log . LogId ) ;
644
+ this . logger . LogError ( ex , "Error processing build {BuildId}, record {RecordId}, log {LogId}" , build . Id , log . RecordId , log . LogId ) ;
686
645
throw ;
687
646
}
688
647
}
0 commit comments