Skip to content

Commit 8ad92a0

Browse files
assafkmMohaban, Assaf
andauthored
Adding Http transport (#13)
Co-authored-by: Mohaban, Assaf <[email protected]>
1 parent 989edb6 commit 8ad92a0

File tree

6 files changed

+178
-3
lines changed

6 files changed

+178
-3
lines changed

package.json

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@
3434
"devDependencies": {
3535
"@types/jest": "^24.0.15",
3636
"jest": "^24.8.0",
37+
"rxjs": "^6.2.2",
3738
"ts-jest": "^24.0.2",
38-
"typescript": "^3.4.5",
39-
"rxjs": "^6.2.2"
39+
"typescript": "^3.4.5"
4040
},
4141
"peerDependencies": {
4242
"rxjs": "^6.2.2"
@@ -47,5 +47,8 @@
4747
"src/lib/**/*",
4848
"*.json",
4949
"LICENSE"
50-
]
50+
],
51+
"dependencies": {
52+
"axios": "^0.21.1"
53+
}
5154
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export * from './lib/rxrpc-transport';
44
export * from './lib/rxrpc-websocket-transport';
55
export * from './lib/rxrpc-reconnectable-transport';
66
export * from './lib/rxrpc-types';
7+
export * from './lib/rxrpc-http-transport';

src/lib/rxrpc-http-attributes.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
export class HttpAttributes {
3+
static ClientIdAttribute = "X-RPC-CLIENT-ID";
4+
static ClientPollingPeriod = 500;
5+
static ClientPollingRetryCount = 10;
6+
}

src/lib/rxrpc-http-transport.spec.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import {RxRpcHttpConnection, RxRpcHttpTransport} from './rxrpc-http-transport';
2+
import axios from 'axios';
3+
4+
jest.mock('axios');
5+
const mockedAxios = axios as jest.Mocked<typeof axios>
6+
7+
function delay(ms: number){
8+
return new Promise( resolve => setTimeout(resolve, ms) );
9+
}
10+
11+
describe('RxRpc Http Transport test suite', function () {
12+
let transport: RxRpcHttpTransport;
13+
let clientId: string;
14+
let incomingMessages: any[];
15+
let resp: {}
16+
17+
beforeEach(() => {
18+
transport = new RxRpcHttpTransport("http://funnyName/");
19+
clientId = "12345678";
20+
incomingMessages = [];
21+
resp = {
22+
headers: {"x-rpc-client-id": clientId}
23+
};
24+
});
25+
26+
it('Connect', async () => {
27+
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
28+
29+
transport.connect().subscribe(connection => incomingMessages.push(connection['clientId']))
30+
await delay(1000);
31+
expect(incomingMessages[0]).toEqual(clientId)
32+
})
33+
34+
it('Poll with data as a string', async () => {
35+
const data1 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}"
36+
const data2 = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #1\",\"error\":null}}"
37+
resp['data'] = `${data1}\n${data2}`
38+
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
39+
40+
transport.connect().subscribe(connection => connection.poll().subscribe(msg => {
41+
incomingMessages.push(msg);
42+
}))
43+
await delay(1000);
44+
expect(incomingMessages.length).toEqual(2);
45+
expect(incomingMessages[0]).toEqual(JSON.parse(data1));
46+
expect(incomingMessages[1]).toEqual(JSON.parse(data2));
47+
})
48+
49+
it('Poll with data as an object', async () => {
50+
const data = "{\"invocationId\":1,\"result\":{\"type\":\"Data\",\"data\":\"Hello, Angular #0\",\"error\":null}}"
51+
resp['data'] = JSON.parse(data)
52+
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
53+
54+
transport.connect().subscribe(connection => connection.poll().subscribe(msg => {
55+
incomingMessages.push(msg);
56+
}))
57+
await delay(1000);
58+
expect(incomingMessages.length).toEqual(1);
59+
expect(incomingMessages[0]).toEqual(JSON.parse(data));
60+
})
61+
62+
it('Close', async () => {
63+
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
64+
65+
transport.connect().subscribe(connection => incomingMessages.push(connection))
66+
await delay(1000);
67+
const connection = incomingMessages[0] as RxRpcHttpConnection;
68+
connection.close()
69+
expect(connection['pollingSubscription'].closed).toEqual(true)
70+
})
71+
72+
it('Error', async () => {
73+
mockedAxios.post.mockImplementation(() => Promise.resolve(resp));
74+
75+
transport.connect().subscribe(connection => incomingMessages.push(connection))
76+
await delay(1000);
77+
const connection = incomingMessages[0] as RxRpcHttpConnection;
78+
connection.error(null)
79+
expect(connection['pollingSubscription'].closed).toEqual(true)
80+
})
81+
})

src/lib/rxrpc-http-transport.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import {RxRpcConnection, RxRpcTransport} from './rxrpc-transport';
2+
import {from, interval, observable, Observable, of, Subject, Subscription} from 'rxjs';
3+
import {mergeMap, map, retry, tap, filter} from "rxjs/operators";
4+
import {HttpAttributes} from "./rxrpc-http-attributes";
5+
import axios, {AxiosResponse} from 'axios';
6+
import {fromPromise} from "rxjs/internal-compatibility";
7+
import {fromArray} from "rxjs/internal/observable/fromArray";
8+
9+
10+
export class RxRpcHttpConnection implements RxRpcConnection {
11+
readonly messages: Observable<any>;
12+
private pollingSubscription: Subscription;
13+
private readonly incoming = new Subject<any>();
14+
15+
constructor(private readonly uri: string, private readonly clientId: string) {
16+
this.messages = this.incoming;
17+
this.pollingSubscription = interval(HttpAttributes.ClientPollingPeriod)
18+
.pipe(
19+
mergeMap(() => this.poll()),
20+
retry(HttpAttributes.ClientPollingRetryCount))
21+
.subscribe(
22+
() => {},
23+
err => this.incoming.error(err),
24+
() => this.incoming.complete());
25+
}
26+
27+
close() {
28+
this.pollingSubscription.unsubscribe();
29+
}
30+
31+
error(error: any) {
32+
this.close();
33+
}
34+
35+
send(msg: any) {
36+
this.post('message', msg).subscribe();
37+
}
38+
39+
poll(): Observable<any> {
40+
return this.post('polling')
41+
.pipe(
42+
map(resp => resp.data),
43+
filter(data => data !== ""),
44+
mergeMap(data => {
45+
if(typeof data === 'string') {
46+
return fromArray(data.split("\n").map(s => JSON.parse(s)));
47+
}
48+
return of(data);
49+
}),
50+
tap(obj => this.incoming.next(obj)));
51+
}
52+
53+
post(path: string, msg?: any): Observable<AxiosResponse<string>> {
54+
const headers = {};
55+
headers[HttpAttributes.ClientIdAttribute] = this.clientId;
56+
return fromPromise(axios.post<string>(`${this.uri}/${path}`, msg, {headers: headers}))
57+
}
58+
}
59+
60+
export class RxRpcHttpTransport implements RxRpcTransport {
61+
62+
constructor(private readonly uri: string) {}
63+
64+
connect(): Observable<RxRpcHttpConnection> {
65+
return fromPromise(axios.post<string>(`${this.uri}/connect`))
66+
.pipe(
67+
map( res => {
68+
const clientId = res.headers[HttpAttributes.ClientIdAttribute.toLowerCase()];
69+
return new RxRpcHttpConnection(this.uri, clientId);
70+
}));
71+
}
72+
}

yarn.lock

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,13 @@ aws4@^1.8.0:
523523
resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.8.0.tgz#f0e003d9ca9e7f59c7a508945d7b2ef9a04a542f"
524524
integrity sha512-ReZxvNHIOv88FlT7rxcXIIC0fPt4KZqZbOlivyWtXLt8ESx84zd3kMC6iK5jVeS2qt+g7ftS7ye4fi06X5rtRQ==
525525

526+
axios@^0.21.1:
527+
version "0.21.1"
528+
resolved "https://registry.yarnpkg.com/axios/-/axios-0.21.1.tgz#22563481962f4d6bde9a76d516ef0e5d3c09b2b8"
529+
integrity sha512-dKQiRHxGD9PPRIUNIWvZhPTPpl1rf/OxTYKsqKUDjBwYylTvV7SjSHJb9ratfyzM6wCdLCOYLzs73qpg5c4iGA==
530+
dependencies:
531+
follow-redirects "^1.10.0"
532+
526533
babel-jest@^24.8.0:
527534
version "24.8.0"
528535
resolved "https://registry.yarnpkg.com/babel-jest/-/babel-jest-24.8.0.tgz#5c15ff2b28e20b0f45df43fe6b7f2aae93dba589"
@@ -1141,6 +1148,11 @@ find-up@^3.0.0:
11411148
dependencies:
11421149
locate-path "^3.0.0"
11431150

1151+
follow-redirects@^1.10.0:
1152+
version "1.13.3"
1153+
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.13.3.tgz#e5598ad50174c1bc4e872301e82ac2cd97f90267"
1154+
integrity sha512-DUgl6+HDzB0iEptNQEXLx/KhTmDb8tZUHSeLqpnjpknR70H0nC2t9N73BK6fN4hOvJ84pKlIQVQ4k5FFlBedKA==
1155+
11441156
for-in@^1.0.2:
11451157
version "1.0.2"
11461158
resolved "https://registry.yarnpkg.com/for-in/-/for-in-1.0.2.tgz#81068d295a8142ec0ac726c6e2200c30fb6d5e80"

0 commit comments

Comments
 (0)