Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Commit b2a12ba

Browse files
authored
Solve #1162, Dispatch actions to specified type of executor (#1332)
1 parent 060625b commit b2a12ba

File tree

16 files changed

+476
-175
lines changed

16 files changed

+476
-175
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.smartdata.server.cluster;
19+
20+
21+
/**
22+
* Represent each nodes that SSM services (SmartServers and SmartAgents) running on.
23+
*
24+
*/
25+
public class NodeInfo {
26+
private String id;
27+
private String location;
28+
29+
public NodeInfo(String id, String location) {
30+
this.id = id;
31+
this.location = location;
32+
}
33+
34+
public String getId() {
35+
return id;
36+
}
37+
38+
public void setId(String id) {
39+
this.id = id;
40+
}
41+
42+
public String getLocation() {
43+
return location;
44+
}
45+
46+
public void setLocation(String location) {
47+
this.location = location;
48+
}
49+
}
50+
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.smartdata.server.engine;
19+
20+
import org.smartdata.server.cluster.NodeInfo;
21+
22+
public class ActiveServerInfo extends NodeInfo {
23+
private static ActiveServerInfo inst;
24+
25+
private ActiveServerInfo() {
26+
super("ActiveSSMServer", "127.0.0.1");
27+
}
28+
29+
public static ActiveServerInfo getInstance() {
30+
if (inst == null) {
31+
inst = new ActiveServerInfo();
32+
}
33+
return inst;
34+
}
35+
}

smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,13 @@ public class CmdletManager extends AbstractService {
9292
private Map<String, Long> fileLocks;
9393
private ListMultimap<String, ActionScheduler> schedulers = ArrayListMultimap.create();
9494
private List<ActionSchedulerService> schedulerServices = new ArrayList<>();
95+
private long totalScheduled = 0;
9596

9697
public CmdletManager(ServerContext context) {
9798
super(context);
9899

99100
this.metaStore = context.getMetaStore();
100101
this.executorService = Executors.newScheduledThreadPool(2);
101-
this.dispatcher = new CmdletDispatcher(context, this);
102102
this.runningCmdlets = new ArrayList<>();
103103
this.pendingCmdlet = new LinkedList<>();
104104
this.schedulingCmdlet = new LinkedList<>();
@@ -107,6 +107,8 @@ public CmdletManager(ServerContext context) {
107107
this.idToCmdlets = new ConcurrentHashMap<>();
108108
this.idToActions = new ConcurrentHashMap<>();
109109
this.fileLocks = new ConcurrentHashMap<>();
110+
this.dispatcher = new CmdletDispatcher(context, this, scheduledCmdlet,
111+
idToLaunchCmdlet, runningCmdlets, schedulers);
110112
}
111113

112114
@VisibleForTesting
@@ -270,18 +272,19 @@ private void syncCmdAction(CmdletInfo cmdletInfo,
270272
@Override
271273
public void start() throws IOException {
272274
LOG.info("Starting ...");
273-
executorService.scheduleAtFixedRate(new ScheduleTask(), 100, 50, TimeUnit.MILLISECONDS);
274-
executorService.scheduleAtFixedRate(
275-
new DispatchTask(this.dispatcher), 200, 100, TimeUnit.MILLISECONDS);
275+
executorService.scheduleAtFixedRate(new ScheduleTask(), 100, 50, TimeUnit.MILLISECONDS);
276+
276277
for (ActionSchedulerService s : schedulerServices) {
277278
s.start();
278279
}
280+
dispatcher.start();
279281
LOG.info("Started.");
280282
}
281283

282284
@Override
283285
public void stop() throws IOException {
284286
LOG.info("Stopping ...");
287+
dispatcher.stop();
285288
for (int i = schedulerServices.size() - 1; i >= 0; i--) {
286289
schedulerServices.get(i).stop();
287290
}
@@ -967,44 +970,12 @@ public void run() {
967970
int nScheduled;
968971
do {
969972
nScheduled = scheduleCmdlet();
973+
totalScheduled += nScheduled;
970974
} while (nScheduled != 0);
971975
} catch (IOException e) {
972976
LOG.error("Exception when Scheduling Cmdlet. "
973977
+ scheduledCmdlet.size() + " cmdlets are pending for dispatch.", e);
974978
}
975979
}
976980
}
977-
978-
private class DispatchTask implements Runnable {
979-
private final CmdletDispatcher dispatcher;
980-
981-
public DispatchTask(CmdletDispatcher dispatcher) {
982-
this.dispatcher = dispatcher;
983-
}
984-
985-
@Override
986-
public void run() {
987-
while (dispatcher.canDispatchMore()) {
988-
try {
989-
LaunchCmdlet launchCmdlet = getNextCmdletToRun();
990-
if (launchCmdlet == null) {
991-
break;
992-
} else {
993-
cmdletPreExecutionProcess(launchCmdlet);
994-
dispatcher.dispatch(launchCmdlet);
995-
}
996-
} catch (IOException e) {
997-
LOG.error("Cmdlet dispatcher error", e);
998-
}
999-
}
1000-
}
1001-
}
1002-
1003-
public void cmdletPreExecutionProcess(LaunchCmdlet cmdlet) {
1004-
for (LaunchAction action : cmdlet.getLaunchActions()) {
1005-
for (ActionScheduler p : schedulers.get(action.getActionType())) {
1006-
p.onPreDispatch(action);
1007-
}
1008-
}
1009-
}
1010981
}

smart-engine/src/main/java/org/smartdata/server/engine/StandbyServerInfo.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,10 @@
1717
*/
1818
package org.smartdata.server.engine;
1919

20-
public class StandbyServerInfo {
21-
private String id;
22-
private String location;
20+
import org.smartdata.server.cluster.NodeInfo;
2321

22+
public class StandbyServerInfo extends NodeInfo {
2423
public StandbyServerInfo(String id, String location) {
25-
this.id = id;
26-
this.location = location;
27-
}
28-
29-
public String getId() {
30-
return id;
31-
}
32-
33-
public void setId(String id) {
34-
this.id = id;
35-
}
36-
37-
public String getLocation() {
38-
return location;
39-
}
40-
41-
public void setLocation(String location) {
42-
this.location = location;
24+
super(id, location);
4325
}
4426
}
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,15 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.smartdata.hdfs.metric.fetcher;
18+
package org.smartdata.server.engine.cmdlet;
1919

20-
public class DispatcherConf {
21-
int maxConcurrentMovesPerNode;
20+
21+
public enum CmdletDispatchPolicy {
22+
ANY,
23+
PREFER_AGENT,
24+
PREFER_REMOTE_SSM,
25+
PREFER_LOCAL;
26+
// MUST_AGENT,
27+
// MUST_REMOTE_SSM,
28+
// MUST_LOCAL;
2229
}

0 commit comments

Comments
 (0)