Skip to content

Commit 8b88a33

Browse files
committed
Init commit
Signed-off-by: Sahil Yeole <[email protected]>
0 parents  commit 8b88a33

File tree

4 files changed

+243
-0
lines changed

4 files changed

+243
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
bin

Makefile

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
run: build
2+
@./bin/zendb
3+
build:
4+
@go build -o bin/zendb .

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module zendb
2+
3+
go 1.22.2

main.go

+235
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"log/slog"
7+
"net"
8+
"strconv"
9+
"strings"
10+
"time"
11+
)
12+
13+
const DEFAULT_PORT = 6379
14+
15+
// Commands
16+
const (
17+
PING = "PING"
18+
ECHO = "ECHO"
19+
SET = "SET"
20+
GET = "GET"
21+
)
22+
23+
// Returns
24+
const (
25+
PONG = "PONG"
26+
OK = "OK"
27+
)
28+
29+
const NULLBULKSTRING = "$-1\r\n"
30+
31+
type Config struct {
32+
lnAddr string
33+
}
34+
35+
type Server struct {
36+
cfg Config
37+
ln net.Listener
38+
peers map[*Peer]struct{}
39+
addPeerChan chan *Peer
40+
delPeerChan chan *Peer
41+
msgChan chan Message
42+
db map[string]Data
43+
}
44+
45+
type Peer struct {
46+
conn net.Conn
47+
msgChan chan Message
48+
deletePeerChan chan *Peer
49+
db *map[string]Data
50+
}
51+
52+
type Data struct {
53+
value string
54+
px int
55+
}
56+
57+
type Message struct {
58+
cmd []byte
59+
peer Peer
60+
}
61+
62+
func NewMessage(peer Peer, msg []byte) Message {
63+
return Message{
64+
cmd: msg,
65+
peer: peer,
66+
}
67+
}
68+
69+
func NewPeer(conn net.Conn, msgChan chan Message, delete chan *Peer, db *map[string]Data) *Peer {
70+
return &Peer{
71+
conn: conn,
72+
msgChan: msgChan,
73+
deletePeerChan: delete,
74+
db: db,
75+
}
76+
}
77+
78+
func (p *Peer) reader() {
79+
buf := make([]byte, 1024)
80+
for {
81+
defer func() {
82+
p.deletePeerChan <- p
83+
p.conn.Close()
84+
}()
85+
n, err := p.conn.Read(buf)
86+
if err != nil {
87+
return
88+
}
89+
msgBuf := make([]byte, n)
90+
copy(msgBuf, buf[:n])
91+
p.msgChan <- NewMessage(*p, msgBuf)
92+
}
93+
}
94+
95+
func NewServer(c Config) *Server {
96+
if len(c.lnAddr) == 0 {
97+
c.lnAddr = fmt.Sprintf(":%d", DEFAULT_PORT)
98+
}
99+
return &Server{
100+
cfg: c,
101+
peers: make(map[*Peer]struct{}),
102+
addPeerChan: make(chan *Peer),
103+
delPeerChan: make(chan *Peer),
104+
msgChan: make(chan Message),
105+
db: make(map[string]Data),
106+
}
107+
}
108+
109+
func (s *Server) Start() error {
110+
ln, err := net.Listen("tcp", s.cfg.lnAddr)
111+
if err != nil {
112+
return err
113+
}
114+
s.ln = ln
115+
go s.handlePeersLoop()
116+
return s.AcceptLoop()
117+
}
118+
119+
func (s *Server) handlePeersLoop() {
120+
for {
121+
select {
122+
case peer := <-s.addPeerChan:
123+
s.peers[peer] = struct{}{}
124+
case msg := <-s.msgChan:
125+
HandleMessage(msg)
126+
case peer := <-s.delPeerChan:
127+
delete(s.peers, peer)
128+
}
129+
}
130+
}
131+
132+
func HandleMessage(msg Message) {
133+
splits := strings.Split(string(msg.cmd), "\r\n")
134+
inputBulkStrings := []string{}
135+
// commandLen := "0"
136+
// Command Parser
137+
for i, v := range splits {
138+
if v == "" {
139+
continue
140+
}
141+
if strings.Contains(v, "*") {
142+
// commandLen = strings.Split(v, "*")[1]
143+
// println("len", commandLen)
144+
}
145+
if strings.Contains(v, "$") {
146+
inputBulkStrings = append(inputBulkStrings, splits[i+1])
147+
}
148+
}
149+
150+
// Command Handler
151+
for i, v := range inputBulkStrings {
152+
switch strings.ToUpper(v) {
153+
case PING:
154+
msg.peer.conn.Write([]byte(bulkString(PONG)))
155+
case ECHO:
156+
if i+1 < len(inputBulkStrings) {
157+
echoReturn := inputBulkStrings[i+1]
158+
msg.peer.conn.Write([]byte(bulkString(echoReturn)))
159+
} else {
160+
msg.peer.conn.Write([]byte(bulkString("ECHO Command requires a string argument")))
161+
}
162+
case SET:
163+
px := -1
164+
if i+4 < len(inputBulkStrings) {
165+
if inputBulkStrings[i+3] == "px" {
166+
x, err := strconv.Atoi(inputBulkStrings[i+4])
167+
if err != nil {
168+
msg.peer.conn.Write([]byte(bulkString("Invalid px value")))
169+
} else {
170+
px = x
171+
}
172+
}
173+
}
174+
if i+2 < len(inputBulkStrings) {
175+
key := inputBulkStrings[i+1]
176+
value := inputBulkStrings[i+2]
177+
(*msg.peer.db)[key] = Data{value: value, px: px}
178+
msg.peer.conn.Write([]byte(bulkString(OK)))
179+
} else {
180+
msg.peer.conn.Write([]byte(bulkString("SET Command requires a key and value argument")))
181+
}
182+
if px > 0 {
183+
go func() {
184+
time.Sleep(time.Duration(px) * time.Millisecond)
185+
delete((*msg.peer.db), inputBulkStrings[i+1])
186+
}()
187+
}
188+
case GET:
189+
if i+1 < len(inputBulkStrings) {
190+
key := inputBulkStrings[i+1]
191+
value, ok := (*msg.peer.db)[key]
192+
if ok {
193+
msg.peer.conn.Write([]byte(bulkString(value.value)))
194+
} else {
195+
msg.peer.conn.Write([]byte(NULLBULKSTRING))
196+
}
197+
} else {
198+
msg.peer.conn.Write([]byte(bulkString("GET Command requires a key argument")))
199+
}
200+
201+
// default:
202+
// msg.peer.conn.Write([]byte(bulkString("Unknown Command")))
203+
}
204+
}
205+
}
206+
207+
func bulkString(s string) string {
208+
return fmt.Sprintf("$%d\r\n%s\r\n", len(s), s)
209+
}
210+
211+
func (s *Server) AcceptLoop() error {
212+
for {
213+
conn, err := s.ln.Accept()
214+
if err != nil {
215+
return err
216+
}
217+
go s.handleConn(conn)
218+
}
219+
}
220+
221+
func (s *Server) handleConn(conn net.Conn) {
222+
peer := NewPeer(conn, s.msgChan, s.delPeerChan, &s.db)
223+
s.addPeerChan <- peer
224+
peer.reader()
225+
}
226+
227+
func main() {
228+
port := flag.Int("port", DEFAULT_PORT, "port to listen on")
229+
flag.Parse()
230+
print(fmt.Sprintf("Running on port: %d\n", *port))
231+
s := NewServer(Config{lnAddr: fmt.Sprintf(":%d", *port)})
232+
if err := s.Start(); err != nil {
233+
slog.Error("Error starting server: %v", err)
234+
}
235+
}

0 commit comments

Comments
 (0)