Skip to content

Exclusive access task #1025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 1, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 346 additions & 0 deletions src/bloqade/analog/task/exclusive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
import os
import abc
import uuid
import re

from beartype.typing import Dict
from dataclasses import dataclass, field

from bloqade.analog.task.base import Geometry, CustomRemoteTaskABC
from bloqade.analog.builder.typing import ParamType
from bloqade.analog.submission.ir.parallel import ParallelDecoder
from bloqade.analog.submission.ir.task_results import (
QuEraTaskResults,
QuEraTaskStatusCode,
)
from bloqade.analog.submission.ir.task_specification import QuEraTaskSpecification
from requests import request, get
from bloqade.analog.serialize import Serializer


class HTTPHandlerABC:
@abc.abstractmethod
def submit_task_via_zapier(task_ir: QuEraTaskSpecification, task_id: str):
"""Submit a task and add task_id to the task fields for querying later.

args:
task_ir: The task to be submitted.
task_id: The task id to be added to the task fields.

returns
response: The response from the Zapier webhook. used for error handling

"""
...

Check warning on line 34 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L34

Added line #L34 was not covered by tests

@abc.abstractmethod
def query_task_status(task_id: str):
"""Query the task status from the AirTable.

args:
task_id: The task id to be queried.

returns
response: The response from the AirTable. used for error handling

"""
...

Check warning on line 47 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L47

Added line #L47 was not covered by tests

@abc.abstractmethod
def fetch_results(task_id: str):
"""Fetch the task results from the AirTable.

args:
task_id: The task id to be queried.

returns
response: The response from the AirTable. used for error handling

"""

...

Check warning on line 61 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L61

Added line #L61 was not covered by tests


def convert_preview_to_download(preview_url):
# help function to convert the googledrive preview URL to download URL
# Only used in http handler
match = re.search(r"/d/([^/]+)/", preview_url)
if not match:
raise ValueError("Invalid preview URL format")
file_id = match.group(1)
return f"https://drive.usercontent.google.com/download?id={file_id}&export=download"

Check warning on line 71 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L67-L71

Added lines #L67 - L71 were not covered by tests


class HTTPHandler(HTTPHandlerABC):
def __init__(
self,
zapier_webhook_url: str = None,
zapier_webhook_key: str = None,
vercel_api_url: str = None,
):
self.zapier_webhook_url = zapier_webhook_url or os.environ["ZAPIER_WEBHOOK_URL"]
self.zapier_webhook_key = zapier_webhook_key or os.environ["ZAPIER_WEBHOOK_KEY"]
self.verrcel_api_url = vercel_api_url or os.environ["VERCEL_API_URL"]

Check warning on line 83 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L81-L83

Added lines #L81 - L83 were not covered by tests

def submit_task_via_zapier(
self, task_ir: QuEraTaskSpecification, task_id: str, task_note: str
):
# implement http request logic to submit task via Zapier
request_options = dict(params={"key": self.zapier_webhook_key, "note": task_id})

Check warning on line 89 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L89

Added line #L89 was not covered by tests

# for metadata, task_ir in self._compile_single(shots, use_experimental, args):
json_request_body = task_ir.json(exclude_none=True, exclude_unset=True)

Check warning on line 92 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L92

Added line #L92 was not covered by tests

request_options.update(data=json_request_body)
response = request("POST", self.zapier_webhook_url, **request_options)

Check warning on line 95 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L94-L95

Added lines #L94 - L95 were not covered by tests

if response.status_code == 200:
response_data = response.json()
submit_status = response_data.get("status", None)
return submit_status

Check warning on line 100 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L97-L100

Added lines #L97 - L100 were not covered by tests
else:
print(f"HTTP request failed with status code: {response.status_code}")
print("HTTP responce: ", response.text)
return "Failed"

Check warning on line 104 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L102-L104

Added lines #L102 - L104 were not covered by tests

def query_task_status(self, task_id: str):
response = request(

Check warning on line 107 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L107

Added line #L107 was not covered by tests
"GET",
self.verrcel_api_url,
params={
"searchPattern": task_id,
"magicToken": self.zapier_webhook_key,
"useRegex": False,
},
)
if response.status_code != 200:
return "Not Found"
response_data = response.json()

Check warning on line 118 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L116-L118

Added lines #L116 - L118 were not covered by tests
# Get "matched" from the response
matches = response_data.get("matches", None)

Check warning on line 120 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L120

Added line #L120 was not covered by tests
# The return is a list of dictionaries
# Verify if the list contains only one element
if matches is None:
print("No task found with the given ID.")
return "Failed"
elif len(matches) > 1:
print("Multiple tasks found with the given ID.")
return "Failed"

Check warning on line 128 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L123-L128

Added lines #L123 - L128 were not covered by tests

# Extract the status from the first dictionary
status = matches[0].get("status")
return status

Check warning on line 132 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L131-L132

Added lines #L131 - L132 were not covered by tests

def fetch_results(self, task_id: str):
response = request(

Check warning on line 135 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L135

Added line #L135 was not covered by tests
"GET",
self.verrcel_api_url,
params={
"searchPattern": task_id,
"magicToken": self.zapier_webhook_key,
"useRegex": False,
},
)
if response.status_code != 200:
print(f"HTTP request failed with status code: {response.status_code}")
print("HTTP responce: ", response.text)
return None

Check warning on line 147 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L144-L147

Added lines #L144 - L147 were not covered by tests

response_data = response.json()

Check warning on line 149 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L149

Added line #L149 was not covered by tests
# Get "matched" from the response
matches = response_data.get("matches", None)

Check warning on line 151 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L151

Added line #L151 was not covered by tests
# The return is a list of dictionaries
# Verify if the list contains only one element
if matches is None:
print("No task found with the given ID.")
return None
elif len(matches) > 1:
print("Multiple tasks found with the given ID.")
return None
record = matches[0]
if record.get("status") == "Completed":
googledoc = record.get("resultsFileUrl")

Check warning on line 162 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L154-L162

Added lines #L154 - L162 were not covered by tests

# convert the preview URL to download URL
googledoc = convert_preview_to_download(googledoc)
res = get(googledoc)
res.raise_for_status()
data = res.json()

Check warning on line 168 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L165-L168

Added lines #L165 - L168 were not covered by tests

task_results = QuEraTaskResults(**data)
return task_results

Check warning on line 171 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L170-L171

Added lines #L170 - L171 were not covered by tests


class TestHTTPHandler(HTTPHandlerABC):
pass


@dataclass
@Serializer.register
class ExclusiveRemoteTask(CustomRemoteTaskABC):
_task_ir: QuEraTaskSpecification | None
_metadata: Dict[str, ParamType]
_parallel_decoder: ParallelDecoder | None
_http_handler: HTTPHandlerABC = field(default_factory=HTTPHandler)
_task_id: str | None = None
_task_result_ir: QuEraTaskResults | None = None

def __post_init__(self):
float_sites = list(

Check warning on line 189 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L189

Added line #L189 was not covered by tests
map(lambda x: (float(x[0]), float(x[1])), self._task_ir.lattice.sites)
)
self._geometry = Geometry(

Check warning on line 192 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L192

Added line #L192 was not covered by tests
float_sites, self._task_ir.lattice.filling, self._parallel_decoder
)

@classmethod
def from_compile_results(cls, task_ir, metadata, parallel_decoder):
return cls(

Check warning on line 198 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L198

Added line #L198 was not covered by tests
_task_ir=task_ir,
_metadata=metadata,
_parallel_decoder=parallel_decoder,
)

def _submit(self, force: bool = False) -> "ExclusiveRemoteTask":
if not force:
if self._task_id is not None:
raise ValueError(

Check warning on line 207 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L205-L207

Added lines #L205 - L207 were not covered by tests
"the task is already submitted with %s" % (self._task_id)
)
self._task_id = str(uuid.uuid4())

Check warning on line 210 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L210

Added line #L210 was not covered by tests

if (

Check warning on line 212 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L212

Added line #L212 was not covered by tests
self._http_handler.submit_task_via_zapier(
self._task_ir, self._task_id, None
)
== "success"
):
self._task_result_ir = QuEraTaskResults(

Check warning on line 218 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L218

Added line #L218 was not covered by tests
task_status=QuEraTaskStatusCode.Accepted
)
else:
self._task_result_ir = QuEraTaskResults(

Check warning on line 222 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L222

Added line #L222 was not covered by tests
task_status=QuEraTaskStatusCode.Failed
)
return self

Check warning on line 225 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L225

Added line #L225 was not covered by tests

def fetch(self):
if self._task_result_ir.task_status is QuEraTaskStatusCode.Unsubmitted:
raise ValueError("Task ID not found.")

Check warning on line 229 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L228-L229

Added lines #L228 - L229 were not covered by tests

if self._task_result_ir.task_status in [

Check warning on line 231 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L231

Added line #L231 was not covered by tests
QuEraTaskStatusCode.Completed,
QuEraTaskStatusCode.Partial,
QuEraTaskStatusCode.Failed,
QuEraTaskStatusCode.Unaccepted,
QuEraTaskStatusCode.Cancelled,
]:
return self

Check warning on line 238 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L238

Added line #L238 was not covered by tests

status = self.status()
if status in [QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial]:
self._task_result_ir = self._http_handler.fetch_results(self._task_id)

Check warning on line 242 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L240-L242

Added lines #L240 - L242 were not covered by tests
else:
self._task_result_ir = QuEraTaskResults(task_status=status)

Check warning on line 244 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L244

Added line #L244 was not covered by tests

return self

Check warning on line 246 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L246

Added line #L246 was not covered by tests

def pull(self):
# Please avoid using this method, it's blocking and the waiting time is hours long
# Throw an error saying this is not supported
raise NotImplementedError(

Check warning on line 251 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L251

Added line #L251 was not covered by tests
"Pulling is not supported. Please use fetch() instead."
)

def cancel(self):
# This is not supported
raise NotImplementedError("Cancelling is not supported.")

Check warning on line 257 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L257

Added line #L257 was not covered by tests

def status(self) -> QuEraTaskStatusCode:
if self._task_id is None:
return QuEraTaskStatusCode.Unsubmitted
res = self._http_handler.query_task_status(self._task_id)
if res == "Failed":
raise ValueError("Query task status failed.")
elif res == "Submitted":
return QuEraTaskStatusCode.Enqueued

Check warning on line 266 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L260-L266

Added lines #L260 - L266 were not covered by tests
# TODO: please add all possible status
elif res == "Completed":
return QuEraTaskStatusCode.Completed
elif res == "Running":

Check warning on line 270 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L268-L270

Added lines #L268 - L270 were not covered by tests
# Not covered by test
return QuEraTaskStatusCode.Executing

Check warning on line 272 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L272

Added line #L272 was not covered by tests
else:
return self._task_result_ir.task_status

Check warning on line 274 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L274

Added line #L274 was not covered by tests

def _result_exists(self):
if self._task_result_ir is None:
return False

Check warning on line 278 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L277-L278

Added lines #L277 - L278 were not covered by tests
else:
if self._task_result_ir.task_status == QuEraTaskStatusCode.Completed:
return True

Check warning on line 281 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L280-L281

Added lines #L280 - L281 were not covered by tests
else:
return False

Check warning on line 283 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L283

Added line #L283 was not covered by tests

def result(self):
if self._task_result_ir is None:
raise ValueError("Task result not found.")
return self._task_result_ir

Check warning on line 288 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L286-L288

Added lines #L286 - L288 were not covered by tests

@property
def metadata(self):
return self._metadata

Check warning on line 292 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L292

Added line #L292 was not covered by tests

@property
def geometry(self):
return self._geometry

Check warning on line 296 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L296

Added line #L296 was not covered by tests

@property
def task_ir(self):
return self._task_ir

Check warning on line 300 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L300

Added line #L300 was not covered by tests

@property
def task_id(self) -> str:
assert isinstance(self._task_id, str), "Task ID is not set"
return self._task_id

Check warning on line 305 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L304-L305

Added lines #L304 - L305 were not covered by tests

@property
def task_result_ir(self):
return self._task_result_ir

Check warning on line 309 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L309

Added line #L309 was not covered by tests

@property
def parallel_decoder(self):
return self._parallel_decoder

Check warning on line 313 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L313

Added line #L313 was not covered by tests

@task_result_ir.setter
def task_result_ir(self, task_result_ir: QuEraTaskResults):
self._task_result_ir = task_result_ir

Check warning on line 317 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L317

Added line #L317 was not covered by tests


@ExclusiveRemoteTask.set_serializer
def _serialze(obj: ExclusiveRemoteTask) -> Dict[str, ParamType]:
return {

Check warning on line 322 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L322

Added line #L322 was not covered by tests
"task_id": obj.task_id or None,
"task_ir": obj.task_ir.dict(by_alias=True, exclude_none=True),
"metadata": obj.metadata,
"parallel_decoder": (
obj.parallel_decoder.dict() if obj.parallel_decoder else None
),
"task_result_ir": obj.task_result_ir.dict() if obj.task_result_ir else None,
}


@ExclusiveRemoteTask.set_deserializer
def _deserializer(d: Dict[str, any]) -> ExclusiveRemoteTask:
d1 = dict()
d1["_task_ir"] = QuEraTaskSpecification(**d["task_ir"])
d1["_parallel_decoder"] = (

Check warning on line 337 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L335-L337

Added lines #L335 - L337 were not covered by tests
ParallelDecoder(**d["parallel_decoder"]) if d["parallel_decoder"] else None
)
d1["_metadata"] = d["metadata"]
d1["_task_result_ir"] = (

Check warning on line 341 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L340-L341

Added lines #L340 - L341 were not covered by tests
QuEraTaskResults(**d["task_result_ir"]) if d["task_result_ir"] else None
)
d1["_task_id"] = d["task_id"]

Check warning on line 344 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L344

Added line #L344 was not covered by tests

return ExclusiveRemoteTask(**d1)

Check warning on line 346 in src/bloqade/analog/task/exclusive.py

View check run for this annotation

Codecov / codecov/patch

src/bloqade/analog/task/exclusive.py#L346

Added line #L346 was not covered by tests