Skip to content

Commit 78198c7

Browse files
feat: DIA-2148: write exports using thread pool (#7342)
Co-authored-by: robot-ci-heartex <[email protected]>
1 parent b516272 commit 78198c7

File tree

1 file changed

+35
-7
lines changed

1 file changed

+35
-7
lines changed

label_studio/io_storages/base_models.py

+35-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
22
"""
33
import base64
4+
import concurrent.futures
5+
import itertools
46
import json
57
import logging
8+
import os
69
import traceback as tb
10+
from concurrent.futures import ThreadPoolExecutor
711
from datetime import datetime
812
from typing import Union
913
from urllib.parse import urljoin
@@ -526,10 +530,23 @@ def storage_background_failure(*args, **kwargs):
526530
storage.info_set_failed()
527531

528532

533+
# NOTE: itertools.batched is available from Python 3.12. TODO: switch to the builtin function once we move to it.
534+
def _batched(iterable, n):
535+
# batched('ABCDEFG', 3) --> ABC DEF G
536+
if n < 1:
537+
raise ValueError('n must be at least one')
538+
it = iter(iterable)
539+
while batch := tuple(itertools.islice(it, n)):
540+
yield batch
541+
542+
529543
class ExportStorage(Storage, ProjectStorageMixin):
530544
can_delete_objects = models.BooleanField(
531545
_('can_delete_objects'), null=True, blank=True, help_text='Deletion from storage enabled'
532546
)
547+
# Use up to 8 threads: 4 per detected CPU core (so only 4 on a single core), assuming 2 cores when the count is unknown
548+
# TODO: in testing, more than 8 threads seemed to cause problems; revisit to add more parallelism.
549+
max_workers = min(8, (os.cpu_count() or 2) * 4)
533550

534551
def _get_serialized_data(self, annotation):
535552
user = self.project.organization.created_by
@@ -557,13 +574,24 @@ def save_annotations(self, annotations: models.QuerySet[Annotation]):
557574
self.info_set_in_progress()
558575
self.cached_user = self.project.organization.created_by
559576

560-
for annotation in annotations.iterator(chunk_size=settings.STORAGE_EXPORT_CHUNK_SIZE):
561-
annotation.cached_user = self.cached_user
562-
self.save_annotation(annotation)
563-
564-
# update progress counters
565-
annotation_exported += 1
566-
self.info_update_progress(last_sync_count=annotation_exported, total_annotations=total_annotations)
577+
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
578+
# Batch annotations so that we update progress before having to submit every future.
579+
# Updating progress in thread requires coordinating on count and db writes, so just
580+
# batching to keep it simpler.
581+
for annotation_batch in _batched(
582+
Annotation.objects.filter(project=self.project).iterator(
583+
chunk_size=settings.STORAGE_EXPORT_CHUNK_SIZE
584+
),
585+
settings.STORAGE_EXPORT_CHUNK_SIZE,
586+
):
587+
futures = []
588+
for annotation in annotation_batch:
589+
annotation.cached_user = self.cached_user
590+
futures.append(executor.submit(self.save_annotation, annotation))
591+
592+
for future in concurrent.futures.as_completed(futures):
593+
annotation_exported += 1
594+
self.info_update_progress(last_sync_count=annotation_exported, total_annotations=total_annotations)
567595

568596
self.info_set_completed(last_sync_count=annotation_exported, total_annotations=total_annotations)
569597

0 commit comments

Comments
 (0)