Skip to content

Commit a597bf6

Browse files
committed
Keeping Headers definition as byte. Marshalling/Unmarshalling Headers as signed numbers.
1 parent eede131 commit a597bf6

File tree

2 files changed

+40
-11
lines changed

2 files changed

+40
-11
lines changed

events/kafka.go

+37-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
package events
44

5+
import (
6+
"encoding/json"
7+
)
8+
59
type KafkaEvent struct {
610
EventSource string `json:"eventSource"`
711
EventSourceARN string `json:"eventSourceArn"`
@@ -10,12 +14,37 @@ type KafkaEvent struct {
1014
}
1115

1216
type KafkaRecord struct {
13-
Topic string `json:"topic"`
14-
Partition int64 `json:"partition"`
15-
Offset int64 `json:"offset"`
16-
Timestamp MilliSecondsEpochTime `json:"timestamp"`
17-
TimestampType string `json:"timestampType"`
18-
Key string `json:"key,omitempty"`
19-
Value string `json:"value,omitempty"`
20-
Headers []map[string][]int8 `json:"headers"`
17+
Topic string `json:"topic"`
18+
Partition int64 `json:"partition"`
19+
Offset int64 `json:"offset"`
20+
Timestamp MilliSecondsEpochTime `json:"timestamp"`
21+
TimestampType string `json:"timestampType"`
22+
Key string `json:"key,omitempty"`
23+
Value string `json:"value,omitempty"`
24+
Headers []map[string]JSONNumberBytes `json:"headers"`
25+
}
26+
27+
// JSONNumberBytes represents array of bytes in Headers field.
28+
type JSONNumberBytes []byte
29+
30+
// MarshalJSON converts byte array into array of signed integers.
31+
func (b JSONNumberBytes) MarshalJSON() ([]byte, error) {
32+
signedNumbers := make([]int8, len(b))
33+
for i, value := range b {
34+
signedNumbers[i] = int8(value)
35+
}
36+
return json.Marshal(signedNumbers)
37+
}
38+
39+
// UnmarshalJSON converts a given json with potential negative values into byte array.
40+
func (b *JSONNumberBytes) UnmarshalJSON(data []byte) error {
41+
var signedNumbers []int8
42+
if err := json.Unmarshal(data, &signedNumbers); err != nil {
43+
return err
44+
}
45+
*b = make(JSONNumberBytes, len(signedNumbers))
46+
for i, value := range signedNumbers {
47+
(*b)[i] = byte(value)
48+
}
49+
return nil
2150
}

events/kafka_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ func TestKafkaEventMarshaling(t *testing.T) {
2121
}
2222

2323
// expected values for header
24-
var headerValues [5]int8
24+
var headerValues [5]byte
2525
headerValues[0] = 118
26-
headerValues[1] = -36
26+
headerValues[1] = 220 // -36 + 256
2727
headerValues[2] = 0
2828
headerValues[3] = 127
29-
headerValues[4] = -128
29+
headerValues[4] = 128 // -128 + 256
3030

3131
assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
3232
assert.Equal(t, inputEvent.EventSource, "aws:kafka")

0 commit comments

Comments
 (0)