-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathconfig_migrations.py
95 lines (81 loc) · 3.82 KB
/
config_migrations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, List, Mapping, Optional

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from orjson import orjson
class MigrateConfig:
    """
    Migrate the connector config at runtime, while staying backward compatible
    when falling back to the previous source version.

    Starting from `2.0.1`, the `start_date` property must hold a real value
    (neither missing, None, nor the string "None"):
        > "start_date": "2020-01-01"
    whereas in `2.0.0` some older configs omit it, because `start_date` was not required:
        > {...}

    Such configs get `start_date` set to `default_start_date_value`. The result is
    written back to the config file, and an Airbyte CONTROL message carrying the
    migrated config is printed to stdout.
    """

    # Buffers the CONTROL message before it is printed.
    # NOTE: this is a class attribute, so it is shared by every call on this class.
    message_repository: MessageRepository = InMemoryMessageRepository()
    # Config key that this migration populates.
    migrate_key: str = "start_date"
    # default spec value for the `start_date` is `2020-01-01`
    default_start_date_value: str = "2020-01-01"

    @classmethod
    def should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """
        Determine whether the config must be migrated to carry a `start_date`.

        Args:
            config: the connector config as read from the config file.

        Returns:
            True if `start_date` is absent, None, or the string "None";
            False otherwise.
        """
        # A missing key and an explicit None both make `.get` return None, so one
        # membership test covers every case. An already-migrated config holds a real
        # date here, so the migration is not repeated.
        return config.get(cls.migrate_key) in (None, "None")

    @classmethod
    def modify_config(cls, config: Mapping[str, Any], source: Optional[Source] = None) -> Mapping[str, Any]:
        """
        Return a copy of `config` with `start_date` set to `default_start_date_value`.

        Args:
            config: the config to migrate. It is not modified.
            source: not used by this method. It is accepted so the signature matches
                how `modify_and_save` calls it.

        Returns:
            A new dict holding every entry of `config` plus the defaulted `start_date`.
        """
        # Copy rather than assign into `config`: a `Mapping` is read-only by contract,
        # and the caller's object should not change underneath it.
        migrated_config = dict(config)
        migrated_config[cls.migrate_key] = cls.default_start_date_value
        return migrated_config

    @classmethod
    def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate `config` and write the result back to `config_path` via `source.write_config`.

        Returns:
            The migrated config.
        """
        migrated_config = cls.modify_config(config, source)
        source.write_config(migrated_config, config_path)
        return migrated_config

    @classmethod
    def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
        """
        Print an Airbyte CONTROL message carrying `migrated_config` to stdout.
        """
        # Queue the CONTROL message in the message repository...
        cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
        # ...then drain the queue to stdout. `consume_queue` removes messages as it yields
        # them. Iterating the private `_message_queue` directly would leave them in the
        # shared class-level repository, so later calls would print them again.
        for message in cls.message_repository.consume_queue():
            print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())

    @classmethod
    def migrate(cls, args: List[str], source: Source) -> None:
        """
        Migrate the config passed through `--config`, if necessary.

        Extracts the config path from the input args and reads the config. When the
        config needs migrating, it is transformed, saved back to the same path, and a
        CONTROL message is emitted. Nothing happens when no `--config` arg is provided
        or when no migration is needed.

        Args:
            args: the connector's command-line arguments.
            source: the source used to extract, read, and write the config.
        """
        config_path = AirbyteEntrypoint(source).extract_config(args)
        # proceed only if `--config` arg is provided
        if not config_path:
            return
        config = source.read_config(config_path)
        if cls.should_migrate(config):
            cls.emit_control_message(cls.modify_and_save(config_path, source, config))