Skip to content

Commit d488bb2

Browse files
teamonhodak
andauthored
Request/Response streaming for Finch adapter, SSE middleware (#540)
* Request/Response streaming for Finch adapter * [Finch] Change error handling, fix for response stream (#573) --------- Co-authored-by: Adam Hodowany <[email protected]>
1 parent 6954617 commit d488bb2

File tree

10 files changed

+430
-19
lines changed

10 files changed

+430
-19
lines changed

README.md

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ Tesla is an HTTP client loosely based on [Faraday](https://github.com/lostisland
1212
It embraces the concept of middleware when processing the request/response cycle.
1313

1414
> Note that this README refers to the `master` branch of Tesla, not the latest
15-
released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for
16-
the documentation of the version you're using.
15+
> released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for
16+
> the documentation of the version you're using.
1717
1818
For the list of changes, checkout the latest [release notes](https://github.com/teamon/tesla/releases).
1919

@@ -83,8 +83,8 @@ config :tesla, adapter: Tesla.Adapter.Hackney
8383
```
8484

8585
> The default adapter is erlang's built-in `httpc`, but it is not recommended
86-
to use it in production environment as it does not validate SSL certificates
87-
[among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+).
86+
> to use it in production environment as it does not validate SSL certificates
87+
> [among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+).
8888
8989
## Documentation
9090

@@ -243,7 +243,11 @@ Tesla.get(client, "/", opts: [adapter: [recv_timeout: 30_000]])
243243

244244
## Streaming
245245

246-
If adapter supports it, you can pass a [Stream](https://hexdocs.pm/elixir/main/Stream.html) as body, e.g.:
246+
### Streaming Request Body
247+
248+
If adapter supports it, you can pass a
249+
[Stream](https://hexdocs.pm/elixir/main/Stream.html) as request
250+
body, e.g.:
247251

248252
```elixir
249253
defmodule ElasticSearch do
@@ -259,7 +263,41 @@ defmodule ElasticSearch do
259263
end
260264
```
261265

262-
Each piece of stream will be encoded as JSON and sent as a new line (conforming to JSON stream format).
266+
Each piece of stream will be encoded as JSON and sent as a new line (conforming
267+
to JSON stream format).
268+
269+
### Streaming Response Body
270+
271+
If adapter supports it, you can pass a `response: :stream` option to return
272+
response body as a
273+
[Stream](https://elixir-lang.org/docs/stable/elixir/Stream.html)
274+
275+
```elixir
276+
defmodule OpenAI do
277+
def new(token) do
278+
middleware = [
279+
{Tesla.Middleware.BaseUrl, "https://api.openai.com/v1"},
280+
{Tesla.Middleware.BearerAuth, token: token},
281+
{Tesla.Middleware.JSON, decode_content_types: ["text/event-stream"]},
282+
{Tesla.Middleware.SSE, only: :data}
283+
]
284+
Tesla.client(middleware, {Tesla.Adapter.Finch, name: MyFinch})
285+
end
286+
287+
def completion(client, prompt) do
288+
data = %{
289+
model: "gpt-3.5-turbo",
290+
messages: [%{role: "user", content: prompt}],
291+
stream: true
292+
}
293+
Tesla.post(client, "/chat/completions", data, opts: [adapter: [response: :stream]])
294+
end
295+
end
296+
client = OpenAI.new("<token>")
297+
{:ok, env} = OpenAI.completion(client, "What is the meaning of life?")
298+
env.body
299+
|> Stream.each(fn chunk -> IO.inspect(chunk) end)
300+
```
263301

264302
## Multipart
265303

@@ -476,6 +514,7 @@ use Tesla, except: [:delete, :options]
476514
```elixir
477515
use Tesla, docs: false
478516
```
517+
479518
### Encode only JSON request (do not decode response)
480519

481520
```elixir

lib/tesla/adapter/finch.ex

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,37 +52,99 @@ if Code.ensure_loaded?(Finch) do
5252
@behaviour Tesla.Adapter
5353
alias Tesla.Multipart
5454

55+
@defaults [
56+
receive_timeout: 15_000
57+
]
58+
5559
@impl Tesla.Adapter
5660
def call(%Tesla.Env{} = env, opts) do
57-
opts = Tesla.Adapter.opts(env, opts)
61+
opts = Tesla.Adapter.opts(@defaults, env, opts)
5862

5963
name = Keyword.fetch!(opts, :name)
6064
url = Tesla.build_url(env.url, env.query)
6165
req_opts = Keyword.take(opts, [:pool_timeout, :receive_timeout])
66+
req = build(env.method, url, env.headers, env.body)
6267

63-
case request(name, env.method, url, env.headers, env.body, req_opts) do
68+
case request(req, name, req_opts, opts) do
6469
{:ok, %Finch.Response{status: status, headers: headers, body: body}} ->
6570
{:ok, %Tesla.Env{env | status: status, headers: headers, body: body}}
6671

67-
{:error, mint_error} ->
68-
{:error, Exception.message(mint_error)}
72+
{:error, %Mint.TransportError{reason: reason}} ->
73+
{:error, reason}
74+
75+
{:error, reason} ->
76+
{:error, reason}
6977
end
7078
end
7179

72-
defp request(name, method, url, headers, %Multipart{} = mp, opts) do
80+
defp build(method, url, headers, %Multipart{} = mp) do
7381
headers = headers ++ Multipart.headers(mp)
7482
body = Multipart.body(mp) |> Enum.to_list()
7583

76-
request(name, method, url, headers, body, opts)
84+
build(method, url, headers, body)
7785
end
7886

79-
defp request(_name, _method, _url, _headers, %Stream{}, _opts) do
80-
raise "Streaming is not supported by this adapter!"
87+
defp build(method, url, headers, %Stream{} = body_stream) do
88+
build(method, url, headers, {:stream, body_stream})
8189
end
8290

83-
defp request(name, method, url, headers, body, opts) do
91+
defp build(method, url, headers, body_stream_fun) when is_function(body_stream_fun) do
92+
build(method, url, headers, {:stream, body_stream_fun})
93+
end
94+
95+
defp build(method, url, headers, body) do
8496
Finch.build(method, url, headers, body)
85-
|> Finch.request(name, opts)
97+
end
98+
99+
defp request(req, name, req_opts, opts) do
100+
case opts[:response] do
101+
:stream -> stream(req, name, req_opts)
102+
nil -> Finch.request(req, name, req_opts)
103+
other -> raise "Unknown response option: #{inspect(other)}"
104+
end
105+
end
106+
107+
defp stream(req, name, opts) do
108+
owner = self()
109+
ref = make_ref()
110+
111+
fun = fn
112+
{:status, status}, _acc -> status
113+
{:headers, headers}, status -> send(owner, {ref, {:status, status, headers}})
114+
{:data, data}, _acc -> send(owner, {ref, {:data, data}})
115+
end
116+
117+
task =
118+
Task.async(fn ->
119+
case Finch.stream(req, name, nil, fun, opts) do
120+
{:ok, _acc} -> send(owner, {ref, :eof})
121+
{:error, error} -> send(owner, {ref, {:error, error}})
122+
end
123+
end)
124+
125+
receive do
126+
{^ref, {:status, status, headers}} ->
127+
body =
128+
Stream.unfold(nil, fn _ ->
129+
receive do
130+
{^ref, {:data, data}} ->
131+
{data, nil}
132+
133+
{^ref, :eof} ->
134+
Task.await(task)
135+
nil
136+
after
137+
opts[:receive_timeout] ->
138+
Task.shutdown(task, :brutal_kill)
139+
nil
140+
end
141+
end)
142+
143+
{:ok, %Finch.Response{status: status, headers: headers, body: body}}
144+
after
145+
opts[:receive_timeout] ->
146+
{:error, :timeout}
147+
end
86148
end
87149
end
88150
end

lib/tesla/middleware/json.ex

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,18 @@ defmodule Tesla.Middleware.JSON do
113113
end
114114
end
115115

116+
defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body),
117+
do: {:ok, decode_stream(body, opts)}
118+
116119
defp decode_body(body, opts), do: process(body, :decode, opts)
117120

118121
defp decodable?(env, opts), do: decodable_body?(env) && decodable_content_type?(env, opts)
119122

120123
defp decodable_body?(env) do
121-
(is_binary(env.body) && env.body != "") || (is_list(env.body) && env.body != [])
124+
(is_binary(env.body) && env.body != "") ||
125+
(is_list(env.body) && env.body != []) ||
126+
is_function(env.body) ||
127+
is_struct(env.body, Stream)
122128
end
123129

124130
defp decodable_content_type?(env, opts) do
@@ -128,6 +134,15 @@ defmodule Tesla.Middleware.JSON do
128134
end
129135
end
130136

137+
defp decode_stream(body, opts) do
138+
Stream.map(body, fn chunk ->
139+
case decode_body(chunk, opts) do
140+
{:ok, item} -> item
141+
_ -> chunk
142+
end
143+
end)
144+
end
145+
131146
defp content_types(opts),
132147
do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])
133148

lib/tesla/middleware/sse.ex

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
defmodule Tesla.Middleware.SSE do
2+
@moduledoc """
3+
Decode Server Sent Events.
4+
5+
This middleware is mostly useful when streaming response body.
6+
7+
## Examples
8+
9+
```
10+
plug Tesla.Middleware.SSE, only: :data
11+
12+
```
13+
14+
## Options
15+
16+
- `:only` - keep only specified keys in event (necessary for using with `JSON` middleware)
17+
- `:decode_content_types` - list of additional decodable content-types
18+
"""
19+
20+
@behaviour Tesla.Middleware
21+
22+
@default_content_types ["text/event-stream"]
23+
24+
@impl Tesla.Middleware
25+
def call(env, next, opts) do
26+
opts = opts || []
27+
28+
with {:ok, env} <- Tesla.run(env, next) do
29+
decode(env, opts)
30+
end
31+
end
32+
33+
def decode(env, opts) do
34+
if decodable_content_type?(env, opts) do
35+
{:ok, %{env | body: decode_body(env.body, opts)}}
36+
else
37+
{:ok, env}
38+
end
39+
end
40+
41+
defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body) do
42+
body
43+
|> Stream.chunk_while(
44+
"",
45+
fn elem, acc ->
46+
{lines, [rest]} = (acc <> elem) |> String.split("\n\n") |> Enum.split(-1)
47+
{:cont, lines, rest}
48+
end,
49+
fn
50+
"" -> {:cont, ""}
51+
acc -> {:cont, acc, ""}
52+
end
53+
)
54+
|> Stream.flat_map(& &1)
55+
|> Stream.map(&decode_message/1)
56+
|> Stream.flat_map(&only(&1, opts[:only]))
57+
end
58+
59+
defp decode_body(binary, opts) when is_binary(binary) do
60+
binary
61+
|> String.split("\n\n")
62+
|> Enum.map(&decode_message/1)
63+
|> Enum.flat_map(&only(&1, opts[:only]))
64+
end
65+
66+
defp decode_message(message) do
67+
message
68+
|> String.split("\n")
69+
|> Enum.map(&decode_body/1)
70+
|> Enum.reduce(%{}, fn
71+
:empty, acc -> acc
72+
{:data, data}, acc -> Map.update(acc, :data, data, &(&1 <> "\n" <> data))
73+
{key, value}, acc -> Map.put_new(acc, key, value)
74+
end)
75+
end
76+
77+
defp decode_body(": " <> comment), do: {:comment, comment}
78+
defp decode_body("data: " <> data), do: {:data, data}
79+
defp decode_body("event: " <> event), do: {:event, event}
80+
defp decode_body("id: " <> id), do: {:id, id}
81+
defp decode_body("retry: " <> retry), do: {:retry, retry}
82+
defp decode_body(""), do: :empty
83+
84+
defp decodable_content_type?(env, opts) do
85+
case Tesla.get_header(env, "content-type") do
86+
nil -> false
87+
content_type -> Enum.any?(content_types(opts), &String.starts_with?(content_type, &1))
88+
end
89+
end
90+
91+
defp content_types(opts),
92+
do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])
93+
94+
defp only(message, nil), do: [message]
95+
96+
defp only(message, key) do
97+
case Map.get(message, key) do
98+
nil -> []
99+
val -> [val]
100+
end
101+
end
102+
end

mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ defmodule Tesla.Mixfile do
135135
Tesla.Middleware.PathParams,
136136
Tesla.Middleware.Query,
137137
Tesla.Middleware.Retry,
138+
Tesla.Middleware.SSE,
138139
Tesla.Middleware.Telemetry,
139140
Tesla.Middleware.Timeout
140141
]

test/support/adapter_case/stream_request_body.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Tesla.AdapterCase.StreamRequestBody do
33
quote do
44
alias Tesla.Env
55

6-
describe "Stream" do
6+
describe "Stream Request" do
77
test "stream request body: Stream.map" do
88
request = %Env{
99
method: :post,
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
defmodule Tesla.AdapterCase.StreamResponseBody do
2+
defmacro __using__(_) do
3+
quote do
4+
alias Tesla.Env
5+
6+
describe "Stream Response" do
7+
test "stream response body" do
8+
request = %Env{
9+
method: :get,
10+
url: "#{@http}/stream/20"
11+
}
12+
13+
assert {:ok, %Env{} = response} = call(request, response: :stream)
14+
assert response.status == 200
15+
assert is_function(response.body) || response.body.__struct__ == Stream
16+
17+
body = Enum.to_list(response.body)
18+
assert Enum.count(body) == 20
19+
end
20+
end
21+
end
22+
end
23+
end

0 commit comments

Comments
 (0)