26
26
from zope .interface import implementer
27
27
28
28
from twisted .application .internet import ClientService
29
- from twisted .internet .defer import Deferred
29
+ from twisted .internet .defer import CancelledError , Deferred
30
30
from twisted .internet .endpoints import (
31
31
HostnameEndpoint ,
32
32
TCP4ClientEndpoint ,
33
33
TCP6ClientEndpoint ,
34
34
)
35
35
from twisted .internet .interfaces import IPushProducer , ITransport
36
36
from twisted .internet .protocol import Factory , Protocol
37
+ from twisted .python .failure import Failure
37
38
38
39
logger = logging .getLogger (__name__ )
39
40
@@ -131,9 +132,11 @@ def __init__(
131
132
factory = Factory .forProtocol (Protocol )
132
133
self ._service = ClientService (endpoint , factory , clock = _reactor )
133
134
self ._service .startService ()
135
+ self ._stopping = False
134
136
self ._connect ()
135
137
136
138
def close (self ):
139
+ self ._stopping = True
137
140
self ._service .stopService ()
138
141
139
142
def _connect (self ) -> None :
@@ -146,17 +149,21 @@ def _connect(self) -> None:
146
149
147
150
self ._connection_waiter = self ._service .whenConnected (failAfterFailures = 1 )
148
151
149
- @self ._connection_waiter .addErrback
150
- def fail (r ):
151
- r .printTraceback (file = sys .__stderr__ )
152
+ def fail (failure : Failure ) -> None :
153
+ # If the Deferred was cancelled (e.g. during shutdown) do not try to
154
+ # reconnect (this will cause an infinite loop of errors).
155
+ if failure .check (CancelledError ) and self ._stopping :
156
+ return
157
+
158
+ # For a different error, print the traceback and re-connect.
159
+ failure .printTraceback (file = sys .__stderr__ )
152
160
self ._connection_waiter = None
153
161
self ._connect ()
154
162
155
- @self ._connection_waiter .addCallback
156
- def writer (r ):
163
+ def writer (result : Protocol ) -> None :
157
164
# We have a connection. If we already have a producer, and its
158
165
# transport is the same, just trigger a resumeProducing.
159
- if self ._producer and r .transport is self ._producer .transport :
166
+ if self ._producer and result .transport is self ._producer .transport :
160
167
self ._producer .resumeProducing ()
161
168
self ._connection_waiter = None
162
169
return
@@ -167,12 +174,14 @@ def writer(r):
167
174
168
175
# Make a new producer and start it.
169
176
self ._producer = LogProducer (
170
- buffer = self ._buffer , transport = r .transport , format = self .format ,
177
+ buffer = self ._buffer , transport = result .transport , format = self .format ,
171
178
)
172
- r .transport .registerProducer (self ._producer , True )
179
+ result .transport .registerProducer (self ._producer , True )
173
180
self ._producer .resumeProducing ()
174
181
self ._connection_waiter = None
175
182
183
+ self ._connection_waiter .addCallbacks (writer , fail )
184
+
176
185
def _handle_pressure (self ) -> None :
177
186
"""
178
187
Handle backpressure by shedding records.
0 commit comments