Skip to content

Commit 3d5dc5e

Browse files
author
Ole
committed
initial
1 parent 9285299 commit 3d5dc5e

13 files changed

+740
-0
lines changed

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.idea/
2+
src
3+
cbsd-mq-router
4+
log

Makefile

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
all:
2+
@./build.sh
3+
clean:
4+
rm -f bs_router
5+
install: all
6+
install bs_router /usr/local/sbin
7+
install rc.d/bs_router /usr/local/etc/rc.d/bs_router
8+
uninstall:
9+
rm -f /usr/local/sbin/bs_router /usr/local/etc/rc.d/bs_router
10+

README.md

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# cbsd-mq-router
2+
3+
CBSD message queue router
4+
5+
Client + sample: https://github.com/cbsd/bs_router-client
6+
7+
# Installation
8+
9+
setenv GOPATH $( realpath . )
10+
11+
go get
12+
13+
go build
14+
15+
pkg update -f
16+
17+
pkg install -y beanstalkd
18+
19+
service beanstalkd enable
20+
21+
sysrc beanstalkd_flags="-l 127.0.0.1"
22+
23+
service beanstalkd start
24+
25+
./bs_router

beanstalk.go

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"time"
6+
"encoding/json"
7+
"github.com/beanstalkd/go-beanstalk"
8+
)
9+
10+
11+
// beanstalk config struct
12+
type BeanstalkConfig struct {
13+
Uri string `json:"uri"`
14+
Tube string `json:"tube"`
15+
ReplyTubePrefix string `json:"reply_tube_prefix"`
16+
ReconnectTimeout int `json:"reconnect_timeout"`
17+
ReserveTimeout int `json:"reserve_timeout"`
18+
PublishTimeout int `json:"publish_timeout"`
19+
LogDir string `json:"logdir"`
20+
}
21+
22+
func beanstalkdPublish(config BeanstalkConfig, tube string, body []byte) error {
23+
24+
amqpURI := config.Uri
25+
c, err := beanstalk.Dial("tcp", amqpURI)
26+
27+
if err != nil {
28+
Infof("Publish/callback: unable connect to beanstalkd broker:%s", err)
29+
return nil
30+
}
31+
32+
mytube := &beanstalk.Tube{Conn: c, Name: tube}
33+
id, err := mytube.Put([]byte(body), 1, 0, time.Duration(config.PublishTimeout)*time.Second)
34+
if err != nil {
35+
Infof("\nPublish err: %d\n",err)
36+
return err
37+
}
38+
Infof("\nPublish id: %d\n",id)
39+
40+
return nil
41+
}
42+
43+
func beanstalkdLoop(config BeanstalkConfig) error {
44+
for {
45+
beanstalkdConsume(config)
46+
Infof("broker disconnected, sleep and retry:%d\n", config.ReconnectTimeout)
47+
time.Sleep(time.Duration(config.ReconnectTimeout) * time.Second)
48+
}
49+
return nil
50+
}
51+
52+
func WakeOnJob(ch chan bool, config BeanstalkConfig, id uint64, body []byte) {
53+
54+
Infof("\nwake up and delete job id: %d\n",id)
55+
comment := Comment{}
56+
Infof("\nWI: %d\n",id)
57+
comment.JobID = id
58+
response := fmt.Sprintf("%v", comment.JobID)
59+
Infof("response %s\n", response)
60+
//callback
61+
Infof("recv msg: %s", string(body))
62+
err := json.Unmarshal(body, &comment)
63+
if err != nil {
64+
Infof("json decode error %s", err)
65+
}
66+
callbackQueueName := fmt.Sprintf("%s%d",config.ReplyTubePrefix,comment.JobID)
67+
Infof("callback queue name: %s\n",callbackQueueName)
68+
err, cbsdTask := DoProcess(&comment, config.LogDir)
69+
if err != nil {
70+
Errorf("doprocess error:", err)
71+
panic(err)
72+
}
73+
b, err := json.Marshal(cbsdTask)
74+
if err != nil {
75+
Errorf("error:", err)
76+
}
77+
Infof("FINE: %s\n",b)
78+
err = beanstalkdPublish(config,callbackQueueName,b)
79+
ch <- true
80+
}
81+
82+
83+
func beanstalkdConsume(config BeanstalkConfig) error {
84+
85+
amqpURI := config.Uri
86+
tube := config.Tube
87+
88+
c, err := beanstalk.Dial("tcp", amqpURI)
89+
90+
if err != nil {
91+
Infof("Unable connect to beanstalkd broker:%s", err)
92+
return nil
93+
}
94+
95+
Infof("Subscribe tube: %s, reserve timeout: %d", tube, config.ReserveTimeout)
96+
97+
c.TubeSet = *beanstalk.NewTubeSet(c, tube)
98+
99+
for {
100+
// The BS library does not understand the network/BS problems and hangs forever.
101+
// ping in backround?
102+
id, body, err := c.Reserve(time.Duration(config.ReserveTimeout) * time.Second)
103+
104+
if err != nil {
105+
Infof("\nid: %d, res: %s\n",id, err.Error())
106+
}
107+
108+
if id == 0 {
109+
continue
110+
}
111+
c.Delete(id)
112+
113+
ch := make(chan bool)
114+
go WakeOnJob(ch, config, id, body)
115+
}
116+
117+
return nil
118+
}

bget.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os/exec"
6+
"strings"
7+
)
8+
9+
// get bhyve properties via cbsd bget command
10+
func bget(jname string, properties string) string {
11+
var result string
12+
// todo: rewrite to SQLite3
13+
cmdStr := fmt.Sprintf("/usr/local/bin/cbsd bget jname=%s mode=quiet %s", jname, properties)
14+
cmdArgs := strings.Fields(cmdStr)
15+
cmd := exec.Command(cmdArgs[0], cmdArgs[1:len(cmdArgs)]...)
16+
out, err := cmd.CombinedOutput()
17+
if err != nil {
18+
Infof("bget cmd.Run() failed (cbsd bget jname=%s mode=quiet %s) with %s\n", jname, properties, err)
19+
return ""
20+
}
21+
22+
result=(string(out))
23+
fmt.Printf("bget str: [%s]\n", result)
24+
25+
return result
26+
}

bhyve-dsk.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os/exec"
6+
"strings"
7+
)
8+
9+
// get bhyve properties via cbsd bhyve-dsk command
10+
func bhyvedsk(jname string, properties string) string {
11+
var result string
12+
// todo: rewrite to SQLite3
13+
cmdStr := fmt.Sprintf("/usr/local/bin/cbsd bhyve-dsk mode=get jname=%s %s", jname, properties)
14+
cmdArgs := strings.Fields(cmdStr)
15+
cmd := exec.Command(cmdArgs[0], cmdArgs[1:len(cmdArgs)]...)
16+
out, err := cmd.CombinedOutput()
17+
if err != nil {
18+
Infof("bhyve-dsk cmd.Run() failed (cbsd bhyve-dsk mode=get jname=%s %s) with %s\n", jname, properties, err)
19+
return ""
20+
}
21+
result=(string(out))
22+
fmt.Printf("bhyve-dsk str: [%s]\n", result)
23+
24+
return result
25+
}

build.sh

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/bin/sh
2+
export PATH="/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/usr/local/bin"
3+
pgm="${0##*/}" # Program basename
4+
progdir="${0%/*}" # Program directory
5+
6+
cd ${proddir}
7+
8+
# Check go install
9+
if [ -z "$( which go )" ]; then
10+
echo "error: Go is not installed. Please install go: pkg install -y lang/go"
11+
exit 1
12+
fi
13+
14+
# Check go version
15+
GOVERS="$( go version | cut -d " " -f 3 )"
16+
if [ -z "${GOVERS}" ]; then
17+
echo "unable to determine: go version"
18+
exit 1
19+
fi
20+
21+
export GOPATH="${progdir}"
22+
export GOBIN="${progdir}"
23+
24+
set -e
25+
go get
26+
go build -ldflags "${LDFLAGS} -extldflags '-static'" -o "${progdir}/cbsd-mq-router"

cbsd-mq-router.json

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"cbsdenv": "/usr/jails",
3+
"cbsdcolor": false,
4+
"broker": "beanstalkd",
5+
"logfile": "/dev/stdout",
6+
"beanstalkd": {
7+
"uri": "127.0.0.1:11300",
8+
"tube": "cbsd_zpool1",
9+
"reply_tube_prefix": "cbsd_zpool1_result_id",
10+
"reconnect_timeout": 5,
11+
"reserve_timeout": 5,
12+
"publish_timeout": 5,
13+
"logdir": "/var/log/cbsdmq"
14+
}
15+
}

0 commit comments

Comments
 (0)