Skip to content

Commit 8b70952

Browse files
author
allenzhli
committed
remove RespIterator and update to use chan idiom
Signed-off-by: allenzhli <[email protected]>
1 parent 5c1796b commit 8b70952

File tree

4 files changed

+20
-42
lines changed

4 files changed

+20
-42
lines changed

pkg/alertmanager/api.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,11 @@ func (am *MultitenantAlertmanager) ListAllConfigs(w http.ResponseWriter, r *http
204204
}
205205

206206
done := make(chan struct{})
207-
iter := util.NewRespIter(make(chan []byte))
207+
iter := make(chan []byte)
208208

209209
go func() {
210210
util.StreamWriteResponse(w, iter, "text/yaml")
211-
done <- struct{}{}
211+
close(done)
212212
}()
213213

214214
err = concurrency.ForEachUser(r.Context(), userIDs, fetchConcurrency, func(ctx context.Context, userID string) error {
@@ -229,13 +229,16 @@ func (am *MultitenantAlertmanager) ListAllConfigs(w http.ResponseWriter, r *http
229229
return err
230230
}
231231

232-
iter.Put(data)
232+
select {
233+
case iter <- data:
234+
case <-done: // stop early, if sending response has already finished
235+
}
233236

234237
return nil
235238
})
236239
if err != nil {
237240
level.Error(logger).Log("msg", "failed to list all alertmanager configs", "err", err)
238241
}
239-
iter.Close()
242+
close(iter)
240243
<-done
241244
}

pkg/ruler/ruler.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -830,11 +830,11 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
830830
}
831831

832832
done := make(chan struct{})
833-
iter := util.NewRespIter(make(chan []byte))
833+
iter := make(chan []byte)
834834

835835
go func() {
836836
util.StreamWriteResponse(w, iter, "text/yaml")
837-
done <- struct{}{}
837+
close(done)
838838
}()
839839

840840
err = concurrency.ForEachUser(req.Context(), userIDs, fetchRulesConcurrency, func(ctx context.Context, userID string) error {
@@ -850,13 +850,16 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
850850
return err
851851
}
852852

853-
iter.Put(data)
853+
select {
854+
case iter <- data:
855+
case <-done: // stop early, if sending response has already finished
856+
}
854857

855858
return nil
856859
})
857860
if err != nil {
858861
level.Error(logger).Log("msg", "failed to list all alertmanager configs", "err", err)
859862
}
860-
iter.Close()
863+
close(iter)
861864
<-done
862865
}

pkg/util/http.go

+2-30
Original file line numberDiff line numberDiff line change
@@ -98,36 +98,8 @@ func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Templa
9898
}
9999
}
100100

101-
// RespIterator is a iterator for stream http response
102-
type RespIterator interface {
103-
// Close must be called to release resources once the iterator is not
104-
// used anymore.
105-
Close()
106-
// Next returns a channel that will be closed once the iterator is
107-
// exhausted.
108-
Next() <-chan []byte
109-
// Put input a new item to iterator
110-
Put(v []byte)
111-
}
112-
113-
type respIter struct {
114-
ch chan []byte
115-
}
116-
117-
// NewRespIter returns a new streamResp
118-
func NewRespIter(ch chan []byte) RespIterator {
119-
return &respIter{ch: ch}
120-
}
121-
func (it respIter) Next() <-chan []byte {
122-
return it.ch
123-
}
124-
func (it respIter) Close() { close(it.ch) }
125-
func (it respIter) Put(v []byte) {
126-
it.ch <- v
127-
}
128-
129101
// StreamWriteResponse stream writes data as http response
130-
func StreamWriteResponse(w http.ResponseWriter, iter RespIterator, contentType string) {
102+
func StreamWriteResponse(w http.ResponseWriter, iter chan []byte, contentType string) {
131103
flusher, ok := w.(http.Flusher)
132104
if !ok {
133105
http.Error(w, "expected http.ResponseWriter to be an http.Flusher", http.StatusInternalServerError)
@@ -140,7 +112,7 @@ func StreamWriteResponse(w http.ResponseWriter, iter RespIterator, contentType s
140112
w.WriteHeader(http.StatusOK)
141113
flusher.Flush()
142114

143-
for m := range iter.Next() {
115+
for m := range iter {
144116
_, _ = w.Write(m)
145117
flusher.Flush()
146118
}

pkg/util/http_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,17 @@ func TestStreamWriteYAMLResponse(t *testing.T) {
118118
w := httptest.NewRecorder()
119119

120120
done := make(chan struct{})
121-
iter := util.NewRespIter(make(chan []byte))
121+
iter := make(chan []byte)
122122
go func() {
123123
util.StreamWriteResponse(w, iter, "text/yaml")
124-
done <- struct{}{}
124+
close(done)
125125
}()
126126
for k, v := range tt.value {
127127
data, err := yaml.Marshal(map[string]*testStruct{k: v})
128128
assert.Nil(t, err)
129-
iter.Put(data)
129+
iter <- data
130130
}
131-
iter.Close()
131+
close(iter)
132132
<-done
133133
assert.Equal(t, tt.expectedContentType, w.Header().Get("Content-Type"))
134134
assert.Equal(t, 200, w.Code)

0 commit comments

Comments
 (0)