Implement SFTPSensorAsync
#654
Merged
24 commits
194daec
Add async sftp sensor
37aeafe
Merge branch 'astronomer:main' into main
tseruga cf975c4
Add example DAG for sftp sensor async
444d214
Merge branch 'main' of https://github.com/tseruga/astronomer-providers
df4acec
Fix some type hints
4332023
Fix any type hint
94e16bb
Move poll_interval to parent poke_interval
62db44e
Add async sftp sensor
517a5bb
Add example DAG for sftp sensor async
6bf9b57
Fix some type hints
f70415a
Fix any type hint
b6fe3ad
Move poll_interval to parent poke_interval
e00ff11
Merge branch 'main' of https://github.com/tseruga/astronomer-providers
a2b7c45
Add provider docs for sftp sensor async
adcc456
Add sftp to docs index. remove wildcard from docstring
007fd36
Merge branch 'main' into main
tseruga c1db648
Fix doc build
pankajastro 5d4cd1e
Update trigger event logic for failure scenarios
754096e
Merge branch 'main' into main
tseruga a58f07e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 2649b8c
Merge branch 'main' into main
tseruga 5bf4bcb
Fix type hints to satisfy mypy. Fix docs bad merge conflict fix
203261e
Add more tests to increase code coverage for sftp
69de543
Add even more sftp test to cover last few lines
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import os | ||
from datetime import timedelta | ||
|
||
from airflow import DAG | ||
from airflow.utils.timezone import datetime | ||
|
||
from astronomer.providers.sftp.sensors.sftp import SFTPSensorAsync | ||
|
||
SFTP_CONN_ID = os.getenv("ASTRO_SFTP_CONN_ID", "sftp_default") | ||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) | ||
|
||
default_args = { | ||
"execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), | ||
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)), | ||
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))), | ||
} | ||
|
||
|
||
with DAG( | ||
dag_id="example_async_sftp_sensor", | ||
start_date=datetime(2022, 1, 1), | ||
schedule_interval=None, | ||
catchup=False, | ||
default_args=default_args, | ||
tags=["example", "async", "sftp"], | ||
) as dag: | ||
# [START howto_sensor_sftp_async] | ||
async_sftp_sensor = SFTPSensorAsync( | ||
task_id="async_sftp_sensor", | ||
sftp_conn_id=SFTP_CONN_ID, | ||
path="path/on/sftp/server", | ||
file_pattern="*.csv", | ||
poke_interval=5, | ||
) | ||
# [END howto_sensor_sftp_async] | ||
|
||
async_sftp_sensor |
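The `file_pattern="*.csv"` argument above is matched with the standard-library `fnmatch` function, and the hook in this PR sorts the directory listing and returns the first match. The following is a minimal standalone sketch of that matching behaviour; `first_match` is a hypothetical helper written for illustration and is not part of the PR.

```python
from fnmatch import fnmatch
from typing import List, Optional


def first_match(filenames: List[str], pattern: str) -> Optional[str]:
    """Return the first name (in sorted order) that matches the pattern, else None."""
    for name in sorted(filenames):
        if fnmatch(name, pattern):
            return name
    return None


listing = ["report.txt", "b.csv", "a.csv"]
print(first_match(listing, "*.csv"))   # a.csv
print(first_match(listing, "*.json"))  # None
print(first_match(listing, ""))        # None
```

Note the last call: an empty pattern only matches an empty file name, so leaving `file_pattern` at its default of `""` would likely never produce a match with this logic.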
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
from fnmatch import fnmatch | ||
from typing import List | ||
|
||
import asyncssh | ||
from airflow.exceptions import AirflowException | ||
from airflow.hooks.base import BaseHook | ||
from asgiref.sync import sync_to_async | ||
|
||
|
||
class SFTPHookAsync(BaseHook): | ||
""" | ||
Interact with an SFTP server via asyncssh package | ||
|
||
:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server | ||
:param host: hostname of the SFTP server | ||
:param port: port of the SFTP server | ||
:param username: username used when authenticating to the SFTP server | ||
:param password: password used when authenticating to the SFTP server | ||
Can be left blank if using a key file | ||
:param known_hosts: path to the known_hosts file on the local file system | ||
If known_hosts is set to the literal "none", then no host verification is performed | ||
:param key_file: path to the client key file used for authentication to SFTP server | ||
:param passphrase: passphrase used with the key_file for authentication to SFTP server | ||
""" | ||
|
||
conn_name_attr = "ssh_conn_id" | ||
default_conn_name = "sftp_default" | ||
conn_type = "sftp" | ||
hook_name = "SFTP" | ||
default_known_hosts = "~/.ssh/known_hosts" | ||
|
||
def __init__( # nosec: B107 | ||
self, | ||
sftp_conn_id: str = default_conn_name, | ||
host: str = "", | ||
port: int = 22, | ||
username: str = "", | ||
password: str = "", | ||
known_hosts: str = default_known_hosts, | ||
key_file: str = "", | ||
passphrase: str = "", | ||
) -> None: | ||
self.sftp_conn_id = sftp_conn_id | ||
self.host = host | ||
self.port = port | ||
self.username = username | ||
self.password = password | ||
self.known_hosts = known_hosts | ||
self.key_file = key_file | ||
self.passphrase = passphrase | ||
|
||
async def _get_conn(self) -> asyncssh.SSHClientConnection: | ||
""" | ||
Asynchronously connect to the SFTP server as an SSH client | ||
|
||
The following parameters can be provided in the extra JSON object of | ||
the SFTP connection definition: | ||
|
||
key_file | ||
known_hosts | ||
passphrase | ||
|
||
""" | ||
if self.sftp_conn_id is not None: | ||
conn = await sync_to_async(self.get_connection)(self.sftp_conn_id) | ||
|
||
if conn.extra is not None: | ||
extra_options = conn.extra_dejson | ||
|
||
if "key_file" in extra_options and self.key_file == "": | ||
self.key_file = extra_options.get("key_file") | ||
|
||
if "known_hosts" in extra_options: | ||
self.known_hosts = extra_options.get("known_hosts") | ||
|
||
if "passphrase" in extra_options: | ||
self.passphrase = extra_options.get("passphrase") | ||
|
||
conn_config = { | ||
"host": conn.host, | ||
"port": conn.port, | ||
"username": conn.login, | ||
"password": conn.password, | ||
} | ||
|
||
if self.key_file: | ||
conn_config.update(client_keys=self.key_file) | ||
|
||
if self.known_hosts: | ||
if self.known_hosts.lower() == "none": | ||
conn_config.update(known_hosts=None) | ||
else: | ||
conn_config.update(known_hosts=self.known_hosts) | ||
|
||
if self.passphrase: | ||
conn_config.update(passphrase=self.passphrase) | ||
|
||
ssh_client = await asyncssh.connect(**conn_config) | ||
|
||
return ssh_client | ||
|
||
async def list_directory(self, path: str = "") -> List[str]: | ||
|
||
"""Returns a list of files on the SFTP server at the provided path""" | ||
ssh_conn = await self._get_conn() | ||
sftp_client = await ssh_conn.start_sftp_client() | ||
try: | ||
files = await sftp_client.listdir(path) | ||
return sorted(files) | ||
except asyncssh.SFTPNoSuchFile: | ||
raise AirflowException(f"No files at path {path} found — Deferring") | ||
|
||
async def get_file_by_pattern(self, path: str = "", fnmatch_pattern: str = "") -> str: | ||
""" | ||
Returns the name of a file matching the file pattern at the provided path, if one exists | ||
Otherwise, raises an AirflowException to be handled upstream for deferring | ||
""" | ||
files_list = await self.list_directory(path) | ||
|
||
if files_list is None: | ||
raise AirflowException(f"No files at path {path} found...") | ||
|
||
for file in files_list: | ||
if fnmatch(file, fnmatch_pattern): | ||
return file | ||
|
||
raise AirflowException(f"No files matching file pattern were found at {path} — Deferring") |
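The way `_get_conn` assembles the keyword arguments for `asyncssh.connect` can be sketched as a pure function, which makes the optional-key handling easier to see and to test. This is an illustrative sketch only: `build_conn_config` is a hypothetical helper, not code from the PR, and it deliberately does not open a connection.

```python
from typing import Any, Dict


def build_conn_config(
    host: str,
    port: int,
    username: str,
    password: str = "",
    key_file: str = "",
    known_hosts: str = "",
    passphrase: str = "",
) -> Dict[str, Any]:
    """Build connect() keyword arguments, adding optional keys only when they are set."""
    config: Dict[str, Any] = {
        "host": host,
        "port": port,
        "username": username,
        "password": password,
    }
    if key_file:
        config["client_keys"] = key_file
    if known_hosts:
        # The literal string "none" (any case) is translated to None,
        # which the hook uses to mean "skip host verification".
        config["known_hosts"] = None if known_hosts.lower() == "none" else known_hosts
    if passphrase:
        config["passphrase"] = passphrase
    return config


print(build_conn_config("sftp.example.com", 22, "user", known_hosts="None"))
```

Keeping the key out of the dictionary when `known_hosts` is empty, rather than passing `None`, matters here because the two cases mean different things: an absent key leaves the library default in place, while an explicit `None` disables verification.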
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from datetime import timedelta | ||
from typing import Any, Dict, Optional | ||
|
||
from airflow.providers.sftp.sensors.sftp import SFTPSensor | ||
|
||
from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync | ||
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger | ||
from astronomer.providers.utils.typing_compat import Context | ||
|
||
|
||
class SFTPSensorAsync(SFTPSensor): | ||
""" | ||
Polls an SFTP server continuously until a file_pattern is matched at a defined path | ||
|
||
:param path: The path on the SFTP server to search for a file matching the file pattern. | ||
Authentication method used in the SFTP connection must have access to this path | ||
:param file_pattern: Pattern to be used for matching against the list of files at the path above. | ||
Uses the fnmatch module from std library to perform the matching. | ||
""" | ||
|
||
def __init__(self, *, path: str, file_pattern: str = "", **kwargs: Any) -> None: | ||
|
||
self.path = path | ||
self.file_pattern = file_pattern | ||
|
||
super().__init__(path=path, file_pattern=file_pattern, **kwargs) | ||
self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) | ||
|
||
def execute(self, context: Context) -> None: | ||
""" | ||
Logic that the sensor uses to correctly identify which trigger to | ||
execute, and defer execution as expected. | ||
""" | ||
self.defer( | ||
timeout=timedelta(seconds=self.timeout), | ||
trigger=SFTPTrigger( | ||
path=self.path, | ||
file_pattern=self.file_pattern, | ||
sftp_conn_id=self.sftp_conn_id, | ||
poke_interval=self.poke_interval, | ||
), | ||
method_name="execute_complete", | ||
) | ||
|
||
def execute_complete(self, context: Dict[str, Any], event: Optional[Dict[Any, Any]] = None) -> None: | ||
|
||
""" | ||
Callback for when the trigger fires - returns immediately. | ||
Relies on trigger to throw an exception, otherwise it assumes execution was | ||
successful. | ||
""" | ||
self.log.info("%s completed successfully.", self.task_id) | ||
|
||
return None | ||
|
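The handoff between `execute`, the trigger, and `execute_complete` can be illustrated without Airflow. In the sketch below every class is a hypothetical stand-in (`FakeSensor`, `FakeTrigger`, `FakeDeferred`); it only models the idea that deferring hands a trigger and a method name back to the runtime, which later calls that method with the trigger's event. It is not how Airflow itself is implemented.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Optional, Tuple


@dataclass
class FakeTrigger:
    """Hypothetical stand-in for SFTPTrigger; it only carries its arguments."""

    path: str
    file_pattern: str


class FakeDeferred(Exception):
    """Hypothetical signal raised when a task defers itself."""

    def __init__(self, trigger: FakeTrigger, method_name: str, timeout: timedelta) -> None:
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.timeout = timeout


class FakeSensor:
    def __init__(self, path: str, file_pattern: str, timeout: float) -> None:
        self.path = path
        self.file_pattern = file_pattern
        self.timeout = timeout

    def execute(self, context: Dict[str, Any]) -> None:
        # Give up the worker slot: hand the trigger and the resume method to the runtime.
        raise FakeDeferred(
            trigger=FakeTrigger(self.path, self.file_pattern),
            method_name="execute_complete",
            timeout=timedelta(seconds=self.timeout),
        )

    def execute_complete(self, context: Dict[str, Any], event: Optional[Any] = None) -> str:
        return f"resumed with event {event!r}"


def run_once(sensor: FakeSensor, event: Any) -> Tuple[FakeDeferred, str]:
    """Run the sensor, then resume it as if the trigger had fired with `event`."""
    try:
        sensor.execute({})
    except FakeDeferred as deferred:
        resume = getattr(sensor, deferred.method_name)
        return deferred, resume({}, event=event)
    raise RuntimeError("sensor finished without deferring")


deferred, outcome = run_once(FakeSensor("incoming", "*.csv", timeout=60), event=True)
print(outcome)  # resumed with event True
```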
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import asyncio | ||
from typing import Any, AsyncIterator, Dict, Tuple | ||
|
||
from airflow.exceptions import AirflowException | ||
from airflow.triggers.base import BaseTrigger, TriggerEvent | ||
|
||
from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync | ||
|
||
|
||
class SFTPTrigger(BaseTrigger): | ||
""" | ||
Trigger that polls the SFTP server and fires once a file matching the file pattern | ||
is found at the path. While the path does not exist or no file matches, it keeps polling | ||
|
||
:param path: The path on the SFTP server to search for a file matching the file pattern. | ||
Authentication method used in the SFTP connection must have access to this path | ||
:param file_pattern: Pattern to be used for matching against the list of files at the path above. | ||
Uses the fnmatch module from std library to perform the matching. | ||
|
||
:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server | ||
:param poke_interval: How often, in seconds, to check for the existence of the file on the SFTP server | ||
""" | ||
|
||
def __init__( | ||
self, | ||
path: str, | ||
file_pattern: str = "", | ||
sftp_conn_id: str = "sftp_default", | ||
poke_interval: float = 5, | ||
) -> None: | ||
super().__init__() | ||
self.path = path | ||
self.file_pattern = file_pattern | ||
self.sftp_conn_id = sftp_conn_id | ||
self.poke_interval = poke_interval | ||
|
||
def serialize(self) -> Tuple[str, Dict[str, Any]]: | ||
"""Serializes SFTPTrigger arguments and classpath""" | ||
return ( | ||
"astronomer.providers.sftp.triggers.sftp.SFTPTrigger", | ||
{ | ||
"path": self.path, | ||
"file_pattern": self.file_pattern, | ||
"sftp_conn_id": self.sftp_conn_id, | ||
"poke_interval": self.poke_interval, | ||
}, | ||
) | ||
|
||
async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override] | ||
""" | ||
Makes a series of asynchronous calls to the SFTP server via the async SFTP hook. It yields a | ||
TriggerEvent once a file matching the file pattern exists at the specified path; otherwise it | ||
sleeps for poke_interval seconds and checks again. | ||
""" | ||
hook = self._get_async_hook() | ||
while True: | ||
|
||
try: | ||
await hook.get_file_by_pattern(path=self.path, fnmatch_pattern=self.file_pattern) | ||
yield TriggerEvent(True) | ||
except AirflowException: | ||
await asyncio.sleep(self.poke_interval) | ||
|
||
|
||
def _get_async_hook(self) -> SFTPHookAsync: | ||
return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) |
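The polling pattern used in `run` (try the check, sleep on failure, yield once on success) can be exercised in isolation with plain `asyncio`. The sketch below is an assumption-laden illustration: `NotReady`, `poll_until_ready`, and `make_flaky_check` are hypothetical names, with `NotReady` standing in for the `AirflowException` raised by the hook. Unlike the loop in the diff, this version returns after yielding so the generator ends after the first event.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Dict, List, Tuple


class NotReady(Exception):
    """Hypothetical stand-in for the exception the hook raises when nothing matches."""


async def poll_until_ready(
    check: Callable[[], Awaitable[str]], poke_interval: float
) -> AsyncIterator[str]:
    """Call `check` until it succeeds, sleeping between attempts, then yield once."""
    while True:
        try:
            result = await check()
        except NotReady:
            await asyncio.sleep(poke_interval)
        else:
            yield result
            return


def make_flaky_check(failures: int) -> Tuple[Callable[[], Awaitable[str]], Dict[str, int]]:
    """Build a check that fails `failures` times before returning a file name."""
    state = {"calls": 0}

    async def check() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise NotReady("no matching file yet")
        return "report.csv"

    return check, state


async def demo() -> Tuple[List[str], int]:
    check, state = make_flaky_check(failures=2)
    events = [event async for event in poll_until_ready(check, poke_interval=0.01)]
    return events, state["calls"]


events, calls = asyncio.run(demo())
print(events, calls)  # ['report.csv'] 3
```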
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
SFTP Example DAG | ||
------------------ | ||
|
||
.. toctree:: | ||
:hidden: | ||
:maxdepth: 2 | ||
|
||
SFTP Sensors <sensors/sftp> | ||
|
||
|
||
* `SFTP Sensors <sensors/sftp.html>`_ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
SFTP Sensor Async | ||
""""""""""""""""" | ||
|
||
|
||
Checks for the existence of a file on an SFTP server using | ||
:class:`~astronomer.providers.sftp.sensors.sftp.SFTPSensorAsync`. | ||
|
||
.. exampleinclude:: /../astronomer/providers/sftp/example_dags/example_sftp.py | ||
:language: python | ||
:dedent: 4 | ||
:start-after: [START howto_sensor_sftp_async] | ||
:end-before: [END howto_sensor_sftp_async] |