diff --git a/deprecated.go b/deprecated.go new file mode 100644 index 0000000..1343b2e --- /dev/null +++ b/deprecated.go @@ -0,0 +1,16 @@ +package flow + +import moved "github.com/libp2p/go-libp2p-core/metrics" + +// Deprecated: Use github.com/libp2p/go-libp2p/metrics.IdleRate instead. +// Warning: it's not possible to alias variables in Go. Setting a value here may have no effect. +var IdleRate = moved.IdleRate + +// Deprecated: Use github.com/libp2p/go-libp2p/metrics.Snapshot instead. +type Snapshot = moved.Snapshot + +// Deprecated: Use github.com/libp2p/go-libp2p/metrics.Meter instead. +type Meter = moved.Meter + +// Deprecated: Use github.com/libp2p/go-libp2p/metrics.MeterRegistry instead. +type MeterRegistry = moved.MeterRegistry diff --git a/flow_test.go b/flow_test.go deleted file mode 100644 index 27a93fe..0000000 --- a/flow_test.go +++ /dev/null @@ -1,175 +0,0 @@ -package flow - -import ( - "math" - "sync" - "testing" - "time" -) - -func TestBasic(t *testing.T) { - var wg sync.WaitGroup - wg.Add(100) - for i := 0; i < 100; i++ { - go func() { - defer wg.Done() - ticker := time.NewTicker(40 * time.Millisecond) - defer ticker.Stop() - - m := new(Meter) - for i := 0; i < 300; i++ { - m.Mark(1000) - <-ticker.C - } - actual := m.Snapshot() - if !approxEq(actual.Rate, 25000, 500) { - t.Errorf("expected rate 25000 (±500), got %f", actual.Rate) - } - - for i := 0; i < 200; i++ { - m.Mark(200) - <-ticker.C - } - - // Adjusts - actual = m.Snapshot() - if !approxEq(actual.Rate, 5000, 200) { - t.Errorf("expected rate 5000 (±200), got %f", actual.Rate) - } - - // Let it settle. - time.Sleep(2 * time.Second) - - // get the right total - actual = m.Snapshot() - if actual.Total != 340000 { - t.Errorf("expected total %d, got %d", 340000, actual.Total) - } - }() - } - wg.Wait() -} - -func TestShared(t *testing.T) { - var wg sync.WaitGroup - wg.Add(20 * 21) - for i := 0; i < 20; i++ { - m := new(Meter) - for j := 0; j < 20; j++ { - go func() { - defer wg.Done() - ticker := time.NewTicker(40 * time.Millisecond) - defer ticker.Stop() - for i := 0; i < 300; i++ { - m.Mark(50) - <-ticker.C - } - - for i := 0; i < 200; i++ { - m.Mark(10) - <-ticker.C - } - }() - } - go func() { - defer wg.Done() - time.Sleep(40 * 300 * time.Millisecond) - actual := m.Snapshot() - if !approxEq(actual.Rate, 25000, 250) { - t.Errorf("expected rate 25000 (±250), got %f", actual.Rate) - } - - time.Sleep(40 * 200 * time.Millisecond) - - // Adjusts - actual = m.Snapshot() - if !approxEq(actual.Rate, 5000, 50) { - t.Errorf("expected rate 5000 (±50), got %f", actual.Rate) - } - - // Let it settle. - time.Sleep(2 * time.Second) - - // get the right total - actual = m.Snapshot() - if actual.Total != 340000 { - t.Errorf("expected total %d, got %d", 340000, actual.Total) - } - }() - } - wg.Wait() -} - -func TestUnregister(t *testing.T) { - var wg sync.WaitGroup - wg.Add(100 * 2) - pause := make(chan struct{}) - - for i := 0; i < 100; i++ { - m := new(Meter) - go func() { - defer wg.Done() - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - for i := 0; i < 40; i++ { - m.Mark(1) - <-ticker.C - } - - <-pause - time.Sleep(2 * time.Second) - - for i := 0; i < 40; i++ { - m.Mark(2) - <-ticker.C - } - }() - go func() { - defer wg.Done() - time.Sleep(40 * 100 * time.Millisecond) - - actual := m.Snapshot() - if !approxEq(actual.Rate, 10, 1) { - t.Errorf("expected rate 10 (±1), got %f", actual.Rate) - } - - <-pause - - actual = m.Snapshot() - if actual.Total != 40 { - t.Errorf("expected total 4000, got %d", actual.Total) - } - time.Sleep(2*time.Second + 40*100*time.Millisecond) - - actual = m.Snapshot() - if !approxEq(actual.Rate, 20, 4) { - t.Errorf("expected rate 20 (±4), got %f", actual.Rate) - } - time.Sleep(2 * time.Second) - actual = m.Snapshot() - if actual.Total != 120 { - t.Errorf("expected total 120, got %d", actual.Total) - } - }() - - } - time.Sleep(60 * time.Second) - globalSweeper.mutex.Lock() - if len(globalSweeper.meters) != 0 { - t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters)) - } - globalSweeper.mutex.Unlock() - close(pause) - - wg.Wait() - - globalSweeper.mutex.Lock() - if len(globalSweeper.meters) != 100 { - t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters)) - } - globalSweeper.mutex.Unlock() -} - -func approxEq(a, b, err float64) bool { - return math.Abs(a-b) < err -} diff --git a/go.mod b/go.mod index 908c4c3..ea64469 100644 --- a/go.mod +++ b/go.mod @@ -1 +1,3 @@ module github.com/libp2p/go-flow-metrics + +require github.com/libp2p/go-libp2p-core v0.0.0-20190416150442-54a1b70f07da diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8f6963a --- /dev/null +++ b/go.sum @@ -0,0 +1,112 @@ +github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= +github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= +github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= +github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= +github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= +github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= +github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= +github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY= +github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= +github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU= +github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= +github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= +github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= +github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= +github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= +github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= +github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= +github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= +github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= +github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= +github.com/libp2p/go-libp2p-core v0.0.0-20190416121404-7f05a8a4f5c8 h1:RHG/byuvHV83SDzb4apslsTp6hS+d/MGXavG2aMvUIo= +github.com/libp2p/go-libp2p-core v0.0.0-20190416130012-1e60799a3db2 h1:QZJwCrEHDdqVRKbuOhGq783Wq2WkR1HpQ2LQ0Cxcrfs= +github.com/libp2p/go-libp2p-core v0.0.0-20190416130012-1e60799a3db2/go.mod h1:A6c9cHiasMtqu1NBtJHk/famE2iZNK60gx8JLuawdKg= +github.com/libp2p/go-libp2p-core v0.0.0-20190416150442-54a1b70f07da h1:zkOK0lDgEDQbkX+R5n3RGtRQQs3Ldh+FH0TEEg8S3P0= +github.com/libp2p/go-libp2p-core v0.0.0-20190416150442-54a1b70f07da/go.mod h1:5WmdfBZgh0OiotzGCDVDutKBwXGIf3rRDtxbNF33WYo= +github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw= +github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE= +github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo= +github.com/libp2p/go-libp2p-peer v0.1.0/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo= +github.com/libp2p/go-libp2p-peerstore v0.0.2/go.mod h1:RabLyPVJLuNQ+GFyoEkfi8H4Ti6k/HtZJ7YKgtSq+20= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= +github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ= +github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= +github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ= +github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= +github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= +github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= +github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s= +github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= +github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= +github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d1tnvM7ihcFwHMSstNAVUU= +github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= +github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ= +github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= +github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= +github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA= +golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M= +golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0= +golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/meter.go b/meter.go deleted file mode 100644 index 412dd3d..0000000 --- a/meter.go +++ /dev/null @@ -1,44 +0,0 @@ -package flow - -import ( - "fmt" - "sync/atomic" -) - -// Snapshot is a rate/total snapshot. -type Snapshot struct { - Rate float64 - Total uint64 -} - -func (s Snapshot) String() string { - return fmt.Sprintf("%d (%f/s)", s.Total, s.Rate) -} - -// Meter is a meter for monitoring a flow. -type Meter struct { - accumulator uint64 - - // Take lock. - snapshot Snapshot -} - -// Mark updates the total. -func (m *Meter) Mark(count uint64) { - if count > 0 && atomic.AddUint64(&m.accumulator, count) == count { - // I'm the first one to bump this above 0. - // Register it. - globalSweeper.Register(m) - } -} - -// Snapshot gets a consistent snapshot of the total and rate. -func (m *Meter) Snapshot() Snapshot { - globalSweeper.mutex.RLock() - defer globalSweeper.mutex.RUnlock() - return m.snapshot -} - -func (m *Meter) String() string { - return m.Snapshot().String() -} diff --git a/meter_test.go b/meter_test.go deleted file mode 100644 index f0bd810..0000000 --- a/meter_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package flow - -import ( - "fmt" - "math" - "time" -) - -func ExampleMeter() { - meter := new(Meter) - t := time.NewTicker(100 * time.Millisecond) - for i := 0; i < 100; i++ { - <-t.C - meter.Mark(30) - } - - // Get the current rate. This will be accurate *now* but not after we - // sleep (because we calculate it using EWMA). - rate := meter.Snapshot().Rate - - // Sleep 2 seconds to allow the total to catch up. We snapshot every - // second so the total may not yet be accurate. - time.Sleep(2 * time.Second) - - // Get the current total. - total := meter.Snapshot().Total - - fmt.Printf("%d (%d/s)\n", total, roundTens(rate)) - // Output: 3000 (300/s) -} - -func roundTens(x float64) int64 { - return int64(math.Floor(x/10+0.5)) * 10 -} diff --git a/registry.go b/registry.go deleted file mode 100644 index 226a482..0000000 --- a/registry.go +++ /dev/null @@ -1,35 +0,0 @@ -package flow - -import ( - "sync" -) - -// MeterRegistry is a registry for named meters. -type MeterRegistry struct { - meters sync.Map -} - -// Get gets (or creates) a meter by name. -func (r *MeterRegistry) Get(name string) *Meter { - if m, ok := r.meters.Load(name); ok { - return m.(*Meter) - } - m, _ := r.meters.LoadOrStore(name, new(Meter)) - return m.(*Meter) -} - -// Remove removes the named meter from the registry. -// -// Note: The only reason to do this is to save a bit of memory. Unused meters -// don't consume any CPU (after they go idle). -func (r *MeterRegistry) Remove(name string) { - r.meters.Delete(name) -} - -// ForEach calls the passed function for each registered meter. -func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) { - r.meters.Range(func(k, v interface{}) bool { - iterFunc(k.(string), v.(*Meter)) - return true - }) -} diff --git a/registry_test.go b/registry_test.go deleted file mode 100644 index 24adf7c..0000000 --- a/registry_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package flow - -import ( - "testing" - "time" -) - -func TestRegistry(t *testing.T) { - r := new(MeterRegistry) - m1 := r.Get("first") - m2 := r.Get("second") - m1.Mark(10) - m2.Mark(30) - - time.Sleep(2 * time.Second) - - if total := r.Get("first").Snapshot().Total; total != 10 { - t.Errorf("expected first total to be 10, got %d", total) - } - if total := r.Get("second").Snapshot().Total; total != 30 { - t.Errorf("expected second total to be 30, got %d", total) - } - - expectedMeters := map[string]*Meter{ - "first": m1, - "second": m2, - } - r.ForEach(func(n string, m *Meter) { - if expectedMeters[n] != m { - t.Errorf("wrong meter '%s'", n) - } - delete(expectedMeters, n) - }) - if len(expectedMeters) != 0 { - t.Errorf("missing meters: '%v'", expectedMeters) - } - - r.Remove("first") - - found := false - r.ForEach(func(n string, m *Meter) { - if n != "second" { - t.Errorf("found unexpected meter: %s", n) - return - } - if found { - t.Error("found meter twice") - } - found = true - }) - - if !found { - t.Errorf("didn't find second meter") - } - - m3 := r.Get("first") - if m3 == m1 { - t.Error("should have gotten a new meter") - } - if total := m3.Snapshot().Total; total != 0 { - t.Errorf("expected first total to now be 0, got %d", total) - } - - expectedMeters = map[string]*Meter{ - "first": m3, - "second": m2, - } - r.ForEach(func(n string, m *Meter) { - if expectedMeters[n] != m { - t.Errorf("wrong meter '%s'", n) - } - delete(expectedMeters, n) - }) - if len(expectedMeters) != 0 { - t.Errorf("missing meters: '%v'", expectedMeters) - } -} diff --git a/sweeper.go b/sweeper.go deleted file mode 100644 index 21ecf31..0000000 --- a/sweeper.go +++ /dev/null @@ -1,153 +0,0 @@ -package flow - -import ( - "math" - "sync" - "sync/atomic" - "time" -) - -// IdleRate the rate at which we declare a meter idle (and stop tracking it -// until it's re-registered). -// -// The default ensures that 1 event every ~30s will keep the meter from going -// idle. -var IdleRate = 1e-13 - -// Alpha for EWMA of 1s -var alpha = 1 - math.Exp(-1.0) - -// The global sweeper. -var globalSweeper sweeper - -type sweeper struct { - sweepOnce sync.Once - meters []*Meter - mutex sync.RWMutex - lastUpdateTime time.Time - registerChannel chan *Meter -} - -func (sw *sweeper) start() { - sw.registerChannel = make(chan *Meter, 16) - go sw.run() -} - -func (sw *sweeper) run() { - for m := range sw.registerChannel { - sw.register(m) - sw.runActive() - } -} - -func (sw *sweeper) register(m *Meter) { - // Add back the snapshot total. If we unregistered this - // one, we set it to zero. - atomic.AddUint64(&m.accumulator, m.snapshot.Total) - sw.meters = append(sw.meters, m) -} - -func (sw *sweeper) runActive() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - sw.lastUpdateTime = time.Now() - for len(sw.meters) > 0 { - // Scale back allocation. - if len(sw.meters)*2 < cap(sw.meters) { - newMeters := make([]*Meter, len(sw.meters)) - copy(newMeters, sw.meters) - sw.meters = newMeters - } - - select { - case <-ticker.C: - sw.update() - case m := <-sw.registerChannel: - sw.register(m) - } - } - sw.meters = nil - // Till next time. -} - -func (sw *sweeper) update() { - sw.mutex.Lock() - defer sw.mutex.Unlock() - - now := time.Now() - tdiff := now.Sub(sw.lastUpdateTime) - if tdiff <= 0 { - return - } - sw.lastUpdateTime = now - timeMultiplier := float64(time.Second) / float64(tdiff) - - newLen := len(sw.meters) - - for i, m := range sw.meters { - total := atomic.LoadUint64(&m.accumulator) - instant := timeMultiplier * float64(total-m.snapshot.Total) - - if m.snapshot.Rate == 0 { - m.snapshot.Rate = instant - } else { - m.snapshot.Rate += alpha * (instant - m.snapshot.Rate) - } - m.snapshot.Total = total - - // This is equivalent to one zeros, then one, then 30 zeros. - // We'll consider that to be "idle". - if m.snapshot.Rate > IdleRate { - continue - } - - // Ok, so we are idle... - - // Mark this as idle by zeroing the accumulator. - swappedTotal := atomic.SwapUint64(&m.accumulator, 0) - - // So..., are we really idle? - if swappedTotal > total { - // Not so idle... - // Now we need to make sure this gets re-registered. - - // First, add back what we removed. If we can do this - // fast enough, we can put it back before anyone - // notices. - currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) - - // Did we make it? - if currentTotal == swappedTotal { - // Yes! Nobody noticed, move along. - continue - } - // No. Someone noticed and will (or has) put back into - // the registration channel. - // - // Remove the snapshot total, it'll get added back on - // registration. - // - // `^uint64(total - 1)` is the two's complement of - // `total`. It's the "correct" way to subtract - // atomically in go. - atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) - } - - // Reset the rate, keep the total. - m.snapshot.Rate = 0 - newLen-- - sw.meters[i] = sw.meters[newLen] - } - - // trim the meter list - for i := newLen; i < len(sw.meters); i++ { - sw.meters[i] = nil - } - sw.meters = sw.meters[:newLen] -} - -func (sw *sweeper) Register(m *Meter) { - sw.sweepOnce.Do(sw.start) - sw.registerChannel <- m -}