Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Commit 53ab86b

Browse files
authored
Add a throttle on EC to avoid IO overload (#1947)
1 parent 14232ec commit 53ab86b

File tree

4 files changed

+46
-10
lines changed

4 files changed

+46
-10
lines changed

conf/smart-default.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@
133133
<description>The throughput limit (MB) for SSM copy overall</description>
134134
</property>
135135

136+
<property>
137+
<name>smart.action.ec.throttle.mb</name>
138+
<value>0</value>
139+
<description>The throughput limit (MB) for SSM EC overall</description>
140+
</property>
141+
136142
<property>
137143
<name>smart.action.local.execution.disabled</name>
138144
<value>false</value>

smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ public class SmartConfKeys {
138138
public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
139139
public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb";
140140
public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
141+
public static final String SMART_ACTION_EC_THROTTLE_MB_KEY = "smart.action.ec.throttle.mb";
142+
public static final long SMART_ACTION_EC_THROTTLE_MB_DEFAULT = 0L;
141143
public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY =
142144
"smart.action.local.execution.disabled";
143145
public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false;

smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/smartdata/hdfs/action/ErasureCodingAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected void execute() throws Exception {
8484
final String DIR_RESULT =
8585
"The EC policy is set successfully for the given directory.";
8686
final String CONVERT_RESULT =
87-
"The file is converted successfully with the given or default ec policy.";
87+
"The file is converted successfully with the given or default EC policy.";
8888

8989
this.setDfsClient(HadoopUtil.getDFSClient(
9090
HadoopUtil.getNameNodeUri(conf), conf));

smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
*/
1818
package org.smartdata.hdfs.scheduler;
1919

20+
import com.google.common.util.concurrent.RateLimiter;
2021
import org.apache.hadoop.util.VersionInfo;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324
import org.smartdata.SmartContext;
2425
import org.smartdata.conf.SmartConf;
26+
import org.smartdata.conf.SmartConfKeys;
2527
import org.smartdata.hdfs.action.*;
2628
import org.smartdata.metastore.MetaStore;
2729
import org.smartdata.metastore.MetaStoreException;
@@ -48,11 +50,18 @@ public class ErasureCodingScheduler extends ActionSchedulerService {
4850
private Set<String> fileLock;
4951
private SmartConf conf;
5052
private MetaStore metaStore;
53+
private long throttleInMb;
54+
private RateLimiter rateLimiter;
5155

5256
public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) {
5357
super(context, metaStore);
5458
this.conf = context.getConf();
5559
this.metaStore = metaStore;
60+
this.throttleInMb = conf.getLong(
61+
SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_KEY, SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_DEFAULT);
62+
if (this.throttleInMb > 0) {
63+
this.rateLimiter = RateLimiter.create(throttleInMb);
64+
}
5665
}
5766

5867
public List<String> getSupportedActions() {
@@ -112,17 +121,25 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
112121
return ScheduleResult.SUCCESS;
113122
}
114123

115-
// check file lock merely for ec & unec action
116-
if (fileLock.contains(srcPath)) {
117-
return ScheduleResult.FAIL;
118-
}
119124
try {
120-
if (!metaStore.getFile(srcPath).isdir()) {
121-
// For ec or unec, add ecTmp argument
122-
String tmpName = createTmpName(action);
123-
action.getArgs().put(EC_TMP, EC_DIR + tmpName);
124-
actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
125+
if (metaStore.getFile(srcPath).isdir()) {
126+
return ScheduleResult.SUCCESS;
127+
}
128+
// The below code is just for ec or unec action with file as argument, not directory
129+
// check file lock merely for ec & unec action
130+
if (fileLock.contains(srcPath)) {
131+
return ScheduleResult.FAIL;
132+
}
133+
if (isLimitedByThrottle(srcPath)) {
134+
if (LOG.isDebugEnabled()) {
135+
LOG.debug("Failed to schedule {} due to limitation of throttle!", actionInfo);
136+
}
137+
return ScheduleResult.RETRY;
125138
}
139+
// For ec or unec, add ecTmp argument
140+
String tmpName = createTmpName(action);
141+
action.getArgs().put(EC_TMP, EC_DIR + tmpName);
142+
actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
126143
} catch (MetaStoreException ex) {
127144
LOG.error("Error occurred for getting file info", ex);
128145
actionInfo.appendLog(ex.getMessage());
@@ -159,4 +176,15 @@ public void onActionFinished(ActionInfo actionInfo) {
159176
fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH));
160177
}
161178
}
179+
180+
public boolean isLimitedByThrottle(String srcPath) throws MetaStoreException {
181+
if (this.rateLimiter == null) {
182+
return false;
183+
}
184+
int fileLengthInMb = (int) metaStore.getFile(srcPath).getLength() >> 20;
185+
if (fileLengthInMb > 0) {
186+
return !rateLimiter.tryAcquire(fileLengthInMb);
187+
}
188+
return false;
189+
}
162190
}

0 commit comments

Comments
 (0)