2
2
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3
3
#
4
4
5
- from __future__ import annotations
6
5
7
6
from abc import ABC
8
- from typing import Any , Iterable , List , Mapping , MutableMapping , Optional , Type
7
+ from typing import Any , Iterable , Mapping , MutableMapping , Optional , Type
9
8
10
9
import requests
11
10
from airbyte_cdk .models import SyncMode
12
11
from airbyte_cdk .sources .streams .http import HttpStream
13
12
13
+ ASANA_ERRORS_MAPPING = {
14
+ 402 : "This stream is available to premium organizations and workspaces only" ,
15
+ 403 : "Missing permissions to consume this stream enough permissions" ,
16
+ 404 : "The object specified by the request does not exist" ,
17
+ 451 : "This request was blocked for legal reasons" ,
18
+ }
19
+
14
20
15
21
class AsanaStream (HttpStream , ABC ):
16
22
url_base = "https://app.asana.com/api/1.0/"
17
-
18
23
primary_key = "gid"
19
-
20
24
# Asana pagination could be from 1 to 100.
21
25
page_size = 100
26
+ raise_on_http_errors = True
27
+
28
+ @property
29
+ def AsanaStreamType (self ) -> Type :
30
+ return self .__class__
31
+
32
+ def should_retry (self , response : requests .Response ) -> bool :
33
+ if response .status_code in ASANA_ERRORS_MAPPING .keys ():
34
+ self .logger .error (
35
+ f"Skipping stream { self .name } . { ASANA_ERRORS_MAPPING .get (response .status_code )} . Full error message: { response .text } "
36
+ )
37
+ setattr (self , "raise_on_http_errors" , False )
38
+ return False
39
+ return super ().should_retry (response )
22
40
23
41
def backoff_time (self , response : requests .Response ) -> Optional [int ]:
24
42
delay_time = response .headers .get ("Retry-After" )
@@ -31,17 +49,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
31
49
if next_page :
32
50
return {"offset" : next_page ["offset" ]}
33
51
34
- def request_params (
35
- self , stream_state : Mapping [str , Any ], stream_slice : Mapping [str , any ] = None , next_page_token : Mapping [str , Any ] = None
36
- ) -> MutableMapping [str , Any ]:
37
-
52
+ def request_params (self , next_page_token : Mapping [str , Any ] = None , ** kwargs ) -> MutableMapping [str , Any ]:
38
53
params = {"limit" : self .page_size }
39
-
40
54
params .update (self .get_opt_fields ())
41
-
42
55
if next_page_token :
43
56
params .update (next_page_token )
44
-
45
57
return params
46
58
47
59
def get_opt_fields (self ) -> MutableMapping [str , str ]:
@@ -81,7 +93,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
81
93
response_json = response .json ()
82
94
yield from response_json .get ("data" , []) # Asana puts records in a container array "data"
83
95
84
- def read_slices_from_records (self , stream_class : Type [ AsanaStream ] , slice_field : str ) -> Iterable [Optional [Mapping [str , Any ]]]:
96
+ def read_slices_from_records (self , stream_class : AsanaStreamType , slice_field : str ) -> Iterable [Optional [Mapping [str , Any ]]]:
85
97
"""
86
98
General function for getting parent stream (which should be passed through `stream_class`) slice.
87
99
Generates dicts with `gid` of parent streams.
@@ -100,9 +112,7 @@ class WorkspaceRelatedStream(AsanaStream, ABC):
100
112
into the path or will pass it as a request parameter.
101
113
"""
102
114
103
- def stream_slices (
104
- self , sync_mode : SyncMode , cursor_field : List [str ] = None , stream_state : Mapping [str , Any ] = None
105
- ) -> Iterable [Optional [Mapping [str , Any ]]]:
115
+ def stream_slices (self , ** kwargs ) -> Iterable [Optional [Mapping [str , Any ]]]:
106
116
workspaces_stream = Workspaces (authenticator = self .authenticator )
107
117
for workspace in workspaces_stream .read_records (sync_mode = SyncMode .full_refresh ):
108
118
yield {"workspace_gid" : workspace ["gid" ]}
@@ -114,10 +124,8 @@ class WorkspaceRequestParamsRelatedStream(WorkspaceRelatedStream, ABC):
114
124
So this is basically the whole point of this class - to pass `workspace` as request argument.
115
125
"""
116
126
117
- def request_params (
118
- self , stream_state : Mapping [str , Any ], stream_slice : Mapping [str , Any ] = None , next_page_token : Mapping [str , Any ] = None
119
- ) -> MutableMapping [str , Any ]:
120
- params = super ().request_params (stream_state = stream_state , stream_slice = stream_slice , next_page_token = next_page_token )
127
+ def request_params (self , stream_slice : Mapping [str , Any ] = None , ** kwargs ) -> MutableMapping [str , Any ]:
128
+ params = super ().request_params (** kwargs )
121
129
params ["workspace" ] = stream_slice ["workspace_gid" ]
122
130
return params
123
131
@@ -128,9 +136,7 @@ class ProjectRelatedStream(AsanaStream, ABC):
128
136
argument in request.
129
137
"""
130
138
131
- def stream_slices (
132
- self , sync_mode : SyncMode , cursor_field : List [str ] = None , stream_state : Mapping [str , Any ] = None
133
- ) -> Iterable [Optional [Mapping [str , Any ]]]:
139
+ def stream_slices (self , ** kwargs ) -> Iterable [Optional [Mapping [str , Any ]]]:
134
140
yield from self .read_slices_from_records (stream_class = Projects , slice_field = "project_gid" )
135
141
136
142
@@ -158,9 +164,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
158
164
task_gid = stream_slice ["task_gid" ]
159
165
return f"tasks/{ task_gid } /stories"
160
166
161
- def stream_slices (
162
- self , sync_mode : SyncMode , cursor_field : List [str ] = None , stream_state : Mapping [str , Any ] = None
163
- ) -> Iterable [Optional [Mapping [str , Any ]]]:
167
+ def stream_slices (self , ** kwargs ) -> Iterable [Optional [Mapping [str , Any ]]]:
164
168
yield from self .read_slices_from_records (stream_class = Tasks , slice_field = "task_gid" )
165
169
166
170
@@ -173,10 +177,8 @@ class Tasks(ProjectRelatedStream):
173
177
def path (self , ** kwargs ) -> str :
174
178
return "tasks"
175
179
176
- def request_params (
177
- self , stream_state : Mapping [str , Any ], stream_slice : Mapping [str , Any ] = None , next_page_token : Mapping [str , Any ] = None
178
- ) -> MutableMapping [str , Any ]:
179
- params = super ().request_params (stream_state = stream_state , stream_slice = stream_slice , next_page_token = next_page_token )
180
+ def request_params (self , stream_slice : Mapping [str , Any ] = None , ** kwargs ) -> MutableMapping [str , Any ]:
181
+ params = super ().request_params (stream_slice = stream_slice , ** kwargs )
180
182
params ["project" ] = stream_slice ["project_gid" ]
181
183
return params
182
184
@@ -202,9 +204,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
202
204
team_gid = stream_slice ["team_gid" ]
203
205
return f"teams/{ team_gid } /team_memberships"
204
206
205
- def stream_slices (
206
- self , sync_mode : SyncMode , cursor_field : List [str ] = None , stream_state : Mapping [str , Any ] = None
207
- ) -> Iterable [Optional [Mapping [str , Any ]]]:
207
+ def stream_slices (self , ** kwargs ) -> Iterable [Optional [Mapping [str , Any ]]]:
208
208
yield from self .read_slices_from_records (stream_class = Teams , slice_field = "team_gid" )
209
209
210
210
0 commit comments