17
17
18
18
import static com .google .common .base .Preconditions .checkNotNull ;
19
19
20
+ import java .time .Instant ;
21
+ import java .time .ZoneId ;
22
+ import java .time .format .DateTimeFormatter ;
23
+ import java .util .ArrayList ;
24
+ import java .util .List ;
20
25
import java .util .Map ;
21
26
import java .util .concurrent .CompletableFuture ;
22
27
import java .util .concurrent .ConcurrentHashMap ;
28
+ import java .util .concurrent .ConcurrentSkipListMap ;
23
29
import java .util .concurrent .CountDownLatch ;
24
30
import java .util .concurrent .Executors ;
25
31
import java .util .concurrent .ScheduledExecutorService ;
30
36
import org .apache .bookkeeper .client .BookKeeper ;
31
37
import org .apache .bookkeeper .conf .ClientConfiguration ;
32
38
import org .apache .bookkeeper .mledger .AsyncCallbacks ;
39
+ import org .apache .bookkeeper .mledger .AsyncCallbacks .ManagedLedgerInfoCallback ;
33
40
import org .apache .bookkeeper .mledger .AsyncCallbacks .OpenLedgerCallback ;
34
41
import org .apache .bookkeeper .mledger .ManagedLedger ;
35
42
import org .apache .bookkeeper .mledger .ManagedLedgerConfig ;
36
43
import org .apache .bookkeeper .mledger .ManagedLedgerException ;
44
+ import org .apache .bookkeeper .mledger .ManagedLedgerException .MetaStoreException ;
37
45
import org .apache .bookkeeper .mledger .ManagedLedgerFactory ;
38
46
import org .apache .bookkeeper .mledger .ManagedLedgerFactoryConfig ;
39
47
import org .apache .bookkeeper .mledger .ManagedLedgerFactoryMXBean ;
48
+ import org .apache .bookkeeper .mledger .ManagedLedgerInfo ;
49
+ import org .apache .bookkeeper .mledger .ManagedLedgerInfo .CursorInfo ;
50
+ import org .apache .bookkeeper .mledger .ManagedLedgerInfo .LedgerInfo ;
51
+ import org .apache .bookkeeper .mledger .ManagedLedgerInfo .MessageRangeInfo ;
52
+ import org .apache .bookkeeper .mledger .ManagedLedgerInfo .PositionInfo ;
40
53
import org .apache .bookkeeper .mledger .impl .ManagedLedgerImpl .ManagedLedgerInitializeLedgerCallback ;
41
54
import org .apache .bookkeeper .mledger .impl .ManagedLedgerImpl .State ;
55
+ import org .apache .bookkeeper .mledger .impl .MetaStore .MetaStoreCallback ;
56
+ import org .apache .bookkeeper .mledger .impl .MetaStore .Stat ;
57
+ import org .apache .bookkeeper .mledger .proto .MLDataFormats ;
58
+ import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedCursorInfo ;
59
+ import org .apache .bookkeeper .mledger .proto .MLDataFormats .MessageRange ;
60
+ import org .apache .bookkeeper .mledger .util .Futures ;
42
61
import org .apache .bookkeeper .util .OrderedSafeExecutor ;
43
62
import org .apache .zookeeper .Watcher ;
44
63
import org .apache .zookeeper .ZooKeeper ;
@@ -290,6 +309,133 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
290
309
entryCacheManager .clear ();
291
310
}
292
311
312
+ @ Override
313
+ public ManagedLedgerInfo getManagedLedgerInfo (String name ) throws InterruptedException , ManagedLedgerException {
314
+ class Result {
315
+ ManagedLedgerInfo info = null ;
316
+ ManagedLedgerException e = null ;
317
+ }
318
+ final Result r = new Result ();
319
+ final CountDownLatch latch = new CountDownLatch (1 );
320
+ asyncGetManagedLedgerInfo (name , new ManagedLedgerInfoCallback () {
321
+ @ Override
322
+ public void getInfoComplete (ManagedLedgerInfo info , Object ctx ) {
323
+ r .info = info ;
324
+ latch .countDown ();
325
+ }
326
+
327
+ @ Override
328
+ public void getInfoFailed (ManagedLedgerException exception , Object ctx ) {
329
+ r .e = exception ;
330
+ latch .countDown ();
331
+ }
332
+ }, null );
333
+
334
+ latch .await ();
335
+
336
+ if (r .e != null ) {
337
+ throw r .e ;
338
+ }
339
+ return r .info ;
340
+ }
341
+
342
+ @ Override
343
+ public void asyncGetManagedLedgerInfo (String name , ManagedLedgerInfoCallback callback , Object ctx ) {
344
+ store .getManagedLedgerInfo (name , new MetaStoreCallback <MLDataFormats .ManagedLedgerInfo >() {
345
+ @ Override
346
+ public void operationComplete (MLDataFormats .ManagedLedgerInfo pbInfo , Stat stat ) {
347
+ ManagedLedgerInfo info = new ManagedLedgerInfo ();
348
+ info .version = stat .getVersion ();
349
+ info .creationDate = DATE_FORMAT .format (Instant .ofEpochMilli (stat .getCreationTimestamp ()));
350
+ info .modificationDate = DATE_FORMAT .format (Instant .ofEpochMilli (stat .getModificationTimestamp ()));
351
+
352
+ info .ledgers = new ArrayList <>(pbInfo .getLedgerInfoCount ());
353
+ for (int i = 0 ; i < pbInfo .getLedgerInfoCount (); i ++) {
354
+ MLDataFormats .ManagedLedgerInfo .LedgerInfo pbLedgerInfo = pbInfo .getLedgerInfo (i );
355
+ LedgerInfo ledgerInfo = new LedgerInfo ();
356
+ ledgerInfo .ledgerId = pbLedgerInfo .getLedgerId ();
357
+ ledgerInfo .entries = pbLedgerInfo .hasEntries () ? pbLedgerInfo .getEntries () : null ;
358
+ ledgerInfo .size = pbLedgerInfo .hasSize () ? pbLedgerInfo .getSize () : null ;
359
+ info .ledgers .add (ledgerInfo );
360
+ }
361
+
362
+ store .getCursors (name , new MetaStoreCallback <List <String >>() {
363
+ @ Override
364
+ public void operationComplete (List <String > cursorsList , Stat stat ) {
365
+ // Get the info for each cursor
366
+ info .cursors = new ConcurrentSkipListMap <>();
367
+ List <CompletableFuture <Void >> cursorsFutures = new ArrayList <>();
368
+
369
+ for (String cursorName : cursorsList ) {
370
+ CompletableFuture <Void > cursorFuture = new CompletableFuture <>();
371
+ cursorsFutures .add (cursorFuture );
372
+ store .asyncGetCursorInfo (name , cursorName ,
373
+ new MetaStoreCallback <MLDataFormats .ManagedCursorInfo >() {
374
+ @ Override
375
+ public void operationComplete (ManagedCursorInfo pbCursorInfo , Stat stat ) {
376
+ CursorInfo cursorInfo = new CursorInfo ();
377
+ cursorInfo .version = stat .getVersion ();
378
+ cursorInfo .creationDate = DATE_FORMAT
379
+ .format (Instant .ofEpochMilli (stat .getCreationTimestamp ()));
380
+ cursorInfo .modificationDate = DATE_FORMAT
381
+ .format (Instant .ofEpochMilli (stat .getModificationTimestamp ()));
382
+
383
+ cursorInfo .cursorsLedgerId = pbCursorInfo .getCursorsLedgerId ();
384
+
385
+ if (pbCursorInfo .hasMarkDeleteLedgerId ()) {
386
+ cursorInfo .markDelete = new PositionInfo ();
387
+ cursorInfo .markDelete .ledgerId = pbCursorInfo .getMarkDeleteLedgerId ();
388
+ cursorInfo .markDelete .entryId = pbCursorInfo .getMarkDeleteEntryId ();
389
+ }
390
+
391
+ if (pbCursorInfo .getIndividualDeletedMessagesCount () > 0 ) {
392
+ cursorInfo .individualDeletedMessages = new ArrayList <>();
393
+ for (int i = 0 ; i < pbCursorInfo
394
+ .getIndividualDeletedMessagesCount (); i ++) {
395
+ MessageRange range = pbCursorInfo .getIndividualDeletedMessages (i );
396
+ MessageRangeInfo rangeInfo = new MessageRangeInfo ();
397
+ rangeInfo .from .ledgerId = range .getLowerEndpoint ().getLedgerId ();
398
+ rangeInfo .from .entryId = range .getLowerEndpoint ().getEntryId ();
399
+ rangeInfo .to .ledgerId = range .getUpperEndpoint ().getLedgerId ();
400
+ rangeInfo .to .entryId = range .getUpperEndpoint ().getEntryId ();
401
+ cursorInfo .individualDeletedMessages .add (rangeInfo );
402
+ }
403
+ }
404
+
405
+ info .cursors .put (cursorName , cursorInfo );
406
+ cursorFuture .complete (null );
407
+ }
408
+
409
+ @ Override
410
+ public void operationFailed (MetaStoreException e ) {
411
+ cursorFuture .completeExceptionally (e );
412
+ }
413
+ });
414
+ }
415
+
416
+ Futures .waitForAll (cursorsFutures ).thenRun (() -> {
417
+ // Completed all the cursors info
418
+ callback .getInfoComplete (info , ctx );
419
+ }).exceptionally ((ex ) -> {
420
+ callback .getInfoFailed (new ManagedLedgerException (ex ), ctx );
421
+ return null ;
422
+ });
423
+ }
424
+
425
+ @ Override
426
+ public void operationFailed (MetaStoreException e ) {
427
+ callback .getInfoFailed (e , ctx );
428
+ }
429
+ });
430
+ }
431
+
432
+ @ Override
433
+ public void operationFailed (MetaStoreException e ) {
434
+ callback .getInfoFailed (e , ctx );
435
+ }
436
+ });
437
+ }
438
+
293
439
public MetaStore getMetaStore () {
294
440
return store ;
295
441
}
@@ -311,4 +457,6 @@ public BookKeeper getBookKeeper() {
311
457
}
312
458
313
459
private static final Logger log = LoggerFactory .getLogger (ManagedLedgerFactoryImpl .class );
460
+
461
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter .ofPattern ("yyyy-MM-dd HH:mm:ss.SSSZ" ).withZone (ZoneId .systemDefault ());
314
462
}
0 commit comments