1
- import { logger } from '../../utils/logger'
2
- import { v4 as uuid } from 'uuid'
3
-
4
- import { OpenAI } from '@langchain/openai'
1
+ import { JsonOutputParser } from '@langchain/core/output_parsers'
5
2
import { PromptTemplate } from '@langchain/core/prompts'
6
- import { LibraryItem } from '../../entity/library_item '
3
+ import { OpenAI } from '@langchain/openai '
7
4
import {
8
5
htmlToSpeechFile ,
9
6
SpeechFile ,
10
7
SSMLOptions ,
11
8
} from '@omnivore/text-to-speech-handler'
12
9
import axios from 'axios'
10
+ import showdown from 'showdown'
11
+ import yaml from 'yaml'
12
+ import { LibraryItem } from '../../entity/library_item'
13
+ import { TaskState } from '../../generated/graphql'
14
+ import { redisDataSource } from '../../redis_data_source'
15
+ import { Digest , writeDigest } from '../../services/digest'
13
16
import {
14
17
findLibraryItemsByIds ,
15
18
searchLibraryItems ,
16
19
} from '../../services/library_item'
17
- import { redisDataSource } from '../../redis_data_source'
20
+ import { findDeviceTokensByUserId } from '../../services/user_device_tokens'
21
+ import { logger } from '../../utils/logger'
18
22
import { htmlToMarkdown } from '../../utils/parser'
19
- import yaml from 'yaml'
20
- import { JsonOutputParser } from '@langchain/core/output_parsers'
21
- import showdown from 'showdown'
22
- import { Digest , writeDigest } from '../../services/digest'
23
- import { TaskState } from '../../generated/graphql'
23
+ import { sendMulticastPushNotifications } from '../../utils/sendNotification'
24
24
25
25
export type CreateDigestJobSchedule = 'daily' | 'weekly'
26
26
@@ -73,9 +73,18 @@ interface RankedTitle {
73
73
}
74
74
75
75
export const CREATE_DIGEST_JOB = 'create-digest'
76
+ export const CRON_PATTERNS = {
77
+ // every day at 10:30 UTC
78
+ daily : '30 10 * * *' ,
79
+ // every Sunday at 10:30 UTC
80
+ weekly : '30 10 * * 7' ,
81
+ }
76
82
77
83
let digestDefinition : DigestDefinition
78
84
85
+ export const getCronPattern = ( schedule : CreateDigestJobSchedule ) =>
86
+ CRON_PATTERNS [ schedule ]
87
+
79
88
const fetchDigestDefinition = async ( ) : Promise < DigestDefinition > => {
80
89
const promptFileUrl = process . env . PROMPT_FILE_URL
81
90
if ( ! promptFileUrl ) {
@@ -131,7 +140,7 @@ const getPreferencesList = async (userId: string): Promise<LibraryItem[]> => {
131
140
// Makes multiple DB queries and combines the results
132
141
const getCandidatesList = async (
133
142
userId : string ,
134
- libraryItemIds ?: string [ ]
143
+ selectedLibraryItemIds ?: string [ ]
135
144
) : Promise < LibraryItem [ ] > => {
136
145
// use the queries from the digest definitions to lookup preferences
137
146
// There should be a list of multiple queries we use. For now we can
@@ -140,18 +149,25 @@ const getCandidatesList = async (
140
149
// count: 100
141
150
// reason: "most recent 100 items saved over 500 words
142
151
143
- if ( libraryItemIds ) {
144
- logger . info ( 'Using libraryItemIds' )
145
- return findLibraryItemsByIds ( libraryItemIds , userId )
152
+ if ( selectedLibraryItemIds ) {
153
+ return findLibraryItemsByIds ( selectedLibraryItemIds , userId )
146
154
}
147
155
156
+ // get the existing candidate ids from cache
157
+ const key = `digest:${ userId } :existingCandidateIds`
158
+ const existingCandidateIds = await redisDataSource . redisClient ?. get ( key )
159
+
160
+ logger . info ( 'existingCandidateIds: ' , { existingCandidateIds } )
161
+
148
162
const candidates = await Promise . all (
149
163
digestDefinition . candidateSelectors . map ( async ( selector ) => {
150
164
// use the selector to fetch items
151
165
const results = await searchLibraryItems (
152
166
{
153
167
includeContent : true ,
154
- query : selector . query ,
168
+ query : existingCandidateIds
169
+ ? `(${ selector . query } ) -includes:${ existingCandidateIds } ` // exclude the existing candidates
170
+ : selector . query ,
155
171
size : selector . count ,
156
172
} ,
157
173
userId
@@ -172,6 +188,23 @@ const getCandidatesList = async (
172
188
readableContent : htmlToMarkdown ( item . readableContent ) ,
173
189
} ) ) // convert the html content to markdown
174
190
191
+ if ( dedupedCandidates . length === 0 ) {
192
+ logger . info ( 'No new candidates found' )
193
+
194
+ if ( existingCandidateIds ) {
195
+ // reuse the existing candidates
196
+ const existingIds = existingCandidateIds . split ( ',' )
197
+ return findLibraryItemsByIds ( existingIds , userId )
198
+ }
199
+
200
+ // return empty array if no existing candidates
201
+ return [ ]
202
+ }
203
+
204
+ // store the ids in cache
205
+ const candidateIds = dedupedCandidates . map ( ( item ) => item . id ) . join ( ',' )
206
+ await redisDataSource . redisClient ?. set ( key , candidateIds )
207
+
175
208
return dedupedCandidates
176
209
}
177
210
@@ -203,7 +236,7 @@ const createUserProfile = async (
203
236
// it to redis
204
237
const findOrCreateUserProfile = async ( userId : string ) : Promise < string > => {
205
238
// check redis for user profile, return if found
206
- const key = `userProfile :${ userId } `
239
+ const key = `digest :${ userId } :userProfile `
207
240
const existingProfile = await redisDataSource . redisClient ?. get ( key )
208
241
if ( existingProfile ) {
209
242
return existingProfile
@@ -266,6 +299,9 @@ const rankCandidates = async (
266
299
return rankedItems
267
300
}
268
301
302
+ const filterTopics = ( rankedTopics : string [ ] ) =>
303
+ rankedTopics . filter ( ( topic ) => topic ?. length > 0 )
304
+
269
305
// Does some grouping by topic while trying to maintain ranking
270
306
// adds some basic topic diversity
271
307
const chooseRankedSelections = ( rankedCandidates : RankedItem [ ] ) => {
@@ -289,7 +325,6 @@ const chooseRankedSelections = (rankedCandidates: RankedItem[]) => {
289
325
}
290
326
291
327
logger . info ( 'rankedTopics: ' , rankedTopics )
292
- logger . info ( 'finalSelections: ' , selected )
293
328
294
329
const finalSelections = [ ]
295
330
@@ -298,9 +333,15 @@ const chooseRankedSelections = (rankedCandidates: RankedItem[]) => {
298
333
finalSelections . push ( ...matches )
299
334
}
300
335
301
- logger . info ( 'finalSelections: ' , finalSelections )
336
+ logger . info (
337
+ 'finalSelections: ' ,
338
+ finalSelections . map ( ( item ) => item . libraryItem . title )
339
+ )
302
340
303
- return { finalSelections, rankedTopics }
341
+ return {
342
+ finalSelections,
343
+ rankedTopics : filterTopics ( rankedTopics ) ,
344
+ }
304
345
}
305
346
306
347
const summarizeItems = async (
@@ -363,7 +404,9 @@ const generateSpeechFiles = (
363
404
// we should have a QA step here that does some
364
405
// basic checks to make sure the summaries are good.
365
406
const filterSummaries = ( summaries : RankedItem [ ] ) : RankedItem [ ] => {
366
- return summaries . filter ( ( item ) => item . summary . length > 100 )
407
+ return summaries . filter (
408
+ ( item ) => item . summary . length < item . libraryItem . readableContent . length
409
+ )
367
410
}
368
411
369
412
// we can use something more sophisticated to generate titles
@@ -376,11 +419,8 @@ const generateDescription = (
376
419
summaries : RankedItem [ ] ,
377
420
rankedTopics : string [ ]
378
421
) : string =>
379
- `We selected ${
380
- summaries . length
381
- } articles from your last 24 hours of saved items, covering ${ rankedTopics . join (
382
- ', '
383
- ) } .`
422
+ `We selected ${ summaries . length } articles from your last 24 hours of saved items` +
423
+ ( rankedTopics . length ? `, covering ${ rankedTopics . join ( ', ' ) } .` : '.' )
384
424
385
425
// generate content based on the summaries
386
426
const generateContent = ( summaries : RankedItem [ ] ) : string =>
@@ -395,45 +435,77 @@ const generateByline = (summaries: RankedItem[]): string =>
395
435
. join ( ', ' )
396
436
397
437
export const createDigestJob = async ( jobData : CreateDigestJobData ) => {
398
- digestDefinition = await fetchDigestDefinition ( )
438
+ try {
439
+ digestDefinition = await fetchDigestDefinition ( )
399
440
400
- const candidates = await getCandidatesList (
401
- jobData . userId ,
402
- jobData . libraryItemIds
403
- )
404
- const userProfile = await findOrCreateUserProfile ( jobData . userId )
405
- const rankedCandidates = await rankCandidates ( candidates , userProfile )
406
- const { finalSelections, rankedTopics } =
407
- chooseRankedSelections ( rankedCandidates )
441
+ const candidates = await getCandidatesList (
442
+ jobData . userId ,
443
+ jobData . libraryItemIds
444
+ )
445
+ if ( candidates . length === 0 ) {
446
+ logger . info ( 'No candidates found' )
447
+ return writeDigest ( jobData . userId , {
448
+ id : jobData . id ,
449
+ jobState : TaskState . Succeeded ,
450
+ title : 'No articles found' ,
451
+ } )
452
+ }
408
453
409
- const summaries = await summarizeItems ( finalSelections )
454
+ const userProfile = await findOrCreateUserProfile ( jobData . userId )
455
+ const rankedCandidates = await rankCandidates ( candidates , userProfile )
456
+ const { finalSelections, rankedTopics } =
457
+ chooseRankedSelections ( rankedCandidates )
410
458
411
- const filteredSummaries = filterSummaries ( summaries )
459
+ const summaries = await summarizeItems ( finalSelections )
412
460
413
- const speechFiles = generateSpeechFiles ( filteredSummaries , {
414
- ...jobData ,
415
- primaryVoice : jobData . voices ?. [ 0 ] ,
416
- secondaryVoice : jobData . voices ?. [ 1 ] ,
417
- } )
418
- const title = generateTitle ( summaries )
419
- const digest : Digest = {
420
- id : jobData . id ,
421
- title,
422
- content : generateContent ( summaries ) ,
423
- urlsToAudio : [ ] ,
424
- jobState : TaskState . Succeeded ,
425
- speechFiles,
426
- chapters : filteredSummaries . map ( ( item , index ) => ( {
427
- title : item . libraryItem . title ,
428
- id : item . libraryItem . id ,
429
- url : item . libraryItem . originalUrl ,
430
- thumbnail : item . libraryItem . thumbnail ?? undefined ,
431
- wordCount : speechFiles [ index ] . wordCount ,
432
- } ) ) ,
433
- createdAt : new Date ( ) ,
434
- description : generateDescription ( summaries , rankedTopics ) ,
435
- byline : generateByline ( summaries ) ,
436
- }
461
+ const filteredSummaries = filterSummaries ( summaries )
462
+
463
+ const speechFiles = generateSpeechFiles ( filteredSummaries , {
464
+ ...jobData ,
465
+ primaryVoice : jobData . voices ?. [ 0 ] ,
466
+ secondaryVoice : jobData . voices ?. [ 1 ] ,
467
+ } )
468
+ const title = generateTitle ( summaries )
469
+ const digest : Digest = {
470
+ id : jobData . id ,
471
+ title,
472
+ content : generateContent ( summaries ) ,
473
+ jobState : TaskState . Succeeded ,
474
+ speechFiles,
475
+ chapters : filteredSummaries . map ( ( item , index ) => ( {
476
+ title : item . libraryItem . title ,
477
+ id : item . libraryItem . id ,
478
+ url : item . libraryItem . originalUrl ,
479
+ thumbnail : item . libraryItem . thumbnail ?? undefined ,
480
+ wordCount : speechFiles [ index ] . wordCount ,
481
+ } ) ) ,
482
+ createdAt : new Date ( ) ,
483
+ description : generateDescription ( summaries , rankedTopics ) ,
484
+ byline : generateByline ( summaries ) ,
485
+ urlsToAudio : [ ] ,
486
+ }
487
+
488
+ await writeDigest ( jobData . userId , digest )
489
+ } catch ( error ) {
490
+ logger . error ( 'createDigestJob error' , error )
437
491
438
- await writeDigest ( jobData . userId , digest )
492
+ await writeDigest ( jobData . userId , {
493
+ id : jobData . id ,
494
+ jobState : TaskState . Failed ,
495
+ } )
496
+ } finally {
497
+ // send notification
498
+ const tokens = await findDeviceTokensByUserId ( jobData . userId )
499
+ if ( tokens . length > 0 ) {
500
+ const message = {
501
+ notification : {
502
+ title : 'Digest ready' ,
503
+ body : 'Your digest is ready to listen' ,
504
+ } ,
505
+ tokens : tokens . map ( ( token ) => token . token ) ,
506
+ }
507
+
508
+ await sendMulticastPushNotifications ( jobData . userId , message , 'reminder' )
509
+ }
510
+ }
439
511
}
0 commit comments