Skip to content

Commit c55f185

Browse files
🚀 Source Klaviyo: New Stream addition along with update to existing (#11685)
* 🚀 flow stream added along with flow, campaign and flow message addition to event * ⚡ liniting fix * 🔨 annotations updated along with update method for klaviyo * 💥 docker version updated and log added * 🔨 fixed acceptance test for klaviyo flow stream * 🔨 unused import removed * fix: flows stream has no records thus tests are failing * chore: update seed file Co-authored-by: Harshith Mullapudi <[email protected]>
1 parent fc73df4 commit c55f185

File tree

10 files changed

+70
-6
lines changed

10 files changed

+70
-6
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@
454454
- name: Klaviyo
455455
sourceDefinitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
456456
dockerRepository: airbyte/source-klaviyo
457-
dockerImageTag: 0.1.3
457+
dockerImageTag: 0.1.4
458458
documentationUrl: https://docs.airbyte.io/integrations/sources/klaviyo
459459
icon: klaviyo.svg
460460
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4282,7 +4282,7 @@
42824282
supported_destination_sync_modes: []
42834283
supported_source_sync_modes:
42844284
- "append"
4285-
- dockerImage: "airbyte/source-klaviyo:0.1.3"
4285+
- dockerImage: "airbyte/source-klaviyo:0.1.4"
42864286
spec:
42874287
documentationUrl: "https://docs.airbyte.io/integrations/sources/klaviyo"
42884288
changelogUrl: "https://docs.airbyte.io/integrations/sources/klaviyo"

airbyte-integrations/connectors/source-klaviyo/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.1.3
15+
LABEL io.airbyte.version=0.1.4
1616
LABEL io.airbyte.name=airbyte/source-klaviyo

airbyte-integrations/connectors/source-klaviyo/acceptance-test-config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ tests:
1313
- config_path: "secrets/config.json"
1414
basic_read:
1515
- config_path: "secrets/config.json"
16+
empty_streams: ['flows']
1617
incremental:
1718
- config_path: "secrets/config.json"
1819
future_state_path: "integration_tests/abnormal_state.json"

airbyte-integrations/connectors/source-klaviyo/integration_tests/abnormal_state.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
},
55
"global_exclusions": {
66
"timestamp": "2120-10-10T00:00:00Z"
7+
},
8+
"flows": {
9+
"created": "2120-10-10 00:00:00"
710
}
811
}

airbyte-integrations/connectors/source-klaviyo/integration_tests/configured_catalog.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@
5959
"cursor_field": null,
6060
"destination_sync_mode": "append",
6161
"primary_key": null
62+
},
63+
{
64+
"stream": {
65+
"name": "flows",
66+
"json_schema": {},
67+
"supported_sync_modes": ["full_refresh", "incremental"],
68+
"source_defined_cursor": null,
69+
"default_cursor_field": null,
70+
"source_defined_primary_key": [["id"]],
71+
"namespace": null
72+
},
73+
"sync_mode": "full_refresh",
74+
"cursor_field": null,
75+
"destination_sync_mode": "append",
76+
"primary_key": null
6277
}
6378
]
6479
}

airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ class Event(BaseSchemaModel):
104104
statistic_id: str
105105
event_properties: dict
106106
person: dict
107+
flow_id: Optional[str]
108+
campaign_id: Optional[str]
109+
flow_message_id: Optional[str]
110+
111+
112+
class Flow(BaseSchemaModel):
113+
id: str
114+
name: str
115+
status: str
116+
created: datetime
117+
updated: datetime
118+
customer_filter: Optional[dict]
119+
trigger: dict
107120

108121

109122
class GlobalExclusion(BaseSchemaModel):

airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from airbyte_cdk.sources.streams import Stream
1111
from pydantic import Field
1212
from pydantic.main import BaseModel
13-
from source_klaviyo.streams import Campaigns, Events, GlobalExclusions, Lists, Metrics
13+
from source_klaviyo.streams import Campaigns, Events, Flows, GlobalExclusions, Lists, Metrics
1414

1515

1616
class ConnectorConfig(BaseModel):
@@ -61,6 +61,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
6161
GlobalExclusions(api_key=config.api_key, start_date=config.start_date),
6262
Lists(api_key=config.api_key),
6363
Metrics(api_key=config.api_key),
64+
Flows(api_key=config.api_key, start_date=config.start_date),
6465
]
6566

6667
def spec(self, *args, **kwargs) -> ConnectorSpecification:

airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
import datetime
56
from abc import ABC, abstractmethod
67
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
78

89
import pendulum
910
import requests
1011
from airbyte_cdk.sources.streams.http import HttpStream
11-
from source_klaviyo.schemas import Campaign, Event, GlobalExclusion, Metric, PersonList
12+
from source_klaviyo.schemas import Campaign, Event, Flow, GlobalExclusion, Metric, PersonList
1213

1314

1415
class KlaviyoStream(HttpStream, ABC):
@@ -99,7 +100,13 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
99100
the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
100101
"""
101102
state_ts = int(current_stream_state.get(self.cursor_field, 0))
102-
return {self.cursor_field: max(latest_record.get(self.cursor_field), state_ts)}
103+
latest_record = latest_record.get(self.cursor_field)
104+
105+
if isinstance(latest_record, str):
106+
latest_record = datetime.datetime.strptime(latest_record, "%Y-%m-%d %H:%M:%S")
107+
latest_record = datetime.datetime.timestamp(latest_record)
108+
109+
return {self.cursor_field: max(latest_record, state_ts)}
103110

104111
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
105112
"""
@@ -240,3 +247,26 @@ class Events(IncrementalKlaviyoStream):
240247

241248
def path(self, **kwargs) -> str:
242249
return "metrics/timeline"
250+
251+
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
252+
"""
253+
:return an iterable containing each record in the response
254+
"""
255+
response_json = response.json()
256+
for record in response_json.get("data", []):
257+
flow = record["event_properties"].get("$flow")
258+
flow_message_id = record["event_properties"].get("$message")
259+
260+
record["flow_id"] = flow
261+
record["flow_message_id"] = flow_message_id
262+
record["campaign_id"] = flow_message_id if not flow else None
263+
264+
yield record
265+
266+
267+
class Flows(ReverseIncrementalKlaviyoStream):
268+
schema = Flow
269+
cursor_field = "created"
270+
271+
def path(self, **kwargs) -> str:
272+
return "flows"

docs/integrations/sources/klaviyo.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,6 @@ Please follow these [steps](https://help.klaviyo.com/hc/en-us/articles/115005062
5252

5353
| Version | Date | Pull Request | Subject |
5454
| :------ | :-------- | :----- | :------ |
55+
| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. |
5556
| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. |
5657
| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |

0 commit comments

Comments
 (0)