Skip to content

Commit 3077ea4

Browse files
committed
fix(IoT): Fixing race conditions during StreamThread cleanup
1 parent 9a39679 commit 3077ea4

File tree

3 files changed

+119
-200
lines changed

3 files changed

+119
-200
lines changed

AWSIoT/Internal/AWSIoTStreamThread.m

Lines changed: 81 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,13 @@ @interface AWSIoTStreamThread()
2727
@property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval;
2828
@property(nonatomic, assign) BOOL isRunning;
2929
@property(nonatomic, assign) BOOL shouldDisconnect;
30-
31-
// Add synchronization primitives
32-
@property(nonatomic, strong) dispatch_queue_t cleanupQueue;
33-
@property(nonatomic, strong) dispatch_semaphore_t cleanupSemaphore;
34-
@property(nonatomic, assign) BOOL isCleaningUp;
30+
@property(nonatomic, strong) dispatch_queue_t serialQueue;
31+
@property(nonatomic, assign) BOOL didCleanUp;
3532
@end
3633

3734
@implementation AWSIoTStreamThread
3835

39-
- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
36+
- (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session
4037
decoderInputStream:(nonnull NSInputStream *)decoderInputStream
4138
encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream {
4239
return [self initWithSession:session
@@ -56,184 +53,127 @@ - (instancetype)initWithSession:(nonnull AWSMQTTSession *)session
5653
_outputStream = outputStream;
5754
_defaultRunLoopTimeInterval = 10;
5855
_shouldDisconnect = NO;
59-
_isCleaningUp = NO;
60-
61-
// Initialize synchronization primitives
62-
_cleanupQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.cleanup", DISPATCH_QUEUE_SERIAL);
63-
_cleanupSemaphore = dispatch_semaphore_create(1);
56+
_serialQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL);
57+
_didCleanUp = NO;
6458
}
6559
return self;
6660
}
6761

6862
- (void)main {
69-
@autoreleasepool {
70-
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
71-
72-
if (![self setupRunLoop]) {
73-
AWSDDLogError(@"Failed to setup run loop for thread: [%@]", self);
74-
return;
75-
}
76-
77-
[self startIOOperations];
78-
79-
while ([self shouldContinueRunning]) {
80-
@autoreleasepool {
81-
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
82-
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
83-
}
84-
}
85-
86-
[self performCleanup];
87-
88-
AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
89-
}
90-
}
91-
92-
- (BOOL)setupRunLoop {
9363
if (self.isRunning) {
94-
AWSDDLogError(@"Thread already running");
95-
return NO;
64+
AWSDDLogWarn(@"Attempted to start a thread that is already running: [%@]", self);
65+
return;
9666
}
97-
67+
68+
AWSDDLogVerbose(@"Started execution of Thread: [%@]", self);
69+
//This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread.
9870
self.runLoopForStreamsThread = [NSRunLoop currentRunLoop];
99-
100-
// Setup timer with weak reference to prevent retain cycles
71+
72+
//Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop
73+
//below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence.
10174
__weak typeof(self) weakSelf = self;
10275
self.defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0]
10376
interval:60.0
104-
target:weakSelf
105-
selector:@selector(timerHandler:)
106-
userInfo:nil
107-
repeats:YES];
108-
109-
if (!self.defaultRunLoopTimer) {
110-
AWSDDLogError(@"Failed to create run loop timer");
111-
return NO;
112-
}
77+
repeats:YES
78+
block:^(NSTimer * _Nonnull timer) {
79+
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@", weakSelf, weakSelf.isRunning ? @"YES" : @"NO", weakSelf.isCancelled ? @"YES" : @"NO");
80+
}];
11381
[self.runLoopForStreamsThread addTimer:self.defaultRunLoopTimer
11482
forMode:NSDefaultRunLoopMode];
115-
self.isRunning = YES;
116-
return YES;
117-
}
11883

119-
- (void)startIOOperations {
84+
self.isRunning = YES;
12085
if (self.outputStream) {
12186
[self.outputStream scheduleInRunLoop:self.runLoopForStreamsThread
122-
forMode:NSDefaultRunLoopMode];
87+
forMode:NSDefaultRunLoopMode];
12388
[self.outputStream open];
12489
}
90+
91+
//Update the runLoop and runLoopMode in session.
12592
[self.session connectToInputStream:self.decoderInputStream
12693
outputStream:self.encoderOutputStream];
94+
95+
while ([self shouldContinueRunning]) {
96+
//This will continue run until the thread is cancelled
97+
//Run one cycle of the runloop. This will return after a input source event or timer event is processed
98+
[self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode
99+
beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]];
100+
}
101+
102+
[self cleanUp];
103+
104+
AWSDDLogVerbose(@"Finished execution of Thread: [%@]", self);
127105
}
128106

129107
- (BOOL)shouldContinueRunning {
130108
__block BOOL shouldRun;
131-
dispatch_sync(self.cleanupQueue, ^{
109+
dispatch_sync(self.serialQueue, ^{
132110
shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil;
133111
});
134112
return shouldRun;
135113
}
136114

137-
- (void)invalidateTimer {
138-
dispatch_sync(self.cleanupQueue, ^{
139-
if (self.defaultRunLoopTimer) {
140-
[self.defaultRunLoopTimer invalidate];
141-
self.defaultRunLoopTimer = nil;
142-
}
143-
});
144-
}
145-
146115
- (void)cancel {
147116
AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self);
148-
[self cancelWithDisconnect:NO];
117+
dispatch_sync(self.serialQueue, ^{
118+
self.isRunning = NO;
119+
[super cancel];
120+
});
149121
}
150122

151123
- (void)cancelAndDisconnect:(BOOL)shouldDisconnect {
152-
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]",
153-
shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
154-
[self cancelWithDisconnect:shouldDisconnect];
124+
AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self);
125+
dispatch_sync(self.serialQueue, ^{
126+
self.shouldDisconnect = shouldDisconnect;
127+
self.isRunning = NO;
128+
[super cancel];
129+
});
155130
}
156131

157-
- (void)cancelWithDisconnect:(BOOL)shouldDisconnect {
158-
// Ensure thread-safe property updates
159-
dispatch_sync(self.cleanupQueue, ^{
160-
if (!self.isCleaningUp) {
161-
self.shouldDisconnect = shouldDisconnect;
162-
self.isRunning = NO;
163-
[super cancel];
164-
165-
// Invalidate timer to trigger run loop exit
166-
[self invalidateTimer];
132+
- (void)cleanUp {
133+
dispatch_sync(self.serialQueue, ^{
134+
if (self.didCleanUp) {
135+
AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self);
136+
return;
167137
}
168-
});
169-
}
170138

171-
- (void)performCleanup {
172-
dispatch_semaphore_wait(self.cleanupSemaphore, DISPATCH_TIME_FOREVER);
173-
174-
if (self.isCleaningUp) {
175-
dispatch_semaphore_signal(self.cleanupSemaphore);
176-
return;
177-
}
178-
179-
self.isCleaningUp = YES;
180-
dispatch_semaphore_signal(self.cleanupSemaphore);
181-
182-
dispatch_sync(self.cleanupQueue, ^{
183-
[self cleanupResources];
184-
});
185-
}
139+
self.didCleanUp = YES;
140+
if (self.defaultRunLoopTimer) {
141+
[self.defaultRunLoopTimer invalidate];
142+
self.defaultRunLoopTimer = nil;
143+
}
186144

187-
- (void)cleanupResources {
188-
if (self.shouldDisconnect) {
189-
[self closeSession];
190-
[self closeStreams];
191-
} else {
192-
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
193-
}
194-
195-
// Handle onStop callback
196-
dispatch_block_t stopBlock = self.onStop;
197-
if (stopBlock) {
198-
self.onStop = nil;
199-
stopBlock();
200-
}
201-
}
145+
if (self.shouldDisconnect) {
146+
if (self.session) {
147+
[self.session close];
148+
self.session = nil;
149+
}
202150

203-
- (void)closeSession {
204-
if (self.session) {
205-
[self.session close];
206-
self.session = nil;
207-
}
208-
}
151+
if (self.outputStream) {
152+
self.outputStream.delegate = nil;
153+
[self.outputStream close];
154+
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
155+
forMode:NSDefaultRunLoopMode];
156+
self.outputStream = nil;
157+
}
209158

210-
- (void)closeStreams {
211-
if (self.outputStream) {
212-
self.outputStream.delegate = nil;
213-
[self.outputStream close];
214-
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
215-
forMode:NSDefaultRunLoopMode];
216-
self.outputStream = nil;
217-
}
218-
219-
if (self.decoderInputStream) {
220-
[self.decoderInputStream close];
221-
self.decoderInputStream = nil;
222-
}
223-
224-
if (self.encoderOutputStream) {
225-
[self.encoderOutputStream close];
226-
self.encoderOutputStream = nil;
227-
}
228-
}
159+
if (self.decoderInputStream) {
160+
[self.decoderInputStream close];
161+
self.decoderInputStream = nil;
162+
}
229163

230-
- (void)timerHandler:(NSTimer*)theTimer {
231-
AWSDDLogVerbose(@"Default run loop timer executed on Thread: [%@]. isRunning = %@. isCancelled = %@",
232-
self, self.isRunning ? @"YES" : @"NO", self.isCancelled ? @"YES" : @"NO");
233-
}
164+
if (self.encoderOutputStream) {
165+
[self.encoderOutputStream close];
166+
self.encoderOutputStream = nil;
167+
}
168+
} else {
169+
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
170+
}
234171

235-
- (void)dealloc {
236-
AWSDDLogVerbose(@"Deallocating AWSIoTStreamThread: [%@]", self);
172+
if (self.onStop) {
173+
self.onStop();
174+
self.onStop = nil;
175+
}
176+
});
237177
}
238178

239179
@end

0 commit comments

Comments
 (0)