63
63
import org .apache .pulsar .functions .instance .InstanceConfig ;
64
64
import org .apache .pulsar .functions .instance .stats .FunctionCollectorRegistry ;
65
65
import org .apache .pulsar .functions .proto .Function ;
66
+ import org .apache .pulsar .functions .runtime .RuntimeFactory ;
66
67
import org .apache .pulsar .functions .runtime .RuntimeSpawner ;
67
68
import org .apache .pulsar .functions .runtime .RuntimeUtils ;
68
69
import org .apache .pulsar .functions .runtime .process .ProcessRuntimeFactory ;
@@ -94,6 +95,8 @@ public class LocalRunner implements AutoCloseable {
94
95
private final Thread shutdownHook ;
95
96
private ClassLoader userCodeClassLoader ;
96
97
private boolean userCodeClassLoaderCreated ;
98
+ private RuntimeFactory runtimeFactory ;
99
+ private HTTPServer metricsServer ;
97
100
98
101
public enum RuntimeEnv {
99
102
THREAD ,
@@ -185,7 +188,12 @@ public static void main(String[] args) throws Exception {
185
188
186
189
// parse args by JCommander
187
190
jcommander .parse (args );
188
- localRunner .start (true );
191
+ try {
192
+ localRunner .start (true );
193
+ } catch (Exception e ) {
194
+ log .error ("Encountered error starting localrunner" , e );
195
+ localRunner .close ();
196
+ }
189
197
}
190
198
191
199
@ Builder
@@ -227,11 +235,13 @@ public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, Sin
227
235
this .connectorsDir = Paths .get (pulsarHome , "connectors" ).toString ();
228
236
}
229
237
this .metricsPortStart = metricsPortStart ;
230
- shutdownHook = new Thread () {
231
- public void run () {
232
- LocalRunner .this .stop ();
238
+ shutdownHook = new Thread (() -> {
239
+ try {
240
+ LocalRunner .this .close ();
241
+ } catch (Exception exception ) {
242
+ log .warn ("Encountered exception when closing localrunner" , exception );
233
243
}
234
- };
244
+ }) ;
235
245
}
236
246
237
247
private static File createNarExtractionTempDirectory () {
@@ -260,12 +270,21 @@ public synchronized void stop() {
260
270
} catch (IllegalStateException e ) {
261
271
// ignore possible "Shutdown in progress"
262
272
}
263
- log .info ("Shutting down the localrun runtimeSpawner ..." );
273
+
274
+ if (metricsServer != null ) {
275
+ metricsServer .stop ();
276
+ }
277
+
264
278
for (RuntimeSpawner spawner : spawners ) {
265
279
spawner .close ();
266
280
}
267
281
spawners .clear ();
268
282
283
+ if (runtimeFactory != null ) {
284
+ runtimeFactory .close ();
285
+ runtimeFactory = null ;
286
+ }
287
+
269
288
if (userCodeClassLoaderCreated ) {
270
289
if (userCodeClassLoader instanceof Closeable ) {
271
290
try {
@@ -464,7 +483,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
464
483
String stateStorageServiceUrl , AuthenticationConfig authConfig ,
465
484
String userCodeFile ) throws Exception {
466
485
SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator ();
467
- try ( ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory (
486
+ runtimeFactory = new ProcessRuntimeFactory (
468
487
serviceUrl ,
469
488
webServiceUrl ,
470
489
stateStorageServiceUrl ,
@@ -475,71 +494,66 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
475
494
null , /* extra dependencies dir */
476
495
narExtractionDirectory , /* nar extraction dir */
477
496
secretsProviderConfigurator ,
478
- false , Optional .empty (), Optional .empty ())) {
479
-
480
- for (int i = 0 ; i < parallelism ; ++i ) {
481
- InstanceConfig instanceConfig = new InstanceConfig ();
482
- instanceConfig .setFunctionDetails (functionDetails );
483
- // TODO: correctly implement function version and id
484
- instanceConfig .setFunctionVersion (UUID .randomUUID ().toString ());
485
- instanceConfig .setFunctionId (UUID .randomUUID ().toString ());
486
- instanceConfig .setInstanceId (i + instanceIdOffset );
487
- instanceConfig .setMaxBufferedTuples (1024 );
488
- instanceConfig .setPort (FunctionCommon .findAvailablePort ());
489
-
490
- if (metricsPortStart != null ) {
491
- int metricsPort = metricsPortStart + i ;
492
- if (metricsPortStart < 0 || metricsPortStart > 65535 ) {
493
- throw new IllegalArgumentException ("Metrics port need to be within the range of 0 and 65535" );
494
- }
495
- instanceConfig .setMetricsPort (metricsPort );
496
- } else {
497
- instanceConfig .setMetricsPort (FunctionCommon .findAvailablePort ());
497
+ false , Optional .empty (), Optional .empty ());
498
+
499
+ for (int i = 0 ; i < parallelism ; ++i ) {
500
+ InstanceConfig instanceConfig = new InstanceConfig ();
501
+ instanceConfig .setFunctionDetails (functionDetails );
502
+ // TODO: correctly implement function version and id
503
+ instanceConfig .setFunctionVersion (UUID .randomUUID ().toString ());
504
+ instanceConfig .setFunctionId (UUID .randomUUID ().toString ());
505
+ instanceConfig .setInstanceId (i + instanceIdOffset );
506
+ instanceConfig .setMaxBufferedTuples (1024 );
507
+ instanceConfig .setPort (FunctionCommon .findAvailablePort ());
508
+
509
+ if (metricsPortStart != null ) {
510
+ int metricsPort = metricsPortStart + i ;
511
+ if (metricsPortStart < 0 || metricsPortStart > 65535 ) {
512
+ throw new IllegalArgumentException ("Metrics port need to be within the range of 0 and 65535" );
498
513
}
499
- instanceConfig .setClusterName ("local" );
500
- if (functionConfig != null ) {
501
- instanceConfig .setMaxPendingAsyncRequests (functionConfig .getMaxPendingAsyncRequests ());
502
- if (functionConfig .getExposePulsarAdminClientEnabled () != null ) {
503
- instanceConfig .setExposePulsarAdminClientEnabled (functionConfig .getExposePulsarAdminClientEnabled ());
504
- }
514
+ instanceConfig .setMetricsPort (metricsPort );
515
+ } else {
516
+ instanceConfig .setMetricsPort (FunctionCommon .findAvailablePort ());
517
+ }
518
+ instanceConfig .setClusterName ("local" );
519
+ if (functionConfig != null ) {
520
+ instanceConfig .setMaxPendingAsyncRequests (functionConfig .getMaxPendingAsyncRequests ());
521
+ if (functionConfig .getExposePulsarAdminClientEnabled () != null ) {
522
+ instanceConfig .setExposePulsarAdminClientEnabled (functionConfig .getExposePulsarAdminClientEnabled ());
505
523
}
506
- RuntimeSpawner runtimeSpawner = new RuntimeSpawner (
507
- instanceConfig ,
508
- userCodeFile ,
509
- null ,
510
- containerFactory ,
511
- 30000 );
512
- spawners .add (runtimeSpawner );
513
- runtimeSpawner .start ();
514
524
}
515
- Timer statusCheckTimer = new Timer ();
516
- statusCheckTimer .scheduleAtFixedRate (new TimerTask () {
517
- @ Override
518
- public void run () {
519
- CompletableFuture <String >[] futures = new CompletableFuture [spawners .size ()];
520
- int index = 0 ;
521
- for (RuntimeSpawner spawner : spawners ) {
522
- futures [index ] = spawner .getFunctionStatusAsJson (index );
523
- index ++;
524
- }
525
- try {
526
- CompletableFuture .allOf (futures ).get (5 , TimeUnit .SECONDS );
527
- for (index = 0 ; index < futures .length ; ++index ) {
528
- String json = futures [index ].get ();
529
- Gson gson = new GsonBuilder ().setPrettyPrinting ().create ();
530
- log .info (gson .toJson (new JsonParser ().parse (json )));
531
- }
532
- } catch (TimeoutException | InterruptedException | ExecutionException e ) {
533
- log .error ("Could not get status from all local instances" );
534
- }
525
+ RuntimeSpawner runtimeSpawner = new RuntimeSpawner (
526
+ instanceConfig ,
527
+ userCodeFile ,
528
+ null ,
529
+ runtimeFactory ,
530
+ 30000 );
531
+ spawners .add (runtimeSpawner );
532
+ runtimeSpawner .start ();
533
+ }
534
+ Timer statusCheckTimer = new Timer ();
535
+ statusCheckTimer .scheduleAtFixedRate (new TimerTask () {
536
+ @ Override
537
+ public void run () {
538
+ CompletableFuture <String >[] futures = new CompletableFuture [spawners .size ()];
539
+ int index = 0 ;
540
+ for (RuntimeSpawner spawner : spawners ) {
541
+ futures [index ] = spawner .getFunctionStatusAsJson (index );
542
+ index ++;
535
543
}
536
- }, 30000 , 30000 );
537
- java .lang .Runtime .getRuntime ().addShutdownHook (new Thread () {
538
- public void run () {
539
- statusCheckTimer .cancel ();
544
+ try {
545
+ CompletableFuture .allOf (futures ).get (5 , TimeUnit .SECONDS );
546
+ for (index = 0 ; index < futures .length ; ++index ) {
547
+ String json = futures [index ].get ();
548
+ Gson gson = new GsonBuilder ().setPrettyPrinting ().create ();
549
+ log .info (gson .toJson (new JsonParser ().parse (json )));
550
+ }
551
+ } catch (TimeoutException | InterruptedException | ExecutionException e ) {
552
+ log .error ("Could not get status from all local instances" );
540
553
}
541
- });
542
- }
554
+ }
555
+ }, 30000 , 30000 );
556
+ java .lang .Runtime .getRuntime ().addShutdownHook (new Thread (() -> statusCheckTimer .cancel ()));
543
557
}
544
558
545
559
@@ -574,13 +588,12 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
574
588
FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry .getDefaultImplementation ();
575
589
RuntimeUtils .registerDefaultCollectors (collectorRegistry );
576
590
577
- ThreadRuntimeFactory threadRuntimeFactory ;
578
591
ClassLoader originalClassLoader = Thread .currentThread ().getContextClassLoader ();
579
592
try {
580
593
if (userCodeClassLoader != null ) {
581
594
Thread .currentThread ().setContextClassLoader (userCodeClassLoader );
582
595
}
583
- threadRuntimeFactory = new ThreadRuntimeFactory ("LocalRunnerThreadGroup" ,
596
+ runtimeFactory = new ThreadRuntimeFactory ("LocalRunnerThreadGroup" ,
584
597
serviceUrl ,
585
598
stateStorageServiceUrl ,
586
599
authConfig ,
@@ -614,16 +627,15 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
614
627
instanceConfig ,
615
628
userCodeFile ,
616
629
null ,
617
- threadRuntimeFactory ,
630
+ runtimeFactory ,
618
631
30000 );
619
632
spawners .add (runtimeSpawner );
620
633
runtimeSpawner .start ();
621
634
}
622
-
623
635
if (metricsPortStart != null ) {
624
636
// starting metrics server
625
637
log .info ("Starting metrics server on port {}" , metricsPortStart );
626
- new HTTPServer (new InetSocketAddress (metricsPortStart ), collectorRegistry , true );
638
+ metricsServer = new HTTPServer (new InetSocketAddress (metricsPortStart ), collectorRegistry , true );
627
639
}
628
640
}
629
641
0 commit comments