Skip to content

Commit 02a668f

Browse files
authored
add interceptors for http transport options (#20)
* add intercptors in http options * add interceptors headers to connect + fix to close + fix tests * fix typo Signed-off-by: Koaz <[email protected]>
1 parent 5d5f356 commit 02a668f

File tree

2 files changed

+124
-32
lines changed

2 files changed

+124
-32
lines changed

src/lib/rxrpc-http-transport.spec.ts

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1-
import {RxRpcHttpConnection, RxRpcHttpTransport} from './rxrpc-http-transport';
1+
import {
2+
RxRpcHttpConnection,
3+
RxRpcHttpTransport,
4+
RxRpcHttpTransportInterceptor,
5+
RxRpcHttpTransportRequestConfig
6+
} from './rxrpc-http-transport';
27
import axios from 'axios';
8+
import {Observable, of} from "rxjs";
9+
import {HttpAttributes} from "./rxrpc-http-attributes";
310

411
jest.mock('axios');
5-
const mockedAxios = axios as jest.Mocked<typeof axios>
612

713
function delay(ms: number){
814
return new Promise( resolve => setTimeout(resolve, ms) );
@@ -14,18 +20,44 @@ describe('RxRpc Http Transport test suite', function () {
1420
let incomingMessages: any[];
1521
let resp: {}
1622

23+
let interceptors: RxRpcHttpTransportInterceptor[] = [];
24+
let headerKey1 = "hk1";
25+
let headerKey2 = "hk2"
26+
let headerValue1 = "value1";
27+
let headerValue2 = "value2";
28+
let mockedAxios: jest.Mocked<typeof axios>
29+
1730
beforeEach(() => {
18-
transport = new RxRpcHttpTransport("https://funnyName/");
31+
mockedAxios = axios as jest.Mocked<typeof axios>
32+
interceptors.push(new class implements RxRpcHttpTransportInterceptor {
33+
intercept(requestConfig: RxRpcHttpTransportRequestConfig): Observable<RxRpcHttpTransportRequestConfig> {
34+
requestConfig.headers[headerKey1] = headerValue1;
35+
return of(requestConfig);
36+
}
37+
})
38+
interceptors.push(new class implements RxRpcHttpTransportInterceptor {
39+
intercept(requestConfig: RxRpcHttpTransportRequestConfig): Observable<RxRpcHttpTransportRequestConfig> {
40+
requestConfig.headers[headerKey2] = headerValue2;
41+
return of(requestConfig);
42+
}
43+
})
44+
45+
transport = new RxRpcHttpTransport("https://funnyName/", {interceptors: interceptors});
46+
1947
clientId = "12345678";
2048
incomingMessages = [];
2149
resp = {
2250
headers: {"x-rpc-client-id": clientId}
2351
};
2452
});
2553

26-
it('Connect', async () => {
27-
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
54+
afterEach(() => {
55+
mockedAxios.post.mockReset();
56+
interceptors = [];
57+
});
2858

59+
it('Connect', async () => {
60+
mockAndVerifyExpectedHeaders();
2961
transport.connect().subscribe(connection => incomingMessages.push(connection['clientId']))
3062
await delay(1000);
3163
expect(incomingMessages[0]).toEqual(clientId)
@@ -35,11 +67,13 @@ describe('RxRpc Http Transport test suite', function () {
3567
const data1 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}"
3668
const data2 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #1\",\"error\":null}}"
3769
resp['data'] = JSON.parse(`[${data1},\n${data2}]`)
38-
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
39-
40-
transport.connect().subscribe(connection => connection.poll().subscribe(msg => {
41-
incomingMessages.push(msg);
42-
}))
70+
mockAndVerifyExpectedHeaders();
71+
transport.connect().subscribe(connection => {
72+
mockAndVerifyExpectedHeadersWithClientId();
73+
connection.poll().subscribe(msg => {
74+
incomingMessages.push(msg);
75+
})
76+
})
4377
await delay(1000);
4478
expect(incomingMessages.length).toEqual(2);
4579
expect(incomingMessages[0]).toEqual(JSON.parse(data1));
@@ -49,33 +83,61 @@ describe('RxRpc Http Transport test suite', function () {
4983
it('Poll with data as JSON', async () => {
5084
const data = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}"
5185
resp['data'] = JSON.parse(`[${data}]`)
52-
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
53-
54-
transport.connect().subscribe(connection => connection.poll().subscribe(msg => {
55-
incomingMessages.push(msg);
56-
}))
86+
mockAndVerifyExpectedHeaders();
87+
transport.connect().subscribe(connection => {
88+
mockAndVerifyExpectedHeadersWithClientId();
89+
connection.poll().subscribe(msg => {
90+
incomingMessages.push(msg);
91+
})
92+
})
5793
await delay(1000);
5894
expect(incomingMessages.length).toEqual(1);
5995
expect(incomingMessages[0]).toEqual(JSON.parse(data));
6096
})
6197

6298
it('Close', async () => {
63-
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
64-
99+
mockAndVerifyExpectedHeaders();
65100
transport.connect().subscribe(connection => incomingMessages.push(connection))
66101
await delay(1000);
67102
const connection = incomingMessages[0] as RxRpcHttpConnection;
103+
mockAndVerifyExpectedHeadersWithClientId();
68104
connection.close()
69105
expect(connection['pollingSubscription'].closed).toEqual(true)
70106
})
71107

72108
it('Error', async () => {
73-
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
74-
109+
mockAndVerifyExpectedHeaders();
75110
transport.connect().subscribe(connection => incomingMessages.push(connection))
76111
await delay(1000);
77112
const connection = incomingMessages[0] as RxRpcHttpConnection;
113+
mockAndVerifyExpectedHeadersWithClientId();
78114
connection.error(null)
79115
expect(connection['pollingSubscription'].closed).toEqual(true)
80116
})
117+
118+
function mockAndVerifyExpectedHeaders() {
119+
mockAndVerifyHeaders(getHeaders());
120+
}
121+
function mockAndVerifyExpectedHeadersWithClientId() {
122+
mockAndVerifyHeaders(getHeadersWithClientId());
123+
}
124+
function mockAndVerifyHeaders(expectedHeaders: {}) {
125+
mockedAxios.post.mockImplementation((url, msg, options) => {
126+
expect(options.headers).toEqual(expectedHeaders);
127+
return Promise.resolve(resp);
128+
});
129+
}
130+
131+
function getHeadersWithClientId() {
132+
let expectedHeaders = getHeaders();
133+
expectedHeaders[HttpAttributes.ClientIdAttribute] = clientId;
134+
return expectedHeaders;
135+
}
136+
137+
function getHeaders() {
138+
let expectedHeaders = {};
139+
expectedHeaders[headerKey1] = headerValue1;
140+
expectedHeaders[headerKey2] = headerValue2;
141+
return expectedHeaders;
142+
}
81143
})

src/lib/rxrpc-http-transport.ts

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,43 @@
11
import {RxRpcConnection, RxRpcTransport} from './rxrpc-transport';
22
import {interval, Observable, of, Subject, Subscription} from 'rxjs';
3-
import {mergeMap, map, retry, filter, tap} from "rxjs/operators";
3+
import {map, mergeMap, retry, tap} from "rxjs/operators";
44
import {HttpAttributes} from "./rxrpc-http-attributes";
5-
import axios, {AxiosResponse} from 'axios';
5+
import axios from 'axios';
66
import {fromPromise} from "rxjs/internal-compatibility";
77
import {fromArray} from "rxjs/internal/observable/fromArray";
88

99
export interface RxRpcHttpTransportOptions {
1010
pollingPeriodMillis?: number
1111
pollingRetryCount?: number
12+
interceptors?: RxRpcHttpTransportInterceptor[]
13+
}
14+
15+
export interface RxRpcHttpTransportRequestConfig {
16+
headers: {[key: string]: string}
17+
}
18+
19+
export interface RxRpcHttpTransportInterceptor {
20+
intercept(requestConfig: RxRpcHttpTransportRequestConfig) : Observable<RxRpcHttpTransportRequestConfig>;
1221
}
1322

1423
export class RxRpcHttpConnection implements RxRpcConnection {
24+
readonly interceptors: RxRpcHttpTransportInterceptor[] = [];
1525
readonly messages: Observable<any>;
1626
private pollingSubscription: Subscription;
1727
private readonly incoming = new Subject<any>();
1828

1929
constructor(private readonly uri: string,
2030
private readonly clientId: string,
2131
options: RxRpcHttpTransportOptions) {
32+
33+
this.interceptors = options.interceptors || [];
34+
this.interceptors.push({
35+
intercept(requestConfig: RxRpcHttpTransportRequestConfig): Observable<RxRpcHttpTransportRequestConfig> {
36+
requestConfig.headers[HttpAttributes.ClientIdAttribute] = clientId;
37+
return of(requestConfig);
38+
}
39+
});
40+
2241
this.messages = this.incoming;
2342
this.pollingSubscription = interval(options.pollingPeriodMillis)
2443
.pipe(
@@ -32,6 +51,7 @@ export class RxRpcHttpConnection implements RxRpcConnection {
3251

3352
close() {
3453
this.pollingSubscription.unsubscribe();
54+
RxRpcHttpConnection.postWithInterceptors(`${this.uri}/disconnect`, this.interceptors).subscribe();
3555
}
3656

3757
error(error: any) {
@@ -46,10 +66,20 @@ export class RxRpcHttpConnection implements RxRpcConnection {
4666
return this.post('polling');
4767
}
4868

49-
post(path: string, msg?: any): Observable<any> {
69+
static postWithInterceptors(url: string, interceptors: RxRpcHttpTransportInterceptor[], body?: any): Observable<any> {
5070
const headers = {};
51-
headers[HttpAttributes.ClientIdAttribute] = this.clientId;
52-
return fromPromise(axios.post<string>(`${this.uri}/${path}`, msg, {headers: headers}))
71+
72+
let config: Observable<RxRpcHttpTransportRequestConfig> = of({headers: headers});
73+
74+
// flatten all interceptor observable
75+
interceptors.forEach(interceptor => config = config.pipe(mergeMap(cfg => interceptor.intercept(cfg))));
76+
77+
return config.pipe(mergeMap(cfg =>
78+
fromPromise(axios.post<string>(url, body, {headers: cfg.headers}))));
79+
}
80+
81+
post(path: string, msg?: any): Observable<any> {
82+
return RxRpcHttpConnection.postWithInterceptors(`${this.uri}/${path}`, this.interceptors, msg)
5383
.pipe(
5484
map(resp => resp.data),
5585
mergeMap(data => {
@@ -59,27 +89,27 @@ export class RxRpcHttpConnection implements RxRpcConnection {
5989
obj => this.incoming.next(obj),
6090
err => this.incoming.error(err)
6191
)
62-
);
92+
)
6393
}
6494
}
6595

6696
export class RxRpcHttpTransport implements RxRpcTransport {
6797
private readonly options: RxRpcHttpTransportOptions;
6898
private static readonly defaultOptions: RxRpcHttpTransportOptions = {
6999
pollingPeriodMillis: HttpAttributes.DefaultClientPollingPeriodMillis,
70-
pollingRetryCount: HttpAttributes.DefaultClientPollingRetryCount
100+
pollingRetryCount: HttpAttributes.DefaultClientPollingRetryCount,
101+
interceptors: []
71102
}
72103

73104
constructor(private readonly uri: string, options?: RxRpcHttpTransportOptions) {
74105
this.options = {...RxRpcHttpTransport.defaultOptions, ...options} || RxRpcHttpTransport.defaultOptions
75106
}
76107

77108
connect(): Observable<RxRpcHttpConnection> {
78-
return fromPromise(axios.post<string>(`${this.uri}/connect`))
79-
.pipe(
80-
map( res => {
81-
const clientId = res.headers[HttpAttributes.ClientIdAttribute.toLowerCase()];
82-
return new RxRpcHttpConnection(this.uri, clientId, this.options);
83-
}));
109+
return RxRpcHttpConnection.postWithInterceptors(`${this.uri}/connect`, this.options.interceptors)
110+
.pipe(map( res => {
111+
const clientId = res.headers[HttpAttributes.ClientIdAttribute.toLowerCase()];
112+
return new RxRpcHttpConnection(this.uri, clientId, this.options);
113+
}));
84114
}
85115
}

0 commit comments

Comments
 (0)