Update Orchestrator to Latest Changes and Prepare for Future L3 Support #524
Conversation
Force-pushed from 2464970 to 00bc605
For clarity, does this mean that it picks up from orchestrator PRs that were abandoned (because they were done while the monorepo was in progress)? If so, it would be good to link to the relevant PRs.
Yes, this includes all the changes that were made to the orchestrator while the monorepo was being merged. Links to the merged PRs:
cc: @notlesh
uACK, left a few comments
}

pub async fn get_active_jobs(&self) -> HashSet<Uuid> {
    self.active_jobs.lock().await.clone()
nit: a read-only lock might be a little more efficient
Would like to hear more on this, can you provide more context? We can also hop on a meet if you'd like.
I was trying to understand the use case (to understand if we really want only one reader), but it seems that this function is never called.
Yeah, you are right. This was being used in an earlier, more complex implementation; I guess we missed removing it. Removing it now.
Great catch!
Ok(Ok(permit)) => {
    {
        let mut active_jobs = self.active_jobs.lock().await;
        active_jobs.insert(job.id);
Maybe check here if active_jobs already has job.id.
Makes sense, let's discuss this along with your idea of a read-only lock.
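For illustration, a minimal hedged sketch of the duplicate check being discussed; it relies on HashSet::insert returning false when the value is already present (the helper name and logging are illustrative, not the orchestrator's actual code):

```rust
use std::collections::HashSet;
use uuid::Uuid;

// Hypothetical helper around the active-jobs set: HashSet::insert returns
// false if the id was already tracked, so a duplicate job can be detected
// without a separate contains() check.
async fn try_track_job(active_jobs: &tokio::sync::Mutex<HashSet<Uuid>>, job_id: Uuid) -> bool {
    let mut guard = active_jobs.lock().await;
    if !guard.insert(job_id) {
        log::warn!("Job {job_id} is already being processed, skipping duplicate");
        return false;
    }
    true
}
```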
log::warn!("DA job for the first block is not yet completed. Returning safely..."); | ||
return Ok(()); | ||
} | ||
} | ||
} | ||
|
||
let mut blocks_to_process: Vec<u64> = find_successive_blocks_in_vector(blocks_to_process); | ||
|
||
let mut blocks_to_process = find_successive_blocks_in_vector(blocks_to_process); |
What if this vector cannot find any successive blocks or there are dangling blocks that don't have their corresponding neighbours? Those blocks would eventually get processed, right?
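For illustration, a hedged sketch of the kind of "longest successive run" filter being discussed; the repository's actual find_successive_blocks_in_vector may behave differently, and dangling blocks left out here would only be picked up on a later worker run once the gap closes:

```rust
/// Hypothetical version: keep only the blocks that form a contiguous run
/// starting from the first element; anything after a gap is left for a
/// later iteration of the worker.
fn find_successive_blocks_in_vector(blocks: Vec<u64>) -> Vec<u64> {
    let mut successive = Vec::new();
    for block in blocks {
        match successive.last() {
            // The first block always starts the run.
            None => successive.push(block),
            // Extend the run only while blocks are consecutive.
            Some(&prev) if block == prev + 1 => successive.push(block),
            // Gap found: stop; the remaining blocks wait for a future run.
            _ => break,
        }
    }
    successive
}
```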
@heemankv can you please merge main so the branch has the latest fixes for CI?
async fn update_state_for_block(
    &self,
    config: Arc<Config>,
    block_no: u64,
    snos: StarknetOsOutput,
    nonce: u64,
    program_output: Vec<[u8; 32]>,
Why not use Felt as the type here?
This is because, as of now, this piece of code serves both Ethereum and Starknet; IMO it would be very type-intensive to change it just for Starknet.
Let me see what I can do.
Is there any specific advantage to using the Felt type here (which we can only use for Starknet), or is it to ensure type consistency?
If it's for type consistency, I think we can create an issue for this and handle it later, since this isn't part of this PR's scope and would be a really good Good First Issue.
Wdyt?
If it's used for multiple types, then Felt doesn't make sense. Seems fine for now; a Good First Issue for someone with some inspiration would be good :)
let mut job_item = build_job_item(JobType::DataSubmission, JobStatus::PendingVerification, 1);

// Set process_attempt_no to 1 to simulate max attempts reached
job_item.metadata.common.process_attempt_no = 1;
Is there a constant or some value you can query instead of hard coding this to 1?
...Or one you could define here in the code
I think this is just for testing purposes; we are putting in 1 here to reach the limit defined here:
madara/orchestrator/crates/orchestrator/src/jobs/da_job/mod.rs
Lines 197 to 199 in 041b6f1
fn max_process_attempts(&self) -> u64 {
    1
}
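A hedged, self-contained sketch of the reviewer's suggestion: derive the test value from the handler's limit instead of hard-coding 1. The trait and struct names here are illustrative stand-ins for the real job trait and DA job handler:

```rust
// Hypothetical reduced trait mirroring the real job trait's default.
trait JobHandlerLimits {
    fn max_process_attempts(&self) -> u64 {
        1
    }
}

// Illustrative stand-in for the DA job handler.
struct DaJobHandler;
impl JobHandlerLimits for DaJobHandler {}

fn main() {
    let handler = DaJobHandler;
    // In the real test this value would be assigned to
    // job_item.metadata.common.process_attempt_no instead of the literal 1.
    let process_attempt_no = handler.max_process_attempts();
    assert_eq!(process_attempt_no, 1);
}
```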
// create a job
// Create a job
These kinds of changes create a lot of noise (and massive headaches / merge conflicts when merging code). Are you doing this manually or does your editor do it for you? 😅
Haha, I understand. It's the editor doing it by itself; IMO the change itself is correct, so I don't see any harm in it.
Let me know if there are many like these.
Yeah, the entire PR was littered with changes like that.
Alright, I understand. We'll make a note of it for sure.
Do you believe these changes should be reverted?
It's probably fine unless others feel strongly about it. If you try to remove it, git's -p switch can be useful (e.g. git checkout -p or git add -p when preparing a commit).
// Update job_item to use the proper metadata structure for SNOS jobs
job_item.metadata.specific = JobSpecificMetadata::Snos(SnosMetadata {
    block_number: 0,
All of these tests so far have used block 0, which is a very special case -- it might be good to test with other cases
IMO that shouldn't matter for these tests, since they're just verifying against the mock DB server whether it's able to interact with it or not.
specific: JobSpecificMetadata::StateUpdate(StateUpdateMetadata {
    blocks_to_settle: vec![6, 7, 8], // Gap between 4 and 6
    snos_output_paths: vec![
        format!("{}/{}", 6, SNOS_OUTPUT_FILE_NAME),
This pattern ("{}/{}") is repeated a lot in this PR; it could be really helpful for readability/documentation/DRY to create a util fn for it.
Created an issue for this here: #568
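For illustration, a minimal sketch of the kind of helper being asked for (the function name and the file name used in the example are assumptions; #568 tracks the real change):

```rust
/// Hypothetical helper: builds the "<block_number>/<file_name>" storage path
/// that is currently assembled inline with format!("{}/{}", ...) in many tests.
fn block_artifact_path(block_number: u64, file_name: &str) -> String {
    format!("{block_number}/{file_name}")
}

fn main() {
    // The actual SNOS_OUTPUT_FILE_NAME constant may differ; this value is illustrative.
    let snos_output_file_name = "snos_output.json";
    assert_eq!(block_artifact_path(6, snos_output_file_name), "6/snos_output.json");
}
```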
        tx_hash: None,
    }),
},
_ => panic!("Invalid job type"),
I'd leave this out so the compiler can help
Since ProofRegistration is added but not yet handled, we have to keep this statement. It shall be removed after L3 support for the orchestrator gets merged.
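For illustration, a hedged sketch of the exhaustive-match alternative the reviewer is hinting at; the enum is a reduced stand-in, and the real JobType has different variants and payloads:

```rust
// Hypothetical reduced enum to show the idea: listing every variant, including
// the not-yet-supported one, lets the compiler flag newly added variants
// instead of silently falling into a catch-all panic at runtime.
enum JobType {
    DataSubmission,
    StateTransition,
    ProofRegistration,
}

fn queue_name(job_type: &JobType) -> &'static str {
    match job_type {
        JobType::DataSubmission => "da_queue",
        JobType::StateTransition => "state_update_queue",
        // Explicit placeholder instead of `_ => panic!(...)`: adding another
        // variant later becomes a compile error here, not a runtime surprise.
        JobType::ProofRegistration => unimplemented!("pending L3 support"),
    }
}

fn main() {
    println!("{}", queue_name(&JobType::DataSubmission));
}
```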
orchestrator/crates/orchestrator/src/tests/workers/proving/mod.rs (outdated; resolved)
Looks good for the most part. I left a lot of comments, but I don't have too much context, so not all of it is relevant.
Hi @heemankv, can you please check the failing tests?
@heemankv Some preliminary comments, I'll continue with the rest of the PR :)
// Create the StateUpdate-specific metadata
let state_update_metadata = StateUpdateMetadata {
    blocks_to_settle: vec![block_number],
For now this can only be 1 block, right?
I guess this is in preparation for Applicative recursion?
This is a helper function that adds the block - 1 state update job to the db, where block is the one being tested in e2e. This is to ensure that the job is created by the orch's state update logic.
In preparation for Applicative recursion
No, this is not for AR. Rather, we decided to do the state update for up to 10 blocks (an arbitrary number) together, just to save on the process of sending a job to the queue and waiting for it to be picked up. Since there's no way for us to do state updates in parallel (due to the nonce issue), we manually manage the nonce while sending up to 10 SUs together.
@@ -357,13 +378,26 @@ pub async fn mock_proving_job_endpoint_output(sharp_client: &mut SharpClient) {

/// Puts after SNOS job state into the database
pub async fn put_job_data_in_db_da(mongo_db: &MongoDbServer, l2_block_number: String) {
    // Create the DA-specific metadata
    let da_metadata = DaMetadata {
        block_number: l2_block_number.parse::<u64>().unwrap() - 1,
Why is this -1?
Also, it seems to be the same as the internal_id
I would make sure this can't underflow (do a checked_sub() or saturating_sub()).
This is a helper function that adds the block - 1 DA job to the db, where block is the one being tested in e2e. This is a brute-force way to ensure that the job is created by the orch's DA logic.
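A minimal sketch of the underflow-safe parsing the reviewer suggests (the function name and error handling are illustrative; a test helper may still prefer to panic):

```rust
// Hypothetical rewrite of the block-number computation: checked_sub returns
// None instead of panicking (debug builds) or wrapping (release builds)
// when l2_block_number is "0".
fn previous_block_number(l2_block_number: &str) -> Option<u64> {
    l2_block_number.parse::<u64>().ok()?.checked_sub(1)
}

fn main() {
    assert_eq!(previous_block_number("678"), Some(677));
    // Block 0 has no predecessor, so the helper signals it instead of underflowing.
    assert_eq!(previous_block_number("0"), None);
}
```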
pub struct JobProcessingState {
    pub semaphore: Semaphore,
    pub active_jobs: Mutex<HashSet<Uuid>>,
}
@heemankv do we need a Mutex to protect HashSet<Uuid>? This will enforce that only one thread can write or read. Maybe you just need a RwLock, which allows many threads to read the data (but only one to write).
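For illustration, a minimal sketch of the RwLock variant being suggested, assuming tokio's async primitives; since active_jobs was ultimately removed, this is purely illustrative:

```rust
use std::collections::HashSet;
use tokio::sync::{RwLock, Semaphore};
use uuid::Uuid;

// Hypothetical variant of JobProcessingState: readers take a shared lock,
// so concurrent reads of the active-jobs set don't serialize each other.
pub struct JobProcessingState {
    pub semaphore: Semaphore,
    pub active_jobs: RwLock<HashSet<Uuid>>,
}

impl JobProcessingState {
    pub async fn get_active_jobs(&self) -> HashSet<Uuid> {
        self.active_jobs.read().await.clone() // shared (read) lock
    }

    pub async fn track_job(&self, job_id: Uuid) -> bool {
        self.active_jobs.write().await.insert(job_id) // exclusive (write) lock
    }
}
```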
Update: in the latest code, we've removed active_jobs since it was not being used.
@@ -344,14 +392,16 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
Is this still true?
Not as of now; paths are provided by the worker itself, so ideally it shouldn't be here now.
Removing this comment since paths are now provided by the worker itself.
#[serde(with = "chrono::serde::ts_seconds_option")]
pub verification_completed_at: Option<DateTime<Utc>>,
/// Reason for job failure if any
pub failure_reason: Option<String>,
Do we know the possible errors?
If we do, it always makes sense to include those types in the type system, rather than a concatenation of chars
I agree, and in the refactor we have included error types; it would be part of the refactor effort.
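As an illustration of the typed approach, a hedged sketch of what a structured failure reason could look like; the variant names are assumptions, not the set the refactor actually defines:

```rust
use serde::{Deserialize, Serialize};

// Hypothetical typed alternative to `failure_reason: Option<String>`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum JobFailureReason {
    /// The job exhausted its processing attempts.
    MaxProcessAttemptsReached,
    /// The job exhausted its verification attempts.
    MaxVerificationAttemptsReached,
    /// Any error that doesn't have a dedicated variant yet.
    Other(String),
}
```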
/// Macro to implement TryInto for JobSpecificMetadata variants
macro_rules! impl_try_into_metadata {
    ($variant:ident, $type:ident) => {
        impl TryInto<$type> for JobSpecificMetadata {
            type Error = eyre::Error;

            fn try_into(self) -> Result<$type, Self::Error> {
                match self {
                    JobSpecificMetadata::$variant(metadata) => Ok(metadata),
                    _ => Err(eyre!(concat!("Invalid metadata type: expected ", stringify!($variant), " metadata"))),
                }
            }
        }
    };
}

// Implement TryInto for all metadata types
impl_try_into_metadata!(Snos, SnosMetadata);
impl_try_into_metadata!(Proving, ProvingMetadata);
impl_try_into_metadata!(Da, DaMetadata);
impl_try_into_metadata!(StateUpdate, StateUpdateMetadata);
I think implementing the TryInto for every type is much clearer than a macro. Still, it's just my opinion.
Anyway, could we add a unit test on this to showcase usage (and ensure correctness)?
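For illustration, a hedged sketch of such a test; it assumes the metadata structs implement Default, which may not match the real types:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical test: a matching variant converts, a mismatched one errors.
    #[test]
    fn try_into_metadata_variants() {
        let snos = JobSpecificMetadata::Snos(SnosMetadata::default());
        let converted: Result<SnosMetadata, _> = snos.try_into();
        assert!(converted.is_ok());

        let da = JobSpecificMetadata::Da(DaMetadata::default());
        let mismatched: Result<SnosMetadata, _> = da.try_into();
        assert!(mismatched.is_err());
    }
}
```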
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProvingInputType {
    /// Path to an existing proof
    Proof(String),
    /// Path to a Cairo PIE file
    CairoPie(String),
}
This should be named ProvingInputTypePath, right? As this type holds a path, and the actual type is still not created.
(Actually, we are "trusting" that this path points to some data that can then be converted into a meaningful type.)
Valid, I've changed it to ProvingInputTypePath
/// Block number to prove
pub block_number: u64,
/// Path to the input file (proof or Cairo PIE)
pub input_path: Option<ProvingInputType>,
How does this relate to ProvingInputType? Could you add some docs explaining it? (e.g. when is it a proof, when a Cairo PIE, do they change over the job lifetime, when can it be None, etc.)
I believe this impl is a little ahead of itself; ProvingInputTypePath is to be heavily used in L3 support, given we'll send the proofs instead of the PIE for the bridge proofs while sending the Cairo PIE for the first proof.
Although I would like to follow YAGNI, here, given it's already written and we know we are going to use it, we might as well keep it. TBH anything works for me, wdyt?
second run
/// * `metadata` - Additional key-value pairs associated with the job
///
/// # Returns
/// * `Result<JobItem, JobError>` - The created job item or an error
async fn create_job(
I got confused with the create_job function that actually creates the job in the db, etc. Maybe this should be named create_job_item? And add in the docs that the different impls of this function are called by create_job depending on the job_type we want to create?
I believe we can change the db function from create_job to create_job_item.
Sounds fine? WDYT?
Agreed!
/// # Returns
/// * `Result<(), JobError>` - Success or an error
///
/// # State Transitions
/// # State Transitions
/// # State Transitions depending on the Job state when entering this function
Or something like that xD
I am unsure about what you are looking for here, would appreciate elaboration.
Thanks!
/// # Returns
/// * `Result<(), JobError>` - Success or an error
///
/// # State Transitions
/// # State Transitions
/// # State Transitions depending on the Job state when entering this function
Same as above.
I am unsure about what you are looking for here, would appreciate elaboration.
Thanks!
pub process_attempt_no: u64,
/// Number of times the job has been retried after processing failures
pub process_retry_attempt_no: u64,
/// Number of times the job has been verified
pub verification_attempt_no: u64,
/// Number of times the job has been retried after verification failures
pub verification_retry_attempt_no: u64,
Just being annoying here: u64 does not make sense for this :P I would say u16 is more than sufficient.
Valid argument, changed.
#[serde(with = "chrono::serde::ts_seconds_option")]
pub verification_completed_at: Option<DateTime<Utc>>,
/// Reason for job failure if any
pub failure_reason: Option<String>,
This is a valid argument; adding an issue to track this: #575
As a general comment I think the current implementation has scattered state transition logic, making it hard to reason about the full lifecycle. We should consider refactoring into a centralized state machine to improve clarity and testability.
I left some more comments, please answer :)
Apart from that, these changes LGTM.
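For illustration, a hedged sketch of the centralized-state-machine idea; the statuses and allowed transitions shown here are a reduced, assumed set, not the orchestrator's actual lifecycle:

```rust
// Hypothetical, reduced transition table; the real orchestrator has more
// statuses and different rules.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobStatus {
    Created,
    LockedForProcessing,
    PendingVerification,
    Completed,
    Failed,
}

/// Single place that answers "is this transition legal?", so every worker
/// goes through the same rules and tests can enumerate them exhaustively.
fn can_transition(from: JobStatus, to: JobStatus) -> bool {
    use JobStatus::*;
    matches!(
        (from, to),
        (Created, LockedForProcessing)
            | (LockedForProcessing, PendingVerification)
            | (PendingVerification, Completed)
            | (LockedForProcessing, Failed)
            | (PendingVerification, Failed)
    )
}

fn main() {
    assert!(can_transition(JobStatus::Created, JobStatus::LockedForProcessing));
    assert!(!can_transition(JobStatus::Completed, JobStatus::Created));
}
```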
#[case(vec![651052, 651054, 651051, 651056], "numbers aren't sorted in increasing order")]
#[case(vec![651052, 651052, 651052, 651052], "Duplicated block numbers")]
#[case(vec![651052, 651052, 651053, 651053], "Duplicated block numbers")]
@heemankv so if the block numbers are not ordered, the system fails? Is there a way to recover?
Yeah. No, there's no way to recover from it as of now, since we expect the values to be sorted, as they are processed in that fashion.
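For illustration, a hedged sketch of the precondition these test cases exercise (the real check lives in the state-update job and returns its own error type):

```rust
/// Hypothetical validation: block numbers must be strictly increasing, which
/// rules out both unsorted input and duplicates in a single pass.
fn validate_block_numbers(blocks: &[u64]) -> Result<(), String> {
    for window in blocks.windows(2) {
        if window[1] <= window[0] {
            return Err(format!("blocks are not strictly increasing: {} then {}", window[0], window[1]));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_block_numbers(&[651052, 651053, 651054]).is_ok());
    assert!(validate_block_numbers(&[651052, 651054, 651051, 651056]).is_err()); // unsorted
    assert!(validate_block_numbers(&[651052, 651052, 651053, 651053]).is_err()); // duplicates
}
```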
// Set input path as CairoPie type
input_path: snos_metadata.cairo_pie_path.map(ProvingInputTypePath::CairoPie),
// Set download path if needed
download_proof: None,
Will this always be None? @heemankv
Seems like this change is also ahead of itself; it is heavily used in the L3 PR we'll be raising soon.
    )
    .await?,
    Some(last_block_processed_in_last_job),
    Some(last_block_processed),
    )
}
None => {
@heemankv how can this happen?
Would need more context on what you are asking. 😅
Description
This PR brings the orchestrator in this repo up to date with its latest changes. Moving forward, all orchestrator-related PRs will be raised directly in this repo, allowing us to archive the madara-orch repo. Additionally, this update makes the orchestrator more production-ready by resolving a memory fragmentation issue using jemalloc. We've also refactored the logic to better support future L3 integrations.
PR Type
Other Information
This update ensures a more stable and scalable orchestrator, setting the foundation for future improvements.
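For context on the jemalloc note above, a minimal sketch of how a Rust binary typically opts into jemalloc, assuming the tikv-jemallocator crate; the orchestrator's actual setup may differ:

```rust
// Hypothetical allocator switch: route all heap allocations through jemalloc,
// which tends to reduce fragmentation in long-running services.
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    // Orchestrator startup would go here.
}
```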