Skip to content
This repository was archived by the owner on Jan 3, 2023. It is now read-only.

Commit 68eb135

Browse files
wayblinklittlezhou
authored andcommitted
#882 Add unit tests for move file actions (#915)
1 parent c3d6a5e commit 68eb135

File tree

2 files changed

+204
-190
lines changed

2 files changed

+204
-190
lines changed
Lines changed: 203 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -1,189 +1,203 @@
1-
///**
2-
// * Licensed to the Apache Software Foundation (ASF) under one
3-
// * or more contributor license agreements. See the NOTICE file
4-
// * distributed with this work for additional information
5-
// * regarding copyright ownership. The ASF licenses this file
6-
// * to you under the Apache License, Version 2.0 (the
7-
// * "License"); you may not use this file except in compliance
8-
// * with the License. You may obtain a copy of the License at
9-
// *
10-
// * http://www.apache.org/licenses/LICENSE-2.0
11-
// *
12-
// * Unless required by applicable law or agreed to in writing, software
13-
// * distributed under the License is distributed on an "AS IS" BASIS,
14-
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
// * See the License for the specific language governing permissions and
16-
// * limitations under the License.
17-
// */
18-
//package org.smartdata.hdfs.action;
19-
//
20-
//import org.apache.hadoop.fs.FSDataOutputStream;
21-
//import org.apache.hadoop.fs.Path;
22-
//import org.junit.Assert;
23-
//import org.junit.Test;
24-
//import org.smartdata.actions.MockActionStatusReporter;
25-
//import org.smartdata.hdfs.MiniClusterWithStoragesHarness;
26-
//import org.smartdata.hdfs.action.move.MoverStatus;
27-
//import org.smartdata.protocol.message.ActionFinished;
28-
//import org.smartdata.protocol.message.StatusMessage;
29-
//import org.smartdata.protocol.message.StatusReporter;
30-
//
31-
//import java.util.HashMap;
32-
//import java.util.Map;
33-
//
34-
///**
35-
// * Test for MoveFileAction.
36-
// */
37-
//public class TestMoveFileAction extends MiniClusterWithStoragesHarness {
38-
// @Test(timeout = 300000)
39-
// public void testParallelMovers() throws Exception {
40-
// final String file1 = "/testParallelMovers/file1";
41-
// final String file2 = "/testParallelMovers/file2";
42-
// Path dir = new Path("/testParallelMovers");
43-
// dfs.mkdirs(dir);
44-
// // write to DISK
45-
// dfs.setStoragePolicy(dir, "HOT");
46-
// final FSDataOutputStream out1 = dfs.create(new Path(file1));
47-
// out1.writeChars("testParallelMovers1");
48-
// out1.close();
49-
// final FSDataOutputStream out2 = dfs.create(new Path(file2));
50-
// out2.writeChars("testParallelMovers2");
51-
// out2.close();
52-
//
53-
// // schedule move to ARCHIVE or SSD
54-
// ArchiveFileAction action1 = new ArchiveFileAction();
55-
// action1.setDfsClient(dfsClient);
56-
// action1.setContext(smartContext);
57-
// action1.setStatusReporter(new MockActionStatusReporter());
58-
// Map<String, String> args1 = new HashMap();
59-
// args1.put(ArchiveFileAction.FILE_PATH, file1);
60-
// action1.init(args1);
61-
//
62-
// AllSsdFileAction action2 = new AllSsdFileAction();
63-
// action2.setDfsClient(dfsClient);
64-
// action2.setContext(smartContext);
65-
// action2.setStatusReporter(new MockActionStatusReporter());
66-
//
67-
// Map<String, String> args2 = new HashMap();
68-
// args2.put(AllSsdFileAction.FILE_PATH, file2);
69-
// action2.init(args2);
70-
//
71-
// action1.run();
72-
// action2.run();
73-
// }
74-
//
75-
// @Test(timeout = 300000)
76-
// public void testMoverPercentage() throws Exception {
77-
// final String file1 = "/testParallelMovers/file1";
78-
// final String file2 = "/testParallelMovers/child/file2";
79-
// String dir = "/testParallelMovers";
80-
// dfs.mkdirs(new Path(dir));
81-
// dfs.mkdirs(new Path("/testParallelMovers/child"));
82-
//
83-
// // write to DISK
84-
// dfs.setStoragePolicy(new Path(dir), "HOT");
85-
// final FSDataOutputStream out1 = dfs.create(new Path(file1), (short)5);
86-
// final String string1 = "testParallelMovers1";
87-
// out1.writeChars(string1);
88-
// out1.close();
89-
// final FSDataOutputStream out2 = dfs.create(new Path(file2));
90-
// final String string2 = "testParallelMovers212345678901234567890";
91-
// out2.writeChars(string2);
92-
// out2.close();
93-
//
94-
// // schedule move to ALL_SSD
95-
// long totalSize1 = string1.length()*2*5;
96-
// long blockNum1 = 1*5;
97-
// long totalSize2 = string2.length()*2*3;
98-
// long blockNum2 = 2*3;
99-
// scheduleMoverWithPercentage(dir, "ALL_SSD", totalSize1 + totalSize2,
100-
// blockNum1 + blockNum2);
101-
//
102-
// // schedule move to ONE_SSD
103-
// totalSize1 = string1.length()*2*4;
104-
// blockNum1 = 1*4;
105-
// totalSize2 = string2.length()*2*2;
106-
// blockNum2 = 2*2;
107-
// scheduleMoverWithPercentage(dir, "ONE_SSD", totalSize1 + totalSize2,
108-
// blockNum1 + blockNum2);
109-
// }
110-
//
111-
// private void scheduleMoverWithPercentage(String dir, String storageType,
112-
// long totalSize, long totolBlocks) throws Exception {
113-
// MoveFileAction moveFileAction = new MoveFileAction();
114-
// moveFileAction.setDfsClient(dfsClient);
115-
// moveFileAction.setContext(smartContext);
116-
// moveFileAction.setStatusReporter(new MockActionStatusReporter());
117-
// Map<String, String> args = new HashMap();
118-
// args.put(MoveFileAction.FILE_PATH, dir);
119-
// args.put(MoveFileAction.STORAGE_POLICY, storageType);
120-
// moveFileAction.init(args);
121-
// moveFileAction.run();
122-
//
123-
// MoverStatus moverStatus = moveFileAction.getStatus();
124-
// System.out.println("Mover is finished.");
125-
// Assert.assertEquals(1.0f, moverStatus.getPercentage(), 0.00001f);
126-
// Assert.assertEquals(1.0f, moveFileAction.getProgress(), 0.00001f);
127-
// Assert.assertEquals(totalSize, moverStatus.getTotalSize());
128-
// Assert.assertEquals(totolBlocks, moverStatus.getTotalBlocks());
129-
// }
130-
//
131-
// @Test(timeout = 300000)
132-
// public void testMoveNonexitedFile() throws Exception {
133-
// String dir = "/testParallelMovers";
134-
//
135-
// // schedule move to ALL_SSD
136-
// MoveFileAction moveFileAction = new MoveFileAction();
137-
// moveFileAction.setDfsClient(dfsClient);
138-
// moveFileAction.setContext(smartContext);
139-
// moveFileAction.setStatusReporter(new StatusReporter() {
140-
// @Override
141-
// public void report(StatusMessage status) {
142-
// if (status instanceof ActionFinished) {
143-
// ActionFinished finished = (ActionFinished) status;
144-
// Assert.assertNotNull(finished.getThrowable());
145-
// }
146-
// }
147-
// });
148-
//
149-
// Map<String, String> args = new HashMap();
150-
// args.put(MoveFileAction.FILE_PATH, dir);
151-
// args.put(MoveFileAction.STORAGE_POLICY, "ALL_SSD");
152-
// moveFileAction.init(args);
153-
// moveFileAction.run();
154-
// }
155-
//
156-
// @Test
157-
// public void testMultiblockFile() throws Exception {
158-
// final String file1 = "/testParallelMovers/file1";
159-
// Path dir = new Path("/testParallelMovers");
160-
// dfs.mkdirs(dir);
161-
// // write to DISK
162-
// dfs.setStoragePolicy(dir, "HOT");
163-
// final FSDataOutputStream out1 = dfs.create(new Path(file1));
164-
// out1.writeChars("This is a block with 50B." +
165-
// "This is a block with 50B." +
166-
// "This is a block with 50B." +
167-
// "This is a block with 50B." +
168-
// "This is a block with 50B." +
169-
// "This is a block with 50B." +
170-
// "This is a block with 50B." +
171-
// "This is a block with 50B." +
172-
// "This is a block with 50B." +
173-
// "This is a block with 50B.");
174-
// out1.close();
175-
//
176-
// // schedule move to ARCHIVE or SSD
177-
// ArchiveFileAction action1 = new ArchiveFileAction();
178-
// action1.setDfsClient(dfsClient);
179-
// action1.setContext(smartContext);
180-
// action1.setStatusReporter(new MockActionStatusReporter());
181-
// Map<String, String> args1 = new HashMap();
182-
// args1.put(ArchiveFileAction.FILE_PATH, file1);
183-
// action1.init(args1);
184-
// action1.run();
185-
//
186-
// MoverStatus status = action1.getStatus();
187-
// Assert.assertEquals(1.0f, status.getPercentage(), 0.0000001f);
188-
// }
189-
//}
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.smartdata.hdfs.action;
19+
20+
import org.apache.hadoop.fs.FSDataOutputStream;
21+
import org.apache.hadoop.fs.Path;
22+
import org.apache.hadoop.fs.StorageType;
23+
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
24+
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
25+
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
import org.smartdata.action.MockActionStatusReporter;
29+
import org.smartdata.hdfs.MiniClusterHarness;
30+
import org.smartdata.hdfs.action.move.MoverExecutor;
31+
import org.smartdata.hdfs.action.move.StorageGroup;
32+
import org.smartdata.protocol.message.ActionFinished;
33+
import org.smartdata.protocol.message.StatusMessage;
34+
import org.smartdata.protocol.message.StatusReporter;
35+
36+
import java.net.URI;
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
40+
/**
41+
* Test for MoveFileAction.
42+
*/
43+
public class TestMoveFileAction extends MiniClusterHarness {
44+
45+
@Test(timeout = 300000)
46+
public void testParallelMove() throws Exception {
47+
String dir = "/test";
48+
String file1 = "/test/file1";
49+
String file2 = "/test/file2";
50+
dfs.mkdirs(new Path(dir));
51+
52+
//write to DISK
53+
dfs.setStoragePolicy(new Path(dir), "HOT");
54+
FSDataOutputStream out1 = dfs.create(new Path(file1));
55+
final String str1 = "testtesttest1";
56+
out1.writeChars(str1);
57+
out1.close();
58+
FSDataOutputStream out2 = dfs.create(new Path(file2));
59+
final String str2 = "testtesttest2";
60+
out2.writeChars(str2);
61+
out2.close();
62+
63+
//move to SSD
64+
MoveFileAction moveFileAction1 = new MoveFileAction();
65+
moveFileAction1.setDfsClient(dfsClient);
66+
moveFileAction1.setContext(smartContext);
67+
moveFileAction1.setStatusReporter(new MockActionStatusReporter());
68+
Map<String, String> args1 = new HashMap();
69+
args1.put(MoveFileAction.FILE_PATH, dir);
70+
String storageType1 = "ONE_SSD";
71+
args1.put(MoveFileAction.STORAGE_POLICY, storageType1);
72+
SchedulePlan plan1 = createPlan(file1, storageType1);
73+
args1.put(MoveFileAction.MOVE_PLAN, plan1.toString());
74+
75+
MoveFileAction moveFileAction2 = new MoveFileAction();
76+
moveFileAction2.setDfsClient(dfsClient);
77+
moveFileAction2.setContext(smartContext);
78+
moveFileAction2.setStatusReporter(new MockActionStatusReporter());
79+
Map<String, String> args2 = new HashMap();
80+
args2.put(MoveFileAction.FILE_PATH, dir);
81+
String storageType2 = "ONE_SSD";
82+
args2.put(MoveFileAction.STORAGE_POLICY, storageType2);
83+
SchedulePlan plan2 = createPlan(file2, storageType2);
84+
args2.put(MoveFileAction.MOVE_PLAN, plan2.toString());
85+
86+
//init and run
87+
moveFileAction1.init(args1);
88+
moveFileAction2.init(args2);
89+
moveFileAction1.run();
90+
moveFileAction2.run();
91+
}
92+
93+
@Test(timeout = 300000)
94+
public void testMove() throws Exception{
95+
String dir = "/test";
96+
String file = "/test/file";
97+
dfs.mkdirs(new Path(dir));
98+
99+
//write to DISK
100+
dfs.setStoragePolicy(new Path(dir), "HOT");
101+
FSDataOutputStream out = dfs.create(new Path(file));
102+
final String str = "testtesttest";
103+
out.writeChars(str);
104+
105+
//move to SSD
106+
MoveFileAction moveFileAction = new MoveFileAction();
107+
moveFileAction.setDfsClient(dfsClient);
108+
moveFileAction.setContext(smartContext);
109+
moveFileAction.setStatusReporter(new MockActionStatusReporter());
110+
Map<String, String> args = new HashMap();
111+
args.put(MoveFileAction.FILE_PATH, dir);
112+
String storageType = "ONE_SSD";
113+
args.put(MoveFileAction.STORAGE_POLICY, storageType);
114+
SchedulePlan plan = createPlan(file, storageType);
115+
args.put(MoveFileAction.MOVE_PLAN, plan.toString());
116+
117+
//init and run
118+
moveFileAction.init(args);
119+
moveFileAction.run();
120+
}
121+
122+
@Test(timeout = 300000)
123+
public void testMoveNonexitedFile() throws Exception {
124+
String dir = "/testParallelMovers";
125+
126+
// schedule move to ALL_SSD
127+
MoveFileAction moveFileAction = new MoveFileAction();
128+
moveFileAction.setDfsClient(dfsClient);
129+
moveFileAction.setContext(smartContext);
130+
moveFileAction.setStatusReporter(new StatusReporter() {
131+
@Override
132+
public void report(StatusMessage status) {
133+
if (status instanceof ActionFinished) {
134+
ActionFinished finished = (ActionFinished) status;
135+
Assert.assertNotNull(finished.getThrowable());
136+
}
137+
}
138+
});
139+
140+
Map<String, String> args = new HashMap();
141+
args.put(MoveFileAction.FILE_PATH, dir);
142+
args.put(MoveFileAction.STORAGE_POLICY, "ALL_SSD");
143+
moveFileAction.init(args);
144+
moveFileAction.run();
145+
}
146+
147+
@Test
148+
public void testMoveMultiblockFile() throws Exception {
149+
final String file1 = "/testParallelMovers/file1";
150+
Path dir = new Path("/testParallelMovers");
151+
dfs.mkdirs(dir);
152+
153+
// write to DISK
154+
dfs.setStoragePolicy(dir, "HOT");
155+
final FSDataOutputStream out1 = dfs.create(new Path(file1));
156+
out1.writeChars("This is a block with 50B." +
157+
"This is a block with 50B." +
158+
"This is a block with 50B." +
159+
"This is a block with 50B." +
160+
"This is a block with 50B." +
161+
"This is a block with 50B." +
162+
"This is a block with 50B." +
163+
"This is a block with 50B." +
164+
"This is a block with 50B." +
165+
"This is a block with 50B.");
166+
out1.close();
167+
168+
// schedule move to SSD
169+
ArchiveFileAction action1 = new ArchiveFileAction();
170+
action1.setDfsClient(dfsClient);
171+
action1.setContext(smartContext);
172+
action1.setStatusReporter(new MockActionStatusReporter());
173+
Map<String, String> args1 = new HashMap();
174+
args1.put(ArchiveFileAction.FILE_PATH, file1);
175+
args1.put(MoveFileAction.MOVE_PLAN, null);
176+
SchedulePlan plan = createPlan(file1, "SSD");
177+
args1.put(MoveFileAction.MOVE_PLAN, plan.toString());
178+
action1.init(args1);
179+
action1.run();
180+
}
181+
182+
private SchedulePlan createPlan(String dir, String storageType) throws Exception {
183+
URI namenode = cluster.getURI();
184+
SchedulePlan plan = new SchedulePlan(namenode, dir);
185+
// Schedule move in the same node
186+
for (LocatedBlock lb : MoverExecutor.getLocatedBlocks(dfsClient, dir)) {
187+
ExtendedBlock block = lb.getBlock();
188+
for (DatanodeInfo datanodeInfo : lb.getLocations()) {
189+
StorageGroup source = new StorageGroup(datanodeInfo, StorageType.DISK.toString());
190+
StorageGroup target = new StorageGroup(datanodeInfo, storageType);
191+
addPlan(plan, source, target, block.getBlockId());
192+
}
193+
}
194+
return plan;
195+
}
196+
197+
private void addPlan(SchedulePlan plan, StorageGroup source, StorageGroup target, long blockId) {
198+
DatanodeInfo sourceDatanode = source.getDatanodeInfo();
199+
DatanodeInfo targetDatanode = target.getDatanodeInfo();
200+
plan.addPlan(blockId, sourceDatanode.getDatanodeUuid(), source.getStorageType(),
201+
targetDatanode.getIpAddr(), targetDatanode.getXferPort(), target.getStorageType());
202+
}
203+
}

0 commit comments

Comments
 (0)