
Commit 891e4bd

Temporal slicing (#229)
* saving current state of the refactoring * Temporal slicing helper functions * Making Spark Session a class-level fixture Suppressing unhelpful warnings * Pandas baseline not ready yet. * Adding the JSON based unit test data config * Setting accessor methods and loading data files in setUp * Adding separate json files * updating the json files for the tests configs of interpolation and tsdf * updating the json files for the tests configs of utils * updated the get_data_as_sdf to use buildTestDF function * Documenting temporal slicing functions refactored utils env variables stubbed out test functions for temporal slicing * Using jsonref library to resolve references to shared data objects * reformatting all code with Black * resolving flake8 errors * a few more formatting issues * resolving black & flake8 formatting issues * Fixed interpolation tests. * Test case should not fail if we cannot load test data. Issue a warning and continue. * Got all tests working * converted all tests in tsdf_tests.py to new framework * converted tests in utils_tests.py to new framework * converted tests in delta_writer_tests.py to new framework * converted tests in as_of_join_tests.py to new framework * suppressing some warnings * applied black formatting * test code for new temporal slicing functions * import version code after path mangling... * Should not build public API docs on private / internal functions, classes, etc. * reformatting docstrings to properly generate Sphinx docs created utils function get_display_df to prepare a TSDF for display functions * black reformatting * Adding time slicing section to user guide * Slicing operations need to be able to handle numeric as well as string-based timestamps from the user. * black and flake8 formatting Co-authored-by: Souvik Pratiher <[email protected]>
1 parent 1b37bcb commit 891e4bd

File tree

8 files changed (+703, -32 lines)


docs/user-guide.rst (+43)

@@ -59,7 +59,50 @@ time column and the optional partition column specification.
     phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols = ["User"])
     display(phone_accel_tsdf)

+Slice by Time
+~~~~~~~~~~~~~~~~~~~~~~
+
+You can slice across all timeseries in a TSDF in various ways. This allows you to select or filter by timestamp across
+all the series.
+
+You can select all observations at a specific point in time:
+
+.. code-block:: python
+
+    target_time = '2015-02-23T13:03:53.919+0000'
+    at_target_tsdf = phone_accel_tsdf.at(target_time)
+    display(at_target_tsdf)
+
+You can slice data before or after a particular point in time (either inclusive or exclusive of the target time):
+
+.. code-block:: python
+
+    before_tsdf = phone_accel_tsdf.before(target_time)
+    at_or_after_tsdf = phone_accel_tsdf.atOrAfter(target_time)
+
+Or in an interval between two timestamps:
+
+.. code-block:: python
+
+    start_ts = '2015-02-23T13:03:53.909+0000'
+    end_ts = target_time
+    interval_inclusive = phone_accel_tsdf.between(start_ts, end_ts)
+    interval_exclusive = phone_accel_tsdf.between(start_ts, end_ts, inclusive=False)
+
+You can take a look at the earliest (oldest) or latest (most recent) records across all series:
+
+.. code-block:: python
+
+    n = 5
+    oldest_five_tsdf = phone_accel_tsdf.earliest(n)
+    latest_five_tsdf = phone_accel_tsdf.latest(n)
+
+Or the records immediately before (or after) a particular point in time. This can be thought of like an "as-of" select.
+
+.. code-block:: python
+
+    as_of_tsdf = phone_accel_tsdf.priorTo(target_time)
+    next_five_tsdf = phone_accel_tsdf.subsequentTo(target_time, n=5)

 Resample and Visualize
 ~~~~~~~~~~~~~~~~~~~~~~
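The inclusive/exclusive semantics of the new `between` slice can be sketched outside of Spark. The following pure-Python analogue (the record list and helper names are illustrative, not part of tempo) mirrors how `between(start, end, inclusive=True)` composes the one-sided `atOrAfter` and `atOrBefore` filters:

```python
# Pure-Python sketch of the TSDF time-slicing semantics above.
# The record list and helper names are illustrative, not tempo's API.
records = [
    {"user": "A", "event_ts": "2015-02-23T13:03:53.909+0000"},
    {"user": "A", "event_ts": "2015-02-23T13:03:53.919+0000"},
    {"user": "B", "event_ts": "2015-02-23T13:03:53.929+0000"},
]

def at_or_after(rows, ts):
    # analogue of TSDF.atOrAfter: keep records at or after ts
    return [r for r in rows if r["event_ts"] >= ts]

def at_or_before(rows, ts):
    # analogue of TSDF.atOrBefore: keep records at or before ts
    return [r for r in rows if r["event_ts"] <= ts]

def between(rows, start_ts, end_ts, inclusive=True):
    # inclusive slicing composes the two one-sided filters,
    # mirroring TSDF.between = atOrAfter(start) then atOrBefore(end)
    if inclusive:
        return at_or_before(at_or_after(rows, start_ts), end_ts)
    # exclusive slicing uses strict comparisons on both ends
    return [r for r in rows if start_ts < r["event_ts"] < end_ts]

window = between(
    records,
    "2015-02-23T13:03:53.909+0000",
    "2015-02-23T13:03:53.919+0000",
)
```

Because all the sample timestamps share the same offset format, plain string comparison orders them correctly here; tempo itself delegates the comparison to a Spark SQL expression.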

python/requirements.txt (+1)

@@ -19,3 +19,4 @@ Sphinx==4.5.0
 sphinx-design==0.2.0
 sphinx-panels==0.6.0
 jsonref==0.2
+python-dateutil==2.8.2

python/tempo/tsdf.py (+197, -22)

@@ -8,13 +8,18 @@
 from IPython.display import display as ipydisplay
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.window import Window
+from pyspark.sql.window import Window, WindowSpec
 from scipy.fft import fft, fftfreq

 import tempo.io as tio
 import tempo.resample as rs
 from tempo.interpol import Interpolation
-from tempo.utils import ENV_BOOLEAN, PLATFORM, calculate_time_horizon
+from tempo.utils import (
+    ENV_CAN_RENDER_HTML,
+    IS_DATABRICKS,
+    calculate_time_horizon,
+    get_display_df,
+)

 logger = logging.getLogger(__name__)

@@ -36,7 +41,7 @@ def __init__(self, df, ts_col="event_ts", partition_cols=None, sequence_col=None
         self.partitionCols = (
             []
             if partition_cols is None
-            else self.__validated_columns(df, partition_cols)
+            else self.__validated_columns(df, partition_cols.copy())
         )

         self.df = df

@@ -309,6 +314,10 @@ def __getTimePartitions(self, tsPartitionVal, fraction=0.1):
         )
         return TSDF(df, self.ts_col, self.partitionCols + ["ts_partition"])

+    #
+    # Slicing & Selection
+    #
+
     def select(self, *cols):
         """
         pyspark.sql.DataFrame.select() method's equivalent for TSDF objects

@@ -342,7 +351,164 @@ def select(self, *cols):
             "In TSDF's select statement original ts_col, partitionCols and seq_col_stub(optional) must be present"
         )

-    def show(self, n=20, truncate=True, vertical=False):
+    def __slice(self, op: str, target_ts):
+        """
+        Private method to slice TSDF by time
+
+        :param op: string symbol of the operation to perform
+        :type op: str
+        :param target_ts: timestamp on which to filter
+
+        :return: a TSDF object containing only those records within the time slice specified
+        """
+        # quote our timestamp if it's a string
+        target_expr = f"'{target_ts}'" if isinstance(target_ts, str) else target_ts
+        slice_expr = f.expr(f"{self.ts_col} {op} {target_expr}")
+        sliced_df = self.df.where(slice_expr)
+        return TSDF(
+            sliced_df,
+            ts_col=self.ts_col,
+            partition_cols=self.partitionCols,
+            sequence_col=self.sequence_col,
+        )
+
+    def at(self, ts):
+        """
+        Select only records at a given time
+
+        :param ts: timestamp of the records to select
+
+        :return: a :class:`~tsdf.TSDF` object containing just the records at the given time
+        """
+        return self.__slice("==", ts)
+
+    def before(self, ts):
+        """
+        Select only records before a given time
+
+        :param ts: timestamp on which to filter records
+
+        :return: a :class:`~tsdf.TSDF` object containing just the records before the given time
+        """
+        return self.__slice("<", ts)
+
+    def atOrBefore(self, ts):
+        """
+        Select only records at or before a given time
+
+        :param ts: timestamp on which to filter records
+
+        :return: a :class:`~tsdf.TSDF` object containing just the records at or before the given time
+        """
+        return self.__slice("<=", ts)
+
+    def after(self, ts):
+        """
+        Select only records after a given time
+
+        :param ts: timestamp on which to filter records
+
+        :return: a :class:`~tsdf.TSDF` object containing just the records after the given time
+        """
+        return self.__slice(">", ts)
+
+    def atOrAfter(self, ts):
+        """
+        Select only records at or after a given time
+
+        :param ts: timestamp on which to filter records
+
+        :return: a :class:`~tsdf.TSDF` object containing just the records at or after the given time
+        """
+        return self.__slice(">=", ts)
+
+    def between(self, start_ts, end_ts, inclusive=True):
+        """
+        Select only records in a given range
+
+        :param start_ts: starting time of the range to select
+        :param end_ts: ending time of the range to select
+        :param inclusive: whether the range is inclusive of the endpoints or not, defaults to True
+        :type inclusive: bool
+
+        :return: a :class:`~tsdf.TSDF` object containing just the records within the range specified
+        """
+        if inclusive:
+            return self.atOrAfter(start_ts).atOrBefore(end_ts)
+        return self.after(start_ts).before(end_ts)
+
+    def __top_rows_per_series(self, win: WindowSpec, n: int):
+        """
+        Private method to select just the top n rows per series (as defined by a window ordering)
+
+        :param win: the window on which we order the rows in each series
+        :param n: the number of rows to return
+
+        :return: a :class:`~tsdf.TSDF` object containing just the top n rows in each series
+        """
+        row_num_col = "__row_num"
+        prev_records_df = (
+            self.df.withColumn(row_num_col, f.row_number().over(win))
+            .where(f.col(row_num_col) <= f.lit(n))
+            .drop(row_num_col)
+        )
+        return TSDF(
+            prev_records_df,
+            ts_col=self.ts_col,
+            partition_cols=self.partitionCols,
+            sequence_col=self.sequence_col,
+        )
+
+    def earliest(self, n: int = 1):
+        """
+        Select the earliest n records for each series
+
+        :param n: number of records to select (default is 1)
+
+        :return: a :class:`~tsdf.TSDF` object containing the earliest n records for each series
+        """
+        prev_window = self.__baseWindow(reverse=False)
+        return self.__top_rows_per_series(prev_window, n)
+
+    def latest(self, n: int = 1):
+        """
+        Select the latest n records for each series
+
+        :param n: number of records to select (default is 1)
+
+        :return: a :class:`~tsdf.TSDF` object containing the latest n records for each series
+        """
+        next_window = self.__baseWindow(reverse=True)
+        return self.__top_rows_per_series(next_window, n)
+
+    def priorTo(self, ts, n: int = 1):
+        """
+        Select the n most recent records prior to a given time
+        You can think of this like an 'asOf' select - it selects the records as of a particular time
+
+        :param ts: timestamp on which to filter records
+        :param n: number of records to select (default is 1)
+
+        :return: a :class:`~tsdf.TSDF` object containing the n records prior to the given time
+        """
+        return self.atOrBefore(ts).latest(n)
+
+    def subsequentTo(self, ts, n: int = 1):
+        """
+        Select the n records subsequent to a given time
+
+        :param ts: timestamp on which to filter records
+        :param n: number of records to select (default is 1)
+
+        :return: a :class:`~tsdf.TSDF` object containing the n records subsequent to the given time
+        """
+        return self.atOrAfter(ts).earliest(n)
+
+    #
+    # Display functions
+    #
+
+    def show(self, n=20, k=5, truncate=True, vertical=False):
         """
         pyspark.sql.DataFrame.show() method's equivalent for TSDF objects

@@ -372,16 +538,14 @@ def show(self, n=20, truncate=True, vertical=False):
         phone_accel_tsdf.show()

         """
-        if PLATFORM == "DATABRICKS" or ENV_BOOLEAN is False:
-            self.df.show(n, truncate, vertical)
-        elif ENV_BOOLEAN:
+        # validate k <= n
+        if k > n:
+            raise ValueError(f"Parameter k {k} cannot be greater than parameter n {n}")
+
+        if not (IS_DATABRICKS) and ENV_CAN_RENDER_HTML:
             # In Jupyter notebooks, for wide dataframes the below line will enable rendering the output in a scrollable format.
             ipydisplay(HTML("<style>pre { white-space: pre !important; }</style>"))
-            self.df.show(n, truncate, vertical)
-        else:
-            self.df.show(
-                n, truncate=False
-            )  # default show method behaviour in case all conditions fail
+        get_display_df(self, k).show(n, truncate, vertical)

     def describe(self):
         """

@@ -672,23 +836,34 @@ def asofJoin(
         return asofDF

-    def __baseWindow(self, sort_col=None):
-        # add all sort keys - time is first, unique sequence number breaks the tie
+    def __baseWindow(self, sort_col=None, reverse=False):
+        # figure out our sorting columns
+        primary_sort_col = self.ts_col if not sort_col else sort_col
+        sort_cols = (
+            [primary_sort_col, self.sequence_col]
+            if self.sequence_col
+            else [primary_sort_col]
+        )

-        sort_col = self.ts_col if not sort_col else sort_col
-        ptntl_sort_keys = [sort_col, self.sequence_col]
-        sort_keys = [f.col(col_name) for col_name in ptntl_sort_keys if col_name != ""]
+        # are we ordering forwards (default) or reversed?
+        col_fn = f.col
+        if reverse:
+            col_fn = lambda colname: f.col(colname).desc()  # noqa E731

-        w = Window().orderBy(sort_keys)
+        # our window will be sorted on our sort_cols in the appropriate direction
+        w = Window().orderBy([col_fn(col) for col in sort_cols])
+        # and partitioned by any series IDs
         if self.partitionCols:
             w = w.partitionBy([f.col(elem) for elem in self.partitionCols])
         return w

-    def __rangeBetweenWindow(self, range_from, range_to, sort_col=None):
-        return self.__baseWindow(sort_col).rangeBetween(range_from, range_to)
+    def __rangeBetweenWindow(self, range_from, range_to, sort_col=None, reverse=False):
+        return self.__baseWindow(sort_col=sort_col, reverse=reverse).rangeBetween(
+            range_from, range_to
+        )

-    def __rowsBetweenWindow(self, rows_from, rows_to):
-        return self.__baseWindow().rowsBetween(rows_from, rows_to)
+    def __rowsBetweenWindow(self, rows_from, rows_to, reverse=False):
+        return self.__baseWindow(reverse=reverse).rowsBetween(rows_from, rows_to)

     def withPartitionCols(self, partitionCols):
         """

python/tempo/utils.py (+18, -8)

@@ -8,10 +8,11 @@
 from pandas import DataFrame as pandasDataFrame
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.functions import expr, max, min, sum, percentile_approx
+
 from tempo.resample import checkAllowableFreq, freq_dict

 logger = logging.getLogger(__name__)
-PLATFORM = "DATABRICKS" if "DB_HOME" in os.environ.keys() else "NON_DATABRICKS"
+IS_DATABRICKS = "DB_HOME" in os.environ.keys()

 """
 DB_HOME env variable has been chosen and that's because this variable is a special variable that will be available in DBR.

@@ -139,31 +140,40 @@ def display_unavailable(df):
     )


-ENV_BOOLEAN = __is_capable_of_html_rendering()
+def get_display_df(tsdf, k):
+    # let's show the k most recent records per series, in order:
+    orderCols = tsdf.partitionCols.copy()
+    orderCols.append(tsdf.ts_col)
+    if tsdf.sequence_col:
+        orderCols.append(tsdf.sequence_col)
+    return tsdf.latest(k).df.orderBy(orderCols)
+
+
+ENV_CAN_RENDER_HTML = __is_capable_of_html_rendering()


 if (
-    (PLATFORM == "DATABRICKS")
-    and not isinstance(get_ipython(), None)
-    and "display" in get_ipython().user_ns.keys()
+    IS_DATABRICKS
+    and not (get_ipython() is None)
+    and ("display" in get_ipython().user_ns.keys())
 ):
     method = get_ipython().user_ns["display"]
     # Under 'display' key in user_ns the original databricks display method is present
     # to know more refer: /databricks/python_shell/scripts/db_ipykernel_launcher.py

     def display_improvised(obj):
         if type(obj).__name__ == "TSDF":
-            method(obj.df)
+            method(get_display_df(obj, k=5))
         else:
             method(obj)

     display = display_improvised

-elif ENV_BOOLEAN:
+elif ENV_CAN_RENDER_HTML:

     def display_html_improvised(obj):
         if type(obj).__name__ == "TSDF":
-            display_html(obj.df)
+            display_html(get_display_df(obj, k=5))
         else:
             display_html(obj)
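`get_display_df` orders its output by the series identifiers first, then the timestamp, then the optional sequence column. A small sketch of just that key-construction step (a hypothetical standalone helper, not part of tempo) shows why the partition columns are copied before appending:

```python
# Sketch of how get_display_df builds its ordering columns:
# partition cols first, then ts_col, then the optional sequence col.
# Standalone illustrative helper, not tempo's actual code.
def build_order_cols(partition_cols, ts_col, sequence_col=None):
    order_cols = list(partition_cols)  # copy, as in tsdf.partitionCols.copy()
    order_cols.append(ts_col)
    if sequence_col:
        order_cols.append(sequence_col)
    return order_cols
```

Copying first means the caller's partition-column list is never mutated, the same concern the `partition_cols.copy()` fix in `TSDF.__init__` addresses.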
