Skip to content

Commit 0da7b83

Browse files
authored
Merge pull request #316 from MacPaw/315-streaming-session-is-ended-before-any-output
Rewrite the parser by more closely following spec
2 parents b1c34ae + e3b02de commit 0da7b83

6 files changed

+420
-53
lines changed

Sources/OpenAI/Private/Streaming/ServerSentEventsStreamInterpreter.swift

+18-40
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,26 @@ import Foundation
1212
///
1313
/// - Note: This class is NOT thread safe. It is a caller's responsibility to call all the methods in a thread-safe manner.
1414
final class ServerSentEventsStreamInterpreter <ResultType: Codable & Sendable>: @unchecked Sendable, StreamInterpreter {
15+
private let parser = ServerSentEventsStreamParser()
1516
private let streamingCompletionMarker = "[DONE]"
1617
private var previousChunkBuffer = ""
1718

1819
private var onEventDispatched: ((ResultType) -> Void)?
1920
private var onError: ((Error) -> Void)?
2021
private let parsingOptions: ParsingOptions
2122

23+
enum InterpeterError: Error {
24+
case unhandledStreamEventType(String)
25+
}
26+
2227
init(parsingOptions: ParsingOptions) {
2328
self.parsingOptions = parsingOptions
29+
30+
parser.setCallbackClosures { [weak self] event in
31+
self?.processEvent(event)
32+
} onError: { [weak self] error in
33+
self?.onError?(error)
34+
}
2435
}
2536

2637
/// Sets the callback closures for this instance. Not thread safe.
@@ -41,46 +52,13 @@ final class ServerSentEventsStreamInterpreter <ResultType: Codable & Sendable>:
4152
return
4253
}
4354

44-
guard let stringContent = String(data: data, encoding: .utf8) else {
45-
onError?(StreamingError.unknownContent)
46-
return
47-
}
48-
49-
self.processJSON(from: stringContent)
55+
parser.processData(data: data)
5056
}
5157

52-
private func processJSON(from stringContent: String) {
53-
if stringContent.isEmpty {
54-
return
55-
}
56-
57-
let fullChunk = "\(previousChunkBuffer)\(stringContent)"
58-
let chunkLines = fullChunk
59-
.components(separatedBy: .newlines)
60-
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
61-
.filter { $0.isEmpty == false }
62-
63-
var jsonObjects: [String] = []
64-
for line in chunkLines {
65-
66-
// Skip comments
67-
if line.starts(with: ":") { continue }
68-
69-
// Get JSON object
70-
let jsonData = line
71-
.components(separatedBy: "data:")
72-
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
73-
.filter { $0.isEmpty == false }
74-
jsonObjects.append(contentsOf: jsonData)
75-
}
76-
77-
previousChunkBuffer = ""
78-
79-
guard jsonObjects.isEmpty == false, jsonObjects.first != streamingCompletionMarker else {
80-
return
81-
}
82-
83-
jsonObjects.enumerated().forEach { (index, jsonContent) in
58+
private func processEvent(_ event: ServerSentEventsStreamParser.Event) {
59+
switch event.eventType {
60+
case "message":
61+
let jsonContent = event.decodedData
8462
guard jsonContent != streamingCompletionMarker && !jsonContent.isEmpty else {
8563
return
8664
}
@@ -97,12 +75,12 @@ final class ServerSentEventsStreamInterpreter <ResultType: Codable & Sendable>:
9775
if let decoded = JSONResponseErrorDecoder(decoder: decoder).decodeErrorResponse(data: jsonData) {
9876
onError?(decoded)
9977
return
100-
} else if index == jsonObjects.count - 1 {
101-
previousChunkBuffer = "data: \(jsonContent)" // Chunk ends in a partial JSON
10278
} else {
10379
onError?(error)
10480
}
10581
}
82+
default:
83+
onError?(InterpeterError.unhandledStreamEventType(event.eventType))
10684
}
10785
}
10886
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
//
2+
// ServerSentEventsStreamInterpreter.swift
3+
// OpenAI
4+
//
5+
// Created by Oleksii Nezhyborets on 11.03.2025.
6+
//
7+
8+
import Foundation
9+
10+
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
11+
/// 9.2.6 Interpreting an event stream
12+
/// Incremental parser for a Server-Sent Events byte stream.
///
/// Follows "9.2.6 Interpreting an event stream" of the HTML specification:
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
///
/// Feed raw network chunks to `processData(data:)`. The parser splits them into lines
/// (a line may end with CRLF, LF or CR), accumulates fields and invokes the dispatch
/// callback every time a blank line completes an event. Lines that are split across
/// chunks are buffered until the rest of the line arrives.
///
/// - Note: This class performs no synchronization. Although it is declared
///   `@unchecked Sendable`, callers must invoke all of its methods in a thread-safe manner.
final class ServerSentEventsStreamParser: @unchecked Sendable {
    private var onEventDispatched: ((Event) -> Void)?
    private var onError: ((Error) -> Void)?

    private var isFirstChunk = true
    private var dataBuffer: Data = .init()
    private var eventTypeBuffer: String = .init()
    private var retry: Int?
    private var lastEventIdBuffer: String?
    /// Bytes of a line whose terminator has not been received yet.
    private var incompleteLine: Data?
    /// `true` when the last byte of the previous chunk was a CR that already terminated a line.
    /// If the next chunk starts with LF, that LF is the second half of a CRLF pair and must be skipped.
    private var previousChunkEndedWithCR = false

    private let cr: UInt8 = 0x0D // carriage return
    private let lf: UInt8 = 0x0A // line feed
    private let colon: UInt8 = 0x3A
    private let space: UInt8 = 0x20
    private let nullByte: UInt8 = 0x00
    private let byteOrderMark: [UInt8] = [0xEF, 0xBB, 0xBF]

    /// A single dispatched event.
    struct Event {
        /// The last event ID seen on the stream so far (it is not reset between events).
        let id: String?
        /// Raw bytes of the data buffer, without the trailing LF.
        let data: Data
        /// `data` decoded as UTF-8, or an empty string if decoding fails.
        let decodedData: String
        /// The value of the `event` field, or `"message"` when none was provided.
        let eventType: String
        /// The value of the `retry` field received for this event, if any.
        let retry: Int?
    }

    enum ParsingError: Error {
        case decodingFieldNameFailed(fieldNameData: Data, lineData: Data)
        case decodingFieldValueFailed(valueData: Data, lineData: Data)
        case retryIsNotImplemented
    }

    /// Sets the callback closures. Not thread safe: must not be called concurrently with `processData(data:)`.
    ///
    /// - Parameters:
    ///   - onEventDispatched: Called once per completed event; can be called multiple times per `processData`.
    ///   - onError: Called once for every line that fails to parse; can be called multiple times per `processData`.
    func setCallbackClosures(onEventDispatched: @escaping @Sendable (Event) -> Void, onError: @escaping @Sendable (Error) -> Void) {
        self.onEventDispatched = onEventDispatched
        self.onError = onError
    }

    /// Consumes the next chunk of the stream, dispatching every event it completes.
    ///
    /// - Parameter data: The next bytes of the stream. May be a `Data` slice, may begin or end in the
    ///   middle of a line, and may be empty (in which case nothing happens).
    func processData(data: Data) {
        guard !data.isEmpty else {
            return
        }

        // Copying into an array gives zero-based indices even when `data` is a slice.
        var bytes = [UInt8](data)

        /// The [UTF-8 decode](https://encoding.spec.whatwg.org/#utf-8-decode) algorithm strips one leading UTF-8 Byte Order Mark (BOM), if any.
        if isFirstChunk {
            isFirstChunk = false
            if bytes.starts(with: byteOrderMark) {
                bytes.removeFirst(byteOrderMark.count)
            }
        }

        // The previous chunk ended with CR and that line has already been emitted.
        // A leading LF here completes the same CRLF terminator and must not produce an extra blank line.
        if previousChunkEndedWithCR {
            previousChunkEndedWithCR = false
            if bytes.first == lf {
                bytes.removeFirst()
            }
        }

        if let pendingLine = incompleteLine {
            bytes.insert(contentsOf: pendingLine, at: 0)
        }

        let split = lines(fromStream: bytes)

        // State is updated before any callback runs, so the buffers are consistent during callbacks.
        incompleteLine = split.incomplete
        previousChunkEndedWithCR = split.endsWithCR

        for line in split.complete {
            do {
                try parseLine(lineData: line)
            } catch {
                onError?(error)
            }
        }
    }

    /// Splits stream bytes into complete lines plus an optional unterminated remainder.
    ///
    /// The stream must be parsed by reading everything line by line, with a CRLF character pair,
    /// a single LF not preceded by a CR, and a single CR not followed by a LF being the ways
    /// in which a line can end.
    ///
    /// - Parameter stream: Bytes to split.
    /// - Returns: `complete` - the terminated lines without their terminators;
    ///   `incomplete` - trailing bytes after the last terminator, or `nil` if there are none;
    ///   `endsWithCR` - whether the very last byte was a CR (the line is emitted, but an LF may still follow).
    private func lines(fromStream stream: [UInt8]) -> (complete: [Data], incomplete: Data?, endsWithCR: Bool) {
        var complete: [Data] = []
        var endsWithCR = false
        var lineStart = stream.startIndex
        var index = stream.startIndex

        while index < stream.endIndex {
            let byte = stream[index]

            if byte == lf || byte == cr {
                complete.append(Data(stream[lineStart..<index]))

                if byte == cr {
                    let next = index + 1
                    if next == stream.endIndex {
                        // Cannot know yet whether an LF follows; the caller remembers this for the next chunk.
                        endsWithCR = true
                    } else if stream[next] == lf {
                        // CRLF is a single terminator: consume the LF as well.
                        index = next
                    }
                }

                lineStart = index + 1
            }

            index += 1
        }

        let incomplete: Data? = lineStart < stream.endIndex ? Data(stream[lineStart...]) : nil
        return (complete, incomplete, endsWithCR)
    }

    /// Interprets a single line (without its terminator).
    ///
    /// - Throws: `ParsingError.decodingFieldNameFailed` if the field name is not valid UTF-8, or any error
    ///   thrown by `processField(name:valueData:lineData:)`.
    private func parseLine(lineData: Data) throws {
        // If the line is empty (a blank line)
        // - Dispatch the event
        if lineData.isEmpty {
            dispatchEvent()
            return
        }

        // If the line starts with a U+003A COLON character (:)
        // - Ignore the line.
        if lineData.first == colon {
            return
        }

        // Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
        let fieldNameData = Data(lineData.prefix(while: { $0 != colon }))

        guard let fieldName = String(data: fieldNameData, encoding: .utf8) else {
            throw ParsingError.decodingFieldNameFailed(fieldNameData: fieldNameData, lineData: lineData)
        }

        guard fieldNameData.count < lineData.count else {
            // Otherwise, the string is not empty but does not contain a U+003A COLON character (:)
            //
            // Process the field using the whole line as the field name, and the empty string as the field value.
            try processField(name: fieldName, valueData: .init(), lineData: lineData)
            return
        }

        // Collect the characters on the line after the first U+003A COLON character (:), and let value be that string.
        var valueData = lineData.dropFirst(fieldNameData.count + 1)
        // If value starts with a U+0020 SPACE character, remove it from value.
        if valueData.first == space {
            valueData = valueData.dropFirst()
        }

        // `Data(...)` rebases the slice so downstream code and error payloads get zero-based data.
        try processField(name: fieldName, valueData: Data(valueData), lineData: lineData)
    }

    /// https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
    ///
    /// When the user agent is required to dispatch the event, the user agent must process the data buffer, the event type buffer, and the last event ID buffer using steps appropriate for the user agent.
    ///
    /// The link above has a defined list of steps for Web Browsers. For other user agents (like this one), it says:
    /// "the appropriate steps to dispatch the event are implementation dependent, but at a minimum they must set the data and event type buffers to the empty string before returning."
    ///
    /// That list is partially used for this implementation.
    private func dispatchEvent() {
        // 1. The last event ID buffer does not get reset, so it stays set until the server sets it again.
        // Current Implementation Note: there is no EventSource here, so the only consequence is that the ID buffer is kept.

        // 2. If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string and return.
        if dataBuffer.isEmpty {
            eventTypeBuffer = .init()
            retry = nil
            return
        }

        // 3. If the data buffer's last character is a U+000A LINE FEED (LF) character, then remove the last character from the data buffer.
        if dataBuffer.last == lf {
            dataBuffer.removeLast()
        }

        // 4-6. Create the event. Current Implementation Notes:
        // - "message" is used for the event type unless the event type buffer has a non-empty value
        // - origin is ignored for now
        // - there is no notion of an event source, so the id is put into the event itself
        let event = Event(
            id: lastEventIdBuffer,
            data: dataBuffer,
            decodedData: .init(data: dataBuffer, encoding: .utf8) ?? "",
            eventType: eventTypeBuffer.isEmpty ? "message" : eventTypeBuffer,
            retry: retry
        )

        // 7. Set the data buffer and the event type buffer to the empty string.
        dataBuffer = .init()
        eventTypeBuffer = .init()
        retry = nil

        // 8. Dispatch the newly created event.
        // Current Implementation Note: there are no ready states at the moment, so the event is always dispatched.
        onEventDispatched?(event)
    }

    /// https://html.spec.whatwg.org/multipage/server-sent-events.html#processField
    ///
    /// The steps to process the field given a field name and a field value depend on the field name, as given in the following list. Field names must be compared literally, with no case folding performed.
    ///
    /// - Throws: `ParsingError.decodingFieldValueFailed` if an `event`, `id` or `retry` value is not valid UTF-8.
    private func processField(name: String, valueData: Data, lineData: Data) throws {
        switch name {
        case "event":
            /// **If the field name is "event"**
            /// Set the event type buffer to field value.
            eventTypeBuffer = try value(fromData: valueData, lineData: lineData)
        case "data":
            /// **If the field name is "data"**
            /// Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer.
            dataBuffer.append(valueData)
            dataBuffer.append(lf)
        case "id":
            /// **If the field name is "id"**
            ///
            /// If the field value does not contain U+0000 NULL, set the last event ID buffer to the field value.
            /// Otherwise, ignore the field.
            if !valueData.contains(nullByte) {
                lastEventIdBuffer = try value(fromData: valueData, lineData: lineData)
            }
        case "retry":
            /// **If the field name is "retry"**
            ///
            /// If the field value consists of only ASCII digits, interpret it as an integer in base ten.
            /// Otherwise (including an empty value or a value too large for `Int`), ignore the field.
            if !valueData.isEmpty, valueData.allSatisfy({ $0 >= 0x30 && $0 <= 0x39 }) {
                let valueString = try value(fromData: valueData, lineData: lineData)
                if let valueInt = Int(valueString) {
                    retry = valueInt
                }
            }
        default:
            /// **Otherwise**
            ///
            /// The field is ignored.
            break
        }
    }

    /// Decodes a field value as UTF-8.
    ///
    /// - Throws: `ParsingError.decodingFieldValueFailed` if `valueData` is not valid UTF-8.
    private func value(fromData valueData: Data, lineData: Data) throws -> String {
        guard let valueString = String(data: valueData, encoding: .utf8) else {
            throw ParsingError.decodingFieldValueFailed(valueData: valueData, lineData: lineData)
        }

        return valueString
    }
}

Sources/OpenAI/Public/Utilities/ParsingOptions.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import Foundation
99

10-
public struct ParsingOptions: OptionSet {
10+
public struct ParsingOptions: OptionSet, Sendable {
1111
public let rawValue: Int
1212

1313
public init(rawValue: Int) {

0 commit comments

Comments
 (0)