Skip to content

Commit 56b2b20

Browse files
committed
[feat] introduce data stream client
Signed-off-by: zzzk1 <[email protected]>
1 parent a4d5c1c commit 56b2b20

File tree

5 files changed

+152
-3
lines changed

5 files changed

+152
-3
lines changed

pkg/es/client/datastream_client.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package client
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"net/http"
10+
)
11+
12+
var _ DataStreamAPI = (*DataStreamClient)(nil)
13+
14+
type DataStreamClient struct {
15+
Client
16+
}
17+
18+
func (ds DataStreamClient) Create(dataStream string) error {
19+
_, err := ds.request(elasticRequest{
20+
endpoint: "_data_stream/" + dataStream,
21+
method: http.MethodPut,
22+
})
23+
if err != nil {
24+
var responseError ResponseError
25+
if errors.As(err, &responseError) {
26+
if responseError.StatusCode != http.StatusOK {
27+
return responseError.prefixMessage("failed to create data stream: " + dataStream)
28+
}
29+
}
30+
return fmt.Errorf("failed to create data stream: %w", err)
31+
}
32+
return nil
33+
}
+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2025 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package client
5+
6+
import (
7+
"net/http"
8+
"net/http/httptest"
9+
"strings"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestCreate(t *testing.T) {
17+
dataStreamName := "jaeger-span"
18+
tests := []struct {
19+
name string
20+
responseCode int
21+
response string
22+
errContains string
23+
}{
24+
{
25+
name: "success",
26+
responseCode: http.StatusOK,
27+
},
28+
{
29+
name: "client error",
30+
responseCode: http.StatusBadRequest,
31+
response: esErrResponse,
32+
errContains: "failed to create data stream: jaeger-span",
33+
},
34+
}
35+
36+
for _, test := range tests {
37+
t.Run(test.name, func(t *testing.T) {
38+
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
39+
assert.True(t, strings.HasSuffix(req.URL.String(), dataStreamName))
40+
assert.Equal(t, http.MethodPut, req.Method)
41+
assert.Equal(t, "Basic foobar", req.Header.Get("Authorization"))
42+
res.WriteHeader(test.responseCode)
43+
res.Write([]byte(test.response))
44+
}))
45+
defer testServer.Close()
46+
47+
c := &DataStreamClient{
48+
Client: Client{
49+
Client: testServer.Client(),
50+
Endpoint: testServer.URL,
51+
BasicAuth: "foobar",
52+
},
53+
}
54+
err := c.Create(dataStreamName)
55+
if test.errContains != "" {
56+
require.ErrorContains(t, err, test.errContains)
57+
}
58+
})
59+
}
60+
}

pkg/es/client/ilm_client_test.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestExists(t *testing.T) {
6868
}
6969
}
7070

71-
const esILMPolicy = `
71+
const defaultILMPolicy = `
7272
{
7373
"policy": {
7474
"phases": {
@@ -94,7 +94,7 @@ const esILMPolicy = `
9494
}
9595
`
9696

97-
func TestCreate(t *testing.T) {
97+
func TestCreateOrUpdate(t *testing.T) {
9898
policy := "jaeger-ilm-policy"
9999
tests := []struct {
100100
name string
@@ -106,6 +106,11 @@ func TestCreate(t *testing.T) {
106106
name: "successful",
107107
responseCode: http.StatusOK,
108108
},
109+
{
110+
name: "not found",
111+
responseCode: http.StatusNotFound,
112+
response: esErrResponse,
113+
},
109114
{
110115
name: "client error",
111116
responseCode: http.StatusBadRequest,
@@ -131,7 +136,7 @@ func TestCreate(t *testing.T) {
131136
},
132137
}
133138
ilmPolicy := types.NewIlmPolicy()
134-
json.Unmarshal([]byte(esILMPolicy), ilmPolicy)
139+
json.Unmarshal([]byte(defaultILMPolicy), ilmPolicy)
135140
fmt.Printf("%+v\n", *ilmPolicy)
136141

137142
err := c.CreateOrUpdate(policy, Policy{ILMPolicy: *ilmPolicy})

pkg/es/client/interfaces.go

+4
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ type IndexManagementLifecycleAPI interface {
2121
Exists(name string) (bool, error)
2222
CreateOrUpdate(policy string, ilmPolicy Policy) error
2323
}
24+
25+
type DataStreamAPI interface {
26+
Create(name string) error
27+
}

pkg/es/client/mocks/DataStreamAPI.go

+47
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)