Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Add a throttle on EC to avoid IO overload #1947

Merged
merged 2 commits into from
Sep 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@
<description>The throughput limit (MB) for SSM copy overall</description>
</property>

<property>
<name>smart.action.ec.throttle.mb</name>
<value>0</value>
<description>The throughput limit (MB) for SSM EC overall</description>
</property>

<property>
<name>smart.action.local.execution.disabled</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class SmartConfKeys {
public static final long SMART_ACTION_MOVE_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_COPY_THROTTLE_MB_KEY = "smart.action.copy.throttle.mb";
public static final long SMART_ACTION_COPY_THROTTLE_MB_DEFAULT = 0L; // 0 means unlimited
public static final String SMART_ACTION_EC_THROTTLE_MB_KEY = "smart.action.ec.throttle.mb";
public static final long SMART_ACTION_EC_THROTTLE_MB_DEFAULT = 0L;
public static final String SMART_ACTION_LOCAL_EXECUTION_DISABLED_KEY =
"smart.action.local.execution.disabled";
public static final boolean SMART_ACTION_LOCAL_EXECUTION_DISABLED_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void execute() throws Exception {
final String DIR_RESULT =
"The EC policy is set successfully for the given directory.";
final String CONVERT_RESULT =
"The file is converted successfully with the given or default ec policy.";
"The file is converted successfully with the given or default EC policy.";

this.setDfsClient(HadoopUtil.getDFSClient(
HadoopUtil.getNameNodeUri(conf), conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package org.smartdata.hdfs.scheduler;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.hdfs.action.*;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
Expand All @@ -48,11 +50,18 @@ public class ErasureCodingScheduler extends ActionSchedulerService {
private Set<String> fileLock;
private SmartConf conf;
private MetaStore metaStore;
private long throttleInMb;
private RateLimiter rateLimiter;

public ErasureCodingScheduler(SmartContext context, MetaStore metaStore) {
super(context, metaStore);
this.conf = context.getConf();
this.metaStore = metaStore;
this.throttleInMb = conf.getLong(
SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_KEY, SmartConfKeys.SMART_ACTION_EC_THROTTLE_MB_DEFAULT);
if (this.throttleInMb > 0) {
this.rateLimiter = RateLimiter.create(throttleInMb);
}
}

public List<String> getSupportedActions() {
Expand Down Expand Up @@ -112,17 +121,25 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
return ScheduleResult.SUCCESS;
}

// check file lock merely for ec & unec action
if (fileLock.contains(srcPath)) {
return ScheduleResult.FAIL;
}
try {
if (!metaStore.getFile(srcPath).isdir()) {
// For ec or unec, add ecTmp argument
String tmpName = createTmpName(action);
action.getArgs().put(EC_TMP, EC_DIR + tmpName);
actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
if (metaStore.getFile(srcPath).isdir()) {
return ScheduleResult.SUCCESS;
}
// The below code is just for ec or unec action with file as argument, not directory
// check file lock merely for ec & unec action
if (fileLock.contains(srcPath)) {
return ScheduleResult.FAIL;
}
if (isLimitedByThrottle(srcPath)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to schedule {} due to limitation of throttle!", actionInfo);
}
return ScheduleResult.RETRY;
}
// For ec or unec, add ecTmp argument
String tmpName = createTmpName(action);
action.getArgs().put(EC_TMP, EC_DIR + tmpName);
actionInfo.getArgs().put(EC_TMP, EC_DIR + tmpName);
} catch (MetaStoreException ex) {
LOG.error("Error occurred for getting file info", ex);
actionInfo.appendLog(ex.getMessage());
Expand Down Expand Up @@ -159,4 +176,15 @@ public void onActionFinished(ActionInfo actionInfo) {
fileLock.remove(actionInfo.getArgs().get(HdfsAction.FILE_PATH));
}
}

public boolean isLimitedByThrottle(String srcPath) throws MetaStoreException {
if (this.rateLimiter == null) {
return false;
}
int fileLengthInMb = (int) metaStore.getFile(srcPath).getLength() >> 20;
if (fileLengthInMb > 0) {
return !rateLimiter.tryAcquire(fileLengthInMb);
}
return false;
}
}