Skip to content

Commit bc1d20e

Browse files
authored
Merge pull request #4 from singchia/sync
Sync
2 parents 8deda53 + e32b972 commit bc1d20e

File tree

5 files changed

+201
-54
lines changed

5 files changed

+201
-54
lines changed

bench/main.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
_ "net/http/pprof"
8+
"syscall"
9+
10+
"github.com/jumboframes/armorigo/sigaction"
11+
"github.com/singchia/yafsm"
12+
)
13+
14+
const (
15+
INIT = "init"
16+
CONN_SENT = "conn_sent"
17+
CONN_RECV = "conn_recv"
18+
CONNED = "conned"
19+
ABNORMAL = "abnormal"
20+
CLOSE_SENT = "close_sent"
21+
CLOSE_RECV = "close_recv"
22+
CLOSE_HALF = "close_half"
23+
CLOSED = "closed"
24+
FINI = "fini"
25+
26+
ET_CONNSENT = "connsent"
27+
ET_CONNRECV = "connrecv"
28+
ET_CONNACK = "connack"
29+
ET_ERROR = "error"
30+
ET_EOF = "eof"
31+
ET_CLOSESENT = "closesent"
32+
ET_CLOSERECV = "closerecv"
33+
ET_CLOSEACK = "closeack"
34+
ET_FINI = "fini"
35+
)
36+
37+
func initFSM(fsm *yafsm.FSM) {
38+
init := fsm.AddState(INIT)
39+
connrecv := fsm.AddState(CONN_RECV)
40+
conned := fsm.AddState(CONNED)
41+
closesent := fsm.AddState(CLOSE_SENT)
42+
closerecv := fsm.AddState(CLOSE_RECV)
43+
closehalf := fsm.AddState(CLOSE_HALF)
44+
closed := fsm.AddState(FINI)
45+
fini := fsm.AddState(FINI)
46+
fsm.SetState(INIT)
47+
48+
// events
49+
fsm.AddEvent(ET_CONNRECV, init, connrecv)
50+
fsm.AddEvent(ET_CONNACK, connrecv, conned)
51+
52+
fsm.AddEvent(ET_ERROR, init, closed)
53+
fsm.AddEvent(ET_ERROR, connrecv, closed)
54+
fsm.AddEvent(ET_ERROR, conned, closed)
55+
fsm.AddEvent(ET_ERROR, closesent, closed)
56+
fsm.AddEvent(ET_ERROR, closerecv, closed)
57+
58+
fsm.AddEvent(ET_EOF, connrecv, closed)
59+
fsm.AddEvent(ET_EOF, conned, closed)
60+
61+
fsm.AddEvent(ET_CLOSESENT, conned, closesent)
62+
fsm.AddEvent(ET_CLOSESENT, closerecv, closesent) // close and been closed at same time
63+
fsm.AddEvent(ET_CLOSESENT, closehalf, closehalf) // close and been closed at same time
64+
65+
fsm.AddEvent(ET_CLOSERECV, conned, closerecv)
66+
fsm.AddEvent(ET_CLOSERECV, closesent, closerecv) // close and been closed at same time
67+
fsm.AddEvent(ET_CLOSERECV, closehalf, closehalf) // close and been closed at same time
68+
69+
fsm.AddEvent(ET_CLOSEACK, closesent, closehalf)
70+
fsm.AddEvent(ET_CLOSEACK, closerecv, closehalf)
71+
fsm.AddEvent(ET_CLOSEACK, closehalf, closed)
72+
// fini
73+
fsm.AddEvent(ET_FINI, init, fini)
74+
fsm.AddEvent(ET_FINI, connrecv, fini)
75+
fsm.AddEvent(ET_FINI, conned, fini)
76+
fsm.AddEvent(ET_FINI, closesent, fini)
77+
fsm.AddEvent(ET_FINI, closerecv, fini)
78+
fsm.AddEvent(ET_FINI, closehalf, fini)
79+
fsm.AddEvent(ET_FINI, closed, fini)
80+
}
81+
82+
func main() {
83+
go func() {
84+
http.ListenAndServe("0.0.0.0:6061", nil)
85+
}()
86+
87+
fsms := []*yafsm.FSM{}
88+
count := 100000
89+
for i := 0; i < count; i++ {
90+
fsm := yafsm.NewFSM()
91+
fsms = append(fsms, fsm)
92+
initFSM(fsm)
93+
fsm.EmitEvent(ET_CONNRECV)
94+
fsm.EmitEvent(CLOSE_SENT)
95+
fsm.EmitEvent(ET_ERROR)
96+
fsm.EmitEvent(ET_FINI)
97+
fsm.Close()
98+
}
99+
fmt.Println("done")
100+
101+
sig := sigaction.NewSignal()
102+
sig.Add(syscall.SIGINT)
103+
sig.Wait(context.Background())
104+
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ module github.com/singchia/yafsm
22

33
go 1.15
44

5-
require github.com/jumboframes/armorigo v0.1.0
5+
require github.com/jumboframes/armorigo v0.2.3

go.sum

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/jumboframes/armorigo v0.2.3 h1:Yf/Oxc81mtHKBBL6tnpZ7jWZ50TdqfdoeByxRZD8Uzo=
4+
github.com/jumboframes/armorigo v0.2.3/go.mod h1:sXe0R32y6V3oJD2eXcPzMlimvZx0xIDiLedpQOy06t4=
5+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
6+
github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
7+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
9+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
10+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
11+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
12+
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
13+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
14+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
15+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pkg/prioqueue/prioqueue.go

+1
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ func (pq *PrioQueue) PopSync() interface{} {
209209
}
210210
}
211211

212+
// user should release the reference to pq after Close
212213
func (pq *PrioQueue) Close() {
213214
pq.mutex.Lock()
214215
defer pq.mutex.Unlock()

yafsm.go

+80-53
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,26 @@ func (et *Event) AddHandler(handler EventHandler) {
6262
et.handlers = append(et.handlers, handler)
6363
}
6464

65+
type FSMOption func(*FSM)
66+
67+
func WithAsync() FSMOption {
68+
return func(fsm *FSM) {
69+
fsm.async = true
70+
}
71+
}
72+
6573
type FSM struct {
6674
state string
6775
states map[string]*State
6876
events map[string]*list.List
77+
78+
async bool
6979
mutex sync.RWMutex
7080
pq *prioqueue.PrioQueue
7181
cancel context.CancelFunc
7282
}
7383

74-
func NewFSM() *FSM {
84+
func NewFSM(opts ...FSMOption) *FSM {
7585
pq, _ := prioqueue.NewPrioQueue()
7686
ctx, cancel := context.WithCancel(context.Background())
7787
fsm := &FSM{
@@ -80,7 +90,12 @@ func NewFSM() *FSM {
8090
states: make(map[string]*State),
8191
events: make(map[string]*list.List),
8292
}
83-
go fsm.emit(ctx)
93+
for _, opt := range opts {
94+
opt(fsm)
95+
}
96+
if fsm.async {
97+
go fsm.emit(ctx)
98+
}
8499
return fsm
85100
}
86101

@@ -90,11 +105,9 @@ func (fsm *FSM) Init(state string) *State {
90105
}
91106

92107
func (fsm *FSM) Close() {
93-
fsm.cancel()
94-
fsm.cancel = nil
95-
96108
fsm.mutex.Lock()
97109
defer fsm.mutex.Unlock()
110+
98111
for k, _ := range fsm.states {
99112
delete(fsm.states, k)
100113
}
@@ -106,63 +119,66 @@ func (fsm *FSM) Close() {
106119
delete(fsm.events, k)
107120
}
108121
fsm.pq.Close()
122+
fsm.cancel()
109123
}
110124

111125
func (fsm *FSM) emit(ctx context.Context) {
112126
for {
113127
select {
114128
case <-ctx.Done():
115-
fsm.pq = nil
116-
fsm.events = nil
117-
fsm.states = nil
118129
return
119-
120130
default:
121-
data := fsm.pq.PopSync()
122-
if data == nil {
123-
continue
124-
}
125-
switch ec := data.(type) {
126-
case *eventchan:
127-
et := (*Event)(nil)
128-
129-
fsm.mutex.RLock()
130-
etList, ok := fsm.events[ec.event]
131-
if !ok {
132-
ec.ch <- ErrEventNotExist
133-
close(ec.ch)
134-
fsm.mutex.RUnlock()
135-
continue
136-
}
137-
for elem := etList.Front(); elem != nil; elem = elem.Next() {
138-
tmp := elem.Value.(*Event)
139-
if tmp.From.State == fsm.state {
140-
et = tmp
141-
}
142-
}
143-
fsm.mutex.RUnlock()
144-
145-
if et == nil {
146-
ec.ch <- ErrIllegalStateForEvent
147-
close(ec.ch)
148-
continue
149-
}
150-
for _, left := range et.From.lefts {
151-
left(et.From)
152-
}
153-
for _, handler := range et.handlers {
154-
handler(et)
155-
}
156-
for _, enter := range et.To.enters {
157-
enter(et.To)
158-
}
159-
fsm.mutex.Lock()
160-
fsm.state = et.To.State
161-
fsm.mutex.Unlock()
162-
ec.ch <- nil
163-
close(ec.ch)
131+
fsm.emitOne()
132+
}
133+
}
134+
}
135+
136+
func (fsm *FSM) emitOne() {
137+
data := fsm.pq.PopSync()
138+
if data == nil {
139+
return
140+
}
141+
switch ec := data.(type) {
142+
case *eventchan:
143+
et := (*Event)(nil)
144+
145+
fsm.mutex.RLock()
146+
etList, ok := fsm.events[ec.event]
147+
if !ok {
148+
ec.ch <- ErrEventNotExist
149+
close(ec.ch)
150+
fsm.mutex.RUnlock()
151+
return
152+
}
153+
fsm.mutex.RUnlock()
154+
155+
fsm.mutex.Lock()
156+
for elem := etList.Front(); elem != nil; elem = elem.Next() {
157+
tmp := elem.Value.(*Event)
158+
if tmp.From.State == fsm.state {
159+
et = tmp
164160
}
165161
}
162+
if et == nil {
163+
ec.ch <- ErrIllegalStateForEvent
164+
close(ec.ch)
165+
fsm.mutex.Unlock()
166+
return
167+
}
168+
fsm.state = et.To.State
169+
fsm.mutex.Unlock()
170+
171+
for _, left := range et.From.lefts {
172+
left(et.From)
173+
}
174+
for _, handler := range et.handlers {
175+
handler(et)
176+
}
177+
for _, enter := range et.To.enters {
178+
enter(et.To)
179+
}
180+
ec.ch <- nil
181+
close(ec.ch)
166182
}
167183
}
168184

@@ -390,11 +406,13 @@ func (fsm *FSM) EmitEvent(event string) error {
390406
event: event,
391407
ch: ch,
392408
}
393-
394409
err := fsm.pq.Push(eventchan)
395410
if err != nil {
396411
return err
397412
}
413+
if !fsm.async {
414+
fsm.emitOne()
415+
}
398416
err = <-ch
399417
return err
400418
}
@@ -419,6 +437,9 @@ func (fsm *FSM) EmitEventAsync(event string) <-chan error {
419437
ch <- err
420438
return ch
421439
}
440+
if !fsm.async {
441+
fsm.emitOne()
442+
}
422443
return ch
423444
}
424445

@@ -440,6 +461,9 @@ func (fsm *FSM) EmitPrioEvent(prio int, event string) error {
440461
if err != nil {
441462
return err
442463
}
464+
if !fsm.async {
465+
fsm.emitOne()
466+
}
443467
err = <-ch
444468
return err
445469
}
@@ -464,5 +488,8 @@ func (fsm *FSM) EmitPrioEventAsync(prio int, event string) <-chan error {
464488
ch <- err
465489
return ch
466490
}
491+
if !fsm.async {
492+
fsm.emitOne()
493+
}
467494
return ch
468495
}

0 commit comments

Comments
 (0)