Commit 754db12

Merge pull request #69 from clrcrl/insert-by-period
Add insert-by-period materialization
2 parents 77ba63d + b924133 commit 754db12

File tree

2 files changed: +230 -2 lines changed

README.md

Lines changed: 55 additions & 2 deletions
@@ -190,8 +190,8 @@ This macro implements an "outer union." The list of tables provided to this macr
Usage:
```
{{ dbt_utils.union_tables(
    tables=[ref('table_1'), ref('table_2')],
    column_override={"some_field": "varchar(100)"},
    exclude=["some_other_field"]
) }}
```
@@ -266,6 +266,59 @@ Usage:
```
{{ dbt_utils.get_url_parameter(field='page_url', url_parameter='utm_source') }}
```
---
### Materializations
#### insert_by_period ([source](macros/materializations/insert_by_period_materialization.sql))
`insert_by_period` allows dbt to insert records into a table one period (e.g. day, week) at a time.

This materialization is appropriate for event data that can be processed in discrete periods. It is similar in concept to the built-in incremental materialization, but has the added benefit of building the model in chunks even during a full refresh, so it is particularly useful for models where the initial run can be problematic.

Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from where it was interrupted (provided the `--full-refresh` flag is omitted).
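The resume point is derived from the data already in the target table: the boundary query takes the latest loaded value of the timestamp field and falls back to `start_date` when the table is empty. A simplified sketch of that query (the schema, table, and column names here are illustrative; the full version lives in the `get_period_boundaries` macro below):
```sql
select
  coalesce(max("created_at"), '2018-01-01')::timestamp as start_timestamp
from analytics.my_events_model
```
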
Progress is logged in the command line for easy monitoring.
Usage:
```sql
{{
  config(
    materialized = "insert_by_period",
    period = "day",
    timestamp_field = "created_at",
    start_date = "2018-01-01",
    stop_date = "2018-06-01"
  )
}}

with events as (

  select *
  from {{ ref('events') }}
  where __PERIOD_FILTER__ -- This will be replaced with a filter in the materialization code

)

....complex aggregates here....

```
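At runtime the materialization replaces `__PERIOD_FILTER__` with a boundary condition for the chunk being processed. As a rough illustration (the exact expression is generated by the `get_period_sql` macro below; the timestamps and offset here are illustrative), the filter for one daily chunk looks something like:
```sql
("created_at" >  '2018-01-01 00:00:00'::timestamp + interval '3 day' and
 "created_at" <= '2018-01-01 00:00:00'::timestamp + interval '3 day' + interval '1 day' and
 "created_at" <  '2018-06-01 00:00:00'::timestamp)
```
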
Configuration values:
* `period`: period to break the model into; must be a valid [datepart](https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html) (default='Week')
* `timestamp_field`: the column name of the timestamp field that will be used to break the model into smaller queries
* `start_date`: literal date or timestamp - generally choose a date that is earlier than the start of your data
* `stop_date`: literal date or timestamp (default=current_timestamp)
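Only `timestamp_field` and `start_date` are required (the materialization calls `config.require` for both), so a minimal configuration that relies on the defaults for `period` and `stop_date` might look like this (the column and date are illustrative):
```sql
{{
  config(
    materialized = "insert_by_period",
    timestamp_field = "created_at",
    start_date = "2018-01-01"
  )
}}
```
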
Caveats:
* This materialization is compatible with dbt 0.10.1.
* This materialization has been written for Redshift.
* This materialization can only be used for a model where records are not expected to change after they are created.
* Any model post-hooks that use `{{ this }}` will fail when using this materialization. For example:
```yaml
models:
  project-name:
    post-hook: "grant select on {{ this }} to db_reader"
```
A useful workaround is to change the above post-hook to:
```yaml
post-hook: "grant select on {{ this.schema }}.{{ this.name }} to db_reader"
```

----
macros/materializations/insert_by_period_materialization.sql

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

  {% call statement('period_boundaries', fetch_result=True) -%}
    with data as (
      select
        coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
        coalesce(
          {{dbt_utils.dateadd('millisecond',
                              -1,
                              "nullif('" ~ stop_date ~ "','')::timestamp")}},
          {{dbt_utils.current_timestamp()}}
        ) as stop_timestamp
      from "{{target_schema}}"."{{target_table}}"
    )

    select
      start_timestamp,
      stop_timestamp,
      {{dbt_utils.datediff('start_timestamp',
                           'stop_timestamp',
                           period)}} + 1 as num_periods
    from data
  {%- endcall %}

{%- endmacro %}

{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

  {%- set period_filter -%}
    ("{{timestamp_field}}" >  '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
     "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
     "{{timestamp_field}}" <  '{{stop_timestamp}}'::timestamp)
  {%- endset -%}

  {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

  select
    {{target_cols_csv}}
  from (
    {{filtered_sql}}
  )

{%- endmacro %}

{% materialization insert_by_period, default -%}
  {%- set timestamp_field = config.require('timestamp_field') -%}
  {%- set start_date = config.require('start_date') -%}
  {%- set stop_date = config.get('stop_date') or '' -%}
  {%- set period = config.get('period') or 'week' -%}

  {%- if not '__PERIOD_FILTER__' is in sql -%}
    {%- set error_message -%}
      Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql
    {%- endset -%}
    {{ exceptions.raise_compiler_error(error_message) }}
  {%- endif -%}

  {%- set identifier = model['name'] -%}

  {%- set existing_relations = adapter.list_relations(schema=schema) -%}
  {%- set old_relation = adapter.get_relation(relations_list=existing_relations,
                                              schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}

  {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
  {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

  {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
  {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

  {%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
  {%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
  {%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

  -- setup
  {% if old_relation is none -%}
    -- noop
  {%- elif should_truncate -%}
    {{adapter.truncate_relation(old_relation)}}
  {%- elif should_drop -%}
    {{adapter.drop_relation(old_relation)}}
    {%- set old_relation = none -%}
  {%- endif %}

  {{run_hooks(pre_hooks, inside_transaction=False)}}

  -- `begin` happens here, so `commit` after it to finish the transaction
  {{run_hooks(pre_hooks, inside_transaction=True)}}
  {% call statement() -%}
    begin; -- make extra sure we've closed out the transaction
    commit;
  {%- endcall %}

  -- build model
  {% if force_create or old_relation is none -%}
    {# Create an empty target table -#}
    {% call statement('main') -%}
      {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%}
      {{create_table_as(False, target_relation, empty_sql)}};
    {%- endcall %}
  {%- endif %}

  {% set _ = dbt_utils.get_period_boundaries(schema,
                                             identifier,
                                             timestamp_field,
                                             start_date,
                                             stop_date,
                                             period) %}
  {%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%}
  {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%}
  {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%}

  {% set target_columns = adapter.get_columns_in_table(schema, identifier) %}
  {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
  {%- set loop_vars = {'sum_rows_inserted': 0} -%}

  -- commit each period as a separate transaction
  {% for i in range(num_periods) -%}
    {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%}
    {{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}}

    {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%}
    {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
                                               schema=schema, type='table') -%}
    {% call statement() -%}
      {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv,
                                                      sql,
                                                      timestamp_field,
                                                      period,
                                                      start_timestamp,
                                                      stop_timestamp,
                                                      i) %}
      {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}}
    {%- endcall %}

    {{adapter.expand_target_column_types(temp_table=tmp_identifier,
                                         to_schema=schema,
                                         to_table=identifier)}}
    {%- set name = 'main-' ~ i -%}
    {% call statement(name, fetch_result=True) -%}
      insert into {{target_relation}} ({{target_cols_csv}})
      (
        select
          {{target_cols_csv}}
        from {{tmp_relation.include(schema=False)}}
      );
    {%- endcall %}
    {%- set rows_inserted = (load_result('main-' ~ i)['status'].split(" "))[2] | int -%}
    {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
    {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%}

    {%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%}
    {{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}}

  {%- endfor %}

  {% call statement() -%}
    begin;
  {%- endcall %}

  {{run_hooks(post_hooks, inside_transaction=True)}}

  {% call statement() -%}
    commit;
  {%- endcall %}

  {{run_hooks(post_hooks, inside_transaction=False)}}

  {%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}

  {% call noop_statement(name='main', status=status_string) -%}
    -- no-op
  {%- endcall %}

{%- endmaterialization %}
