Skip to content

Commit 510744c

Browse files
[feat][fn] Add stateStorageURL and pulsarWebService URL to go InstanceConfig (apache#20443)
Co-authored-by: Andy Walker <[email protected]>
1 parent 8b3c085 commit 510744c

File tree

5 files changed

+33
-11
lines changed

5 files changed

+33
-11
lines changed

pulsar-function-go/conf/conf.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ import (
3333
const ConfigPath = "conf/conf.yaml"
3434

3535
type Conf struct {
36-
PulsarServiceURL string `json:"pulsarServiceURL" yaml:"pulsarServiceURL"`
37-
InstanceID int `json:"instanceID" yaml:"instanceID"`
38-
FuncID string `json:"funcID" yaml:"funcID"`
39-
FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
40-
MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
41-
Port int `json:"port" yaml:"port"`
42-
ClusterName string `json:"clusterName" yaml:"clusterName"`
43-
KillAfterIdleMs time.Duration `json:"killAfterIdleMs" yaml:"killAfterIdleMs"`
36+
PulsarServiceURL string `json:"pulsarServiceURL" yaml:"pulsarServiceURL"`
37+
StateStorageServiceURL string `json:"stateStorageServiceUrl" yaml:"stateStorageServiceUrl"`
38+
PulsarWebServiceURL string `json:"pulsarWebServiceUrl" yaml:"pulsarWebServiceUrl"`
39+
InstanceID int `json:"instanceID" yaml:"instanceID"`
40+
FuncID string `json:"funcID" yaml:"funcID"`
41+
FuncVersion string `json:"funcVersion" yaml:"funcVersion"`
42+
MaxBufTuples int `json:"maxBufTuples" yaml:"maxBufTuples"`
43+
Port int `json:"port" yaml:"port"`
44+
ClusterName string `json:"clusterName" yaml:"clusterName"`
45+
KillAfterIdleMs time.Duration `json:"killAfterIdleMs" yaml:"killAfterIdleMs"`
4446
// function details config
4547
Tenant string `json:"tenant" yaml:"tenant"`
4648
NameSpace string `json:"nameSpace" yaml:"nameSpace"`

pulsar-function-go/pf/instanceConf.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ type instanceConf struct {
3939
port int
4040
clusterName string
4141
pulsarServiceURL string
42+
stateServiceURL string
43+
pulsarWebServiceURL string
4244
killAfterIdle time.Duration
4345
expectedHealthCheckInterval int32
4446
metricsPort int
@@ -76,6 +78,8 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
7678
port: cfg.Port,
7779
clusterName: cfg.ClusterName,
7880
pulsarServiceURL: cfg.PulsarServiceURL,
81+
stateServiceURL: cfg.StateStorageServiceURL,
82+
pulsarWebServiceURL: cfg.PulsarWebServiceURL,
7983
killAfterIdle: cfg.KillAfterIdleMs,
8084
expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
8185
metricsPort: cfg.MetricsPort,

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
@Getter
2828
public class GoInstanceConfig {
2929
private String pulsarServiceURL = "";
30+
private String stateStorageServiceUrl = "";
31+
private String pulsarWebServiceUrl = "";
3032
private int instanceID;
3133
private String funcID = "";
3234
private String funcVersion = "";

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
132132
AuthenticationConfig authConfig,
133133
String originalCodeFileName,
134134
String pulsarServiceUrl,
135+
String stateStorageServiceUrl,
136+
String pulsarWebServiceUrl,
135137
boolean k8sRuntime) throws IOException {
136138
final List<String> args = new LinkedList<>();
137139
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
@@ -140,6 +142,14 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
140142
goInstanceConfig.setClusterName(instanceConfig.getClusterName());
141143
}
142144

145+
if (null != stateStorageServiceUrl) {
146+
goInstanceConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
147+
}
148+
149+
if (instanceConfig.isExposePulsarAdminClientEnabled() && StringUtils.isNotBlank(pulsarWebServiceUrl)) {
150+
goInstanceConfig.setPulsarWebServiceUrl(pulsarWebServiceUrl);
151+
}
152+
143153
if (instanceConfig.getInstanceId() != 0) {
144154
goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
145155
}
@@ -310,8 +320,9 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
310320
final List<String> args = new LinkedList<>();
311321

312322
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
313-
return getGoInstanceCmd(instanceConfig, authConfig,
314-
originalCodeFileName, pulsarServiceUrl, k8sRuntime);
323+
return getGoInstanceCmd(instanceConfig, authConfig, originalCodeFileName,
324+
pulsarServiceUrl, stateStorageServiceUrl, pulsarWebServiceUrl,
325+
k8sRuntime);
315326
}
316327

317328
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {

pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,9 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
122122
.build();
123123

124124
instanceConfig.setFunctionDetails(functionDetails);
125+
instanceConfig.setExposePulsarAdminClientEnabled(true);
125126

126-
List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig,"config", "pulsar://localhost:6650", k8sRuntime);
127+
List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, authConfig, "config", "pulsar://localhost:6650", "bk://localhost:4181", "http://localhost:8080", k8sRuntime);
127128
if (k8sRuntime) {
128129
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
129130
} else {
@@ -151,6 +152,8 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
151152
Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
152153
Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
153154
Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"), "pulsar://localhost:6650");
155+
Assert.assertEquals(goInstanceConfig.get("stateStorageServiceUrl"), "bk://localhost:4181");
156+
Assert.assertEquals(goInstanceConfig.get("pulsarWebServiceUrl"), "http://localhost:8080");
154157
Assert.assertEquals(goInstanceConfig.get("runtime"), 3);
155158
Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
156159
Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");

0 commit comments

Comments
 (0)