Skip to content

Commit 85e7bc7

Browse files
authored
Merge pull request #385 from ianpyw/master
Add bootstarpServers and headers to Kafka event
2 parents ee817dd + 151e061 commit 85e7bc7

File tree

3 files changed

+37
-13
lines changed

3 files changed

+37
-13
lines changed

events/kafka.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
package events
44

55
type KafkaEvent struct {
6-
EventSource string `json:"eventSource"`
7-
EventSourceARN string `json:"eventSourceArn"`
8-
Records map[string][]KafkaRecord `json:"records"`
6+
EventSource string `json:"eventSource"`
7+
EventSourceARN string `json:"eventSourceArn"`
8+
Records map[string][]KafkaRecord `json:"records"`
9+
BootstrapServers string `json:"bootstrapServers"`
910
}
1011

1112
type KafkaRecord struct {
@@ -16,4 +17,5 @@ type KafkaRecord struct {
1617
TimestampType string `json:"timestampType"`
1718
Key string `json:"key,omitempty"`
1819
Value string `json:"value,omitempty"`
20+
Headers []map[string][]byte `json:"headers"`
1921
}

events/kafka_test.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,25 @@ func TestKafkaEventMarshaling(t *testing.T) {
2020
t.Errorf("could not unmarshal event. details: %v", err)
2121
}
2222

23+
assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
24+
assert.Equal(t, inputEvent.EventSource, "aws:kafka")
25+
assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4")
2326
for _, records := range inputEvent.Records {
2427
for _, record := range records {
2528
utc := record.Timestamp.UTC()
2629
assert.Equal(t, 2020, utc.Year())
30+
assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")
31+
assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")
32+
33+
for _, header := range record.Headers {
34+
for key, value := range header {
35+
assert.Equal(t, key, "headerKey")
36+
var headerValue string = string(value)
37+
assert.Equal(t, headerValue, "headerValue")
38+
}
39+
}
2740
}
2841
}
29-
30-
// 3. serialize to JSON
31-
outputJson, err := json.Marshal(inputEvent)
32-
if err != nil {
33-
t.Errorf("could not marshal event. details: %v", err)
34-
}
35-
36-
// 4. check result
37-
assert.JSONEq(t, string(inputJson), string(outputJson))
3842
}
3943

4044
func TestKafkaMarshalingMalformedJson(t *testing.T) {

events/testdata/kafka-event.json

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"eventSource": "aws:kafka",
33
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
4+
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
45
"records": {
56
"AWSKafkaTopic-0": [
67
{
@@ -10,7 +11,24 @@
1011
"timestamp": 1595035749700,
1112
"timestampType": "CREATE_TIME",
1213
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
13-
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
14+
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
15+
"headers": [
16+
{
17+
"headerKey": [
18+
104,
19+
101,
20+
97,
21+
100,
22+
101,
23+
114,
24+
86,
25+
97,
26+
108,
27+
117,
28+
101
29+
]
30+
}
31+
]
1432
}
1533
]
1634
}

0 commit comments

Comments
 (0)