Skip to content

Commit 631f22f

Browse files
srkukarni authored and sijie committed
Consolidate error handling to Spawner (apache#203)
1 parent 8df9d54 commit 631f22f

File tree

9 files changed

+63
-50
lines changed

9 files changed

+63
-50
lines changed

pulsar-functions/conf/function_worker.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,8 @@ metricsConfig:
4444
# threadContainerFactory:
4545
# threadGroupName: "Thread Function Container Group"
4646
processContainerFactory:
47-
livenessCheckInterval: 10
48-
47+
logDirectory:
48+
4949
schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"
5050
functionAssignmentTopicName: "assignments"
5151
failureCheckFreqMs: 30000

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/FunctionContainer.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,12 +28,16 @@
2828
*/
2929
public interface FunctionContainer {
3030

31-
void start() throws Exception;
31+
void start();
3232

3333
void join() throws Exception;
3434

3535
void stop();
3636

37+
boolean isAlive();
38+
39+
Exception getDeathException();
40+
3741
CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus();
3842

3943
CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ProcessFunctionContainer.java

Lines changed: 22 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -54,17 +54,13 @@ class ProcessFunctionContainer implements FunctionContainer {
5454
private Exception startupException;
5555
private ManagedChannel channel;
5656
private InstanceControlGrpc.InstanceControlFutureStub stub;
57-
private Timer alivenessTimer;
58-
private int alivenessCheckInterval;
59-
private volatile boolean markedKilled = false;
6057

6158
ProcessFunctionContainer(InstanceConfig instanceConfig,
6259
int maxBufferedTuples,
6360
String instanceFile,
6461
String logDirectory,
6562
String codeFile,
66-
String pulsarServiceUrl,
67-
int alivenessCheckInterval) {
63+
String pulsarServiceUrl) {
6864
List<String> args = new LinkedList<>();
6965
if (instanceConfig.getFunctionConfig().getRuntime() == Function.FunctionConfig.Runtime.JAVA) {
7066
args.add("java");
@@ -168,27 +164,19 @@ class ProcessFunctionContainer implements FunctionContainer {
168164
args.add(String.valueOf(instancePort));
169165

170166
processBuilder = new ProcessBuilder(args);
171-
this.alivenessCheckInterval = alivenessCheckInterval;
172167
}
173168

174169
/**
175170
* The core logic that initialize the thread container and executes the function
176171
*/
177172
@Override
178-
public void start() throws Exception {
173+
public void start() {
179174
Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy()));
180175
startProcess();
181176
channel = ManagedChannelBuilder.forAddress("127.0.0.1", instancePort)
182177
.usePlaintext(true)
183178
.build();
184179
stub = InstanceControlGrpc.newFutureStub(channel);
185-
alivenessTimer = new Timer();
186-
alivenessTimer.scheduleAtFixedRate(new TimerTask() {
187-
@Override
188-
public void run() {
189-
checkForAliveness();
190-
}
191-
}, alivenessCheckInterval * 1000, alivenessCheckInterval * 1000);
192180
}
193181

194182
@Override
@@ -197,11 +185,9 @@ public void join() throws Exception {
197185
}
198186

199187
@Override
200-
public synchronized void stop() {
201-
markedKilled = true;
188+
public void stop() {
202189
process.destroy();
203190
channel.shutdown();
204-
alivenessTimer.cancel();
205191
}
206192

207193
@Override
@@ -262,6 +248,7 @@ private int findAvailablePort() {
262248
}
263249

264250
private void startProcess() {
251+
startupException = null;
265252
try {
266253
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
267254
process = processBuilder.start();
@@ -278,21 +265,24 @@ private void startProcess() {
278265
}
279266
}
280267

281-
private synchronized void checkForAliveness() {
282-
if (markedKilled) return;
283-
if (!process.isAlive()) {
284-
log.error("Process is no longer alive, Restarting...");
285-
InputStream errorStream = process.getErrorStream();
286-
try {
287-
byte[] errorBytes = new byte[errorStream.available()];
288-
errorStream.read(errorBytes);
289-
String errorMessage = new String(errorBytes);
290-
startupException = new RuntimeException(errorMessage);
291-
log.error("ErrorStream was " + errorMessage);
292-
} catch (Exception ex) {
293-
startupException = ex;
294-
}
295-
startProcess();
268+
@Override
269+
public boolean isAlive() {
270+
return process.isAlive();
271+
}
272+
273+
@Override
274+
public Exception getDeathException() {
275+
if (isAlive()) return null;
276+
if (startupException != null) return startupException;
277+
InputStream errorStream = process.getErrorStream();
278+
try {
279+
byte[] errorBytes = new byte[errorStream.available()];
280+
errorStream.read(errorBytes);
281+
String errorMessage = new String(errorBytes);
282+
startupException = new RuntimeException(errorMessage);
283+
} catch (Exception ex) {
284+
startupException = ex;
296285
}
286+
return startupException;
297287
}
298288
}

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ProcessFunctionContainerFactory.java

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,22 +30,19 @@
3030
public class ProcessFunctionContainerFactory implements FunctionContainerFactory {
3131

3232
private int maxBufferedTuples;
33-
private int livenessCheckInterval;
3433
private String pulsarServiceUrl;
3534
private String javaInstanceJarFile;
3635
private String pythonInstanceFile;
3736
private String logDirectory;
3837

3938
@VisibleForTesting
4039
public ProcessFunctionContainerFactory(int maxBufferedTuples,
41-
int livenessCheckInterval,
4240
String pulsarServiceUrl,
4341
String javaInstanceJarFile,
4442
String pythonInstanceFile,
4543
String logDirectory) {
4644

4745
this.maxBufferedTuples = maxBufferedTuples;
48-
this.livenessCheckInterval = livenessCheckInterval;
4946
this.pulsarServiceUrl = pulsarServiceUrl;
5047
this.javaInstanceJarFile = javaInstanceJarFile;
5148
this.pythonInstanceFile = pythonInstanceFile;
@@ -71,8 +68,7 @@ public ProcessFunctionContainer createContainer(InstanceConfig instanceConfig, S
7168
instanceFile,
7269
logDirectory,
7370
codeFile,
74-
pulsarServiceUrl,
75-
livenessCheckInterval);
71+
pulsarServiceUrl);
7672
}
7773

7874
@Override

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ class ThreadFunctionContainer implements FunctionContainer {
4444
@Getter
4545
private InstanceConfig instanceConfig;
4646
private JavaInstanceRunnable javaInstanceRunnable;
47+
private Exception startupException;
4748

4849
ThreadFunctionContainer(InstanceConfig instanceConfig,
4950
int maxBufferedTuples,
@@ -73,6 +74,13 @@ class ThreadFunctionContainer implements FunctionContainer {
7374
@Override
7475
public void start() {
7576
log.info("ThreadContainer starting function with instance config {}", instanceConfig);
77+
startupException = null;
78+
this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
79+
@Override
80+
public void uncaughtException(Thread t, Throwable e) {
81+
startupException = new Exception(e);
82+
}
83+
});
7684
this.fnThread.start();
7785
}
7886

@@ -109,4 +117,15 @@ public CompletableFuture<FunctionStatus> getFunctionStatus() {
109117
public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
110118
return CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics());
111119
}
120+
121+
@Override
122+
public boolean isAlive() {
123+
return this.fnThread.isAlive();
124+
}
125+
126+
@Override
127+
public Exception getDeathException() {
128+
if (isAlive()) return null;
129+
else return startupException;
130+
}
112131
}

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -95,13 +95,19 @@ public void start() throws Exception {
9595
metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() {
9696
@Override
9797
public void run() {
98-
log.info("Collecting metrics for function" + FunctionConfigUtils.getFullyQualifiedName(assignmentInfo.getFunctionConfig()));
99-
functionContainer.getAndResetMetrics().thenAccept(t -> {
100-
if (t != null) {
101-
log.debug("Collected metrics {}", t);
102-
metricsSink.processRecord(t, assignmentInfo.getFunctionConfig());
103-
}
104-
});
98+
if (functionContainer.isAlive()) {
99+
log.info("Collecting metrics for function" + FunctionConfigUtils.getFullyQualifiedName(assignmentInfo.getFunctionConfig()));
100+
functionContainer.getAndResetMetrics().thenAccept(t -> {
101+
if (t != null) {
102+
log.debug("Collected metrics {}", t);
103+
metricsSink.processRecord(t, assignmentInfo.getFunctionConfig());
104+
}
105+
});
106+
} else {
107+
log.error("Function Container is dead with exception", functionContainer.getDeathException());
108+
log.error("Restarting...");
109+
functionContainer.start();
110+
}
105111
}
106112
}, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000);
107113
}

pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/container/ProcessFunctionContainerTest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ public ProcessFunctionContainerTest() {
5858
this.pulsarServiceUrl = "pulsar://localhost:6670";
5959
this.logDirectory = "Users/user/logs";
6060
this.factory = new ProcessFunctionContainerFactory(
61-
1024, 10, pulsarServiceUrl, javaInstanceJarFile, pythonInstanceFile, logDirectory);
61+
1024, pulsarServiceUrl, javaInstanceJarFile, pythonInstanceFile, logDirectory);
6262
}
6363

6464
@AfterMethod

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,6 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
105105
} else if (workerConfig.getProcessContainerFactory() != null) {
106106
this.functionContainerFactory = new ProcessFunctionContainerFactory(
107107
workerConfig.getLimitsConfig().getMaxBufferedTuples(),
108-
workerConfig.getProcessContainerFactory().getLivenessCheckInterval(),
109108
workerConfig.getPulsarServiceUrl(),
110109
workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
111110
workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,6 @@ static class ProcessContainerFactory {
8282
private String javaInstanceJarLocation;
8383
private String pythonInstanceLocation;
8484
private String logDirectory;
85-
private int livenessCheckInterval;
8685
}
8786
private ProcessContainerFactory processContainerFactory;
8887

0 commit comments

Comments
 (0)