13 | 13 | # Lint as: python3
14 | 14 | """To write records into Parquet files."""
15 | 15 |
16 |    | -import errno
17 | 16 | import json
18 |    | -import os
19 | 17 | import sys
20 |    | -from pathlib import Path
21 | 18 | from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
22 | 19 |
23 | 20 | import fsspec

43 | 40 | from .keyhash import DuplicatedKeysError, KeyHasher
44 | 41 | from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast
45 | 42 | from .utils import logging
46 |    | -from .utils import tqdm as hf_tqdm
47 |    | -from .utils.file_utils import hash_url_to_filename
48 | 43 | from .utils.py_utils import asdict, first_non_null_value
49 | 44 |
50 | 45 |
@@ -617,130 +612,3 @@ def finalize(self, close_stream=True):

617 | 612 |
618 | 613 | class ParquetWriter(ArrowWriter):
619 | 614 |     _WRITER_CLASS = pq.ParquetWriter
620 |     | -
621 |     | -
622 |     | -class BeamWriter:
623 |     | -    """
624 |     | -    Shuffles and writes Examples to Arrow files.
625 |     | -    The Arrow files are converted from Parquet files that are the output of Apache Beam pipelines.
626 |     | -    """
627 |     | -
628 |     | -    def __init__(
629 |     | -        self,
630 |     | -        features: Optional[Features] = None,
631 |     | -        schema: Optional[pa.Schema] = None,
632 |     | -        path: Optional[str] = None,
633 |     | -        namespace: Optional[str] = None,
634 |     | -        cache_dir: Optional[str] = None,
635 |     | -    ):
636 |     | -        if features is None and schema is None:
637 |     | -            raise ValueError("At least one of features and schema must be provided.")
638 |     | -        if path is None:
639 |     | -            raise ValueError("Path must be provided.")
640 |     | -
641 |     | -        if features is not None:
642 |     | -            self._features: Features = features
643 |     | -            self._schema: pa.Schema = features.arrow_schema
644 |     | -        else:
645 |     | -            self._schema: pa.Schema = schema
646 |     | -            self._features: Features = Features.from_arrow_schema(schema)
647 |     | -
648 |     | -        self._path = path
649 |     | -        self._parquet_path = os.path.splitext(path)[0]  # remove extension
650 |     | -        self._namespace = namespace or "default"
651 |     | -        self._num_examples = None
652 |     | -        self._cache_dir = cache_dir or config.HF_DATASETS_CACHE
653 |     | -
654 |     | -    def write_from_pcollection(self, pcoll_examples):
655 |     | -        """Add the final steps of the beam pipeline: write to parquet files."""
656 |     | -        import apache_beam as beam
657 |     | -
658 |     | -        def inc_num_examples(example):
659 |     | -            beam.metrics.Metrics.counter(self._namespace, "num_examples").inc()
660 |     | -
661 |     | -        # count examples
662 |     | -        _ = pcoll_examples | "Count N. Examples" >> beam.Map(inc_num_examples)
663 |     | -
664 |     | -        # save dataset
665 |     | -        return (
666 |     | -            pcoll_examples
667 |     | -            | "Get values" >> beam.Values()
668 |     | -            | "Save to parquet"
669 |     | -            >> beam.io.parquetio.WriteToParquet(
670 |     | -                self._parquet_path, self._schema, shard_name_template="-SSSSS-of-NNNNN.parquet"
671 |     | -            )
672 |     | -        )
673 |     | -
674 |     | -    def finalize(self, metrics_query_result: dict):
675 |     | -        """
676 |     | -        Run after the pipeline has finished.
677 |     | -        It converts the resulting parquet files to arrow and it completes the info from the pipeline metrics.
678 |     | -
679 |     | -        Args:
680 |     | -            metrics_query_result: `dict` obtained from pipeline_results.metrics().query(m_filter). Make sure
681 |     | -                that the filter keeps only the metrics for the considered split, under the namespace `split_name`.
682 |     | -        """
683 |     | -
684 |     | -        # Beam FileSystems require the system's path separator in the older versions
685 |     | -        fs, parquet_path = url_to_fs(self._parquet_path)
686 |     | -        parquet_path = str(Path(parquet_path)) if not is_remote_filesystem(fs) else fs.unstrip_protocol(parquet_path)
687 |     | -
688 |     | -        shards = fs.glob(parquet_path + "*.parquet")
689 |     | -        num_bytes = sum(fs.sizes(shards))
690 |     | -        shard_lengths = get_parquet_lengths(shards)
691 |     | -
692 |     | -        # Convert to arrow
693 |     | -        if self._path.endswith(".arrow"):
694 |     | -            logger.info(f"Converting parquet files {self._parquet_path} to arrow {self._path}")
695 |     | -            try:  # stream conversion
696 |     | -                num_bytes = 0
697 |     | -                for shard in hf_tqdm(shards, unit="shards"):
698 |     | -                    with fs.open(shard, "rb") as source:
699 |     | -                        with fs.open(shard.replace(".parquet", ".arrow"), "wb") as destination:
700 |     | -                            shard_num_bytes, _ = parquet_to_arrow(source, destination)
701 |     | -                            num_bytes += shard_num_bytes
702 |     | -            except OSError as e:  # broken pipe can happen if the connection is unstable, do local conversion instead
703 |     | -                if e.errno != errno.EPIPE:  # not a broken pipe
704 |     | -                    raise
705 |     | -                logger.warning(
706 |     | -                    "Broken Pipe during stream conversion from parquet to arrow. Using local convert instead"
707 |     | -                )
708 |     | -                local_convert_dir = os.path.join(self._cache_dir, "beam_convert")
709 |     | -                os.makedirs(local_convert_dir, exist_ok=True)
710 |     | -                num_bytes = 0
711 |     | -                for shard in hf_tqdm(shards, unit="shards"):
712 |     | -                    local_parquet_path = os.path.join(local_convert_dir, hash_url_to_filename(shard) + ".parquet")
713 |     | -                    fs.download(shard, local_parquet_path)
714 |     | -                    local_arrow_path = local_parquet_path.replace(".parquet", ".arrow")
715 |     | -                    shard_num_bytes, _ = parquet_to_arrow(local_parquet_path, local_arrow_path)
716 |     | -                    num_bytes += shard_num_bytes
717 |     | -                    remote_arrow_path = shard.replace(".parquet", ".arrow")
718 |     | -                    fs.upload(local_arrow_path, remote_arrow_path)
719 |     | -
720 |     | -        # Save metrics
721 |     | -        counters_dict = {metric.key.metric.name: metric.result for metric in metrics_query_result["counters"]}
722 |     | -        self._num_examples = counters_dict["num_examples"]
723 |     | -        self._num_bytes = num_bytes
724 |     | -        self._shard_lengths = shard_lengths
725 |     | -        return self._num_examples, self._num_bytes
726 |     | -
727 |     | -
728 |     | -def get_parquet_lengths(sources) -> List[int]:
729 |     | -    shard_lengths = []
730 |     | -    for source in hf_tqdm(sources, unit="parquet files"):
731 |     | -        parquet_file = pa.parquet.ParquetFile(source)
732 |     | -        shard_lengths.append(parquet_file.metadata.num_rows)
733 |     | -    return shard_lengths
734 |     | -
735 |     | -
736 |     | -def parquet_to_arrow(source, destination) -> List[int]:
737 |     | -    """Convert parquet file to arrow file. Inputs can be str paths or file-like objects"""
738 |     | -    stream = None if isinstance(destination, str) else destination
739 |     | -    parquet_file = pa.parquet.ParquetFile(source)
740 |     | -    # Beam can create empty Parquet files, so we need to pass the source Parquet file's schema
741 |     | -    with ArrowWriter(schema=parquet_file.schema_arrow, path=destination, stream=stream) as writer:
742 |     | -        for record_batch in parquet_file.iter_batches():
743 |     | -            pa_table = pa.Table.from_batches([record_batch])
744 |     | -            writer.write_table(pa_table)
745 |     | -        num_bytes, num_examples = writer.finalize()
746 |     | -    return num_bytes, num_examples
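For context on what is being removed, here is a minimal, hypothetical usage sketch of `BeamWriter`, based only on the API shown in this diff and on the standard Apache Beam metrics API. The feature names, split namespace, path, and example data are made up, and the import works only in releases that predate this removal; this is not code from the PR itself.

```python
# Hypothetical sketch (not part of this diff): driving the removed BeamWriter
# from an Apache Beam pipeline, following the API shown above.
import apache_beam as beam
from apache_beam.metrics.metric import MetricsFilter

from datasets.arrow_writer import BeamWriter  # only available before this removal
from datasets.features import Features, Value

features = Features({"id": Value("int64"), "text": Value("string")})
# finalize() converts the Parquet shards to Arrow because the path ends in ".arrow"
writer = BeamWriter(features=features, path="out/train.arrow", namespace="train")

pipeline = beam.Pipeline()
# (key, example) pairs: write_from_pcollection() drops the keys via beam.Values()
examples = pipeline | "Create examples" >> beam.Create(
    [("0", {"id": 0, "text": "foo"}), ("1", {"id": 1, "text": "bar"})]
)
writer.write_from_pcollection(examples)

result = pipeline.run()
result.wait_until_finish()

# finalize() expects the metrics restricted to this writer's namespace ("train" here)
metrics = result.metrics().query(MetricsFilter().with_namespace("train"))
num_examples, num_bytes = writer.finalize(metrics)
```

The Parquet shards written by `WriteToParquet` are then converted to Arrow shards by `finalize()`, which streams each shard through the `parquet_to_arrow` helper shown above.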