Skip to content

Commit fea5ac2

Browse files
fix: chunked data
1 parent f9a213c commit fea5ac2

File tree

3 files changed

+112
-6
lines changed

3 files changed

+112
-6
lines changed

src/__tests__/__snapshots__/stream-impersonator.test.ts.snap

+32
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,35 @@ impersonate-user: johndoe
206206
207207
"
208208
`;
209+
210+
exports[`StreamImpersonator transfer encoding chunked 1`] = `
211+
"GET /version HTTP/1.1
212+
host: 127.0.0.1:53364
213+
user-agent: node-fetch
214+
accept: */*
215+
accept-encoding: gzip, deflate, br
216+
x-forwarded-for: 127.0.0.1
217+
authorization: Bearer service-account-token
218+
impersonate-user: johndoe
219+
impersonate-group: dev
220+
impersonate-group: ops
221+
222+
POST /apis/authorization.k8s.io/v1/selfsubjectaccessreviews HTTP/1.1
223+
host: 127.0.0.1:53364
224+
user-agent: node-fetch
225+
transfer-encoding: chunked
226+
accept: */*
227+
accept-encoding: gzip, deflate, br
228+
content-type: application/json
229+
x-forwarded-for: 127.0.0.1
230+
authorization: Bearer service-account-token
231+
impersonate-user: johndoe
232+
impersonate-group: dev
233+
impersonate-group: ops
234+
235+
b0
236+
{"spec":{"resourceAttributes":{"namespace":"kube-system","resource":"*","verb":"create"}},"kind":"SelfSubjectAccessReview","apiVersion":"authorization.k8s.io/v1","metadata":{}}
237+
0
238+
239+
"
240+
`;

src/__tests__/stream-impersonator.test.ts

+38-1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,43 @@ MwIDAQAB
141141
expect(destination.buffer.toString()).toContain(`foo\r\nbar`);
142142
});
143143

144+
it("transfer encoding chunked", () => {
145+
parser.boredServer = boredServer;
146+
parser.publicKey = jwtPublicKey;
147+
148+
const token = jwt.sign({
149+
exp: Math.floor(Date.now() / 1000) + (60 * 60),
150+
sub: "johndoe",
151+
groups: ["dev", "ops"],
152+
aud: [boredServer]
153+
}, jwtPrivateKey, { algorithm: "RS256" });
154+
155+
stream.pipe(parser).pipe(destination);
156+
stream.write(`GET /version HTTP/1.1\r\n`);
157+
stream.write(`Host: 127.0.0.1:53364\r\n`);
158+
stream.write(`User-Agent: node-fetch\r\n`);
159+
stream.write(`Accept: */*\r\n`);
160+
stream.write(`Accept-Encoding: gzip, deflate, br\r\n`);
161+
stream.write(`Authorization: Bearer ${token}\r\n`);
162+
stream.write(`X-Forwarded-For: 127.0.0.1\r\n\r\n`);
163+
stream.write(`POST /apis/authorization.k8s.io/v1/selfsubjectaccessreviews HTTP/1.1\r\n`);
164+
stream.write(`Host: 127.0.0.1:53364\r\n`);
165+
stream.write(`User-Agent: node-fetch\r\n`);
166+
stream.write(`Transfer-Encoding: chunked\r\n`);
167+
stream.write(`Accept: */*\r\n`);
168+
stream.write(`Accept-Encoding: gzip, deflate, br\r\n`);
169+
stream.write(`Authorization: Bearer ${token}\r\n`);
170+
stream.write(`Content-Type: application/json\r\n`);
171+
stream.write(`X-Forwarded-For: 127.0.0.1\r\n`);
172+
stream.write(`\r\n`);
173+
stream.write(`b0\r\n`);
174+
stream.write(`{"spec":{"resourceAttributes":{"namespace":"kube-system","resource":"*","verb":"create"}},"kind":"SelfSubjectAccessReview","apiVersion":"authorization.k8s.io/v1","metadata":{}}\r\n`);
175+
stream.write(`0\r\n`);
176+
stream.write(`\r\n`);
177+
178+
expect(destination.buffer.toString()).toMatchSnapshot();
179+
});
180+
144181
it ("handles newline splitted to separate chunks", async () => {
145182
parser.boredServer = boredServer;
146183
parser.publicKey = jwtPublicKey;
@@ -287,7 +324,7 @@ MwIDAQAB
287324
expect(destination.buffer.toString()).toContain("first chunk second chunk");
288325
});
289326

290-
it ("handles headers and body in same stream chunk", async () => {
327+
it("handles headers and body in same stream chunk", async () => {
291328
parser.boredServer = boredServer;
292329
parser.publicKey = jwtPublicKey;
293330

src/stream-impersonator.ts

+42-5
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@ type Headers = Array<Array<string>>;
1616

1717
type GetSaToken = () => string;
1818

19+
const endOfHeadersMarker = "\r\n\r\n";
20+
1921
export class StreamImpersonator extends Transform {
2022
public boredServer = "";
2123
public publicKey = "";
2224

25+
private headersReceived = false;
2326
private chunks: Buffer[] = [];
2427
private httpParser: HTTPParserJS;
2528
private upgrade = false;
@@ -83,17 +86,20 @@ export class StreamImpersonator extends Transform {
8386
};
8487

8588
this.httpParser.onBody = (
86-
bodyChunk: Buffer,
87-
start: number,
88-
len: number,
89+
90+
_bodyChunk: Buffer,
91+
92+
_start: number,
93+
94+
_len: number,
8995
) => {
90-
logger.trace("onBody");
91-
this.chunks.push(bodyChunk.subarray(start, start + len));
96+
//
9297
};
9398

9499
this.httpParser.onMessageComplete = () => {
95100
logger.trace("onMessageComplete");
96101
this.flushChunks();
102+
this.headersReceived = false;
97103
};
98104
}
99105

@@ -128,14 +134,45 @@ export class StreamImpersonator extends Transform {
128134
return callback();
129135
}
130136

137+
this.chunks.push(chunk);
131138
this.partialMessage.push(chunk);
132139

140+
const receivedSoFar = Buffer.concat(this.partialMessage);
141+
142+
// +endOfHeadersMarker.length to include the \r\n\r\n in the header
143+
const headerEndIndex = receivedSoFar.indexOf(endOfHeadersMarker) + endOfHeadersMarker.length;
144+
145+
// Wait for more data if headers are incomplete and not received yet
146+
if (headerEndIndex === -1 && !this.headersReceived) {
147+
return callback();
148+
}
149+
133150
const handleError = (err: Error) => {
134151
this.partialMessage = [];
135152
logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(err));
136153
throw err;
137154
};
138155

156+
// Parse headers if not parsed yet
157+
if (!this.headersReceived) {
158+
this.headersReceived = true;
159+
160+
// Extract headers
161+
const bufferToParse = Buffer.concat(this.partialMessage).subarray(0, headerEndIndex);
162+
163+
const bytesParsed = this.httpParser.execute(bufferToParse);
164+
165+
if (bytesParsed instanceof Error) {
166+
return handleError(bytesParsed);
167+
}
168+
169+
// Remove parsed headers from the buffer
170+
this.partialMessage = removeBytesFromBuffersHead(this.partialMessage, bytesParsed);
171+
172+
// onHeadersComplete set this.chunks to [], we set the rest of the bytes after the header back
173+
this.chunks = [Buffer.concat(this.partialMessage)];
174+
}
175+
139176
try {
140177
const bufferToParse = Buffer.concat(this.partialMessage);
141178
const bytesParsed = this.httpParser.execute(bufferToParse);

0 commit comments

Comments
 (0)