Skip to content
This repository was archived by the owner on Jun 26, 2020. It is now read-only.

Commit c808438

Browse files
author
Mark Hahnenberg
committed
[client-sync] Transaction-ify processMessage
Summary: We do lots of writes while processing a single message. To reduce the write churn from lots of mini-transactions, this diff threads one overarching transaction to everything in processMessage. Test Plan: Run locally Reviewers: spang, evan, juan Reviewed By: juan Differential Revision: https://phab.nylas.com/D4394
1 parent b323e38 commit c808438

File tree

6 files changed

+105
-98
lines changed

6 files changed

+105
-98
lines changed

packages/client-sync/src/message-processor/detect-thread.js

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ function emptyThread({Thread, accountId}, options = {}) {
1414
return t;
1515
}
1616

17-
async function findOrBuildByReferences(db, message) {
17+
async function findOrBuildByReferences(db, message, transaction) {
1818
const {Thread, Reference, Label, Folder} = db;
1919

2020
let matchingRef = null;
@@ -30,6 +30,7 @@ async function findOrBuildByReferences(db, message) {
3030
include: [
3131
{ model: Thread, include: [{model: Label}, {model: Folder}]},
3232
],
33+
transaction,
3334
});
3435
}
3536

@@ -39,16 +40,17 @@ async function findOrBuildByReferences(db, message) {
3940
return matchingRef ? matchingRef.thread : emptyThread(db, {});
4041
}
4142

42-
async function findOrBuildByRemoteThreadId(db, remoteThreadId) {
43+
async function findOrBuildByRemoteThreadId(db, remoteThreadId, transaction) {
4344
const {Thread, Label, Folder} = db;
4445
const existing = await Thread.find({
4546
where: {remoteThreadId},
4647
include: [{model: Label}, {model: Folder}],
48+
transaction,
4749
});
4850
return existing || emptyThread(db, {remoteThreadId});
4951
}
5052

51-
async function detectThread({db, messageValues}) {
53+
async function detectThread({db, messageValues, transaction}) {
5254
if (!(messageValues.labels instanceof Array)) {
5355
throw new Error("detectThread expects labels to be an inflated array.");
5456
}
@@ -58,9 +60,9 @@ async function detectThread({db, messageValues}) {
5860

5961
let thread = null;
6062
if (messageValues.gThrId) {
61-
thread = await findOrBuildByRemoteThreadId(db, messageValues.gThrId)
63+
thread = await findOrBuildByRemoteThreadId(db, messageValues.gThrId, transaction)
6264
} else {
63-
thread = await findOrBuildByReferences(db, messageValues)
65+
thread = await findOrBuildByReferences(db, messageValues, transaction)
6466
}
6567

6668
if (!(thread.labels instanceof Array)) {
@@ -81,7 +83,7 @@ async function detectThread({db, messageValues}) {
8183
}
8284

8385
thread.subject = cleanSubject(messageValues.subject);
84-
await thread.updateFromMessages({messages: [messageValues]});
86+
await thread.updateFromMessages({messages: [messageValues], transaction});
8587
return thread;
8688
}
8789

packages/client-sync/src/message-processor/extract-contacts.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ function isContactMeaningful(contact) {
1313
return true
1414
}
1515

16-
async function extractContacts({db, messageValues, logger = console} = {}) {
16+
async function extractContacts({db, messageValues, logger = console, transaction} = {}) {
1717
const {Contact} = db
1818
let allContacts = [];
1919
['to', 'from', 'bcc', 'cc'].forEach((field) => {
@@ -37,13 +37,14 @@ async function extractContacts({db, messageValues, logger = console} = {}) {
3737
where: {
3838
id: Array.from(contactsDataById.keys()),
3939
},
40+
transaction,
4041
})
4142

4243
for (const c of contactsDataById.values()) {
4344
const existing = existingContacts.find(({id}) => id === c.id);
4445

4546
if (!existing) {
46-
Contact.create(c).catch(Sequelize.ValidationError, (err) => {
47+
Contact.create(c, {transaction}).catch(Sequelize.ValidationError, (err) => {
4748
if (err.name !== "SequelizeUniqueConstraintError") {
4849
logger.warn('Unknown error inserting contact', err);
4950
throw err;
@@ -52,12 +53,12 @@ async function extractContacts({db, messageValues, logger = console} = {}) {
5253
// and beat us to inserting. Since contacts are never deleted within
5354
// an account, we can safely assume that we can perform an update
5455
// instead.
55-
Contact.find({where: {id: c.id}}).then(
56-
(row) => { row.update(c) });
56+
Contact.find({where: {id: c.id}, transaction}).then(
57+
(row) => { row.update(c, {transaction}) });
5758
}
5859
});
5960
} else {
60-
existing.update(c);
61+
existing.update(c, {transaction});
6162
}
6263
}
6364
}

packages/client-sync/src/message-processor/extract-files.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ function collectFilesFromStruct({db, messageValues, struct, fileIds = new Set()}
4545
return collected;
4646
}
4747

48-
async function extractFiles({db, messageValues, struct}) {
49-
const files = collectFilesFromStruct({db, messageValues, struct});
48+
async function extractFiles({db, messageValues, struct, transaction}) {
49+
const files = collectFilesFromStruct({db, messageValues, struct, transaction});
5050
if (files.length > 0) {
5151
for (const file of files) {
52-
await file.save()
52+
await file.save({transaction})
5353
}
5454
}
5555
return Promise.resolve(files)

packages/client-sync/src/message-processor/index.js

Lines changed: 79 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -103,60 +103,67 @@ class MessageProcessor {
103103

104104
async _processMessage({db, accountId, folder, imapMessage, struct, desiredParts, logger}) {
105105
try {
106-
const {Message, Folder, Label} = db;
106+
const {sequelize, Message, Folder, Label} = db;
107107
const messageValues = await MessageFactory.parseFromImap(imapMessage, desiredParts, {
108108
db,
109109
folder,
110110
accountId,
111111
});
112-
const existingMessage = await Message.findById(messageValues.id, {
113-
include: [{model: Folder, as: 'folder'}, {model: Label, as: 'labels'}],
114-
});
115112
let processedMessage;
116-
if (existingMessage) {
117-
// TODO: optimize to not do a full message parse for existing messages
118-
processedMessage = await this._processExistingMessage({
119-
logger,
120-
struct,
121-
messageValues,
122-
existingMessage,
123-
})
124-
} else {
125-
processedMessage = await this._processNewMessage({
126-
logger,
127-
struct,
128-
messageValues,
129-
})
130-
}
113+
await sequelize.transaction(async (transaction) => {
114+
const existingMessage = await Message.findById(messageValues.id, {
115+
include: [{model: Folder, as: 'folder'}, {model: Label, as: 'labels'}],
116+
transaction,
117+
});
118+
if (existingMessage) {
119+
// TODO: optimize to not do a full message parse for existing messages
120+
processedMessage = await this._processExistingMessage({
121+
db,
122+
logger,
123+
struct,
124+
messageValues,
125+
existingMessage,
126+
transaction,
127+
})
128+
} else {
129+
processedMessage = await this._processNewMessage({
130+
db,
131+
logger,
132+
struct,
133+
messageValues,
134+
transaction,
135+
})
136+
}
131137

132-
// Inflate the serialized oldestProcessedDate value, if it exists
133-
let oldestProcessedDate;
134-
if (folder.syncState && folder.syncState.oldestProcessedDate) {
135-
oldestProcessedDate = new Date(folder.syncState.oldestProcessedDate);
136-
}
137-
const justProcessedDate = messageValues.date ? new Date(messageValues.date) : new Date()
138-
139-
// Update the oldestProcessedDate if:
140-
// a) justProcessedDate is after the year 1980. We don't want to base this
141-
// off of messages with borked 1970 dates.
142-
// AND
143-
// b) i) We haven't set oldestProcessedDate yet
144-
// OR
145-
// ii) justProcessedDate is before oldestProcessedDate and in a different
146-
// month. (We only use this to update the sync status in Nylas Mail,
147-
// which uses month precision. Updating a folder's syncState triggers
148-
// many re-renders in Nylas Mail, so we only do it as necessary.)
149-
if (justProcessedDate > new Date("1980") && (
150-
!oldestProcessedDate || (
151-
(justProcessedDate.getMonth() !== oldestProcessedDate.getMonth() ||
152-
justProcessedDate.getFullYear() !== oldestProcessedDate.getFullYear()) &&
153-
justProcessedDate < oldestProcessedDate))) {
154-
await folder.updateSyncState({oldestProcessedDate: justProcessedDate})
155-
}
138+
// Inflate the serialized oldestProcessedDate value, if it exists
139+
let oldestProcessedDate;
140+
if (folder.syncState && folder.syncState.oldestProcessedDate) {
141+
oldestProcessedDate = new Date(folder.syncState.oldestProcessedDate);
142+
}
143+
const justProcessedDate = messageValues.date ? new Date(messageValues.date) : new Date()
144+
145+
// Update the oldestProcessedDate if:
146+
// a) justProcessedDate is after the year 1980. We don't want to base this
147+
// off of messages with borked 1970 dates.
148+
// AND
149+
// b) i) We haven't set oldestProcessedDate yet
150+
// OR
151+
// ii) justProcessedDate is before oldestProcessedDate and in a different
152+
// month. (We only use this to update the sync status in Nylas Mail,
153+
// which uses month precision. Updating a folder's syncState triggers
154+
// many re-renders in Nylas Mail, so we only do it as necessary.)
155+
if (justProcessedDate > new Date("1980") && (
156+
!oldestProcessedDate || (
157+
(justProcessedDate.getMonth() !== oldestProcessedDate.getMonth() ||
158+
justProcessedDate.getFullYear() !== oldestProcessedDate.getFullYear()) &&
159+
justProcessedDate < oldestProcessedDate))) {
160+
await folder.updateSyncState({oldestProcessedDate: justProcessedDate}, {transaction})
161+
}
156162

157-
const activity = `🔃 ✉️ (${folder.name}) "${messageValues.subject}" - ${messageValues.date}`
158-
logger.log(activity)
159-
SyncActivity.reportSyncActivity(accountId, activity)
163+
const activity = `🔃 ✉️ (${folder.name}) "${messageValues.subject}" - ${messageValues.date}`
164+
logger.log(activity)
165+
SyncActivity.reportSyncActivity(accountId, activity)
166+
});
160167
return processedMessage
161168
} catch (err) {
162169
await this._onError({imapMessage, desiredParts, folder, err, logger});
@@ -198,7 +205,7 @@ class MessageProcessor {
198205
// Replaces ["<rfc2822messageid>", ...] with [[object Reference], ...]
199206
// Creates references that do not yet exist, and adds the correct
200207
// associations as well
201-
async _addReferences(db, message, thread, references) {
208+
async _addReferences(db, message, thread, references, transaction) {
202209
const {Reference} = db;
203210

204211
let existingReferences = [];
@@ -207,6 +214,7 @@ class MessageProcessor {
207214
where: {
208215
rfc2822MessageId: references,
209216
},
217+
transaction,
210218
});
211219
}
212220

@@ -216,45 +224,43 @@ class MessageProcessor {
216224
}
217225
for (const mid of references) {
218226
if (!refByMessageId[mid]) {
219-
refByMessageId[mid] = await Reference.create({rfc2822MessageId: mid, threadId: thread.id});
227+
refByMessageId[mid] = await Reference.create({rfc2822MessageId: mid, threadId: thread.id}, {transaction});
220228
}
221229
}
222230

223231
const referencesInstances = references.map(mid => refByMessageId[mid]);
224-
await message.addReferences(referencesInstances);
232+
await message.addReferences(referencesInstances, {transaction});
225233
message.referencesOrder = referencesInstances.map(ref => ref.id);
226-
await thread.addReferences(referencesInstances);
234+
await thread.addReferences(referencesInstances, {transaction});
227235
}
228236

229-
async _processNewMessage({messageValues, struct, logger = console} = {}) {
230-
const {accountId} = messageValues;
231-
const db = await LocalDatabaseConnector.forAccount(accountId);
237+
async _processNewMessage({db, messageValues, struct, logger = console, transaction} = {}) {
232238
const {Message} = db
233239

234-
const thread = await detectThread({db, messageValues});
240+
const thread = await detectThread({db, messageValues, transaction});
235241
messageValues.threadId = thread.id;
236-
const createdMessage = await Message.create(messageValues);
242+
const createdMessage = await Message.create(messageValues, {transaction});
237243

238244
if (messageValues.labels) {
239-
await createdMessage.addLabels(messageValues.labels)
245+
await createdMessage.addLabels(messageValues.labels, {transaction})
240246
// Note that the labels aren't officially associated until save() is called later
241247
}
242248

243-
await this._addReferences(db, createdMessage, thread, messageValues.references);
249+
await this._addReferences(db, createdMessage, thread, messageValues.references, transaction);
244250

245251
// TODO: need to delete dangling references somewhere (maybe at the
246252
// end of the sync loop?)
247253

248-
const files = await extractFiles({db, messageValues, struct});
254+
const files = await extractFiles({db, messageValues, struct, transaction});
249255
// Don't count inline images (files with contentIds) as attachments
250256
if (files.some(f => !f.contentId) && !thread.hasAttachments) {
251257
thread.hasAttachments = true;
252-
await thread.save();
258+
await thread.save({transaction});
253259
}
254-
await extractContacts({db, messageValues, logger});
260+
await extractContacts({db, messageValues, logger, transaction});
255261

256262
createdMessage.isProcessed = true;
257-
await createdMessage.save()
263+
await createdMessage.save({transaction})
258264
return createdMessage
259265
}
260266

@@ -271,10 +277,7 @@ class MessageProcessor {
271277
* or because we interrupted the sync loop before the message was fully
272278
* processed.
273279
*/
274-
async _processExistingMessage({existingMessage, messageValues, struct} = {}) {
275-
const {accountId} = messageValues;
276-
const db = await LocalDatabaseConnector.forAccount(accountId);
277-
280+
async _processExistingMessage({db, existingMessage, messageValues, struct, transaction} = {}) {
278281
/**
279282
* There should never be a reason to update the body of a message
280283
* already in the database.
@@ -288,38 +291,39 @@ class MessageProcessor {
288291
*/
289292
const newMessageWithoutBody = _.clone(messageValues)
290293
delete newMessageWithoutBody.body;
291-
await existingMessage.update(newMessageWithoutBody);
294+
await existingMessage.update(newMessageWithoutBody, {transaction});
292295
if (messageValues.labels && messageValues.labels.length > 0) {
293-
await existingMessage.setLabels(messageValues.labels)
296+
await existingMessage.setLabels(messageValues.labels, {transaction})
294297
}
295298

296299
let thread = await existingMessage.getThread({
297300
include: [{model: db.Folder, as: 'folders'}, {model: db.Label, as: 'labels'}],
301+
transaction,
298302
});
299303
if (!existingMessage.isProcessed) {
300304
if (!thread) {
301-
thread = await detectThread({db, messageValues});
305+
thread = await detectThread({db, messageValues, transaction});
302306
existingMessage.threadId = thread.id;
303307
} else {
304-
await thread.updateFromMessages({db, messages: [existingMessage]})
308+
await thread.updateFromMessages({db, messages: [existingMessage], transaction})
305309
}
306-
await this._addReferences(db, existingMessage, thread, messageValues.references);
307-
const files = await extractFiles({db, messageValues: existingMessage, struct});
310+
await this._addReferences(db, existingMessage, thread, messageValues.references, transaction);
311+
const files = await extractFiles({db, messageValues: existingMessage, struct, transaction});
308312
// Don't count inline images (files with contentIds) as attachments
309313
if (files.some(f => !f.contentId) && !thread.hasAttachments) {
310314
thread.hasAttachments = true;
311-
await thread.save();
315+
await thread.save({transaction});
312316
}
313-
await extractContacts({db, messageValues: existingMessage});
317+
await extractContacts({db, messageValues: existingMessage, transaction});
314318
existingMessage.isProcessed = true;
315319
} else {
316320
if (!thread) {
317321
throw new Error(`Existing processed message ${existingMessage.id} doesn't have thread`)
318322
}
319323
}
320324

321-
await existingMessage.save();
322-
await thread.updateLabelsAndFolders();
325+
await existingMessage.save({transaction});
326+
await thread.updateLabelsAndFolders({transaction});
323327
return existingMessage
324328
}
325329
}

packages/client-sync/src/models/folder.es6

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ export default (sequelize, Sequelize) => {
6767
)
6868
},
6969

70-
async updateSyncState(nextSyncState = {}) {
70+
async updateSyncState(nextSyncState = {}, {transaction} = {}) {
7171
if (_.isMatch(this.syncState, nextSyncState)) {
7272
return Promise.resolve();
7373
}
74-
await this.reload(); // Fetch any recent syncState updates
74+
await this.reload({transaction}); // Fetch any recent syncState updates
7575
this.syncState = Object.assign(this.syncState, nextSyncState);
76-
return this.save();
76+
return this.save({transaction});
7777
},
7878

7979
syncProgress() {

0 commit comments

Comments
 (0)