Skip to content

fix(event-stream): handle initial response 🚧 #1625

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import { Decoder, Encoder, EventStreamMarshaller as IEventStreamMarshaller, Mess
import { getChunkedStream } from "./getChunkedStream";
import { getMessageUnmarshaller } from "./getUnmarshalledStream";

/**
* @internal
*/
export interface EventStreamMarshaller extends IEventStreamMarshaller {}

/**
* @internal
*/
Expand All @@ -27,7 +22,7 @@ export interface EventStreamMarshallerOptions {
/**
* @internal
*/
export class EventStreamMarshaller {
export class EventStreamMarshaller implements IEventStreamMarshaller {
private readonly eventStreamCodec: EventStreamCodec;
private readonly utfEncoder: Encoder;

Expand All @@ -41,11 +36,9 @@ export class EventStreamMarshaller {
deserializer: (input: Record<string, Message>) => Promise<T>
): AsyncIterable<T> {
const inputStream = getChunkedStream(body);
// @ts-expect-error Type 'SmithyMessageDecoderStream<Record<string, any>>' is not assignable to type 'AsyncIterable<T>'
return new SmithyMessageDecoderStream({
return new SmithyMessageDecoderStream<T>({
messageStream: new MessageDecoderStream({ inputStream, decoder: this.eventStreamCodec }),
// @ts-expect-error Type 'T' is not assignable to type 'Record<string, any>'
deserializer: getMessageUnmarshaller(deserializer, this.utfEncoder),
deserializer: getMessageUnmarshaller<any>(deserializer, this.utfEncoder),
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ export function getMessageUnmarshaller<T extends Record<string, any>>(
const event = {
[message.headers[":event-type"].value as string]: message,
};
const deserialized = await deserializer(event);
if (deserialized.$unknown) return;
return deserialized;
return deserializer(event);
} else {
throw Error(`Unrecognizable event type: ${message.headers[":event-type"].value}`);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Peeks the first frame of the async iterable and writes the values into
* the container if it is an initial-response event.
*
* @internal
*
* @param container - write destination for initial-response.
* @param responseIterable - the response event stream.
*/
export async function writeResponse<T>(
container: Record<string, any>,
responseIterable: AsyncIterable<T>
): Promise<AsyncIterable<T>> {
const asyncIterator = responseIterable[Symbol.asyncIterator]();
// todo: handle empty iterator or timeout.
const firstFrame = await asyncIterator.next();
if (firstFrame.value.$unknown?.["initial-response"]) {
console.log("assigned initial response into container", {
initialResponse: firstFrame.value.$unknown["initial-response"],
});
Object.assign(container, firstFrame.value.$unknown["initial-response"]);
return {
[Symbol.asyncIterator]: () => ({
next: asyncIterator.next.bind(asyncIterator),
}),
};
}
return responseIterable;
}

/**
* @internal
*/
export async function writeRequest<T>() {
throw new Error("not implemented");
}
2 changes: 1 addition & 1 deletion packages/types/src/eventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export interface EventStreamMarshallerSerFn<StreamType> {
* @public
*
* An interface which provides functions for serializing and deserializing binary event stream
* to/from corresponsing modeled shape.
* to/from corresponding modeled shape.
*/
export interface EventStreamMarshaller<StreamType = any> {
deserialize: EventStreamMarshallerDeserFn<StreamType>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import software.amazon.smithy.utils.SmithyUnstableApi;

/**
* Evnetstream code generator.
* Event stream code generator.
*/
@SmithyUnstableApi
public class EventStreamGenerator {
Expand Down
Loading