data-pipeline.gpt
tools: data-transform, data-aggregate, start-kafka-broker, stop-kafka-broker, sys.exec, github.com/yglcode/chat-kafka
Do the following step by step; don't move on to the next step until the current step is done.
1. start a local kafka broker with name broker.
2. connect to this broker.
3. create a "data-json" topic, and a "data-raw" topic with 3 partitions.
4. pipe container broker's logs (don't follow) to "data-raw".
5. transform data from "data-raw" into json objects, and send them to "data-json".
6. aggregate data from "data-json".
7. only summarize the aggregation result in pretty json format.
8. stop and remove broker.
---
name: data-transform
description: receive data from the input topic, transform it into json objects, and send them to the output topic.
args: intopic: kafka topic to receive data from
args: outtopic: kafka topic to send data to
tools: sys.write, sys.read
#!/usr/bin/env bash
# render the benthos config from its template: fill in the input/output topics and a random group id
sed "s/<intopic>/${intopic}/;s/<outtopic>/${outtopic}/;s/<groupid>/$RANDOM/" config_transform.tmpl > config_transform.yaml
# per-run output directory, suffixed with day, hour, minute and second
OUTDIR="/tmp/pl_xform$(date +"%d%H%M%S")"
OUTTEXT="$OUTDIR/out.txt"
ERRTEXT="$OUTDIR/err.txt"
mkdir -p "$OUTDIR"
# run benthos processor; stdout and stderr are captured in the output directory
docker run --rm --network host -v "$(pwd)/config_transform.yaml:/benthos.yaml" jeffail/benthos > "$OUTTEXT" 2> "$ERRTEXT"
---
name: data-aggregate
description: aggregate data from input topic and produce final result.
args: intopic: kafka topic to receive data from
tools: sys.write, sys.read
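#!/usr/bin/env bash
# render the benthos config from its template: fill in the input topic and a random group id
sed "s/<intopic>/${intopic}/;s/<groupid>/$RANDOM/" config_aggregate.tmpl > config_aggregate.yaml
# per-run output directory, suffixed with day, hour, minute and second
OUTDIR="/tmp/pl_agg$(date +"%d%H%M%S")"
OUTTEXT="$OUTDIR/out.txt"
ERRTEXT="$OUTDIR/err.txt"
mkdir -p "$OUTDIR"
# run benthos processor; stdout and stderr are captured in the output directory
docker run --rm --network host -v "$(pwd)/config_aggregate.yaml:/benthos.yaml" jeffail/benthos > "$OUTTEXT" 2> "$ERRTEXT"
# print the aggregation result so the caller can summarize it
cat "$OUTTEXT"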
---
name: start-kafka-broker
description: start a local kafka broker
args: broker: name of broker
tools: sys.exec
#!/usr/bin/env bash
# does a container with this exact name exist (running or stopped)?
if docker ps -a --filter "name=${broker}" --format '{{.Names}}' | grep -Eq "^${broker}\$"; then
    # is it currently running?
    if docker ps --filter "name=${broker}" --format '{{.Names}}' | grep -Eq "^${broker}\$"; then
        echo "Local kafka broker is running"
    else
        docker start "${broker}"
        echo "Restart local kafka broker"
    fi
else
    docker run -d --name "${broker}" --network host apache/kafka
    echo "Start local kafka broker"
fi
---
name: stop-kafka-broker
description: clean up, stop and remove local kafka broker
args: broker: name of broker
tools: sys.exec
#!/usr/bin/env bash
# only act if a container with this exact name exists (running or stopped)
if docker ps -a --filter "name=${broker}" --format '{{.Names}}' | grep -Eq "^${broker}\$"; then
    docker stop "${broker}"
    docker rm "${broker}"
    echo "Stop and remove local kafka broker"
fi