diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go
index 04f337ad..fd862bb9 100644
--- a/tests/msc2716_test.go
+++ b/tests/msc2716_test.go
@@ -12,6 +12,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"net/url"
+	"strings"
 	"testing"
 	"time"
 
@@ -49,13 +50,13 @@ var (
 var createPublicRoomOpts = map[string]interface{}{
 	"preset":       "public_chat",
 	"name":         "the hangout spot",
-	"room_version": "org.matrix.msc2716v3",
+	"room_version": "org.matrix.msc2716v4",
 }
 
 var createPrivateRoomOpts = map[string]interface{}{
 	"preset":       "private_chat",
 	"name":         "the hangout spot",
-	"room_version": "org.matrix.msc2716v3",
+	"room_version": "org.matrix.msc2716v4",
 }
 
 func TestImportHistoricalMessages(t *testing.T) {
@@ -84,9 +85,9 @@ func TestImportHistoricalMessages(t *testing.T) {
 	// come back in the correct order from /messages.
 	//
 	// Final timeline output: ( [n] = historical batch )
-	// (oldest) A, B, [insertion, c, d, e, batch] [insertion, f, g, h, batch, insertion], I, J (newest)
+	// (oldest) A, B, [insertion, c, d, e, batch] [insertion, f, g, h, batch], baseInsertion, I, J (newest)
 	//                historical batch 1          historical batch 0
-	t.Run("Historical events resolve with proper state in correct order", func(t *testing.T) {
+	t.Run("Historical events resolve in the correct order", func(t *testing.T) {
 		t.Parallel()
 
 		roomID := as.CreateRoom(t, createPublicRoomOpts)
@@ -174,37 +175,15 @@ func TestImportHistoricalMessages(t *testing.T) {
 			"dir":   []string{"b"},
 			"limit": []string{"100"},
 		}))
-		messsageResBody := client.ParseJSON(t, messagesRes)
-		eventDebugStringsFromResponse := getRelevantEventDebugStringsFromMessagesResponse(t, messsageResBody)
-		// Since the original body can only be read once, create a new one from the body bytes we just read
-		messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody))
-
-		// Copy the array by slice so we can modify it as we iterate in the foreach loop.
-		// We save the full untouched `expectedEventIDOrder` for use in the log messages
-		workingExpectedEventIDOrder := expectedEventIDOrder
 
 		must.MatchResponse(t, messagesRes, match.HTTPResponse{
 			JSON: []match.JSON{
-				match.JSONArrayEach("chunk", func(r gjson.Result) error {
-					// Find all events in order
-					if isRelevantEvent(r) {
-						// Pop the next message off the expected list
-						nextEventIdInOrder := workingExpectedEventIDOrder[0]
-						workingExpectedEventIDOrder = workingExpectedEventIDOrder[1:]
-
-						if r.Get("event_id").Str != nextEventIdInOrder {
-							return fmt.Errorf("Next event found was %s but expected %s\nActualEvents (%d): %v\nExpectedEvents (%d): %v", r.Get("event_id").Str, nextEventIdInOrder, len(eventDebugStringsFromResponse), eventDebugStringsFromResponse, len(expectedEventIDOrder), expectedEventIDOrder)
-						}
-					}
-
-					return nil
-				}),
+				matcherJSONEventIDArrayInOrder("chunk",
+					expectedEventIDOrder,
+					relevantToScrollbackEventFilter,
+				),
 			},
 		})
-
-		if len(workingExpectedEventIDOrder) != 0 {
-			t.Fatalf("Expected all events to be matched in message response but there were some left-over events: %s", workingExpectedEventIDOrder)
-		}
 	})
 
 	t.Run("Historical events from multiple users in the same batch", func(t *testing.T) {
@@ -238,21 +217,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 			// Status
 			200,
 		)
-		batchSendResBody := client.ParseJSON(t, batchSendRes)
-		historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
-
-		messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
-			"dir":   []string{"b"},
-			"limit": []string{"100"},
-		}))
-
-		must.MatchResponse(t, messagesRes, match.HTTPResponse{
-			JSON: []match.JSON{
-				match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
-					return r.Get("event_id").Str
-				}, nil),
-			},
-		})
+		validateBatchSendRes(t, as, roomID, batchSendRes, false)
 	})
 
 	t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) {
@@ -468,21 +433,76 @@ func TestImportHistoricalMessages(t *testing.T) {
 			// Status
 			200,
 		)
-		batchSendResBody := client.ParseJSON(t, batchSendRes)
-		historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
+		validateBatchSendRes(
+			t,
+			as,
+			roomID,
+			batchSendRes,
+			// We can't validate the state in this case because the invite event
+			// won't be resolved in `/messages` state field (i.e. only the member
+			// event is needed to auth those events)
+			false,
+		)
+	})
 
-		messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
-			"dir":   []string{"b"},
-			"limit": []string{"100"},
-		}))
+	t.Run("should resolve member state events for historical events", func(t *testing.T) {
+		t.Parallel()
 
-		must.MatchResponse(t, messagesRes, match.HTTPResponse{
-			JSON: []match.JSON{
-				match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
-					return r.Get("event_id").Str
-				}, nil),
-			},
-		})
+		roomID := as.CreateRoom(t, createPublicRoomOpts)
+		alice.JoinRoom(t, roomID, nil)
+
+		// Create the "live" event we are going to insert our backfilled events next to
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIdBefore := eventIDsBefore[0]
+		timeAfterEventBefore := time.Now()
+
+		// eventIDsAfter
+		createMessagesInRoom(t, alice, roomID, 2)
+
+		// Import a batch of historical events
+		batchSendRes0 := batchSendHistoricalMessages(
+			t,
+			as,
+			roomID,
+			eventIdBefore,
+			"",
+			createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
+			createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 4),
+			// Status
+			200,
+		)
+		validateBatchSendRes(
+			t,
+			as,
+			roomID,
+			batchSendRes0,
+			// Validate the state
+			true,
+		)
+		batchSendResBody0 := client.ParseJSON(t, batchSendRes0)
+		nextBatchID0 := client.GetJSONFieldStr(t, batchSendResBody0, "next_batch_id")
+
+		// Import another older batch of history from the same user.
+		// Make sure the meta data and joins still work on the subsequent batch
+		batchSendRes1 := batchSendHistoricalMessages(
+			t,
+			as,
+			roomID,
+			eventIdBefore,
+			nextBatchID0,
+			createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
+			createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 4),
+			// Status
+			200,
+		)
+		validateBatchSendRes(
+			t,
+			as,
+			roomID,
+			batchSendRes1,
+			// Validate the state
+			true,
+		)
 	})
 
 	t.Run("Should be able to send a batch without any state_events_at_start - user already joined in the current room state", func(t *testing.T) {
@@ -510,26 +530,14 @@ func TestImportHistoricalMessages(t *testing.T) {
 			// Status
 			200,
 		)
-		batchSendResBody := client.ParseJSON(t, batchSendRes)
-		historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
-
-		messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
-			"dir":   []string{"b"},
-			"limit": []string{"100"},
-		}))
-		must.MatchResponse(t, messagesRes, match.HTTPResponse{
-			JSON: []match.JSON{
-				match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
-					return r.Get("event_id").Str
-				}, nil),
-			},
-		})
-	})
-
-	t.Run("TODO: Test if historical avatar/display name set back in time are picked up on historical messages", func(t *testing.T) {
-		t.Skip("Skipping until implemented")
-		// TODO: Try adding avatar and displayName and see if historical messages get this info
+		validateBatchSendRes(
+			t,
+			as,
+			roomID,
+			batchSendRes,
+			false,
+		)
 	})
 
 	t.Run("TODO: What happens when you point multiple batches at the same insertion event?", func(t *testing.T) {
@@ -575,6 +583,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 			return false
 		})
 
+		// FIXME: In the future, we should probably replace the following logic
+		// with `validateBatchSendRes` to re-use and have some more robust
+		// assertion logic here. We're currently not using it because the message
+		// order isn't quite perfect when a remote federated homeserver gets
+		// backfilled.
+		// validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false)
 		messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
 			"dir":   []string{"b"},
 			"limit": []string{"100"},
@@ -647,6 +661,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 			return false
 		})
 
+		// FIXME: In the future, we should probably replace the following logic
+		// with `validateBatchSendRes` to re-use and have some more robust
+		// assertion logic here. We're currently not using it because the message
+		// order isn't quite perfect when a remote federated homeserver gets
+		// backfilled.
+		// validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false)
 		messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
 			"dir":   []string{"b"},
 			"limit": []string{"100"},
@@ -709,7 +729,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 			"limit": []string{"100"},
 		}))
 		beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes)
-		eventDebugStringsFromBeforeMarkerResponse := getRelevantEventDebugStringsFromMessagesResponse(t, beforeMarkerMesssageResBody)
+		eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter)
 		// Since the original body can only be read once, create a new one from the body bytes we just read
 		beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody))
 
@@ -734,6 +754,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 		// Send the marker event
 		sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID)
 
+		// FIXME: In the future, we should probably replace the following logic
+		// with `validateBatchSendRes` to re-use and have some more robust
+		// assertion logic here. We're currently not using it because the message
+		// order isn't quite perfect when a remote federated homeserver gets
+		// backfilled.
+		// validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false)
 		remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
 			"dir":   []string{"b"},
 			"limit": []string{"100"},
@@ -798,7 +824,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 			"limit": []string{"100"},
 		}))
 		beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes)
-		eventDebugStringsFromBeforeMarkerResponse := getRelevantEventDebugStringsFromMessagesResponse(t, beforeMarkerMesssageResBody)
+		eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter)
 		// Since the original body can only be read once, create a new one from the body bytes we just read
 		beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody))
 		// Make sure the history isn't visible before we expect it to be there.
@@ -821,6 +847,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 		// Send the marker event
 		sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID)
 
+		// FIXME: In the future, we should probably replace the following logic
+		// with `validateBatchSendRes` to re-use and have some more robust
+		// assertion logic here. We're currently not using it because the message
+		// order isn't quite perfect when a remote federated homeserver gets
+		// backfilled.
+		// validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false)
		remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
 			"dir":   []string{"b"},
 			"limit": []string{"100"},
@@ -870,26 +902,16 @@ func TestImportHistoricalMessages(t *testing.T) {
 			// Status
 			200,
 		)
+		// Make sure we can see all of the historical messages
+		validateBatchSendRes(t, as, roomID, batchSendRes, false)
+
+		// Grab the `next_batch_id` for the next batch
 		batchSendResBody := client.ParseJSON(t, batchSendRes)
-		historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
 		nextBatchID := client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id")
 
-		messagesRes := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
-			"dir":   []string{"b"},
-			"limit": []string{"100"},
-		}))
-
-		must.MatchResponse(t, messagesRes, match.HTTPResponse{
-			JSON: []match.JSON{
-				match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
-					return r.Get("event_id").Str
-				}, nil),
-			},
-		})
-
-		// Now try to do a subsequent batch send. This will make sure
-		// that insertion events are stored/tracked and can be matched up in the next batch
-		batchSendHistoricalMessages(
+		// Now try to do a subsequent batch send. This will make sure that
+		// insertion events are stored/tracked and can be matched up in the next
+		// batch
+		batchSendRes1 := batchSendHistoricalMessages(
 			t,
 			as,
 			roomID,
@@ -900,6 +922,8 @@ func TestImportHistoricalMessages(t *testing.T) {
 			// Status
 			200,
 		)
+		// Make sure we can see all of the historical messages
+		validateBatchSendRes(t, as, roomID, batchSendRes1, false)
 	})
 
 	t.Run("Not allowed to redact MSC2716 insertion, batch, marker events", func(t *testing.T) {
@@ -1004,33 +1028,44 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
 	}
 }
 
-func isRelevantEvent(r gjson.Result) bool {
-	return len(r.Get("content").Get("body").Str) > 0 ||
-		r.Get("type").Str == insertionEventType ||
-		r.Get("type").Str == batchEventType ||
-		r.Get("type").Str == markerEventType
+func historicalEventFilter(r gjson.Result) bool {
+	// This includes messages, insertion, batch, and marker events because we
+	// include the historical field on all of them.
+	return r.Get("content").Get(strings.ReplaceAll(historicalContentField, ".", "\\.")).Exists()
 }
 
-func getRelevantEventDebugStringsFromMessagesResponse(t *testing.T, body []byte) (eventIDsFromResponse []string) {
+func relevantToScrollbackEventFilter(r gjson.Result) bool {
+	return r.Get("type").Str == "m.room.message" || historicalEventFilter(r)
+}
+
+func mustGetRelevantEventDebugStringsFromMessagesResponse(t *testing.T, wantKey string, body []byte, eventFilter func(gjson.Result) bool) (eventIDsFromResponse []string) {
 	t.Helper()
 
-	wantKey := "chunk"
+	debugStrings, err := getRelevantEventDebugStringsFromMessagesResponse(wantKey, body, eventFilter)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return debugStrings
+}
+
+func getRelevantEventDebugStringsFromMessagesResponse(wantKey string, body []byte, eventFilter func(gjson.Result) bool) (eventIDsFromResponse []string, err error) {
 	res := gjson.GetBytes(body, wantKey)
 	if !res.Exists() {
-		t.Fatalf("missing key '%s'", wantKey)
+		return nil, fmt.Errorf("missing key '%s'", wantKey)
 	}
 	if !res.IsArray() {
-		t.Fatalf("key '%s' is not an array (was %s)", wantKey, res.Type)
+		return nil, fmt.Errorf("key '%s' is not an array (was %s)", wantKey, res.Type)
 	}
 
 	res.ForEach(func(key, r gjson.Result) bool {
-		if isRelevantEvent(r) {
+		if eventFilter(r) {
 			eventIDsFromResponse = append(eventIDsFromResponse, r.Get("event_id").Str+" ("+r.Get("content").Get("body").Str+")")
 		}
 		return true
 	})
 
-	return eventIDsFromResponse
+	return eventIDsFromResponse, nil
 }
 
 // ensureVirtualUserRegistered makes sure the user is registered for the homeserver regardless
@@ -1151,7 +1186,8 @@ func createJoinStateEventsForBatchSendRequest(
 			"sender":           virtualUserID,
 			"origin_server_ts": insertOriginServerTs,
 			"content": map[string]interface{}{
-				"membership": "join",
+				"membership":  "join",
+				"displayname": fmt.Sprintf("some-display-name-for-%s", virtualUserID),
 			},
 			"state_key": virtualUserID,
 		}
@@ -1253,3 +1289,163 @@ func batchSendHistoricalMessages(
 
 	return res
 }
+
+// Verify that the batch of historical messages looks correct and in order
+// (newest -> oldest) from /messages?dir=b. We can also optionally check that
+// the historical state resolves for that chunk of messages.
+//
+// Note: the historical state will only resolve correctly if the first message
+// of `/messages` is one of messages in the historical batch.
+func validateBatchSendRes(t *testing.T, c *client.CSAPI, roomID string, batchSendRes *http.Response, validateState bool) {
+	t.Helper()
+
+	batchSendResBody0 := client.ParseJSON(t, batchSendRes)
+	// Since the original body can only be read once, create a new one from the
+	// body bytes we just read
+	batchSendRes.Body = ioutil.NopCloser(bytes.NewBuffer(batchSendResBody0))
+
+	historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody0, "event_ids")
+	stateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody0, "state_event_ids")
+	batchEventID := client.GetJSONFieldStr(t, batchSendResBody0, "batch_event_id")
+	insertionEventID := client.GetJSONFieldStr(t, batchSendResBody0, "insertion_event_id")
+	baseInsertionEventID := gjson.GetBytes(batchSendResBody0, "base_insertion_event_id").Str
+
+	// Expected list is ordered from newest -> oldest event time
+	var expectedEventIDOrder []string
+	if baseInsertionEventID != "" {
+		expectedEventIDOrder = append(expectedEventIDOrder, baseInsertionEventID)
+	}
+	expectedEventIDOrder = append(expectedEventIDOrder, batchEventID)
+	expectedEventIDOrder = append(expectedEventIDOrder, reversed(historicalEventIDs)...)
+	expectedEventIDOrder = append(expectedEventIDOrder, insertionEventID)
+
+	if validateState {
+		// Get a pagination token for the newest-in-time event in the historical batch itself
+		contextRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", expectedEventIDOrder[0]}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+			"limit": []string{"0"},
+		}))
+		contextResBody := client.ParseJSON(t, contextRes)
+		batchStartPaginationToken := client.GetJSONFieldStr(t, contextResBody, "end")
+
+		// Fetch a chunk of `/messages` which only contains the historical batch. We
+		// want to do this because `/messages` only returns the state for the first
+		// message in the `chunk` and we want to be able assert that the historical
+		// state is able to be resolved.
+		messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+			// Go backwards (newest -> oldest) (same direction as if you were using scrollback)
+			"dir": []string{"b"},
+			// From the newest-in-time event in the historical batch
+			"from": []string{batchStartPaginationToken},
+			// We are aiming to scrollback to the oldest-in-time event from the
+			// historical batch
+			"limit": []string{fmt.Sprintf("%d", len(expectedEventIDOrder))},
+			// We add these options to the filter so we get member events in the state field
+			"filter": []string{"{\"lazy_load_members\":true,\"include_redundant_members\":true}"},
+		}))
+
+		must.MatchResponse(t, messagesRes, match.HTTPResponse{
+			JSON: []match.JSON{
+				// Double-check that we're in the right place of scrollback
+				matcherJSONEventIDArrayInOrder("chunk",
+					expectedEventIDOrder,
+					historicalEventFilter,
+				),
+				// Make sure the historical m.room.member join state event resolves
+				// for the given chunk of messages in scrollback. The member event
+				// will include the displayname and avatar.
+				match.JSONCheckOffAllowUnwanted("state", makeInterfaceSlice(stateEventIDs), func(r gjson.Result) interface{} {
+					return r.Get("event_id").Str
+				}, nil),
+			},
+		})
+	}
+
+	// Make sure the historical events appear in scrollback without jumping back
+	// in time specifically.
+	fullMessagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
+		"dir":   []string{"b"},
+		"limit": []string{"100"},
+	}))
+	must.MatchResponse(t, fullMessagesRes, match.HTTPResponse{
+		JSON: []match.JSON{
+			matcherJSONEventIDArrayInOrder("chunk",
+				expectedEventIDOrder,
+				historicalEventFilter,
+			),
+		},
+	})
+}
+
+// matcherJSONEventIDArrayInOrder loops through `jsonArrayKey` in the response
+// to find the sliding window of expected event ID's(`expectedEventIDOrder`)
+// somewhere in the array in order. The expected list can start anywhere in the
+// overall list filtered by `eventFilter`.
+func matcherJSONEventIDArrayInOrder(jsonArrayKey string, expectedEventIDOrder []string, eventFilter func(gjson.Result) bool) match.JSON {
+	return func(body []byte) error {
+		if len(expectedEventIDOrder) == 0 {
+			return fmt.Errorf("expectedEventIDOrder can not be an empty list")
+		}
+
+		// Copy the array by slice so we can modify it as we iterate in the foreach loop.
+		// We save the full untouched `expectedEventIDOrder` for use in the log messages
+		workingExpectedEventIDOrder := expectedEventIDOrder
+
+		var res gjson.Result
+		if jsonArrayKey == "" {
+			res = gjson.ParseBytes(body)
+		} else {
+			res = gjson.GetBytes(body, jsonArrayKey)
+		}
+
+		if !res.Exists() {
+			return fmt.Errorf("missing key '%s'", jsonArrayKey)
+		}
+		if !res.IsArray() {
+			return fmt.Errorf("key '%s' is not an array", jsonArrayKey)
+		}
+
+		eventDebugStringsFromResponse, err := getRelevantEventDebugStringsFromMessagesResponse("chunk", body, eventFilter)
+		if err != nil {
+			return err
+		}
+
+		// Loop through the overall event list
+		foundFirstEvent := false
+		res.ForEach(func(_, r gjson.Result) bool {
+			eventID := r.Get("event_id").Str
+			nextEventIdInOrder := workingExpectedEventIDOrder[0]
+
+			// We need to find the start of the sliding window inside the overall
+			// event list
+			if !foundFirstEvent && eventID == nextEventIdInOrder {
+				foundFirstEvent = true
+			}
+
+			// Once we found the first event, find all events in order
+			if foundFirstEvent && eventFilter(r) {
+				if r.Get("event_id").Str != nextEventIdInOrder {
+					err = fmt.Errorf("Next event found was %s but expected %s\nActualEvents (%d): %v\nExpectedEvents (%d): %v", r.Get("event_id").Str, nextEventIdInOrder, len(eventDebugStringsFromResponse), eventDebugStringsFromResponse, len(expectedEventIDOrder), expectedEventIDOrder)
					return false
+				}
+
+				// Now that we found it, pop the message off the expected list
+				workingExpectedEventIDOrder = workingExpectedEventIDOrder[1:]
+			}
+
+			// Found all of the expected events, stop iterating
+			if len(workingExpectedEventIDOrder) == 0 {
+				return false
+			}
+
+			return true
+		})
+
+		// There was some left-over events in the list but we should have found all
+		// of them
+		if len(workingExpectedEventIDOrder) != 0 {
+			return fmt.Errorf("Expected all events to be matched in message response but there were some left-over events (%d): %s\nActualEvents (%d): %v\nExpectedEvents (%d): %v", len(workingExpectedEventIDOrder), workingExpectedEventIDOrder, len(eventDebugStringsFromResponse), eventDebugStringsFromResponse, len(expectedEventIDOrder), expectedEventIDOrder)
+		}
+
+		return err
+	}
+}