Skip to content

Commit 6d4ed9d

Browse files
authored
feat(registry): add remove latest partition job (#44795)
## What Provides a job to delete all latest partitions. Why? If you remove the partitions, they will be re-added, **triggering a reprocess of latest**. ## Review guide <!-- 1. `x.py` 2. `y.py` --> ## User Impact <!-- * What is the end result perceived by the user? * If there are negative side effects, please list them. --> ## Can this PR be safely reverted and rolled back? <!-- * If unsure, leave it blank. --> - [ ] YES 💚 - [ ] NO ❌
1 parent 94ceb17 commit 6d4ed9d

File tree

3 files changed

+32
-1
lines changed

3 files changed

+32
-1
lines changed

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from orchestrator.jobs.registry import (
2626
add_new_metadata_partitions,
2727
remove_stale_metadata_partitions,
28+
remove_latest_metadata_partitions,
2829
generate_cloud_registry,
2930
generate_oss_registry,
3031
generate_registry_entry,
@@ -203,6 +204,7 @@
203204
generate_nightly_reports,
204205
add_new_metadata_partitions,
205206
remove_stale_metadata_partitions,
207+
remove_latest_metadata_partitions,
206208
generate_stale_gcs_latest_metadata_file,
207209
]
208210

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py

+29
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,35 @@ def remove_stale_metadata_partitions():
5252
remove_stale_metadata_partitions_op()
5353

5454

55+
@op(required_resource_keys={"latest_metadata_file_blobs"})
def remove_latest_metadata_partitions_op(context):
    """
    Remove the dynamic partitions that correspond to the latest metadata files.

    The etag of every blob in the `latest_metadata_file_blobs` resource is collected,
    and each etag that is currently registered as a dynamic partition under
    `registry_entry.metadata_partitions_def` is deleted from the Dagster instance.
    (Generally used to reprocess metadata files.)

    Args:
        context: The op execution context; provides the `latest_metadata_file_blobs`
            resource, the Dagster instance and the logger.
    """
    latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs
    partition_name = registry_entry.metadata_partitions_def.name

    all_latest = [blob.etag for blob in latest_metadata_file_blobs]
    context.log.info(f"Found {len(all_latest)} latest metadata files found in GCS bucket")

    all_etag_partitions = context.instance.get_dynamic_partitions(partition_name)
    context.log.info(f"Found {len(all_etag_partitions)} existing metadata partitions")

    # Build the lookup set once so each membership test is O(1) instead of a
    # linear scan over every existing partition key.
    existing_partitions = set(all_etag_partitions)

    for latest_etag in all_latest:
        # Only etags that are registered as partitions can be deleted.
        if latest_etag not in existing_partitions:
            continue

        context.log.info(f"Removing latest etag: {latest_etag}")
        context.instance.delete_dynamic_partition(partition_name, latest_etag)
        context.log.info(f"Removed latest etag: {latest_etag}")
74+
75+
76+
@job(tags={"dagster/priority": HIGH_QUEUE_PRIORITY})
def remove_latest_metadata_partitions():
    """
    Job wrapping `remove_latest_metadata_partitions_op`.

    Running it removes the latest metadata partitions. (Generally used to reprocess metadata files).
    """
    # The job consists of this single op invocation.
    remove_latest_metadata_partitions_op()
82+
83+
5584
@op(required_resource_keys={"slack", "all_metadata_file_blobs"})
5685
def add_new_metadata_partitions_op(context):
5786
"""

airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "orchestrator"
3-
version = "0.4.0"
3+
version = "0.4.1"
44
description = ""
55
authors = ["Ben Church <[email protected]>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)