
Update Orchestrator to Latest Changes and Prepare for Future L3 Support #524


Merged
merged 21 commits into main from rebase/orchestrator-272b013a
Apr 16, 2025

Conversation

heemankv
Contributor

@heemankv heemankv commented Mar 8, 2025

Description

This PR brings the orchestrator in this repo up to date with its latest changes. Moving forward, all orchestrator-related PRs will be raised directly in this repo, allowing us to archive the madara-orch repo.

Additionally, this update makes the orchestrator more production-ready by resolving a memory leak by switching to the jemalloc allocator. We've also refactored the logic to better support future L3 integrations.

PR Type

  • Feature

Other Information

This update ensures a more stable and scalable orchestrator, setting the foundation for future improvements.

@heemankv heemankv self-assigned this Mar 8, 2025
@heemankv heemankv force-pushed the rebase/orchestrator-272b013a branch from 2464970 to 00bc605 Compare March 8, 2025 07:07
@Mohiiit Mohiiit changed the title update: orchestrator merge Update Orchestrator to Latest Changes and Prepare for Future L3 Support Mar 12, 2025
@Trantorian1 Trantorian1 moved this to In review in Madara Mar 17, 2025
@Trantorian1 Trantorian1 added feature Request for new feature or enhancement Project Orch labels Mar 17, 2025
@notlesh
Contributor

notlesh commented Mar 17, 2025

This PR brings the orchestrator in this repo up to date with its latest changes.

For clarity, does this mean that it picks up from orchestrator PRs that were abandoned (because they were done while the monorepo was in progress)?

If so, it would be good to link to the relevant PRs.

@Mohiiit
Member

Mohiiit commented Mar 19, 2025

Yes, this includes all the changes that were made to the orchestrator while the monorepo was being merged. Links to the merged PRs:

  1. Feat: Retry endpoint added madara-orchestrator#198
  2. refactor: job isolation done madara-orchestrator#204
  3. Memory Leak Fix: Implementing jemalloc Allocator madara-orchestrator#205
  4. Changes based on Griddy Chain madara-orchestrator#206
  5. Fix/improvements madara-orchestrator#207

cc: @notlesh

@Mohiiit Mohiiit requested a review from notlesh March 19, 2025 05:47
Contributor

@raynaudoe raynaudoe left a comment

uACK, left a few comments

}

pub async fn get_active_jobs(&self) -> HashSet<Uuid> {
self.active_jobs.lock().await.clone()
Contributor

nit: a read only lock might be a little more efficient

Contributor Author

@heemankv heemankv Mar 25, 2025

I'd like to hear more on this. Can you provide more context? We can also hop on a call if you'd like.

Contributor

I was trying to understand the use case (to see if we really want only one reader), but it seems this function is never called.

Contributor Author

Yeah, you're right. This was used in a more complex implementation earlier; I guess we missed removing it. Removing it now.
Great catch!

Ok(Ok(permit)) => {
{
let mut active_jobs = self.active_jobs.lock().await;
active_jobs.insert(job.id);
Contributor

Maybe check here if active_jobs already has job.id

Contributor Author

Makes sense. Let's discuss this together with your idea of a read-only lock.
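For context on the duplicate check discussed above: `HashSet::insert` already reports whether the value was newly added, so the check and the insert can be a single call. A minimal sketch, not the PR's code (job ids are plain u64 here instead of Uuid to stay dependency-free, and the helper name is hypothetical):

```rust
use std::collections::HashSet;

// Hypothetical helper, not from the PR: HashSet::insert returns false when
// the value is already present, so the duplicate check comes for free.
fn try_track_job(active_jobs: &mut HashSet<u64>, job_id: u64) -> bool {
    if active_jobs.insert(job_id) {
        true // newly tracked
    } else {
        eprintln!("job {job_id} is already being processed, skipping");
        false
    }
}

fn main() {
    let mut active = HashSet::new();
    assert!(try_track_job(&mut active, 42));  // first insert succeeds
    assert!(!try_track_job(&mut active, 42)); // duplicate is rejected
}
```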

log::warn!("DA job for the first block is not yet completed. Returning safely...");
return Ok(());
}
}
}

let mut blocks_to_process: Vec<u64> = find_successive_blocks_in_vector(blocks_to_process);

let mut blocks_to_process = find_successive_blocks_in_vector(blocks_to_process);
Contributor

What if this vector cannot find any successive blocks or there are dangling blocks that don't have their corresponding neighbours? Those blocks would eventually get processed, right?
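For readers following this thread, here is a hypothetical sketch of the behavior being discussed (the PR's actual implementation of find_successive_blocks_in_vector is not shown in this excerpt): keeping only the leading run of consecutive blocks means a block after a gap simply waits until the missing neighbours arrive in a later run.

```rust
// Hypothetical sketch, not the PR's implementation: keep only the leading
// run of consecutive block numbers; blocks after a gap are left for a
// future run once the missing neighbours arrive.
fn find_successive_blocks_in_vector(blocks: Vec<u64>) -> Vec<u64> {
    let mut out: Vec<u64> = Vec::new();
    for b in blocks {
        match out.last() {
            None => out.push(b),
            Some(&prev) if b == prev + 1 => out.push(b),
            _ => break, // first gap found: stop, later blocks wait
        }
    }
    out
}

fn main() {
    // 8 and 9 are "dangling" until 7 shows up; they are not processed yet.
    assert_eq!(find_successive_blocks_in_vector(vec![4, 5, 6, 8, 9]), vec![4, 5, 6]);
}
```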

@raynaudoe
Contributor

@heemankv can you please merge main so the branch has the latest CI fixes?

async fn update_state_for_block(
&self,
config: Arc<Config>,
block_no: u64,
snos: StarknetOsOutput,
nonce: u64,
program_output: Vec<[u8; 32]>,
Contributor

Why not use a Felt as the type here?

Contributor Author

@heemankv heemankv Apr 8, 2025

This is because, as of now, this piece of code serves both Ethereum and Starknet;
IMO it would be very type-intensive to change it just for Starknet.
Let me see what I can do.

Contributor Author

@heemankv heemankv Apr 11, 2025

Is there any specific advantage to using the Felt type here (which we can only use for Starknet), or is it just to ensure type consistency?
If it's for type consistency, I think we can create an issue and handle it later, since it isn't part of this PR's scope and would make a really good first issue.
Wdyt?

Contributor

If it's used for multiple types, then Felt doesn't make sense. Seems fine for now, a Good First Issue for someone with some inspiration would be good :)

let mut job_item = build_job_item(JobType::DataSubmission, JobStatus::PendingVerification, 1);

// Set process_attempt_no to 1 to simulate max attempts reached
job_item.metadata.common.process_attempt_no = 1;
Contributor

Is there a constant or some value you can query instead of hard coding this to 1?

Contributor

...Or one you could define here in the code

Contributor Author

I think this is just for testing purposes;
we put in 1 here to hit the limit defined here:

fn max_process_attempts(&self) -> u64 {
1
}

Comment on lines -742 to +772
// create a job
// Create a job
Contributor

@notlesh notlesh Apr 4, 2025

These kind of changes create a lot of noise (and massive headaches / merge conflicts when merging code), are you doing this manually or does your editor do it for you? 😅

Contributor Author

Haha, I understand. It's the editor doing it by itself; IMO the change itself is correct, and I don't see any harm in it.
Let me know if there are many like these.

Contributor

Yeah, the entire PR was littered with changes like that.

Contributor Author

Alright, understood.
We'll make a note of it for sure.
Do you believe these changes should be reverted?

Contributor

It's probably fine unless others feel strongly about it. If you try to remove it, git's -p switch can be useful (e.g. git checkout -p or git add -p when preparing a commit).


// Update job_item to use the proper metadata structure for SNOS jobs
job_item.metadata.specific = JobSpecificMetadata::Snos(SnosMetadata {
block_number: 0,
Contributor

All of these tests so far have used block 0, which is a very special case -- it might be good to test with other cases

Contributor Author

IMO that shouldn't matter for these tests, since they just verify against the mock DB server whether the job can interact with it.

specific: JobSpecificMetadata::StateUpdate(StateUpdateMetadata {
blocks_to_settle: vec![6, 7, 8], // Gap between 4 and 6
snos_output_paths: vec![
format!("{}/{}", 6, SNOS_OUTPUT_FILE_NAME),
Contributor

@notlesh notlesh Apr 4, 2025

This pattern ("{}/{}") is repeated a lot in this PR, it could be really helpful for readability/documentation/DRY to create a util fn for it

Contributor Author

Created an issue for this here : #568
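A sketch of the util fn suggested above (the name block_file_path and the constant's value are illustrative, not from the PR): it centralizes the "{block_number}/{file_name}" storage-path convention in one place.

```rust
// Hypothetical util fn: one place for the "{block_number}/{file_name}"
// storage-path convention repeated throughout the PR.
const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json"; // illustrative value

fn block_file_path(block_number: u64, file_name: &str) -> String {
    format!("{block_number}/{file_name}")
}

fn main() {
    assert_eq!(block_file_path(6, SNOS_OUTPUT_FILE_NAME), "6/snos_output.json");
}
```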

tx_hash: None,
}),
},
_ => panic!("Invalid job type"),
Contributor

I'd leave this out so the compiler can help

Contributor Author

@heemankv heemankv Apr 8, 2025

Since ProofRegistration has been added but is not yet handled, we have to keep this arm. It will be removed after L3 support for the orchestrator gets merged.
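For illustration, the reviewer's point about letting the compiler help can be sketched like this (variant names mirror the PR, but the payloads and queue names are invented): without a wildcard arm, adding a new JobType variant becomes a compile error at every match site instead of a runtime panic.

```rust
// Sketch only: queue names are invented for illustration.
#[derive(Debug)]
enum JobType {
    SnosRun,
    DataSubmission,
    StateTransition,
}

fn queue_name(job_type: &JobType) -> &'static str {
    // No `_` arm: if a variant like ProofRegistration were added later,
    // this match would stop compiling until it is handled explicitly.
    match job_type {
        JobType::SnosRun => "snos_queue",
        JobType::DataSubmission => "da_queue",
        JobType::StateTransition => "state_update_queue",
    }
}

fn main() {
    assert_eq!(queue_name(&JobType::DataSubmission), "da_queue");
}
```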

Contributor

@notlesh notlesh left a comment

Looks good for the most part, I left a lot of comments but I don't have too much context so not all of it is relevant.

@raynaudoe
Contributor

Hi @heemankv, can you please check the failing tests?

Contributor

@HermanObst HermanObst left a comment

@heemankv Some preliminary comments, Ill continue with the rest of the PR :)


// Create the StateUpdate-specific metadata
let state_update_metadata = StateUpdateMetadata {
blocks_to_settle: vec![block_number],
Contributor

For now this can only be 1 block, right?
I guess this is in preparation for Applicative recursion?

Contributor Author

@heemankv heemankv Apr 11, 2025

This is a helper function that adds the state-update job for block - 1 to the db, where block is the one being tested in e2e. This ensures that the job is created by the orch's state-update logic.

Contributor Author

@heemankv heemankv Apr 11, 2025

In preparation for Applicative recursion

No, this is not for AR. Rather, we decided to do the state update for 10 blocks (an arbitrary number) together, just to save on the overhead of sending a job to the queue and waiting for it to be picked up. Since there's no way for us to do state updates in parallel (due to the nonce issue), we manually manage the nonce while sending up to 10 state updates together.

@@ -357,13 +378,26 @@ pub async fn mock_proving_job_endpoint_output(sharp_client: &mut SharpClient) {

/// Puts after SNOS job state into the database
pub async fn put_job_data_in_db_da(mongo_db: &MongoDbServer, l2_block_number: String) {
// Create the DA-specific metadata
let da_metadata = DaMetadata {
block_number: l2_block_number.parse::<u64>().unwrap() - 1,
Contributor

Why is this -1?

Contributor

Also, it seems to be the same as the internal_id

Contributor

I would make sure this can't underflow (do a checked_sub() or saturating_sub()).

Contributor Author

This is a helper function that adds the DA job for block - 1 to the db, where block is the one being tested in e2e. This is a brute-force way to ensure that the job is created by the orch's DA logic.
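The underflow guard suggested above could look like this (a sketch under the assumption that clamping at block 0 is acceptable; the helper name and the simplified parse handling are hypothetical):

```rust
// Sketch of the suggested guard: saturating_sub clamps at 0 instead of
// panicking in debug builds (or wrapping around in release builds).
fn previous_block(l2_block_number: &str) -> u64 {
    l2_block_number
        .parse::<u64>()
        .unwrap_or(0) // parse-failure handling is simplified for the sketch
        .saturating_sub(1)
}

fn main() {
    assert_eq!(previous_block("5"), 4);
    assert_eq!(previous_block("0"), 0); // the genesis block cannot underflow
}
```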

Comment on lines 17 to 20
pub struct JobProcessingState {
pub semaphore: Semaphore,
pub active_jobs: Mutex<HashSet<Uuid>>,
}
Contributor

@heemankv do we need a Mutex to protect HashSet<Uuid>?
This will enforce that only one thread can write or read.

Maybe you just need a RwLock, which allows many threads to read the data (but only one to write).

Contributor Author

Update: in the latest code, we've removed active_jobs since it was not being used.
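Even though active_jobs was ultimately removed, the RwLock suggestion can be sketched as follows (std::sync::RwLock and u64 ids keep the example self-contained; the async orchestrator would use tokio::sync::RwLock and Uuid instead): many readers may hold the lock concurrently, while a writer gets exclusive access.

```rust
use std::collections::HashSet;
use std::sync::RwLock;

// Sketch of the reviewer's suggestion, not the PR's code.
struct JobProcessingState {
    active_jobs: RwLock<HashSet<u64>>,
}

impl JobProcessingState {
    fn get_active_jobs(&self) -> HashSet<u64> {
        // Shared read lock: many threads may read concurrently.
        self.active_jobs.read().unwrap().clone()
    }

    fn add_job(&self, id: u64) {
        // Exclusive write lock: only one writer at a time.
        self.active_jobs.write().unwrap().insert(id);
    }
}

fn main() {
    let state = JobProcessingState { active_jobs: RwLock::new(HashSet::new()) };
    state.add_job(1);
    assert!(state.get_active_jobs().contains(&1));
}
```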

}

pub async fn get_active_jobs(&self) -> HashSet<Uuid> {
self.active_jobs.lock().await.clone()
Contributor

I was trying to understand the use case (to see if we really want only one reader), but it seems this function is never called.

@@ -344,14 +392,16 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
Contributor

Is this still true?

Member

Not as of now; paths are provided by the worker itself, so ideally this doc comment shouldn't be here anymore.

Contributor Author

Removing this comment since now paths are provided by the worker itself.

#[serde(with = "chrono::serde::ts_seconds_option")]
pub verification_completed_at: Option<DateTime<Utc>>,
/// Reason for job failure if any
pub failure_reason: Option<String>,
Contributor

Do we know the possible errors?
If we do, it always makes sense to include those types in the type system, rather than a concatenation of chars

Member

I agree, and in the refactor we have included error types, it would be part of the refactor effort

Comment on lines +40 to +60
/// Macro to implement TryInto for JobSpecificMetadata variants
macro_rules! impl_try_into_metadata {
($variant:ident, $type:ident) => {
impl TryInto<$type> for JobSpecificMetadata {
type Error = eyre::Error;

fn try_into(self) -> Result<$type, Self::Error> {
match self {
JobSpecificMetadata::$variant(metadata) => Ok(metadata),
_ => Err(eyre!(concat!("Invalid metadata type: expected ", stringify!($variant), " metadata"))),
}
}
}
};
}

// Implement TryInto for all metadata types
impl_try_into_metadata!(Snos, SnosMetadata);
impl_try_into_metadata!(Proving, ProvingMetadata);
impl_try_into_metadata!(Da, DaMetadata);
impl_try_into_metadata!(StateUpdate, StateUpdateMetadata);
Contributor

I think implementing TryInto for every type explicitly is much clearer than a macro. Still, it's just my opinion.

Contributor

Anyway, could we add a unit test for this to showcase usage (and ensure correctness)?
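A unit-test sketch of the macro's usage (self-contained rather than the PR's code: eyre is swapped for a plain String error, and only two variants are shown):

```rust
// Simplified stand-ins for the PR's metadata types.
#[derive(Debug, Clone)]
struct SnosMetadata { block_number: u64 }
#[derive(Debug, Clone)]
struct DaMetadata { block_number: u64 }

#[derive(Debug, Clone)]
enum JobSpecificMetadata {
    Snos(SnosMetadata),
    Da(DaMetadata),
}

// Same shape as the PR's macro, with a String error instead of eyre.
macro_rules! impl_try_into_metadata {
    ($variant:ident, $type:ident) => {
        impl TryInto<$type> for JobSpecificMetadata {
            type Error = String;

            fn try_into(self) -> Result<$type, Self::Error> {
                match self {
                    JobSpecificMetadata::$variant(metadata) => Ok(metadata),
                    _ => Err(concat!("Invalid metadata type: expected ", stringify!($variant), " metadata").to_string()),
                }
            }
        }
    };
}

impl_try_into_metadata!(Snos, SnosMetadata);
impl_try_into_metadata!(Da, DaMetadata);

fn main() {
    let meta = JobSpecificMetadata::Snos(SnosMetadata { block_number: 7 });

    // Matching variant converts successfully.
    let snos: Result<SnosMetadata, _> = meta.clone().try_into();
    assert_eq!(snos.unwrap().block_number, 7);

    // Mismatched variant is rejected with a descriptive error.
    let da: Result<DaMetadata, _> = meta.try_into();
    assert!(da.unwrap_err().contains("expected Da"));
}
```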

Comment on lines 6 to 12
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProvingInputType {
/// Path to an existing proof
Proof(String),
/// Path to a Cairo PIE file
CairoPie(String),
}
Contributor

This should be named ProvingInputTypePath, right? This enum holds a path, and the typed input is still not created.
(Actually, we are "trusting" that this path points to some data that can then be converted into a meaningful type.)

Contributor Author

Valid, I've changed it to ProvingInputTypePath

/// Block number to prove
pub block_number: u64,
/// Path to the input file (proof or Cairo PIE)
pub input_path: Option<ProvingInputType>,
Contributor

How does this relate to ProvingInputType?

Could you add some docs explaining it? (e.g. when it is a proof, when a Cairo PIE, whether they change during the job's lifetime, when it can be None, etc.)

Contributor Author

I believe this impl is a little ahead of itself. ProvingInputTypePath will be heavily used in L3 support, given we'll send proofs instead of the PIE for the bridge proofs, while sending the Cairo PIE for the first proof.

Although I'd like to follow YAGNI, given it's already written and we know we're going to use it, we might as well keep it. TBH anything works for me; wdyt?

Contributor

@HermanObst HermanObst left a comment

second run

/// * `metadata` - Additional key-value pairs associated with the job
///
/// # Returns
/// * `Result<JobItem, JobError>` - The created job item or an error
async fn create_job(
Contributor

I got confused with the create_job function that actually creates the job in the db, etc.
Maybe this one should be named create_job_item? And the docs could note that the different impls of this function are called by create_job depending on the job_type we want to create.

Contributor Author

I believe we can change the db function from create_job to create_job_item.
Sounds fine ? WDYT ?

Contributor

Agreed!

/// # Returns
/// * `Result<(), JobError>` - Success or an error
///
/// # State Transitions
Contributor

Suggested change
/// # State Transitions
/// # State Transitions depending Job state while entering this function

Contributor

Or something like that xD

Contributor Author

@heemankv heemankv Apr 11, 2025

I am unsure about what you are looking for here, would appreciate elaboration.
Thanks!

/// # Returns
/// * `Result<(), JobError>` - Success or an error
///
/// # State Transitions
Contributor

Suggested change
/// # State Transitions
/// # State Transitions depending Job state while entering this function

Contributor Author

Same as above.

I am unsure about what you are looking for here, would appreciate elaboration.
Thanks!

Comment on lines 17 to 23
pub process_attempt_no: u64,
/// Number of times the job has been retried after processing failures
pub process_retry_attempt_no: u64,
/// Number of times the job has been verified
pub verification_attempt_no: u64,
/// Number of times the job has been retried after verification failures
pub verification_retry_attempt_no: u64,
Contributor

Just being annoying here: u64 doesn't make sense for this :P
I would say u16 is more than sufficient.

Contributor Author

@heemankv heemankv Apr 11, 2025

Valid argument, changed.

#[serde(with = "chrono::serde::ts_seconds_option")]
pub verification_completed_at: Option<DateTime<Utc>>,
/// Reason for job failure if any
pub failure_reason: Option<String>,
Contributor

Maybe not for this PR, but maybe it makes sense for this to be an Option<Vec<String>>?
I don't know if it's relevant for us, but that way we won't lose earlier failure reasons and would have a log of them.

cc @Mohiiit @heemankv

Contributor Author

This is a valid argument, adding an issue to track this : #575

Contributor

@HermanObst HermanObst left a comment

As a general comment I think the current implementation has scattered state transition logic, making it hard to reason about the full lifecycle. We should consider refactoring into a centralized state machine to improve clarity and testability.

I left some more comments, please answer :)

Moving from that, this changes LGTM

/// * `metadata` - Additional key-value pairs associated with the job
///
/// # Returns
/// * `Result<JobItem, JobError>` - The created job item or an error
async fn create_job(
Contributor

Agreed!

Comment on lines +322 to +324
#[case(vec![651052, 651054, 651051, 651056], "numbers aren't sorted in increasing order")]
#[case(vec![651052, 651052, 651052, 651052], "Duplicated block numbers")]
#[case(vec![651052, 651052, 651053, 651053], "Duplicated block numbers")]
Contributor

@heemankv so if the block numbers are not ordered the system fails?
Is a way to recover?

@raynaudoe

Contributor Author

@heemankv heemankv Apr 15, 2025

Yeah.
No, there's no way to recover from it as of now, since we expect the values to be sorted, as they are processed in that order.

// Set input path as CairoPie type
input_path: snos_metadata.cairo_pie_path.map(ProvingInputTypePath::CairoPie),
// Set download path if needed
download_proof: None,
Contributor

This will be always None?
@heemankv

Contributor Author

Seems like this change is also ahead of itself; it is heavily used in the L3 PR we'll be raising soon.

)
.await?,
Some(last_block_processed_in_last_job),
Some(last_block_processed),
)
}
None => {
Contributor

@heemankv how can this happen?

Contributor Author

Would need more context on what you are asking. 😅

@Mohiiit Mohiiit merged commit a99088a into main Apr 16, 2025
21 of 26 checks passed
@github-project-automation github-project-automation bot moved this from In review to Done in Madara Apr 16, 2025
@heemankv heemankv deleted the rebase/orchestrator-272b013a branch May 22, 2025 19:45
Labels
feature Request for new feature or enhancement Project Orch
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

6 participants