-
๋ธ๋ก์ปค
- ์นดํ์นด ํด๋ฌ์คํฐ๋ ์ฌ๋ฌ๊ฐ์ ๋ธ๋ก์ปค (์๋ฒ)
- ๊ฐ๊ฐ์ ๋ธ๋ก์ปค๋ id๊ฐ ์๋ค
- Kafka ํด๋ฌ์คํฐ์ ํ ๋ธ๋ก์ปค์ ์ฐ๊ฒฐํ๋ฉด, ํด๋น ๋ธ๋ก์ปค๊ฐ ํด๋ฌ์คํฐ์ ๋ฉํ๋ฐ์ดํฐ(๋ค๋ฅธ ๋ธ๋ก์ปค ์ ๋ณด ํฌํจ)๋ฅผ ์ ๊ณตํ์ฌ ํด๋ฌ์คํฐ ์ ์ฒด์ ํต์ ํ ์ ์๊ฒ ๋๋ค.
- ์ต์ ์ถ์ฒ ๋ธ๋ก์ปค๋ 3๊ฐ -> ๋ฐฑ๊ฐ ๋๊ฐ ๋ง๋ค ์ ๋ ์๋ค
- ํ ํฝ์ ํํฐ์
๋ค์ broker๋ค์ ๋๋๋ค => ํํฐ์
๊ณผ ๋ธ๋ก์ปค๊ฐ ๋ง์์ง ์๋ก ์ฌ๋ฌ ๊ณณ์ ๋๋๋ค.
-
Bootstrap Server
- Kafka ํด๋ฌ์คํฐ์ ํด๋ผ์ด์ธํธ(์: Producer๋ Consumer) ๊ฐ์ ์ด๊ธฐ ์ฐ๊ฒฐ์ ์ค์ ํ๋ ๋ฐ ํ์ํ ์๋ฒ ์ ๋ณด๋ฅผ ์ ๊ณตํฉ๋๋ค.
- ํด๋ผ์ด์ธํธ๋ Bootstrap Server๋ก ์ด๊ธฐ ์ฐ๊ฒฐ์ ์๋ํ๊ณ , ํด๋ฌ์คํฐ์ ์ ์ฒด ๋ฉํ๋ฐ์ดํฐ(๋ธ๋ก์ปค ID, ํํฐ์ ์ ๋ณด, ํ ํฝ ์ ๋ณด ๋ฑ)๋ฅผ ๋ฐ์์ต๋๋ค.
- bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 ๊ฐ์ด ์ค์ ํ๋ค.
- ์ฌ๊ธฐ์ broker1, broker2, broker3๋ Kafka ๋ธ๋ก์ปค์ ์ฃผ์์ด๋ฉฐ, ํด๋ผ์ด์ธํธ๋ ์ด๋ค ์ค ํ๋์ ์ฐ๊ฒฐํด ํด๋ฌ์คํฐ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ป์ต๋๋ค.
-
Zookeeper
์์ฐ ํ๊ฒฝ์์๋ **Zookeeper ๋ ธ๋ ์ฌ๋ฌ ๊ฐ(Ensemble)**๋ฅผ ์ด์ํ๋ ๊ฒ์ด ์ฅ์ ๋์์ ์ํด ๋งค์ฐ ์ค์ํฉ๋๋ค. ๋ง์ฝ Zookeeper ์๋ฒ ํ๋๊ฐ ๋ค์ด๋๋๋ผ๋, ๋๋ค์์ ๋ ธ๋๊ฐ ์ ์์ ์ผ๋ก ์๋ํ๊ณ ์๋ค๋ฉด Zookeeper Ensemble๋ ๊ณ์ ๊ธฐ๋ฅ์ ์ํํ ์ ์์ต๋๋ค.
-
๋ธ๋ก์ปค ๋ฑ๋ก:
- Kafka ๋ธ๋ก์ปค๊ฐ ์์๋๋ฉด Zookeeper์ ์์ ์ ๋ฑ๋กํฉ๋๋ค.
- Zookeeper๋ ํด๋ฌ์คํฐ ๋ด ๋ชจ๋ ํ์ฑ ๋ธ๋ก์ปค์ ๋ชฉ๋ก์ ์ ์งํฉ๋๋ค.
-
๋ฆฌ๋ ์ ์ถ:
- ๊ฐ ํํฐ์ ์ ๋ํด Zookeeper๋ ๋ฆฌ๋ ๋ธ๋ก์ปค๋ฅผ ์ถ์ ํฉ๋๋ค.
- ๋ง์ฝ ๋ฆฌ๋ ๋ธ๋ก์ปค๊ฐ ๋ค์ด๋๋ฉด, Zookeeper๋ ๋ณต์ ๋ณธ ์ค์์ ์๋ก์ด ๋ฆฌ๋๋ฅผ ์ ์ถํ์ฌ ์์ ์ ์ด์ด๊ฐ๋๋ค. (์ปจํธ๋กค๋ฌ๊ฐ ์๋ก์ด ๋ฆฌ๋๋ฅผ ์ ์ถํ๊ณ Zookeeper์ ๋ฐ์)
-
ํด๋ฌ์คํฐ ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ:
- Zookeeper๋ Kafka ํด๋ฌ์คํฐ์ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํฉ๋๋ค.
- ์ฌ๊ธฐ์๋ ํ ํฝ, ํํฐ์ , ๋ณต์ ๋ณธ ๋ฑ์ ์ ๋ณด๊ฐ ํฌํจ๋ฉ๋๋ค.
-
์๋น์ ์คํ์ :
- Zookeeper๋ ์๋น์๊ฐ ํน์ ํ ํฝ์์ ํ์ฌ ์ฝ๊ณ ์๋ ์คํ์ ์ ์ ์ฅํ ์ ์์ต๋๋ค.
- ํ์ง๋ง ์ต์ Kafka ๋ฒ์ ์์๋ ์๋น์ ์คํ์
์ Kafka ์์ฒด์ ํ ํฝ(์:
__consumer_offsets
)์ ์ ์ฅํฉ๋๋ค. ์์ ์๋ Zookeeper๊ฐ ์คํ์ ์ ์ ์ฅํ์ผ๋, ํ์ฌ๋ Kafka ์์ฒด๊ฐ ์ด๋ฅผ ๊ด๋ฆฌํฉ๋๋ค.
-
Zookeeper ๊ธฐ๋ฐ vs KRaft(Kafka Raft) ๊ธฐ๋ฐ
- Kafka Controller๋ ํด๋ฌ์คํฐ๋ฅผ ๊ด๋ฆฌํ๋ ์ค์ ๋ ธ๋์ด๋ฉฐ, Zookeeper๋ฅผ ์ฌ์ฉํ ์๋ ์๊ณ KRaft ๋ฐฉ์์ ์ฌ์ฉํ ์๋ ์์.
- Zookeeper ๊ธฐ๋ฐ Kafka (๊ธฐ์กด ๋ฐฉ์)
- Zookeeper๊ฐ ํด๋ฌ์คํฐ์ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ๊ด๋ฆฌ
- ๋ธ๋ก์ปค ์ค ํ๋๊ฐ Controller๋ก ์ ์ถ๋์ด ๋ฉํ๋ฐ์ดํฐ ๋ณ๊ฒฝ์ ๋ด๋น
- ๋ง์ฝ ์ปจํธ๋กค๋ฌ๊ฐ ๋ค์ด๋๋ฉด Zookeeper๊ฐ ์๋ก์ด ์ปจํธ๋กค๋ฌ๋ฅผ ์ ์ถ
- ๋จ์ : Zookeeper์์ ํต์ ์ด ํ์ํ์ฌ ๋ณต์ก์ฑ์ด ์ฆ๊ฐํ๊ณ ์ง์ฐ ์๊ฐ์ด ๋ฐ์
- KRaft
- Zookeeper ์์ด Kafka ๋ด๋ถ์์ ์ปจํธ๋กค๋ฌ๋ฅผ ๊ด๋ฆฌ
- ์ฌ๋ฌ ๊ฐ์ Controller ๋ ธ๋๊ฐ ์กด์ฌํ๋ฉฐ, ์ด ์ค ํ๋๊ฐ Quorum Leader(์ต๊ณ ๊ด๋ฆฌ์)๊ฐ ๋จ. [controller1, controller2, broker1, broker2]
- Raft Consensus ์๊ณ ๋ฆฌ์ฆ์ ์ฌ์ฉํ์ฌ ๋ค์๊ฒฐ ๋ฐฉ์์ผ๋ก ํด๋ฌ์คํฐ ์ํ๋ฅผ ๋๊ธฐํ.
- ์ฅ์ : Zookeeper ์์ด Kafka๋ง์ผ๋ก ํด๋ฌ์คํฐ๋ฅผ ๊ด๋ฆฌํ๋ฏ๋ก ๋ ๋น ๋ฅด๊ณ ์์ ์ .
โ Kafka์ Zookeeper์ ์ํธ์์ฉ ์์ฝ
- Kafka ๋ธ๋ก์ปค๋ Zookeeper์ ephemeral ๋
ธ๋๋ก ์์ ์ ๋ฑ๋ก (
/brokers/ids/<broker_id>
) - ์ด ๋ ธ๋๋ ๋ธ๋ก์ปค๊ฐ ์ฃฝ๊ฑฐ๋ ๋คํธ์ํฌ๊ฐ ๋๊ธฐ๋ฉด ์๋์ผ๋ก ์ญ์ ๋จ
- โ Zookeeper๊ฐ ํํธ๋นํธ๋ฅผ ๋ณด๋ด๋ ๊ฑด ์๋
- โ Kafka ๋ธ๋ก์ปค๊ฐ Zookeeper์ ์ธ์ ์ ์ ์งํจ์ผ๋ก์จ ์ด์์์์ ํํ
- ์ปจํธ๋กค๋ฌ๋ Zookeeper์ watch๋ฅผ ๊ฑธ์ด ๋ ธ๋ ์ญ์ ์ด๋ฒคํธ ๊ฐ์ง โ ๋ธ๋ก์ปค ์ฅ์ ํ๋จ
- ์ปจํธ๋กค๋ฌ๊ฐ ๋ธ๋ก์ปค ์ฅ์ ๋ฅผ ๊ฐ์งํ๋ฉด โ ISR ์ค ํ๋๋ฅผ ์ ๋ฆฌ๋๋ก ์ ์ถ
- Zookeeper๋ ๋จ์ํ ์ํ๋ฅผ ์ ์ฅํ๊ณ ๊ฐ์ ํธ๋ฆฌ๊ฑฐ๋ง ์ ๊ณต, ์ ์ถ ๋ก์ง์ Kafka ์ปจํธ๋กค๋ฌ๊ฐ ์ํ
- Zookeeper๋ ํ ํฝ, ํํฐ์ , ๋ธ๋ก์ปค ๋ฑ ํด๋ฌ์คํฐ ์ ๋ณด๋ฅผ ์ ์ฅ
- ์ต์ Kafka๋ KRaft ๋ชจ๋์์ Zookeeper ์์ด๋ ์์ฒด ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ ๊ฐ๋ฅ
- ์์ ์ Zookeeper๊ฐ ์คํ์ ์ ์ฅ
- ์ต์ ๋ฒ์ ์
__consumer_offsets
ํ ํฝ์ ์ฌ์ฉํ์ฌ Kafka ๋ด๋ถ์ ์ ์ฅ
ํญ๋ชฉ | Zookeeper ๊ธฐ๋ฐ | KRaft ๊ธฐ๋ฐ |
---|---|---|
์ปจํธ๋กค๋ฌ ์ ์ถ | Zookeeper๊ฐ ์ ์ถ ์ฐธ์ฌ | Raft ์๊ณ ๋ฆฌ์ฆ ์ฌ์ฉ |
๋ฉํ๋ฐ์ดํฐ ์ ์ฅ | Zookeeper์ ์ ์ฅ | Kafka ๋ด๋ถ ์ ์ฅ |
ํํธ๋นํธ ๊ฐ์ง | Zookeeper ์ธ์ ๊ฐ์ง | Kafka ๋ด๋ถ ์ฒ๋ฆฌ |
์ฅ์ | ์์ ์ ์ด์ง๋ง ๋ณต์ก | ๋ ๋จ์ํ๊ณ ๋น ๋ฆ (ZK ์ ๊ฑฐ) |
-
Topic
- ์ด๋ค ๋ฐ์ดํฐ ํํ๋ ์ ์ฅํ ์ ์๋ ํน์ ๋ฐ์ดํฐ ์คํธ๋ฆผ (๋ฉ์์ง์ ์ํ์ค)
- db์ ํ ์ด๋ธ๊ณผ ๋น์ทํ ๊ฐ๋
- topic name์ผ๋ก ๊ตฌ๋ถ์ง์ด์ง
- producer๋ก ํน์ ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ๋ฃ๊ณ consumer๋ก ํ ํฝ์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋๋ค.
- ์์ : ํธ๋ญ์ด ์นดํ์นด ์๋ฒ๋ก ์์ ์ ์์น ์ ๋ณด๋ฅผ ๋ณด๋ -> ์์น ์ ๋ณด์๋ TRUCK ID์ TRUCK POSITON์ ๋ํ ์ ๋ณด๊ฐ ๋ด๊ฒจ์์ -> ์ด๊ฑธ trucks_gps ๋ผ๋ ํ ํฝ์ ๋ง๋ค์ด์ ๊ฑฐ๊ธฐ์๋ค๊ฐ ์ ์ฌํจ
-
Topic ๋ณต์
- Replication Factor๋ ์ผ๋ฐ์ ์ผ๋ก 3์ผ๋ก ์ค์ ๋๋ฉฐ, ๋ธ๋ก์ปค ๊ฐ์๋ฅผ ์ด๊ณผํ ์ ์๋ค.
- ํ๋์ ํํฐ์ ์ด ๋ฆฌ๋์ isr๋ก ๋๋์ด ๊ฐ๊ฐ ๋ค๋ฅธ ๋ธ๋ก์ปค์ ์ ์ฅ๋จ
- ๋ฆฌ๋๋ฅผ ๊ฐ์ง ๋ธ๋ก์ปค๊ฐ ์ฃฝ์ผ๋ฉด ๋ค๋ฅธ ๋ธ๋ก์ปค์ isr์ ๋ฆฌ๋๋ก ๋ฐ๊ฟ
- ์นดํ์นด 2.4๋ถํฐ๋ ํ๋ก๋์๋ leader์ ์ฐ๊ณ ์ปจ์๋จธ๋ isr(ํ๋ก์)์์ ์ฝ๋ ํํ๋ ๋ฑ์ฅํ๋ค. ๋ง์ฝ leader๋ณด๋ค isr์์ ์ฝ๋ ๊ฒ ๋ ํจ์จ์ ์ด๋ฉด ์ฌ์ฉ.
-
Partition
- topic์ ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ผ๋ก ๊ตฌ์ฑ๋ ์ ์๋ค.
- ๊ฐ๊ฐ์ ํํฐ์ ๋ค์ ๊ฐ์์ ์์๋ก ์ ๋ ฌ๋์ด ์๋ค.
- ํํฐ์ ๋ค์ ๊ฐ์ offest์ด๋ผ๊ณ ๋ถ๋ฅด๋ incremental id๊ฐ ์๋ค.
- offset์ ์ฌ์ฌ์ฉ๋์ง ์๋๋ค. ์ด์ ๋ฉ์์ง๊ฐ ์ ๋ถ ์ญ์ ๋์ด๋...
- ํํฐ์ ์ ์ฐ์ฌ์ง ๋ฐ์ดํฐ๋ ๋ณ๊ฒฝ ๋ถ๊ฐ๋ฅํ๋ค. (IMMUTABLE)
- ๋ฐ์ดํฐ๋ ํน์ ๊ธฐ๊ฐ๊น์ง๋ ์ ์งํ๋ค. (๋ํดํธ 7์ผ)
-
Partitioner
- DefaultPartitioner (๊ธฐ๋ณธ๊ฐ) Key๊ฐ ์์ผ๋ฉด ํด์ ๊ธฐ๋ฐ, ์์ผ๋ฉด Round Robin ํค ๊ธฐ๋ฐ ์์ ์ ์ง, ๋ถํ ๋ถ์ฐ ํน์ ํค์ ํธ๋ํฝ ์ง์ค ๊ฐ๋ฅ
- UniformStickyPartitioner ํค ์๋ ๋ฉ์์ง๋ฅผ ํน์ ํํฐ์ ์ ์ผ์ ๋ ๋ฌถ์ด์ ์ ์ก ๋ฐฐ์น ์ต์ ํ, ์์ ์ ์ง ๊ฐ๋ฅ ํน์ ํํฐ์ ๊ณผ๋ถํ ๊ฐ๋ฅ
- Custom Partitioner ๋น์ฆ๋์ค ๋ก์ง์ ๋ฐ๋ผ ์ง์ ํํฐ์ ์ง์ ๋ก๋ ๋ฐธ๋ฐ์ฑ ์ต์ ํ ๊ฐ๋ฅ ๊ตฌํ์ด ๋ณต์กํ ์ ์์
1๏ธโฃ ๋ฆฌ๋ ๋ธ๋ก์ปค ์ฅ์ ๋ฐ์ : ํน์ ๋ธ๋ก์ปค๊ฐ ๋ค์ด๋๋ฉด์ ๋ฆฌ๋ ํํฐ์ ์ด ๋นํ์ฑํ๋จ.
2๏ธโฃ Kafka ์ปจํธ๋กค๋ฌ๊ฐ ์ฅ์ ๊ฐ์ง ๋ฐ ๋ฆฌ๋ ๊ต์ฒด : ์ปจํธ๋กค๋ฌ ๋ธ๋ก์ปค๊ฐ ๋ค์ด๋ ๋ธ๋ก์ปค๋ฅผ ๊ฐ์งํ๊ณ , ํด๋น ํํฐ์ ์ ISR ์ค ํ๋๋ฅผ ์๋ก์ด ๋ฆฌ๋๋ก ์น๊ฒฉํจ. (Zookeeper/KRaft๋ ์ปจํธ๋กค๋ฌ ์ ์ถ์ฉ)
3๏ธโฃ ๋ฉํ๋ฐ์ดํฐ ๊ฐฑ์ ๋ฐ ๋ธ๋ก์ปค ์ ํ : ์ปจํธ๋กค๋ฌ๋ ์๋ก์ด ๋ฆฌ๋ ์ ๋ณด๋ฅผ ๋ชจ๋ ๋ธ๋ก์ปค์ ์ ํ. ๋ธ๋ก์ปค๋ค์ ๋ฉํ๋ฐ์ดํฐ ํ ์ด๋ธ์ ๊ฐฑ์ .
4๏ธโฃ Producer/Consumer๋ ์๋ฌ ๊ฐ์ง ํ ์๋ ๋ฉํ๋ฐ์ดํฐ ๊ฐฑ์ : NotLeaderForPartition ์์ธ ๋ฑ์ ํตํด ๋ฆฌ๋ ๋ณ๊ฒฝ์ ๊ฐ์งํ๊ณ , ์๋์ผ๋ก ์ ๋ฆฌ๋๋ก ์์ฒญ์ ์ฌ์๋ํจ.
-
Producer
- ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ์
- ์นดํ์นด ๋ฉ์์ง ํํ {key-binary, value -binary, ์์ถ ํ์ gzip ๋ฑ๋ฑ, ํค๋ (์ต์ ๋) ํค๋ฒจ๋ฅ ํํ, ํํฐ์ ๊ณผ offset, ํ์์คํฌํ}
- ์์ฐ์๋ ์ด๋ ํํฐ์
์ ์ธ์ง๋ฅผ ์๊ณ ์๋ค. ์ฆ ํํฐ์
๊ฒฐ์ ์ ์์ฐ์๊ฐ ํ๋ค. (ํ๋ก๋์ ๋ด๋ถ์ ํํฐ์
๋๊ฐ ๊ฒฐ์ ํจ)
- key๋ฅผ ์ค์ ํด์ ๋ณด๋ด๋ฉด ํด์๊ฐ์ ๊ณ์ฐํด์ ์ด๋ ํํฐ์ ์ ๋ฃ์์ง ์ ํจ. ํค๊ฐ ๊ฐ์ ๋ฉ์์ง๋ ํ ํํฐ์ ๋ด์์ ์์๊ฐ ์ ์ง๋๋ค.
- key๋ฅผ ์ค์ ํ์ง ์์ผ๋ฉด round robin ๋ฐฉ์์ผ๋ก ํํฐ์ ์ ๋ฃ์. ํ์ฌ๋ sticky ๋ฐฉ์์ ์ฌ์ฉํจ.
- Recovery
- ๋ธ๋ก์ปค ์ผ์์ ๋ค์ด: retries ์ค์ ์ ํ์ฉํ์ฌ ์๋ ์ฌ์๋
- ๋ฆฌ๋ ๋ธ๋ก์ปค ์ฅ์ : ๋ฉํ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ ํ ์๋ก์ด ๋ฆฌ๋๋ก ์ ํ
- ๋คํธ์ํฌ ์ฅ์ : delivery.timeout.ms(์์ฒญ ๋ฐ ์ฌ์๋๊น์ง ํฌํจํ ํ์์์)์ request.timeout.ms(ํ๋์ ์์ฒญ์ ๋ํ ํ์์์)๋ก ํ์์์ ์ค์
- ์ฑ๋ฅ ์ต์ ํ: linger.ms(๋ฐฐ์น ์๋ ๋๊ธฐ์๊ฐ)์ batch.size๋ฅผ ์กฐ์ ํ์ฌ ๋ฉ์์ง ๋ฐฐ์น ์ฒ๋ฆฌ
- ๋ฐ์ดํฐ ์ ์ค ๋ฐฉ์ง: acks=all ๋ฐ min.insync.replicas ์ค์ ์ผ๋ก ์์ ํ ์ ์ฅ ๋ณด์ฅ (min.insync.replicas๋ ๋ฆฌ๋๋ฅผ ํฌํจํด ์ต์ํ ๋ช ๊ฐ์ ๋ณต์ ๋ณธ์ด ๋ฐ์ดํฐ์ ๋๊ธฐํ๋์ด ์์ด์ผ ํ๋์ง ์ง์ ํ๋ ๊ฒ, ๋ธ๋ก์ปค์ชฝ ์ค์ ์ด๋ค. ๋ง์ฝ ๋ธ๋ก์ปค๊ฐ ์ฃฝ์ด์ replicas๊ฐ 1์ด ๋๋ฉด min.insync.replicas=2๋ก ์ค์ ๋์ด ์๋ค๋ฉด ack ์๋ณด๋)
-
๋ฐ์ดํฐ ์ ์ก
- properties๋ก ์ฐ๊ฒฐํ ์นดํ์นด์ ๋ํ ์ ๋ณด ์ค์
- send๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ด๋ฉด ์์์ ๋ฐฐ์น๋ก ๋ณด๋ -> ์ด๋์ ๋ ๋ฐ์ดํฐ๋ฅผ ๋ชจ์์ ๋ณด๋ธ๋ค.
- flushํ๋ฉด ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ธ๋ค. close ์ง์ ์ ํด์ค์ผํจ.
-
Retry
- Producer๊ฐ ๋ฉ์์ง๋ฅผ ๋ธ๋ก์ปค์ ์ ์กํ ๋ ์คํจํ๋ฉด ์๋์ผ๋ก ์ฌ์๋
- ๋ฉ์์ง ์ ์ก ์คํจ ์ ์ฆ์ ์ฌ์๋ํ์ง ์๊ณ retry.backoff.ms ๋งํผ ๋๊ธฐ
- retries ํ์๋งํผ ์ฌ์๋ ํ์๋ ์คํจํ๋ฉด ์์ธ ๋ฐ์ (TimeoutException)
- acks=all์ ์ฌ์ฉํ๋ฉด ๋ฆฌ๋ ๋ธ๋ก์ปค๊ฐ ๋ณ๊ฒฝ๋ ๊ฒฝ์ฐ์๋ ์ฌ์๋ ๊ฐ๋ฅ
- retries ๊ฐ์ด ํฌ๋ฉด ์ค๋ณต ๋ฉ์์ง๊ฐ ์ ์ก๋ ๊ฐ๋ฅ์ฑ์ด ์์ โ Idempotence ์ค์ (enable.idempotence=true) ์ถ์ฒ (๊ฐ์ ๋ฉ์์ง๋ฅผ ์ฌ๋ฌ ๋ฒ ์ ์กํด๋ ์ค์ ๋ก ์นดํ์นด์๋ ํ ๋ฒ๋ง ์ ์ฅ) ํ๋ก๋์id + seq ๋ก ์ค๋ณตํ์ธ (seq๋ ํ๋ก๋์๊ฐ ๋ถ์ธ๋ค.)
- max.in.flight.requests.per.connection ๊ฐ์ด ๋๋ฌด ํฌ๋ฉด ์์ ๋ณด์ฅ์ด ์ด๋ ค์ธ ์ ์์. ํนํ, ๋คํธ์ํฌ ์ฅ์ ๋ ์๋ต ์์์ ์ฐจ์ด๋ก ์ธํด ๋ฐ์ํ ์ ์์ต๋๋ค.
-
๋ฉฑ๋ฑ
- Kafka์์ ๊ฐ Producer๋ ๊ณ ์ ํ Producer ID (PID)๋ฅผ ๊ฐ์ง
- ์๋ก์ด Producer๊ฐ ์์ฑ๋ ๋๋ง๋ค ์๋ก์ด PID๊ฐ ๋ถ์ฌ๋จ (ex. ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ฌ์์๋๋ฉด ์๋ก์ด PID๊ฐ ์์ฑ๋จ)
- Producer๊ฐ ์ ์กํ๋ ๊ฐ ๋ฉ์์ง์ ์์ฐจ์ ์ธ Sequence Number๋ฅผ ๋ถ์ฌ
- Kafka Broker๋ ๋์ผํ PID์์ ์ค๋ณต๋ Sequence Number๊ฐ ๋ค์ด์ค๋ฉด ๋ฌด์
-
Consumer
- ํ ํฝ์์ ๋ฐ์ดํฐ๊ฐ์ ธ์ด
- ํํฐ์ ๋ง๋ค ์ฝ๊ธฐ offset์ด ์์ (๊ฐ์ ํํฐ์ ์์์๋ง ์์ ๋ณด์ฅ)
- ํค์ ๋ฒจ๋ฅ๋ฅผ deserializer ํ๋ค. deserializer๋ ์ปจ์๋จธ์์ ์ค์ ํด์ผํ๊ณ ์ค์ ํ๋ ค๋ฉด ๋ฉ์์ง ํฌ๋ฉง์ ๋ํ ์ ๋ณด๊ฐ ์์ด์ผ ํ๋ค.
- Recovery
- ๋ฆฌ๋ ๋ธ๋ก์ปค ์ฅ์ : ๋ฉํ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ ํ ์๋ก์ด ๋ฆฌ๋๋ก ์ ํ
- Consumer ์ฅ์ : Consumer Group ๋ฆฌ๋ฐธ๋ฐ์ฑ ํ ์๋ ๋ณต๊ตฌ (session.timeout.ms(์ปจ์๋จธ๊ฐ ๋ช ms ๋์ heartbeat๋ฅผ ๋ณด๋ด์ง ์์ผ๋ฉด ์ฃฝ์๋ค๊ณ ๊ฐ์ฃผ)์ heartbeat.interval.ms(์ปจ์๋จธ๊ฐ ๋ธ๋ก์ปค์ heartbeat๋ฅผ ๋ณด๋ด๋ ์ฃผ๊ธฐ))
- ๋ฐ์ดํฐ ์ ์ค ๋ฐฉ์ง : commit์ ํตํด์ ์ด๋๊น์ง ์ฒ๋ฆฌํ๋์ง. Offset์ commitํ์ง ์์ผ๋ฉด ์ปจ์๋จธ๊ฐ ์ฌ์์ ์ ์ด์ ๋ฐ์ดํฐ๋ถํฐ ๋ค์ ์ฝ์ ์ ์์.
-
Consumer Group
- ๊ฐ์ ๊ทธ๋ฃน์ด๋ฉด ๊ฐ๊ฐ ๋ค๋ฅธ ํํฐ์
์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋๋ค.
- consumer1์ partition0 , consumer2์ partition1 ๊ฒฐ๊ตญ, ๊ทธ๋ฃน ํ๋๋ ํ๋์ ํ ํฝ์ ์ํ ํํฐ์ ๋ค์ ์ ๋ถ ์ฝ๋๋ค.
- ์ปจ์๋จธ ๊ทธ๋ฃน์ ์ปจ์๋จธ๊ฐ ํํฐ์ ๋ณด๋ค ๋ง์ผ๋ฉด ์ปจ์๋จธ ํ๋๋ ์๋ฌด๊ฒ๋ ์ํจ
- ํ๋์ ํ ํฝ์ ์ฌ๋ฌ ์ปจ์๋จธ ๊ทธ๋ฃน์ด ์ฝ์ ์ ์๋ค.
- ๊ทธ๋ฃน๋น offset๋ค์ __consumer_offsets ์ด๋ผ๋ ํ ํฝ์ ์ ์ฅํ๋ค.
- ์ปค๋ฐ ์ ๋ต
- at least once: ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ฉด ์ปค๋ฐ
- at most once ๋ฉ์์ง ๋ฐ์ผ๋ฉด ์ปค๋ฐ
- exactly once
- ๊ฐ์ ๊ทธ๋ฃน์ด๋ฉด ๊ฐ๊ฐ ๋ค๋ฅธ ํํฐ์
์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋๋ค.
-
Static Group Membership
- ์ผ์์ ์ธ ๋คํธ์ํฌ ์ฅ์ ๋๋ ์ปจ์๋จธ ์ฌ์์ ์์๋ ๋ถํ์ํ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ๋ฐ์ํ๋ฉด ์ฑ๋ฅ์ด ์ ํ๋ ์ ์์
- ์ปจ์๋จธ๊ฐ ์ฌ์์๋๊ฑฐ๋ ์ผ์์ ์ผ๋ก ์ฐ๊ฒฐ์ด ๋์ด์ ธ๋ ๊ธฐ์กด ํํฐ์ ํ ๋น์ ์ ์งํ ์ ์์
- ๊ธฐ๋ณธ Dynamic Group Membership
- ์ปจ์๋จธ๊ฐ poll()์ ํธ์ถํ๋ฉด ๊ทธ๋ฃน์ ์ฐธ์ฌ (Group Coordinator๊ฐ ๊ด๋ฆฌ).
- ์ปจ์๋จธ๊ฐ ์ธ์ ํ์์์(session.timeout.ms) ๋ด์ ์๋ตํ์ง ์์ผ๋ฉด ์ ๊ฑฐ๋๊ณ ๋ฆฌ๋ฐธ๋ฐ์ฑ ๋ฐ์.
- ์ปจ์๋จธ๊ฐ ๋ค์ ์ฐ๊ฒฐ๋๋ฉด ์๋ก์ด ๋ฉค๋ฒ๋ก ๋ฑ๋ก๋๋ฉฐ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ๋ค์ ๋ฐ์.
- Static Group Membership
- ์ปจ์๋จธ๊ฐ ๊ทธ๋ฃน์ ์ฐธ์ฌํ ๋ ๊ณ ์ ํ ๋ฉค๋ฒ ID (group.instance.id)๋ฅผ ์ค์ .
- ์ปจ์๋จธ๊ฐ ์ผ์์ ์ผ๋ก ์ฐ๊ฒฐ์ด ๋๊ฒจ๋ Group Coordinator๊ฐ ๊ธฐ์กด ๋ฉค๋ฒ๋ก ์ ์ง.
- ๊ฐ์ ๋ฉค๋ฒ ID๋ฅผ ๊ฐ์ง ์ปจ์๋จธ๊ฐ ๋ค์ ์ฐ๊ฒฐ๋๋ฉด ๊ธฐ์กด ํํฐ์ ํ ๋น ์ ์ง.
- ์์ ํ ์๋ก์ด ์ปจ์๋จธ๊ฐ ์ถ๊ฐ๋๊ฑฐ๋, ๊ธฐ์กด ๋ฉค๋ฒ ID๊ฐ ์๋ ์ปจ์๋จธ๋ง ๋ฆฌ๋ฐธ๋ฐ์ฑ ๋ฐ์.
-
Retry
- ์คํ์
์ Commitํ์ง ์๊ณ ์ฌ์ฒ๋ฆฌ
- ์๋ ์ปค๋ฐ์ ๋นํ์ฑํ (enable.auto.commit=false)
- ์์ธ ๋ฐ์ ์ ์คํ์ ์ Commitํ์ง ์์ผ๋ฉด ๊ฐ์ ๋ฉ์์ง๋ฅผ ๋ค์ ์๋นํ ์ ์์
- ๋ฌดํ ๋ฃจํ ๋ฐ์ ๊ฐ๋ฅ โ ์คํจํ ๋ฉ์์ง๋ง ๊ณ์ ๊ฐ์ ธ์ฌ ์ ์์
- Dead Letter Queue (DLQ) ์ฌ์ฉ
- Kafka์์ ์คํจํ ๋ฉ์์ง๋ฅผ ๋ณ๋์ ํ ํฝ(DLQ)์ผ๋ก ๋ณด๋ด๊ณ , ๋์ค์ ์ฌ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ.
- ๋ฌดํ ๋ฃจํ ๋ฐฉ์ง์ฑ ์ด๋ค
- ์คํ์
์ Commitํ์ง ์๊ณ ์ฌ์ฒ๋ฆฌ
- ํธ๋์ญ์ ์ด ์์๋๋ฉด ๋ฉ์์ง๋ค์ด ์ ์ก๋๊ธฐ ์์ํ์ง๋ง, ์์ง ํธ๋์ญ์ ์ํ๋ฅผ ๋ํ๋ด๋ ๋ง์ปค(Commit/Abort) ๋ ์ ์ก๋์ง ์์ต๋๋ค.
- ํธ๋์ญ์ ์ด "ํ์ฑํ"๋ ์ํ์ผ ๋ฟ, ์ด ์์ ์์๋ ๋ฉ์์ง๊ฐ ์์ ์ํ๋ก ํํฐ์ ์ ์ ์ฅ๋ฉ๋๋ค.
- Kafka๋ ํธ๋์ญ์ ID๋ฅผ ๋ฉ์์ง์ ํจ๊ป ์ ์ฅํ์ฌ, ์ดํ์ ์ด ๋ฉ์์ง๊ฐ ์ด๋ ํธ๋์ญ์ ์ ์ํ๋์ง๋ฅผ ์ถ์ ํฉ๋๋ค. (beginTransaction() ํธ์ถ๋ง๋ค ๊ณ ์ ํ ํธ๋์ญ์ ID๊ฐ ์์ฑ)
- Producer๊ฐ
send()
๋ฅผ ํธ์ถํ๋ฉด, ๋ฉ์์ง๋ ๋ธ๋ก์ปค์ ํํฐ์ ์ ์ ์ฅ๋ฉ๋๋ค. - ์ด๋ ์ ์ฅ๋ ๋ฉ์์ง๋ "๋ฏธํ์ (Uncommitted)" ์ํ์ ๋๋ค.
- ์ด ๋ฉ์์ง๋ค์ ํธ๋์ญ์ ์ ํฌํจ๋๋ฉฐ, ์์ง ์ปค๋ฐ๋์ง ์์๊ธฐ ๋๋ฌธ์ ๋ฌดํจํ๋ ์ ์๋ ์ํ์ ๋๋ค.
commitTransaction()
์ด ํธ์ถ๋๋ฉด, Commit Marker๊ฐ ํด๋น ํธ๋์ญ์ ์ ๋ฉ์์ง์ ํจ๊ป ๊ธฐ๋ก๋ฉ๋๋ค.- ์ด ๋ง์ปค๋ ํธ๋์ญ์
์ด ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์์์ ๋ํ๋ด๋ ํ์๋ก, ์ด์ ํด๋น ๋ฉ์์ง๋ "ํ์ (Committed)" ์ํ๋ก ๊ฐ์ฃผ๋ฉ๋๋ค.
Partition 0: [TxMsg1 (์ปค๋ฐ๋จ), TxMsg2 (์ปค๋ฐ๋จ), Commit Marker]
abortTransaction()
์ด ํธ์ถ๋๋ฉด, Abort Marker๊ฐ ๊ธฐ๋ก๋๊ณ ํด๋น ํธ๋์ญ์ ์ ๋ฉ์์ง๋ค์ ๋ฌดํจํ(Aborted) ๋ฉ๋๋ค.- ์ด ๋ง์ปค๋ ํธ๋์ญ์
์ด ์คํจํ์์ ๋ํ๋ด๋ฉฐ, ๋ฉ์์ง๊ฐ ๋กค๋ฐฑ๋์์์ ํ์ํฉ๋๋ค.
Partition 0: [TxMsg1 (๋ฌดํจํ๋จ), TxMsg2 (๋ฌดํจํ๋จ), Abort Marker]
- isolation.level=read_committed ์ค์ ํ๋ฉด ์ปค๋ฐ๋ ๋ฉ์์ง๋ง ์ฝ๊ณ , ๋กค๋ฐฑ๋ ๋ฉ์์ง๋ ๋ฌด์.
- isolation.level=read_uncommitted ์ค์ ํ๋ฉด ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๋ณผ ์ ์์ผ๋ฉฐ, ๋กค๋ฐฑ๋ ๋ฉ์์ง๋ ํฌํจ๋จ.
- ๊ฐ์ ํธ๋์ญ์
๋ด์ ๋ฉ์์ง๋ ์ปจ์๋จธ์์ ์ฒ๋ฆฌ ์์๊ฐ ๋ฌ๋ผ์ง ์ ์์ต๋๋ค. ๊ทธ๊ฑธ ๋ฐฉ์งํ๋ ค๋ฉด...
- ํ๋์ ํธ๋์ญ์ ๋ฉ์์ง๋ฅผ ํ๋์ ํํฐ์ ์ผ๋ก๋ง ๋ณด๋ด๊ธฐ
- ์ปจ์๋จธ์์ ๋ฉ์์ง์ ํ์์คํฌํ๋ฅผ ๊ธฐ์ค์ผ๋ก ์ ๋ ฌ
- ์ปจ์๋จธ ๊ทธ๋ฃน์ 1๊ฐ๋ง ์ฌ์ฉํ์ฌ ๋จ์ผ ๋ฉ์์ง ํด๋ง ์ปจ์๋จธ๋ก ์ฒ๋ฆฌ
-
topic
- ํ ํฝ ์์ฑ (ํํฐ์ ์ค์ ๊ฐ๋ฅ, replica ๊ฐฏ์ ์ค์ ๊ฐ๋ฅ -> ๋ค๋ง ๋ธ๋ก์ปค๊ฐ 1๊ฐ๋ฉด replica ๊ฐฏ์ ์ค์ ๋ถ๊ฐ๋ฅ)
- ํ ํฝ ๋ฆฌ์คํธ ๋ณด๊ธฐ
- ํน์ ํ ํฝ ์์ธ๋ณด๊ธฐ -> ํํฐ์ ๊ณผ ๋ ํ๋ฆฌ์นด ๋ค์ด ๋์ด, ๋ช๋ฒ ๋ธ๋ก์ปค์ ์๋์ง๋ ๋์ด
- ํ ํฝ ์ญ์ ๊ฐ๋ฅ
-
producer
- ๋ฉ์์ง ๋ณด๋ด๊ธฐ (ack ์ต์ ์ฌ์ฉ๊ฐ๋ฅ)
- ์๋ ํ ํฝ์ผ๋ก ์ฐ๊ฒฐํด์ ๋ฉ์์ง๋ณด๋ด๋ฉด ํ์์์๋จ -> ํน์ ์ต์ ์์๋ ๋ฆฌ๋๊ฐ ์๋ค๋ ๊ฒฝ๊ณ ๊ฐ ๋์ค๊ณ ํ ํฝ์ด ์๋์์ฑ๋จ
- ํํฐ์ ๋๋ฅผ ์ค์ ํ ์ ์๋ค. ์๋ฅผ๋ค์ด roundrobin (prd์์๋ ์ฐ๋ฉด ์๋จ)
-
Consumer
- ์ด์ ๋ฉ์์ง ์ ๋ถ ํ์ธํ๋๋ก ์ค์ ํ ์ ๋ ์๊ณ ์์ผ๋ก ์ค๋ ๋ฉ์์ง๋ค๋ง ํ์ธํ๊ฒ ํ ์๋ ์๋ค
- ๋ฉ์์ง์ ํค, ๋ฒจ๋ฅ, ํ์์คํฌํ, ํํฐ์ ์ ๋ณด๋ฅผ ๊ฐ์ด ๋์ค๊ฒ consume ํ ์ ์๋ค.
- ํ์ฌ ์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ณผ ์ ์๋ค.
- ์ปจ์๋จธ ๊ทธ๋ฃน ์์ธ๋ฅผ ๋ณผ ์ ์๋ค. ํํฐ์ ๋ง๋ค offset ์ ๋ณด๊ฐ ๋ค ๋์ด, ์ฐ๊ฒฐ๋ ์ปจ์๋จธ๋ค ์ ๋ณด๋ ๋์ค๊ณ lag๋ ๋์ด (๋ง์ง๋ง ๋ฉ์์ง ์คํ์ - ๋จน์ ๋ฉ์์ง ์คํ์ )
- ํน์ ์ปจ์๋จธ๊ทธ๋ฃน์ ์คํ์ ์ ์ด๊ธฐํ ํน์ ๋ณ๊ฒฝํ ์ ์๋ค. (--reset-offsets)
-
์์ ์คํ๊ฒฐ๊ณผ ๋ณด๊ธฐ
--reset-offsets
์ต์ ์ผ๋ก ์คํ- Kafka Consumer ์คํ์
์ ์ฌ์ค์ ํ๋ ค๋ฉด
--reset-offsets
์ ํจ๊ป ์ ์ ํ ์ต์ ์ ์ฌ์ฉํฉ๋๋ค. ์ด ๋ช ๋ น์ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฝ๊ฑฐ๋ ๊ฑด๋๋ฐ๋๋ก ์ค์ ํ๋ ๋ฐ ์ํฅ์ ๋ฏธ์น๋ฏ๋ก, ์ ์ฉ ์ --dry-run
์ผ๋ก ํ์ธํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
-
์ญํ
- Kafka์ ์ธ๋ถ ์์คํ (๋ฐ์ดํฐ๋ฒ ์ด์ค, ํ์ผ ์์คํ , ํด๋ผ์ฐ๋ ์๋น์ค ๋ฑ)์ ์ฝ๊ฒ ์ฐ๊ฒฐํ ์ ์๋๋ก ๋์์ฃผ๋ ํ๋ ์์ํฌ
- ์ฝ๋๋ฅผ ์์ฑํ์ง ์๊ณ ๋ค์ํ ๋ฐ์ดํฐ ์์ค๋ฅผ ์ฐ๋
- ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์์ ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์๋๋ก ๋ณ๋ ฌ ์ฒ๋ฆฌ ๋ฐ ์ค๋ฅ ๋ณต๊ตฌ ๊ธฐ๋ฅ์ ์ ๊ณต
- ์ด๋ฏธ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์นดํ์นด๋ก ํ์ฉํ๋ ค๊ณ ํ ๋ ์ฌ์ฉํ๋ค.
-
์ปดํฌ๋ํธ
- Source Connector (์์ค ์ปค๋ฅํฐ)
- ์ธ๋ถ ์์คํ โ Kafka๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๋ ์ญํ
- ์: MySQL, PostgreSQL, MongoDB, S3, HTTP API ๋ฑ์ ๋ฐ์ดํฐ๋ฅผ Kafka๋ก ์ ์ก
- Sink Connector (์ฑํฌ ์ปค๋ฅํฐ)
- Kafka โ ์ธ๋ถ ์์คํ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋ ์ญํ
- ์: Kafka ๋ฐ์ดํฐ๋ฅผ MySQL, Elasticsearch, HDFS, S3 ๋ฑ์ ์ ์ฅ
- Source Connector (์์ค ์ปค๋ฅํฐ)
-
์ผ๋ํธ ํด๋ฌ์คํฐ
- ์ฌ๋ฌ๊ฐ์ ์ปค๋ฅํฐ๋ก ๊ตฌ์ฑ๋์ด ์์.
- Worker: Kafka Connect ํ๋ก์ธ์ค (Connector ์คํ ๋ด๋น). Standalone ๋ชจ๋์์๋ 1๊ฐ์ Worker, Distributed ๋ชจ๋์์๋ ์ฌ๋ฌ ๊ฐ์ Worker๊ฐ ํด๋ฌ์คํฐ๋ก ๋์
- Connector: ๋ฐ์ดํฐ ์์ค โ Kafka ์ฐ๊ฒฐ (Source/Sink)
- Task: Connector ๋ด๋ถ์์ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋๋ ์์
-
์คํ (์์ Elasticsearch)
- Kafka + Zookeeper ์คํ ์ค
- Elasticsearch ์คํ ์ค
- Kafka Connect ์คํ ๊ฐ๋ฅ
- Kafka Connect Elasticsearch ํ๋ฌ๊ทธ์ธ ์ค์น ํ connect์ ๋ฑ๋ก
-
์ญํ
- Kafka Streams๋ Kafka ๋ด์์ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ๊ณ ๋ณํํ ์ ์๋๋ก ๋์์ฃผ๋ ๊ฒฝ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ (์คํ๋ง ๋ถํธ์์ ์ฌ์ฉ๊ฐ)
- Kafka ๊ธฐ๋ฐ์ ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ: Kafka์์ ์ง์์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ , ๊ฐ๊ณต ๋ฐ ๋ถ์ํ ํ ๋ค์ Kafka์ ์ ์ฅ ๊ฐ๋ฅ
- ๊ฐ๋ ฅํ Kafka ํตํฉ: Kafka์ Consumer๋ก ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ , ์ฒ๋ฆฌ ํ Producer์ฒ๋ผ Kafka์ ๋ฐ์ดํฐ ์ ์ฅ (Producer ์ญํ ์ ๋ด๋ถ์ ์ผ๋ก ์ฒ๋ฆฌ๋จ)
- Stateful & Stateless ์ฒ๋ฆฌ ์ง์: ๋ฐ์ดํฐ๋ฅผ ๋จ์ ๋ณํํ๋ Stateless ์ฐ์ฐ๋ถํฐ, ์๋์ฐ(Windowing) ๊ธฐ๋ฐ ์ง๊ณ ๋ฐ ์กฐ์ธ(Join) ๋ฑ Stateful ์ฐ์ฐ ๊ฐ๋ฅ
- Streams API ์ ๊ณต: map(), filter(), groupBy(), join(), windowing() ๋ฑ์ ๋ค์ํ API ์ง์
-
๋์
- Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ Kafka Consumer ์ญํ ์ ์ํํ๋ฉฐ, Kafka Topic์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ๊ฐ๊ณตํ ํ ์๋ก์ด Kafka Topic์ ์ ์ฅ
- ๋ด๋ถ์ ์ผ๋ก State Store(RocksDB)๋ฅผ ์ฌ์ฉํ์ฌ ์ํ ์ ์ฅ ๊ฐ๋ฅ
- Exactly-once Processing ์ง์ โ ๋ฐ์ดํฐ ์ค๋ณต ์์ด ์์ ์ ์ธ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- Scale-out ๊ฐ๋ฅ โ ๋์ผํ Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ ์ฌ๋ฌ ๊ฐ ์คํํ๋ฉด ์๋์ผ๋ก ํํฐ์ ๊ธฐ๋ฐ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์ํ
-
์์
- ์ค์๊ฐ ๋ฐ์ดํฐ ์ง๊ณ: ๋งค์ถ ๋ฐ์ดํฐ์ SUM, COUNT ๋ฑ์ ๊ณ์ฐํ์ฌ Kafka Topic์ ์ ์ฅ
- ์ด๋ฒคํธ ์คํธ๋ฆผ ํํฐ๋ง: ํน์ ์กฐ๊ฑด์ ๋ง์กฑํ๋ ์ด๋ฒคํธ๋ง ํํฐ๋งํ์ฌ ์๋ก์ด Topic์ ์ ์ฅ
- IoT ์ผ์ ๋ฐ์ดํฐ ๋ถ์: ์ผ์์์ ๋ค์ด์ค๋ ๋ฐ์ดํฐ๋ฅผ windowing()์ ํ์ฉํ์ฌ 1๋ถ ๋จ์ ํ๊ท ๊ณ์ฐ
- ์ฌ์ฉ์ ํ๋ ๋ถ์: ํน์ ์๊ฐ ๋์ ๋์ผํ ์ฌ์ฉ์์ ์ด๋ฒคํธ๋ฅผ session windowing์ ํ์ฉํด ๊ทธ๋ฃนํ
- ๋ค์ค ์คํธ๋ฆผ ์กฐ์ธ: ์ฌ๋ฌ Topic์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด join()์ ํตํด ๊ด๋ จ ๋ฐ์ดํฐ๋ฅผ ๊ฒฐํฉ
- ์ญํ
- Kafka ๋ฉ์์ง์ ๋ฐ์ดํฐ ๊ตฌ์กฐ(Schema)๋ฅผ ์ค์์์ ๊ด๋ฆฌํ๋ ์๋น์ค (์นดํ์นด์ ๊ฐ์ด ์ฐ๋ ๋ ๋ฆฝ๋ ์๋น์ค) -> ์ด๊ฒ์ ํตํด์ ์นดํ์นด๋ ๋ฐ์ดํฐ์ ํํ์ ๋ํด์ ์ ๊ฒฝ์ฐ์ง ์๊ณ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ค. (Kafka๋ ๋ฐ์ดํธ ๋ฐฐ์ด(byte[]) ํํ๋ก ๋ฉ์์ง๋ฅผ ์ ์ฅํ๋ฏ๋ก ๋ฐ์ดํฐ์ ๊ตฌ์กฐ(Schema)๋ Kafka ์์ฒด๊ฐ ์๋๋ผ Producer/Consumer์ ์ฑ ์)
- Producer์ Consumer๊ฐ ๊ฐ์ ๋ฐ์ดํฐ ๊ตฌ์กฐ(Schema)๋ฅผ ๊ณต์ ํ๋๋ก ๊ฐ์ ํ ์ ์์
- ์๋ก์ด ํ๋ ์ถ๊ฐ ๋ฑ Schema ๋ณ๊ฒฝ ์์๋ ํธํ์ฑ ์ ์ง ๊ฐ๋ฅ
- Producer๊ฐ ์๋ชป๋ ๋ฐ์ดํฐ ํ์์ ๋ณด๋ผ ๊ฒฝ์ฐ, Schema Registry์์ ๊ฑฐ๋ถํ์ฌ ๋ฐ์ดํฐ ๋ฌด๊ฒฐ์ฑ์ ์ ์ง
- API๋ฅผ ํตํด Schema ๋ฑ๋ก, ์ ๋ฐ์ดํธ, ๋ฒ์ ๊ด๋ฆฌ ๊ฐ๋ฅ
-
๋์
- Producer๊ฐ ๋ฐ์ดํฐ๋ฅผ Kafka๋ก ์ ์ก (Producer๋ ํ ํฝ ์ด๋ฆ๊ณผ ๊ฐ์ฒด๋ง ์ค์ ํด์ ๋ณด๋ผ ๋ฟ)
- ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ธฐ ์ ์ Schema Registry์์ Schema ID ํ์ธ
- ๋ฐ์ดํฐ๋ฅผ Schema์ ๋ง์ถฐ ์ง๋ ฌํ(Serialize) ํ Kafka๋ก ์ ์ก (schema id๊ฐ ์์ผ๋ฉด ๋ฑ๋ก)
- Schema Registry๊ฐ Schema ์ ์ฅ ๋ฐ ๊ด๋ฆฌ
- ์๋ก์ด Schema๊ฐ ๋ฑ๋ก๋๋ฉด ๋ฒ์ ๊ด๋ฆฌ
- ๊ธฐ์กด Schema์ ๋น๊ตํ์ฌ ํธํ์ฑ ๊ฒ์ฌ
- Consumer๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ์ญ์ง๋ ฌํ(Deserialize)
-
Kafka์์ ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ Schema Registry์์ ๊ฐ์ ธ์จ Schema๋ฅผ ์ด์ฉํด ๋ณํ (Consumer๋ Kafka ๋ฉ์์ง ์์ ํฌํจ๋ Schema ID๋ฅผ ์ฝ๊ณ , ๊ทธ ID์ ํด๋นํ๋ Schema๋ฅผ Schema Registry์์ ์กฐํํด์ ์ญ์ง๋ ฌํ)
-
- ์ค์ ๋ก ์ด๋ป๊ฒ ๋์ํ๋? user ๊ฐ์ฒด ํ๋ ์ถ๊ฐ: ์๋ฅผ ๋ค์ด user ๊ฐ์ฒด์ ํ๋ age๋ฅผ ์ถ๊ฐํ๋ค๊ณ ๊ฐ์ ํด ๋ด ์๋ค. Schema Registry์์๋ ๊ธฐ์กด ์คํค๋ง์ ์๋ก์ด ์คํค๋ง์ ํธํ์ฑ ๊ฒ์ฌ๋ฅผ ์งํํฉ๋๋ค. ํธํ์ฑ ๊ฒ์ฌ์์ ๋ฌธ์ ๊ฐ ์์ผ๋ฉด ์๋ก์ด Schema ID๊ฐ ํ ๋น๋๊ณ , ์๋ก์ด ๋ฐ์ดํฐ ๊ตฌ์กฐ๋ก user ๊ฐ์ฒด๋ฅผ ์ง๋ ฌํ/์ญ์ง๋ ฌํํ ์ ์๊ฒ ๋ฉ๋๋ค. user ๊ฐ์ฒด ํ๋ ์ญ์ : ๋ง์ฝ ํ๋๋ฅผ ์ญ์ ํ๋ ๋ณ๊ฒฝ์ ํ๋ค๋ฉด, ํธํ์ฑ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, name ํ๋๋ฅผ ์ญ์ ํ๋ค๊ณ ํ๋ฉด, ๊ธฐ์กด ๋ฐ์ดํฐ๋ฅผ ์ฝ์ผ๋ ค๋ ์๋น์๋ ํธํ์ฑ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ, ํธํ์ฑ ๊ฒ์ฌ์ ์คํจํ๋ฉด Schema Registry์์ ๋ฑ๋ก์ ๊ฑฐ๋ถํ๊ฒ ๋ฉ๋๋ค.
-
- Producer๊ฐ ๋ฐ์ดํฐ๋ฅผ Kafka๋ก ์ ์ก (Producer๋ ํ ํฝ ์ด๋ฆ๊ณผ ๊ฐ์ฒด๋ง ์ค์ ํด์ ๋ณด๋ผ ๋ฟ)
-
์ฅ์
- Kafka ๋ธ๋ก์ปค์ ๋ถํ๋ฅผ ์ค์ด๊ธฐ ์ํด
- Schema๋ฅผ Kafka ๋ด๋ถ์์ ๊ด๋ฆฌํ๋ฉด ๋ธ๋ก์ปค๊ฐ ๋ถ๋ด์ ๋ ๊ฐ์ง๊ฒ ๋จ
- Schema Registry๋ฅผ ๋ถ๋ฆฌํ๋ฉด Kafka ๋ธ๋ก์ปค์ ์ฑ๋ฅ์ ์ ์งํ ์ ์์.
- ๋
๋ฆฝ์ ์ธ Schema ๊ด๋ฆฌ ๊ฐ๋ฅ
- Kafka ์ด์ธ์ ์์คํ ์์๋ Schema๋ฅผ ์ฌ์ฉํ ์ ์์
- ์๋ฅผ ๋ค์ด, Kafka๊ฐ ์๋ ๋ค๋ฅธ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ API ์๋น์ค์์๋ Schema Registry๋ฅผ ํ์ฉ ๊ฐ๋ฅ.
- Kafka ๋ธ๋ก์ปค์ ๋ถํ๋ฅผ ์ค์ด๊ธฐ ์ํด
-
Partition ์ด๊ธฐ์ค์
- ํน์ Topic์ Partition์ ์ฆ๊ฐ์ํค๋ฉด ๊ธฐ์กด์ ํน์ Partition์ผ๋ก ๊ฐ๋ Key๊ฐ ๋ค๋ฅธ Partition์ผ๋ก ๋ถ์ฐ๋ ์ ์์ด ์์(Ordering) ๋ณด์ฅ์ด ๊นจ์ง ์ํ์ด ์์. Key ordering์ ์ ์งํ๋ ค๋ฉด ์ด๊ธฐ Partition ๊ฐ์๋ฅผ ์ถฉ๋ถํ ๊ณ ๋ คํ๊ณ , ๋ฏธ๋ฆฌ ํ์ฅ ๊ฐ๋ฅํ ๊ตฌ์กฐ๋ก ์ค๊ณํ๋ ๊ฒ์ด ์ค์ํจ.
- ์ค์ ์์
- ๋ธ๋ก์ปค๊ฐ 6๊ฐ์ดํ๋ฉด ๋ธ๋ก์ปค ๊ฐฏ์ * 3
- ๋ธ๋ก์ปค๊ฐ 12๊ฐ์ด์์ด๋ฉด ๋ธ๋ก์ปค ๊ฐฏ์ *2
- ์ฑ๋ฅ์ ๋ํ ๊ฒ๋ค์ ์ ๋ต์ด ์๋ ๊ฒ์ด ์๋๊ณ ํ ์คํธ๋ฅผ ํตํด์ ์ต์ ํํด์ผํจ. producer์ throughtput๊ณผ consumer์ ๋ณ๋ ฌ์ฑ์ด ์ค์ํ๋ค.
- zookeeper๋ฅผ ์ฌ์ฉํ๋ฉด ํด๋ฌ์คํฐ๋ด์ ์ต๋ 200,000 partition๋ง ์ค์ ๊ฐ๋ฅ. ๋ํ ๋ธ๋ก์ปค๋น ์ฝ 4000๊ฐ ์ดํ์ partition์ ์ถ์ฒํจ.
- kraft ๋ชจ๋๋ฉด ์๋ฐฑ๋ง๊ฐ์ partition ๊ฐ๋ฅ
- ๋ง์ฝ 20๋ง๊ฐ๊ฐ ๋๋ partiton์ ์ด์ํ์ผํ๋ค๋ฉด, ๊ทธ๋ฅ ํด๋ฌ์คํฐ๋ฅผ ํ๋ ๋ ๋ง๋๋ ๊ฒ๋ ๋ฐฉ๋ฒ์ด๋ค.
-
Replica Factor ์ด๊ธฐ์ค์
- ๋ณดํต ์ต์2, ์ต๋4
- replica factor๋ฅผ ์ฆ๊ฐ์ํค๋ฉด kafka์ ๋ถํ๊ฐ ์ฆ๊ฐํ๋ค. disk space ์ latancy
- ๋ด๊ตฌ์ฑ์ ์ํด์๋ ๋ํ๊ณ , ๋ ๋์ ์ฑ๋ฅ์ ์ํด์๋ ๋ฎ์ถ๋ค. ํน์ min.insync.replicas ์ค์ ํ๋ค.