Skip to content

Commit 533afb5

Browse files
authored
Add parameter sftp_prefetch to SFTPToGCSOperator (#33274)
1 parent de17b93 commit 533afb5

File tree

5 files changed

+17
-9
lines changed

5 files changed

+17
-9
lines changed

airflow/providers/google/cloud/transfers/sftp_to_gcs.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class SFTPToGCSOperator(BaseOperator):
6969
If set as a sequence, the identities from the list must grant
7070
Service Account Token Creator IAM role to the directly preceding identity, with first
7171
account from the list granting this role to the originating account (templated).
72+
:param sftp_prefetch: Whether to enable SFTP prefetch, the default is True.
7273
"""
7374

7475
template_fields: Sequence[str] = (
@@ -90,6 +91,7 @@ def __init__(
9091
gzip: bool = False,
9192
move_object: bool = False,
9293
impersonation_chain: str | Sequence[str] | None = None,
94+
sftp_prefetch: bool = True,
9395
**kwargs,
9496
) -> None:
9597
super().__init__(**kwargs)
@@ -103,6 +105,7 @@ def __init__(
103105
self.sftp_conn_id = sftp_conn_id
104106
self.move_object = move_object
105107
self.impersonation_chain = impersonation_chain
108+
self.sftp_prefetch = sftp_prefetch
106109

107110
def execute(self, context: Context):
108111
gcs_hook = GCSHook(
@@ -151,7 +154,7 @@ def _copy_single_object(
151154
)
152155

153156
with NamedTemporaryFile("w") as tmp:
154-
sftp_hook.retrieve_file(source_path, tmp.name)
157+
sftp_hook.retrieve_file(source_path, tmp.name, prefetch=self.sftp_prefetch)
155158

156159
gcs_hook.upload(
157160
bucket_name=self.destination_bucket,

airflow/providers/sftp/hooks/sftp.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,18 @@ def delete_directory(self, path: str) -> None:
223223
conn = self.get_conn()
224224
conn.rmdir(path)
225225

226-
def retrieve_file(self, remote_full_path: str, local_full_path: str) -> None:
226+
def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
227227
"""Transfer the remote file to a local location.
228228
229229
If local_full_path is a string path, the file will be put
230230
at that location.
231231
232232
:param remote_full_path: full path to the remote file
233233
:param local_full_path: full path to the local file
234+
:param prefetch: controls whether prefetch is performed (default: True)
234235
"""
235236
conn = self.get_conn()
236-
conn.get(remote_full_path, local_full_path)
237+
conn.get(remote_full_path, local_full_path, prefetch=prefetch)
237238

238239
def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:
239240
"""Transfer a local file to the remote location.

airflow/providers/sftp/provider.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ versions:
5454
dependencies:
5555
- apache-airflow>=2.4.0
5656
- apache-airflow-providers-ssh>=2.1.0
57+
- paramiko>=2.8.0
5758

5859
integrations:
5960
- integration-name: SSH File Transfer Protocol (SFTP)

generated/provider_dependencies.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -795,7 +795,8 @@
795795
"sftp": {
796796
"deps": [
797797
"apache-airflow-providers-ssh>=2.1.0",
798-
"apache-airflow>=2.4.0"
798+
"apache-airflow>=2.4.0",
799+
"paramiko>=2.8.0"
799800
],
800801
"cross-providers-deps": [
801802
"openlineage",

tests/providers/google/cloud/transfers/test_sftp_to_gcs.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def test_execute_copy_single_file(self, sftp_hook, gcs_hook):
7373
sftp_hook.assert_called_once_with(SFTP_CONN_ID)
7474

7575
sftp_hook.return_value.retrieve_file.assert_called_once_with(
76-
os.path.join(SOURCE_OBJECT_NO_WILDCARD), mock.ANY
76+
os.path.join(SOURCE_OBJECT_NO_WILDCARD), mock.ANY, prefetch=True
7777
)
7878

7979
gcs_hook.return_value.upload.assert_called_once_with(
@@ -99,6 +99,7 @@ def test_execute_copy_single_file_with_compression(self, sftp_hook, gcs_hook):
9999
sftp_conn_id=SFTP_CONN_ID,
100100
impersonation_chain=IMPERSONATION_CHAIN,
101101
gzip=True,
102+
sftp_prefetch=False,
102103
)
103104
task.execute(None)
104105
gcs_hook.assert_called_once_with(
@@ -108,7 +109,7 @@ def test_execute_copy_single_file_with_compression(self, sftp_hook, gcs_hook):
108109
sftp_hook.assert_called_once_with(SFTP_CONN_ID)
109110

110111
sftp_hook.return_value.retrieve_file.assert_called_once_with(
111-
os.path.join(SOURCE_OBJECT_NO_WILDCARD), mock.ANY
112+
os.path.join(SOURCE_OBJECT_NO_WILDCARD), mock.ANY, prefetch=False
112113
)
113114

114115
gcs_hook.return_value.upload.assert_called_once_with(
@@ -133,6 +134,7 @@ def test_execute_move_single_file(self, sftp_hook, gcs_hook):
133134
gcp_conn_id=GCP_CONN_ID,
134135
sftp_conn_id=SFTP_CONN_ID,
135136
impersonation_chain=IMPERSONATION_CHAIN,
137+
sftp_prefetch=True,
136138
)
137139
task.execute(None)
138140
gcs_hook.assert_called_once_with(
@@ -142,7 +144,7 @@ def test_execute_move_single_file(self, sftp_hook, gcs_hook):
142144
sftp_hook.assert_called_once_with(SFTP_CONN_ID)
143145

144146
sftp_hook.return_value.retrieve_file.assert_called_once_with(
145-
os.path.join(SOURCE_OBJECT_NO_WILDCARD), mock.ANY
147+
os.path.join(SOURCE_OBJECT_NO_WILDCARD), mock.ANY, prefetch=True
146148
)
147149

148150
gcs_hook.return_value.upload.assert_called_once_with(
@@ -181,8 +183,8 @@ def test_execute_copy_with_wildcard(self, sftp_hook, gcs_hook):
181183

182184
sftp_hook.return_value.retrieve_file.assert_has_calls(
183185
[
184-
mock.call("main_dir/test_object3.json", mock.ANY),
185-
mock.call("main_dir/sub_dir/test_object3.json", mock.ANY),
186+
mock.call("main_dir/test_object3.json", mock.ANY, prefetch=True),
187+
mock.call("main_dir/sub_dir/test_object3.json", mock.ANY, prefetch=True),
186188
]
187189
)
188190

0 commit comments

Comments
 (0)