@@ -6,21 +6,19 @@ use mongodb::bson::doc;
6
6
use omniqueue:: QueueError ;
7
7
use rstest:: rstest;
8
8
use tokio:: time:: sleep;
9
- use uuid:: Uuid ;
10
9
11
- use super :: database:: build_job_item;
12
- use crate :: constants:: { BLOB_DATA_FILE_NAME , CAIRO_PIE_FILE_NAME , PROGRAM_OUTPUT_FILE_NAME , SNOS_OUTPUT_FILE_NAME } ;
10
+ use crate :: constants:: CAIRO_PIE_FILE_NAME ;
13
11
use crate :: jobs:: job_handler_factory:: mock_factory;
14
12
use crate :: jobs:: metadata:: {
15
- CommonMetadata , DaMetadata , JobMetadata , JobSpecificMetadata , ProvingInputType , ProvingMetadata , SnosMetadata ,
16
- StateUpdateMetadata ,
13
+ CommonMetadata , JobMetadata , JobSpecificMetadata , ProvingInputType , ProvingMetadata , SnosMetadata ,
17
14
} ;
18
- use crate :: jobs:: types:: { ExternalId , JobItem , JobStatus , JobType , JobVerificationStatus } ;
15
+ use crate :: jobs:: types:: { ExternalId , JobStatus , JobType , JobVerificationStatus } ;
19
16
use crate :: jobs:: { create_job, handle_job_failure, process_job, retry_job, verify_job, Job , JobError , MockJob } ;
20
17
use crate :: queue:: job_queue:: QueueNameForJobType ;
21
18
use crate :: queue:: QueueType ;
22
19
use crate :: tests:: common:: MessagePayloadType ;
23
20
use crate :: tests:: config:: { ConfigType , TestConfigBuilder } ;
21
+ use crate :: tests:: utils:: build_job_item;
24
22
25
23
#[ cfg( test) ]
26
24
pub mod da_job;
@@ -35,13 +33,12 @@ pub mod state_update_job;
35
33
pub mod snos_job;
36
34
37
35
use assert_matches:: assert_matches;
38
- use chrono:: { SubsecRound , Utc } ;
39
36
40
37
/// Tests `create_job` function when job is not existing in the db.
41
38
#[ rstest]
42
39
#[ tokio:: test]
43
40
async fn create_job_job_does_not_exists_in_db_works ( ) {
44
- let job_item = build_job_item_by_type_and_status ( JobType :: SnosRun , JobStatus :: Created , "0" . to_string ( ) ) ;
41
+ let job_item = build_job_item ( JobType :: SnosRun , JobStatus :: Created , 0 ) ;
45
42
let mut job_handler = MockJob :: new ( ) ;
46
43
47
44
// Adding expectation for creation of new job.
@@ -94,7 +91,7 @@ async fn create_job_job_does_not_exists_in_db_works() {
94
91
#[ rstest]
95
92
#[ tokio:: test]
96
93
async fn create_job_job_exists_in_db_works ( ) {
97
- let job_item = build_job_item_by_type_and_status ( JobType :: ProofCreation , JobStatus :: Created , "0" . to_string ( ) ) ;
94
+ let job_item = build_job_item ( JobType :: ProofCreation , JobStatus :: Created , 0 ) ;
98
95
99
96
let services = TestConfigBuilder :: new ( )
100
97
. configure_database ( ConfigType :: Actual )
@@ -190,7 +187,7 @@ async fn process_job_with_job_exists_in_db_and_valid_job_processing_status_works
190
187
let database_client = services. config . database ( ) ;
191
188
192
189
// Create a job with proper metadata structure
193
- let job_item = build_job_item_by_type_and_status ( job_type. clone ( ) , job_status. clone ( ) , "1" . to_string ( ) ) ;
190
+ let job_item = build_job_item ( job_type. clone ( ) , job_status. clone ( ) , 1 ) ;
194
191
195
192
let mut job_handler = MockJob :: new ( ) ;
196
193
@@ -244,7 +241,7 @@ async fn process_job_handles_panic() {
244
241
let database_client = services. config . database ( ) ;
245
242
246
243
// Create a job with proper metadata structure
247
- let job_item = build_job_item_by_type_and_status ( JobType :: SnosRun , JobStatus :: Created , "1" . to_string ( ) ) ;
244
+ let job_item = build_job_item ( JobType :: SnosRun , JobStatus :: Created , 1 ) ;
248
245
249
246
// Creating job in database
250
247
database_client. create_job ( job_item. clone ( ) ) . await . unwrap ( ) ;
@@ -286,7 +283,7 @@ async fn process_job_handles_panic() {
286
283
#[ tokio:: test]
287
284
async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_errors ( ) {
288
285
// Creating a job with Completed status which is invalid processing.
289
- let job_item = build_job_item_by_type_and_status ( JobType :: SnosRun , JobStatus :: Completed , "1" . to_string ( ) ) ;
286
+ let job_item = build_job_item ( JobType :: SnosRun , JobStatus :: Completed , 1 ) ;
290
287
291
288
// building config
292
289
let services = TestConfigBuilder :: new ( )
@@ -320,7 +317,7 @@ async fn process_job_with_job_exists_in_db_with_invalid_job_processing_status_er
320
317
#[ tokio:: test]
321
318
async fn process_job_job_does_not_exists_in_db_works ( ) {
322
319
// Creating a valid job which is not existing in the db.
323
- let job_item = build_job_item_by_type_and_status ( JobType :: SnosRun , JobStatus :: Created , "1" . to_string ( ) ) ;
320
+ let job_item = build_job_item ( JobType :: SnosRun , JobStatus :: Created , 1 ) ;
324
321
325
322
// building config
326
323
let services = TestConfigBuilder :: new ( )
@@ -366,7 +363,7 @@ async fn process_job_two_workers_process_same_job_works() {
366
363
. await ;
367
364
let db_client = services. config . database ( ) ;
368
365
369
- let job_item = build_job_item_by_type_and_status ( JobType :: SnosRun , JobStatus :: Created , "1" . to_string ( ) ) ;
366
+ let job_item = build_job_item ( JobType :: SnosRun , JobStatus :: Created , 1 ) ;
370
367
371
368
// Creating the job in the db
372
369
db_client. create_job ( job_item. clone ( ) ) . await . unwrap ( ) ;
@@ -422,7 +419,7 @@ async fn process_job_job_handler_returns_error_works() {
422
419
. await ;
423
420
let db_client = services. config . database ( ) ;
424
421
425
- let job_item = build_job_item_by_type_and_status ( JobType :: SnosRun , JobStatus :: Created , "1" . to_string ( ) ) ;
422
+ let job_item = build_job_item ( JobType :: SnosRun , JobStatus :: Created , 1 ) ;
426
423
427
424
// Creating the job in the db
428
425
db_client. create_job ( job_item. clone ( ) ) . await . unwrap ( ) ;
@@ -439,8 +436,7 @@ async fn process_job_job_handler_returns_error_works() {
439
436
#[ rstest]
440
437
#[ tokio:: test]
441
438
async fn verify_job_with_verified_status_works ( ) {
442
- let job_item =
443
- build_job_item_by_type_and_status ( JobType :: DataSubmission , JobStatus :: PendingVerification , "1" . to_string ( ) ) ;
439
+ let job_item = build_job_item ( JobType :: DataSubmission , JobStatus :: PendingVerification , 1 ) ;
444
440
445
441
// building config
446
442
let services = TestConfigBuilder :: new ( )
@@ -486,8 +482,7 @@ async fn verify_job_with_verified_status_works() {
486
482
#[ rstest]
487
483
#[ tokio:: test]
488
484
async fn verify_job_with_rejected_status_adds_to_queue_works ( ) {
489
- let job_item =
490
- build_job_item_by_type_and_status ( JobType :: DataSubmission , JobStatus :: PendingVerification , "1" . to_string ( ) ) ;
485
+ let job_item = build_job_item ( JobType :: DataSubmission , JobStatus :: PendingVerification , 1 ) ;
491
486
492
487
// building config
493
488
let services = TestConfigBuilder :: new ( )
@@ -541,8 +536,7 @@ async fn verify_job_with_rejected_status_works() {
541
536
let database_client = services. config . database ( ) ;
542
537
543
538
// Create a job with proper metadata structure
544
- let mut job_item =
545
- build_job_item_by_type_and_status ( JobType :: DataSubmission , JobStatus :: PendingVerification , "1" . to_string ( ) ) ;
539
+ let mut job_item = build_job_item ( JobType :: DataSubmission , JobStatus :: PendingVerification , 1 ) ;
546
540
547
541
// Set process_attempt_no to 1 to simulate max attempts reached
548
542
job_item. metadata . common . process_attempt_no = 1 ;
@@ -593,8 +587,7 @@ async fn verify_job_with_pending_status_adds_to_queue_works() {
593
587
let database_client = services. config . database ( ) ;
594
588
595
589
// Create a job with proper metadata structure
596
- let job_item =
597
- build_job_item_by_type_and_status ( JobType :: DataSubmission , JobStatus :: PendingVerification , "1" . to_string ( ) ) ;
590
+ let job_item = build_job_item ( JobType :: DataSubmission , JobStatus :: PendingVerification , 1 ) ;
598
591
599
592
// Creating job in database
600
593
database_client. create_job ( job_item. clone ( ) ) . await . unwrap ( ) ;
@@ -646,8 +639,7 @@ async fn verify_job_with_pending_status_works() {
646
639
let database_client = services. config . database ( ) ;
647
640
648
641
// Create a job with proper metadata structure
649
- let mut job_item =
650
- build_job_item_by_type_and_status ( JobType :: DataSubmission , JobStatus :: PendingVerification , "1" . to_string ( ) ) ;
642
+ let mut job_item = build_job_item ( JobType :: DataSubmission , JobStatus :: PendingVerification , 1 ) ;
651
643
652
644
// Set verification_attempt_no to 1 to simulate max attempts reached
653
645
job_item. metadata . common . verification_attempt_no = 1 ;
@@ -684,54 +676,6 @@ async fn verify_job_with_pending_status_works() {
684
676
assert_matches ! ( consumed_messages_verification_queue, QueueError :: NoData ) ;
685
677
}
686
678
687
- fn build_job_item_by_type_and_status ( job_type : JobType , job_status : JobStatus , internal_id : String ) -> JobItem {
688
- let block_number = internal_id. parse :: < u64 > ( ) . unwrap_or ( 0 ) ;
689
-
690
- // Create appropriate specific metadata based on job type
691
- let specific_metadata = match job_type {
692
- JobType :: SnosRun => JobSpecificMetadata :: Snos ( SnosMetadata {
693
- block_number,
694
- full_output : false ,
695
- cairo_pie_path : Some ( format ! ( "{}/{}" , block_number, CAIRO_PIE_FILE_NAME ) ) ,
696
- snos_output_path : Some ( format ! ( "{}/{}" , block_number, SNOS_OUTPUT_FILE_NAME ) ) ,
697
- program_output_path : Some ( format ! ( "{}/{}" , block_number, PROGRAM_OUTPUT_FILE_NAME ) ) ,
698
- snos_fact : None ,
699
- } ) ,
700
- JobType :: DataSubmission => JobSpecificMetadata :: Da ( DaMetadata {
701
- block_number,
702
- blob_data_path : Some ( format ! ( "{}/{}" , block_number, BLOB_DATA_FILE_NAME ) ) ,
703
- tx_hash : None ,
704
- } ) ,
705
- JobType :: ProofCreation => JobSpecificMetadata :: Proving ( ProvingMetadata {
706
- block_number,
707
- input_path : Some ( ProvingInputType :: CairoPie ( format ! ( "{}/{}" , block_number, CAIRO_PIE_FILE_NAME ) ) ) ,
708
- ensure_on_chain_registration : None ,
709
- download_proof : None ,
710
- } ) ,
711
- JobType :: StateTransition => JobSpecificMetadata :: StateUpdate ( StateUpdateMetadata {
712
- blocks_to_settle : vec ! [ block_number] ,
713
- snos_output_paths : vec ! [ format!( "{}/{}" , block_number, SNOS_OUTPUT_FILE_NAME ) ] ,
714
- program_output_paths : vec ! [ format!( "{}/{}" , block_number, PROGRAM_OUTPUT_FILE_NAME ) ] ,
715
- blob_data_paths : vec ! [ format!( "{}/{}" , block_number, BLOB_DATA_FILE_NAME ) ] ,
716
- last_failed_block_no : None ,
717
- tx_hashes : Vec :: new ( ) ,
718
- } ) ,
719
- _ => panic ! ( "Unsupported job type: {:?}" , job_type) ,
720
- } ;
721
-
722
- JobItem {
723
- id : Uuid :: new_v4 ( ) ,
724
- internal_id,
725
- job_type,
726
- status : job_status,
727
- external_id : ExternalId :: Number ( 0 ) ,
728
- metadata : JobMetadata { common : CommonMetadata :: default ( ) , specific : specific_metadata } ,
729
- version : 0 ,
730
- created_at : Utc :: now ( ) . round_subsecs ( 0 ) ,
731
- updated_at : Utc :: now ( ) . round_subsecs ( 0 ) ,
732
- }
733
- }
734
-
735
679
#[ rstest]
736
680
#[ case( JobType :: DataSubmission , JobStatus :: Completed ) ] // code should panic here, how can completed move to dl queue ?
737
681
#[ case( JobType :: SnosRun , JobStatus :: PendingVerification ) ]
0 commit comments