Skip to content

Commit af5e0a8

Browse files
authored
[Go SDK] Update slog in prism to fix breaking changes. (#26053)
* [Go SDK] Update slog in prism to fix breaking changes. * Add missing context.TODO() --------- Co-authored-by: lostluck <[email protected]>
1 parent 492e2c9 commit af5e0a8

File tree

5 files changed

+12
-9
lines changed

5 files changed

+12
-9
lines changed

sdks/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ require (
5252
github.com/xitongsys/parquet-go v1.6.2
5353
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
5454
go.mongodb.org/mongo-driver v1.11.2
55-
golang.org/x/exp v0.0.0-20230206171751-46f607a40771
55+
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
5656
golang.org/x/net v0.8.0
5757
golang.org/x/oauth2 v0.6.0
5858
golang.org/x/sync v0.1.0

sdks/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
521521
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
522522
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
523523
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
524-
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
525-
golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
524+
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
525+
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
526526
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
527527
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
528528
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=

sdks/go/pkg/beam/runners/prism/internal/coders.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package internal
1717

1818
import (
1919
"bytes"
20+
"context"
2021
"fmt"
2122
"io"
2223

@@ -84,7 +85,7 @@ func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder)
8485
case urns.CoderIntervalWindow:
8586
cwc = coder.NewIntervalWindow()
8687
default:
87-
slog.Log(slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn()))
88+
slog.LogAttrs(context.TODO(), slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn()))
8889
panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc)))
8990
}
9091
return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)

sdks/go/pkg/beam/runners/prism/internal/stage.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package internal
1717

1818
import (
1919
"bytes"
20+
"context"
2021
"fmt"
2122
"io"
2223

@@ -115,7 +116,7 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, comps *pipepb.Componen
115116
ba := rr.GetApplication()
116117
residualData = append(residualData, ba.GetElement())
117118
if len(ba.GetElement()) == 0 {
118-
slog.Log(slog.LevelError, "returned empty residual application", "bundle", rb)
119+
slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
119120
panic("sdk returned empty residual application")
120121
}
121122
for col, wm := range ba.GetOutputWatermarks() {

sdks/go/pkg/beam/runners/prism/internal/worker/worker.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,10 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
152152
if l.Severity >= minsev {
153153
// TODO: Connect to the associated Job for this worker instead of
154154
// logging locally for SDK side logging.
155-
slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
155+
slog.LogAttrs(context.TODO(), toSlogSev(l.GetSeverity()), l.GetMessage(),
156156
slog.String(slog.SourceKey, l.GetLogLocation()),
157157
slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
158-
"worker", wk,
158+
slog.Any("worker", wk),
159159
)
160160
}
161161
}
@@ -220,7 +220,8 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
220220
if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
221221
// TODO. Better pipeline error handling.
222222
if resp.Error != "" {
223-
slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
223+
slog.LogAttrs(context.TODO(), slog.LevelError, "ctrl.Recv pipeline error",
224+
slog.String("error", resp.GetError()))
224225
panic(resp.GetError())
225226
}
226227
b.Resp <- resp.GetProcessBundle()
@@ -285,7 +286,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
285286

286287
for req := range wk.DataReqs {
287288
if err := data.Send(req); err != nil {
288-
slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
289+
slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err))
289290
}
290291
}
291292
return nil

0 commit comments

Comments
 (0)