Skip to content

Draft: Make sure historical state events don't come down /transactions for application services (MSC2716) #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
2 changes: 1 addition & 1 deletion internal/b/hs_with_application_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{
ApplicationServices: []ApplicationService{
{
ID: "my_as_id",
URL: "http://localhost:9000",
URL: "http://host.docker.internal:9111",
SenderLocalpart: "the-bridge-user",
RateLimited: false,
},
Expand Down
134 changes: 124 additions & 10 deletions tests/msc2716_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ package tests

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/tidwall/gjson"

"github.com/matrix-org/complement/internal/b"
Expand Down Expand Up @@ -255,7 +257,7 @@ func TestImportHistoricalMessages(t *testing.T) {
})
})

t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) {
t.Run("Historical events from batch_send do not come down in an incremental sync", func(t *testing.T) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the / in the test name so I can actually run it individually, COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1 COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh TestImportHistoricalMessages/parallel/Historical_events_from_batch_send_do_not_come_down_in_an_incremental_sync

See https://gist.github.com/MadLittleMods/4ab08f51609fab759247f299a4e33406 for why there is a problem using a / to match a single test.

t.Parallel()

roomID := as.CreateRoom(t, createPublicRoomOpts)
Expand All @@ -269,6 +271,10 @@ func TestImportHistoricalMessages(t *testing.T) {
// Create some "live" events to saturate and fill up the /sync response
createMessagesInRoom(t, alice, roomID, 5)

// Get a /sync `since` pagination token we can try paginating from later
// on
since := doInitialSync(t, alice)

// Import a historical event
batchSendRes := batchSendHistoricalMessages(
t,
Expand All @@ -283,25 +289,122 @@ func TestImportHistoricalMessages(t *testing.T) {
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
historicalEventId := historicalEventIDs[0]
historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")

// This is just a dummy event we search for after the historicalEventId
// This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
eventIDAfterHistoricalImport := eventIDsAfterHistoricalImport[0]

// Sync until we find the eventIDAfterHistoricalImport.
// If we're able to see the eventIDAfterHistoricalImport that occurs after
// the historicalEventId without seeing eventIDAfterHistoricalImport in
// between, we're probably safe to assume it won't sync
alice.SyncUntil(t, "", `{ "room": { "timeline": { "limit": 3 } } }`, "rooms.join."+client.GjsonEscape(roomID)+".timeline.events", func(r gjson.Result) bool {
if r.Get("event_id").Str == historicalEventId {
t.Fatalf("We should not see the %s historical event in /sync response but it was present", historicalEventId)
// Sync from before we did any batch sending until we find the
// eventIDAfterHistoricalImport. If we're able to see
// eventIDAfterHistoricalImport without any of the
// historicalEventIDs/historicalStateEventIDs in between, we're probably
// safe to assume it won't sync.
alice.SyncUntil(t, since, "", "rooms.join."+client.GjsonEscape(roomID)+".timeline.events", func(r gjson.Result) bool {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this test clearer about what's happening. We now paginate sync from before we /batch_send until an event that occurred after /batch_send and just make sure we didn't see any historical state or events in between.

if includes(r.Get("event_id").Str, historicalEventIDs) || includes(r.Get("event_id").Str, historicalStateEventIDs) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where we additionally check for the historicalStateEventIDs now

t.Fatalf("We should not see the %s historical event in /sync response but it was present", r.Get("event_id").Str)
}

return r.Get("event_id").Str == eventIDAfterHistoricalImport
})
})

t.Run("Historical events from batch_send do not get pushed out as application service transactions", func(t *testing.T) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split out the /sync stuff into a different PR (which can be accepted) and leave this PR with me.

Thanks for the review @kegsay 🐢, I've split out the /sync stuff to #235

t.Parallel()

// Create a listener and handler to stub an application service listening
// for transactions from a homeserver.
Copy link
Collaborator Author

@MadLittleMods MadLittleMods Nov 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we want to create NewApplicationService to mirror federation.NewServer?

The Application service here just needs to listen to a single endpoint though (simple http listener), nothing special.

handler := mux.NewRouter()
// Application Service API: /_matrix/app/v1/transactions/{txnId}
waiter := NewWaiter()
var eventIDsWeSawOverTransactions []string
var eventIDAfterHistoricalImport string
handler.HandleFunc("/transactions/{txnId}", func(w http.ResponseWriter, req *http.Request) {
must.MatchRequest(t, req, match.HTTPRequest{
JSON: []match.JSON{
match.JSONArrayEach("events", func(r gjson.Result) error {
// Add to our running list of events
eventIDsWeSawOverTransactions = append(eventIDsWeSawOverTransactions, r.Get("event_id").Str)

// If we found the event that occurs after our batch send, we can
// probably safely assume the historical events won't come later.
if r.Get("event_id").Str != "" && r.Get("event_id").Str == eventIDAfterHistoricalImport {
defer waiter.Finish()
}

return nil
}),
},
})

// Acknowledge that we've seen the transaction
w.WriteHeader(200)
w.Write([]byte("{}"))
}).Methods("PUT")

srv := &http.Server{
Addr: ":9111",
Handler: handler,
}
go func() {
srv.ListenAndServe()
}()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inspired by the Server.Listen code

defer srv.Shutdown(context.Background())
// ----------------------------------------------------------

// Create the room all of the action is going to happen in
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to insert our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// Import a historical event
batchSendRes := batchSendHistoricalMessages(
t,
as,
roomID,
eventIdBefore,
"",
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 1),
// Status
200,
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")

// This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
eventIDAfterHistoricalImport = eventIDsAfterHistoricalImport[0]

// Check if eventIDAfterHistoricalImport already came over `/transactions`.
if !includes(eventIDAfterHistoricalImport, eventIDsWeSawOverTransactions) {
// If not, wait 5 seconds to see if it happens. The waiter will only
// resolve if we see eventIDAfterHistoricalImport, otherwise timeout
waiter.Wait(t, 5*time.Second)
}

// Now that we know eventIDAfterHistoricalImport came over /transactions,
// we can probably safely assume the historical events won't come later.

// Check to make sure the historical events didn't come over /transactions
for _, historicalEventID := range historicalEventIDs {
if includes(historicalEventID, eventIDsWeSawOverTransactions) {
t.Fatalf("We should not see the %s historical event come over /transactions but it did", historicalEventID)
}
}
// Check to make sure the historical state events didn't come over /transactions
for _, historicalStateEventID := range historicalStateEventIDs {
if includes(historicalStateEventID, eventIDsWeSawOverTransactions) {
t.Fatalf("We should not see the %s historical state event come over /transactions but it did", historicalStateEventID)
}
}
})

t.Run("Batch send endpoint only returns state events that we passed in via state_events_at_start", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -926,6 +1029,17 @@ func reversed(in []string) []string {
return out
}

// includes reports whether needle is equal to at least one element of
// haystack. A nil or empty haystack never contains the needle.
func includes(needle string, haystack []string) bool {
	for idx := 0; idx < len(haystack); idx++ {
		if haystack[idx] == needle {
			return true
		}
	}

	return false
}

func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) {
t.Helper()
start := time.Now()
Expand Down