|
2 | 2 | from datetime import timedelta
|
3 | 3 |
|
4 | 4 | from airflow import DAG
|
| 5 | +from airflow.operators.bash import BashOperator |
| 6 | +from airflow.operators.empty import EmptyOperator |
5 | 7 | from airflow.utils.timezone import datetime
|
6 | 8 |
|
7 | 9 | from astronomer.providers.core.sensors.filesystem import FileSensorAsync
|
|
24 | 26 | default_args=default_args,
|
25 | 27 | tags=["example", "async", "core"],
|
26 | 28 | ) as dag:
|
| 29 | + start = EmptyOperator(task_id="start") |
| 30 | + |
| 31 | + create_file = BashOperator( |
| 32 | + task_id="create_file", |
| 33 | + bash_command="sleep 10 && touch /usr/local/airflow/dags/example_file_async_sensor.txt", |
| 34 | + ) |
| 35 | + |
27 | 36 | # [START howto_sensor_filesystem_async]
|
28 | 37 | file_sensor_task = FileSensorAsync(
|
29 | 38 | task_id="file_sensor_task",
|
30 |
| - filepath="example_file_async_sensor.txt", |
| 39 | + filepath="/usr/local/airflow/dags/example_file_async_sensor.txt", |
31 | 40 | fs_conn_id=FS_CONN_ID,
|
| 41 | + poke_interval=3, |
32 | 42 | )
|
33 | 43 | # [END howto_sensor_filesystem_async]
|
| 44 | + |
| 45 | + delete_file = BashOperator( |
| 46 | + task_id="delete_file", |
| 47 | + bash_command="rm /usr/local/airflow/dags/example_file_async_sensor.txt", |
| 48 | + trigger_rule="all_done", |
| 49 | + ) |
| 50 | + |
| 51 | + end = EmptyOperator(task_id="end") |
| 52 | + |
| 53 | + start >> [file_sensor_task, create_file] |
| 54 | + [create_file, file_sensor_task] >> delete_file |
| 55 | + [file_sensor_task, create_file, delete_file] >> end |
0 commit comments