Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Commit d79aca3

Browse files
authored
Solve #1678, fix delete failure during DFSIO (#1741)
* Add file check for mergeDelete in CopyScheduler.
1 parent 798f97d commit d79aca3

File tree

2 files changed

+119
-45
lines changed

2 files changed

+119
-45
lines changed

smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
136136
String srcDir = action.getArgs().get(SyncAction.SRC);
137137
String path = action.getArgs().get(HdfsAction.FILE_PATH);
138138
String destDir = action.getArgs().get(SyncAction.DEST);
139+
String destPath = path.replace(srcDir, destDir);
139140
// Check again to avoid corner cases
140141
long did = fileDiffChainMap.get(path).getHead();
141142
if (did == -1) {
@@ -155,7 +156,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
155156
switch (fileDiff.getDiffType()) {
156157
case APPEND:
157158
action.setActionType("copy");
158-
action.getArgs().put("-dest", path.replace(srcDir, destDir));
159+
action.getArgs().put("-dest", destPath);
159160
if (rateLimiter != null) {
160161
String strLen = fileDiff.getParameters().get("-length");
161162
if (strLen != null) {
@@ -173,19 +174,19 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
173174
break;
174175
case DELETE:
175176
action.setActionType("delete");
176-
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
177+
action.getArgs().put(HdfsAction.FILE_PATH, destPath);
177178
break;
178179
case RENAME:
179180
action.setActionType("rename");
180-
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
181+
action.getArgs().put(HdfsAction.FILE_PATH, destPath);
181182
// TODO scope check
182183
String remoteDest = fileDiff.getParameters().get("-dest");
183184
action.getArgs().put("-dest", remoteDest.replace(srcDir, destDir));
184185
fileDiff.getParameters().remove("-dest");
185186
break;
186187
case METADATA:
187188
action.setActionType("metadata");
188-
action.getArgs().put(HdfsAction.FILE_PATH, path.replace(srcDir, destDir));
189+
action.getArgs().put(HdfsAction.FILE_PATH, destPath);
189190
break;
190191
default:
191192
break;
@@ -620,6 +621,18 @@ private void unlockFile(long did){
620621
fileLock.remove(diff.getSrc());
621622
}
622623

624+
private boolean fileExistOnStandby(String filePath) {
625+
// TODO Need to be more general to handle failure
626+
try {
627+
// Check if file exists at standby cluster
628+
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
629+
return fs.exists(new Path(filePath));
630+
} catch (IOException e) {
631+
LOG.debug("Fetch remote file status fails!", e);
632+
return false;
633+
}
634+
}
635+
623636
private class ScheduleTask implements Runnable {
624637

625638
private void syncFileDiff() {
@@ -805,7 +818,7 @@ void mergeDelete(FileDiff fileDiff) throws MetaStoreException {
805818
for (long did : appendChain) {
806819
FileDiff diff = fileDiffCache.get(did);
807820
if (diff.getParameters().containsKey("-offset")) {
808-
if (diff.getParameters().get("-offset").equals("0")) {
821+
if (!isCreate && diff.getParameters().get("-offset").equals("0")) {
809822
isCreate = true;
810823
}
811824
}
@@ -818,7 +831,13 @@ void mergeDelete(FileDiff fileDiff) throws MetaStoreException {
818831
// Delete raw is enough
819832
fileDiffCacheChanged.put(fileDiff.getDiffId(), true);
820833
}
821-
diffChain.add(fileDiff.getDiffId());
834+
if (fileExistOnStandby(filePath)) {
835+
// Only allow the delete when the file actually exists on the remote (standby) cluster
836+
diffChain.add(fileDiff.getDiffId());
837+
} else {
838+
// Mark this delete diff as applied
839+
updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.APPLIED);
840+
}
822841
} else {
823842
updateFileDiffInCache(fileDiff.getDiffId(), FileDiffState.APPLIED);
824843
}

smart-server/src/test/java/org/smartdata/server/TestCopyScheduler.java

Lines changed: 94 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.smartdata.server;
1919

2020
/*import org.apache.hadoop.fs.FileStatus;
21+
import org.apache.hadoop.fs.Path;
2122
import org.apache.hadoop.hdfs.DFSTestUtil;
2223
import org.apache.hadoop.hdfs.DistributedFileSystem;
23-
import org.apache.hadoop.fs.Path;
2424
import org.junit.Assert;
2525
import org.junit.Test;
2626
import org.smartdata.admin.SmartAdmin;
@@ -43,8 +43,8 @@ public class TestCopyScheduler extends MiniSmartClusterHarness {
4343
public void appendMerge() throws Exception {
4444
waitTillSSMExitSafeMode();
4545
MetaStore metaStore = ssm.getMetaStore();
46-
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
47-
CmdletManager cmdletManager = ssm.getCmdletManager();
46+
// SmartAdmin admin = new SmartAdmin(smartContext.getConf());
47+
// CmdletManager cmdletManager = ssm.getCmdletManager();
4848
DistributedFileSystem dfs = cluster.getFileSystem();
4949
final String srcPath = "/src/";
5050
final String destPath = "/dest/";
@@ -55,7 +55,8 @@ public void appendMerge() throws Exception {
5555
// Write to src
5656
for (int i = 0; i < 3; i++) {
5757
// Create test files
58-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
58+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
59+
1024, (short) 1, 1);
5960
for (int j = 0; j < 10; j++) {
6061
DFSTestUtil.appendFile(dfs, new Path(srcPath + i), 1024);
6162
}
@@ -71,8 +72,8 @@ public void appendMerge() throws Exception {
7172
public void deleteMerge() throws Exception {
7273
waitTillSSMExitSafeMode();
7374
MetaStore metaStore = ssm.getMetaStore();
74-
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
75-
CmdletManager cmdletManager = ssm.getCmdletManager();
75+
// SmartAdmin admin = new SmartAdmin(smartContext.getConf());
76+
// CmdletManager cmdletManager = ssm.getCmdletManager();
7677
DistributedFileSystem dfs = cluster.getFileSystem();
7778
final String srcPath = "/src/";
7879
final String destPath = "/dest/";
@@ -83,7 +84,8 @@ public void deleteMerge() throws Exception {
8384
// Write to src
8485
for (int i = 0; i < 3; i++) {
8586
// Create test files
86-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
87+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
88+
1024, (short) 1, 1);
8789
do {
8890
Thread.sleep(500);
8991
} while (!dfs.isFileClosed(new Path(srcPath + i)));
@@ -106,7 +108,7 @@ public void deleteMerge() throws Exception {
106108
public void renameMerge() throws Exception {
107109
waitTillSSMExitSafeMode();
108110
MetaStore metaStore = ssm.getMetaStore();
109-
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
111+
// SmartAdmin admin = new SmartAdmin(smartContext.getConf());
110112
DistributedFileSystem dfs = cluster.getFileSystem();
111113
final String srcPath = "/src/";
112114
final String destPath = "/dest/";
@@ -117,8 +119,10 @@ public void renameMerge() throws Exception {
117119
// Write to src
118120
for (int i = 0; i < 3; i++) {
119121
// Create test files
120-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
121-
dfs.rename(new Path(srcPath + i), new Path(srcPath + i + 10));
122+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
123+
1024, (short) 1, 1);
124+
dfs.rename(new Path(srcPath + i),
125+
new Path(srcPath + i + 10));
122126
// Rename target ends with 10
123127
DFSTestUtil.appendFile(dfs, new Path(srcPath + i + 10), 1024);
124128
}
@@ -138,11 +142,12 @@ public void renameMerge() throws Exception {
138142
public void failRetry() throws Exception {
139143
waitTillSSMExitSafeMode();
140144
MetaStore metaStore = ssm.getMetaStore();
141-
CmdletManager cmdletManager = ssm.getCmdletManager();
145+
// CmdletManager cmdletManager = ssm.getCmdletManager();
142146
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
143147
long ruleId =
144148
admin.submitRule(
145-
"file: every 1s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
149+
"file: every 1s | path matches \"/src/*\"| sync -dest /dest/",
150+
RuleState.ACTIVE);
146151
FileDiff fileDiff = new FileDiff(FileDiffType.RENAME, FileDiffState.PENDING);
147152
fileDiff.setSrc("/src/1");
148153
fileDiff.getParameters().put("-dest", "/src/2");
@@ -171,20 +176,23 @@ public void testForceSync() throws Exception {
171176
// Write to src
172177
for (int i = 0; i < 3; i++) {
173178
// Create test files
174-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
179+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
180+
1024, (short) 1, 1);
175181
}
176182
177183
for (int i = 0; i < 3; i++) {
178184
// Create test files
179-
DFSTestUtil.createFile(dfs, new Path(destPath + i + 5), 1024, (short) 1, 1);
185+
DFSTestUtil.createFile(dfs, new Path(destPath + i + 5),
186+
1024, (short) 1, 1);
180187
}
181188
182189
// Clear file diffs
183190
metaStore.deleteAllFileDiff();
184191
// Submit rules and trigger forceSync
185192
long ruleId =
186193
admin.submitRule(
187-
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
194+
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
195+
RuleState.ACTIVE);
188196
Thread.sleep(1000);
189197
Assert.assertTrue(metaStore.getFileDiffs(FileDiffState.PENDING).size() > 0);
190198
}
@@ -200,14 +208,17 @@ public void batchSync() throws Exception {
200208
FileInfo fileInfo;
201209
long now = System.currentTimeMillis();
202210
for (int i = 0; i < 100; i++) {
203-
fileInfo = new FileInfo(srcPath + i, i, 1024, false, (short)3,
204-
1024, now, now, (short) 1, null, null, (byte)3);
211+
fileInfo = new FileInfo(srcPath + i, i,
212+
1024, false, (short) 3,
213+
1024, now, now, (short) 1,
214+
null, null, (byte) 3);
205215
metaStore.insertFile(fileInfo);
206216
Thread.sleep(100);
207217
}
208218
long ruleId =
209219
admin.submitRule(
210-
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
220+
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
221+
RuleState.ACTIVE);
211222
Thread.sleep(2200);
212223
do {
213224
Thread.sleep(1000);
@@ -222,15 +233,17 @@ public void testDelete() throws Exception {
222233
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
223234
long ruleId =
224235
admin.submitRule(
225-
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
236+
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
237+
RuleState.ACTIVE);
226238
FileDiff fileDiff = new FileDiff(FileDiffType.DELETE, FileDiffState.PENDING);
227239
fileDiff.setSrc("/src/1");
228240
metaStore.insertFileDiff(fileDiff);
229241
Thread.sleep(1200);
230242
do {
231243
Thread.sleep(1000);
232244
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() == 0);
233-
Assert.assertTrue(cmdletManager.listNewCreatedActions("sync", 0).size() > 0);
245+
Assert.assertTrue(cmdletManager
246+
.listNewCreatedActions("sync", 0).size() > 0);
234247
}
235248
236249
@Test(timeout = 60000)
@@ -241,7 +254,8 @@ public void testRename() throws Exception {
241254
SmartAdmin admin = new SmartAdmin(smartContext.getConf());
242255
long ruleId =
243256
admin.submitRule(
244-
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
257+
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
258+
RuleState.ACTIVE);
245259
FileDiff fileDiff = new FileDiff(FileDiffType.RENAME, FileDiffState.PENDING);
246260
fileDiff.setSrc("/src/1");
247261
fileDiff.getParameters().put("-dest", "/src/2");
@@ -250,7 +264,8 @@ public void testRename() throws Exception {
250264
do {
251265
Thread.sleep(1000);
252266
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() == 0);
253-
Assert.assertTrue(cmdletManager.listNewCreatedActions("sync", 0).size() > 0);
267+
Assert.assertTrue(cmdletManager
268+
.listNewCreatedActions("sync", 0).size() > 0);
254269
}
255270
256271
@Test
@@ -266,10 +281,12 @@ public void testMeta() throws Exception {
266281
dfs.mkdirs(new Path(destPath));
267282
long ruleId =
268283
admin.submitRule(
269-
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/", RuleState.ACTIVE);
284+
"file: every 2s | path matches \"/src/*\"| sync -dest /dest/",
285+
RuleState.ACTIVE);
270286
Thread.sleep(4200);
271287
// Write to src
272-
DFSTestUtil.createFile(dfs, new Path(srcPath + 1), 1024, (short) 1, 1);
288+
DFSTestUtil.createFile(dfs, new Path(srcPath + 1),
289+
1024, (short) 1, 1);
273290
Thread.sleep(1000);
274291
FileDiff fileDiff = new FileDiff(FileDiffType.METADATA, FileDiffState.PENDING);
275292
fileDiff.setSrc("/src/1");
@@ -306,12 +323,14 @@ public void testCache() throws Exception {
306323
// Write to src
307324
for (int i = 0; i < 3; i++) {
308325
// Create test files
309-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
326+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
327+
1024, (short) 1, 1);
310328
}
311329
do {
312330
Thread.sleep(1000);
313331
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() <= 2);
314-
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
332+
List<ActionInfo> actionInfos = cmdletManager
333+
.listNewCreatedActions("sync", 0);
315334
Assert.assertTrue(actionInfos.size() >= 3);
316335
Thread.sleep(20000);
317336
}
@@ -341,12 +360,14 @@ public void testWithSyncRule() throws Exception {
341360
// Write to src
342361
for (int i = 0; i < 3; i++) {
343362
// Create test files
344-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
363+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
364+
1024, (short) 1, 1);
345365
}
346366
do {
347367
Thread.sleep(1000);
348368
} while (admin.getRuleInfo(ruleId).getNumCmdsGen() <= 2);
349-
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
369+
List<ActionInfo> actionInfos = cmdletManager
370+
.listNewCreatedActions("sync", 0);
350371
Assert.assertTrue(actionInfos.size() >= 3);
351372
do {
352373
Thread.sleep(800);
@@ -376,7 +397,8 @@ public void testCopy() throws Exception {
376397
// Write to src
377398
for (int i = 0; i < 3; i++) {
378399
// Create test files
379-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
400+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
401+
1024, (short) 1, 1);
380402
}
381403
Thread.sleep(1000);
382404
CmdletManager cmdletManager = ssm.getCmdletManager();
@@ -386,7 +408,8 @@ public void testCopy() throws Exception {
386408
cmdletManager.submitCmdlet(
387409
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
388410
}
389-
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
411+
List<ActionInfo> actionInfos = cmdletManager
412+
.listNewCreatedActions("sync", 0);
390413
Assert.assertTrue(actionInfos.size() >= 3);
391414
do {
392415
Thread.sleep(1000);
@@ -399,6 +422,45 @@ public void testCopy() throws Exception {
399422
}
400423
}
401424
425+
@Test(timeout = 40000)
426+
public void testEmpyDelete() throws Exception {
427+
// Delete files that do not exist on the standby cluster
428+
waitTillSSMExitSafeMode();
429+
MetaStore metaStore = ssm.getMetaStore();
430+
// metaStore.deleteAllFileDiff();
431+
// metaStore.deleteAllFileInfo();
432+
// metaStore.deleteAllCmdlets();
433+
// metaStore.deleteAllActions();
434+
DistributedFileSystem dfs = cluster.getFileSystem();
435+
final String srcPath = "/src/";
436+
final String destPath = "/dest/";
437+
// Write to src
438+
for (int i = 0; i < 3; i++) {
439+
// Create test files
440+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
441+
1024, (short) 1, 1);
442+
}
443+
Thread.sleep(500);
444+
BackUpInfo backUpInfo = new BackUpInfo(1L, srcPath, destPath, 100);
445+
metaStore.insertBackUpInfo(backUpInfo);
446+
dfs.mkdirs(new Path(srcPath));
447+
dfs.mkdirs(new Path(destPath));
448+
Thread.sleep(100);
449+
for (int i = 0; i < 3; i++) {
450+
// delete test files on primary cluster
451+
dfs.delete(new Path(srcPath + i), false);
452+
}
453+
454+
Thread.sleep(2000);
455+
CmdletManager cmdletManager = ssm.getCmdletManager();
456+
// Submit sync action
457+
for (int i = 0; i < 3; i++) {
458+
// Submit one sync cmdlet per (already deleted) source file
459+
cmdletManager.submitCmdlet(
460+
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
461+
}
462+
}
463+
402464
@Test(timeout = 40000)
403465
public void testCopyDelete() throws Exception {
404466
waitTillSSMExitSafeMode();
@@ -417,7 +479,8 @@ public void testCopyDelete() throws Exception {
417479
// Write to src
418480
for (int i = 0; i < 3; i++) {
419481
// Create test files
420-
DFSTestUtil.createFile(dfs, new Path(srcPath + i), 1024, (short) 1, 1);
482+
DFSTestUtil.createFile(dfs, new Path(srcPath + i),
483+
1024, (short) 1, 1);
421484
dfs.delete(new Path(srcPath + i), false);
422485
}
423486
@@ -429,13 +492,5 @@ public void testCopyDelete() throws Exception {
429492
cmdletManager.submitCmdlet(
430493
"sync -file /src/" + i + " -src " + srcPath + " -dest " + destPath);
431494
}
432-
List<ActionInfo> actionInfos = cmdletManager.listNewCreatedActions("sync", 0);
433-
Assert.assertTrue(actionInfos.size() >= 3);
434-
Thread.sleep(3000);
435-
for (int i = 0; i < 3; i++) {
436-
// Write 10 files
437-
Assert.assertFalse(dfs.exists(new Path(destPath + i)));
438-
System.out.printf("File %d is copied.\n", i);
439-
}
440495
}
441496
}*/

0 commit comments

Comments
 (0)