Skip to content

Implement SFTPSensorAsync #654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
194daec
Add async sftp sensor
Sep 20, 2022
37aeafe
Merge branch 'astronomer:main' into main
tseruga Sep 20, 2022
cf975c4
Add example DAG for sftp sensor async
Sep 20, 2022
444d214
Merge branch 'main' of https://github.com/tseruga/astronomer-providers
Sep 20, 2022
df4acec
Fix some type hints
Sep 20, 2022
4332023
Fix any type hint
Sep 20, 2022
94e16bb
Move poll_interval to parent poke_interval
Sep 21, 2022
62db44e
Add async sftp sensor
Sep 20, 2022
517a5bb
Add example DAG for sftp sensor async
Sep 20, 2022
6bf9b57
Fix some type hints
Sep 20, 2022
f70415a
Fix any type hint
Sep 20, 2022
b6fe3ad
Move poll_interval to parent poke_interval
Sep 21, 2022
e00ff11
Merge branch 'main' of https://github.com/tseruga/astronomer-providers
Sep 22, 2022
a2b7c45
Add provider docs for sftp sensor async
Sep 22, 2022
adcc456
Add sftp to docs index. remove wildcard from docstring
Sep 23, 2022
007fd36
Merge branch 'main' into main
tseruga Sep 23, 2022
c1db648
Fix doc build
pankajastro Sep 23, 2022
5d4cd1e
Update trigger event logic for failure scenarios
Sep 26, 2022
754096e
Merge branch 'main' into main
tseruga Sep 26, 2022
a58f07e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 26, 2022
2649b8c
Merge branch 'main' into main
tseruga Sep 27, 2022
5bf4bcb
Fix type hints to satisfy mypy. Fix docs bad merge conflict fix
Sep 27, 2022
203261e
Add more tests to increase code coverage for sftp
Sep 27, 2022
69de543
Add even more sftp test to cover last few lines
Sep 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ Extras
- ``pip install 'astronomer-providers[openlineage]'``
- Openlineage

* - ``sftp``
- ``pip install 'astronomer-providers[sftp]'``
- Sftp

* - ``snowflake``
- ``pip install 'astronomer-providers[snowflake]'``
- Snowflake
Expand Down
Empty file.
Empty file.
37 changes: 37 additions & 0 deletions astronomer/providers/sftp/example_dags/example_sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os
from datetime import timedelta

from airflow import DAG
from airflow.utils.timezone import datetime

from astronomer.providers.sftp.sensors.sftp import SFTPSensorAsync

SFTP_CONN_ID = os.getenv("ASTRO_SFTP_CONN_ID", "sftp_default")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
"execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


with DAG(
dag_id="example_async_sftp_sensor",
start_date=datetime(2022, 1, 1),
schedule_interval=None,
catchup=False,
default_args=default_args,
tags=["example", "async", "sftp"],
) as dag:
# [START howto_sensor_sftp_async]
async_sftp_sensor = SFTPSensorAsync(
task_id="async_sftp_sensor",
sftp_conn_id=SFTP_CONN_ID,
path="path/on/sftp/server",
file_pattern="*.csv",
poke_interval=5,
)
# [END howto_sensor_sftp_async]

async_sftp_sensor
Empty file.
127 changes: 127 additions & 0 deletions astronomer/providers/sftp/hooks/sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from fnmatch import fnmatch
from typing import List

import asyncssh
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from asgiref.sync import sync_to_async


class SFTPHookAsync(BaseHook):
"""
Interact with an SFTP server via asyncssh package

:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
:param host: hostname of the SFTP server
:param port: port of the SFTP server
:param username: username used when authenticating to the SFTP server
:param password: password used when authenticating to the SFTP server
Can be left blank if using a key file
:param known_hosts: path to the known_hosts file on the local file system
If known_hosts is set to the literal "none", then no host verification is performed
:param key_file: path to the client key file used for authentication to SFTP server
:param passphrase: passphrase used with the key_file for authentication to SFTP server
"""

conn_name_attr = "ssh_conn_id"
default_conn_name = "sftp_default"
conn_type = "sftp"
hook_name = "SFTP"
default_known_hosts = '"~/.ssh/known_hosts"'

def __init__( # nosec: B107
self,
sftp_conn_id: str = default_conn_name,
host: str = "",
port: int = 22,
username: str = "",
password: str = "",
known_hosts: str = default_known_hosts,
key_file: str = "",
passphrase: str = "",
) -> None:
self.sftp_conn_id = sftp_conn_id
self.host = host
self.port = port
self.username = username
self.password = password
self.known_hosts = known_hosts
self.key_file = key_file
self.passphrase = passphrase

async def _get_conn(self) -> asyncssh.SSHClientConnection:
"""
Asynchronously connect to the SFTP server as an SSH client

The following parameters are provided either in the extra json object in
the SFTP connection definition

key_file
known_hosts
passphrase
"""
if self.sftp_conn_id is not None:
conn = await sync_to_async(self.get_connection)(self.sftp_conn_id)

if conn.extra is not None:
extra_options = conn.extra_dejson

if "key_file" in extra_options and self.key_file == "":
self.key_file = extra_options.get("key_file")

if "known_hosts" in extra_options:
self.known_hosts = extra_options.get("known_hosts")

if "passphrase" in extra_options:
self.passphrase = extra_options.get("passphrase")

conn_config = {
"host": conn.host,
"port": conn.port,
"username": conn.login,
"password": conn.password,
}

if self.key_file:
conn_config.update(client_keys=self.key_file)

if self.known_hosts:
if self.known_hosts.lower() == "none":
conn_config.update(known_hosts=None)
else:
conn_config.update(known_hosts=self.known_hosts)

if self.passphrase:
conn_config.update(passphrase=self.passphrase)

ssh_client = await asyncssh.connect(**conn_config)

return ssh_client

async def list_directory(self, path: str = "") -> List[str]:
"""Returns a list of files on the SFTP server at the provided path"""
ssh_conn = await self._get_conn()
sftp_client = await ssh_conn.start_sftp_client()
try:
files = await sftp_client.listdir(path)
return sorted(files)
except asyncssh.SFTPNoSuchFile:
raise AirflowException(f"No files at path {path} found — Deferring")

async def get_file_by_pattern(self, path: str = "", fnmatch_pattern: str = "") -> str:
"""
Returns the name of a file matching the file pattern at the provided path, if one exists
Otherwise, raises an AirflowException to be handled upstream for deferring
"""
files_list = await self.list_directory(path)

if files_list is None:
raise AirflowException(f"No files at path {path} found...")

for file in files_list:
if not fnmatch(file, fnmatch_pattern):
pass
else:
return file

raise AirflowException(f"No files matching file pattern were found at {path} — Deferring")
Empty file.
52 changes: 52 additions & 0 deletions astronomer/providers/sftp/sensors/sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import timedelta
from typing import Any, Dict, Optional

from airflow.providers.sftp.sensors.sftp import SFTPSensor

from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger
from astronomer.providers.utils.typing_compat import Context


class SFTPSensorAsync(SFTPSensor):
"""
Polls an SFTP server continuously until a file_pattern is matched at a defined path

:param path: The path on the SFTP server to search for a file matching the file pattern.
Authentication method used in the SFTP connection must have access to this path
:param file_pattern: Pattern to be used for matching against the list of files at the path above.
Uses the fnmatch module from std library to perform the matching.
"""

def __init__(self, *, path: str, file_pattern: str = "", **kwargs: Any) -> None:

self.path = path
self.file_pattern = file_pattern

super().__init__(path=path, file_pattern=file_pattern, **kwargs)
self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)

def execute(self, context: Context) -> None:
"""
Logic that the sensor uses to correctly identify which trigger to
execute, and defer execution as expected.
"""
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=SFTPTrigger(
path=self.path,
file_pattern=self.file_pattern,
sftp_conn_id=self.sftp_conn_id,
poke_interval=self.poke_interval,
),
method_name="execute_complete",
)

def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[Any, Any]] = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
self.log.info("%s completed successfully.", self.task_id)
return None
Empty file.
63 changes: 63 additions & 0 deletions astronomer/providers/sftp/triggers/sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.exceptions import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent

from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync


class SFTPTrigger(BaseTrigger):
"""
Trigger that fires when either the path on the SFTP server does not exist,
or when there are no files matching the file pattern at the path

:param path: The path on the SFTP server to search for a file matching the file pattern.
Authentication method used in the SFTP connection must have access to this path
:param file_pattern: Pattern to be used for matching against the list of files at the path above.
Uses the fnmatch module from std library to perform the matching.

:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
:param poke_interval: How often, in seconds, to check for the existence of the file on the SFTP server
"""

def __init__(
self,
path: str,
file_pattern: str = "",
sftp_conn_id: str = "sftp_default",
poke_interval: float = 5,
) -> None:
super().__init__()
self.path = path
self.file_pattern = file_pattern
self.sftp_conn_id = sftp_conn_id
self.poke_interval = poke_interval

def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""Serializes SFTPTrigger arguments and classpath"""
return (
"astronomer.providers.sftp.triggers.sftp.SFTPTrigger",
{
"path": self.path,
"file_pattern": self.file_pattern,
"sftp_conn_id": self.sftp_conn_id,
"poke_interval": self.poke_interval,
},
)

async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
"""
Makes a series of asynchronous calls to sftp servers via async sftp hook. It yields a Trigger if
file matching file pattern exists at the specified path, otherwise it throws an exception.
"""
hook = self._get_async_hook()
while True:
try:
await hook.get_file_by_pattern(path=self.path, fnmatch_pattern=self.file_pattern)
yield TriggerEvent(True)
except AirflowException:
await asyncio.sleep(self.poke_interval)

def _get_async_hook(self) -> SFTPHookAsync:
return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)
2 changes: 2 additions & 0 deletions docs/providers/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Async Provider Example DAG's
HTTP <http/index>
Microsoft <microsoft/index>
Snowflake <snowflake/index>
SFTP <sftp/index>


* `Amazon AWS <amazon/index.html>`_
Expand All @@ -27,3 +28,4 @@ Async Provider Example DAG's
* `HTTP <http/index>`_
* `Microsoft <microsoft/index>`_
* `Snowflake <snowflake/index>`_
* `SFTP <sftp/index>`_
11 changes: 11 additions & 0 deletions docs/providers/sftp/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
SFTP Example DAG
------------------

.. toctree::
:hidden:
:maxdepth: 2

SFTP Sensors <sensors/sftp>


* `SFTP Sensors <sensors/sftp.html>`_
12 changes: 12 additions & 0 deletions docs/providers/sftp/sensors/sftp.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
SFTP Sensor Async
"""""""""""""""""


Checks for the existence of a file on an SFTP server
:class:`~astronomer.providers.sftp.sensors.sftp.SFTPSensorAsync`.

.. exampleinclude:: /../astronomer/providers/sftp/example_dags/example_sftp.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_sftp_async]
:end-before: [END howto_sensor_sftp_async]
5 changes: 5 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ http =
apache-airflow-providers-http
microsoft.azure =
apache-airflow-providers-microsoft-azure
sftp =
apache-airflow-providers-sftp
asyncssh>=2.12.0
snowflake =
apache-airflow-providers-snowflake
# If in future we move Openlineage extractors out of the repo, this dependency should be removed
Expand Down Expand Up @@ -122,7 +125,9 @@ all =
apache-airflow-providers-google>=8.1.0
apache-airflow-providers-http
apache-airflow-providers-snowflake
apache-airflow-providers-sftp
apache-airflow-providers-microsoft-azure
asyncssh>=2.12.0
databricks-sql-connector>=2.0.4;python_version>='3.10'
apache-airflow-providers-dbt-cloud>=2.1.0
gcloud-aio-bigquery
Expand Down
Empty file added tests/sftp/hooks/__init__.py
Empty file.
Loading