Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Tune Performance on large namespace #1566

Merged
merged 10 commits into from
Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@
</description>
</property>

<property>
<name>smart.cmdlet.cache.batch</name>
<value>600</value>
<description>
Maximum batch size of cmdlet batch insert.
</description>
</property>

<property>
<name>smart.copy.scheduler.base.sync.batch</name>
<value>500</value>
<description>
Maximum batch size of copyscheduer base sync batch insert.
</description>
</property>

<property>
<name>pd.client.port</name>
<value>7060</value>
Expand Down
10 changes: 10 additions & 0 deletions smart-common/src/main/java/org/smartdata/conf/SmartConfKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ public class SmartConfKeys {
"smart.cmdlet.hist.max.record.lifetime";
public static final String SMART_CMDLET_HIST_MAX_RECORD_LIFETIME_DEFAULT =
"30day";
public static final String SMART_CMDLET_CACHE_BATCH =
"smart.cmdlet.cache.batch";
public static final int SMART_CMDLET_CACHE_BATCH_DEFAULT =
600;

// Schedulers
public static final String SMART_COPY_SCHEDULER_BASE_SYNC_BATCH =
"smart.copy.scheduler.base.sync.batch";
public static final int SMART_COPY_SCHEDULER_BASE_SYNC_BATCH_DEFAULT =
500;

// Action
public static final String SMART_ACTION_MOVE_THROTTLE_MB_KEY = "smart.action.move.throttle.mb";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class CmdletManager extends AbstractService {
private MetaStore metaStore;
private AtomicLong maxActionId;
private AtomicLong maxCmdletId;
// cache sync threshold, default 500
private int cacheCmdTh = 600;

private int maxNumPendingCmdlets;
private List<Long> pendingCmdlet;
Expand All @@ -93,6 +95,7 @@ public class CmdletManager extends AbstractService {
private List<Long> runningCmdlets;
private Map<Long, CmdletInfo> idToCmdlets;
private Map<Long, ActionInfo> idToActions;
private Map<Long, CmdletInfo> cacheCmd;
private Map<String, Long> fileLocks;
private ListMultimap<String, ActionScheduler> schedulers = ArrayListMultimap.create();
private List<ActionSchedulerService> schedulerServices = new ArrayList<>();
Expand All @@ -115,12 +118,17 @@ public CmdletManager(ServerContext context) throws IOException {
this.idToLaunchCmdlet = new HashMap<>();
this.idToCmdlets = new ConcurrentHashMap<>();
this.idToActions = new ConcurrentHashMap<>();
this.cacheCmd = new ConcurrentHashMap<>();
this.fileLocks = new ConcurrentHashMap<>();
this.purgeTask = new CmdletPurgeTask(context.getConf());
this.dispatcher = new CmdletDispatcher(context, this, scheduledCmdlet,
idToLaunchCmdlet, runningCmdlets, schedulers);
maxNumPendingCmdlets = context.getConf().getInt(SmartConfKeys.SMART_CMDLET_MAX_NUM_PENDING_KEY,
maxNumPendingCmdlets = context.getConf()
.getInt(SmartConfKeys.SMART_CMDLET_MAX_NUM_PENDING_KEY,
SmartConfKeys.SMART_CMDLET_MAX_NUM_PENDING_DEFAULT);
cacheCmdTh = context.getConf()
.getInt(SmartConfKeys.SMART_CMDLET_CACHE_BATCH,
SmartConfKeys.SMART_CMDLET_CACHE_BATCH_DEFAULT);
}

@VisibleForTesting
Expand Down Expand Up @@ -298,6 +306,7 @@ public void start() throws IOException {

@Override
public void stop() throws IOException {
batchSyncCmdAction();
LOG.info("Stopping ...");
dispatcher.stop();
for (int i = schedulerServices.size() - 1; i >= 0; i--) {
Expand Down Expand Up @@ -357,32 +366,57 @@ public long submitCmdlet(CmdletDescriptor cmdletDescriptor) throws IOException {
*/
private void syncCmdAction(CmdletInfo cmdletInfo,
List<ActionInfo> actionInfos) throws IOException {
Set<String> filesLocked = lockMovefileActionFiles(actionInfos);
lockMovefileActionFiles(actionInfos);
LOG.debug("Cache cmd {}", cmdletInfo);
for (ActionInfo actionInfo : actionInfos) {
idToActions.put(actionInfo.getActionId(), actionInfo);
}
idToCmdlets.put(cmdletInfo.getCid(), cmdletInfo);
cacheCmd.put(cmdletInfo.getCid(), cmdletInfo);
synchronized (pendingCmdlet) {
pendingCmdlet.add(cmdletInfo.getCid());
}
}

private synchronized void batchSyncCmdAction() throws IOException {
if (cacheCmd.size() == 0) {
return;
}
LOG.debug("Number of cached cmds {}", cacheCmd.size());
List<CmdletInfo> cmdletInfos = new ArrayList<>();
List<ActionInfo> actionInfos = new ArrayList<>();
for (Long cid : cacheCmd.keySet()) {
cmdletInfos.add(cacheCmd.get(cid));
if (cmdletInfos.size() >= cacheCmdTh){
break;
}
for (Long aid : cacheCmd.get(cid).getAids()) {
actionInfos.add(idToActions.get(aid));
}
}
if (cmdletInfos.size() == 0) {
return;
}
LOG.debug("Number of cmds {} to submit", cmdletInfos.size());
try {
metaStore.insertCmdlet(cmdletInfo);
metaStore.insertCmdlets(
cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()]));
metaStore.insertActions(
actionInfos.toArray(new ActionInfo[actionInfos.size()]));
numCmdletsGen.incrementAndGet();
} catch (MetaStoreException e) {
LOG.error("{} submit to DB error", cmdletInfo, e);

try {
for (String file : filesLocked) {
fileLocks.remove(file);
LOG.error("{} submit to DB error", cmdletInfos, e);
for (CmdletInfo cmdletInfo : cmdletInfos) {
try {
metaStore.deleteCmdlet(cmdletInfo.getCid());
} catch (MetaStoreException e1) {
LOG.error("{} delete from DB error", cmdletInfo, e);
}
metaStore.deleteCmdlet(cmdletInfo.getCid());
} catch (MetaStoreException e1) {
LOG.error("{} delete from DB error", cmdletInfo, e);
}
throw new IOException(e);
}

for (ActionInfo actionInfo : actionInfos) {
idToActions.put(actionInfo.getActionId(), actionInfo);
}
idToCmdlets.put(cmdletInfo.getCid(), cmdletInfo);
synchronized (pendingCmdlet) {
pendingCmdlet.add(cmdletInfo.getCid());
for (CmdletInfo cmdletInfo : cmdletInfos) {
cacheCmd.remove(cmdletInfo.getCid());
}
}

Expand Down Expand Up @@ -871,7 +905,7 @@ public void deleteCmdletByRule(long rid) throws IOException {
}
}

public synchronized void updateStatus(StatusMessage status) {
public void updateStatus(StatusMessage status) {
LOG.debug("Got status update: " + status);
try {
if (status instanceof CmdletStatusUpdate) {
Expand All @@ -895,13 +929,15 @@ private void onCmdletStatusUpdate(CmdletStatusUpdate statusUpdate) throws IOExce
if (idToCmdlets.containsKey(cmdletId)) {
CmdletState state = statusUpdate.getCurrentState();
CmdletInfo cmdletInfo = idToCmdlets.get(cmdletId);
cmdletInfo.setState(state);
//The cmdlet is already finished or terminated, remove status from memory.
if (CmdletState.isTerminalState(state)) {
cmdletFinished(cmdletId);
synchronized (cmdletInfo) {
cmdletInfo.setState(state);
//The cmdlet is already finished or terminated, remove status from memory.
if (CmdletState.isTerminalState(state)) {
cmdletFinished(cmdletId);
} else {
// Updating cmdlet status which is not pending or running
}
}
} else {
// Updating cmdlet status which is not pending or running
}
}

Expand All @@ -924,7 +960,7 @@ private void onActionStatusReport(ActionStatusReport report) throws IOException
}
}

private void onActionStarted(ActionStarted started) {
private synchronized void onActionStarted(ActionStarted started) {
if (idToActions.containsKey(started.getActionId())) {
idToActions.get(started.getActionId()).setCreateTime(started.getTimestamp());
} else {
Expand Down Expand Up @@ -960,6 +996,7 @@ private void onActionFinished(ActionFinished finished) throws IOException, Actio

private void flushCmdletInfo(CmdletInfo info) throws IOException {
try {
cacheCmd.remove(info.getCid());
metaStore.updateCmdlet(info.getCid(), info.getRid(), info.getState());
} catch (MetaStoreException e) {
LOG.error(
Expand Down Expand Up @@ -1022,13 +1059,16 @@ protected List<ActionInfo> createActionInfos(CmdletDescriptor cmdletDescriptor,
}

private class ScheduleTask implements Runnable {
private int round;
public ScheduleTask() {
round = 0;
}

@Override
public void run() {
try {
int nScheduled;
batchSyncCmdAction();
do {
nScheduled = scheduleCmdlet();
totalScheduled += nScheduled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public DispatchTask(CmdletDispatcher dispatcher) {
@Override
public void run() {
long curr = System.currentTimeMillis();
if (curr - lastInfo >= 5000) {
if (curr - lastInfo >= 3000) {
if (!(statDispatched == 0 && statRound == statNoMoreCmdlet)) {
LOG.info(
"timeInterval={} statRound={} statFail={} statDispatched={} "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void run() {
// System.out.println(this + " -> " + System.currentTimeMillis());
long endProcessTime = System.currentTimeMillis();

if (endProcessTime - startCheckTime > 3000 || LOG.isDebugEnabled()) {
if (endProcessTime - startCheckTime > 2000 || LOG.isDebugEnabled()) {
LOG.warn(
"Rule "
+ ctx.getRuleId()
Expand All @@ -304,7 +304,9 @@ public void run() {
+ (endCheckTime - startCheckTime)
+ "ms, SubmitTime = "
+ (endProcessTime - endCheckTime)
+ "ms.");
+ "ms, fileNum = "
+ numCmdSubmitted
+ ".");
}

} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.action.SyncAction;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
Expand Down Expand Up @@ -82,11 +83,11 @@ public class CopyScheduler extends ActionSchedulerService {
// Check interval of executorService
private long checkInterval = 150;
// Base sync batch insert size
private int batchSize = 300;
private int batchSize = 500;
// Cache of the file_diff
private Map<Long, FileDiff> fileDiffCache;
// cache sync threshold, default 50
private int cacheSyncTh = 50;
// cache sync threshold, default 100
private int cacheSyncTh = 100;
// record the file_diff whether being changed
private Map<Long, Boolean> fileDiffCacheChanged;

Expand All @@ -103,6 +104,9 @@ public CopyScheduler(SmartContext context, MetaStore metaStore) {
this.executorService = Executors.newSingleThreadScheduledExecutor();
try {
conf = getContext().getConf();
cacheSyncTh = conf.getInt(SmartConfKeys
.SMART_COPY_SCHEDULER_BASE_SYNC_BATCH,
SmartConfKeys.SMART_COPY_SCHEDULER_BASE_SYNC_BATCH_DEFAULT);
} catch (NullPointerException e) {
// SmartContext is empty
conf = new Configuration();
Expand Down Expand Up @@ -321,7 +325,9 @@ private FileStatus[] listFileStatuesOfDirs(String dirName) {
private void baseSync(String srcDir,
String destDir) throws MetaStoreException {
List<FileInfo> srcFiles = metaStore.getFilesByPrefix(srcDir);
LOG.debug("Directory Base Sync {} files", srcFiles.size());
if (srcFiles.size() > 0) {
LOG.info("Directory Base Sync {} files", srcFiles.size());
}
// <file name, fileInfo>
Map<String, FileInfo> srcFileSet = new HashMap<>();
for (FileInfo fileInfo : srcFiles) {
Expand Down Expand Up @@ -548,6 +554,11 @@ public void start() throws IOException {

@Override
public void stop() throws IOException {
try {
batchDirectSync();
} catch (MetaStoreException e) {
throw new IOException(e);
}
executorService.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,16 +902,19 @@ public void deleteAllRules() throws MetaStoreException {
}


public synchronized void insertCmdlets(CmdletInfo[] commands)
public void insertCmdlets(CmdletInfo[] commands)
throws MetaStoreException {
if (commands.length == 0) {
return;
}
try {
cmdletDao.insert(commands);
} catch (Exception e) {
throw new MetaStoreException(e);
}
}

public synchronized void insertCmdlet(CmdletInfo command)
public void insertCmdlet(CmdletInfo command)
throws MetaStoreException {
try {
// Update if exists
Expand Down Expand Up @@ -1032,7 +1035,7 @@ public int getNumCmdletsInTerminiatedStates() throws MetaStoreException {
}
}

public synchronized void insertActions(ActionInfo[] actionInfos)
public void insertActions(ActionInfo[] actionInfos)
throws MetaStoreException {
try {
actionDao.insert(actionInfos);
Expand All @@ -1041,7 +1044,7 @@ public synchronized void insertActions(ActionInfo[] actionInfos)
}
}

public synchronized void insertAction(ActionInfo actionInfo)
public void insertAction(ActionInfo actionInfo)
throws MetaStoreException {
LOG.debug("Insert Action ID {}", actionInfo.getActionId());
try {
Expand Down Expand Up @@ -1115,7 +1118,7 @@ public void updateAction(ActionInfo actionInfo) throws MetaStoreException {
}
}

public synchronized void updateActions(ActionInfo[] actionInfos)
public void updateActions(ActionInfo[] actionInfos)
throws MetaStoreException {
if (actionInfos == null || actionInfos.length == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ public void testWithoutCluster() throws MetaStoreException, IOException, Interru

cmdletManager.start();
cmdletManager.submitCmdlet("hello");
verify(metaStore, times(1)).insertCmdlet(any(CmdletInfo.class));
verify(metaStore, times(1)).insertActions(any(ActionInfo[].class));

Thread.sleep(500);
Assert.assertEquals(1, cmdletManager.getCmdletsSizeInCache());
Thread.sleep(1000);
verify(metaStore, times(1)).insertCmdlets(any(CmdletInfo[].class));
verify(metaStore, times(1)).insertActions(any(ActionInfo[].class));
Thread.sleep(500);

long actionStartTime = System.currentTimeMillis();
cmdletManager.updateStatus(new ActionStarted(actionId, actionStartTime));
Expand Down