draft of decoupling #4893

base: main

Conversation
deployment/helm/charts/onyx/templates/celery-worker-document-processing.yaml
@@ -393,7 +399,9 @@ def _run_indexing(
     index_attempt: IndexAttempt | None = None
     try:
         with get_session_with_current_tenant() as db_session_temp:
-            index_attempt = get_index_attempt(db_session_temp, index_attempt_id)
+            index_attempt = get_index_attempt(
this eager load stuff seems to be exposing some DB behavior ... maybe a bit problematic
backend/onyx/background/celery/versioned_apps/document_processing.py
backend/onyx/background/celery/tasks/document_processing/tasks.py
backend/onyx/background/celery/tasks/document_processing/tasks.py
backend/onyx/background/celery/tasks/document_processing/tasks.py
Force-pushed from af01b09 to 9592aa7
Force-pushed from 9592aa7 to 512e122
Force-pushed from 512e122 to ee30d7b
PR Summary
Major architectural change to decouple document fetching from processing in the indexing pipeline, allowing parallel execution through a new blob store intermediary layer.
- Split the monolithic indexing process into `docfetching_task` (fetches source documents) and `document_indexing_pipeline_task` (handles processing/embedding/storage), coordinated via Redis locks
- Added a new `DocumentBatchStorage` system using the FileStore abstraction to manage intermediate document storage between the fetching and processing stages
- Implemented robust error handling and state management for the decoupled tasks, including proper cleanup of temporary storage
- Updated `run_indexing.py` to support the new pipeline architecture with separate `DocExtractionContext` and `DocIndexingContext` models
- Simplified session handling in `S3BackedFileStore` by moving `db_session` from the constructor to method parameters for better dependency injection

35 files reviewed, 9 comments
Edit PR Review Bot Settings | Greptile
# Default parameters for creation
DEFAULT_KWARGS = {
    "http2": True,
    "limits": lambda: httpx.Limits(),
}
style: Remove empty comment and blank line as they no longer serve a purpose since DEFAULT_KWARGS was moved
backend/onyx/configs/constants.py
@@ -66,6 +66,7 @@
 POSTGRES_CELERY_WORKER_LIGHT_APP_NAME = "celery_worker_light"
 POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy"
 POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing"
+POSTGRES_CELERY_WORKER_docfetching_APP_NAME = "celery_worker_docfetching"
style: Use consistent casing ('docfetching' should be 'DOCFETCHING')

Suggested change:
- POSTGRES_CELERY_WORKER_docfetching_APP_NAME = "celery_worker_docfetching"
+ POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME = "celery_worker_docfetching"
def parallel_yield_from_funcs(
    funcs: list[Callable[..., R]],
    max_workers: int = 10,
) -> Iterator[R]:
style: Consider allowing args and kwargs parameters to support functions that take arguments. Currently only supports nullary functions.

Suggested change:
- def parallel_yield_from_funcs(
-     funcs: list[Callable[..., R]],
-     max_workers: int = 10,
- ) -> Iterator[R]:
+ def parallel_yield_from_funcs(
+     funcs: list[Callable[[], R]],
+     max_workers: int = 10,
+ ) -> Iterator[R]:
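For reference, a minimal sketch of how a helper with this (nullary) signature could be implemented; the body is illustrative, not the PR's actual code:

```python
from collections.abc import Callable, Iterator
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import TypeVar

R = TypeVar("R")

def parallel_yield_from_funcs(
    funcs: list[Callable[[], R]],
    max_workers: int = 10,
) -> Iterator[R]:
    """Run nullary functions on a thread pool, yielding results as they complete."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fn) for fn in funcs]
        for future in as_completed(futures):
            # result() re-raises any exception from the underlying function.
            yield future.result()
```

Note that results arrive in completion order, not submission order, so callers should not rely on ordering.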
"onyx.background.celery.tasks.docfetching",
"onyx.background.celery.tasks.indexing",  # TODO: remove this and move the task to docfetching
logic: The indexing tasks dependency creates coupling that contradicts the PR's decoupling goal. Need to complete the TODO task migration before merging
+1
# TODO: change to doc extraction if it doesnt break things
callback.progress("_run_indexing", 0)
logic: Callback still references '_run_indexing' but should be updated to 'doc_extraction' for clarity
+1
Also, should docfetching-specific stuff be moved into a docfetching folder?
if batches_processed > last_batches_completed:
    last_batches_completed = batches_processed
    last_progress_time = time.monotonic()
elif time.monotonic() - last_progress_time > 3600 * 6:
style: 6 hour timeout hardcoded - consider making this configurable via constant or config
+1
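A sketch of the suggested change; the constant and environment-variable names here are hypothetical:

```python
import os

# Hypothetical constant replacing the hardcoded `3600 * 6`, with an optional
# environment override so deployments can tune it without a code change.
INDEXING_NO_PROGRESS_TIMEOUT_SECONDS = int(
    os.environ.get("INDEXING_NO_PROGRESS_TIMEOUT_SECONDS", 3600 * 6)
)
```

The `elif` would then compare against `INDEXING_NO_PROGRESS_TIMEOUT_SECONDS` instead of the literal.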
if not self.file_store.has_file(
    file_id=file_name,
    file_origin=FileOrigin.OTHER,
    file_type="application/json",
):
style: file_type should be constant, define FILE_TYPE = 'application/json' at class level
except Exception as e:
    logger.warning(f"Failed to delete extraction state: {e}")
logic: error message indicates 'extraction state' but this is in a loop over all state types
Suggested change:
- except Exception as e:
-     logger.warning(f"Failed to delete extraction state: {e}")
+ except Exception as e:
+     logger.warning(f"Failed to delete {state_type.value} state: {e}")
Prefer logger.exception here (it prints out the stack trace nicely).
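A small self-contained illustration of the difference; the loop and `delete_state` stand-in are hypothetical:

```python
import io
import logging

logger = logging.getLogger("cleanup")
log_output = io.StringIO()
logger.addHandler(logging.StreamHandler(log_output))

def delete_state(state_type: str) -> None:
    # Stand-in for the real deletion call; raises to exercise the handler.
    raise RuntimeError(f"cannot delete {state_type} state")

for state_type in ["extraction", "indexing"]:
    try:
        delete_state(state_type)
    except Exception:
        # logger.exception logs at ERROR level and appends the full traceback,
        # unlike logger.warning(f"... {e}"), which loses the stack trace.
        logger.exception(f"Failed to delete {state_type} state")
```

`logger.exception` must be called from inside an `except` block for the traceback to be attached.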
raise
logic: The bare except block here could mask important errors. Consider handling specific exceptions (like ClientError for S3 operations and SQLAlchemyError for database operations) separately.
Let's discuss how we want to do the monitoring, but overall looks good 🧐
"onyx.background.celery.tasks.docfetching",
"onyx.background.celery.tasks.indexing",  # TODO: remove this and move the task to docfetching
+1
@@ -538,6 +538,27 @@ slackbot:
   limits:
     cpu: "1000m"
     memory: "2000Mi"
+celery_worker_docfetching:
missing actual worker for this
except Exception as e:
    logger.warning(f"Failed to delete extraction state: {e}")
Prefer logger.exception here (it prints out the stack trace nicely).
# Get batch storage (transition to IN_PROGRESS is handled by run_indexing_entrypoint)
with get_session_with_current_tenant() as db_session:
    batch_storage = get_document_batch_storage(
This seems slightly problematic (it's also in a few other places): the db session will no longer be valid outside of the with statement.
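A small self-contained sketch of the hazard; the context manager and storage object here are hypothetical stand-ins for the real SQLAlchemy session and batch storage:

```python
from collections.abc import Iterator
from contextlib import contextmanager

class FakeSession:
    """Stand-in for a SQLAlchemy session that is closed on context exit."""
    def __init__(self) -> None:
        self.closed = False

    def query(self, sql: str) -> str:
        if self.closed:
            raise RuntimeError("session used after close")
        return f"rows for {sql!r}"

@contextmanager
def get_session() -> Iterator[FakeSession]:
    session = FakeSession()
    try:
        yield session
    finally:
        session.closed = True  # mirrors Session.close() on __exit__

# Problematic pattern: the returned object captures the session, but the
# session dies when the `with` block exits.
with get_session() as db_session:
    storage = {"session": db_session}

# Safe pattern: do all session-dependent work inside the block.
with get_session() as db_session:
    result = db_session.query("SELECT 1")
```

Any object that holds a reference to the session has to finish its DB work before the block closes.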
# TODO: change to doc extraction if it doesnt break things
callback.progress("_run_indexing", 0)
+1
        logger.error(f"Failed to store {state_type} state: {e}")
        raise

    def _get_state(self, state_type: str) -> DocumentStorageState | None:
Should state_type be an Enum type?
)
# Get the document batch storage
with get_session_with_current_tenant() as db_session:
    storage = get_document_batch_storage(tenant_id, index_attempt_id, db_session)
same comment about db_session
if batches_processed > last_batches_completed:
    last_batches_completed = batches_processed
    last_progress_time = time.monotonic()
elif time.monotonic() - last_progress_time > 3600 * 6:
+1
)

with get_session_with_current_tenant() as db_session:
    storage = get_document_batch_storage(tenant_id, index_attempt_id, db_session)
same comment about db_session
    tenant_id: str,
) -> int | None:
    """
    TODO: update docstring to reflect docfetching
can we do this TODO now 🥺
Force-pushed from bf3c890 to da665a6
Description
Addresses https://linear.app/danswer/issue/DAN-2151/decoupled-indexing-pipeline
Note: the following summary describes a single connector indexing run, ignoring the mechanisms we use for preventing race conditions, overlapping connector runs, etc.
Pre-this-PR indexing pipeline:
0. The "check for indexing" task runs and calls "try_creating_indexing_task", which spawns a new process for a new scheduled index attempt. That new process (which runs the connector_indexing_task function) does the following:
Note that steps 1-6 are all done in the same spawned process. In this (draft) PR, we decouple step 1 from the remaining steps 2-6 to allow them to run in parallel. On a high level, the approach is to run step (1) in a task that writes its results to a blob store and have steps (2-6) run in an independent task that reads from that blob store as input.
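A toy sketch of the handoff described above, with an in-memory dict standing in for the blob store and plain functions standing in for the Celery tasks (all names and signatures are illustrative, not the PR's actual API):

```python
import json

# In-memory stand-in for the blob store (S3 / local file storage in the PR).
blob_store: dict[str, str] = {}

def docfetching_task(index_attempt_id: int, documents: list[dict]) -> list[str]:
    """Step 1: fetch source documents and write them to the blob store in batches."""
    batch_keys = []
    batch_size = 2
    for i in range(0, len(documents), batch_size):
        key = f"attempt_{index_attempt_id}/batch_{i // batch_size}"
        blob_store[key] = json.dumps(documents[i : i + batch_size])
        batch_keys.append(key)
    return batch_keys

def document_indexing_pipeline_task(batch_key: str) -> int:
    """Steps 2-6: read a batch from the blob store and process/embed/store it."""
    docs = json.loads(blob_store.pop(batch_key))  # pop models cleanup after processing
    return len(docs)

keys = docfetching_task(42, [{"id": i} for i in range(5)])
processed = sum(document_indexing_pipeline_task(k) for k in keys)
```

Because the two sides communicate only through keyed batches, the fetching task and any number of processing tasks can run in parallel.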
An early implementation had the following issues:
a) Both new tasks are run directly with send_task.
b) The blob store implementations are only local file storage and S3; Azure and GCP still need to be implemented. The implementation code also needs to be checked to make sure it leverages existing code.
c) Several minor features from the old implementation are missing, most notably the indexing callback. It isn't clear to me how to restructure that callback across multiple new tasks (perhaps each task gets its own callback, with information synchronized by index attempt and tenant id?).
These are now addressed. Remaining issues:
a) Some of the code is located in the wrong files, and there is some dead code (_run_indexing in particular, since I was using it as a reference).
b) The error raised when pausing a connector mid-run isn't caught correctly, leading to an "error" state when it should be "cancelled".
c) Need to test with multi-tenant.
d) Want to test with a variety of connectors and varying levels of parallelism.
e) TODO: tell docfetching to back off if the indexing jobs are piling up. We will also need logic for fully stopping docfetching and marking the attempt as an error if, for example, the index tasks never pick anything up.
How Has This Been Tested?
N/A but planning to test across the main connectors in the UI and with all blob store solutions
Backporting (check the box to trigger backport action)
Note: You have to check that the action passes, otherwise resolve the conflicts manually and tag the patches.