Skip to content

Commit 5cdb227

Browse files
Support update or remove topic properties (#1381)
1 parent ad9a7d1 commit 5cdb227

File tree

2 files changed

+52
-10
lines changed

2 files changed

+52
-10
lines changed

pulsaradmin/pkg/admin/topic.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ type Topics interface {
5151
// GetProperties returns the properties of a topic
5252
GetProperties(topic utils.TopicName) (map[string]string, error)
5353

54+
// UpdateProperties updates the properties of a topic
55+
UpdateProperties(topic utils.TopicName, properties map[string]string) error
56+
57+
// RemoveProperty removes a property with the given key of a topic
58+
RemoveProperty(topic utils.TopicName, key string) error
59+
5460
// Delete a topic, this function can delete both partitioned or non-partitioned topic
5561
//
5662
// @param topic
@@ -433,6 +439,16 @@ func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error)
433439
return properties, err
434440
}
435441

442+
func (t *topics) UpdateProperties(topic utils.TopicName, properties map[string]string) error {
443+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
444+
return t.pulsar.Client.Put(endpoint, properties)
445+
}
446+
447+
func (t *topics) RemoveProperty(topic utils.TopicName, key string) error {
448+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
449+
return t.pulsar.Client.DeleteWithQueryParams(endpoint, map[string]string{"key": key})
450+
}
451+
436452
func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error {
437453
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
438454
if nonPartitioned {

pulsaradmin/pkg/admin/topic_test.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,37 +65,63 @@ func TestCreateTopic(t *testing.T) {
6565
t.Error("Couldn't find topic: " + topic)
6666
}
6767

68-
func TestTopics_CreateWithProperties(t *testing.T) {
68+
func TestTopics_Properties(t *testing.T) {
69+
t.Run("NonPartitioned", func(t *testing.T) {
70+
internalTestTopicsProperties(t, 0)
71+
})
72+
t.Run("Partitioned", func(t *testing.T) {
73+
internalTestTopicsProperties(t, 4)
74+
})
75+
}
76+
77+
func verifyTopicProperties(t *testing.T, admin Client, topic *utils.TopicName,
78+
expected map[string]string) {
79+
properties, err := admin.Topics().GetProperties(*topic)
80+
assert.NoError(t, err)
81+
assert.Equal(t, expected, properties)
82+
}
83+
84+
func internalTestTopicsProperties(t *testing.T, partitions int) {
6985
topic := newTopicName()
7086
cfg := &config.Config{}
7187
admin, err := New(cfg)
7288
assert.NoError(t, err)
7389
assert.NotNil(t, admin)
7490

75-
// Create non-partition topic
7691
topicName, err := utils.GetTopicName(topic)
7792
assert.NoError(t, err)
78-
err = admin.Topics().CreateWithProperties(*topicName, 0, map[string]string{
93+
err = admin.Topics().CreateWithProperties(*topicName, partitions, map[string]string{
7994
"key1": "value1",
8095
})
8196
assert.NoError(t, err)
97+
verifyTopicProperties(t, admin, topicName, map[string]string{"key1": "value1"})
8298

8399
properties, err := admin.Topics().GetProperties(*topicName)
84100
assert.NoError(t, err)
85101
assert.Equal(t, properties["key1"], "value1")
86102

87-
// Create partition topic
88-
topic = newTopicName()
89-
topicName, err = utils.GetTopicName(topic)
103+
newProperties := map[string]string{
104+
"key1": "value1-updated",
105+
"key2": "value2",
106+
}
107+
err = admin.Topics().UpdateProperties(*topicName, newProperties)
108+
assert.NoError(t, err)
109+
verifyTopicProperties(t, admin, topicName, newProperties)
110+
111+
err = admin.Topics().UpdateProperties(*topicName, map[string]string{"key3": "value3"})
90112
assert.NoError(t, err)
91-
err = admin.Topics().CreateWithProperties(*topicName, 4, map[string]string{
113+
verifyTopicProperties(t, admin, topicName, map[string]string{
114+
"key1": "value1-updated",
92115
"key2": "value2",
116+
"key3": "value3",
93117
})
94-
assert.NoError(t, err)
95118

96-
properties, err = admin.Topics().GetProperties(*topicName)
119+
err = admin.Topics().RemoveProperty(*topicName, "key1")
97120
assert.NoError(t, err)
98-
assert.Equal(t, properties["key2"], "value2")
121+
verifyTopicProperties(t, admin, topicName, map[string]string{
122+
"key2": "value2",
123+
"key3": "value3",
124+
})
99125
}
100126

101127
func TestPartitionState(t *testing.T) {

0 commit comments

Comments
 (0)