Skip to content

Commit b341756

Browse files
authored
feat: replace message_ttl with static max retry count (#2638)
This PR replaces the `courier.message_ttl` configuration option with a `courier.message_retries` option that limits how many times sending a message is retried before the message is marked as `abandoned`. BREAKING CHANGES: The `courier.message_ttl` config key is removed and replaced by the retry counter `courier.message_retries`. Closes #402 Closes #1598
1 parent 573bd16 commit b341756

18 files changed

+213
-78
lines changed

.schemastore/config.schema.json

+6-8
Original file line numberDiff line numberDiff line change
@@ -1471,15 +1471,13 @@
14711471
"/conf/courier-templates"
14721472
]
14731473
},
1474-
"message_ttl": {
1475-
"description": "Defines a Time-To-Live for courier messages that could not be delivered. After the defined TTL has expired for a message that message is abandoned.",
1476-
"type": "string",
1477-
"pattern": "^([0-9]+(ns|us|ms|s|m|h))+$",
1478-
"default": "1h",
1474+
"message_retries": {
1475+
"description": "Defines the maximum number of times the sending of a message is retried after it failed before it is marked as abandoned",
1476+
"type": "integer",
1477+
"default": 5,
14791478
"examples": [
1480-
"1h",
1481-
"1m",
1482-
"1s"
1479+
10,
1480+
60
14831481
]
14841482
},
14851483
"smtp": {

courier/courier_dispatcher.go

+38-26
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,37 @@ package courier
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/pkg/errors"
87
)
98

109
func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
10+
maxRetries := c.deps.CourierConfig(ctx).CourierMessageRetries()
11+
12+
if msg.SendCount > maxRetries {
13+
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
14+
c.deps.Logger().
15+
WithError(err).
16+
WithField("message_id", msg.ID).
17+
Error(`Unable to reset the retried message's status to "abandoned".`)
18+
return err
19+
}
20+
21+
// Skip the message
22+
c.deps.Logger().
23+
WithField("message_id", msg.ID).
24+
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)
25+
return nil
26+
}
27+
28+
if err := c.deps.CourierPersister().IncrementMessageSendCount(ctx, msg.ID); err != nil {
29+
c.deps.Logger().
30+
WithError(err).
31+
WithField("message_id", msg.ID).
32+
Error(`Unable to increment the message's "send_count" field`)
33+
return err
34+
}
35+
1136
switch msg.Type {
1237
case MessageTypeEmail:
1338
if err := c.dispatchEmail(ctx, msg); err != nil {
@@ -48,35 +73,22 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
4873
return err
4974
}
5075

51-
ttl := c.deps.CourierConfig(ctx).CourierMessageTTL()
52-
5376
for k, msg := range messages {
54-
if time.Now().After(msg.CreatedAt.Add(ttl)) {
55-
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
56-
if c.failOnError {
57-
return err
58-
}
59-
c.deps.Logger().
60-
WithError(err).
61-
WithField("message_id", msg.ID).
62-
Error(`Unable to reset the timed out message's status to "abandoned".`)
63-
}
64-
} else {
65-
if err := c.DispatchMessage(ctx, msg); err != nil {
66-
for _, replace := range messages[k:] {
67-
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
68-
if c.failOnError {
69-
return err
70-
}
71-
c.deps.Logger().
72-
WithError(err).
73-
WithField("message_id", replace.ID).
74-
Error(`Unable to reset the failed message's status to "queued".`)
77+
if err := c.DispatchMessage(ctx, msg); err != nil {
78+
79+
for _, replace := range messages[k:] {
80+
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
81+
if c.failOnError {
82+
return err
7583
}
84+
c.deps.Logger().
85+
WithError(err).
86+
WithField("message_id", replace.ID).
87+
Error(`Unable to reset the failed message's status to "queued".`)
7688
}
77-
78-
return err
7989
}
90+
91+
return err
8092
}
8193
}
8294

courier/courier_dispatcher_test.go

+65-10
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,75 @@ package courier_test
33
import (
44
"context"
55
"testing"
6-
"time"
76

87
"github.com/gofrs/uuid"
9-
"github.com/sirupsen/logrus"
108
"github.com/stretchr/testify/require"
119

1210
"github.com/ory/kratos/courier"
11+
"github.com/ory/kratos/courier/template"
1312
templates "github.com/ory/kratos/courier/template/email"
1413
"github.com/ory/kratos/driver/config"
1514
"github.com/ory/kratos/internal"
1615
)
1716

18-
func TestMessageTTL(t *testing.T) {
19-
if testing.Short() {
20-
t.SkipNow()
21-
}
17+
func queueNewMessage(t *testing.T, ctx context.Context, c courier.Courier, d template.Dependencies) uuid.UUID {
18+
t.Helper()
19+
id, err := c.QueueEmail(ctx, templates.NewTestStub(d, &templates.TestStubModel{
20+
21+
Subject: "test-subject-1",
22+
Body: "test-body-1",
23+
}))
24+
require.NoError(t, err)
25+
return id
26+
}
27+
28+
func TestDispatchMessageWithInvalidSMTP(t *testing.T) {
2229
ctx := context.Background()
2330

2431
conf, reg := internal.NewRegistryDefaultWithDSN(t, "")
25-
conf.MustSet(config.ViperKeyCourierMessageTTL, 1*time.Nanosecond)
32+
conf.MustSet(config.ViperKeyCourierMessageRetries, 5)
33+
conf.MustSet(config.ViperKeyCourierSMTPURL, "http://foo.url")
2634

27-
reg.Logger().Level = logrus.TraceLevel
35+
ctx, cancel := context.WithCancel(ctx)
36+
defer cancel()
37+
38+
c := reg.Courier(ctx)
39+
40+
t.Run("case=failed sending", func(t *testing.T) {
41+
id := queueNewMessage(t, ctx, c, reg)
42+
message, err := reg.CourierPersister().LatestQueuedMessage(ctx)
43+
require.NoError(t, err)
44+
require.Equal(t, id, message.ID)
45+
46+
err = c.DispatchMessage(ctx, *message)
47+
// sending the email fails, because there is no SMTP server at foo.url
48+
require.Error(t, err)
49+
50+
messages, err := reg.CourierPersister().NextMessages(ctx, 10)
51+
require.Len(t, messages, 1)
52+
})
53+
54+
t.Run("case=max retries reached", func(t *testing.T) {
55+
id := queueNewMessage(t, ctx, c, reg)
56+
message, err := reg.CourierPersister().LatestQueuedMessage(ctx)
57+
require.NoError(t, err)
58+
require.Equal(t, id, message.ID)
59+
message.SendCount = 6
60+
61+
err = c.DispatchMessage(ctx, *message)
62+
require.NoError(t, err)
63+
64+
messages, err := reg.CourierPersister().NextMessages(ctx, 1)
65+
require.Empty(t, messages)
66+
})
67+
68+
}
69+
70+
func TestDispatchMessage2(t *testing.T) {
71+
ctx := context.Background()
72+
73+
conf, reg := internal.NewRegistryDefaultWithDSN(t, "")
74+
conf.MustSet(config.ViperKeyCourierMessageRetries, 1)
2875

2976
c := reg.Courier(ctx)
3077

@@ -39,9 +86,17 @@ func TestMessageTTL(t *testing.T) {
3986
require.NoError(t, err)
4087
require.NotEqual(t, uuid.Nil, id)
4188

42-
c.DispatchQueue(ctx)
89+
// Fails to deliver the first time
90+
err = c.DispatchQueue(ctx)
91+
require.Error(t, err)
92+
93+
// Retry once, as we set above - still fails
94+
err = c.DispatchQueue(ctx)
95+
require.Error(t, err)
4396

44-
time.Sleep(1 * time.Second)
97+
// The send count is now 2, which exceeds the configured maximum of 1 retry, so the message is abandoned and not sent again
98+
err = c.DispatchQueue(ctx)
99+
require.NoError(t, err)
45100

46101
var message courier.Message
47102
err = reg.Persister().GetConnection(ctx).

courier/message.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,21 @@ const (
2727

2828
// swagger:ignore
2929
type Message struct {
30-
ID uuid.UUID `json:"-" faker:"-" db:"id"`
31-
NID uuid.UUID `json:"-" faker:"-" db:"nid"`
32-
Status MessageStatus `json:"-" db:"status"`
33-
Type MessageType `json:"-" db:"type"`
34-
Recipient string `json:"-" db:"recipient"`
35-
Body string `json:"-" db:"body"`
36-
Subject string `json:"-" db:"subject"`
37-
TemplateType TemplateType `json:"-" db:"template_type"`
30+
ID uuid.UUID `json:"id" faker:"-" db:"id"`
31+
NID uuid.UUID `json:"-" faker:"-" db:"nid"`
32+
Status MessageStatus `json:"status" db:"status"`
33+
Type MessageType `json:"type" db:"type"`
34+
Recipient string `json:"recipient" db:"recipient"`
35+
Body string `json:"body" db:"body"`
36+
Subject string `json:"subject" db:"subject"`
37+
TemplateType TemplateType `json:"template_type" db:"template_type"`
3838
TemplateData []byte `json:"-" db:"template_data"`
39+
SendCount int `json:"send_count" db:"send_count"`
3940

4041
// CreatedAt is a helper struct field for gobuffalo.pop.
41-
CreatedAt time.Time `json:"-" faker:"-" db:"created_at"`
42+
CreatedAt time.Time `json:"created_at" faker:"-" db:"created_at"`
4243
// UpdatedAt is a helper struct field for gobuffalo.pop.
43-
UpdatedAt time.Time `json:"-" faker:"-" db:"updated_at"`
44+
UpdatedAt time.Time `json:"updated_at" faker:"-" db:"updated_at"`
4445
}
4546

4647
func (m Message) TableName(ctx context.Context) string {

courier/persistence.go

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type (
1818
SetMessageStatus(context.Context, uuid.UUID, MessageStatus) error
1919

2020
LatestQueuedMessage(ctx context.Context) (*Message, error)
21+
22+
IncrementMessageSendCount(context.Context, uuid.UUID) error
2123
}
2224
PersistenceProvider interface {
2325
CourierPersister() Persister

courier/smtp.go

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/tls"
66
"encoding/json"
77
"fmt"
8+
"net/textproto"
89
"strconv"
910
"time"
1011

@@ -199,6 +200,19 @@ func (c *courier) dispatchEmail(ctx context.Context, msg Message) error {
199200
// WithField("email_to", msg.Recipient).
200201
WithField("message_from", from).
201202
Error("Unable to send email using SMTP connection.")
203+
204+
var protoErr *textproto.Error
205+
if containsProtoErr := errors.As(err, &protoErr); containsProtoErr && protoErr.Code >= 500 {
206+
// See https://en.wikipedia.org/wiki/List_of_SMTP_server_return_codes
207+
// If the SMTP server responds with 5xx, sending the message should not be retried (without changing something about the request)
208+
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
209+
c.deps.Logger().
210+
WithError(err).
211+
WithField("message_id", msg.ID).
212+
Error(`Unable to reset the retried message's status to "abandoned".`)
213+
return err
214+
}
215+
}
202216
return errors.WithStack(err)
203217
}
204218

courier/test/persistence.go

+12
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ func TestPersister(ctx context.Context, newNetworkUnlessExisting NetworkWrapper,
9595
require.ErrorIs(t, err, courier.ErrQueueEmpty)
9696
})
9797

98+
t.Run("case=incrementing send count", func(t *testing.T) {
99+
originalSendCount := messages[0].SendCount
100+
require.NoError(t, p.SetMessageStatus(ctx, messages[0].ID, courier.MessageStatusQueued))
101+
102+
require.NoError(t, p.IncrementMessageSendCount(ctx, messages[0].ID))
103+
ms, err := p.NextMessages(ctx, 1)
104+
require.NoError(t, err)
105+
require.Len(t, ms, 1)
106+
assert.Equal(t, messages[0].ID, ms[0].ID)
107+
assert.Equal(t, originalSendCount+1, ms[0].SendCount)
108+
})
109+
98110
t.Run("case=network", func(t *testing.T) {
99111
id := x.NewUUID()
100112

driver/config/config.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ const (
7474
ViperKeyCourierSMSRequestConfig = "courier.sms.request_config"
7575
ViperKeyCourierSMSEnabled = "courier.sms.enabled"
7676
ViperKeyCourierSMSFrom = "courier.sms.from"
77-
ViperKeyCourierMessageTTL = "courier.message_ttl"
77+
ViperKeyCourierMessageRetries = "courier.message_retries"
7878
ViperKeySecretsDefault = "secrets.default"
7979
ViperKeySecretsCookie = "secrets.cookie"
8080
ViperKeySecretsCipher = "secrets.cipher"
@@ -257,7 +257,7 @@ type (
257257
CourierTemplatesVerificationValid() *CourierEmailTemplate
258258
CourierTemplatesRecoveryInvalid() *CourierEmailTemplate
259259
CourierTemplatesRecoveryValid() *CourierEmailTemplate
260-
CourierMessageTTL() time.Duration
260+
CourierMessageRetries() int
261261
}
262262
)
263263

@@ -948,8 +948,8 @@ func (p *Config) CourierTemplatesRecoveryValid() *CourierEmailTemplate {
948948
return p.CourierTemplatesHelper(ViperKeyCourierTemplatesRecoveryValidEmail)
949949
}
950950

951-
func (p *Config) CourierMessageTTL() time.Duration {
952-
return p.p.DurationF(ViperKeyCourierMessageTTL, time.Hour)
951+
func (p *Config) CourierMessageRetries() int {
952+
return p.p.IntF(ViperKeyCourierMessageRetries, 5)
953953
}
954954

955955
func (p *Config) CourierSMTPHeaders() map[string]string {

driver/config/config_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1110,13 +1110,13 @@ func TestCourierMessageTTL(t *testing.T) {
11101110

11111111
t.Run("case=configs set", func(t *testing.T) {
11121112
conf, _ := config.New(ctx, logrusx.New("", ""), os.Stderr,
1113-
configx.WithConfigFiles("stub/.kratos.courier.messageTTL.yaml"), configx.SkipValidation())
1114-
assert.Equal(t, conf.CourierMessageTTL(), time.Duration(5*time.Minute))
1113+
configx.WithConfigFiles("stub/.kratos.courier.message_retries.yaml"), configx.SkipValidation())
1114+
assert.Equal(t, conf.CourierMessageRetries(), 10)
11151115
})
11161116

11171117
t.Run("case=defaults", func(t *testing.T) {
11181118
conf, _ := config.New(ctx, logrusx.New("", ""), os.Stderr, configx.SkipValidation())
1119-
assert.Equal(t, conf.CourierMessageTTL(), time.Duration(1*time.Hour))
1119+
assert.Equal(t, conf.CourierMessageRetries(), 5)
11201120
})
11211121
}
11221122

driver/config/stub/.kratos.courier.messageTTL.yaml

-2
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
courier:
2+
message_retries: 10

embedx/config.schema.json

+6-8
Original file line numberDiff line numberDiff line change
@@ -1549,15 +1549,13 @@
15491549
"/conf/courier-templates"
15501550
]
15511551
},
1552-
"message_ttl": {
1553-
"description": "Defines a Time-To-Live for courier messages that could not be delivered. After the defined TTL has expired for a message that message is abandoned.",
1554-
"type": "string",
1555-
"pattern": "^([0-9]+(ns|us|ms|s|m|h))+$",
1556-
"default": "1h",
1552+
"message_retries": {
1553+
"description": "Defines the maximum number of times the sending of a message is retried after it failed before it is marked as abandoned",
1554+
"type": "integer",
1555+
"default": 5,
15571556
"examples": [
1558-
"1h",
1559-
"1m",
1560-
"1s"
1557+
10,
1558+
60
15611559
]
15621560
},
15631561
"smtp": {

0 commit comments

Comments
 (0)