Skip to content

connectors-ci: per connector dockerd #28566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
LICENSE_SHORT_FILE_PATH,
PYPROJECT_TOML_FILE_PATH,
)
from ci_connector_ops.pipelines.utils import get_file_contents
from dagger import CacheVolume, Client, Container, DaggerError, Directory, File, Platform, Secret
from ci_connector_ops.pipelines.utils import get_file_contents, slugify
from dagger import CacheSharingMode, CacheVolume, Container, DaggerError, Directory, File, Platform, Secret
from dagger.engine._version import CLI_VERSION as dagger_engine_version

if TYPE_CHECKING:
Expand Down Expand Up @@ -373,20 +373,21 @@ async def with_ci_connector_ops(context: PipelineContext) -> Container:
return await with_installed_python_package(context, python_base_environment, CI_CONNECTOR_OPS_SOURCE_PATH)


def with_global_dockerd_service(dagger_client: Client) -> Container:
def with_dockerd_service(context: ConnectorContext) -> Container:
"""Create a container with a docker daemon running.
We expose its 2375 port to use it as a docker host for docker-in-docker use cases.
Args:
dagger_client (Client): The dagger client used to create the container.
context (ConnectorContext): The current connector context.
Returns:
Container: The container running dockerd as a service
"""
return (
dagger_client.container()
context.dagger_client.container()
.from_(consts.DOCKER_DIND_IMAGE)
.with_env_variable("CONNECTOR_NAME", context.connector.technical_name)
.with_mounted_cache(
"/tmp",
dagger_client.cache_volume("shared-tmp"),
context.dagger_client.cache_volume(f"shared-tmp-{slugify(context.connector.technical_name)}"),
)
.with_exposed_port(2375)
.with_exec(["dockerd", "--log-level=error", "--host=tcp://0.0.0.0:2375", "--tls=false"], insecure_root_capabilities=True)
Expand All @@ -405,12 +406,16 @@ def with_bound_docker_host(
Returns:
Container: The container bound to the docker host.
"""
dockerd = context.dockerd_service
docker_hostname = "global-docker-host"
dockerd_service = with_dockerd_service(context)
docker_hostname = f"{slugify(context.connector.technical_name)}-docker-host"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return (
container.with_env_variable("DOCKER_HOST", f"tcp://{docker_hostname}:2375")
.with_service_binding(docker_hostname, dockerd)
.with_mounted_cache("/tmp", context.dagger_client.cache_volume("shared-tmp"))
.with_service_binding(docker_hostname, dockerd_service)
.with_mounted_cache(
"/tmp",
context.dagger_client.cache_volume(f"shared-tmp-{slugify(context.connector.technical_name)}"),
sharing=CacheSharingMode.PRIVATE,
)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import anyio
import dagger
from ci_connector_ops.pipelines.actions import environments
from ci_connector_ops.pipelines.bases import ConnectorReport, Step, StepResult, StepStatus
from ci_connector_ops.pipelines.contexts import ConnectorContext
from ci_connector_ops.pipelines.format import java_connectors, python_connectors
Expand Down Expand Up @@ -83,15 +82,9 @@ async def run_connectors_format_pipelines(
execute_timeout: Optional[int],
) -> List[ConnectorContext]:
async with dagger.Connection(dagger.Config(log_output=sys.stderr, execute_timeout=execute_timeout)) as dagger_client:
requires_dind = any(context.connector.language == ConnectorLanguage.JAVA for context in contexts)
dockerd_service = environments.with_global_dockerd_service(dagger_client)
async with anyio.create_task_group() as tg_main:
if requires_dind:
tg_main.start_soon(dockerd_service.sync)
await anyio.sleep(10) # Wait for the docker service to be ready
for context in contexts:
context.dagger_client = dagger_client.pipeline(f"Format - {context.connector.technical_name}")
context.dockerd_service = dockerd_service
await run_connector_format_pipeline(context)
# When the connectors pipelines are done, we can stop the dockerd service
tg_main.cancel_scope.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import anyio
import dagger
from ci_connector_ops.pipelines.actions import environments
from ci_connector_ops.pipelines.bases import NoOpStep, Report, StepResult, StepStatus
from ci_connector_ops.pipelines.contexts import ConnectorContext, ContextState
from ci_connector_ops.utils import ConnectorLanguage
Expand Down Expand Up @@ -82,17 +81,10 @@ async def run_connectors_pipelines(
default_connectors_semaphore = anyio.Semaphore(concurrency)
dagger_logs_output = sys.stderr if not dagger_logs_path else open(dagger_logs_path, "w")
async with dagger.Connection(Config(log_output=dagger_logs_output, execute_timeout=execute_timeout)) as dagger_client:
# HACK: This is to get a long running dockerd service to be shared across all the connectors pipelines
# Using the "normal" service binding leads to restart of dockerd during pipeline run that can cause corrupted docker state
# See https://github.com/airbytehq/airbyte/issues/27233
dockerd_service = environments.with_global_dockerd_service(dagger_client)
async with anyio.create_task_group() as tg_main:
tg_main.start_soon(dockerd_service.sync)
await anyio.sleep(10) # Wait for the docker service to be ready
async with anyio.create_task_group() as tg_connectors:
for context in contexts:
context.dagger_client = dagger_client.pipeline(f"{pipeline_name} - {context.connector.technical_name}")
context.dockerd_service = dockerd_service
tg_connectors.start_soon(
connector_pipeline,
context,
Expand Down