1
1
from __future__ import annotations
2
2
3
- from typing import List , Optional , Union , Callable
3
+ from typing import Callable , List , Optional , Union
4
4
5
5
from pyspark .sql .dataframe import DataFrame
6
- from pyspark .sql .functions import col , expr , last , lead , lit , when
6
+ import pyspark .sql .functions as sfn
7
7
from pyspark .sql .window import Window
8
8
9
- import tempo .utils as t_utils
10
9
import tempo .resample as t_resample
11
10
import tempo .tsdf as t_tsdf
11
+ import tempo .utils as t_utils
12
12
13
13
# Interpolation fill options
14
14
method_options = ["zero" , "null" , "bfill" , "ffill" , "linear" ]
@@ -130,56 +130,56 @@ def __interpolate_column(
130
130
END AS is_interpolated_{ target_col }
131
131
"""
132
132
output_df = output_df .withColumn (
133
- f"is_interpolated_{ target_col } " , expr (flag_expr )
133
+ f"is_interpolated_{ target_col } " , sfn . expr (flag_expr )
134
134
)
135
135
136
136
# Handle zero fill
137
137
if method == "zero" :
138
138
output_df = output_df .withColumn (
139
139
target_col ,
140
- when (
141
- col (f"is_interpolated_{ target_col } " ) == False , # noqa: E712
142
- col (target_col ),
143
- ).otherwise (lit (0 )),
140
+ sfn . when (
141
+ sfn . col (f"is_interpolated_{ target_col } " ) == False , # noqa: E712
142
+ sfn . col (target_col ),
143
+ ).otherwise (sfn . lit (0 )),
144
144
)
145
145
146
146
# Handle null fill
147
147
if method == "null" :
148
148
output_df = output_df .withColumn (
149
149
target_col ,
150
- when (
151
- col (f"is_interpolated_{ target_col } " ) == False , # noqa: E712
152
- col (target_col ),
150
+ sfn . when (
151
+ sfn . col (f"is_interpolated_{ target_col } " ) == False , # noqa: E712
152
+ sfn . col (target_col ),
153
153
).otherwise (None ),
154
154
)
155
155
156
156
# Handle forward fill
157
157
if method == "ffill" :
158
158
output_df = output_df .withColumn (
159
159
target_col ,
160
- when (
161
- col (f"is_interpolated_{ target_col } " ) == True , # noqa: E712
162
- col (f"previous_{ target_col } " ),
163
- ).otherwise (col (target_col )),
160
+ sfn . when (
161
+ sfn . col (f"is_interpolated_{ target_col } " ) == True , # noqa: E712
162
+ sfn . col (f"previous_{ target_col } " ),
163
+ ).otherwise (sfn . col (target_col )),
164
164
)
165
165
# Handle backwards fill
166
166
if method == "bfill" :
167
167
output_df = output_df .withColumn (
168
168
target_col ,
169
169
# Handle case when subsequent value is null
170
- when (
171
- (col (f"is_interpolated_{ target_col } " ) == True ) # noqa: E712
170
+ sfn . when (
171
+ (sfn . col (f"is_interpolated_{ target_col } " ) == True ) # noqa: E712
172
172
& (
173
- col (f"next_{ target_col } " ).isNull ()
174
- & (col (f"{ ts_col } _{ target_col } " ).isNull ())
173
+ sfn . col (f"next_{ target_col } " ).isNull ()
174
+ & (sfn . col (f"{ ts_col } _{ target_col } " ).isNull ())
175
175
),
176
- col (f"next_null_{ target_col } " ),
176
+ sfn . col (f"next_null_{ target_col } " ),
177
177
).otherwise (
178
178
# Handle standard backwards fill
179
- when (
180
- col (f"is_interpolated_{ target_col } " ) == True , # noqa: E712
181
- col (f"next_{ target_col } " ),
182
- ).otherwise (col (f"{ target_col } " ))
179
+ sfn . when (
180
+ sfn . col (f"is_interpolated_{ target_col } " ) == True , # noqa: E712
181
+ sfn . col (f"next_{ target_col } " ),
182
+ ).otherwise (sfn . col (f"{ target_col } " ))
183
183
),
184
184
)
185
185
@@ -205,10 +205,12 @@ def __generate_time_series_fill(
205
205
"""
206
206
return df .withColumn (
207
207
"previous_timestamp" ,
208
- col (ts_col ),
208
+ sfn . col (ts_col ),
209
209
).withColumn (
210
210
"next_timestamp" ,
211
- lead (df [ts_col ]).over (Window .partitionBy (* partition_cols ).orderBy (ts_col )),
211
+ sfn .lead (df [ts_col ]).over (
212
+ Window .partitionBy (* partition_cols ).orderBy (ts_col )
213
+ ),
212
214
)
213
215
214
216
def __generate_column_time_fill (
@@ -232,13 +234,13 @@ def __generate_column_time_fill(
232
234
233
235
return df .withColumn (
234
236
f"previous_timestamp_{ target_col } " ,
235
- last (col (f"{ ts_col } _{ target_col } " ), ignorenulls = True ).over (
237
+ sfn . last (sfn . col (f"{ ts_col } _{ target_col } " ), ignorenulls = True ).over (
236
238
window .orderBy (ts_col ).rowsBetween (Window .unboundedPreceding , 0 )
237
239
),
238
240
).withColumn (
239
241
f"next_timestamp_{ target_col } " ,
240
- last (col (f"{ ts_col } _{ target_col } " ), ignorenulls = True ).over (
241
- window .orderBy (col (ts_col ).desc ()).rowsBetween (
242
+ sfn . last (sfn . col (f"{ ts_col } _{ target_col } " ), ignorenulls = True ).over (
243
+ window .orderBy (sfn . col (ts_col ).desc ()).rowsBetween (
242
244
Window .unboundedPreceding , 0
243
245
)
244
246
),
@@ -266,21 +268,21 @@ def __generate_target_fill(
266
268
return (
267
269
df .withColumn (
268
270
f"previous_{ target_col } " ,
269
- last (df [target_col ], ignorenulls = True ).over (
271
+ sfn . last (df [target_col ], ignorenulls = True ).over (
270
272
window .orderBy (ts_col ).rowsBetween (Window .unboundedPreceding , 0 )
271
273
),
272
274
)
273
275
# Handle if subsequent value is null
274
276
.withColumn (
275
277
f"next_null_{ target_col } " ,
276
- last (df [target_col ], ignorenulls = True ).over (
277
- window .orderBy (col (ts_col ).desc ()).rowsBetween (
278
+ sfn . last (df [target_col ], ignorenulls = True ).over (
279
+ window .orderBy (sfn . col (ts_col ).desc ()).rowsBetween (
278
280
Window .unboundedPreceding , 0
279
281
)
280
282
),
281
283
).withColumn (
282
284
f"next_{ target_col } " ,
283
- lead (df [target_col ]).over (window .orderBy (ts_col )),
285
+ sfn . lead (df [target_col ]).over (window .orderBy (ts_col )),
284
286
)
285
287
)
286
288
@@ -356,7 +358,7 @@ def interpolate(
356
358
for column in target_cols :
357
359
add_column_time = add_column_time .withColumn (
358
360
f"{ ts_col } _{ column } " ,
359
- when (col (column ).isNull (), None ).otherwise (col (ts_col )),
361
+ sfn . when (sfn . col (column ).isNull (), None ).otherwise (sfn . col (ts_col )),
360
362
)
361
363
add_column_time = self .__generate_column_time_fill (
362
364
add_column_time , partition_cols , ts_col , column
@@ -365,9 +367,10 @@ def interpolate(
365
367
# Handle edge case if last value (latest) is null
366
368
edge_filled = add_column_time .withColumn (
367
369
"next_timestamp" ,
368
- when (
369
- col ("next_timestamp" ).isNull (), expr (f"{ ts_col } + interval { freq } " )
370
- ).otherwise (col ("next_timestamp" )),
370
+ sfn .when (
371
+ sfn .col ("next_timestamp" ).isNull (),
372
+ sfn .expr (f"{ ts_col } + interval { freq } " ),
373
+ ).otherwise (sfn .col ("next_timestamp" )),
371
374
)
372
375
373
376
# Fill target column for nearest values
@@ -380,7 +383,7 @@ def interpolate(
380
383
# Generate missing timeseries values
381
384
exploded_series = target_column_filled .withColumn (
382
385
f"new_{ ts_col } " ,
383
- expr (
386
+ sfn . expr (
384
387
f"explode(sequence({ ts_col } , next_timestamp - interval { freq } , interval { freq } )) as timestamp"
385
388
),
386
389
)
@@ -390,10 +393,12 @@ def interpolate(
390
393
flagged_series = (
391
394
exploded_series .withColumn (
392
395
"is_ts_interpolated" ,
393
- when (col (f"new_{ ts_col } " ) != col (ts_col ), True ).otherwise (False ),
396
+ sfn .when (sfn .col (f"new_{ ts_col } " ) != sfn .col (ts_col ), True ).otherwise (
397
+ False
398
+ ),
394
399
)
395
- .withColumn (ts_col , col (f"new_{ ts_col } " ))
396
- .drop (col (f"new_{ ts_col } " ))
400
+ .withColumn (ts_col , sfn . col (f"new_{ ts_col } " ))
401
+ .drop (sfn . col (f"new_{ ts_col } " ))
397
402
)
398
403
399
404
# # Perform interpolation on each target column
0 commit comments