3
3
import numpy as np
4
4
from goofi .data import Data , DataType
5
5
from goofi .node import Node
6
- from goofi .params import StringParam , BoolParam
6
+ from goofi .params import StringParam , BoolParam , FloatParam
7
7
import json
8
+ import time
8
9
9
10
class WriteCsv (Node ):
10
11
@staticmethod
11
12
def config_input_slots ():
12
13
# This node will accept a table as its input.
13
- return {"table_input" : DataType .TABLE }
14
+ return {"table_input" : DataType .TABLE ,
15
+ "start" : DataType .ARRAY ,
16
+ "stop" : DataType .ARRAY ,}
14
17
15
18
@staticmethod
16
19
def config_params ():
17
20
# Parameters can include the CSV filename, write control, and timestamp option.
18
21
return {
19
22
"Write" : {
20
23
"filename" : StringParam ("output.csv" ),
21
- "write" : False ,
24
+ "start" : BoolParam (False , trigger = True ),
25
+ "stop" : BoolParam (False , trigger = True ),
26
+ 'duration' : FloatParam (0.0 , 0.0 , 100.0 ),
22
27
"timestamps" : BoolParam (False ), # New timestamp parameter
23
28
},
24
29
}
@@ -31,76 +36,86 @@ def setup(self):
31
36
self .base_filename = None # Track the base filename without timestamp
32
37
self .written_files = set () # Track files to ensure headers are written
33
38
self .last_values = {} # Store the last known value for each column
34
-
35
- def process (self , table_input : Data ):
36
- # Check if writing is enabled
37
- if not self .params ["Write" ]["write" ].value :
38
- return
39
-
40
- table_data = table_input .data
41
-
42
- # Extract actual data content, handling multiple columns
43
- actual_data = {
44
- key : (value .data if isinstance (value , Data ) else value )
45
- for key , value in table_data .items ()
46
- }
47
-
48
- def flatten (data ):
49
- """Ensure lists and NumPy arrays are stored as JSON strings to keep their structure."""
50
- if isinstance (data , np .ndarray ):
51
- return json .dumps (data .tolist ()) # Convert ndarray to list before serializing
52
- elif isinstance (data , (list , tuple )):
53
- return json .dumps (data ) # Serialize list as a JSON string
54
- return data # Return scalars as-is
55
-
56
- flattened_data = {
57
- col : [flatten (values )] if not isinstance (values , list )
58
- else [flatten (v ) for v in values ]
59
- for col , values in actual_data .items ()
60
- }
61
-
62
- # Ensure all columns have the same length by padding with None
63
- max_length = max (map (len , flattened_data .values ()), default = 0 )
64
- for col in flattened_data :
65
- flattened_data [col ] += [None ] * (max_length - len (flattened_data [col ]))
66
-
67
- # Replace None with the last known value
68
- for col in flattened_data :
69
- if col not in self .last_values :
70
- self .last_values [col ] = None # Initialize with None if not present
71
- for i in range (len (flattened_data [col ])):
72
- if flattened_data [col ][i ] is None :
73
- flattened_data [col ][i ] = self .last_values [col ]
74
- else :
75
- self .last_values [col ] = flattened_data [col ][i ] # Update the last known value
76
-
77
- # Add timestamp column if enabled
78
- if self .params ["Write" ]["timestamps" ].value :
79
- timestamps = [datetime .datetime .utcnow ().isoformat ()] * max_length
80
- flattened_data ["timestamp" ] = timestamps
81
-
82
- # Convert to DataFrame
83
- df = self .pd .DataFrame (flattened_data )
84
-
85
- # Get the filename from parameters
86
- filename = self .params ["Write" ]["filename" ].value
87
-
88
- # Check if filename has changed, then update with timestamp
89
- if filename != self .base_filename :
90
- basename , ext = os .path .splitext (filename )
91
- datetime_str = datetime .datetime .now ().strftime ("%Y%m%d_%H%M%S" )
92
- fn = f"{ basename } _{ datetime_str } { ext } "
93
- self .last_filename = fn
94
- self .base_filename = filename
95
- else :
96
- fn = self .last_filename
97
-
98
- # Determine if headers should be written
99
- write_header = fn not in self .written_files
100
-
101
- # Append new data to CSV
102
- df .to_csv (fn , mode = "a" , header = write_header , index = False )
103
-
104
- # Mark file as written to prevent duplicate headers
105
- if write_header :
106
- self .written_files .add (fn )
39
+ self .is_writing = False
40
+
41
+ def process (self , table_input : Data , start : Data , stop : Data ):
42
+ if start is not None or self .params ["Write" ]["start" ].value :
43
+ self .is_writing = True
44
+ self .start_time = time .time ()
45
+
46
+ if stop is not None or self .params ["Write" ]["stop" ].value :
47
+ self .is_writing = False
48
+
49
+ self .input_slots ["start" ].clear ()
50
+ self .input_slots ["stop" ].clear ()
51
+ duration = self .params ["Write" ]["duration" ].value
52
+ if self .is_writing :
53
+ table_data = table_input .data
54
+ if duration > 0 :
55
+ if time .time () - self .start_time > duration :
56
+ self .is_writing = False
57
+ # Extract actual data content, handling multiple columns
58
+ actual_data = {
59
+ key : (value .data if isinstance (value , Data ) else value )
60
+ for key , value in table_data .items ()
61
+ }
62
+
63
+ def flatten (data ):
64
+ """Ensure lists and NumPy arrays are stored as JSON strings to keep their structure."""
65
+ if isinstance (data , np .ndarray ):
66
+ return json .dumps (data .tolist ()) # Convert ndarray to list before serializing
67
+ elif isinstance (data , (list , tuple )):
68
+ return json .dumps (data ) # Serialize list as a JSON string
69
+ return data # Return scalars as-is
70
+
71
+ flattened_data = {
72
+ col : [flatten (values )] if not isinstance (values , list )
73
+ else [flatten (v ) for v in values ]
74
+ for col , values in actual_data .items ()
75
+ }
76
+
77
+ # Ensure all columns have the same length by padding with None
78
+ max_length = max (map (len , flattened_data .values ()), default = 0 )
79
+ for col in flattened_data :
80
+ flattened_data [col ] += [None ] * (max_length - len (flattened_data [col ]))
81
+
82
+ # Replace None with the last known value
83
+ for col in flattened_data :
84
+ if col not in self .last_values :
85
+ self .last_values [col ] = None # Initialize with None if not present
86
+ for i in range (len (flattened_data [col ])):
87
+ if flattened_data [col ][i ] is None :
88
+ flattened_data [col ][i ] = self .last_values [col ]
89
+ else :
90
+ self .last_values [col ] = flattened_data [col ][i ] # Update the last known value
91
+
92
+ # Add timestamp column if enabled
93
+ if self .params ["Write" ]["timestamps" ].value :
94
+ timestamps = [datetime .datetime .utcnow ().isoformat ()] * max_length
95
+ flattened_data ["timestamp" ] = timestamps
96
+
97
+ # Convert to DataFrame
98
+ df = self .pd .DataFrame (flattened_data )
99
+
100
+ # Get the filename from parameters
101
+ filename = self .params ["Write" ]["filename" ].value
102
+
103
+ # Check if filename has changed, then update with timestamp
104
+ if filename != self .base_filename :
105
+ basename , ext = os .path .splitext (filename )
106
+ datetime_str = datetime .datetime .now ().strftime ("%Y%m%d_%H%M%S" )
107
+ fn = f"{ basename } _{ datetime_str } { ext } "
108
+ self .last_filename = fn
109
+ self .base_filename = filename
110
+ else :
111
+ fn = self .last_filename
112
+
113
+ # Determine if headers should be written
114
+ write_header = fn not in self .written_files
115
+
116
+ # Append new data to CSV
117
+ df .to_csv (fn , mode = "a" , header = write_header , index = False )
118
+
119
+ # Mark file as written to prevent duplicate headers
120
+ if write_header :
121
+ self .written_files .add (fn )
0 commit comments