|
4 | 4 |
|
5 | 5 | import json
|
6 | 6 | import uuid
|
7 |
| -from typing import List, Tuple |
| 7 | +from datetime import datetime |
| 8 | +from typing import Dict, List, Tuple |
8 | 9 |
|
9 | 10 | import anyio
|
10 | 11 | from airbyte_protocol.models.airbyte_protocol import ConnectorSpecification # type: ignore
|
11 |
| -from dagger import Container, ExecError, File, ImageLayerCompression, QueryError |
| 12 | +from connector_ops.utils import ConnectorLanguage # type: ignore |
| 13 | +from dagger import Container, ExecError, File, ImageLayerCompression, Platform, QueryError |
12 | 14 | from pipelines import consts
|
13 | 15 | from pipelines.airbyte_ci.connectors.build_image import steps
|
14 | 16 | from pipelines.airbyte_ci.connectors.publish.context import PublishConnectorContext
|
15 | 17 | from pipelines.airbyte_ci.connectors.reports import ConnectorReport
|
16 | 18 | from pipelines.airbyte_ci.metadata.pipeline import MetadataUpload, MetadataValidation
|
17 | 19 | from pipelines.airbyte_ci.steps.python_registry import PublishToPythonRegistry, PythonRegistryPublishContext
|
| 20 | +from pipelines.consts import LOCAL_BUILD_PLATFORM |
18 | 21 | from pipelines.dagger.actions.remote_storage import upload_to_gcs
|
19 | 22 | from pipelines.dagger.actions.system import docker
|
20 | 23 | from pipelines.helpers.pip import is_package_published
|
21 | 24 | from pipelines.models.steps import Step, StepResult, StepStatus
|
22 |
| -from pydantic import ValidationError |
| 25 | +from pydantic import BaseModel, ValidationError |
23 | 26 |
|
24 | 27 |
|
25 | 28 | class InvalidSpecOutputError(Exception):
|
@@ -76,6 +79,56 @@ async def _run(self) -> StepResult:
|
76 | 79 | )
|
77 | 80 |
|
78 | 81 |
|
| 82 | +class ConnectorDependenciesMetadata(BaseModel): |
| 83 | + connector_technical_name: str |
| 84 | + connector_repository: str |
| 85 | + connector_version: str |
| 86 | + connector_definition_id: str |
| 87 | + dependencies: Dict[str, str] |
| 88 | + generation_time: datetime = datetime.utcnow() |
| 89 | + |
| 90 | + |
| 91 | +class UploadDependenciesToMetadataService(Step): |
| 92 | + context: PublishConnectorContext |
| 93 | + title = "Upload connector dependencies list to GCS." |
| 94 | + key_prefix = "connector_dependencies" |
| 95 | + |
| 96 | + async def _run(self, built_containers_per_platform: Dict[Platform, Container]) -> StepResult: |
| 97 | + assert self.context.connector.language in [ |
| 98 | + ConnectorLanguage.PYTHON, |
| 99 | + ConnectorLanguage.LOW_CODE, |
| 100 | + ], "This step can only run for Python connectors." |
| 101 | + built_container = built_containers_per_platform[LOCAL_BUILD_PLATFORM] |
| 102 | + pip_freeze_output = await built_container.with_exec(["pip", "freeze"], skip_entrypoint=True).stdout() |
| 103 | + dependencies = {line.split("==")[0]: line.split("==")[1] for line in pip_freeze_output.splitlines() if "==" in line} |
| 104 | + connector_technical_name = self.context.connector.technical_name |
| 105 | + connector_version = self.context.metadata["dockerImageTag"] |
| 106 | + dependencies_metadata = ConnectorDependenciesMetadata( |
| 107 | + connector_technical_name=connector_technical_name, |
| 108 | + connector_repository=self.context.metadata["dockerRepository"], |
| 109 | + connector_version=connector_version, |
| 110 | + connector_definition_id=self.context.metadata["definitionId"], |
| 111 | + dependencies=dependencies, |
| 112 | + ).json() |
| 113 | + file = ( |
| 114 | + (await self.context.get_connector_dir()) |
| 115 | + .with_new_file("dependencies.json", contents=dependencies_metadata) |
| 116 | + .file("dependencies.json") |
| 117 | + ) |
| 118 | + key = f"{self.key_prefix}/{connector_technical_name}/{connector_version}/dependencies.json" |
| 119 | + exit_code, stdout, stderr = await upload_to_gcs( |
| 120 | + self.context.dagger_client, |
| 121 | + file, |
| 122 | + key, |
| 123 | + self.context.metadata_bucket_name, |
| 124 | + self.context.metadata_service_gcs_credentials_secret, |
| 125 | + flags=['--cache-control="no-cache"'], |
| 126 | + ) |
| 127 | + if exit_code != 0: |
| 128 | + return StepResult(step=self, status=StepStatus.FAILURE, stdout=stdout, stderr=stderr) |
| 129 | + return StepResult(step=self, status=StepStatus.SUCCESS, stdout="Uploaded connector dependencies to metadata service bucket.") |
| 130 | + |
| 131 | + |
79 | 132 | class PushConnectorImageToRegistry(Step):
|
80 | 133 | context: PublishConnectorContext
|
81 | 134 | title = "Push connector image to registry"
|
@@ -282,7 +335,6 @@ def create_connector_report(results: List[StepResult]) -> ConnectorReport:
|
282 | 335 |
|
283 | 336 | check_connector_image_results = await CheckConnectorImageDoesNotExist(context).run()
|
284 | 337 | results.append(check_connector_image_results)
|
285 |
| - |
286 | 338 | python_registry_steps, terminate_early = await _run_python_registry_publish_pipeline(context)
|
287 | 339 | results.extend(python_registry_steps)
|
288 | 340 | if terminate_early:
|
@@ -313,6 +365,10 @@ def create_connector_report(results: List[StepResult]) -> ConnectorReport:
|
313 | 365 | if build_connector_results.status is not StepStatus.SUCCESS:
|
314 | 366 | return create_connector_report(results)
|
315 | 367 |
|
| 368 | + if context.connector.language in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE]: |
| 369 | + upload_dependencies_step = await UploadDependenciesToMetadataService(context).run(build_connector_results.output) |
| 370 | + results.append(upload_dependencies_step) |
| 371 | + |
316 | 372 | built_connector_platform_variants = list(build_connector_results.output.values())
|
317 | 373 | push_connector_image_results = await PushConnectorImageToRegistry(context).run(built_connector_platform_variants)
|
318 | 374 | results.append(push_connector_image_results)
|
|
0 commit comments