Skip to content

Commit 8497d78

Browse files
Migrate sendgrid to config-based (#15257)
* fix spec * read records from lists stream * campaigns * contacts * stats_automations * segments * single_sends * templates * suppressions_global * suppression groups * suppression group memebers * blocks * bounces * invalid emails and spam reports * bump cdk version * fix paths * bump cdk version * only define cursor field in one place * move to definitions * move bounces inside the streams array * move all streams within the streams array * update sendgrid config * fix * derp * rename field * fix parse * Revert "fix parse" This reverts commit 3c76c5a. * fix parse timestamp * extract datetime parser * remove print * use parser * top level docstring * rename variable * Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid" This reverts commit 99caa58, reversing changes made to 028bdfb. * Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid"" This reverts commit 8d55afa. * Revert "Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid""" This reverts commit 9b70a3b. * do not use timestamp() * Revert "do not use timestamp()" This reverts commit 016cb69. * Handle extracting no records from root * bump cdk version * handle empty record * update unit test * messages stream needs a different slicer * handle missing keys * Update unit test * record extractor interface * dpath extractor * docstring * use dpath * Revert "Merge branch 'alex/selectNoRecords' into alex/configbasedsendgrid" This reverts commit ac92374, reversing changes made to e10d6b9. * bump cdk version * use dpath * missing cursor field * start DRYing the config * delete more cruff * DRY * get start time from config * delete custom streams * step=30days * bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 330a196 commit 8497d78

File tree

10 files changed

+306
-362
lines changed

10 files changed

+306
-362
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@
892892
- name: Sendgrid
893893
sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
894894
dockerRepository: airbyte/source-sendgrid
895-
dockerImageTag: 0.2.8
895+
dockerImageTag: 0.2.9
896896
documentationUrl: https://docs.airbyte.io/integrations/sources/sendgrid
897897
icon: sendgrid.svg
898898
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8644,7 +8644,7 @@
86448644
supportsNormalization: false
86458645
supportsDBT: false
86468646
supported_destination_sync_modes: []
8647-
- dockerImage: "airbyte/source-sendgrid:0.2.8"
8647+
- dockerImage: "airbyte/source-sendgrid:0.2.9"
86488648
spec:
86498649
documentationUrl: "https://docs.airbyte.io/integrations/sources/sendgrid"
86508650
connectionSpecification:
@@ -8653,7 +8653,7 @@
86538653
type: "object"
86548654
required:
86558655
- "apikey"
8656-
additionalProperties: false
8656+
additionalProperties: true
86578657
properties:
86588658
apikey:
86598659
title: "Sendgrid API key"

airbyte-integrations/connectors/source-sendgrid/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.2.8
15+
LABEL io.airbyte.version=0.2.9
1616
LABEL io.airbyte.name=airbyte/source-sendgrid

airbyte-integrations/connectors/source-sendgrid/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
author="Airbyte",
1212
author_email="[email protected]",
1313
packages=find_packages(),
14-
install_requires=["airbyte-cdk~=0.1", "backoff", "requests", "pytest==6.1.2", "pytest-mock"],
14+
install_requires=["airbyte-cdk>=0.1.74", "backoff", "requests", "pytest==6.1.2", "pytest-mock"],
1515
package_data={"": ["*.json", "schemas/*.json"]},
1616
)
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
definitions:
2+
page_size: 50
3+
step: "30d"
4+
5+
schema_loader:
6+
type: JsonSchema
7+
file_path: "./source_sendgrid/schemas/{{ options.name }}.json"
8+
9+
requester:
10+
type: HttpRequester
11+
name: "{{ options['name'] }}"
12+
url_base: "https://api.sendgrid.com"
13+
http_method: "GET"
14+
authenticator:
15+
type: "BearerAuthenticator"
16+
api_token: "{{ config.apikey }}"
17+
cursor_paginator:
18+
type: LimitPaginator
19+
url_base: "*ref(definitions.requester.url_base)"
20+
page_size: "*ref(definitions.page_size)"
21+
limit_option:
22+
inject_into: "request_parameter"
23+
field_name: "page_size"
24+
page_token_option:
25+
inject_into: "path"
26+
pagination_strategy:
27+
type: "CursorPagination"
28+
cursor_value: "{{ response._metadata.next }}"
29+
offset_paginator:
30+
type: LimitPaginator
31+
$options:
32+
url_base: "*ref(definitions.requester.url_base)"
33+
page_size: "*ref(definitions.page_size)"
34+
limit_option:
35+
inject_into: "request_parameter"
36+
field_name: "limit"
37+
page_token_option:
38+
inject_into: "request_parameter"
39+
field_name: "offset"
40+
pagination_strategy:
41+
type: "OffsetIncrement"
42+
retriever:
43+
type: SimpleRetriever
44+
name: "{{ options['name'] }}"
45+
primary_key: "{{ options['primary_key'] }}"
46+
stream_slicer:
47+
type: "DatetimeStreamSlicer"
48+
start_datetime:
49+
datetime: "{{ config['start_time'] }}"
50+
datetime_format: "%s"
51+
end_datetime:
52+
datetime: "{{ now_utc() }}"
53+
datetime_format: "%Y-%m-%d %H:%M:%S.%f%z"
54+
step: "*ref(definitions.step)"
55+
cursor_field: "{{ options.stream_cursor_field }}"
56+
start_time_option:
57+
field_name: "start_time"
58+
inject_into: "request_parameter"
59+
end_time_option:
60+
field_name: "end_time"
61+
inject_into: "request_parameter"
62+
datetime_format: "%s"
63+
messages_stream_slicer:
64+
type: "DatetimeStreamSlicer"
65+
start_datetime:
66+
datetime: "{{ config['start_time'] }}"
67+
datetime_format: "%s"
68+
end_datetime:
69+
datetime: "{{ now_utc() }}}"
70+
datetime_format: "%Y-%m-%d %H:%M:%S.%f%z"
71+
step: "*ref(definitions.step)"
72+
cursor_field: "{{ options.stream_cursor_field }}"
73+
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
74+
75+
base_stream:
76+
type: DeclarativeStream
77+
schema_loader:
78+
$ref: "*ref(definitions.schema_loader)"
79+
retriever:
80+
$ref: "*ref(definitions.retriever)"
81+
record_selector:
82+
extractor:
83+
field_pointer: []
84+
requester:
85+
$ref: "*ref(definitions.requester)"
86+
paginator:
87+
type: NoPagination
88+
streams:
89+
- $ref: "*ref(definitions.base_stream)"
90+
$options:
91+
name: "lists"
92+
primary_key: "id"
93+
path: "/v3/marketing/lists"
94+
field_pointer: ["result"]
95+
retriever:
96+
$ref: "*ref(definitions.base_stream.retriever)"
97+
paginator:
98+
$ref: "*ref(definitions.cursor_paginator)"
99+
- $ref: "*ref(definitions.base_stream)"
100+
$options:
101+
name: "campaigns"
102+
primary_key: "id"
103+
path: "/v3/marketing/campaigns"
104+
field_pointer: ["result"]
105+
retriever:
106+
$ref: "*ref(definitions.base_stream.retriever)"
107+
paginator:
108+
$ref: "*ref(definitions.cursor_paginator)"
109+
- $ref: "*ref(definitions.base_stream)"
110+
$options:
111+
name: "contacts"
112+
primary_key: "id"
113+
path: "/v3/marketing/contacts"
114+
field_pointer: ["result"]
115+
- $ref: "*ref(definitions.base_stream)"
116+
$options:
117+
name: "stats_automations"
118+
primary_key: "id"
119+
path: "/v3/marketing/stats/automations"
120+
field_pointer: ["results"]
121+
retriever:
122+
$ref: "*ref(definitions.base_stream.retriever)"
123+
paginator:
124+
$ref: "*ref(definitions.cursor_paginator)"
125+
- $ref: "*ref(definitions.base_stream)"
126+
$options:
127+
name: "segments"
128+
primary_key: "id"
129+
path: "/v3/marketing/segments"
130+
field_pointer: ["results"]
131+
- $ref: "*ref(definitions.base_stream)"
132+
$options:
133+
name: "single_sends"
134+
primary_key: "id"
135+
path: "/v3/marketing/stats/singlesends"
136+
field_pointer: ["results"]
137+
retriever:
138+
$ref: "*ref(definitions.base_stream.retriever)"
139+
paginator:
140+
$ref: "*ref(definitions.cursor_paginator)"
141+
- $ref: "*ref(definitions.base_stream)"
142+
$options:
143+
name: "templates"
144+
primary_key: "id"
145+
path: "/v3/templates"
146+
field_pointer: ["result"]
147+
retriever:
148+
$ref: "*ref(definitions.base_stream.retriever)"
149+
requester:
150+
$ref: "*ref(definitions.base_stream.retriever.requester)"
151+
request_options_provider:
152+
request_parameters:
153+
generations: "legacy,dynamic"
154+
paginator:
155+
$ref: "*ref(definitions.cursor_paginator)"
156+
- $ref: "*ref(definitions.base_stream)"
157+
$options:
158+
name: "bounces"
159+
primary_key: "email"
160+
stream_cursor_field: "created"
161+
path: "/v3/suppression/bounces"
162+
field_pointer: []
163+
retriever:
164+
$ref: "*ref(definitions.base_stream.retriever)"
165+
paginator:
166+
$ref: "*ref(definitions.offset_paginator)"
167+
stream_slicer:
168+
$ref: "*ref(definitions.stream_slicer)"
169+
- $ref: "*ref(definitions.base_stream)"
170+
$options:
171+
name: "global_suppressions"
172+
primary_key: "email"
173+
stream_cursor_field: "created"
174+
path: "/v3/suppression/unsubscribes"
175+
field_pointer: []
176+
retriever:
177+
$ref: "*ref(definitions.base_stream.retriever)"
178+
paginator:
179+
$ref: "*ref(definitions.offset_paginator)"
180+
stream_slicer:
181+
$ref: "*ref(definitions.stream_slicer)"
182+
- $ref: "*ref(definitions.base_stream)"
183+
$options:
184+
name: "blocks"
185+
primary_key: "email"
186+
stream_cursor_field: "created"
187+
path: "/v3/suppression/blocks"
188+
field_pointer: []
189+
retriever:
190+
$ref: "*ref(definitions.base_stream.retriever)"
191+
paginator:
192+
$ref: "*ref(definitions.offset_paginator)"
193+
stream_slicer:
194+
$ref: "*ref(definitions.stream_slicer)"
195+
- $ref: "*ref(definitions.base_stream)"
196+
$options:
197+
name: "suppression_groups"
198+
primary_key: "id"
199+
path: "/v3/asm/groups"
200+
field_pointer: []
201+
- $ref: "*ref(definitions.base_stream)"
202+
$options:
203+
name: "suppression_group_members"
204+
primary_key: "group_id"
205+
path: "/v3/asm/suppressions"
206+
field_pointer: []
207+
retriever:
208+
$ref: "*ref(definitions.base_stream.retriever)"
209+
paginator:
210+
$ref: "*ref(definitions.offset_paginator)"
211+
- $ref: "*ref(definitions.base_stream)"
212+
$options:
213+
name: "invalid_emails"
214+
primary_key: "email"
215+
stream_cursor_field: "created"
216+
path: "/v3/suppression/invalid_emails"
217+
field_pointer: []
218+
retriever:
219+
$ref: "*ref(definitions.base_stream.retriever)"
220+
paginator:
221+
$ref: "*ref(definitions.offset_paginator)"
222+
stream_slicer:
223+
$ref: "*ref(definitions.stream_slicer)"
224+
- $ref: "*ref(definitions.base_stream)"
225+
$options:
226+
name: "spam_reports"
227+
primary_key: "email"
228+
stream_cursor_field: "created"
229+
path: "/v3/suppression/spam_reports"
230+
field_pointer: []
231+
retriever:
232+
$ref: "*ref(definitions.base_stream.retriever)"
233+
paginator:
234+
$ref: "*ref(definitions.offset_paginator)"
235+
stream_slicer:
236+
$ref: "*ref(definitions.stream_slicer)"
237+
- $ref: "*ref(definitions.base_stream)"
238+
$options:
239+
name: "messages"
240+
primary_key: "msg_id"
241+
stream_cursor_field: "last_event_time"
242+
path: "/v3/messages"
243+
field_pointer: []
244+
retriever:
245+
$ref: "*ref(definitions.base_stream.retriever)"
246+
requester:
247+
$ref: "*ref(definitions.requester)"
248+
request_options_provider:
249+
request_parameters:
250+
limit: 1000
251+
query: 'last_event_time BETWEEN TIMESTAMP "{{stream_slice.start_time}}" AND TIMESTAMP "{{stream_slice.end_time}}"'
252+
stream_slicer:
253+
$ref: "*ref(definitions.messages_stream_slicer)"
254+
check:
255+
type: CheckStream
256+
stream_names: ["lists"]

airbyte-integrations/connectors/source-sendgrid/source_sendgrid/source.py

Lines changed: 10 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2,63 +2,17 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
56

6-
from typing import Any, List, Mapping, Tuple
7+
"""
8+
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
9+
source connector.
710
8-
from airbyte_cdk.models import SyncMode
9-
from airbyte_cdk.sources import AbstractSource
10-
from airbyte_cdk.sources.streams import Stream
11-
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
11+
WARNING: Do not modify this file.
12+
"""
1213

13-
from .streams import (
14-
Blocks,
15-
Bounces,
16-
Campaigns,
17-
Contacts,
18-
GlobalSuppressions,
19-
InvalidEmails,
20-
Lists,
21-
Messages,
22-
Scopes,
23-
Segments,
24-
SingleSends,
25-
SpamReports,
26-
StatsAutomations,
27-
SuppressionGroupMembers,
28-
SuppressionGroups,
29-
Templates,
30-
)
3114

32-
33-
class SourceSendgrid(AbstractSource):
34-
def check_connection(self, logger, config) -> Tuple[bool, any]:
35-
try:
36-
authenticator = TokenAuthenticator(config["apikey"])
37-
scopes_gen = Scopes(authenticator=authenticator).read_records(sync_mode=SyncMode.full_refresh)
38-
next(scopes_gen)
39-
return True, None
40-
except Exception as error:
41-
return False, f"Unable to connect to Sendgrid API with the provided credentials - {error}"
42-
43-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
44-
authenticator = TokenAuthenticator(config["apikey"])
45-
46-
streams = [
47-
Lists(authenticator=authenticator),
48-
Campaigns(authenticator=authenticator),
49-
Contacts(authenticator=authenticator),
50-
StatsAutomations(authenticator=authenticator),
51-
Segments(authenticator=authenticator),
52-
SingleSends(authenticator=authenticator),
53-
Templates(authenticator=authenticator),
54-
Messages(authenticator=authenticator, start_time=config["start_time"]),
55-
GlobalSuppressions(authenticator=authenticator, start_time=config["start_time"]),
56-
SuppressionGroups(authenticator=authenticator),
57-
SuppressionGroupMembers(authenticator=authenticator),
58-
Blocks(authenticator=authenticator, start_time=config["start_time"]),
59-
Bounces(authenticator=authenticator, start_time=config["start_time"]),
60-
InvalidEmails(authenticator=authenticator, start_time=config["start_time"]),
61-
SpamReports(authenticator=authenticator, start_time=config["start_time"]),
62-
]
63-
64-
return streams
15+
# Declarative Source
16+
class SourceSendgrid(YamlDeclarativeSource):
17+
def __init__(self):
18+
super().__init__(**{"path_to_yaml": "./source_sendgrid/sendgrid.yaml"})

airbyte-integrations/connectors/source-sendgrid/source_sendgrid/spec.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"title": "Sendgrid Spec",
66
"type": "object",
77
"required": ["apikey"],
8-
"additionalProperties": false,
8+
"additionalProperties": true,
99
"properties": {
1010
"apikey": {
1111
"title": "Sendgrid API key",

0 commit comments

Comments
 (0)