This repository was archived by the owner on Mar 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Implement /v1/retrieval-events handler #4
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
recorder |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,39 +1,39 @@ | ||
package recorder | ||
package eventrecorder | ||
|
||
import "time" | ||
|
||
type ( | ||
Option func(*options) error | ||
options struct { | ||
config struct { | ||
httpServerListenAddr string | ||
httpServerReadTimeout time.Duration | ||
httpServerReadHeaderTimeout time.Duration | ||
httpServerWriteTimeout time.Duration | ||
httpServerIdleTimeout time.Duration | ||
httpServerMaxHeaderBytes int | ||
} | ||
option func(*config) error | ||
) | ||
|
||
func newOptions(o ...Option) (*options, error) { | ||
opts := options{ | ||
func newConfig(opts []option) (*config, error) { | ||
cfg := &config{ | ||
httpServerListenAddr: "0.0.0.0:8080", | ||
httpServerReadTimeout: 5 * time.Second, | ||
httpServerReadHeaderTimeout: 5 * time.Second, | ||
httpServerWriteTimeout: 5 * time.Second, | ||
httpServerIdleTimeout: 10 * time.Second, | ||
httpServerMaxHeaderBytes: 2048, | ||
} | ||
for _, apply := range o { | ||
if err := apply(&opts); err != nil { | ||
for _, opt := range opts { | ||
if err := opt(cfg); err != nil { | ||
return nil, err | ||
} | ||
} | ||
return &opts, nil | ||
return cfg, nil | ||
} | ||
|
||
func WithHttpServerListenAddr(a string) Option { | ||
return func(o *options) error { | ||
o.httpServerListenAddr = a | ||
func WithHttpServerListenAddr(addr string) option { | ||
return func(cfg *config) error { | ||
cfg.httpServerListenAddr = addr | ||
return nil | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package eventrecorder | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
types "github.com/filecoin-project/lassie/pkg/types" | ||
"github.com/google/uuid" | ||
) | ||
|
||
var ( | ||
validPhases = []string{"indexer", "query", "retrieval"} | ||
validEventNames = []string{ | ||
"accepted", | ||
"candidates-filtered", | ||
"candidates-found", | ||
"connected", | ||
"failure", | ||
"first-byte-received", | ||
"proposed", | ||
"query-asked", | ||
"query-asked-filtered", | ||
"started", | ||
"success", | ||
} | ||
) | ||
|
||
type event struct { | ||
RetrievalId *types.RetrievalID `json:"retrievalId"` | ||
InstanceId *string `json:"instanceId,omitempty"` | ||
Cid *string `json:"cid"` | ||
StorageProviderId *string `json:"storageProviderId"` | ||
Phase *types.Phase `json:"phase"` | ||
PhaseStartTime *time.Time `json:"phaseStartTime"` | ||
EventName *types.EventCode `json:"eventName"` | ||
EventTime *time.Time `json:"eventTime"` | ||
EventDetails interface{} `json:"eventDetails,omitempty"` | ||
} | ||
|
||
func (e event) validate() error { | ||
// RetrievalId | ||
if e.RetrievalId == nil { | ||
return errors.New("Property retrievalId is required") | ||
} | ||
if _, err := uuid.Parse(e.RetrievalId.String()); err != nil { | ||
return errors.New("Property retrievalId should be a valud v4 uuid") | ||
} | ||
|
||
// InstanceId | ||
if e.InstanceId == nil { | ||
return errors.New("Property instanceId is required") | ||
} | ||
|
||
// Cid | ||
if e.Cid == nil { | ||
return errors.New("Property cid is required") | ||
} | ||
|
||
// StorageProviderId | ||
if e.StorageProviderId == nil { | ||
return errors.New("Property storageProviderId is required") | ||
} | ||
|
||
// Phase | ||
if e.Phase == nil { | ||
return errors.New("Property phase is required") | ||
} | ||
isValidPhase := false | ||
for _, phase := range validPhases { | ||
if string(*e.Phase) == phase { | ||
isValidPhase = true | ||
break | ||
} | ||
} | ||
if !isValidPhase { | ||
return errors.New(fmt.Sprintf("Property phase failed validation. Phase must be created with one of the following values: %v", validPhases)) | ||
} | ||
|
||
// PhaseStartTime | ||
if e.PhaseStartTime == nil { | ||
return errors.New("Property phaseStartTime is required") | ||
} | ||
|
||
// EventName | ||
if e.EventName == nil { | ||
return errors.New("Property eventName is required") | ||
} | ||
isValidEventName := false | ||
for _, phase := range validEventNames { | ||
if string(*e.EventName) == phase { | ||
isValidEventName = true | ||
break | ||
} | ||
} | ||
if !isValidEventName { | ||
return errors.New(fmt.Sprintf("Property eventName failed validation. Event name must be created with one of the following values: %v", validEventNames)) | ||
} | ||
|
||
// EventTime | ||
if e.EventTime == nil { | ||
return errors.New("Property eventTime is required") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type eventBatch struct { | ||
Events []event `json:"events"` | ||
} | ||
|
||
func (e eventBatch) validate() error { | ||
if e.Events == nil { | ||
return errors.New("Property events is required") | ||
} | ||
|
||
for _, event := range e.Events { | ||
if err := event.validate(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
package eventrecorder | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net" | ||
"net/http" | ||
"os" | ||
"strings" | ||
|
||
"github.com/ipfs/go-log/v2" | ||
"github.com/jackc/pgx/v5/pgxpool" | ||
) | ||
|
||
var logger = log.Logger("lassie/event_recorder") | ||
|
||
type EventRecorder struct { | ||
cfg *config | ||
server *http.Server | ||
db *pgxpool.Pool | ||
} | ||
|
||
func NewEventRecorder(ctx context.Context, opts ...option) (*EventRecorder, error) { | ||
cfg, err := newConfig(opts) | ||
if err != nil { | ||
return nil, errors.New(fmt.Sprint("Failed to apply option:", err)) | ||
} | ||
|
||
var recorder EventRecorder | ||
recorder.cfg = cfg | ||
recorder.server = &http.Server{ | ||
Addr: recorder.cfg.httpServerListenAddr, | ||
Handler: recorder.httpServerMux(), | ||
ReadTimeout: recorder.cfg.httpServerReadTimeout, | ||
ReadHeaderTimeout: recorder.cfg.httpServerReadHeaderTimeout, | ||
WriteTimeout: recorder.cfg.httpServerWriteTimeout, | ||
IdleTimeout: recorder.cfg.httpServerWriteTimeout, | ||
MaxHeaderBytes: recorder.cfg.httpServerMaxHeaderBytes, | ||
} | ||
|
||
poolConfig, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL")) | ||
if err != nil { | ||
return nil, errors.New(fmt.Sprint("Unable to parse DATABASE_URL:", err)) | ||
} | ||
|
||
db, err := pgxpool.NewWithConfig(ctx, poolConfig) | ||
if err != nil { | ||
return nil, errors.New(fmt.Sprint("Failed to create connection pool:", err)) | ||
} | ||
recorder.db = db | ||
|
||
return &recorder, nil | ||
} | ||
|
||
func (r *EventRecorder) Start(_ context.Context) error { | ||
ln, err := net.Listen("tcp", r.server.Addr) | ||
if err != nil { | ||
return err | ||
} | ||
go func() { _ = r.server.Serve(ln) }() | ||
logger.Infow("Server started", "addr", ln.Addr()) | ||
return nil | ||
} | ||
|
||
func (r *EventRecorder) httpServerMux() *http.ServeMux { | ||
mux := http.NewServeMux() | ||
mux.HandleFunc("/v1/retrieval-events", r.handleRetrievalEvents) | ||
return mux | ||
} | ||
|
||
func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http.Request) { | ||
if req.Method != http.MethodPost { | ||
res.Header().Add("Allow", http.MethodPost) | ||
http.Error(res, "", http.StatusMethodNotAllowed) | ||
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusMethodNotAllowed) | ||
return | ||
} | ||
|
||
// Check if we're getting JSON content | ||
contentType := req.Header.Get("Content-Type") | ||
if !strings.HasPrefix(contentType, "application/json") { | ||
http.Error(res, "Not an acceptable content type. Content type must be application/json.", http.StatusBadRequest) | ||
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusBadRequest) | ||
return | ||
} | ||
|
||
// Decode JSON body | ||
var batch eventBatch | ||
if err := json.NewDecoder(req.Body).Decode(&batch); err != nil { | ||
http.Error(res, err.Error(), http.StatusBadRequest) | ||
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusBadRequest) | ||
return | ||
} | ||
|
||
// Validate JSON | ||
if err := batch.validate(); err != nil { | ||
http.Error(res, err.Error(), http.StatusBadRequest) | ||
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusBadRequest) | ||
return | ||
} | ||
|
||
ctx := context.Background() | ||
var errs []error | ||
for _, event := range batch.Events { | ||
query := ` | ||
INSERT INTO retrieval_events( | ||
retrieval_id, | ||
instance_id, | ||
cid, | ||
storage_provider_id, | ||
phase, | ||
phase_start_time, | ||
event_name, | ||
event_time, | ||
event_details | ||
) | ||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) | ||
` | ||
values := []interface{}{ | ||
event.RetrievalId.String(), | ||
event.InstanceId, | ||
event.Cid, | ||
event.StorageProviderId, | ||
event.Phase, | ||
event.PhaseStartTime, | ||
event.EventName, | ||
event.EventTime, | ||
event.EventDetails, | ||
} | ||
_, err := r.db.Exec(ctx, query, values...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should do batch insertion here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried reading the docs and couldn't understand how to make that work. I agree though. I ended up here just to get something working. |
||
if err != nil { | ||
logger.Errorw("Could not execute insert query for retrieval event", "values", values, "err", err.Error()) | ||
errs = append(errs, err) | ||
continue | ||
} | ||
|
||
logger.Debug("Saved retrieval event") | ||
} | ||
|
||
if len(errs) != 0 { | ||
http.Error(res, "", http.StatusInternalServerError) | ||
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusInternalServerError) | ||
return | ||
} else { | ||
logger.Infof("%s %s %d", req.Method, req.URL.Path, http.StatusOK) | ||
} | ||
} | ||
|
||
func (r *EventRecorder) Shutdown(ctx context.Context) error { | ||
return r.server.Shutdown(ctx) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these all pointers because they are optional fields? Validation implementation suggests otherwise. Which leaves the question why pointers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I needed them to be pointers because I need to check if their values are nil to gaurd against missing properties.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious why not check for zero values?