Skip to content

Commit d0e493f

Browse files
feat: add bytes_sent and bytes_received as metrics (#856)
Adding additional metrics to be reported by Go Connector. The Connector will now report the number of bytes_sent to Cloud SQL and bytes_received from Cloud SQL.
1 parent f482119 commit d0e493f

File tree

4 files changed

+84
-3
lines changed

4 files changed

+84
-3
lines changed

README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,9 @@ Supported metrics include:
399399
- `cloudsqlconn/refresh_success_count`: The number of successful certificate
400400
refresh operations
401401
- `cloudsqlconn/refresh_failure_count`: The number of failed refresh
402-
operations.
402+
operations
403+
- `cloudsqlconn/bytes_sent`: The number of bytes sent to Cloud SQL
404+
- `cloudsqlconn/bytes_received`: The number of bytes received from Cloud SQL
403405

404406
Supported traces include:
405407

dialer.go

+26-2
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
388388
return newInstrumentedConn(tlsConn, func() {
389389
n := atomic.AddUint64(c.openConns, ^uint64(0))
390390
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, cn.String())
391-
}), nil
391+
}, d.dialerID, cn.String()), nil
392392
}
393393

394394
// removeCached stops all background refreshes and deletes the connection
@@ -479,10 +479,12 @@ func (d *Dialer) Warmup(ctx context.Context, icn string, opts ...DialOption) err
479479

480480
// newInstrumentedConn initializes an instrumentedConn that on closing will
481481
// decrement the number of open connects and record the result.
482-
func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
482+
func newInstrumentedConn(conn net.Conn, closeFunc func(), dialerID, connName string) *instrumentedConn {
483483
return &instrumentedConn{
484484
Conn: conn,
485485
closeFunc: closeFunc,
486+
dialerID: dialerID,
487+
connName: connName,
486488
}
487489
}
488490

@@ -491,6 +493,28 @@ func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
491493
type instrumentedConn struct {
492494
net.Conn
493495
closeFunc func()
496+
dialerID string
497+
connName string
498+
}
499+
500+
// Read delegates to the underlying net.Conn interface and records number of
501+
// bytes read
502+
func (i *instrumentedConn) Read(b []byte) (int, error) {
503+
bytesRead, err := i.Conn.Read(b)
504+
if err == nil {
505+
go trace.RecordBytesReceived(context.Background(), int64(bytesRead), i.connName, i.dialerID)
506+
}
507+
return bytesRead, err
508+
}
509+
510+
// Write delegates to the underlying net.Conn interface and records number of
511+
// bytes written
512+
func (i *instrumentedConn) Write(b []byte) (int, error) {
513+
bytesWritten, err := i.Conn.Write(b)
514+
if err == nil {
515+
go trace.RecordBytesSent(context.Background(), int64(bytesWritten), i.connName, i.dialerID)
516+
}
517+
return bytesWritten, err
494518
}
495519

496520
// Close delegates to the underlying net.Conn interface and reports the close

internal/trace/metrics.go

+38
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ var (
5757
"A failed certificate refresh operation",
5858
stats.UnitDimensionless,
5959
)
60+
mBytesSent = stats.Int64(
61+
"cloudsqlconn/bytes_sent",
62+
"The bytes sent to Cloud SQL",
63+
stats.UnitDimensionless,
64+
)
65+
mBytesReceived = stats.Int64(
66+
"cloudsqlconn/bytes_received",
67+
"The bytes received from Cloud SQL",
68+
stats.UnitDimensionless,
69+
)
6070

6171
latencyView = &view.View{
6272
Name: "cloudsqlconn/dial_latency",
@@ -94,6 +104,20 @@ var (
94104
Aggregation: view.Count(),
95105
TagKeys: []tag.Key{keyInstance, keyDialerID, keyErrorCode},
96106
}
107+
bytesSentView = &view.View{
108+
Name: "cloudsqlconn/bytes_sent",
109+
Measure: mBytesSent,
110+
Description: "The number of bytes sent to Cloud SQL",
111+
Aggregation: view.LastValue(),
112+
TagKeys: []tag.Key{keyInstance, keyDialerID},
113+
}
114+
bytesReceivedView = &view.View{
115+
Name: "cloudsqlconn/bytes_received",
116+
Measure: mBytesReceived,
117+
Description: "The number of bytes received from Cloud SQL",
118+
Aggregation: view.LastValue(),
119+
TagKeys: []tag.Key{keyInstance, keyDialerID},
120+
}
97121

98122
registerOnce sync.Once
99123
registerErr error
@@ -110,6 +134,8 @@ func InitMetrics() error {
110134
dialFailureView,
111135
refreshCountView,
112136
failedRefreshCountView,
137+
bytesSentView,
138+
bytesReceivedView,
113139
); rErr != nil {
114140
registerErr = fmt.Errorf("failed to initialize metrics: %v", rErr)
115141
}
@@ -157,6 +183,18 @@ func RecordRefreshResult(ctx context.Context, instance, dialerID string, err err
157183
stats.Record(ctx, mSuccessfulRefresh.M(1))
158184
}
159185

186+
// RecordBytesSent reports the number of bytes sent to Cloud SQL
187+
func RecordBytesSent(ctx context.Context, num int64, instance, dialerID string) {
188+
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
189+
stats.Record(ctx, mBytesSent.M(num))
190+
}
191+
192+
// RecordBytesReceived reports the number of bytes received from Cloud SQL
193+
func RecordBytesReceived(ctx context.Context, num int64, instance, dialerID string) {
194+
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
195+
stats.Record(ctx, mBytesReceived.M(num))
196+
}
197+
160198
// errorCode returns an error code as given from the SQL Admin API, provided the
161199
// error wraps a googleapi.Error type. If multiple error codes are returned from
162200
// the API, then a comma-separated string of all codes is returned.

metrics_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package cloudsqlconn
1515

1616
import (
17+
"bytes"
1718
"context"
1819
"encoding/json"
1920
"fmt"
@@ -159,6 +160,20 @@ func TestDialerWithMetrics(t *testing.T) {
159160
if err != nil {
160161
t.Fatalf("expected Dial to succeed, but got error: %v", err)
161162
}
163+
// write to conn to test bytes_sent and bytes_received
164+
buf := &bytes.Buffer{}
165+
err = buf.WriteByte('a')
166+
if err != nil {
167+
t.Fatalf("buf.WriteByte failed: %v", err)
168+
}
169+
_, err = conn2.Write(buf.Bytes())
170+
if err != nil {
171+
t.Fatalf("conn.Write failed: %v", err)
172+
}
173+
_, err = conn2.Read(buf.Bytes())
174+
if err != nil {
175+
t.Fatalf("conn.Read failed: %v", err)
176+
}
162177
defer conn2.Close()
163178
// dial a bogus instance
164179
_, err = d.Dial(context.Background(), "my-project:my-region:notaninstance")
@@ -172,6 +187,8 @@ func TestDialerWithMetrics(t *testing.T) {
172187
wantLastValueMetric(t, "cloudsqlconn/open_connections", spy.data(), 2)
173188
wantDistributionMetric(t, "cloudsqlconn/dial_latency", spy.data())
174189
wantCountMetric(t, "cloudsqlconn/refresh_success_count", spy.data())
190+
wantLastValueMetric(t, "cloudsqlconn/bytes_sent", spy.data(), 1)
191+
wantLastValueMetric(t, "cloudsqlconn/bytes_received", spy.data(), 1)
175192

176193
// failure metrics from dialing bogus instance
177194
wantCountMetric(t, "cloudsqlconn/dial_failure_count", spy.data())

0 commit comments

Comments
 (0)