Skip to content
This repository was archived by the owner on Mar 15, 2024. It is now read-only.

Implement /v1/retrieval-events handler #4

Merged
merged 1 commit into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
recorder
17 changes: 10 additions & 7 deletions cmd/recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,42 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"

recorder "github.com/filecoin-project/lassie-event-recorder"
"github.com/filecoin-project/lassie-event-recorder/eventrecorder"
"github.com/ipfs/go-log/v2"
)

var logger = log.Logger("lassie/event_recorder/cmd")

func main() {
log.SetAllLoggers(log.LevelInfo)
ctx := context.Background()

// TODO: add flags for all options eventually.
httpListenAddr := flag.String("httpListenAddr", "0.0.0.0:40080", "The HTTP server listen address in address:port format.")

flag.Parse()

r, err := recorder.NewRecorder(
recorder.WithHttpServerListenAddr(*httpListenAddr),
)
addrOpt := eventrecorder.WithHttpServerListenAddr(*httpListenAddr)
r, err := eventrecorder.NewEventRecorder(ctx, addrOpt)
if err != nil {
logger.Fatalw("Failed to instantiate recorder", "err", err)
}
ctx := context.Background()

if err = r.Start(ctx); err != nil {
logger.Fatalw("Failed to start recorder", "err", err)
}

sch := make(chan os.Signal, 1)
signal.Notify(sch, os.Interrupt)
<-sch
fmt.Println()
logger.Info("Terminating...")
if err := r.Shutdown(ctx); err != nil {
logger.Warnw("Failed to shut down server.", "err", err)
} else {
logger.Info("Shut down server successfully.")
logger.Info("Shut down server successfully")
}
}
22 changes: 11 additions & 11 deletions options.go → eventrecorder/config.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
package recorder
package eventrecorder

import "time"

type (
Option func(*options) error
options struct {
config struct {
httpServerListenAddr string
httpServerReadTimeout time.Duration
httpServerReadHeaderTimeout time.Duration
httpServerWriteTimeout time.Duration
httpServerIdleTimeout time.Duration
httpServerMaxHeaderBytes int
}
option func(*config) error
)

func newOptions(o ...Option) (*options, error) {
opts := options{
func newConfig(opts []option) (*config, error) {
cfg := &config{
httpServerListenAddr: "0.0.0.0:8080",
httpServerReadTimeout: 5 * time.Second,
httpServerReadHeaderTimeout: 5 * time.Second,
httpServerWriteTimeout: 5 * time.Second,
httpServerIdleTimeout: 10 * time.Second,
httpServerMaxHeaderBytes: 2048,
}
for _, apply := range o {
if err := apply(&opts); err != nil {
for _, opt := range opts {
if err := opt(cfg); err != nil {
return nil, err
}
}
return &opts, nil
return cfg, nil
}

func WithHttpServerListenAddr(a string) Option {
return func(o *options) error {
o.httpServerListenAddr = a
func WithHttpServerListenAddr(addr string) option {
return func(cfg *config) error {
cfg.httpServerListenAddr = addr
return nil
}
}
124 changes: 124 additions & 0 deletions eventrecorder/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package eventrecorder

import (
"errors"
"fmt"
"time"

types "github.com/filecoin-project/lassie/pkg/types"
"github.com/google/uuid"
)

var (
validPhases = []string{"indexer", "query", "retrieval"}
validEventNames = []string{
"accepted",
"candidates-filtered",
"candidates-found",
"connected",
"failure",
"first-byte-received",
"proposed",
"query-asked",
"query-asked-filtered",
"started",
"success",
}
)

type event struct {
RetrievalId *types.RetrievalID `json:"retrievalId"`
InstanceId *string `json:"instanceId,omitempty"`
Cid *string `json:"cid"`
StorageProviderId *string `json:"storageProviderId"`
Phase *types.Phase `json:"phase"`
PhaseStartTime *time.Time `json:"phaseStartTime"`
EventName *types.EventCode `json:"eventName"`
EventTime *time.Time `json:"eventTime"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these all pointers because they are optional fields? Validation implementation suggests otherwise. Which leaves the question why pointers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed them to be pointers because I need to check if their values are nil to gaurd against missing properties.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why not check for zero values?

EventDetails interface{} `json:"eventDetails,omitempty"`
}

func (e event) validate() error {
// RetrievalId
if e.RetrievalId == nil {
return errors.New("Property retrievalId is required")
}
if _, err := uuid.Parse(e.RetrievalId.String()); err != nil {
return errors.New("Property retrievalId should be a valud v4 uuid")
}

// InstanceId
if e.InstanceId == nil {
return errors.New("Property instanceId is required")
}

// Cid
if e.Cid == nil {
return errors.New("Property cid is required")
}

// StorageProviderId
if e.StorageProviderId == nil {
return errors.New("Property storageProviderId is required")
}

// Phase
if e.Phase == nil {
return errors.New("Property phase is required")
}
isValidPhase := false
for _, phase := range validPhases {
if string(*e.Phase) == phase {
isValidPhase = true
break
}
}
if !isValidPhase {
return errors.New(fmt.Sprintf("Property phase failed validation. Phase must be created with one of the following values: %v", validPhases))
}

// PhaseStartTime
if e.PhaseStartTime == nil {
return errors.New("Property phaseStartTime is required")
}

// EventName
if e.EventName == nil {
return errors.New("Property eventName is required")
}
isValidEventName := false
for _, phase := range validEventNames {
if string(*e.EventName) == phase {
isValidEventName = true
break
}
}
if !isValidEventName {
return errors.New(fmt.Sprintf("Property eventName failed validation. Event name must be created with one of the following values: %v", validEventNames))
}

// EventTime
if e.EventTime == nil {
return errors.New("Property eventTime is required")
}

return nil
}

type eventBatch struct {
Events []event `json:"events"`
}

func (e eventBatch) validate() error {
if e.Events == nil {
return errors.New("Property events is required")
}

for _, event := range e.Events {
if err := event.validate(); err != nil {
return err
}
}

return nil
}
153 changes: 153 additions & 0 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package eventrecorder

import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"

"github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5/pgxpool"
)

var logger = log.Logger("lassie/event_recorder")

type EventRecorder struct {
cfg *config
server *http.Server
db *pgxpool.Pool
}

func NewEventRecorder(ctx context.Context, opts ...option) (*EventRecorder, error) {
cfg, err := newConfig(opts)
if err != nil {
return nil, errors.New(fmt.Sprint("Failed to apply option:", err))
}

var recorder EventRecorder
recorder.cfg = cfg
recorder.server = &http.Server{
Addr: recorder.cfg.httpServerListenAddr,
Handler: recorder.httpServerMux(),
ReadTimeout: recorder.cfg.httpServerReadTimeout,
ReadHeaderTimeout: recorder.cfg.httpServerReadHeaderTimeout,
WriteTimeout: recorder.cfg.httpServerWriteTimeout,
IdleTimeout: recorder.cfg.httpServerWriteTimeout,
MaxHeaderBytes: recorder.cfg.httpServerMaxHeaderBytes,
}

poolConfig, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
if err != nil {
return nil, errors.New(fmt.Sprint("Unable to parse DATABASE_URL:", err))
}

db, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
return nil, errors.New(fmt.Sprint("Failed to create connection pool:", err))
}
recorder.db = db

return &recorder, nil
}

func (r *EventRecorder) Start(_ context.Context) error {
ln, err := net.Listen("tcp", r.server.Addr)
if err != nil {
return err
}
go func() { _ = r.server.Serve(ln) }()
logger.Infow("Server started", "addr", ln.Addr())
return nil
}

func (r *EventRecorder) httpServerMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/v1/retrieval-events", r.handleRetrievalEvents)
return mux
}

func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
res.Header().Add("Allow", http.MethodPost)
http.Error(res, "", http.StatusMethodNotAllowed)
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusMethodNotAllowed)
return
}

// Check if we're getting JSON content
contentType := req.Header.Get("Content-Type")
if !strings.HasPrefix(contentType, "application/json") {
http.Error(res, "Not an acceptable content type. Content type must be application/json.", http.StatusBadRequest)
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusBadRequest)
return
}

// Decode JSON body
var batch eventBatch
if err := json.NewDecoder(req.Body).Decode(&batch); err != nil {
http.Error(res, err.Error(), http.StatusBadRequest)
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusBadRequest)
return
}

// Validate JSON
if err := batch.validate(); err != nil {
http.Error(res, err.Error(), http.StatusBadRequest)
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusBadRequest)
return
}

ctx := context.Background()
var errs []error
for _, event := range batch.Events {
query := `
INSERT INTO retrieval_events(
retrieval_id,
instance_id,
cid,
storage_provider_id,
phase,
phase_start_time,
event_name,
event_time,
event_details
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
values := []interface{}{
event.RetrievalId.String(),
event.InstanceId,
event.Cid,
event.StorageProviderId,
event.Phase,
event.PhaseStartTime,
event.EventName,
event.EventTime,
event.EventDetails,
}
_, err := r.db.Exec(ctx, query, values...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should do batch insertion here

Copy link
Contributor Author

@kylehuntsman kylehuntsman Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried reading the docs and couldn't understand how to make that work. I agree though. I ended up here just to get something working.

if err != nil {
logger.Errorw("Could not execute insert query for retrieval event", "values", values, "err", err.Error())
errs = append(errs, err)
continue
}

logger.Debug("Saved retrieval event")
}

if len(errs) != 0 {
http.Error(res, "", http.StatusInternalServerError)
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusInternalServerError)
return
} else {
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusOK)
}
}

func (r *EventRecorder) Shutdown(ctx context.Context) error {
return r.server.Shutdown(ctx)
}
Loading