import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -75,21 +72,15 @@ static synchronized ExecutorService getCompactionExecutor(final @NonNull MerkleD
        return compactionExecutor;
    }

-    private final AtomicBoolean compactionEnabled = new AtomicBoolean();
+    // Synchronized on this
+    private boolean compactionEnabled = false;

-    // A map of compactor futures by task names
-    final Map<String, Future<Boolean>> futuresByName = new ConcurrentHashMap<>(16);
-
-    // A map of compactors by task names
-    final Map<String, DataFileCompactor> compactorsByName = new ConcurrentHashMap<>(16);
+    // A map of compactors by task names. Synchronized on this
+    final Map<String, DataFileCompactor> compactorsByName = new HashMap<>(16);

    @NonNull
    private final MerkleDbConfig merkleDbConfig;

-    // Number of compaction tasks currently running. Checked during shutdown to make sure all
-    // tasks are stopped
-    private final AtomicInteger tasksRunning = new AtomicInteger(0);
-
    /**
     * Creates a new instance of {@link MerkleDbCompactionCoordinator}.
     * @param tableName the name of the table
@@ -104,8 +95,8 @@ public MerkleDbCompactionCoordinator(@NonNull String tableName, @NonNull MerkleD
    /**
     * Enables background compaction.
     */
-    void enableBackgroundCompaction() {
-        compactionEnabled.set(true);
+    synchronized void enableBackgroundCompaction() {
+        compactionEnabled = true;
    }

    /**
@@ -114,7 +105,7 @@ void enableBackgroundCompaction() {
     * critical for snapshots (e.g. update an index), it will be stopped until {@link
     * #resumeCompaction()} is called.
     */
-    public void pauseCompaction() throws IOException {
+    synchronized void pauseCompaction() throws IOException {
        for (final DataFileCompactor compactor : compactorsByName.values()) {
            compactor.pauseCompaction();
        }
@@ -123,7 +114,7 @@ public void pauseCompaction() throws IOException {
    /**
     * Resumes previously stopped data file collection compaction.
     */
-    public void resumeCompaction() throws IOException {
+    synchronized void resumeCompaction() throws IOException {
        for (final DataFileCompactor compactor : compactorsByName.values()) {
            compactor.resumeCompaction();
        }
@@ -133,26 +124,24 @@ public void resumeCompaction() throws IOException {
     * Stops all compactions in progress and disables background compaction. All subsequent calls to
     * compacting methods will be ignored until {@link #enableBackgroundCompaction()} is called.
     */
-    void stopAndDisableBackgroundCompaction() {
-        compactionEnabled.set(false);
+    synchronized void stopAndDisableBackgroundCompaction() {
+        // Make sure no new compaction tasks are scheduled
+        compactionEnabled = false;
        // Interrupt all running compaction tasks, if any
-        for (final Future<Boolean> futureEntry : futuresByName.values()) {
-            futureEntry.cancel(true);
+        for (final DataFileCompactor compactor : compactorsByName.values()) {
+            compactor.interruptCompaction();
        }
-        futuresByName.clear();
-        compactorsByName.clear();
        // Wait till all the tasks are stopped
-        final long now = System.currentTimeMillis();
        try {
-            while ((tasksRunning.get() != 0) && (System.currentTimeMillis() - now < SHUTDOWN_TIMEOUT_MILLIS)) {
-                Thread.sleep(1);
+            while (!compactorsByName.isEmpty()) {
+                wait(SHUTDOWN_TIMEOUT_MILLIS);
            }
        } catch (final InterruptedException e) {
            logger.warn(MERKLE_DB.getMarker(), "Interrupted while waiting for compaction tasks to complete", e);
        }
        // If some tasks are still running, there is nothing else to do than to log it
-        if (tasksRunning.get() != 0) {
-            logger.error(MERKLE_DB.getMarker(), "Failed to stop all compactions tasks");
+        if (!compactorsByName.isEmpty()) {
+            logger.warn(MERKLE_DB.getMarker(), "Timed out waiting to stop all compactions tasks");
        }
    }
@@ -163,19 +152,18 @@ void stopAndDisableBackgroundCompaction() {
     * @param key Compaction task name
     * @param compactor Compactor to run
     */
-    public void compactIfNotRunningYet(final String key, final DataFileCompactor compactor) {
-        if (!compactionEnabled.get()) {
+    public synchronized void compactIfNotRunningYet(final String key, final DataFileCompactor compactor) {
+        if (!compactionEnabled) {
            return;
        }
        if (isCompactionRunning(key)) {
            logger.info(MERKLE_DB.getMarker(), "Compaction for {} is already in progress", key);
            return;
        }
-        assert !compactorsByName.containsKey(key);
        compactorsByName.put(key, compactor);
        final ExecutorService executor = getCompactionExecutor(merkleDbConfig);
        final CompactionTask task = new CompactionTask(key, compactor);
-        futuresByName.put(key, executor.submit(task));
+        executor.submit(task);
    }

    /**
@@ -184,13 +172,12 @@ public void compactIfNotRunningYet(final String key, final DataFileCompactor com
     * @param key Compactor name
     * @return {@code true} if compaction with this name is currently running, {@code false} otherwise
     */
-    public boolean isCompactionRunning(final String key) {
-        final Future<?> future = futuresByName.get(key);
-        return (future != null) && !future.isDone();
+    synchronized boolean isCompactionRunning(final String key) {
+        return compactorsByName.containsKey(key);
    }

-    boolean isCompactionEnabled() {
-        return compactionEnabled.get();
+    synchronized boolean isCompactionEnabled() {
+        return compactionEnabled;
    }

    /**
@@ -211,7 +198,6 @@ public CompactionTask(@NonNull String id, @NonNull DataFileCompactor compactor)
        @Override
        public Boolean call() {
-            tasksRunning.incrementAndGet();
            try {
                return compactor.compact();
            } catch (final InterruptedException | ClosedByInterruptException e) {
@@ -221,9 +207,10 @@ public Boolean call() {
                // will stop all future merges from happening
                logger.error(EXCEPTION.getMarker(), "[{}] Compaction failed", id, e);
            } finally {
-                compactorsByName.remove(id);
-                futuresByName.remove(id);
-                tasksRunning.decrementAndGet();
+                synchronized (MerkleDbCompactionCoordinator.this) {
+                    compactorsByName.remove(id);
+                    MerkleDbCompactionCoordinator.this.notifyAll();
+                }
            }
            return false;
        }
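
For reference, the shutdown coordination this diff switches to is a standard monitor-based pattern: the map of running tasks is guarded by the coordinator's own lock, each task deregisters itself and calls notifyAll() when it finishes, and shutdown interrupts the workers and then wait()s until the map drains or a timeout elapses. Below is a minimal, self-contained sketch of that pattern; the TaskCoordinator and Worker names, the cached thread pool, the 10-second timeout, and the simulated workload are illustrative assumptions only, not part of the MerkleDb code.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: tracks running tasks in a map guarded by the coordinator's monitor.
class TaskCoordinator {

    private static final long SHUTDOWN_TIMEOUT_MILLIS = 10_000; // assumed value, not MerkleDb's

    // Guarded by "this": running workers keyed by task name
    private final Map<String, Worker> running = new HashMap<>();

    private final ExecutorService executor = Executors.newCachedThreadPool();

    synchronized void startIfNotRunning(final String key) {
        if (running.containsKey(key)) {
            return; // already in progress
        }
        final Worker worker = new Worker(key);
        running.put(key, worker);
        executor.submit(worker);
    }

    synchronized void stopAll() {
        // Ask every running worker to stop
        for (final Worker worker : running.values()) {
            worker.interrupt();
        }
        // Wait until the workers have deregistered themselves, or the timeout elapses
        final long deadline = System.currentTimeMillis() + SHUTDOWN_TIMEOUT_MILLIS;
        try {
            long remaining = deadline - System.currentTimeMillis();
            while (!running.isEmpty() && remaining > 0) {
                wait(remaining); // releases the monitor so workers can deregister
                remaining = deadline - System.currentTimeMillis();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
    }

    private class Worker implements Runnable {
        private final String key;
        private volatile Thread thread;

        Worker(final String key) {
            this.key = key;
        }

        void interrupt() {
            final Thread t = thread;
            if (t != null) {
                t.interrupt();
            }
        }

        @Override
        public void run() {
            thread = Thread.currentThread();
            try {
                Thread.sleep(1_000); // stand-in for the real long-running work
            } catch (final InterruptedException e) {
                // stopping: fall through to deregistration
            } finally {
                // Deregister and wake up any thread blocked in stopAll()
                synchronized (TaskCoordinator.this) {
                    running.remove(key);
                    TaskCoordinator.this.notifyAll();
                }
            }
        }
    }
}

A quick driver that calls startIfNotRunning("task-1") and then stopAll() will observe the same handshake the PR relies on: stopAll() blocks in wait() until the worker's finally block removes its entry and calls notifyAll().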