@@ -121,7 +121,10 @@ impl<C: SseClient> SseClientTransport<C> {
121
121
122
122
let mut sse_stream = client. get_stream ( sse_endpoint. clone ( ) , None , None ) . await ?;
123
123
let message_endpoint = if let Some ( endpoint) = config. use_message_endpoint . clone ( ) {
124
- endpoint. parse :: < http:: Uri > ( ) ?
124
+ let ep = endpoint. parse :: < http:: Uri > ( ) ?;
125
+ let mut sse_endpoint_parts = sse_endpoint. clone ( ) . into_parts ( ) ;
126
+ sse_endpoint_parts. path_and_query = ep. into_parts ( ) . path_and_query ;
127
+ Uri :: from_parts ( sse_endpoint_parts) ?
125
128
} else {
126
129
// wait the endpoint event
127
130
loop {
@@ -132,17 +135,12 @@ impl<C: SseClient> SseClientTransport<C> {
132
135
let Some ( "endpoint" ) = sse. event . as_deref ( ) else {
133
136
continue ;
134
137
} ;
135
- let sse_endpoint = sse. data . unwrap_or_default ( ) ;
136
- break sse_endpoint. parse :: < http:: Uri > ( ) ?;
138
+ let ep = sse. data . unwrap_or_default ( ) ;
139
+
140
+ break message_endpoint ( sse_endpoint. clone ( ) , ep) ?;
137
141
}
138
142
} ;
139
143
140
- // sse: <authority><sse_pq> -> <authority><message_pq>
141
- let message_endpoint = {
142
- let mut sse_endpoint_parts = sse_endpoint. clone ( ) . into_parts ( ) ;
143
- sse_endpoint_parts. path_and_query = message_endpoint. into_parts ( ) . path_and_query ;
144
- Uri :: from_parts ( sse_endpoint_parts) ?
145
- } ;
146
144
let stream = Box :: pin ( SseAutoReconnectStream :: new (
147
145
sse_stream,
148
146
SseClientReconnect {
@@ -160,6 +158,36 @@ impl<C: SseClient> SseClientTransport<C> {
160
158
}
161
159
}
162
160
161
+ fn message_endpoint ( base : http:: Uri , endpoint : String ) -> Result < http:: Uri , http:: uri:: InvalidUri > {
162
+ // If endpoint is a full URL, parse and return it directly
163
+ if endpoint. starts_with ( "http://" ) || endpoint. starts_with ( "https://" ) {
164
+ return endpoint. parse :: < http:: Uri > ( ) ;
165
+ }
166
+
167
+ let mut base_parts = base. into_parts ( ) ;
168
+ let endpoint_clone = endpoint. clone ( ) ;
169
+
170
+ if endpoint. starts_with ( "?" ) {
171
+ // Query only - keep base path and append query
172
+ if let Some ( base_path_and_query) = & base_parts. path_and_query {
173
+ let base_path = base_path_and_query. path ( ) ;
174
+ base_parts. path_and_query = Some ( format ! ( "{}{}" , base_path, endpoint) . parse ( ) ?) ;
175
+ } else {
176
+ base_parts. path_and_query = Some ( format ! ( "/{}" , endpoint) . parse ( ) ?) ;
177
+ }
178
+ } else {
179
+ // Path (with optional query) - replace entire path_and_query
180
+ let path_to_use = if endpoint. starts_with ( "/" ) {
181
+ endpoint // Use absolute path as-is
182
+ } else {
183
+ format ! ( "/{}" , endpoint) // Make relative path absolute
184
+ } ;
185
+ base_parts. path_and_query = Some ( path_to_use. parse ( ) ?) ;
186
+ }
187
+
188
+ http:: Uri :: from_parts ( base_parts) . map_err ( |_| endpoint_clone. parse :: < http:: Uri > ( ) . unwrap_err ( ) )
189
+ }
190
+
163
191
#[ derive( Debug , Clone ) ]
164
192
pub struct SseClientConfig {
165
193
/// client sse endpoint
@@ -188,3 +216,33 @@ impl Default for SseClientConfig {
188
216
}
189
217
}
190
218
}
219
+
220
+ #[ cfg( test) ]
221
+ mod tests {
222
+ use super :: * ;
223
+
224
+ #[ test]
225
+ fn test_message_endpoint ( ) {
226
+ let base_url = "https://localhost/sse" . parse :: < http:: Uri > ( ) . unwrap ( ) ;
227
+
228
+ // Query only
229
+ let result = message_endpoint ( base_url. clone ( ) , "?sessionId=x" . to_string ( ) ) . unwrap ( ) ;
230
+ assert_eq ! ( result. to_string( ) , "https://localhost/sse?sessionId=x" ) ;
231
+
232
+ // Relative path with query
233
+ let result = message_endpoint ( base_url. clone ( ) , "mypath?sessionId=x" . to_string ( ) ) . unwrap ( ) ;
234
+ assert_eq ! ( result. to_string( ) , "https://localhost/mypath?sessionId=x" ) ;
235
+
236
+ // Absolute path with query
237
+ let result = message_endpoint ( base_url. clone ( ) , "/xxx?sessionId=x" . to_string ( ) ) . unwrap ( ) ;
238
+ assert_eq ! ( result. to_string( ) , "https://localhost/xxx?sessionId=x" ) ;
239
+
240
+ // Full URL
241
+ let result = message_endpoint (
242
+ base_url. clone ( ) ,
243
+ "http://example.com/xxx?sessionId=x" . to_string ( ) ,
244
+ )
245
+ . unwrap ( ) ;
246
+ assert_eq ! ( result. to_string( ) , "http://example.com/xxx?sessionId=x" ) ;
247
+ }
248
+ }
0 commit comments