@@ -6,13 +6,45 @@ var async = require("async");
6
6
7
7
exports . init = function ( db , cb ) {
8
8
exports . db = db ;
9
- async . map (
10
- [ "create table events (id integer primary key autoincrement, timestamp timestamp, class varchar(32), data text, device varchar(256), lat real, lon real)" ,
11
- "create table vessels (id integer primary key autoincrement, mmsi varchar(256), last_seen integer references events(id))" ,
12
- "create table events_ais (id integer references events(id), vessel_id integer references vessels(id))" ,
13
- "create table devices (id integer primary key autoincrement, name varchar(256), last_seen integer references events(id))" ] ,
14
- function ( item , cb ) { exports . db . run ( item , cb ) ; } ,
15
- cb ) ;
9
+
10
+ // Write to DB every two seconds...
11
+ exports . inLogic = 0 ;
12
+ exports . db . run (
13
+ "begin transaction" ,
14
+ function ( err ) {
15
+ if ( err ) { return console . log ( err ) ; }
16
+ setInterval ( function ( ) {
17
+ if ( exports . inLogic ) return ;
18
+ exports . db . run (
19
+ "end transaction" ,
20
+ function ( err ) {
21
+ if ( err ) { return console . log ( err ) ; }
22
+ // console.log("COMMIT");
23
+ exports . db . run (
24
+ "begin transaction" ,
25
+ function ( err ) {
26
+ if ( err ) { return console . log ( err ) ; }
27
+ }
28
+ ) ;
29
+ } ) ;
30
+ } , 1000 ) ;
31
+
32
+ async . map (
33
+ [ "create table events (id integer primary key autoincrement, timestamp timestamp, class varchar(32), data text, device varchar(256), lat real, lon real)" ,
34
+ "create table vessels (id integer primary key autoincrement, mmsi varchar(256), last_seen integer references events(id))" ,
35
+ "create table events_ais (id integer references events(id), vessel_id integer references vessels(id))" ,
36
+ "create table devices (id integer primary key autoincrement, name varchar(256), last_seen integer references events(id))" ] ,
37
+ function ( item , cb ) { exports . db . run ( item , cb ) ; } ,
38
+ cb ) ;
39
+ } ) ;
40
+ }
41
+
42
+ exports . callInOneTransaction = function ( fn , cb ) {
43
+ exports . inLogic ++ ;
44
+ fn ( function ( ) {
45
+ exports . inLogic -- ;
46
+ cb ( ) ;
47
+ } ) ;
16
48
}
17
49
18
50
exports . getDevices = function ( cb ) {
@@ -45,6 +77,93 @@ exports.getVessels = function (cb) {
45
77
) ;
46
78
}
47
79
80
+ exports . getRouteSql = function ( query ) {
81
+ var sql = query . sql ;
82
+ var params = underscore . clone ( query . params ) ;
83
+
84
+ sql = "select * from (" + sql + ") where lon is not null and lat is not null" ;
85
+
86
+ if ( query . timemin != undefined ) {
87
+ sql = sql + " and timestamp >= $timemin" ;
88
+ params . $timemin = query . timemin ;
89
+ }
90
+ if ( query . timemax != undefined ) {
91
+ sql = sql + " and timestamp <= $timemax" ;
92
+ params . $timemax = query . timemax ;
93
+ }
94
+
95
+ var regionsql = [ ] ;
96
+ if ( query . latmin != undefined ) { regionsql . push ( "lat >= $latmin" ) ; }
97
+ if ( query . lonmin != undefined ) { regionsql . push ( "lon >= $lonmin" ) ; }
98
+ if ( query . latmax != undefined ) { regionsql . push ( "lat <= $latmax" ) ; }
99
+ if ( query . lonmax != undefined ) { regionsql . push ( "lon <= $lonmax" ) ; }
100
+ if ( regionsql . length ) {
101
+ regionsql = regionsql . join ( ' and ' ) ;
102
+ sql = sql + " and " + regionsql ;
103
+
104
+ params . $latmin = query . latmin ;
105
+ params . $lonmin = query . lonmin ;
106
+ params . $latmax = query . latmax ;
107
+ params . $lonmax = query . lonmax ;
108
+ }
109
+
110
+ if ( query . maxentries != undefined ) {
111
+ countSql = exports . getCountSql ( { sql : sql , params :params } ) ;
112
+
113
+ sql = sql + " and random() % (" + countSql . sql + ") / $maxentries = 0" ;
114
+ params = countSql . params ;
115
+ params . $maxentries = query . maxentries ;
116
+ }
117
+
118
+ sql = sql + " order by timestamp asc" ;
119
+
120
+ return {
121
+ sql : sql ,
122
+ params : params
123
+ }
124
+ }
125
+
126
+ exports . getCountSql = function ( query ) {
127
+ var sql = query . sql ;
128
+ var params = underscore . clone ( query . params ) ;
129
+
130
+ return {
131
+ sql : "select count(*) as count from (" + sql + ")" ,
132
+ params : params
133
+ } ;
134
+ }
135
+
136
+ exports . onlyWithRoute = function ( query , routecb , cb ) {
137
+ var sql = exports . getCountSql ( exports . getRouteSql ( query ) ) ;
138
+
139
+ exports . db . get (
140
+ sql . sql ,
141
+ sql . params ,
142
+ function ( err , row ) {
143
+ if ( err ) { console . log ( err ) ; return cb ( ) ; }
144
+ if ( row . count == 0 ) { return cb ( ) ; }
145
+ routecb ( cb ) ;
146
+ }
147
+ ) ;
148
+
149
+ }
150
+
151
+ exports . getRoute = function ( query , cb ) {
152
+ var sql = exports . getRouteSql ( query ) ;
153
+
154
+ console . log ( "QUERY" ) ;
155
+ console . log ( sql . sql ) ;
156
+ console . log ( sql . params ) ;
157
+ console . log ( "\n\n" ) ;
158
+ exports . db . all (
159
+ sql . sql ,
160
+ sql . params ,
161
+ function ( err , rows ) {
162
+ if ( err ) return cb ( err ) ;
163
+ cb ( null , rows . map ( function ( row ) { return JSON . parse ( row . data ) ; } ) ) ;
164
+ } ) ;
165
+ } ;
166
+
48
167
exports . Logger = function ( ) {
49
168
var self = this ;
50
169
@@ -113,26 +232,30 @@ exports.Logger = function() {
113
232
} ) ;
114
233
115
234
self . saveDeviceQueue = async . queue ( function ( data , cb ) {
116
- exports . db . get (
117
- "select count(*) as count from devices where name = $name" ,
118
- { $name : data . path } ,
119
- function ( err , row ) {
120
- if ( err ) { console . warn ( err ) ; return cb ( ) ; }
121
- if ( row . count > 0 ) {
122
- if ( data . dont_update_last_seen ) { return cb ( ) ; }
123
- exports . db . run (
124
- "update devices set last_seen = $id where name = $path" ,
125
- { $path :data . path ,
126
- $id :data . id } ,
127
- function ( err ) { if ( err ) { console . warn ( err ) ; } cb ( ) ; } ) ;
128
- } else {
129
- exports . db . run (
130
- "insert into devices (name, last_seen) values ($path, $id)" ,
131
- { $path :data . path ,
132
- $id :data . id } ,
133
- function ( err ) { if ( err ) { console . warn ( err ) ; } cb ( ) ; } ) ;
134
- }
135
- } ) ;
235
+ exports . callInOneTransaction (
236
+ function ( cb ) {
237
+ exports . db . get (
238
+ "select count(*) as count from devices where name = $name" ,
239
+ { $name : data . path } ,
240
+ function ( err , row ) {
241
+ if ( err ) { console . warn ( err ) ; return cb ( ) ; }
242
+ if ( row . count > 0 ) {
243
+ if ( data . dont_update_last_seen ) { return cb ( ) ; }
244
+ exports . db . run (
245
+ "update devices set last_seen = $id where name = $path" ,
246
+ { $path :data . path ,
247
+ $id :data . id } ,
248
+ function ( err ) { if ( err ) { console . warn ( err ) ; } cb ( ) ; } ) ;
249
+ } else {
250
+ exports . db . run (
251
+ "insert into devices (name, last_seen) values ($path, $id)" ,
252
+ { $path :data . path ,
253
+ $id :data . id } ,
254
+ function ( err ) { if ( err ) { console . warn ( err ) ; } cb ( ) ; } ) ;
255
+ }
256
+ } ) ;
257
+ } ,
258
+ cb ) ;
136
259
} , 1 ) ;
137
260
138
261
self . on ( 'saveDevice' , function ( data ) {
@@ -174,37 +297,41 @@ exports.Logger = function() {
174
297
} ) ;
175
298
176
299
self . saveVesselQueue = async . queue ( function ( data , cb ) {
177
- var next = function ( err ) {
178
- if ( err ) { console . warn ( err ) ; return cb ( ) ; }
179
- exports . db . get (
180
- "select id from vessels where mmsi = $mmsi" ,
181
- { $mmsi :data . mmsi } ,
182
- function ( err , row ) {
300
+ exports . callInOneTransaction (
301
+ function ( cb ) {
302
+ var next = function ( err ) {
183
303
if ( err ) { console . warn ( err ) ; return cb ( ) ; }
184
- self . emit ( "saveAIS" , { event :data , vessel : { id :row . id } } ) ;
185
- cb ( ) ;
186
- } ) ;
187
- }
188
-
189
- exports . db . get (
190
- "select count(*) as count from vessels where mmsi = $mmsi" ,
191
- { $mmsi : data . mmsi } ,
192
- function ( err , row ) {
193
- if ( err ) { console . warn ( err ) ; return cb ( ) ; }
194
- if ( row . count > 0 ) {
195
- exports . db . run (
196
- "update vessels set last_seen = $id where mmsi = $mmsi" ,
197
- { $mmsi :data . mmsi ,
198
- $id :data . id } ,
199
- next ) ;
200
- } else {
201
- exports . db . run (
202
- "insert into vessels (mmsi, last_seen) values ($mmsi, $id)" ,
203
- { $mmsi :data . mmsi ,
204
- $id :data . id } ,
205
- next ) ;
304
+ exports . db . get (
305
+ "select id from vessels where mmsi = $mmsi" ,
306
+ { $mmsi :data . mmsi } ,
307
+ function ( err , row ) {
308
+ if ( err ) { console . warn ( err ) ; return cb ( ) ; }
309
+ self . emit ( "saveAIS" , { event :data , vessel : { id :row . id } } ) ;
310
+ cb ( ) ;
311
+ } ) ;
206
312
}
207
- } ) ;
313
+
314
+ exports . db . get (
315
+ "select count(*) as count from vessels where mmsi = $mmsi" ,
316
+ { $mmsi : data . mmsi } ,
317
+ function ( err , row ) {
318
+ if ( err ) { console . warn ( err ) ; return cb ( ) ; }
319
+ if ( row . count > 0 ) {
320
+ exports . db . run (
321
+ "update vessels set last_seen = $id where mmsi = $mmsi" ,
322
+ { $mmsi :data . mmsi ,
323
+ $id :data . id } ,
324
+ next ) ;
325
+ } else {
326
+ exports . db . run (
327
+ "insert into vessels (mmsi, last_seen) values ($mmsi, $id)" ,
328
+ { $mmsi :data . mmsi ,
329
+ $id :data . id } ,
330
+ next ) ;
331
+ }
332
+ } ) ;
333
+ } ,
334
+ cb ) ;
208
335
} , 1 ) ;
209
336
210
337
self . on ( 'saveVessel' , function ( data ) {
0 commit comments