@@ -23,43 +23,6 @@ def __init__(self, service_account_id=None, user_key=None, user_id=None, client_
23
23
def _initialize (self , config ):
24
24
self .config = config
25
25
26
- def _check_exceeded (self , subscription_id ):
27
- host = self .config .get_uri_context ()
28
- headers = self .config .get_headers ()
29
- while True :
30
-
31
- stream_id_uri = host + '/streams/' + "-" .join (subscription_id .split ("-" )[:- 2 ])
32
-
33
- r = requests .get (stream_id_uri , headers = headers )
34
-
35
- try :
36
- if r .json ()['data' ]['attributes' ]['job_status' ] == "DOC_COUNT_EXCEEDED" :
37
- if "Authorization" in headers :
38
- limits_uri = host + '/accounts/' + self .config .oauth2_credentials ()['client_id' ]
39
- else :
40
- limits_uri = host + '/accounts/' + self .config .get_user_key ()
41
- limit_msg = 'NA'
42
- try :
43
- lr = requests .get (limits_uri , headers = headers )
44
- limit_msg = lr .json ()['data' ]['attributes' ]['max_allowed_extracts' ]
45
- except KeyError :
46
- logger .error ('Could not parse account limit request response.' )
47
- logger .error (
48
- 'OOPS! Looks like you\' ve exceeded the maximum number of documents received for your account ' +
49
- '({}). As such, no new documents will be added to your stream\' s queue. However, you won\' t ' +
50
- 'lose access to any documents that have already been added to the queue. These will continue ' +
51
- 'to be streamed to you. Contact your account administrator with any questions or to upgrade ' +
52
- 'your account limits.' .format (limit_msg ))
53
-
54
- except KeyError :
55
- raise Exception (
56
- "Unable to request data from your stream subscription id" )
57
- time .sleep (5 * 60 )
58
-
59
- def check_exceeded_thread (self , subscription_id ):
60
- thread = Thread (target = self ._check_exceeded , args = [subscription_id ], daemon = True )
61
- thread .start ()
62
-
63
26
def listen (self , on_message_callback , maximum_messages = DEFAULT_UNLIMITED_MESSAGES , subscription_id = "" , batch_size = 10 ):
64
27
pubsub_client = pubsub_service .get_client (self .config )
65
28
@@ -69,8 +32,6 @@ def listen(self, on_message_callback, maximum_messages=DEFAULT_UNLIMITED_MESSAGE
69
32
'No subscription specified. You must specify the subscription ID either through an environment ' +
70
33
'variable, a config file or by passing the value to the method.' )
71
34
72
- self .check_exceeded_thread (subscription_id )
73
-
74
35
streaming_credentials = credentials_service .fetch_credentials (
75
36
self .config )
76
37
subscription_path = pubsub_client .subscription_path (
@@ -143,8 +104,6 @@ def ack_message_and_callback(message):
143
104
'No subscription specified. You must specify the subscription ID either through an environment variable, a config file or '
144
105
'by passing the value to the method.' )
145
106
146
- self .check_exceeded_thread (subscription_id )
147
-
148
107
streaming_credentials = credentials_service .fetch_credentials (
149
108
self .config )
150
109
subscription_path = pubsub_client .subscription_path (
0 commit comments