@@ -105,6 +105,97 @@ def indicate_duplicates(
105
105
)
106
106
107
107
108
def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
    """Fill nulls in Float64/Int64 value columns using linear interpolation.

    Row offsets (from ``promote_offsets``) serve as the x-axis. For each
    eligible column the nearest non-null value on either side of a row, and
    the offsets of those values, are gathered with window operations and fed
    to ``_interpolate``. Columns of any other dtype are passed through
    untouched.

    Args:
        block: The block whose value columns should be interpolated.
        method: Interpolation method; only ``"linear"`` is accepted.

    Returns:
        A block containing one output column per original value column,
        carrying the original column labels.

    Raises:
        NotImplementedError: If ``method`` is anything other than ``"linear"``.
    """
    if method != "linear":
        raise NotImplementedError(
            f"Only 'linear' interpolate method supported. {constants.FEEDBACK_LINK} "
        )

    look_back = windows.WindowSpec(following=0)
    look_ahead = windows.WindowSpec(preceding=0)
    interpolatable_dtypes = (pd.Float64Dtype(), pd.Int64Dtype())

    result_ids = []

    source_columns = block.value_columns
    source_labels = block.column_labels
    block, row_offsets = block.promote_offsets()

    for col_id in source_columns:
        if block._column_type(col_id) not in interpolatable_dtypes:
            # Non-numeric columns are emitted as-is.
            result_ids.append(col_id)
            continue

        # Offsets that are null in the same places the column is null, so the
        # window ops below find the offsets of the nearest non-null values.
        block, has_value = block.apply_unary_op(col_id, ops.notnull_op)
        block, known_offsets = block.apply_binary_op(
            row_offsets, has_value, ops.partial_arg3(ops.where_op, None)
        )

        block, y_before = block.apply_window_op(
            col_id, agg_ops.LastNonNullOp(), look_back
        )
        block, y_after = block.apply_window_op(
            col_id, agg_ops.FirstNonNullOp(), look_ahead
        )
        block, x_before = block.apply_window_op(
            known_offsets,
            agg_ops.LastNonNullOp(),
            look_back,
            skip_reproject_unsafe=True,
        )
        block, x_after = block.apply_window_op(
            known_offsets,
            agg_ops.FirstNonNullOp(),
            look_ahead,
            skip_reproject_unsafe=True,
        )

        block, predicted = _interpolate(
            block,
            x_before,
            y_before,
            x_after,
            y_after,
            row_offsets,
        )

        # Existing values win; predictions only fill the gaps.
        block, gap_filled = block.apply_binary_op(col_id, predicted, ops.fillna_op)
        # Pandas performs ffill-like behavior to extrapolate forwards
        block, gap_filled_and_ffilled = block.apply_binary_op(
            gap_filled, y_before, ops.fillna_op
        )

        result_ids.append(gap_filled_and_ffilled)

    # Force reproject since `skip_reproject_unsafe` was used previously
    block = block.select_columns(result_ids)._force_reproject()
    return block.with_column_labels(source_labels)
176
+
177
+
178
def _interpolate(
    block: blocks.Block,
    x0_id: str,
    y0_id: str,
    x1_id: str,
    y1_id: str,
    xpredict_id: str,
) -> typing.Tuple[blocks.Block, str]:
    """Applies the linear interpolation equation column-wise.

    Computes ``y0 + (xpredict - x0) * ((y1 - y0) / (x1 - x0))`` using block
    binary ops, where each argument names a column of ``block``.

    Args:
        block: Block holding all referenced columns.
        x0_id: Column id of the left anchor's x value.
        y0_id: Column id of the left anchor's y value.
        x1_id: Column id of the right anchor's x value.
        y1_id: Column id of the right anchor's y value.
        xpredict_id: Column id of the x values to predict y for.

    Returns:
        The updated block (with intermediate columns dropped) and the id of
        the column holding the predicted y values.
    """
    block, dx = block.apply_binary_op(x1_id, x0_id, ops.sub_op)
    block, dy = block.apply_binary_op(y1_id, y0_id, ops.sub_op)
    block, x_step = block.apply_binary_op(xpredict_id, x0_id, ops.sub_op)

    block, slope = block.apply_binary_op(dy, dx, ops.div_op)
    block, y_step = block.apply_binary_op(x_step, slope, ops.mul_op)

    block, prediction_id = block.apply_binary_op(y0_id, y_step, ops.add_op)

    # Only the final prediction column is needed by the caller.
    scratch_columns = [dx, dy, x_step, slope, y_step]
    return block.drop_columns(scratch_columns), prediction_id
197
+
198
+
108
199
def drop_duplicates (
109
200
block : blocks .Block , columns : typing .Sequence [str ], keep : str = "first"
110
201
) -> blocks .Block :
0 commit comments