-
Notifications
You must be signed in to change notification settings - Fork 673
Move otelhttp wrappers into internal package #5916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
d2d7c20
move bodyWrapper into the internal package
dmathieu 72fe270
move respWriterWrapper into internal package
dmathieu 8a0aa5d
add changelog entry
dmathieu 32e4f21
PR number
dmathieu 077c1a6
add NewBodyWrapper method
dmathieu c3bfee3
move wrappers into an internal request package
dmathieu 267a875
replace atomic types in BodyWrapper with a mutex
dmathieu cf2dba7
add race detection test to resp writer test
dmathieu a055d2d
Merge branch 'main' into internal-wrappers
dmathieu c747d41
Update instrumentation/net/http/otelhttp/internal/request/body_wrappe…
dmathieu 549707b
Update instrumentation/net/http/otelhttp/internal/request/body_wrappe…
dmathieu 9d67079
fix var name
dmathieu 6ea0cf4
use a global var
dmathieu 300e969
rely on underlying object directly
dmathieu e197b1d
document the callbacks
dmathieu 6946ddd
acquire the lock only once when writing
dmathieu a9d231d
Merge branch 'main' into internal-wrappers
dmathieu 01a1bb1
Write needs a write lock
dmathieu 9a487c0
Merge branch 'main' into internal-wrappers
dmathieu 28a972b
wait until the reader has finished
dmathieu b196e07
Merge branch 'main' into internal-wrappers
dmathieu f2c863e
Merge branch 'main' into internal-wrappers
dmathieu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
75 changes: 75 additions & 0 deletions
75
instrumentation/net/http/otelhttp/internal/request/body_wrapper.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" | ||
|
||
import ( | ||
"io" | ||
"sync" | ||
) | ||
|
||
var _ io.ReadCloser = &BodyWrapper{} | ||
|
||
// BodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number | ||
// of bytes read and the last error. | ||
type BodyWrapper struct { | ||
io.ReadCloser | ||
OnRead func(n int64) // must not be nil | ||
|
||
mu sync.Mutex | ||
read int64 | ||
err error | ||
} | ||
|
||
// NewBodyWrapper creates a new BodyWrapper. | ||
// | ||
// The onRead attribute is a callback that will be called every time the data | ||
// is read, with the number of bytes being read. | ||
func NewBodyWrapper(body io.ReadCloser, onRead func(int64)) *BodyWrapper { | ||
return &BodyWrapper{ | ||
ReadCloser: body, | ||
OnRead: onRead, | ||
} | ||
} | ||
|
||
// Read reads the data from the io.ReadCloser, and stores the number of bytes | ||
// read and the error. | ||
func (w *BodyWrapper) Read(b []byte) (int, error) { | ||
n, err := w.ReadCloser.Read(b) | ||
n1 := int64(n) | ||
|
||
w.updateReadData(n1, err) | ||
w.OnRead(n1) | ||
return n, err | ||
} | ||
|
||
func (w *BodyWrapper) updateReadData(n int64, err error) { | ||
w.mu.Lock() | ||
defer w.mu.Unlock() | ||
|
||
w.read += n | ||
if err != nil { | ||
w.err = err | ||
} | ||
} | ||
|
||
// Closes closes the io.ReadCloser. | ||
func (w *BodyWrapper) Close() error { | ||
return w.ReadCloser.Close() | ||
} | ||
|
||
// BytesRead returns the number of bytes read up to this point. | ||
func (w *BodyWrapper) BytesRead() int64 { | ||
w.mu.Lock() | ||
defer w.mu.Unlock() | ||
|
||
return w.read | ||
} | ||
|
||
// Error returns the last error. | ||
func (w *BodyWrapper) Error() error { | ||
w.mu.Lock() | ||
defer w.mu.Unlock() | ||
|
||
return w.err | ||
} |
74 changes: 74 additions & 0 deletions
74
instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package request | ||
|
||
import ( | ||
"errors" | ||
"io" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
var errFirstCall = errors.New("first call") | ||
|
||
func TestBodyWrapper(t *testing.T) { | ||
bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) | ||
|
||
data, err := io.ReadAll(bw) | ||
require.NoError(t, err) | ||
assert.Equal(t, "hello world", string(data)) | ||
|
||
assert.Equal(t, int64(11), bw.BytesRead()) | ||
assert.Equal(t, io.EOF, bw.Error()) | ||
} | ||
|
||
type multipleErrorsReader struct { | ||
calls int | ||
} | ||
|
||
type errorWrapper struct{} | ||
|
||
func (errorWrapper) Error() string { | ||
return "subsequent calls" | ||
} | ||
|
||
func (mer *multipleErrorsReader) Read([]byte) (int, error) { | ||
mer.calls = mer.calls + 1 | ||
if mer.calls == 1 { | ||
return 0, errFirstCall | ||
} | ||
|
||
return 0, errorWrapper{} | ||
} | ||
|
||
func TestBodyWrapperWithErrors(t *testing.T) { | ||
bw := NewBodyWrapper(io.NopCloser(&multipleErrorsReader{}), func(int64) {}) | ||
|
||
data, err := io.ReadAll(bw) | ||
require.Equal(t, errFirstCall, err) | ||
assert.Equal(t, "", string(data)) | ||
require.Equal(t, errFirstCall, bw.Error()) | ||
|
||
data, err = io.ReadAll(bw) | ||
require.Equal(t, errorWrapper{}, err) | ||
assert.Equal(t, "", string(data)) | ||
require.Equal(t, errorWrapper{}, bw.Error()) | ||
} | ||
|
||
func TestConcurrentBodyWrapper(t *testing.T) { | ||
bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) | ||
|
||
go func() { | ||
_, _ = io.ReadAll(bw) | ||
}() | ||
|
||
assert.NotNil(t, bw.BytesRead()) | ||
assert.Eventually(t, func() bool { | ||
return errors.Is(bw.Error(), io.EOF) | ||
}, time.Second, 10*time.Millisecond) | ||
} |
112 changes: 112 additions & 0 deletions
112
instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" | ||
|
||
import ( | ||
"net/http" | ||
"sync" | ||
) | ||
|
||
var _ http.ResponseWriter = &RespWriterWrapper{} | ||
|
||
// RespWriterWrapper wraps a http.ResponseWriter in order to track the number of | ||
// bytes written, the last error, and to catch the first written statusCode. | ||
// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional | ||
// types (http.Hijacker, http.Pusher, http.CloseNotifier, etc) | ||
// that may be useful when using it in real life situations. | ||
type RespWriterWrapper struct { | ||
http.ResponseWriter | ||
OnWrite func(n int64) // must not be nil | ||
|
||
mu sync.RWMutex | ||
written int64 | ||
statusCode int | ||
err error | ||
wroteHeader bool | ||
} | ||
|
||
// NewRespWriterWrapper creates a new RespWriterWrapper. | ||
// | ||
// The onWrite attribute is a callback that will be called every time the data | ||
// is written, with the number of bytes that were written. | ||
func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWriterWrapper { | ||
return &RespWriterWrapper{ | ||
ResponseWriter: w, | ||
OnWrite: onWrite, | ||
statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything | ||
} | ||
} | ||
|
||
// Write writes the bytes array into the [ResponseWriter], and tracks the | ||
// number of bytes written and last error. | ||
func (w *RespWriterWrapper) Write(p []byte) (int, error) { | ||
w.mu.Lock() | ||
defer w.mu.Unlock() | ||
|
||
w.writeHeader(http.StatusOK) | ||
|
||
n, err := w.ResponseWriter.Write(p) | ||
|
||
n1 := int64(n) | ||
w.OnWrite(n1) | ||
w.written += n1 | ||
w.err = err | ||
return n, err | ||
} | ||
|
||
// WriteHeader persists initial statusCode for span attribution. | ||
// All calls to WriteHeader will be propagated to the underlying ResponseWriter | ||
// and will persist the statusCode from the first call. | ||
// Blocking consecutive calls to WriteHeader alters expected behavior and will | ||
// remove warning logs from net/http where developers will notice incorrect handler implementations. | ||
func (w *RespWriterWrapper) WriteHeader(statusCode int) { | ||
w.mu.Lock() | ||
defer w.mu.Unlock() | ||
|
||
w.writeHeader(statusCode) | ||
} | ||
|
||
// writeHeader persists the status code for span attribution, and propagates | ||
// the call to the underlying ResponseWriter. | ||
// It does not acquire a lock, and therefore assumes that is being handled by a | ||
// parent method. | ||
func (w *RespWriterWrapper) writeHeader(statusCode int) { | ||
if !w.wroteHeader { | ||
w.wroteHeader = true | ||
w.statusCode = statusCode | ||
} | ||
w.ResponseWriter.WriteHeader(statusCode) | ||
} | ||
|
||
// Flush implements [http.Flusher]. | ||
func (w *RespWriterWrapper) Flush() { | ||
w.WriteHeader(http.StatusOK) | ||
|
||
if f, ok := w.ResponseWriter.(http.Flusher); ok { | ||
f.Flush() | ||
} | ||
} | ||
|
||
// BytesWritten returns the number of bytes written. | ||
func (w *RespWriterWrapper) BytesWritten() int64 { | ||
w.mu.RLock() | ||
defer w.mu.RUnlock() | ||
|
||
return w.written | ||
} | ||
|
||
// BytesWritten returns the HTTP status code that was sent. | ||
func (w *RespWriterWrapper) StatusCode() int { | ||
w.mu.RLock() | ||
defer w.mu.RUnlock() | ||
|
||
return w.statusCode | ||
} | ||
|
||
// Error returns the last error. | ||
func (w *RespWriterWrapper) Error() error { | ||
w.mu.RLock() | ||
defer w.mu.RUnlock() | ||
|
||
return w.err | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.