draft of decoupling #4893


Open · wants to merge 19 commits into main

Conversation

@evan-onyx (Contributor) commented Jun 13, 2025

Description

Addresses https://linear.app/danswer/issue/DAN-2151/decoupled-indexing-pipeline
Note: the following summary describes a single connector indexing run, ignoring the mechanisms we use to prevent race conditions, overlapping connector runs, etc.

Pre-this-PR indexing pipeline:
0. The "check for indexing" task runs and calls "try_creating_indexing_task", which spawns a new process for a newly scheduled index attempt. That new process (which runs the connector_indexing_task function) does the following:

  1. determines the parameters of the indexing attempt (which connector indexing function to run, the start and end time, whether to resume from a previous checkpoint), then runs that connector. Connectors are responsible for reading data from an outside source and converting it into Onyx documents. At the moment these two steps (reading external data and converting to Onyx documents) are not parallelized in most connectors; that's a subject for future work
  2. upserts documents to Postgres (index_doc_batch_prepare)
  3. chunks each document (optionally adding context for contextual RAG)
  4. embeds chunks (embed_chunks_with_failure_handling) via a call to the model server
  5. writes chunks to Vespa (write_chunks_to_vector_db_with_backoff)
  6. updates document and indexing metadata in Postgres

Note that steps 1-6 all run in the same spawned process. In this (draft) PR, we decouple step 1 from steps 2-6 so that they can run in parallel. At a high level, the approach is to run step 1 in a task that writes its results to a blob store, and to run steps 2-6 in an independent task that reads from that blob store as its input.
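To make the shape of the split concrete, here is a minimal sketch of the two tasks (all names here, such as DocumentBatch, docfetching_task, storage.write, and pipeline.index, are illustrative assumptions rather than the exact code in this PR):

import json
from dataclasses import asdict, dataclass

@dataclass
class DocumentBatch:
    index_attempt_id: int
    batch_num: int
    documents: list[dict]

def docfetching_task(connector, storage, enqueue_indexing_task) -> None:
    # step 1: run the connector and hand each batch off via the blob store
    for batch_num, documents in enumerate(connector.load_documents()):
        batch = DocumentBatch(
            index_attempt_id=connector.index_attempt_id,
            batch_num=batch_num,
            documents=[doc.to_dict() for doc in documents],
        )
        # write the batch to the blob store (local file store, S3, etc.)
        storage.write(f"batch_{batch_num}.json", json.dumps(asdict(batch)))
        # kick off an independent task that will pick this batch up
        enqueue_indexing_task(connector.index_attempt_id, batch_num)

def document_indexing_pipeline_task(storage, pipeline, batch_num: int) -> None:
    # steps 2-6: read a batch back from the blob store and index it
    raw = storage.read(f"batch_{batch_num}.json")
    batch = DocumentBatch(**json.loads(raw))
    pipeline.index(batch.documents)  # upsert, chunk, embed, write to Vespa
    storage.delete(f"batch_{batch_num}.json")  # clean up intermediate storage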

An early implementation had the following issues:
a) both new tasks are run directly with send_task
b) the blob store implementations cover only local file storage and S3; Azure and GCP still need to be implemented. Also need to check the actual implementation code to make sure it leverages existing code
c) several minor features from the old implementation are missing, most notably the indexing callback. It isn't clear to me how to restructure that callback across multiple new tasks (perhaps each task gets its own callback, with information synchronized by index attempt and tenant id?)

These have now been addressed. Remaining issues:
a) some of the code is located in the wrong files, and there is some dead code (_run_indexing in particular, since I was using it as a reference)
b) the error raised when pausing a connector mid-run isn't caught correctly, leading to an "error" state when the attempt should be marked as cancelled
c) need to test with multi-tenant deployments
d) want to test with a variety of connectors and varying levels of parallelism
e) TODO: tell docfetching to back off if the indexing jobs are piling up. Will also need logic for fully stopping docfetching and marking the attempt as an error if, for example, the indexing tasks never pick anything up; a rough sketch of the back-off idea follows this list
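One possible shape for that back-off (purely a sketch; count_pending_batches, mark_attempt_failed, and the thresholds are assumptions, not anything implemented in this PR):

import time

MAX_OUTSTANDING_BATCHES = 50    # assumed threshold for "indexing is falling behind"
STALL_LIMIT_SECONDS = 6 * 3600  # assumed limit for "nothing is being picked up"

def wait_for_indexing_capacity(count_pending_batches, mark_attempt_failed) -> bool:
    # returns True when it is OK to fetch more documents, False if we gave up
    stall_start = time.monotonic()
    while count_pending_batches() >= MAX_OUTSTANDING_BATCHES:
        if time.monotonic() - stall_start > STALL_LIMIT_SECONDS:
            mark_attempt_failed("indexing tasks are not picking up batches")
            return False
        time.sleep(30)  # back off before checking again
    return True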

How Has This Been Tested?

N/A but planning to test across the main connectors in the UI and with all blob store solutions

Backporting (check the box to trigger backport action)

Note: You have to check that the action passes, otherwise resolve the conflicts manually and tag the patches.

  • This PR should be backported (make sure to check that the backport attempt succeeds)
  • [Optional] Override Linear Check

vercel bot commented Jun 13, 2025

The latest updates on your projects:

internal-search: ✅ Ready (preview updated Jul 12, 2025, 2:57am UTC)

@@ -393,7 +399,9 @@ def _run_indexing(
index_attempt: IndexAttempt | None = None
try:
with get_session_with_current_tenant() as db_session_temp:
index_attempt = get_index_attempt(db_session_temp, index_attempt_id)
index_attempt = get_index_attempt(
Contributor:

this eager load stuff seems to be exposing some DB behavior ... maybe a bit problematic

@greptile-apps (bot) left a comment:

PR Summary

Major architectural change to decouple document fetching from processing in the indexing pipeline, allowing parallel execution through a new blob store intermediary layer.

  • Split monolithic indexing process into docfetching_task (fetches source documents) and document_indexing_pipeline_task (handles processing/embedding/storage), coordinated via Redis locks
  • Added new DocumentBatchStorage system using FileStore abstraction to manage intermediate document storage between fetching and processing stages
  • Implemented robust error handling and state management for the decoupled tasks, including proper cleanup of temporary storage
  • Updated run_indexing.py to support the new pipeline architecture with separate DocExtractionContext and DocIndexingContext models
  • Simplified session handling in S3BackedFileStore by moving db_session from constructor to method parameters for better dependency injection

35 files reviewed, 9 comments

Comment on lines 20 to 21
# Default parameters for creation
DEFAULT_KWARGS = {
"http2": True,
"limits": lambda: httpx.Limits(),
}

Contributor:

style: Remove empty comment and blank line as they no longer serve a purpose since DEFAULT_KWARGS was moved

@@ -66,6 +66,7 @@
POSTGRES_CELERY_WORKER_LIGHT_APP_NAME = "celery_worker_light"
POSTGRES_CELERY_WORKER_HEAVY_APP_NAME = "celery_worker_heavy"
POSTGRES_CELERY_WORKER_INDEXING_APP_NAME = "celery_worker_indexing"
POSTGRES_CELERY_WORKER_docfetching_APP_NAME = "celery_worker_docfetching"
Contributor:

style: Use consistent casing ('docfetching' should be 'DOCFETCHING')

Suggested change
POSTGRES_CELERY_WORKER_docfetching_APP_NAME = "celery_worker_docfetching"
POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME = "celery_worker_docfetching"

Comment on lines +389 to +392
def parallel_yield_from_funcs(
funcs: list[Callable[..., R]],
max_workers: int = 10,
) -> Iterator[R]:
Contributor:

style: Consider allowing args and kwargs parameters to support functions that take arguments. Currently only supports nullary functions.

Suggested change
def parallel_yield_from_funcs(
funcs: list[Callable[..., R]],
max_workers: int = 10,
) -> Iterator[R]:
def parallel_yield_from_funcs(
funcs: list[Callable[[], R]],
max_workers: int = 10,
) -> Iterator[R]:
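If the nullary-only signature is kept, callers whose functions take arguments can still adapt them with functools.partial (illustrative usage only; fetch_batch is a made-up stand-in):

from functools import partial

def fetch_batch(batch_id: int) -> str:
    # stand-in for real per-batch work that needs an argument
    return f"batch {batch_id} done"

batch_ids = [1, 2, 3]
# wrap argument-taking functions into nullary callables before passing them in
funcs = [partial(fetch_batch, batch_id) for batch_id in batch_ids]
for result in parallel_yield_from_funcs(funcs, max_workers=4):
    print(result)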

Comment on lines 100 to 101
"onyx.background.celery.tasks.docfetching",
"onyx.background.celery.tasks.indexing", # TODO: remove this and move the task to docfetching
Contributor:

logic: The indexing tasks dependency creates coupling that contradicts the PR's decoupling goal. Need to complete the TODO task migration before merging

Contributor:

+1

Comment on lines +1082 to +1119
# TODO: change to doc extraction if it doesnt break things
callback.progress("_run_indexing", 0)
Contributor:

logic: Callback still references '_run_indexing' but should be updated to 'doc_extraction' for clarity

Contributor:

+1

Contributor:

Also should docfetching specific stuff be moved into a docfetching folder?

if batches_processed > last_batches_completed:
last_batches_completed = batches_processed
last_progress_time = time.monotonic()
elif time.monotonic() - last_progress_time > 3600 * 6:
Contributor:

style: 6 hour timeout hardcoded - consider making this configurable via constant or config
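For example, the threshold could be lifted into a named constant or config value (hypothetical plumbing; the PR may expose this differently, e.g. through Onyx's configs module):

import os

DOCFETCHING_STALL_TIMEOUT_SECONDS = int(
    os.environ.get("DOCFETCHING_STALL_TIMEOUT_SECONDS", str(6 * 3600))
)

# the hardcoded check would then become:
# elif time.monotonic() - last_progress_time > DOCFETCHING_STALL_TIMEOUT_SECONDS: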

Contributor:

+1

Comment on lines +155 to +139
if not self.file_store.has_file(
file_id=file_name,
file_origin=FileOrigin.OTHER,
file_type="application/json",
):
Contributor:

style: file_type should be a constant; define FILE_TYPE = 'application/json' at the class level

Comment on lines 278 to 279
except Exception as e:
logger.warning(f"Failed to delete extraction state: {e}")
Contributor:

logic: error message indicates 'extraction state' but this is in a loop over all state types

Suggested change
except Exception as e:
logger.warning(f"Failed to delete extraction state: {e}")
except Exception as e:
logger.warning(f"Failed to delete {state_type.value} state: {e}")

Contributor:

prefer logger.exception here (prints out the stack trace nicely)

Comment on lines +397 to 407
raise

Contributor:

logic: The bare except block here could mask important errors. Consider handling specific exceptions (like ClientError for S3 operations and SQLAlchemyError for database operations) separately.
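A sketch of the narrower handling being suggested (imports and structure are assumptions; cleanup_batches and its delete_document_batches callback are stand-ins for the real body):

import logging

from botocore.exceptions import ClientError
from sqlalchemy.exc import SQLAlchemyError

logger = logging.getLogger(__name__)

def cleanup_batches(delete_document_batches) -> None:
    try:
        delete_document_batches()
    except ClientError:
        logger.exception("S3 error while cleaning up document batch storage")
        raise
    except SQLAlchemyError:
        logger.exception("Database error while cleaning up document batch storage")
        raise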

@Weves (Contributor) left a comment:

Let's discuss how we want to do the monitoring, but overall looks good 🧐

Comment on lines 100 to 101
"onyx.background.celery.tasks.docfetching",
"onyx.background.celery.tasks.indexing", # TODO: remove this and move the task to docfetching
Contributor:

+1

@@ -538,6 +538,27 @@ slackbot:
limits:
cpu: "1000m"
memory: "2000Mi"
celery_worker_docfetching:
Contributor:

missing actual worker for this

Comment on lines 278 to 279
except Exception as e:
logger.warning(f"Failed to delete extraction state: {e}")
Contributor:

prefer logger.exception here (prints out the stack trace nicely)


# Get batch storage (transition to IN_PROGRESS is handled by run_indexing_entrypoint)
with get_session_with_current_tenant() as db_session:
batch_storage = get_document_batch_storage(
Contributor:

this seems slightly problematic (it's also in a few other places). The db session will no longer be valid outside of the with statement
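Roughly, the concern is the following (imports omitted; cleanup_all_batches is a hypothetical method used only to illustrate the pattern):

# problematic: batch_storage keeps a reference to db_session, which is closed
# the moment the `with` block exits
with get_session_with_current_tenant() as db_session:
    batch_storage = get_document_batch_storage(
        tenant_id, index_attempt_id, db_session
    )
batch_storage.cleanup_all_batches()  # may operate on a closed session

# one fix: keep every session-dependent operation inside the block (another is
# to have the storage open its own short-lived session per method call)
with get_session_with_current_tenant() as db_session:
    batch_storage = get_document_batch_storage(
        tenant_id, index_attempt_id, db_session
    )
    batch_storage.cleanup_all_batches()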

Comment on lines +1082 to +1119
# TODO: change to doc extraction if it doesnt break things
callback.progress("_run_indexing", 0)
Contributor:

+1

logger.error(f"Failed to store {state_type} state: {e}")
raise

def _get_state(self, state_type: str) -> DocumentStorageState | None:
Contributor:

should state_type be an Enum type?
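Something along these lines, for instance (hypothetical member names; the real set of state types lives in the PR):

from enum import Enum

class StorageStateType(str, Enum):
    EXTRACTION = "extraction"
    INDEXING = "indexing"

# _get_state(self, state_type: StorageStateType) -> DocumentStorageState | None
# would then accept the enum instead of a raw string, and state_type.value can
# still be used wherever the string form is needed (e.g. in log messages).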

)
# Get the document batch storage
with get_session_with_current_tenant() as db_session:
storage = get_document_batch_storage(tenant_id, index_attempt_id, db_session)
Contributor:

same comment about db_session

if batches_processed > last_batches_completed:
last_batches_completed = batches_processed
last_progress_time = time.monotonic()
elif time.monotonic() - last_progress_time > 3600 * 6:
Contributor:

+1

)

with get_session_with_current_tenant() as db_session:
storage = get_document_batch_storage(tenant_id, index_attempt_id, db_session)
Contributor:

same comment about db_session

tenant_id: str,
) -> int | None:
"""
TODO: update docstring to reflect docfetching
Contributor:

can we do this TODO now 🥺
