Skip to content

Commit de2b3d7

Browse files
thinker0nwangtw
authored andcommitted
Support utils.Time of kafka-client-spout (apache#3116)
* Support utils.Time of kafka-client-spout
1 parent 38060b0 commit de2b3d7

File tree

1 file changed

+241
-0
lines changed
  • storm-compatibility/src/java/org/apache/storm/utils

1 file changed

+241
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.storm.utils;
20+
21+
import java.util.Iterator;
22+
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.concurrent.locks.LockSupport;
27+
import java.util.logging.Level;
28+
import java.util.logging.Logger;
29+
30+
/**
31+
* This class implements time simulation support. When time simulation is enabled,
32+
* methods on this class will use fixed time. When time simulation is disabled,
33+
* methods will pass through to relevant java.lang.System/java.lang.Thread calls.
34+
* Methods using units higher than nanoseconds will pass through to System.currentTimeMillis().
35+
* Methods supporting nanoseconds will pass through to System.nanoTime().
36+
*/
37+
public final class Time {
38+
private static final Logger LOG = Logger.getLogger(Time.class.getName());
39+
private static final AtomicBoolean SIMULATING = new AtomicBoolean(false);
40+
private static final AtomicLong AUTO_ADVANCE_NANOS_ON_SLEEP = new AtomicLong(0);
41+
private static final Map<Thread, AtomicLong> THREAD_SLEEP_TIMES_NANOS = new ConcurrentHashMap<>();
42+
private static final Object SLEEP_TIMES_LOCK = new Object();
43+
private static final AtomicLong SIMULATED_CURR_TIME_NANOS = new AtomicLong(0);
44+
45+
private Time() {
46+
}
47+
48+
public static boolean isSimulating() {
49+
return SIMULATING.get();
50+
}
51+
52+
public static void sleepUntil(long targetTimeMs) throws InterruptedException {
53+
if (SIMULATING.get()) {
54+
simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
55+
} else {
56+
long sleepTimeMs = targetTimeMs - currentTimeMillis();
57+
if (sleepTimeMs > 0) {
58+
Thread.sleep(sleepTimeMs);
59+
}
60+
}
61+
}
62+
63+
public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException {
64+
if (SIMULATING.get()) {
65+
simulatedSleepUntilNanos(targetTimeNanos);
66+
} else {
67+
long sleepTimeNanos = targetTimeNanos - nanoTime();
68+
long sleepTimeMs = nanosToMillis(sleepTimeNanos);
69+
int sleepTimeNanosSansMs = (int) (sleepTimeNanos % 1_000_000);
70+
if (sleepTimeNanos > 0) {
71+
Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
72+
}
73+
}
74+
}
75+
76+
private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
77+
try {
78+
synchronized (SLEEP_TIMES_LOCK) {
79+
if (!SIMULATING.get()) {
80+
LOG.log(Level.FINER, Thread.currentThread()
81+
+ " is still sleeping after simulated time disabled.",
82+
new RuntimeException("STACK TRACE"));
83+
throw new InterruptedException();
84+
}
85+
THREAD_SLEEP_TIMES_NANOS.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
86+
}
87+
while (SIMULATED_CURR_TIME_NANOS.get() < targetTimeNanos) {
88+
synchronized (SLEEP_TIMES_LOCK) {
89+
if (!SIMULATING.get()) {
90+
LOG.log(Level.FINER, Thread.currentThread()
91+
+ " is still sleeping after simulated time disabled.",
92+
new RuntimeException("STACK TRACE"));
93+
throw new InterruptedException();
94+
}
95+
long autoAdvance = AUTO_ADVANCE_NANOS_ON_SLEEP.get();
96+
if (autoAdvance > 0) {
97+
advanceTimeNanos(autoAdvance);
98+
}
99+
}
100+
Thread.sleep(10);
101+
}
102+
} finally {
103+
THREAD_SLEEP_TIMES_NANOS.remove(Thread.currentThread());
104+
}
105+
}
106+
107+
public static void sleep(long ms) throws InterruptedException {
108+
if (ms > 0) {
109+
if (SIMULATING.get()) {
110+
simulatedSleepUntilNanos(millisToNanos(currentTimeMillis() + ms));
111+
} else {
112+
Thread.sleep(ms);
113+
}
114+
}
115+
}
116+
117+
public static void parkNanos(long nanos) throws InterruptedException {
118+
if (nanos > 0) {
119+
if (SIMULATING.get()) {
120+
simulatedSleepUntilNanos(nanoTime() + nanos);
121+
} else {
122+
LockSupport.parkNanos(nanos);
123+
}
124+
}
125+
}
126+
127+
public static void sleepSecs(long secs) throws InterruptedException {
128+
if (secs > 0) {
129+
sleep(secs * 1000);
130+
}
131+
}
132+
133+
public static long nanoTime() {
134+
if (SIMULATING.get()) {
135+
return SIMULATED_CURR_TIME_NANOS.get();
136+
} else {
137+
return System.nanoTime();
138+
}
139+
}
140+
141+
public static long currentTimeMillis() {
142+
if (SIMULATING.get()) {
143+
return nanosToMillis(SIMULATED_CURR_TIME_NANOS.get());
144+
} else {
145+
return System.currentTimeMillis();
146+
}
147+
}
148+
149+
public static long nanosToMillis(long nanos) {
150+
return nanos / 1_000_000;
151+
}
152+
153+
public static long millisToNanos(long millis) {
154+
return millis * 1_000_000;
155+
}
156+
157+
public static long secsToMillis(int secs) {
158+
return 1000 * (long) secs;
159+
}
160+
161+
public static long secsToMillisLong(double secs) {
162+
return (long) (1000 * secs);
163+
}
164+
165+
public static int currentTimeSecs() {
166+
return (int) (currentTimeMillis() / 1000);
167+
}
168+
169+
public static int deltaSecs(int timeInSeconds) {
170+
return Time.currentTimeSecs() - timeInSeconds;
171+
}
172+
173+
public static long deltaMs(long timeInMilliseconds) {
174+
return Time.currentTimeMillis() - timeInMilliseconds;
175+
}
176+
177+
public static void advanceTime(long ms) {
178+
advanceTimeNanos(millisToNanos(ms));
179+
}
180+
181+
public static void advanceTimeNanos(long nanos) {
182+
if (!SIMULATING.get()) {
183+
throw new IllegalStateException("Cannot simulate time unless in simulation mode");
184+
}
185+
if (nanos < 0) {
186+
throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
187+
}
188+
synchronized (SLEEP_TIMES_LOCK) {
189+
long newTime = SIMULATED_CURR_TIME_NANOS.addAndGet(nanos);
190+
Iterator<AtomicLong> sleepTimesIter = THREAD_SLEEP_TIMES_NANOS.values().iterator();
191+
while (sleepTimesIter.hasNext()) {
192+
AtomicLong curr = sleepTimesIter.next();
193+
if (SIMULATED_CURR_TIME_NANOS.get() >= curr.get()) {
194+
sleepTimesIter.remove();
195+
}
196+
}
197+
LOG.log(Level.FINER, "Advanced simulated time to " + newTime);
198+
}
199+
}
200+
201+
public static void advanceTimeSecs(long secs) {
202+
advanceTime(secs * 1_000);
203+
}
204+
205+
public static boolean isThreadWaiting(Thread t) {
206+
if (!SIMULATING.get()) {
207+
throw new IllegalStateException("Must be in simulation mode");
208+
}
209+
AtomicLong time = THREAD_SLEEP_TIMES_NANOS.get(t);
210+
return !t.isAlive() || time != null && nanoTime() < time.longValue();
211+
}
212+
213+
public static class SimulatedTime implements AutoCloseable {
214+
215+
public SimulatedTime() {
216+
this(null);
217+
}
218+
219+
public SimulatedTime(Number advanceTimeMs) {
220+
synchronized (Time.SLEEP_TIMES_LOCK) {
221+
Time.SIMULATING.set(true);
222+
Time.SIMULATED_CURR_TIME_NANOS.set(0);
223+
Time.THREAD_SLEEP_TIMES_NANOS.clear();
224+
if (advanceTimeMs != null) {
225+
Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(millisToNanos(advanceTimeMs.longValue()));
226+
} else {
227+
Time.AUTO_ADVANCE_NANOS_ON_SLEEP.set(0);
228+
}
229+
LOG.warning("AutoCloseable Simulated Time Starting...");
230+
}
231+
}
232+
233+
@Override
234+
public void close() {
235+
synchronized (Time.SLEEP_TIMES_LOCK) {
236+
Time.SIMULATING.set(false);
237+
LOG.warning("AutoCloseable Simulated Time Ending...");
238+
}
239+
}
240+
}
241+
}

0 commit comments

Comments
 (0)