-
Notifications
You must be signed in to change notification settings - Fork 59
Draft: Test that /messages
works on remote homeserver and can be backfilled properly after many batches (MSC2716)
#214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
8099f47
094c5f7
e30bcd4
1e333d6
2022b31
d325349
2fe5180
0604564
83adbe2
4aba836
ffbca43
37109fa
cc7236b
4c8284a
9b90429
677836b
85eb7bd
3532821
1667e15
0589546
606197a
d679384
230c46e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -61,6 +61,7 @@ var createPrivateRoomOpts = map[string]interface{}{ | |||
func TestImportHistoricalMessages(t *testing.T) { | ||||
deployment := Deploy(t, b.BlueprintHSWithApplicationService) | ||||
defer deployment.Destroy(t) | ||||
//defer time.Sleep(2 * time.Hour) | ||||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
|
||||
// Create the application service bridge user that is able to import historical messages | ||||
asUserID := "@the-bridge-user:hs1" | ||||
|
@@ -98,7 +99,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
// | ||||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
// Create the first batch including the "live" event we are going to | ||||
// import our historical events next to. | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -110,7 +111,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
// Create the second batch of events. | ||||
// This will also fill up the buffer so we have to scrollback to the | ||||
// inserted history later. | ||||
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2) | ||||
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter") | ||||
|
||||
// Insert the most recent batch of historical messages | ||||
insertTime0 := timeAfterEventBefore.Add(timeBetweenMessages * 3) | ||||
|
@@ -214,7 +215,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create the "live" event we are going to insert our historical events next to | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -255,19 +256,74 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
}) | ||||
}) | ||||
|
||||
t.Run("Backfill still works after many batches are imported", func(t *testing.T) { | ||||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
t.Parallel() | ||||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
|
||||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create some normal messages in the timeline | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
// wait X number of ms to ensure that the timestamp changes enough for | ||||
// each of the historical messages we try to import later | ||||
//numBatches := 11 | ||||
numBatches := 2 | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lovely comment but then you seem to use 2. Why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See:
Running this test against Synapse is extremely slow atm, #214 (comment) TODO: Reminder to revert this to 11 before merge |
||||
numHistoricalMessagesPerBatch := 100 | ||||
time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages) | ||||
|
||||
// eventIDsAfter | ||||
createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter") | ||||
|
||||
// Import a long chain of batches connected to each other. | ||||
// We want to make sure Synapse doesn't blow up after we import | ||||
// many messages. | ||||
var expectedEventIDs []string | ||||
nextBatchID := "" | ||||
for i := 0; i < numBatches; i++ { | ||||
insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i)) | ||||
batchSendRes := batchSendHistoricalMessages( | ||||
t, | ||||
as, | ||||
roomID, | ||||
eventIdBefore, | ||||
nextBatchID, | ||||
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, insertTime), | ||||
createMessageEventsForBatchSendRequest([]string{virtualUserID}, insertTime, numHistoricalMessagesPerBatch), | ||||
// Status | ||||
200, | ||||
) | ||||
batchSendResBody := client.ParseJSON(t, batchSendRes) | ||||
// Make sure we see all of the historical messages | ||||
expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...) | ||||
nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") | ||||
} | ||||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
|
||||
// Make sure we see the event at the very start of the message history | ||||
expectedEventIDs = append(expectedEventIDs, eventIdBefore) | ||||
|
||||
// Join the room from a remote homeserver after the historical messages were sent | ||||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||||
|
||||
// Make sure events can be backfilled from the remote homeserver | ||||
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs) | ||||
}) | ||||
|
||||
t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) { | ||||
t.Parallel() | ||||
|
||||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create the "live" event we are going to insert our historical events next to | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
// Create some "live" events to saturate and fill up the /sync response | ||||
createMessagesInRoom(t, alice, roomID, 5) | ||||
createMessagesInRoom(t, alice, roomID, 5, "live") | ||||
|
||||
// Import a historical event | ||||
batchSendRes := batchSendHistoricalMessages( | ||||
|
@@ -286,7 +342,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
historicalEventId := historicalEventIDs[0] | ||||
|
||||
// This is just a dummy event we search for after the historicalEventId | ||||
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1, "eventIDsAfterHistoricalImport") | ||||
eventIDAfterHistoricalImport := eventIDsAfterHistoricalImport[0] | ||||
|
||||
// Sync until we find the eventIDAfterHistoricalImport. | ||||
|
@@ -309,7 +365,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create the "live" event we are going to import our historical events next to | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -363,7 +419,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -386,7 +442,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -417,7 +473,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create the "live" event we are going to import our historical events next to | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -469,12 +525,12 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
// eventIDsAfter | ||||
createMessagesInRoom(t, alice, roomID, 3) | ||||
createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") | ||||
|
||||
batchSendRes := batchSendHistoricalMessages( | ||||
t, | ||||
|
@@ -522,7 +578,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -546,7 +602,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
}) | ||||
|
||||
// eventIDsAfter | ||||
createMessagesInRoom(t, alice, roomID, 3) | ||||
createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") | ||||
|
||||
batchSendRes := batchSendHistoricalMessages( | ||||
t, | ||||
|
@@ -597,12 +653,12 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
// Join the room from a remote homeserver before any historical messages are sent | ||||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||||
|
||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
// eventIDsAfter | ||||
createMessagesInRoom(t, alice, roomID, 10) | ||||
createMessagesInRoom(t, alice, roomID, 10, "eventIDsAfter") | ||||
|
||||
// Mimic scrollback just through the latest messages | ||||
remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||||
|
@@ -685,12 +741,12 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
// Join the room from a remote homeserver before any historical messages are sent | ||||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||||
|
||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
// eventIDsAfter | ||||
createMessagesInRoom(t, alice, roomID, 3) | ||||
createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") | ||||
|
||||
// Mimic scrollback to all of the messages | ||||
// scrollbackMessagesRes | ||||
|
@@ -778,7 +834,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create the "live" event we are going to import our historical events next to | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -818,7 +874,7 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
alice.JoinRoom(t, roomID, nil) | ||||
|
||||
// Create the "live" event we are going to import our historical events next to | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) | ||||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||||
eventIdBefore := eventIDsBefore[0] | ||||
timeAfterEventBefore := time.Now() | ||||
|
||||
|
@@ -913,6 +969,96 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, | |||
} | ||||
} | ||||
|
||||
func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string) { | ||||
t.Helper() | ||||
start := time.Now() | ||||
|
||||
workingExpectedEventIDMap := make(map[string]string) | ||||
for _, expectedEventID := range expectedEventIDs { | ||||
workingExpectedEventIDMap[expectedEventID] = expectedEventID | ||||
} | ||||
|
||||
var actualEventIDList []string | ||||
checkCounter := 0 | ||||
messageResEnd := "" | ||||
generateErrorMesssageInfo := func() string { | ||||
i := 0 | ||||
leftoverEventIDs := make([]string, len(workingExpectedEventIDMap)) | ||||
for eventID := range workingExpectedEventIDMap { | ||||
leftoverEventIDs[i] = eventID | ||||
i++ | ||||
} | ||||
|
||||
return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s", | ||||
checkCounter, | ||||
len(expectedEventIDs)-len(leftoverEventIDs), | ||||
len(expectedEventIDs), | ||||
len(leftoverEventIDs), | ||||
leftoverEventIDs, | ||||
len(actualEventIDList), | ||||
actualEventIDList, | ||||
) | ||||
} | ||||
|
||||
for { | ||||
if time.Since(start) > 200*c.SyncUntilTimeout { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This timeout is currently very large to accommodate the long ~20s Synapse really has to chug for those requests 👹 and ideally wouldn't have to modify this at all. I would need to look into optimizing Synapse to make this fast which we should probably do anyway as this is painfully slow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is currently set to a whopping 1000s by default complement/internal/docker/deployment.go Line 57 in 136fd60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kegsay Sorry this wasn't clear as undrafting indicates "marked this pull request as ready for review" but I didn't assign you this one yet specifically because of this problem. The test itself is good to go (timeout can be switched to normal and I used Thanks for the review pass though and I'll fix up these other spots ⏩ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Status is still the same as the last update in this thread. It's too slow on Synapse for me to be comfortable merging it yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In terms of optimizing Synapse to make this test viable to run time-wise, I'm a bit blocked on a race condition in some recent code, matrix-org/synapse#12394 (comment) -> matrix-org/synapse#12646 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Now matrix-org/synapse#12988 I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made progress on optimizing Synapse:
|
||||
t.Fatalf( | ||||
"paginateUntilMessageCheckOff timed out. %s", | ||||
generateErrorMesssageInfo(), | ||||
) | ||||
} | ||||
|
||||
messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||||
"dir": []string{"b"}, | ||||
// TODO: Can we do it with 100? | ||||
"limit": []string{"100"}, | ||||
"from": []string{messageResEnd}, | ||||
})) | ||||
messsageResBody := client.ParseJSON(t, messagesRes) | ||||
|
||||
messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end") | ||||
|
||||
wantKey := "chunk" | ||||
keyRes := gjson.GetBytes(messsageResBody, wantKey) | ||||
if !keyRes.Exists() { | ||||
t.Fatalf("missing key '%s'", wantKey) | ||||
} | ||||
if !keyRes.IsArray() { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use complement/internal/match/json.go Line 164 in 136fd60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC, I think it was because we can't break early with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the early return really a deal breaker? I don't think it'll affect runtime performance that much? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to re-use |
||||
t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type) | ||||
} | ||||
|
||||
events := keyRes.Array() | ||||
|
||||
if len(events) == 0 { | ||||
t.Fatalf( | ||||
"paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s", | ||||
generateErrorMesssageInfo(), | ||||
) | ||||
} | ||||
|
||||
// logrus.WithFields(logrus.Fields{ | ||||
// "events": events, | ||||
// "messageResEnd": messageResEnd, | ||||
// }).Error("paginateUntilMessageCheckOff chunk") | ||||
for _, ev := range events { | ||||
eventID := ev.Get("event_id").Str | ||||
actualEventIDList = append(actualEventIDList, eventID) | ||||
|
||||
if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists { | ||||
delete(workingExpectedEventIDMap, eventID) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this function doesn't actually check the events are in the correct order, just that they exist? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, just a checkoff function |
||||
} | ||||
|
||||
if len(workingExpectedEventIDMap) == 0 { | ||||
return | ||||
} | ||||
} | ||||
|
||||
checkCounter++ | ||||
// Add a slight delay so we don't hammmer the messages endpoint | ||||
time.Sleep(500 * time.Millisecond) | ||||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
} | ||||
|
||||
func isRelevantEvent(r gjson.Result) bool { | ||||
return len(r.Get("content").Get("body").Str) > 0 || | ||||
r.Get("type").Str == insertionEventType || | ||||
|
@@ -1003,14 +1149,14 @@ func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSA | |||
return markerEventID | ||||
} | ||||
|
||||
func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int) (eventIDs []string) { | ||||
func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int, messageSuffix string) (eventIDs []string) { | ||||
kegsay marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
eventIDs = make([]string, count) | ||||
for i := 0; i < len(eventIDs); i++ { | ||||
newEvent := b.Event{ | ||||
Type: "m.room.message", | ||||
Content: map[string]interface{}{ | ||||
"msgtype": "m.text", | ||||
"body": fmt.Sprintf("Message %d", i), | ||||
"body": fmt.Sprintf("Message %d (%s)", i, messageSuffix), | ||||
}, | ||||
} | ||||
newEventId := c.SendEventSynced(t, roomID, newEvent) | ||||
|
Uh oh!
There was an error while loading. Please reload this page.