Skip to content

fix: incremental backups #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions housewatch/clickhouse/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,17 @@ def execute_backup(
responses = []
for shard, node in nodes:
params["shard"] = shard if is_sharded else "noshard"

query_settings["async"] = "true"
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}/{shard}', '{aws_key}', '{aws_secret}')"
final_query = query % (params or {}) if substitute_params else query
query_settings["base_backup"] = f"S3('{base_backup}/{params['shard']}', '{aws_key}', '{aws_secret}')"

parametrized_query = query % (params or {}) if substitute_params else query
final_query = "{query} SETTINGS {settings}".format(
query=parametrized_query,
settings=", ".join([f"{k} = {v}" for k, v in query_settings.items()]),
)

client = Client(
host=node["host_address"],
database=settings.CLICKHOUSE_DATABASE,
Expand All @@ -51,7 +59,7 @@ def execute_backup(
send_receive_timeout=30,
password=settings.CLICKHOUSE_PASSWORD,
)
result = client.execute(final_query, settings=query_settings, with_column_types=True, query_id=query_id)
result = client.execute(final_query, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
Expand Down Expand Up @@ -90,12 +98,8 @@ def create_table_backup(
aws_key = settings.AWS_ACCESS_KEY_ID
aws_secret = settings.AWS_SECRET_ACCESS_KEY

query_settings = {}
QUERY = """BACKUP TABLE %(database)s.%(table)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')"""
return execute_backup(
QUERY,
{
Expand All @@ -106,7 +110,6 @@ def create_table_backup(
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
Expand All @@ -119,12 +122,9 @@ def create_database_backup(database, bucket, path, aws_key=None, aws_secret=None
if aws_key is None or aws_secret is None:
aws_key = settings.AWS_ACCESS_KEY_ID
aws_secret = settings.AWS_SECRET_ACCESS_KEY
query_settings = {}
QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"

QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')"""
return execute_backup(
QUERY,
{
Expand All @@ -134,7 +134,6 @@ def create_database_backup(database, bucket, path, aws_key=None, aws_secret=None
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
Expand Down Expand Up @@ -187,7 +186,8 @@ def run_backup(backup_id, incremental=False):
else:
backup.last_run = br
backup.last_run_time = now
backup.last_base_backup = S3_LOCATION

backup.last_base_backup = S3_LOCATION
backup.save()
return

Expand Down
Loading