Skip to content

Commit 2ba4bf3

Browse files
authored
[v1.15] Actors: Don't sent Reentrancy ID header when disabled (#8542)
* [v1.15] Actors: Don't sent Reentrancy ID header when disabled Fix sending Actor Reentrancy ID HTTP header when disabled Problem Calling `this.StateManager.TryGetStateAsync()` in the DotNet Actor SDK would return stale data during some invocation scenarios. Impact The latest Actor state data was not being correctly returned during some Actor invocation scenarios using the DotNet Actor SDK. Root cause When Reentrancy was disabled, the Actor Reentrancy ID HTTP header was still being sent to Actor HTTP servers. The DotNet SDK uses the existence of this HTTP header in logic to determine what state should be returned to the Actor. Solution Don't send the Actor Reentrancy ID HTTP header (`"Dapr-Reentrancy-Id"`) when Reentrancy is disabled. Signed-off-by: joshvanl <[email protected]> * Fix actor call placement lock due to reentrancy ID HTTP header. Remove reentancy ID HTTP header check from placement lock during actor calls. Since we always added the reentrancy ID header in v1.15.0, `isReentrancy` bool was always true, so `!isReentrancy` was always false- a no-op to the OR statement. This was added before other changes were made to the actor locking system, but since it was a no-op, it was never caught and removed. Now the reentrancy ID header is (correctly) optionally added to keep backwards compatibility, and the previous actor engine state this check was catering for no longer exists, this check actually breaks the runtime. Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 7c7d136 commit 2ba4bf3

File tree

6 files changed

+228
-8
lines changed

6 files changed

+228
-8
lines changed

docs/release_notes/v1.15.1.md

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
# Dapr 1.15.1
22

3-
This update includes a fix for https://github.com/dapr/dapr/issues/8537.
3+
This update includes bug fixes:
44

5-
## Fixes Dapr not honoring max-body-size when invoking actors
5+
- [Fix Dapr not honoring max-body-size when invoking actors](#fixes-dapr-not-honoring-max-body-size-when-invoking-actors)
6+
- [Fix sending Actor Reentrancy ID HTTP header when disabled](#fix-sending-actor-reentrancy-id-http-header-when-disabled)
7+
8+
## Fix Dapr not honoring max-body-size when invoking actors
69

710
### Problem
811

12+
Issue reported [here](https://github.com/dapr/dapr/issues/8537).
913
When an actor client attempts to invoke an actor with a payload size larger than 4Mb, the call fails with `rpc error: code = ResourceExhausted desc = grpc: received message larger than max`.
1014

1115
### Impact
@@ -18,4 +22,23 @@ The Dapr actor gRPC client did not honor the max-body-size parameter that is pas
1822

1923
### Solution
2024

21-
The DApr actor gRPC client is configured with the proper gRPC call options.
25+
The Dapr actor gRPC client is configured with the proper gRPC call options.
26+
27+
## Fix sending Actor Reentrancy ID HTTP header when disabled
28+
29+
### Problem
30+
31+
Calling `this.StateManager.TryGetStateAsync()` in the DotNet Actor SDK would return stale data during some invocation scenarios.
32+
33+
### Impact
34+
35+
The latest Actor state data was not being correctly returned during some Actor invocation scenarios using the DotNet Actor SDK.
36+
37+
### Root cause
38+
39+
When Reentrancy was disabled, the Actor Reentrancy ID HTTP header was still being sent to Actor HTTP servers.
40+
The DotNet SDK uses the existence of this HTTP header in logic to determine what state should be returned to the Actor.
41+
42+
### Solution
43+
44+
Don't send the Actor Reentrancy ID HTTP header (`"Dapr-Reentrancy-Id"`) when Reentrancy is disabled.

pkg/actors/engine/engine.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,9 @@ func (e *engine) callReminder(ctx context.Context, req *api.Reminder) error {
212212
func (e *engine) callActor(ctx context.Context, req *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
213213
// If we are in a reentrancy which is local, skip the placement lock.
214214
_, isDaprRemote := req.GetMetadata()["X-Dapr-Remote"]
215-
_, isReentrancy := req.GetMetadata()["Dapr-Reentrancy-Id"]
216215
_, isAPICall := req.GetMetadata()["Dapr-API-Call"]
217216

218-
if isAPICall || isDaprRemote || !isReentrancy {
217+
if isAPICall || isDaprRemote {
219218
var cancel context.CancelFunc
220219
var err error
221220
ctx, cancel, err = e.placement.Lock(ctx)

pkg/actors/internal/locker/lock.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131

3232
var ErrLockClosed = errors.New("actor lock is closed")
3333

34+
const headerReentrancyID = "Dapr-Reentrancy-Id"
35+
3436
type lockOptions struct {
3537
actorType string
3638
reentrancyEnabled bool
@@ -229,19 +231,19 @@ func newInflight(id string) *inflight {
229231
}
230232

231233
func (l *lock) idFromRequest(req *internalv1pb.InternalInvokeRequest) (string, bool) {
232-
if req == nil {
234+
if !l.reentrancyEnabled || req == nil {
233235
return uuid.New().String(), false
234236
}
235237

236-
if md := req.GetMetadata()["Dapr-Reentrancy-Id"]; md != nil && len(md.GetValues()) > 0 {
238+
if md := req.GetMetadata()[headerReentrancyID]; md != nil && len(md.GetValues()) > 0 {
237239
return md.GetValues()[0], true
238240
}
239241

240242
id := uuid.New().String()
241243
if req.Metadata == nil {
242244
req.Metadata = make(map[string]*internalv1pb.ListStringValue)
243245
}
244-
req.Metadata["Dapr-Reentrancy-Id"] = &internalv1pb.ListStringValue{
246+
req.Metadata[headerReentrancyID] = &internalv1pb.ListStringValue{
245247
Values: []string{id},
246248
}
247249

pkg/actors/internal/locker/lock_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,35 @@ func Test_ringid(t *testing.T) {
200200
}
201201
}
202202
}
203+
204+
func Test_header(t *testing.T) {
205+
t.Parallel()
206+
207+
t.Run("with header", func(t *testing.T) {
208+
l := newLock(lockOptions{
209+
reentrancyEnabled: true,
210+
maxStackDepth: 10,
211+
})
212+
t.Cleanup(l.close)
213+
214+
req := internalv1pb.NewInternalInvokeRequest("foo")
215+
cancel, err := l.lockRequest(req)
216+
require.NoError(t, err)
217+
t.Cleanup(cancel)
218+
assert.NotEmpty(t, req.GetMetadata()["Dapr-Reentrancy-Id"])
219+
})
220+
221+
t.Run("without header", func(t *testing.T) {
222+
l := newLock(lockOptions{
223+
reentrancyEnabled: false,
224+
maxStackDepth: 10,
225+
})
226+
t.Cleanup(l.close)
227+
228+
req := internalv1pb.NewInternalInvokeRequest("foo")
229+
cancel, err := l.lockRequest(req)
230+
require.NoError(t, err)
231+
t.Cleanup(cancel)
232+
assert.Empty(t, req.GetMetadata()["Dapr-Reentrancy-Id"])
233+
})
234+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package reentry
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"net/http"
20+
"sync/atomic"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/dapr/dapr/tests/integration/framework"
27+
"github.com/dapr/dapr/tests/integration/framework/client"
28+
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
29+
"github.com/dapr/dapr/tests/integration/framework/process/http/app"
30+
"github.com/dapr/dapr/tests/integration/framework/process/placement"
31+
"github.com/dapr/dapr/tests/integration/suite"
32+
"github.com/dapr/kit/ptr"
33+
)
34+
35+
func init() {
36+
suite.Register(new(disabled))
37+
}
38+
39+
type disabled struct {
40+
daprd *daprd.Daprd
41+
called atomic.Int32
42+
rid atomic.Pointer[string]
43+
}
44+
45+
func (d *disabled) Setup(t *testing.T) []framework.Option {
46+
app := app.New(t,
47+
app.WithConfig(`{"entities":["reentrantActor"],"reentrancy":{"enabled":false}}`),
48+
app.WithHandlerFunc("/actors/reentrantActor/myactorid", func(_ http.ResponseWriter, r *http.Request) {}),
49+
app.WithHandlerFunc("/actors/reentrantActor/myactorid/method/foo", func(_ http.ResponseWriter, r *http.Request) {
50+
d.rid.Store(ptr.Of(r.Header.Get("Dapr-Reentrancy-Id")))
51+
d.called.Add(1)
52+
}),
53+
)
54+
55+
place := placement.New(t)
56+
d.daprd = daprd.New(t,
57+
daprd.WithInMemoryActorStateStore("mystore"),
58+
daprd.WithPlacementAddresses(place.Address()),
59+
daprd.WithAppPort(app.Port()),
60+
)
61+
62+
return []framework.Option{
63+
framework.WithProcesses(app, place, d.daprd),
64+
}
65+
}
66+
67+
func (d *disabled) Run(t *testing.T, ctx context.Context) {
68+
d.daprd.WaitUntilRunning(t, ctx)
69+
70+
client := client.HTTP(t)
71+
72+
url := fmt.Sprintf("http://%s/v1.0/actors/reentrantActor/myactorid/method/foo", d.daprd.HTTPAddress())
73+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
74+
require.NoError(t, err)
75+
resp, err := client.Do(req)
76+
require.NoError(t, err)
77+
require.NoError(t, resp.Body.Close())
78+
assert.Equal(t, http.StatusOK, resp.StatusCode)
79+
80+
assert.Equal(t, int32(1), d.called.Load())
81+
assert.Empty(t, *d.rid.Load())
82+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package pertype
15+
16+
import (
17+
"context"
18+
"fmt"
19+
nethttp "net/http"
20+
"sync/atomic"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/dapr/dapr/tests/integration/framework"
27+
"github.com/dapr/dapr/tests/integration/framework/client"
28+
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
29+
"github.com/dapr/dapr/tests/integration/framework/process/http/app"
30+
"github.com/dapr/dapr/tests/integration/framework/process/placement"
31+
"github.com/dapr/dapr/tests/integration/suite"
32+
"github.com/dapr/kit/ptr"
33+
)
34+
35+
func init() {
36+
suite.Register(new(disabled))
37+
}
38+
39+
type disabled struct {
40+
daprd *daprd.Daprd
41+
called atomic.Int32
42+
rid atomic.Pointer[string]
43+
}
44+
45+
func (d *disabled) Setup(t *testing.T) []framework.Option {
46+
app := app.New(t,
47+
app.WithConfig(`{"entities":["reentrantActor"],"reentrancy":{"enabled":true},"entitiesConfig":[{"entities":["reentrantActor"],"reentrancy":{"enabled":false}}]}`),
48+
app.WithHandlerFunc("/actors/reentrantActor/myactorid", func(_ nethttp.ResponseWriter, r *nethttp.Request) {}),
49+
app.WithHandlerFunc("/actors/reentrantActor/myactorid/method/foo", func(_ nethttp.ResponseWriter, r *nethttp.Request) {
50+
d.rid.Store(ptr.Of(r.Header.Get("Dapr-Reentrancy-Id")))
51+
d.called.Add(1)
52+
}),
53+
)
54+
55+
place := placement.New(t)
56+
d.daprd = daprd.New(t,
57+
daprd.WithInMemoryActorStateStore("mystore"),
58+
daprd.WithPlacementAddresses(place.Address()),
59+
daprd.WithAppPort(app.Port()),
60+
)
61+
62+
return []framework.Option{
63+
framework.WithProcesses(app, place, d.daprd),
64+
}
65+
}
66+
67+
func (d *disabled) Run(t *testing.T, ctx context.Context) {
68+
d.daprd.WaitUntilRunning(t, ctx)
69+
70+
client := client.HTTP(t)
71+
72+
url := fmt.Sprintf("http://%s/v1.0/actors/reentrantActor/myactorid/method/foo", d.daprd.HTTPAddress())
73+
req, err := nethttp.NewRequestWithContext(ctx, nethttp.MethodPost, url, nil)
74+
require.NoError(t, err)
75+
resp, err := client.Do(req)
76+
require.NoError(t, err)
77+
require.NoError(t, resp.Body.Close())
78+
assert.Equal(t, nethttp.StatusOK, resp.StatusCode)
79+
80+
assert.Equal(t, int32(1), d.called.Load())
81+
assert.Empty(t, *d.rid.Load())
82+
}

0 commit comments

Comments
 (0)