Skip to content

Commit 714eea3

Browse files
midavadim, darynaishchenko, alafanechere
authored
🎉 Source Slack migration to low code (#35477)
Co-authored-by: darynaishchenko <[email protected]> Co-authored-by: Daryna Ishchenko <[email protected]> Co-authored-by: Augustin <[email protected]>
1 parent 0c49832 commit 714eea3

26 files changed

+1311
-542
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[run]
2+
omit =
3+
source_slack/run.py

airbyte-integrations/connectors/source-slack/acceptance-test-config.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ acceptance_tests:
66
- spec_path: "source_slack/spec.json"
77
backward_compatibility_tests_config:
88
# edited `min`/`max` > `minimum`/`maximum` for `lookback_window` field
9-
disable_for_version: "0.1.26"
9+
#disable_for_version: "0.1.26"
10+
# slight changes: removed doc url, added new null oauth param
11+
disable_for_version: "0.3.10"
1012
connection:
1113
tests:
1214
- config_path: "secrets/config.json"

airbyte-integrations/connectors/source-slack/integration_tests/abnormal_state.json

+43-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,49 @@
99
{
1010
"type": "STREAM",
1111
"stream": {
12-
"stream_state": { "float_ts": 7270247822 },
13-
"stream_descriptor": { "name": "channel_messages" }
12+
"stream_descriptor": {
13+
"name": "channel_messages"
14+
},
15+
"stream_state": {
16+
"states": [
17+
{
18+
"partition": {
19+
"channel_id": "C04LTCM2Y56",
20+
"parent_slice": {}
21+
},
22+
"cursor": {
23+
"float_ts": "2534945416"
24+
}
25+
},
26+
{
27+
"partition": {
28+
"channel": "C04KX3KEZ54",
29+
"parent_slice": {}
30+
},
31+
"cursor": {
32+
"float_ts": "2534945416"
33+
}
34+
},
35+
{
36+
"partition": {
37+
"channel": "C04L3M4PTJ6",
38+
"parent_slice": {}
39+
},
40+
"cursor": {
41+
"float_ts": "2534945416"
42+
}
43+
},
44+
{
45+
"partition": {
46+
"channel": "C04LTCM2Y56",
47+
"parent_slice": {}
48+
},
49+
"cursor": {
50+
"float_ts": "2534945416"
51+
}
52+
}
53+
]
54+
}
1455
}
1556
}
1657
]

airbyte-integrations/connectors/source-slack/integration_tests/expected_records.jsonl

+11-9
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-slack/metadata.yaml

+15-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
13-
dockerImageTag: 0.4.1
13+
dockerImageTag: 1.0.0
1414
dockerRepository: airbyte/source-slack
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/slack
1616
githubIssueLabel: source-slack
@@ -28,6 +28,19 @@ data:
2828
oss:
2929
enabled: true
3030
releaseStage: generally_available
31+
releases:
32+
breakingChanges:
33+
1.0.0:
34+
message:
35+
The source Slack connector is being migrated from the Python CDK to our declarative low-code CDK.
36+
Due to changes in the handling of state format for incremental substreams, this migration constitutes a breaking change for the channel_messages stream.
37+
Users will need to reset source configuration, refresh the source schema and reset the channel_messages stream after upgrading.
38+
For more information, see our migration documentation for source Slack.
39+
upgradeDeadline: "2024-04-29"
40+
scopedImpact:
41+
- scopeType: stream
42+
impactedScopes:
43+
- "channel_messages"
3144
suggestedStreams:
3245
streams:
3346
- users
@@ -38,5 +51,5 @@ data:
3851
supportLevel: certified
3952
tags:
4053
- language:python
41-
- cdk:python
54+
- cdk:low-code
4255
metadataSpecVersion: "1.0"

airbyte-integrations/connectors/source-slack/poetry.lock

+76-62
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-integrations/connectors/source-slack/pyproject.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "0.4.1"
6+
version = "1.0.0"
77
name = "source-slack"
88
description = "Source implementation for Slack."
99
authors = [ "Airbyte <[email protected]>",]
@@ -19,6 +19,7 @@ include = "source_slack"
1919
python = "^3.9,<3.12"
2020
pendulum = "==2.1.2"
2121
airbyte-cdk = "^0"
22+
freezegun = "^1.4.0"
2223

2324
[tool.poetry.scripts]
2425
source-slack = "source_slack.run:run"

airbyte-integrations/connectors/source-slack/source_slack/components/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from dataclasses import dataclass
4+
from typing import List
5+
6+
import requests
7+
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
8+
from airbyte_cdk.sources.declarative.types import Record
9+
10+
11+
@dataclass
class ChannelMembersExtractor(DpathExtractor):
    """
    Wrap every extracted channel member id into a single-key record.

    The parent extractor returns plain member id strings; this extractor
    turns each of them into a dict keyed by ``member_id``:

        extracted: ['aa', 'bb']
        returned:  [{'member_id': 'aa'}, {'member_id': 'bb'}]
    """

    def extract_records(self, response: requests.Response) -> List[Record]:
        """Return one ``{"member_id": <id>}`` record per id extracted from ``response``."""
        # Resolve the parent call outside the comprehension so zero-argument
        # super() is evaluated in the method scope.
        member_ids = super().extract_records(response)
        return [{"member_id": member_id} for member_id in member_ids]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
import logging
4+
from functools import partial
5+
from typing import Any, Iterable, List, Mapping, Optional
6+
7+
import requests
8+
from airbyte_cdk.models import SyncMode
9+
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
10+
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
11+
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
12+
from airbyte_cdk.sources.streams.core import StreamData
13+
from airbyte_cdk.sources.streams.http import HttpStream
14+
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
15+
16+
# Module-level logger registered under the "airbyte_logger" name; used by ChannelsRetriever below.
LOGGER = logging.getLogger("airbyte_logger")
17+
18+
19+
class JoinChannelsStream(HttpStream):
    """
    Service stream that joins a Slack channel through the `conversations.join` endpoint.

    The Slack API only returns messages from channels the bot is a member of, so
    channels have to be joined before they can be read. Responses of this stream
    are only logged for debugging purposes; it never produces records.
    """

    url_base = "https://slack.com/api/"
    http_method = "POST"
    primary_key = "id"

    def __init__(self, channel_filter: Optional[List[str]] = None, **kwargs):
        """
        :param channel_filter: optional list of channel names; stored as an empty list when omitted
        :param kwargs: forwarded unchanged to ``HttpStream.__init__`` (e.g. ``authenticator``)
        """
        self.channel_filter = channel_filter or []
        super().__init__(**kwargs)

    def path(self, **kwargs) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "conversations.join"

    def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable:
        """
        Log whether the channel described by ``stream_slice`` was joined.

        :param response: HTTP response of the join request; its JSON body carries an ``ok`` flag
        :param stream_slice: slice holding the ``channel_name`` used in the log message; may be ``None``
        :return: always an empty list, this stream emits no records
        """
        # Decode the body once and reuse it for both the status check and the log message.
        payload = response.json()
        # `stream_slice` defaults to None, so avoid subscripting it directly.
        channel_name = (stream_slice or {}).get("channel_name")
        if payload.get("ok", False):
            self.logger.info(f"Successfully joined channel: {channel_name}")
        else:
            self.logger.info(f"Unable to join channel: {channel_name}. Reason: {payload}")
        return []

    def request_body_json(self, stream_slice: Mapping = None, **kwargs) -> Optional[Mapping]:
        """
        Build the JSON request body from the slice.

        :param stream_slice: slice holding the id of the channel to join under the ``channel`` key
        :return: ``{"channel": <id>}`` when a non-empty slice is given, otherwise ``None``
        """
        if not stream_slice:
            return None
        return {"channel": stream_slice.get("channel")}

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        The pagination is not applicable to this Service Stream.
        """
        return None
57+
58+
59+
class ChannelsRetriever(SimpleRetriever):
    """
    Retriever that joins channels while it reads them.

    For every record read, the bot is asked to join the channel when the
    `join_channels` config option is enabled and the record does not report
    the bot as a member yet. The record is yielded either way.
    """

    def __post_init__(self, parameters: Mapping[str, Any]):
        super().__post_init__(parameters)
        # Replace the configured slicer with a single-partition router and drop
        # all record transformations.
        # NOTE(review): presumably so this retriever yields the raw, unsliced
        # channel listing - confirm against the manifest that wires it up.
        self.stream_slicer = SinglePartitionRouter(parameters={})
        self.record_selector.transformations = []

    def should_join_to_channel(self, config: Mapping[str, Any], record: Record) -> bool:
        """
        The `is_member` property indicates whether the API Bot is already assigned / joined to the channel.
        https://api.slack.com/types/conversation#booleans

        :return: True only when joining is enabled in the config and the bot is not a member yet
        """
        # bool() keeps the declared return type even if either operand is a non-bool falsy value.
        return bool(config["join_channels"] and not record.get("is_member"))

    def make_join_channel_slice(self, channel: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Build the slice consumed by `JoinChannelsStream` for the given channel record.

        Also logs the name of the channel that is about to be joined.
        """
        channel_id: str = channel.get("id")
        channel_name: str = channel.get("name")
        LOGGER.info(f"Joining Slack Channel: `{channel_name}`")
        return {"channel": channel_id, "channel_name": channel_name}

    def join_channels_stream(self, config) -> JoinChannelsStream:
        """
        Create an authenticated `JoinChannelsStream` from the connector config.

        The token is taken from `credentials.api_token`, falling back to
        `credentials.access_token`.
        """
        credentials = config["credentials"]
        token = credentials.get("api_token") or credentials.get("access_token")
        authenticator = TokenAuthenticator(token)
        # A missing `channel_filter` must not raise here: JoinChannelsStream
        # already turns a missing/None filter into an empty list.
        channel_filter = config.get("channel_filter")
        return JoinChannelsStream(authenticator=authenticator, channel_filter=channel_filter)

    def join_channel(self, config: Mapping[str, Any], record: Mapping[str, Any]):
        """Send the join request for the channel described by `record`."""
        # read_records() is consumed through list() so the request is actually
        # issued; the join stream itself returns no records.
        list(
            self.join_channels_stream(config).read_records(
                sync_mode=SyncMode.full_refresh,
                stream_slice=self.make_join_channel_slice(record),
            )
        )

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Read the records of one slice, joining channels on the fly.

        :param records_schema: schema passed through to the record parser
        :param stream_slice: slice to read; an empty slice is used when omitted
        :return: the stream data read from the pages, unchanged
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        self._paginator.reset()

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_state=self.state or {},
            stream_slice=_slice,
            records_schema=records_schema,
        )

        for stream_data in self._read_pages(record_generator, self.state, _slice):
            # joining channel logic
            if self.should_join_to_channel(self.config, stream_data):
                self.join_channel(self.config, stream_data)

            current_record = self._extract_record(stream_data, _slice)
            if self.cursor and current_record:
                self.cursor.observe(_slice, current_record)

            most_recent_record_from_slice = self._get_most_recent_record(most_recent_record_from_slice, current_record, _slice)
            yield stream_data

        # Once the slice is exhausted, report its most recent record to the cursor.
        if self.cursor:
            self.cursor.observe(_slice, most_recent_record_from_slice)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
import logging
4+
from typing import Any, List, Mapping
5+
6+
from airbyte_cdk import AirbyteEntrypoint
7+
from airbyte_cdk.config_observation import create_connector_config_control_message
8+
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
9+
from source_slack import SourceSlack
10+
11+
# Module-level logger registered under the "airbyte_logger" name.
logger = logging.getLogger("airbyte_logger")
12+
13+
14+
class MigrateLegacyConfig:
    """
    Migrate a legacy connector config, which keeps `api_token` at the top level,
    to the current layout where the token lives inside the `credentials` object.

    When a migration happens, the config file is rewritten and a connector
    config CONTROL message is printed to stdout.
    """

    # Shared queue used to stage the CONTROL message before it is printed.
    message_repository: MessageRepository = InMemoryMessageRepository()

    @classmethod
    def _should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """
        legacy config:
        {
            "start_date": "2021-07-22T20:00:00Z",
            "end_date": "2021-07-23T20:00:00Z",
            "lookback_window": 1,
            "join_channels": True,
            "channel_filter": ["airbyte-for-beginners", "good-reads"],
            "api_token": "api-token"
        }
        api token should be in the credentials object

        :return: True when a top-level `api_token` is set and no `credentials` are present
        """
        return bool(config.get("api_token")) and not config.get("credentials")

    @classmethod
    def _move_token_to_credentials(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return a copy of `config` with `api_token` moved into `credentials`.

        The input mapping is left untouched; it is typed as a read-only Mapping.

        :raises KeyError: if `config` has no `api_token` key
        """
        api_token = config["api_token"]
        migrated_config = {key: value for key, value in config.items() if key != "api_token"}
        migrated_config["credentials"] = {"api_token": api_token, "option_title": "API Token Credentials"}
        return migrated_config

    @classmethod
    def _modify_and_save(cls, config_path: str, source: SourceSlack, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """Migrate `config`, write it to `config_path` through `source`, and return the migrated config."""
        migrated_config = cls._move_token_to_credentials(config)
        # save the config
        source.write_config(migrated_config, config_path)
        return migrated_config

    @classmethod
    def _emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
        """Queue a connector config CONTROL message for `migrated_config` and print the queue to stdout."""
        # add the Airbyte Control Message to message repo
        cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
        # emit the Airbyte Control Message from message queue to stdout;
        # consume_queue() drains the shared queue, so a later call cannot
        # print the same message again.
        for message in cls.message_repository.consume_queue():
            print(message.json(exclude_unset=True))

    @classmethod
    def migrate(cls, args: List[str], source: SourceSlack) -> None:
        """
        This method checks the input args, should the config be migrated,
        transform if necessary and emit the CONTROL message.

        :param args: command line arguments the connector was started with
        :param source: source instance used to read and write the config file
        """
        # get config path
        config_path = AirbyteEntrypoint(source).extract_config(args)
        # proceed only if `--config` arg is provided
        if not config_path:
            return
        # read the existing config
        config = source.read_config(config_path)
        # migration check
        if cls._should_migrate(config):
            cls._emit_control_message(
                cls._modify_and_save(config_path, source, config),
            )

0 commit comments

Comments
 (0)