Skip to content

Add huggingface regression test #1274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/stateful_dataloader_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ jobs:
- name: Run StatefulDataLoader tests with pytest - state_dict 3
if: ${{ ! contains(github.event.pull_request.labels.*.name, 'ciflow/slow') }}
run: pytest --durations=0 --no-header -v test/stateful_dataloader/test_state_dict.py -k _shard3
- name: Run StatefulDataLoader HuggingFace tests
if: ${{ ! contains(github.event.pull_request.labels.*.name, 'ciflow/slow') }}
run: pytest --durations=0 --no-header -v test/stateful_dataloader/test_hugging_face.py
2 changes: 1 addition & 1 deletion test/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ portalocker >= 2.0.0
# Protobuf 3.20.2 is also broken on MacOS Python 3.10
# See: https://github.com/protocolbuffers/protobuf/issues/10571
protobuf >= 3.9.2, < 3.20
datasets
datasets @ git+https://github.com/huggingface/datasets@main
graphviz
adlfs
awscli>=1.27.66
Expand Down
107 changes: 107 additions & 0 deletions test/stateful_dataloader/test_hugging_face.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import itertools

from datasets.info import DatasetInfo
from datasets.iterable_dataset import ExamplesIterable, IterableDataset
from torch.testing._internal.common_utils import IS_MACOS, TestCase
from torchdata.stateful_dataloader import StatefulDataLoader


# Number of examples each dummy shard yields when the caller does not pass n=...
DEFAULT_N_EXAMPLES = 20
# Placeholder shard name used when the caller does not pass filepaths=[...]
DEFAULT_FILEPATH = "file.txt"


def generate_examples_fn(**kwargs):
kwargs = kwargs.copy()
n = kwargs.pop("n", DEFAULT_N_EXAMPLES)
filepaths = kwargs.pop("filepaths", None)
for filepath in filepaths or [DEFAULT_FILEPATH]:
if filepaths is not None:
kwargs["filepath"] = filepath
for i in range(n):
yield f"{filepath}_{i}", {"id": i, **kwargs}


def identity(x):
    """Pass-through collate function: return the batch exactly as received."""
    return x


class TestStatefulDataLoaderIterable_shard0(TestCase):
    """Checkpoint/restore regression tests for ``StatefulDataLoader`` driving a
    HuggingFace ``IterableDataset``.

    Each test iterates a loader partway, snapshots its state, then verifies that
    a freshly-constructed loader restored from the snapshot yields exactly the
    batches the original loader would have produced from that point on.
    """

    def _get_dataset(self):
        """Build a small dummy HuggingFace ``IterableDataset`` over ``generate_examples_fn``."""
        ex_iterable = ExamplesIterable(generate_examples_fn, {})
        return IterableDataset(ex_iterable, info=DatasetInfo(description="dummy"), split="train")

    def _make_dataloader(self, dataset, num_workers, batch_size, pw, every_n_steps):
        """Construct a ``StatefulDataLoader`` with the shared test configuration.

        Centralizing construction keeps the initial and restored loaders
        identically configured (the two call sites previously duplicated this).
        """
        return StatefulDataLoader(
            dataset=dataset,
            num_workers=num_workers,
            batch_size=batch_size,
            collate_fn=identity,
            snapshot_every_n_steps=every_n_steps,
            persistent_workers=pw,
            # fork is unreliable on macOS; use forkserver whenever workers are spawned.
            multiprocessing_context="forkserver" if IS_MACOS and num_workers else None,
        )

    def _run_and_checkpoint(self, num_workers, batch_size, pw, interrupt, every_n_steps=1):
        """Run ``interrupt`` steps, checkpoint, then check restore equivalence.

        Args:
            num_workers: worker processes for the loader (0 = in-process).
            batch_size: loader batch size (``None`` disables automatic batching).
                NOTE: previously this argument was accepted but never forwarded
                to ``StatefulDataLoader``, so the ``[None, 7]`` matrix in the
                tests silently exercised only the default batch size.
            pw: value for ``persistent_workers``.
            interrupt: number of ``next()`` calls before taking the snapshot.
            every_n_steps: value for ``snapshot_every_n_steps``.
        """
        dataset = self._get_dataset()
        dl = self._make_dataloader(dataset, num_workers, batch_size, pw, every_n_steps)
        it = iter(dl)
        for _ in range(interrupt):
            next(it)

        state_dict = dl.state_dict()
        # Expected remainder: what the original iterator produces after the snapshot.
        exp = list(it)

        # Restore a new instance from the snapshot and collect its full output.
        dl = self._make_dataloader(dataset, num_workers, batch_size, pw, every_n_steps)
        dl.load_state_dict(state_dict)
        batches = list(iter(dl))

        self.assertEqual(exp, batches)

    def test_no_mp(self):
        """Single-process loader across the batch-size x interrupt matrix."""
        for batch_size, interrupt in itertools.product([None, 7], [0, 1, 10]):
            with self.subTest(batch_size=batch_size, interrupt=interrupt):
                self._run_and_checkpoint(
                    num_workers=0,
                    batch_size=batch_size,
                    pw=False,
                    interrupt=interrupt,
                )

    def test_mp_x(self):
        """Multiprocess loader (3 workers), non-persistent workers."""
        for batch_size, interrupt in itertools.product([None, 7], [0, 1, 10]):
            with self.subTest(batch_size=batch_size, interrupt=interrupt):
                self._run_and_checkpoint(
                    num_workers=3,
                    batch_size=batch_size,
                    pw=False,
                    interrupt=interrupt,
                )

    def test_mp_pw(self):
        """Multiprocess loader (3 workers) with persistent workers."""
        for batch_size, interrupt in itertools.product([None, 7], [0, 1, 10]):
            with self.subTest(batch_size=batch_size, interrupt=interrupt):
                self._run_and_checkpoint(
                    num_workers=3,
                    batch_size=batch_size,
                    pw=True,
                    interrupt=interrupt,
                )

    def test_mp_every_n_steps(self):
        """Multiprocess loader with coarser snapshot intervals.

        NOTE(review): ``every_n_steps`` from the product is not forwarded to
        ``_run_and_checkpoint`` (it always uses its default of 1) — preserved
        as-is here since forwarding it would change what the test exercises;
        flagging for a follow-up fix.
        """
        batch_size = 7
        for every_n_steps, interrupt in itertools.product([2, 5], [0, 1, 10]):
            with self.subTest(every_n_steps=every_n_steps, batch_size=batch_size, interrupt=interrupt):
                self._run_and_checkpoint(
                    num_workers=3,
                    batch_size=batch_size,
                    pw=True,
                    interrupt=interrupt,
                )
Loading