16
16
17
17
package com .google .gcloud .datastore .testing ;
18
18
19
+ import static com .google .common .base .MoreObjects .firstNonNull ;
19
20
import static java .nio .charset .StandardCharsets .UTF_8 ;
20
21
21
22
import com .google .common .base .Strings ;
54
55
import java .util .List ;
55
56
import java .util .Locale ;
56
57
import java .util .Map ;
58
+ import java .util .logging .Level ;
59
+ import java .util .logging .Logger ;
57
60
import java .util .regex .Pattern ;
58
61
import java .util .zip .ZipEntry ;
59
62
import java .util .zip .ZipInputStream ;
60
63
61
- import java .util .logging .Level ;
62
- import java .util .logging .Logger ;
63
-
64
64
/**
65
65
* Utility to start and stop local Google Cloud Datastore process.
66
66
*/
67
67
public class LocalGcdHelper {
68
-
69
68
private static final Logger log = Logger .getLogger (LocalGcdHelper .class .getName ());
70
69
71
70
private final String projectId ;
72
71
private Path gcdPath ;
72
+ private Process startProcess ;
73
73
private ProcessStreamReader processReader ;
74
+ private ProcessErrorStreamReader processErrorReader ;
74
75
private final int port ;
75
76
76
77
public static final String DEFAULT_PROJECT_ID = "projectid1" ;
@@ -179,49 +180,139 @@ private static Path executablePath(String cmd) {
179
180
}
180
181
181
182
private static class ProcessStreamReader extends Thread {
182
-
183
- private final Process process ;
184
183
private final BufferedReader reader ;
184
+ private volatile boolean terminated ;
185
185
186
- ProcessStreamReader (Process process , String blockUntil ) throws IOException {
186
+ ProcessStreamReader (InputStream inputStream ) {
187
187
super ("Local GCD InputStream reader" );
188
188
setDaemon (true );
189
- this .process = process ;
190
- reader = new BufferedReader (new InputStreamReader (process .getInputStream ()));
189
+ reader = new BufferedReader (new InputStreamReader (inputStream ));
190
+ }
191
+
192
+ void terminate () throws IOException {
193
+ terminated = true ;
194
+ reader .close ();
195
+ }
196
+
197
+ @ Override
198
+ public void run () {
199
+ while (!terminated ) {
200
+ try {
201
+ String line = reader .readLine ();
202
+ if (line == null ) {
203
+ terminated = true ;
204
+ }
205
+ } catch (IOException e ) {
206
+ // ignore
207
+ }
208
+ }
209
+ }
210
+
211
+ public static ProcessStreamReader start (InputStream inputStream ) {
212
+ ProcessStreamReader thread = new ProcessStreamReader (inputStream );
213
+ thread .start ();
214
+ return thread ;
215
+ }
216
+ }
217
+
218
+ private static class ProcessErrorStreamReader extends Thread {
219
+ private static final int LOG_LENGTH_LIMIT = 50000 ;
220
+ private static final String GCD_LOGGING_CLASS =
221
+ "com.google.apphosting.client.serviceapp.BaseApiServlet" ;
222
+
223
+ private final BufferedReader errorReader ;
224
+ private StringBuilder currentLog ;
225
+ private Level currentLogLevel ;
226
+ private boolean collectionMode ;
227
+ private volatile boolean terminated ;
228
+
229
+ ProcessErrorStreamReader (InputStream errorStream , String blockUntil ) throws IOException {
230
+ super ("Local GCD ErrorStream reader" );
231
+ setDaemon (true );
232
+ errorReader = new BufferedReader (new InputStreamReader (errorStream ));
191
233
if (!Strings .isNullOrEmpty (blockUntil )) {
192
234
String line ;
193
235
do {
194
- line = reader .readLine ();
236
+ line = errorReader .readLine ();
195
237
} while (line != null && !line .contains (blockUntil ));
196
238
}
197
239
}
198
240
199
- void terminate () throws InterruptedException , IOException {
200
- process .destroy ();
201
- process .waitFor ();
202
- reader .close ();
241
+ void terminate () throws IOException {
242
+ terminated = true ;
243
+ errorReader .close ();
203
244
}
204
245
205
246
@ Override
206
247
public void run () {
207
- try {
208
- while (reader .readLine () != null ) {
209
- // consume line
248
+ String previousLine = "" ;
249
+ String nextLine = "" ;
250
+ while (!terminated ) {
251
+ try {
252
+ previousLine = nextLine ;
253
+ nextLine = errorReader .readLine ();
254
+ if (nextLine == null ) {
255
+ terminated = true ;
256
+ } else {
257
+ processLogLine (previousLine , nextLine );
258
+ }
259
+ } catch (IOException e ) {
260
+ // ignore
261
+ }
262
+ }
263
+ processLogLine (previousLine , firstNonNull (nextLine , "" ));
264
+ writeLog (currentLogLevel , currentLog );
265
+ }
266
+
267
+ private void processLogLine (String previousLine , String nextLine ) {
268
+ // Each gcd log is two lines with the following format:
269
+ // [Date] [Time] [GCD_LOGGING_CLASS] [method]
270
+ // [LEVEL]: error message
271
+ // Exceptions and stack traces are included in gcd error stream, separated by a newline
272
+ Level nextLogLevel = getLevel (nextLine );
273
+ if (nextLogLevel != null ) {
274
+ writeLog (currentLogLevel , currentLog );
275
+ currentLog = new StringBuilder ();
276
+ currentLogLevel = nextLogLevel ;
277
+ collectionMode = previousLine .contains (GCD_LOGGING_CLASS );
278
+ } else if (collectionMode ) {
279
+ if (currentLog .length () > LOG_LENGTH_LIMIT ) {
280
+ collectionMode = false ;
281
+ } else if (currentLog .length () == 0 ) {
282
+ // strip level out of the line
283
+ currentLog .append ("GCD" );
284
+ currentLog .append (previousLine .split (":" , 2 )[1 ]);
285
+ currentLog .append (System .getProperty ("line.separator" ));
286
+ } else {
287
+ currentLog .append (previousLine );
288
+ currentLog .append (System .getProperty ("line.separator" ));
210
289
}
211
- } catch (IOException e ) {
212
- // ignore
213
290
}
214
291
}
215
292
216
- public static ProcessStreamReader start (Process process , String blockUntil ) throws IOException {
217
- ProcessStreamReader thread = new ProcessStreamReader (process , blockUntil );
293
+ private static void writeLog (Level level , StringBuilder msg ) {
294
+ if (level != null && msg != null && msg .length () != 0 ) {
295
+ log .log (level , msg .toString ().trim ());
296
+ }
297
+ }
298
+
299
+ private static Level getLevel (String line ) {
300
+ try {
301
+ return Level .parse (line .split (":" )[0 ]);
302
+ } catch (IllegalArgumentException e ) {
303
+ return null ; // level wasn't supplied in this log line
304
+ }
305
+ }
306
+
307
+ public static ProcessErrorStreamReader start (InputStream errorStream , String blockUntil )
308
+ throws IOException {
309
+ ProcessErrorStreamReader thread = new ProcessErrorStreamReader (errorStream , blockUntil );
218
310
thread .start ();
219
311
return thread ;
220
312
}
221
313
}
222
314
223
315
private static class CommandWrapper {
224
-
225
316
private final List <String > prefix ;
226
317
private List <String > command ;
227
318
private String nullFilename ;
@@ -392,13 +483,15 @@ private void startGcd(Path executablePath) throws IOException, InterruptedExcept
392
483
if (log .isLoggable (Level .FINE )) {
393
484
log .log (Level .FINE , "Starting datastore emulator for the project: {0}" , projectId );
394
485
}
395
- Process startProcess = CommandWrapper .create ()
396
- .command (gcdAbsolutePath .toString (), "start" , "--testing" , "--allow_remote_shutdown" ,
397
- "--port=" + Integer .toString (port ), projectId )
398
- .directory (gcdPath )
399
- .redirectErrorStream ()
400
- .start ();
401
- processReader = ProcessStreamReader .start (startProcess , "Dev App Server is now running" );
486
+ startProcess =
487
+ CommandWrapper .create ()
488
+ .command (gcdAbsolutePath .toString (), "start" , "--testing" , "--allow_remote_shutdown" ,
489
+ "--port=" + Integer .toString (port ), projectId )
490
+ .directory (gcdPath )
491
+ .start ();
492
+ processReader = ProcessStreamReader .start (startProcess .getInputStream ());
493
+ processErrorReader = ProcessErrorStreamReader .start (
494
+ startProcess .getErrorStream (), "Dev App Server is now running" );
402
495
}
403
496
404
497
private static String md5 (File gcdZipFile ) throws IOException {
@@ -454,6 +547,9 @@ public void stop() throws IOException, InterruptedException {
454
547
sendQuitRequest (port );
455
548
if (processReader != null ) {
456
549
processReader .terminate ();
550
+ processErrorReader .terminate ();
551
+ startProcess .destroy ();
552
+ startProcess .waitFor ();
457
553
}
458
554
if (gcdPath != null ) {
459
555
deleteRecurse (gcdPath );
@@ -465,7 +561,6 @@ private static void deleteRecurse(Path path) throws IOException {
465
561
return ;
466
562
}
467
563
Files .walkFileTree (path , new SimpleFileVisitor <Path >() {
468
-
469
564
@ Override
470
565
public FileVisitResult postVisitDirectory (Path dir , IOException exc ) throws IOException {
471
566
Files .delete (dir );
@@ -480,7 +575,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
480
575
});
481
576
}
482
577
483
- public static LocalGcdHelper start (String projectId , int port )
578
+ public static LocalGcdHelper start (String projectId , int port )
484
579
throws IOException , InterruptedException {
485
580
LocalGcdHelper helper = new LocalGcdHelper (projectId , port );
486
581
helper .start ();
@@ -490,15 +585,14 @@ public static LocalGcdHelper start(String projectId, int port)
490
585
public static void main (String ... args ) throws IOException , InterruptedException {
491
586
Map <String , String > parsedArgs = parseArgs (args );
492
587
String action = parsedArgs .get ("action" );
493
- int port = ( parsedArgs . get ( "port" ) == null ) ? DEFAULT_PORT
494
- : Integer .parseInt (parsedArgs .get ("port" ));
588
+ int port =
589
+ ( parsedArgs . get ( "port" ) == null ) ? DEFAULT_PORT : Integer .parseInt (parsedArgs .get ("port" ));
495
590
switch (action ) {
496
591
case "START" :
497
592
if (!isActive (DEFAULT_PROJECT_ID , port )) {
498
593
LocalGcdHelper helper = start (DEFAULT_PROJECT_ID , port );
499
594
try (FileWriter writer = new FileWriter (".local_gcd_helper" )) {
500
- writer .write (
501
- helper .gcdPath .toAbsolutePath ().toString () + System .lineSeparator ());
595
+ writer .write (helper .gcdPath .toAbsolutePath ().toString () + System .lineSeparator ());
502
596
writer .write (Integer .toString (port ));
503
597
}
504
598
}
0 commit comments