# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ This is an example dag for using the KubernetesPodOperator. """ from __future__ import annotations import os from datetime import datetime from kubernetes.client import models as k8s from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator port = k8s.V1ContainerPort(name="http", container_port=80) init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")] init_container = k8s.V1Container( name="init-container", image="ubuntu:16.04", env=init_environments, command=["bash", "-cx"], args=["echo 10"], ) tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")] # [END howto_operator_k8s_cluster_resources] ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_kubernetes_operator_async" with DAG( dag_id=DAG_ID, schedule=None, start_date=datetime(2021, 1, 1), tags=["example"], ) as dag: k = KubernetesPodOperator( task_id="task_1aa", namespace="airflow", in_cluster=False, image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo", "10"], labels={"foo": "bar"}, ports=[port], name="task_1a", on_finish_action="delete_pod", hostnetwork=False, tolerations=tolerations, init_containers=[init_container], deferrable=True, ) # [START howto_operator_k8s_write_xcom_async] write_xcom_async = KubernetesPodOperator( task_id="task_1bb", namespace="airflow", image="alpine", cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"], name="task_1b", do_xcom_push=True, on_finish_action="delete_pod", in_cluster=False, get_logs=True, deferrable=True, )