Skip to content

Commit a9c2437

Browse files
srkukarnisijie
authored andcommitted
Add more information about function in getstatus (apache#205)
1 parent 631f22f commit a9c2437

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

pulsar-functions/proto/src/main/proto/InstanceCommunication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ option java_outer_classname = "InstanceCommunication";
2727
message FunctionStatus {
2828
bool running = 1;
2929
string failureException = 2;
30+
int64 numRestarts = 11;
3031
int64 numProcessed = 3;
3132
int64 numSuccessfullyProcessed = 4;
3233
int64 numTimeouts = 5;

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/spawner/Spawner.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public static Spawner createSpawner(FunctionConfig fnConfig,
7070
private MetricsSink metricsSink;
7171
private int metricsCollectionInterval;
7272
private Timer metricsCollectionTimer;
73+
private int numRestarts;
7374

7475
private Spawner(LimitsConfig limitsConfig,
7576
AssignmentInfo assignmentInfo,
@@ -83,12 +84,14 @@ private Spawner(LimitsConfig limitsConfig,
8384
this.codeFile = codeFile;
8485
this.metricsSink = metricsSink;
8586
this.metricsCollectionInterval = metricsCollectionInterval;
87+
this.numRestarts = 0;
8688
}
8789

8890
public void start() throws Exception {
8991
log.info("Spawner starting function {}", this.assignmentInfo.getFunctionConfig().getName());
9092
functionContainer = functionContainerFactory.createContainer(createJavaInstanceConfig(), codeFile);
9193
functionContainer.start();
94+
numRestarts++;
9295
if (metricsSink != null) {
9396
log.info("Scheduling Metrics Collection every " + metricsCollectionInterval + " secs for " + FunctionConfigUtils.getFullyQualifiedName(assignmentInfo.getFunctionConfig()));
9497
metricsCollectionTimer = new Timer();
@@ -107,6 +110,7 @@ public void run() {
107110
log.error("Function Container is dead with exception", functionContainer.getDeathException());
108111
log.error("Restarting...");
109112
functionContainer.start();
113+
numRestarts++;
110114
}
111115
}
112116
}, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000);
@@ -120,7 +124,14 @@ public void join() throws Exception {
120124
}
121125

122126
public CompletableFuture<FunctionStatus> getFunctionStatus() {
123-
return functionContainer.getFunctionStatus();
127+
return functionContainer.getFunctionStatus().thenApply(f -> {
128+
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
129+
builder.mergeFrom(f).setNumRestarts(numRestarts);
130+
if (functionContainer.getDeathException() != null) {
131+
builder.setFailureException(functionContainer.getDeathException().getMessage());
132+
}
133+
return builder.build();
134+
});
124135
}
125136

126137
@Override

0 commit comments

Comments
 (0)