17
17
*/
18
18
package org .smartdata .hdfs .scheduler ;
19
19
20
+ import com .google .common .util .concurrent .RateLimiter ;
20
21
import org .apache .hadoop .util .VersionInfo ;
21
22
import org .slf4j .Logger ;
22
23
import org .slf4j .LoggerFactory ;
23
24
import org .smartdata .SmartContext ;
24
25
import org .smartdata .conf .SmartConf ;
26
+ import org .smartdata .conf .SmartConfKeys ;
25
27
import org .smartdata .hdfs .action .*;
26
28
import org .smartdata .metastore .MetaStore ;
27
29
import org .smartdata .metastore .MetaStoreException ;
@@ -48,11 +50,18 @@ public class ErasureCodingScheduler extends ActionSchedulerService {
48
50
private Set <String > fileLock ;
49
51
private SmartConf conf ;
50
52
private MetaStore metaStore ;
53
+ private long throttleInMb ;
54
+ private RateLimiter rateLimiter ;
51
55
52
56
public ErasureCodingScheduler (SmartContext context , MetaStore metaStore ) {
53
57
super (context , metaStore );
54
58
this .conf = context .getConf ();
55
59
this .metaStore = metaStore ;
60
+ this .throttleInMb = conf .getLong (
61
+ SmartConfKeys .SMART_ACTION_EC_THROTTLE_MB_KEY , SmartConfKeys .SMART_ACTION_EC_THROTTLE_MB_DEFAULT );
62
+ if (this .throttleInMb > 0 ) {
63
+ this .rateLimiter = RateLimiter .create (throttleInMb );
64
+ }
56
65
}
57
66
58
67
public List <String > getSupportedActions () {
@@ -112,17 +121,25 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {
112
121
return ScheduleResult .SUCCESS ;
113
122
}
114
123
115
- // check file lock merely for ec & unec action
116
- if (fileLock .contains (srcPath )) {
117
- return ScheduleResult .FAIL ;
118
- }
119
124
try {
120
- if (!metaStore .getFile (srcPath ).isdir ()) {
121
- // For ec or unec, add ecTmp argument
122
- String tmpName = createTmpName (action );
123
- action .getArgs ().put (EC_TMP , EC_DIR + tmpName );
124
- actionInfo .getArgs ().put (EC_TMP , EC_DIR + tmpName );
125
+ if (metaStore .getFile (srcPath ).isdir ()) {
126
+ return ScheduleResult .SUCCESS ;
127
+ }
128
+ // The below code is just for ec or unec action with file as argument, not directory
129
+ // check file lock merely for ec & unec action
130
+ if (fileLock .contains (srcPath )) {
131
+ return ScheduleResult .FAIL ;
132
+ }
133
+ if (isLimitedByThrottle (srcPath )) {
134
+ if (LOG .isDebugEnabled ()) {
135
+ LOG .debug ("Failed to schedule {} due to limitation of throttle!" , actionInfo );
136
+ }
137
+ return ScheduleResult .RETRY ;
125
138
}
139
+ // For ec or unec, add ecTmp argument
140
+ String tmpName = createTmpName (action );
141
+ action .getArgs ().put (EC_TMP , EC_DIR + tmpName );
142
+ actionInfo .getArgs ().put (EC_TMP , EC_DIR + tmpName );
126
143
} catch (MetaStoreException ex ) {
127
144
LOG .error ("Error occurred for getting file info" , ex );
128
145
actionInfo .appendLog (ex .getMessage ());
@@ -159,4 +176,15 @@ public void onActionFinished(ActionInfo actionInfo) {
159
176
fileLock .remove (actionInfo .getArgs ().get (HdfsAction .FILE_PATH ));
160
177
}
161
178
}
179
+
180
+ public boolean isLimitedByThrottle (String srcPath ) throws MetaStoreException {
181
+ if (this .rateLimiter == null ) {
182
+ return false ;
183
+ }
184
+ int fileLengthInMb = (int ) metaStore .getFile (srcPath ).getLength () >> 20 ;
185
+ if (fileLengthInMb > 0 ) {
186
+ return !rateLimiter .tryAcquire (fileLengthInMb );
187
+ }
188
+ return false ;
189
+ }
162
190
}
0 commit comments