Skip to content

Improve error handling during localrun start #10450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class LocalRunner implements AutoCloseable {
private final Thread shutdownHook;
private ClassLoader userCodeClassLoader;
private boolean userCodeClassLoaderCreated;
private RuntimeFactory runtimeFactory;
private HTTPServer metricsServer;

public enum RuntimeEnv {
THREAD,
Expand Down Expand Up @@ -185,7 +188,12 @@ public static void main(String[] args) throws Exception {

// parse args by JCommander
jcommander.parse(args);
localRunner.start(true);
try {
localRunner.start(true);
} catch (Exception e) {
log.error("Encountered error starting localrunner", e);
localRunner.close();
}
}

@Builder
Expand Down Expand Up @@ -227,11 +235,13 @@ public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, Sin
this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
}
this.metricsPortStart = metricsPortStart;
shutdownHook = new Thread() {
public void run() {
LocalRunner.this.stop();
shutdownHook = new Thread(() -> {
try {
LocalRunner.this.close();
} catch (Exception exception) {
log.warn("Encountered exception when closing localrunner", exception);
}
};
});
}

private static File createNarExtractionTempDirectory() {
Expand Down Expand Up @@ -260,12 +270,21 @@ public synchronized void stop() {
} catch (IllegalStateException e) {
// ignore possible "Shutdown in progress"
}
log.info("Shutting down the localrun runtimeSpawner ...");

if (metricsServer != null) {
metricsServer.stop();
}

for (RuntimeSpawner spawner : spawners) {
spawner.close();
}
spawners.clear();

if (runtimeFactory != null) {
runtimeFactory.close();
runtimeFactory = null;
}

if (userCodeClassLoaderCreated) {
if (userCodeClassLoader instanceof Closeable) {
try {
Expand Down Expand Up @@ -464,7 +483,7 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
String stateStorageServiceUrl, AuthenticationConfig authConfig,
String userCodeFile) throws Exception {
SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator();
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
runtimeFactory = new ProcessRuntimeFactory(
serviceUrl,
webServiceUrl,
stateStorageServiceUrl,
Expand All @@ -475,71 +494,66 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio
null, /* extra dependencies dir */
narExtractionDirectory, /* nar extraction dir */
secretsProviderConfigurator,
false, Optional.empty(), Optional.empty())) {

for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());

if (metricsPortStart != null) {
int metricsPort = metricsPortStart + i;
if (metricsPortStart < 0 || metricsPortStart > 65535) {
throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
}
instanceConfig.setMetricsPort(metricsPort);
} else {
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
false, Optional.empty(), Optional.empty());

for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());

if (metricsPortStart != null) {
int metricsPort = metricsPortStart + i;
if (metricsPortStart < 0 || metricsPortStart > 65535) {
throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
}
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
if (functionConfig.getExposePulsarAdminClientEnabled() != null) {
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
instanceConfig.setMetricsPort(metricsPort);
} else {
instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
}
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
if (functionConfig.getExposePulsarAdminClientEnabled() != null) {
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
null,
containerFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
Timer statusCheckTimer = new Timer();
statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
int index = 0;
for (RuntimeSpawner spawner : spawners) {
futures[index] = spawner.getFunctionStatusAsJson(index);
index++;
}
try {
CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
for (index = 0; index < futures.length; ++index) {
String json = futures[index].get();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
log.info(gson.toJson(new JsonParser().parse(json)));
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.error("Could not get status from all local instances");
}
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
null,
runtimeFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
Timer statusCheckTimer = new Timer();
statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
int index = 0;
for (RuntimeSpawner spawner : spawners) {
futures[index] = spawner.getFunctionStatusAsJson(index);
index++;
}
}, 30000, 30000);
java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
statusCheckTimer.cancel();
try {
CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
for (index = 0; index < futures.length; ++index) {
String json = futures[index].get();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
log.info(gson.toJson(new JsonParser().parse(json)));
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.error("Could not get status from all local instances");
}
});
}
}
}, 30000, 30000);
java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> statusCheckTimer.cancel()));
}


Expand Down Expand Up @@ -574,13 +588,12 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
RuntimeUtils.registerDefaultCollectors(collectorRegistry);

ThreadRuntimeFactory threadRuntimeFactory;
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
if (userCodeClassLoader != null) {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
}
threadRuntimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
stateStorageServiceUrl,
authConfig,
Expand Down Expand Up @@ -614,16 +627,15 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi
instanceConfig,
userCodeFile,
null,
threadRuntimeFactory,
runtimeFactory,
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}

if (metricsPortStart != null) {
// starting metrics server
log.info("Starting metrics server on port {}", metricsPortStart);
new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true);
metricsServer = new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true);
}
}

Expand Down