Skip to content

Commit 6543425

Browse files
urikoakoazu1
andauthored
add observConnected to RxrpcInvoker and RxrpcClient (#7)
Signed-off-by: koazu1 <[email protected]> Co-authored-by: koazu1 <[email protected]>
1 parent 90fbff1 commit 6543425

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

src/lib/rxrpc-client.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {defer, interval, Observable, of, OperatorFunction, Subject, throwError} from 'rxjs';
2-
import {finalize, flatMap, shareReplay, takeUntil, takeWhile} from 'rxjs/operators'
2+
import {distinctUntilChanged, finalize, flatMap, shareReplay, takeUntil, takeWhile} from 'rxjs/operators'
33
import {Response} from './data/response';
44
import {Result} from './data/result';
55
import {Invocation, Invocations} from './data/invocation';
@@ -30,6 +30,7 @@ export class RxRpcClient extends RxRpcInvoker {
3030
private listeners: RxRpcInvocationListener[] = [];
3131
private currentConnection: RxRpcConnection;
3232
private readonly sharedInvocations = new Map<string, Observable<Result>>();
33+
private readonly connectedSubject = new Subject<boolean>();
3334

3435
constructor(private readonly transport: RxRpcTransport, options?: RxRpcClientOptions) {
3536
super();
@@ -54,6 +55,10 @@ export class RxRpcClient extends RxRpcInvoker {
5455
});
5556
}
5657

58+
public observeConnected(): Observable<boolean> {
59+
return this.connectedSubject.pipe(distinctUntilChanged());
60+
}
61+
5762
public addListener(listener: RxRpcInvocationListener): RxRpcInvocationListenerSubscription {
5863
this.listeners.push(listener);
5964
return { unsubscribe: () => this.listeners = this.listeners.filter(l => l != listener) };
@@ -138,6 +143,7 @@ export class RxRpcClient extends RxRpcInvoker {
138143

139144
private onConnected(connection: RxRpcConnection) {
140145
this.currentConnection = connection;
146+
this.connectedSubject.next(true);
141147
this.currentConnection.messages
142148
.pipe(takeUntil(this.cancelledSubject))
143149
.subscribe(
@@ -156,12 +162,15 @@ export class RxRpcClient extends RxRpcInvoker {
156162
this.invocations.clear();
157163
this.cancelledSubject.next();
158164
this.currentConnection = null;
165+
this.connectedSubject.next(false);
159166
}
160167

161168
private dispatchResponse(response: Response) {
162169
this.listeners.forEach(l => l.onResponse(response));
163-
this.invocations
164-
.get(response.invocationId)
165-
.next(response.result);
170+
if (this.invocations && this.invocations.has(response.invocationId)) {
171+
this.invocations
172+
.get(response.invocationId)
173+
.next(response.result);
174+
}
166175
}
167176
}

src/lib/rxrpc-invoker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ import {Observable} from 'rxjs';
33
export abstract class RxRpcInvoker {
44
public abstract invoke<T>(method: string, args: any): Observable<T>;
55
public abstract invokeShared<T>(method: string, replayCount: number, args: any): Observable<T>;
6+
public abstract observeConnected(): Observable<boolean>;
67
}

0 commit comments

Comments
 (0)