Skip to content

Commit 8658718

Browse files
authored
Merge pull request #148 from davrodpin/misc/rpc
Add rpc command
2 parents 2d2a082 + ba8dd8b commit 8658718

File tree

7 files changed

+634
-19
lines changed

7 files changed

+634
-19
lines changed

cmd/misc.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cmd
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
)
6+
7+
var miscCmd = &cobra.Command{
8+
Use: "misc",
9+
Short: "A set of miscelaneous commands",
10+
Args: cobra.MinimumNArgs(1),
11+
Run: func(cmd *cobra.Command, arg []string) {},
12+
}
13+
14+
func init() {
15+
rootCmd.AddCommand(miscCmd)
16+
}

cmd/misc_rpc.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package cmd
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/davrodpin/mole/mole"
8+
9+
log "github.com/sirupsen/logrus"
10+
"github.com/spf13/cobra"
11+
)
12+
13+
var (
14+
method, params string
15+
16+
miscRpcCmd = &cobra.Command{
17+
Use: "rpc [alias or id] [method] [params]",
18+
Short: "Executes a remote procedure call on a given mole instance",
19+
Long: "Executes a remote procedure call on a given mole instance",
20+
Args: func(cmd *cobra.Command, args []string) error {
21+
if len(args) < 2 {
22+
return fmt.Errorf("not enough arguments.")
23+
}
24+
25+
id = args[0]
26+
method = args[1]
27+
28+
if len(args) == 3 {
29+
params = args[2]
30+
}
31+
32+
return nil
33+
},
34+
Run: func(cmd *cobra.Command, arg []string) {
35+
resp, err := mole.Rpc(id, method, params)
36+
if err != nil {
37+
log.WithError(err).WithFields(log.Fields{
38+
"id": id,
39+
}).Error("error executing remote procedure.")
40+
os.Exit(1)
41+
}
42+
43+
fmt.Printf(resp)
44+
},
45+
}
46+
)
47+
48+
func init() {
49+
miscCmd.AddCommand(miscRpcCmd)
50+
}

count.out

+470
Large diffs are not rendered by default.

fsutils/fsutils.go

+11
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,14 @@ func CreateInstanceDir(appId string) (string, error) {
5656

5757
return d, nil
5858
}
59+
60+
// InstanceDir returns the location where all files related to a specific mole
61+
// instance are persisted.
62+
func InstanceDir(id string) (string, error) {
63+
home, err := Dir()
64+
if err != nil {
65+
return "", err
66+
}
67+
68+
return filepath.Join(home, id), nil
69+
}

mole/app.go

+31
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package mole
22

33
import (
4+
"context"
5+
"encoding/json"
46
"fmt"
57
"io/ioutil"
68
"os"
79
"path/filepath"
810
"strconv"
911

1012
"github.com/davrodpin/mole/fsutils"
13+
"github.com/davrodpin/mole/rpc"
14+
1115
"github.com/hpcloud/tail"
1216
)
1317

@@ -129,3 +133,30 @@ func ShowLogs(id string, follow bool) error {
129133

130134
return nil
131135
}
136+
137+
// Rpc calls a remote procedure on another mole instance given its id or alias.
138+
func Rpc(id, method string, params interface{}) (string, error) {
139+
d, err := fsutils.InstanceDir(id)
140+
if err != nil {
141+
return "", err
142+
}
143+
144+
rf := filepath.Join(d, "rpc")
145+
146+
addr, err := ioutil.ReadFile(rf)
147+
if err != nil {
148+
return "", err
149+
}
150+
151+
resp, err := rpc.Call(context.Background(), string(addr), method, params)
152+
if err != nil {
153+
return "", err
154+
}
155+
156+
r, err := json.MarshalIndent(resp, "", " ")
157+
if err != nil {
158+
return "", err
159+
}
160+
161+
return string(r), nil
162+
}

rpc/rpc.go

+32-13
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,16 @@ package rpc
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"net"
78
"sync"
89

9-
"encoding/json"
10-
1110
log "github.com/sirupsen/logrus"
1211
"github.com/sourcegraph/jsonrpc2"
1312
)
1413

1514
var registeredMethods = sync.Map{}
16-
var listener net.Listener
1715

1816
const (
1917
// DefaultAddress is the network address used by the rpc server if none is given.
@@ -29,7 +27,7 @@ func Start(address string) (net.Addr, error) {
2927
address = DefaultAddress
3028
}
3129

32-
listener, err = net.Listen("tcp", address)
30+
lis, err := net.Listen("tcp", address)
3331
if err != nil {
3432
return nil, err
3533
}
@@ -39,7 +37,7 @@ func Start(address string) (net.Addr, error) {
3937

4038
go func() {
4139
for {
42-
conn, err := listener.Accept()
40+
conn, err := lis.Accept()
4341
if err != nil {
4442
log.WithError(err).Warnf("error establishing connection with rpc client.")
4543
}
@@ -48,7 +46,7 @@ func Start(address string) (net.Addr, error) {
4846
}
4947
}()
5048

51-
return listener.Addr(), nil
49+
return lis.Addr(), nil
5250
}
5351

5452
// Handler handles JSON-RPC requests and notifications.
@@ -79,8 +77,7 @@ func (h *Handler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2
7977
return
8078
}
8179

82-
m, _ := registeredMethods.Load(req.Method)
83-
rm, err := m.(Method)(req.Params)
80+
params, err := req.Params.MarshalJSON()
8481
if err != nil {
8582
log.WithFields(log.Fields{
8683
"notification": req.Notif,
@@ -96,9 +93,31 @@ func (h *Handler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2
9693
})
9794

9895
sendResponse(ctx, conn, req, resp)
96+
}
97+
98+
return
99+
}
100+
101+
m, _ := registeredMethods.Load(req.Method)
102+
rm, err := m.(Method)(params)
103+
if err != nil {
104+
log.WithFields(log.Fields{
105+
"notification": req.Notif,
106+
"method": req.Method,
107+
"id": req.ID,
108+
}).WithError(err).Warn("error executing rpc method.")
99109

100-
return
110+
if !req.Notif {
111+
resp := &jsonrpc2.Response{}
112+
resp.SetResult(jsonrpc2.Error{
113+
Code: jsonrpc2.CodeInternalError,
114+
Message: fmt.Sprintf("error executing rpc method %s", req.Method),
115+
})
116+
117+
sendResponse(ctx, conn, req, resp)
101118
}
119+
120+
return
102121
}
103122

104123
if !req.Notif {
@@ -116,10 +135,10 @@ func Register(name string, method Method) {
116135
// Method represents a procedure that can be called remotely.
117136
type Method func(params interface{}) (json.RawMessage, error)
118137

119-
// Call initiates a JSON-RPC call using the specified method and waits for the
120-
// response.
121-
func Call(ctx context.Context, method string, params interface{}) (map[string]interface{}, error) {
122-
tc, err := net.Dial("tcp", listener.Addr().String())
138+
// Call initiates a JSON-RPC call to a given rpc server address, using the
139+
// specified method and waits for the response.
140+
func Call(ctx context.Context, addr, method string, params interface{}) (map[string]interface{}, error) {
141+
tc, err := net.Dial("tcp", addr)
123142
if err != nil {
124143
return nil, err
125144
}

rpc/rpc_test.go

+24-6
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,37 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"net"
78
"os"
89
"testing"
910

1011
"github.com/davrodpin/mole/rpc"
1112
)
1213

14+
var (
15+
addr net.Addr
16+
)
17+
1318
func TestHandler(t *testing.T) {
19+
responseTemplate := `{"message":"%s"}`
1420
method := "test"
15-
expectedResponse := `{"message":"test"}`
21+
paramValue := "param"
22+
expectedResponse := fmt.Sprintf(responseTemplate, paramValue)
1623

1724
rpc.Register(method, func(params interface{}) (json.RawMessage, error) {
18-
return json.RawMessage(expectedResponse), nil
25+
var r []uint8
26+
var ok bool
27+
28+
if r, ok = params.([]uint8); !ok {
29+
return nil, fmt.Errorf("invalid parameter")
30+
}
31+
32+
m := fmt.Sprintf(responseTemplate, string(r[1:6]))
33+
34+
return json.RawMessage(m), nil
1935
})
2036

21-
response, err := rpc.Call(context.Background(), method, "param")
37+
response, err := rpc.Call(context.Background(), addr.String(), method, paramValue)
2238
if err != nil {
2339
t.Errorf("error while calling remote procedure: %v", err)
2440
}
@@ -37,7 +53,7 @@ func TestMethodNotRegistered(t *testing.T) {
3753
method := "methodnotregistered"
3854
expectedResponse := fmt.Sprintf(`{"code":-32601,"data":null,"message":"method %s not found"}`, method)
3955

40-
response, err := rpc.Call(context.Background(), method, "param")
56+
response, err := rpc.Call(context.Background(), addr.String(), method, "param")
4157
if err != nil {
4258
t.Errorf("error while calling remote procedure: %v", err)
4359
}
@@ -60,7 +76,7 @@ func TestMethodWithError(t *testing.T) {
6076
return nil, fmt.Errorf("error")
6177
})
6278

63-
response, err := rpc.Call(context.Background(), method, "param")
79+
response, err := rpc.Call(context.Background(), addr.String(), method, "param")
6480
if err != nil {
6581
t.Errorf("error while calling remote procedure: %v", err)
6682
}
@@ -76,7 +92,9 @@ func TestMethodWithError(t *testing.T) {
7692
}
7793

7894
func TestMain(m *testing.M) {
79-
_, err := rpc.Start(rpc.DefaultAddress)
95+
var err error
96+
97+
addr, err = rpc.Start(rpc.DefaultAddress)
8098
if err != nil {
8199
fmt.Printf("error initializing rpc server: %v", err)
82100
os.Exit(1)

0 commit comments

Comments
 (0)