From a408edbd34eecc1cc9911a15d2ea0fbd3f5ff7ca Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 18 Feb 2026 08:28:00 -0800 Subject: [PATCH 1/7] Initial nemotron-cc data curation scripts Signed-off-by: Ayush Dattagupta --- .../data_curation/nemotron-cc/README.md | 21 ++ .../nemotron-cc/step_1-download_extract.py | 327 ++++++++++++++++++ 2 files changed, 348 insertions(+) create mode 100644 src/nemotron/recipes/data_curation/nemotron-cc/README.md create mode 100644 src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/README.md b/src/nemotron/recipes/data_curation/nemotron-cc/README.md new file mode 100644 index 0000000..794d034 --- /dev/null +++ b/src/nemotron/recipes/data_curation/nemotron-cc/README.md @@ -0,0 +1,21 @@ +## Nemotron-CC Data Curation + +This directory contains the recipe for curating datasets similar to the [nemotron-cc datasets](https://huggingface.co/datasets/nvidia/Nemotron-CC-v2). + +The Nemotron-cc pipeline can be roughly split up into the following stages: + +1. Download, Extract & Clean + - This is a CPU only pipeline consisting of the following stages: + - Download common crawl snapshots from the web & extract text from the html webpages. + - Use a fasttext based language identification model to annotate each document with a language. + - Fix mojibake (encoding issues) for utf-8. + - We recommend a CPU node where each worker can get at least 3.5GB of RAM to prevent OOM errors. + +2. Deduplication + a. Exact Deduplication + b. Fuzzy Deduplication + c. Substring Deduplication + +3. Annotation and Quality Filtering + +4. 
Synthetic Data generation diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py new file mode 100644 index 0000000..cb482c3 --- /dev/null +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py @@ -0,0 +1,327 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Download, extract, and preprocess Common Crawl data for the Nemotron-CC pipeline. + +See README.md in this directory for detailed usage instructions. 
+""" + +import argparse +import ast +import json +import os +import pickle +import time +import urllib.request +from pathlib import Path + +from fsspec.core import url_to_fs +from loguru import logger + +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.backends.xenna import XennaExecutor +from nemo_curator.core.client import RayClient +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.base import ProcessingStage +from nemo_curator.stages.text.download import CommonCrawlDownloadExtractStage +from nemo_curator.stages.text.filters import FastTextLangId +from nemo_curator.stages.text.modifiers import UnicodeReformatter +from nemo_curator.stages.text.modules import Modify, ScoreFilter +from nemo_curator.tasks import DocumentBatch, FileGroupTask +from nemo_curator.tasks.utils import TaskPerfUtils +from nemo_curator.utils.client_utils import is_remote_url +from nemo_curator.stages.resources import Resources + +FASTTEXT_MODEL_URL = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin" +FASTTEXT_MODEL_FILENAME = "lid.176.bin" + + +class LanguageFilter(ProcessingStage[DocumentBatch, DocumentBatch]): + """Extract language codes from FastTextLangId scores, optionally filtering to specific languages. + + FastTextLangId produces scores in the format "[0.95, 'EN']" (stringified list). + This stage parses that field and replaces it with just the language code. + If target_languages is provided, only documents matching those languages are kept. 
+ """ + + def __init__(self, target_languages: list[str] | None = None, language_field: str = "language") -> None: + self.target_languages = {lang.upper() for lang in target_languages} if target_languages else None + self.language_field = language_field + self.name = "language_filter" + + def process(self, task: DocumentBatch) -> DocumentBatch | None: + df = task.to_pandas() + # Parse "[0.95, 'EN']" -> 'EN' + df[self.language_field] = df[self.language_field].apply(lambda v: ast.literal_eval(v)[1]) + if self.target_languages: + df = df[df[self.language_field].isin(self.target_languages)] + if len(df) == 0: + return None + task.data = df + return task + + +class LanguagePartitionWriter(ProcessingStage[DocumentBatch, FileGroupTask]): + """Write documents to language-partitioned subdirectories. + + Groups by (language, file_name) to preserve WARC provenance in output filenames. + Supports both local and cloud paths (s3://, gs://, etc.) via fsspec. + + Output structure: + output_dir/EN/crawl-data-CC-MAIN-2024-51-...-00002.warc.jsonl + output_dir/DE/crawl-data-CC-MAIN-2024-51-...-00002.warc.jsonl + ... 
+ """ + + def __init__(self, output_dir: str, storage_options: dict | None = None) -> None: + self.output_dir = output_dir + self.storage_options = storage_options or {} + self.fs, self._fs_path = url_to_fs(self.output_dir, **self.storage_options) + self._is_remote = is_remote_url(self.output_dir) + self.name = "language_partition_writer" + + def process(self, task: DocumentBatch) -> FileGroupTask: + df = task.to_pandas() + files = [] + for (language, file_name), group in df.groupby(["language", "file_name"]): + lang_dir = self.fs.sep.join([self._fs_path, language]) + self.fs.mkdirs(lang_dir, exist_ok=True) + stem, _ = os.path.splitext(file_name) + file_path = self.fs.sep.join([lang_dir, f"{stem}.jsonl"]) + + file_path_with_protocol = self.fs.unstrip_protocol(file_path) if self._is_remote else file_path + group.to_json(file_path_with_protocol, lines=True, orient="records", storage_options=self.storage_options, force_ascii=False) + files.append(file_path_with_protocol) + + return FileGroupTask( + task_id=task.task_id, + dataset_name=task.dataset_name, + data=files, + _metadata={**task._metadata, "format": "jsonl"}, + _stage_perf=task._stage_perf, + ) + + +def download_fasttext_model(model_dir: str) -> str: + """Download the FastText language identification model if not already present. + + Args: + model_dir: Directory that should contain the FastText model file. + + Returns: + The full path to the model file. 
+ """ + model_path = os.path.join(model_dir, FASTTEXT_MODEL_FILENAME) + + if os.path.exists(model_path): + logger.info(f"FastText model already exists at {model_path}") + return model_path + + os.makedirs(model_dir, exist_ok=True) + logger.info(f"Downloading FastText language ID model to {model_path}") + urllib.request.urlretrieve(FASTTEXT_MODEL_URL, model_path) # noqa: S310 + logger.info("Download complete") + return model_path + + +def create_pipeline(args: argparse.Namespace) -> Pipeline: + """Build the download-extract-preprocess pipeline.""" + output_dir = args.output_dir + download_dir = str(Path(args.download_dir).resolve()) + + fasttext_model_path = download_fasttext_model(args.lang_id_model_path) + + storage_options = json.loads(args.storage_options) if args.storage_options else {} + + stages = [ + # JusText is the Nemotron-CC choice for html extraction + CommonCrawlDownloadExtractStage( + start_snapshot=args.start_snapshot, + end_snapshot=args.end_snapshot, + download_dir=download_dir, + crawl_type="main", + html_extraction="justext", + url_limit=args.url_limit, + record_limit=args.record_limit, + ).with_({"iterate_extract_commoncrawlwarciterator_commoncrawlhtmlextractor":{"resources":Resources(cpus=2.0)}}), + # 2. Language identification using FastText lid.176.bin (threshold 0.3 per paper). + ScoreFilter( + FastTextLangId( + model_path=fasttext_model_path, + min_langid_score=0.3, + ), + score_field="language", + ), + # 3. Extract language code, optionally filter to requested languages. + LanguageFilter( + target_languages=args.languages, + language_field="language", + ), + # 4. Fix unicode issues on all documents. + Modify(UnicodeReformatter()), + # 5. Write output partitioned by language (e.g., output_dir/EN/, output_dir/DE/). 
+ LanguagePartitionWriter(output_dir, storage_options=storage_options), + ] + + return Pipeline( + name="nemotron-cc-download-extract", + description="Download, extract, and preprocess Common Crawl data with language ID and unicode fixing.", + stages=stages, + ) + + +def main(args: argparse.Namespace) -> None: + storage_options = json.loads(args.storage_options) if args.storage_options else {} + fs, fs_path = url_to_fs(args.output_dir, **storage_options) + fs.mkdirs(fs_path, exist_ok=True) + os.makedirs(args.download_dir, exist_ok=True) + + ray_client = RayClient(num_cpus=args.num_cpus) + ray_client.start() + + logger.info("Starting Nemotron-CC download and preprocessing pipeline") + logger.info(f" Snapshots: {args.start_snapshot} to {args.end_snapshot}") + logger.info(f" Languages: {args.languages or 'all'}") + logger.info(f" Download dir: {args.download_dir}") + logger.info(f" Output dir: {args.output_dir}") + if args.url_limit is not None: + logger.info(f" URL limit: {args.url_limit}") + if args.record_limit is not None: + logger.info(f" Record limit: {args.record_limit}") + + pipeline = create_pipeline(args) + logger.info(f"\n{pipeline.describe()}") + + executor = RayDataExecutor() if args.executor == "ray-data" else XennaExecutor() + logger.info(f"Using executor: {args.executor}") + + start_time = time.perf_counter() + results = pipeline.run(executor=executor) + elapsed = time.perf_counter() - start_time + + total_documents = sum(task.num_items for task in results) if results else 0 + logger.info(f"Pipeline completed in {elapsed:.1f}s") + logger.info(f"Total output files: {total_documents}") + + # Dump result tasks (with _stage_perf timing stats) for later analysis + results_file = fs.sep.join([fs_path, "results.pkl"]) + with fs.open(results_file, "wb") as f: + pickle.dump(results, f) + results_file_display = fs.unstrip_protocol(results_file) if is_remote_url(args.output_dir) else results_file + logger.info(f"Task results saved to {results_file_display}") + + 
# Aggregate and save per-stage metrics (mean/std/sum for each metric) + metrics = TaskPerfUtils.aggregate_task_metrics(results) + metrics_file = fs.sep.join([fs_path, "metrics.json"]) + with fs.open(metrics_file, "w") as f: + json.dump(metrics, f, indent=2) + metrics_file_display = fs.unstrip_protocol(metrics_file) if is_remote_url(args.output_dir) else metrics_file + logger.info(f"Aggregated metrics saved to {metrics_file_display}") + + ray_client.stop() + + +def attach_args() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Download, extract, and preprocess Common Crawl data for Nemotron-CC.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + # Snapshot range + parser.add_argument( + "--start-snapshot", + type=str, + required=True, + help="Start Common Crawl snapshot (e.g., '2024-46').", + ) + parser.add_argument( + "--end-snapshot", + type=str, + required=True, + help="End Common Crawl snapshot (e.g., '2024-51').", + ) + + # Paths + parser.add_argument( + "--output-dir", + type=str, + default="./data/preprocessed", + help="Directory to write the preprocessed output, partitioned by language.", + ) + parser.add_argument( + "--download-dir", + type=str, + default="./data/cc_downloads", + help="Directory for intermediate Common Crawl WARC downloads.", + ) + + # Common Crawl options + parser.add_argument( + "--url-limit", + type=int, + default=None, + help="Limit number of WARC files to download per snapshot (useful for testing).", + ) + parser.add_argument( + "--record-limit", + type=int, + default=None, + help="Limit number of records to extract per WARC file (useful for testing).", + ) + + # Language filtering + parser.add_argument( + "--languages", + nargs="+", + type=str, + default=None, + help="Language codes to keep (e.g., EN DE FR). If omitted, all languages are written.", + ) + parser.add_argument( + "--lang-id-model-path", + type=str, + required=True, + help="Directory for the FastText lid.176.bin model. 
Downloads automatically if not present in the directory.", + ) + + # Cloud storage + parser.add_argument( + "--storage-options", + type=str, + default=None, + help='JSON string of fsspec storage options for cloud output paths (e.g., \'{"key": "...", "secret": "..."}\').', + ) + + # Executor + parser.add_argument( + "--executor", + type=str, + choices=["xenna", "ray-data"], + default="ray-data", + help="Pipeline executor backend.", + ) + + # Local Ray cluster + parser.add_argument( + "--num-cpus", + type=int, + default=None, + help="Number of CPUs for the Ray cluster (default: all available).", + ) + + return parser + + +if __name__ == "__main__": + main(attach_args().parse_args()) From 59e09f37003202f3d87d3d7d3f5207e399e897ce Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Mon, 2 Mar 2026 10:51:43 -0800 Subject: [PATCH 2/7] Updates based on max_call fix & curator API changes Signed-off-by: Ayush Dattagupta --- .../nemotron-cc/step_1-download_extract.py | 77 ++++--------------- 1 file changed, 14 insertions(+), 63 deletions(-) diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py index cb482c3..7177f26 100644 --- a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py @@ -30,18 +30,18 @@ from loguru import logger from nemo_curator.backends.experimental.ray_data import RayDataExecutor -from nemo_curator.backends.xenna import XennaExecutor from nemo_curator.core.client import RayClient from nemo_curator.pipeline import Pipeline from nemo_curator.stages.base import ProcessingStage from nemo_curator.stages.text.download import CommonCrawlDownloadExtractStage -from nemo_curator.stages.text.filters import FastTextLangId -from nemo_curator.stages.text.modifiers import UnicodeReformatter -from nemo_curator.stages.text.modules import Modify, ScoreFilter -from 
nemo_curator.tasks import DocumentBatch, FileGroupTask +from nemo_curator.stages.text.filters import ScoreFilter +from nemo_curator.stages.text.filters.fasttext import FastTextLangId +from nemo_curator.stages.text.modifiers.unicode import UnicodeReformatter +from nemo_curator.stages.text.modifiers import Modify +from nemo_curator.tasks import DocumentBatch from nemo_curator.tasks.utils import TaskPerfUtils from nemo_curator.utils.client_utils import is_remote_url -from nemo_curator.stages.resources import Resources +from nemo_curator.stages.text.io.writer import ParquetWriter FASTTEXT_MODEL_URL = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin" FASTTEXT_MODEL_FILENAME = "lid.176.bin" @@ -72,47 +72,6 @@ def process(self, task: DocumentBatch) -> DocumentBatch | None: return task -class LanguagePartitionWriter(ProcessingStage[DocumentBatch, FileGroupTask]): - """Write documents to language-partitioned subdirectories. - - Groups by (language, file_name) to preserve WARC provenance in output filenames. - Supports both local and cloud paths (s3://, gs://, etc.) via fsspec. - - Output structure: - output_dir/EN/crawl-data-CC-MAIN-2024-51-...-00002.warc.jsonl - output_dir/DE/crawl-data-CC-MAIN-2024-51-...-00002.warc.jsonl - ... 
- """ - - def __init__(self, output_dir: str, storage_options: dict | None = None) -> None: - self.output_dir = output_dir - self.storage_options = storage_options or {} - self.fs, self._fs_path = url_to_fs(self.output_dir, **self.storage_options) - self._is_remote = is_remote_url(self.output_dir) - self.name = "language_partition_writer" - - def process(self, task: DocumentBatch) -> FileGroupTask: - df = task.to_pandas() - files = [] - for (language, file_name), group in df.groupby(["language", "file_name"]): - lang_dir = self.fs.sep.join([self._fs_path, language]) - self.fs.mkdirs(lang_dir, exist_ok=True) - stem, _ = os.path.splitext(file_name) - file_path = self.fs.sep.join([lang_dir, f"{stem}.jsonl"]) - - file_path_with_protocol = self.fs.unstrip_protocol(file_path) if self._is_remote else file_path - group.to_json(file_path_with_protocol, lines=True, orient="records", storage_options=self.storage_options, force_ascii=False) - files.append(file_path_with_protocol) - - return FileGroupTask( - task_id=task.task_id, - dataset_name=task.dataset_name, - data=files, - _metadata={**task._metadata, "format": "jsonl"}, - _stage_perf=task._stage_perf, - ) - - def download_fasttext_model(model_dir: str) -> str: """Download the FastText language identification model if not already present. @@ -140,12 +99,14 @@ def create_pipeline(args: argparse.Namespace) -> Pipeline: output_dir = args.output_dir download_dir = str(Path(args.download_dir).resolve()) + # Ensure FastText model is available locally (downloads if missing) fasttext_model_path = download_fasttext_model(args.lang_id_model_path) storage_options = json.loads(args.storage_options) if args.storage_options else {} stages = [ - # JusText is the Nemotron-CC choice for html extraction + # 1. Download and extract Common Crawl data using JusText. + # The JusText extractor was chosen for the Nemotron-CC pipeline. 
CommonCrawlDownloadExtractStage( start_snapshot=args.start_snapshot, end_snapshot=args.end_snapshot, @@ -154,7 +115,7 @@ def create_pipeline(args: argparse.Namespace) -> Pipeline: html_extraction="justext", url_limit=args.url_limit, record_limit=args.record_limit, - ).with_({"iterate_extract_commoncrawlwarciterator_commoncrawlhtmlextractor":{"resources":Resources(cpus=2.0)}}), + ), # 2. Language identification using FastText lid.176.bin (threshold 0.3 per paper). ScoreFilter( FastTextLangId( @@ -170,8 +131,8 @@ def create_pipeline(args: argparse.Namespace) -> Pipeline: ), # 4. Fix unicode issues on all documents. Modify(UnicodeReformatter()), - # 5. Write output partitioned by language (e.g., output_dir/EN/, output_dir/DE/). - LanguagePartitionWriter(output_dir, storage_options=storage_options), + # 5. Write output + ParquetWriter(output_dir, write_kwargs={"storage_options": storage_options}), ] return Pipeline( @@ -203,8 +164,7 @@ def main(args: argparse.Namespace) -> None: pipeline = create_pipeline(args) logger.info(f"\n{pipeline.describe()}") - executor = RayDataExecutor() if args.executor == "ray-data" else XennaExecutor() - logger.info(f"Using executor: {args.executor}") + executor = RayDataExecutor() start_time = time.perf_counter() results = pipeline.run(executor=executor) @@ -303,16 +263,7 @@ def attach_args() -> argparse.ArgumentParser: help='JSON string of fsspec storage options for cloud output paths (e.g., \'{"key": "...", "secret": "..."}\').', ) - # Executor - parser.add_argument( - "--executor", - type=str, - choices=["xenna", "ray-data"], - default="ray-data", - help="Pipeline executor backend.", - ) - - # Local Ray cluster + # Ray cluster parser.add_argument( "--num-cpus", type=int, From 152aa7803af917aac491eb98441e6a9905a2dfb6 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Mon, 2 Mar 2026 10:58:29 -0800 Subject: [PATCH 3/7] update quality filtering to say filtering since different filtering techniques are applied Signed-off-by: Ayush 
Dattagupta --- src/nemotron/recipes/data_curation/nemotron-cc/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/README.md b/src/nemotron/recipes/data_curation/nemotron-cc/README.md index 794d034..dfb1bc5 100644 --- a/src/nemotron/recipes/data_curation/nemotron-cc/README.md +++ b/src/nemotron/recipes/data_curation/nemotron-cc/README.md @@ -16,6 +16,6 @@ The Nemotron-cc pipeline can be roughly split up into the following stages: b. Fuzzy Deduplication c. Substring Deduplication -3. Annotation and Quality Filtering +3. Annotation and Filtering 4. Synthetic Data generation From 3d39c43c3b9c9aebf84fd10e64c4447557ba8c2b Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 4 Mar 2026 14:58:51 -0800 Subject: [PATCH 4/7] minor updates to download_extract paths Signed-off-by: Ayush Dattagupta --- .../nemotron-cc/step_1-download_extract.py | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py index 7177f26..a345e1b 100644 --- a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py @@ -40,7 +40,6 @@ from nemo_curator.stages.text.modifiers import Modify from nemo_curator.tasks import DocumentBatch from nemo_curator.tasks.utils import TaskPerfUtils -from nemo_curator.utils.client_utils import is_remote_url from nemo_curator.stages.text.io.writer import ParquetWriter FASTTEXT_MODEL_URL = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin" @@ -97,10 +96,12 @@ def download_fasttext_model(model_dir: str) -> str: def create_pipeline(args: argparse.Namespace) -> Pipeline: """Build the download-extract-preprocess pipeline.""" output_dir = args.output_dir - download_dir = str(Path(args.download_dir).resolve()) 
+ cache_dir = str(Path(args.cache_dir).resolve()) + download_dir = os.path.join(cache_dir, "cc_downloads") + model_dir = os.path.join(cache_dir, "model") # Ensure FastText model is available locally (downloads if missing) - fasttext_model_path = download_fasttext_model(args.lang_id_model_path) + fasttext_model_path = download_fasttext_model(model_dir) storage_options = json.loads(args.storage_options) if args.storage_options else {} @@ -146,7 +147,8 @@ def main(args: argparse.Namespace) -> None: storage_options = json.loads(args.storage_options) if args.storage_options else {} fs, fs_path = url_to_fs(args.output_dir, **storage_options) fs.mkdirs(fs_path, exist_ok=True) - os.makedirs(args.download_dir, exist_ok=True) + cache_dir = str(Path(args.cache_dir).resolve()) + os.makedirs(cache_dir, exist_ok=True) ray_client = RayClient(num_cpus=args.num_cpus) ray_client.start() @@ -154,7 +156,7 @@ def main(args: argparse.Namespace) -> None: logger.info("Starting Nemotron-CC download and preprocessing pipeline") logger.info(f" Snapshots: {args.start_snapshot} to {args.end_snapshot}") logger.info(f" Languages: {args.languages or 'all'}") - logger.info(f" Download dir: {args.download_dir}") + logger.info(f" Cache dir: {cache_dir}") logger.info(f" Output dir: {args.output_dir}") if args.url_limit is not None: logger.info(f" URL limit: {args.url_limit}") @@ -175,19 +177,17 @@ def main(args: argparse.Namespace) -> None: logger.info(f"Total output files: {total_documents}") # Dump result tasks (with _stage_perf timing stats) for later analysis - results_file = fs.sep.join([fs_path, "results.pkl"]) - with fs.open(results_file, "wb") as f: + results_file = os.path.join(cache_dir, "results.pkl") + with open(results_file, "wb") as f: pickle.dump(results, f) - results_file_display = fs.unstrip_protocol(results_file) if is_remote_url(args.output_dir) else results_file - logger.info(f"Task results saved to {results_file_display}") + logger.info(f"Task results saved to {results_file}") # 
Aggregate and save per-stage metrics (mean/std/sum for each metric) metrics = TaskPerfUtils.aggregate_task_metrics(results) - metrics_file = fs.sep.join([fs_path, "metrics.json"]) - with fs.open(metrics_file, "w") as f: + metrics_file = os.path.join(cache_dir, "metrics.json") + with open(metrics_file, "w") as f: json.dump(metrics, f, indent=2) - metrics_file_display = fs.unstrip_protocol(metrics_file) if is_remote_url(args.output_dir) else metrics_file - logger.info(f"Aggregated metrics saved to {metrics_file_display}") + logger.info(f"Aggregated metrics saved to {metrics_file}") ray_client.stop() @@ -216,14 +216,14 @@ def attach_args() -> argparse.ArgumentParser: parser.add_argument( "--output-dir", type=str, - default="./data/preprocessed", - help="Directory to write the preprocessed output, partitioned by language.", + default="./data/cleaned_extracted", + help="Directory to write the preprocessed extracted content.", ) parser.add_argument( - "--download-dir", + "--cache-dir", type=str, - default="./data/cc_downloads", - help="Directory for intermediate Common Crawl WARC downloads.", + default="./data/cache", + help="Cache directory for intermediate files. Layout: cache_dir/cc_downloads (WARC files), cache_dir/model (FastText model), plus results.pkl and metrics.json.", ) # Common Crawl options @@ -248,13 +248,6 @@ def attach_args() -> argparse.ArgumentParser: default=None, help="Language codes to keep (e.g., EN DE FR). If omitted, all languages are written.", ) - parser.add_argument( - "--lang-id-model-path", - type=str, - required=True, - help="Directory for the FastText lid.176.bin model. 
Downloads automatically if not present in the directory.", - ) - # Cloud storage parser.add_argument( "--storage-options", From d3fb55912cc3a5c6bba234f838f4c098ea0b8291 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 4 Mar 2026 21:07:46 -0800 Subject: [PATCH 5/7] Write to jsonl instead of parquet Signed-off-by: Ayush Dattagupta --- .../nemotron-cc/step_1-download_extract.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py index a345e1b..348a0e8 100644 --- a/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_1-download_extract.py @@ -40,7 +40,7 @@ from nemo_curator.stages.text.modifiers import Modify from nemo_curator.tasks import DocumentBatch from nemo_curator.tasks.utils import TaskPerfUtils -from nemo_curator.stages.text.io.writer import ParquetWriter +from nemo_curator.stages.text.io.writer import JsonlWriter FASTTEXT_MODEL_URL = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin" FASTTEXT_MODEL_FILENAME = "lid.176.bin" @@ -54,8 +54,12 @@ class LanguageFilter(ProcessingStage[DocumentBatch, DocumentBatch]): If target_languages is provided, only documents matching those languages are kept. """ - def __init__(self, target_languages: list[str] | None = None, language_field: str = "language") -> None: - self.target_languages = {lang.upper() for lang in target_languages} if target_languages else None + def __init__( + self, target_languages: list[str] | None = None, language_field: str = "language" + ) -> None: + self.target_languages = ( + {lang.upper() for lang in target_languages} if target_languages else None + ) self.language_field = language_field self.name = "language_filter" @@ -133,7 +137,9 @@ def create_pipeline(args: argparse.Namespace) -> Pipeline: # 4. 
Fix unicode issues on all documents. Modify(UnicodeReformatter()), # 5. Write output - ParquetWriter(output_dir, write_kwargs={"storage_options": storage_options}), + JsonlWriter( + output_dir, write_kwargs={"storage_options": storage_options, "force_ascii": False} + ), ] return Pipeline( From 3b199e2bbb231815786b3c8806c0f0d8a221a821 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Fri, 6 Mar 2026 17:39:07 -0800 Subject: [PATCH 6/7] Add exact and fuzzy deduplication scripts Signed-off-by: Ayush Dattagupta --- .../nemotron-cc/step_2a-exact_dedup.py | 299 ++++++++++++++++++ .../nemotron-cc/step_2b-fuzzy_dedup.py | 292 +++++++++++++++++ 2 files changed, 591 insertions(+) create mode 100644 src/nemotron/recipes/data_curation/nemotron-cc/step_2a-exact_dedup.py create mode 100644 src/nemotron/recipes/data_curation/nemotron-cc/step_2b-fuzzy_dedup.py diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_2a-exact_dedup.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_2a-exact_dedup.py new file mode 100644 index 0000000..c754f64 --- /dev/null +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_2a-exact_dedup.py @@ -0,0 +1,299 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Exact deduplication for the Nemotron-CC pipeline. + +This script performs exact deduplication in two phases that can be run +together or independently: + + 1. 
Identification (--identify) — hash every document and find exact + duplicates (GPU-accelerated). Writes duplicate IDs and an ID + generator mapping to --cache-dir. + + 2. Removal (--remove) — read the duplicate IDs from --cache-dir and + remove them from the original dataset, writing deduplicated output + to --output-dir. + +See README.md in this directory for detailed usage instructions. +""" + +import argparse +import json +import time +from typing import Literal + +from loguru import logger + +from nemo_curator.stages.file_partitioning import FilePartitioningStage +from nemo_curator.tasks import EmptyTask +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.core.client import RayClient +from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow +from nemo_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR +from nemo_curator.stages.text.deduplication.removal_workflow import TextDuplicatesRemovalWorkflow + + +EXACT_DEDUP_IDS_SUBDIR = "ExactDuplicateIds" +ID_GENERATOR_FILENAME = "exact_id_generator.json" + + +def _parse_memory_arg(value: str) -> int | Literal["auto"] | None: + """Parse a memory argument that can be an int, 'auto', or None.""" + if value.lower() == "none": + return None + if value.lower() == "auto": + return "auto" + return int(value) + + +def run_identification(args: argparse.Namespace) -> None: + """Run exact duplicate identification using ExactDeduplicationWorkflow. + + Writes ExactDuplicateIds/ and exact_id_generator.json into --cache-dir. 
+ """ + storage_options = json.loads(args.storage_options) if args.storage_options else None + + logger.info("Starting exact duplicate identification") + logger.info(f" Input: {args.input_dir}") + logger.info(f" Cache dir: {args.cache_dir}") + start_time = time.perf_counter() + + workflow = ExactDeduplicationWorkflow( + input_path=args.input_dir, + output_path=args.cache_dir, + input_filetype=args.input_filetype, + text_field=args.text_field, + input_blocksize=args.input_blocksize, + identification_batchsize=args.identification_batchsize, + assign_id=True, + total_nparts=args.total_nparts, + rmm_pool_size=args.rmm_pool_size, + spill_memory_limit=args.spill_memory_limit, + read_kwargs={"storage_options": storage_options} if storage_options else None, + ) + workflow_result = workflow.run(initial_tasks=None) + elapsed = time.perf_counter() - start_time + + num_duplicates = workflow_result.metadata.get("num_duplicates", 0) + identification_time = workflow_result.metadata.get("identification_time", 0.0) + input_filegroups_time = workflow_result.metadata.get("input_filegroups_time", 0.0) + + logger.info(f"Identification completed in {elapsed:.1f}s") + logger.info(f" Time taken to group files by blocksize: {input_filegroups_time:.1f}s") + logger.info(f" Identification time: {identification_time:.1f}s") + logger.info(f" Exact duplicates found: {num_duplicates}") + + +def run_removal(args: argparse.Namespace) -> None: + """Remove identified exact duplicates using TextDuplicatesRemovalWorkflow. + + Reads duplicate IDs and ID generator from --cache-dir, writes + deduplicated output to --output-dir. 
+ """ + storage_options = json.loads(args.storage_options) if args.storage_options else None + cache_base = args.cache_dir.rstrip("/") + output_base = args.output_dir.rstrip("/") + ids_to_remove_path = f"{cache_base}/{EXACT_DEDUP_IDS_SUBDIR}" + id_generator_path = f"{cache_base}/{ID_GENERATOR_FILENAME}" + deduplicated_output_path = f"{output_base}/exact_deduplicated" + + output_kwargs = {} + if args.output_filetype == "jsonl": + output_kwargs["force_ascii"] = False + if storage_options: + output_kwargs["storage_options"] = storage_options + + logger.info("Starting duplicate removal") + logger.info(f" Input: {args.input_dir}") + logger.info(f" Cache dir (IDs): {ids_to_remove_path}") + logger.info(f" Output: {deduplicated_output_path}") + start_time = time.perf_counter() + + file_partitioning_stage = FilePartitioningStage( + file_paths=args.input_dir, + blocksize=args.input_blocksize, + file_extensions=None, + storage_options=storage_options, + ) + logger.info("Running file partitioning pipeline...") + file_partitioning_stage.setup() + initial_tasks = file_partitioning_stage.process(EmptyTask) + logger.info(f"File partitioning pipeline completed with {len(initial_tasks)} initial tasks") + + workflow = TextDuplicatesRemovalWorkflow( + input_path=args.input_dir, + ids_to_remove_path=ids_to_remove_path, + output_path=deduplicated_output_path, + input_filetype=args.input_filetype, + input_blocksize=args.input_blocksize, + duplicate_id_field=CURATOR_DEDUP_ID_STR, + id_generator_path=id_generator_path, + output_filetype=args.output_filetype, + output_fields=["url", "warc_id", "source_id", "language", "text", "file_name"], + input_kwargs={"storage_options": storage_options} if storage_options else None, + output_kwargs=output_kwargs or None, + ) + workflow_result = workflow.run(executor=RayDataExecutor(), initial_tasks=initial_tasks) + elapsed = time.perf_counter() - start_time + + num_removed = workflow_result.metadata.get("num_duplicates_removed", 0) + + 
logger.info(f"Removal completed in {elapsed:.1f}s") + logger.info(f" Duplicates removed: {num_removed}") + + +def main(args: argparse.Namespace) -> None: + # At least one of --identify / --remove must be given; fail fast before starting Ray otherwise + if not args.identify and not args.remove: + raise ValueError("No operation specified. Use --identify and/or --remove flags.") + + ray_client = RayClient(num_gpus=args.num_gpus, num_cpus=args.num_cpus) + ray_client.start() + + logger.info("Starting Nemotron-CC exact deduplication") + + if args.identify: + run_identification(args) + + if args.remove: + run_removal(args) + + ray_client.stop() + + +def attach_args() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Exact deduplication for Nemotron-CC: identification and removal of duplicate documents.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + # Operation flags + parser.add_argument( + "--identify", + action="store_true", + help="Run the identification phase to find exact duplicates.", + ) + parser.add_argument( + "--remove", + action="store_true", + help="Run the removal phase to remove identified duplicates.", + ) + + # Paths + parser.add_argument( + "--input-dir", + type=str, + required=True, + help="Directory containing the input dataset (output of step 1).", + ) + parser.add_argument( + "--cache-dir", + type=str, + required=True, + help="Directory for intermediate identification artifacts (ExactDuplicateIds/, ID generator).", + ) + parser.add_argument( + "--output-dir", + type=str, + default="./data/exact_deduplicated", + help="Directory to write deduplicated output.
Required when --remove is set.", + ) + + # Input format + parser.add_argument( + "--input-filetype", + type=str, + default="jsonl", + choices=["parquet", "jsonl"], + help="Format of the input files.", + ) + parser.add_argument( + "--text-field", + type=str, + default="text", + help="Name of the field containing the document text.", + ) + + # Output format + parser.add_argument( + "--output-filetype", + type=str, + default="jsonl", + choices=["parquet", "jsonl"], + help="Format of the deduplicated output files.", + ) + + # Identification settings + parser.add_argument( + "--input-blocksize", + type=str, + default="256MiB", + help="Target partition size for input data (e.g., '256MiB', '512MiB', '2GiB').", + ) + parser.add_argument( + "--identification-batchsize", + type=int, + default=12, + help="Number of partitions to process per identification batch.", + ) + parser.add_argument( + "--total-nparts", + type=int, + default=None, + help="Total number of output partitions for identification. Auto-determined if not set.", + ) + parser.add_argument( + "--rmm-pool-size", + type=_parse_memory_arg, + default="auto", + help="Size of the RMM GPU memory pool in bytes, 'auto' for 90%% of free GPU memory, or 'none'.", + ) + parser.add_argument( + "--spill-memory-limit", + type=_parse_memory_arg, + default="auto", + help="Device memory limit in bytes for spilling to host, 'auto' for 80%% of RMM pool, or 'none'.", + ) + + # Cloud storage + parser.add_argument( + "--storage-options", + type=str, + default=None, + help='JSON string of fsspec storage options for cloud I/O (e.g., \'{"endpoint_url": "...", "key": "...", "secret": "..."}\').', + ) + + # Ray cluster — these only apply when starting a new local Ray cluster. + # When connecting to an existing cluster (e.g., via RAY_ADDRESS), they are ignored. + parser.add_argument( + "--num-gpus", + type=int, + default=None, + help="Number of GPUs for a local Ray cluster (default: all available). 
Ignored when connecting to an external cluster.", + ) + parser.add_argument( + "--num-cpus", + type=int, + default=None, + help="Number of CPUs for a local Ray cluster (default: all available). Ignored when connecting to an external cluster.", + ) + + return parser + + +if __name__ == "__main__": + args = attach_args().parse_args() + if args.remove and args.output_dir is None: + attach_args().error("--output-dir is required when --remove is set") + main(args) diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_2b-fuzzy_dedup.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_2b-fuzzy_dedup.py new file mode 100644 index 0000000..82b2ea5 --- /dev/null +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_2b-fuzzy_dedup.py @@ -0,0 +1,292 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Fuzzy deduplication for the Nemotron-CC pipeline. + +This script performs fuzzy deduplication in two phases that can be run +together or independently: + + 1. Identification (--identify) — compute MinHash signatures, perform + Locality Sensitive Hashing, and find fuzzy duplicates via connected + components (GPU-accelerated). Writes duplicate IDs and an ID + generator mapping to --output-dir, with intermediate artifacts + stored in --cache-dir. + + 2. 
Removal (--remove) — read the duplicate IDs from --output-dir and + remove them from the original dataset, writing deduplicated output + to --output-dir/fuzzy_deduplicated. + +See README.md in this directory for detailed usage instructions. +""" + +import argparse +import json +import time + +from loguru import logger + +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.core.client import RayClient +from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow +from nemo_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR +from nemo_curator.stages.file_partitioning import FilePartitioningStage +from nemo_curator.stages.text.deduplication.removal_workflow import TextDuplicatesRemovalWorkflow +from nemo_curator.tasks import EmptyTask + + +FUZZY_DEDUP_IDS_SUBDIR = "FuzzyDuplicateIds" +ID_GENERATOR_FILENAME = "fuzzy_id_generator.json" + +# Nemotron-CC defaults +DEFAULT_CHAR_NGRAMS = 24 +DEFAULT_NUM_BANDS = 20 +DEFAULT_MINHASHES_PER_BAND = 13 + + +def run_identification(args: argparse.Namespace) -> None: + """Run fuzzy duplicate identification using FuzzyDeduplicationWorkflow. + + Writes FuzzyDuplicateIds/ and fuzzy_id_generator.json into --output-dir, + with intermediate artifacts (minhashes, LSH buckets, etc.) in --cache-dir. 
+ """ + storage_options = json.loads(args.storage_options) if args.storage_options else None + storage_kwargs = {"storage_options": storage_options} if storage_options else None + + logger.info("Starting fuzzy duplicate identification") + logger.info(f" Input: {args.input_dir}") + logger.info(f" Cache dir: {args.cache_dir}") + logger.info(f" Output dir: {args.output_dir}") + logger.info(f" Config: char_ngrams={DEFAULT_CHAR_NGRAMS}, num_bands={DEFAULT_NUM_BANDS}, " + f"minhashes_per_band={DEFAULT_MINHASHES_PER_BAND}, bands_per_iteration={args.bands_per_iteration}") + start_time = time.perf_counter() + + workflow = FuzzyDeduplicationWorkflow( + input_path=args.input_dir, + cache_path=args.cache_dir, + output_path=args.output_dir, + input_filetype=args.input_filetype, + input_blocksize=args.input_blocksize, + text_field=args.text_field, + read_kwargs=storage_kwargs, + cache_kwargs=storage_kwargs, + write_kwargs=storage_kwargs, + char_ngrams=DEFAULT_CHAR_NGRAMS, + num_bands=DEFAULT_NUM_BANDS, + minhashes_per_band=DEFAULT_MINHASHES_PER_BAND, + bands_per_iteration=args.bands_per_iteration, + lsh_num_output_partitions=args.total_nparts, + ) + workflow_result = workflow.run() + elapsed = time.perf_counter() - start_time + + num_duplicates = workflow_result.metadata.get("num_duplicates", 0) + minhash_time = workflow_result.metadata.get("minhash_time", 0.0) + lsh_time = workflow_result.metadata.get("lsh_time", 0.0) + cc_time = workflow_result.metadata.get("connected_components_pipeline_time", 0.0) + + logger.info(f"Identification completed in {elapsed:.1f}s") + logger.info(f" MinHash time: {minhash_time:.1f}s") + logger.info(f" LSH time: {lsh_time:.1f}s") + logger.info(f" Connected components time: {cc_time:.1f}s") + logger.info(f" Fuzzy duplicates found: {num_duplicates}") + + +def run_removal(args: argparse.Namespace) -> None: + """Remove identified fuzzy duplicates using TextDuplicatesRemovalWorkflow. 
+ + Reads duplicate IDs and ID generator from --output-dir, writes + deduplicated output to --output-dir/fuzzy_deduplicated. + """ + storage_options = json.loads(args.storage_options) if args.storage_options else None + storage_kwargs = {"storage_options": storage_options} if storage_options else None + output_base = args.output_dir.rstrip("/") + ids_to_remove_path = f"{output_base}/{FUZZY_DEDUP_IDS_SUBDIR}" + id_generator_path = f"{output_base}/{ID_GENERATOR_FILENAME}" + deduplicated_output_path = f"{output_base}/fuzzy_deduplicated" + + output_kwargs = {} + if args.output_filetype == "jsonl": + output_kwargs["force_ascii"] = False + if storage_options: + output_kwargs["storage_options"] = storage_options + + logger.info("Starting duplicate removal") + logger.info(f" Input: {args.input_dir}") + logger.info(f" IDs to remove: {ids_to_remove_path}") + logger.info(f" Output: {deduplicated_output_path}") + start_time = time.perf_counter() + + file_partitioning_stage = FilePartitioningStage( + file_paths=args.input_dir, + blocksize=args.input_blocksize, + file_extensions=None, + storage_options=storage_options, + ) + logger.info("Running file partitioning pipeline...") + file_partitioning_stage.setup() + initial_tasks = file_partitioning_stage.process(EmptyTask) + logger.info(f"File partitioning pipeline completed with {len(initial_tasks)} initial tasks") + + workflow = TextDuplicatesRemovalWorkflow( + input_path=args.input_dir, + ids_to_remove_path=ids_to_remove_path, + output_path=deduplicated_output_path, + input_filetype=args.input_filetype, + input_blocksize=args.input_blocksize, + duplicate_id_field=CURATOR_DEDUP_ID_STR, + id_generator_path=id_generator_path, + output_filetype=args.output_filetype, + output_fields=["url", "warc_id", "source_id", "language", "text", "file_name"], + input_kwargs=storage_kwargs, + output_kwargs=output_kwargs or None, + ) + workflow_result = workflow.run(executor=RayDataExecutor(), initial_tasks=initial_tasks) + elapsed = time.perf_counter()
- start_time + + num_removed = workflow_result.metadata.get("num_duplicates_removed", 0) + + logger.info(f"Removal completed in {elapsed:.1f}s") + logger.info(f" Duplicates removed: {num_removed}") + + +def main(args: argparse.Namespace) -> None: + if not args.identify and not args.remove: + raise ValueError("No operation specified. Use --identify and/or --remove flags.") + + ray_client = RayClient(num_gpus=args.num_gpus, num_cpus=args.num_cpus) + ray_client.start() + + logger.info("Starting Nemotron-CC fuzzy deduplication") + + if args.identify: + run_identification(args) + + if args.remove: + run_removal(args) + + ray_client.stop() + + +def attach_args() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Fuzzy deduplication for Nemotron-CC: identification and removal of near-duplicate documents.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + # Operation flags + parser.add_argument( + "--identify", + action="store_true", + help="Run the identification phase to find fuzzy duplicates.", + ) + parser.add_argument( + "--remove", + action="store_true", + help="Run the removal phase to remove identified duplicates.", + ) + + # Paths + parser.add_argument( + "--input-dir", + type=str, + required=True, + help="Directory containing the input dataset (output of step 1).", + ) + parser.add_argument( + "--cache-dir", + type=str, + required=True, + help="Directory for intermediate artifacts (minhashes, LSH buckets, edges, connected components).", + ) + parser.add_argument( + "--output-dir", + type=str, + required=True, + help="Directory for duplicate IDs, ID generator, and deduplicated output.", + ) + + # Input format + parser.add_argument( + "--input-filetype", + type=str, + default="jsonl", + choices=["parquet", "jsonl"], + help="Format of the input files.", + ) + parser.add_argument( + "--text-field", + type=str, + default="text", + help="Name of the field containing the document text.", + ) + + # Output format + 
parser.add_argument( + "--output-filetype", + type=str, + default="jsonl", + choices=["parquet", "jsonl"], + help="Format of the deduplicated output files.", + ) + + # Fuzzy dedup settings + parser.add_argument( + "--input-blocksize", + type=str, + default="256MiB", + help="Target partition size for input data (e.g., '256MiB', '512MiB', '2GiB').", + ) + parser.add_argument( + "--bands-per-iteration", + type=int, + default=5, + help="Number of LSH bands to shuffle concurrently. Higher values are faster but use more memory.", + ) + + # Partitioning + parser.add_argument( + "--total-nparts", + type=int, + default=None, + help="Total number of output partitions for the LSH shuffle. Auto-determined if not set.", + ) + + # Cloud storage + parser.add_argument( + "--storage-options", + type=str, + default=None, + help='JSON string of fsspec storage options for cloud I/O (e.g., \'{"endpoint_url": "...", "key": "...", "secret": "..."}\').', + ) + + # Ray cluster + parser.add_argument( + "--num-gpus", + type=int, + default=None, + help="Number of GPUs for a local Ray cluster (default: all available). Ignored when connecting to an external cluster.", + ) + parser.add_argument( + "--num-cpus", + type=int, + default=None, + help="Number of CPUs for a local Ray cluster (default: all available). 
Ignored when connecting to an external cluster.", + ) + + return parser + + +if __name__ == "__main__": + args = attach_args().parse_args() + main(args) From cfb0986dbefd0ebbe590007fc9bb3a3167a3dfc4 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Mon, 9 Mar 2026 20:17:29 -0700 Subject: [PATCH 7/7] Add quality classifiers Signed-off-by: Ayush Dattagupta --- .../step_3-quality_classification.py | 506 ++++++++++++++++++ 1 file changed, 506 insertions(+) create mode 100644 src/nemotron/recipes/data_curation/nemotron-cc/step_3-quality_classification.py diff --git a/src/nemotron/recipes/data_curation/nemotron-cc/step_3-quality_classification.py b/src/nemotron/recipes/data_curation/nemotron-cc/step_3-quality_classification.py new file mode 100644 index 0000000..32e6a21 --- /dev/null +++ b/src/nemotron/recipes/data_curation/nemotron-cc/step_3-quality_classification.py @@ -0,0 +1,506 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Quality classification and bucketing for the Nemotron-CC pipeline. + +This script performs model-based quality labeling in two phases that can +be run together or independently: + + 1. Classification (--classify) — filter to English, then run an ensemble + of three quality classifiers (FineWebNemotronEduClassifier, + FineWebMixtralEduClassifier, and fasttext-oh-eli5) on the + deduplicated data. Writes classification results (with float scores) + to parquet. + + 2. 
Ensemble & Bucketing (--ensemble) — compute token-weighted percentile + thresholds from the classification scores, map float scores to integer + bins (0–19), take the per-document max across classifiers, and write + the bucketed results partitioned by ensemble-max-int. + +See README.md in this directory for detailed usage instructions. +""" + +import argparse +import ctypes +import json +import os +import time + +import numpy as np +import pandas as pd +from loguru import logger + +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.core.client import RayClient +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.base import ProcessingStage +from nemo_curator.stages.function_decorators import processing_stage +from nemo_curator.stages.text.classifiers.fineweb_edu import ( + FineWebMixtralEduClassifier, + FineWebNemotronEduClassifier, +) +from nemo_curator.stages.text.filters.fasttext import FastTextQualityFilter +from nemo_curator.stages.text.filters.score_filter import Filter, Score +from nemo_curator.stages.text.io.reader import JsonlReader, ParquetReader +from nemo_curator.stages.text.io.writer import ParquetWriter +from nemo_curator.tasks import DocumentBatch +from nemo_curator.tasks.utils import TaskPerfUtils + +# --------------------------------------------------------------------------- +# Classifier score column names +# --------------------------------------------------------------------------- +CLASSIFIER_SCORES = { + "nemotron": { + "float_score": "fineweb-nemotron-edu-score", + "int_score": "fineweb-nemotron-edu-score-int", + "label": "fineweb-nemotron-edu-score-label", + }, + "mixtral": { + "float_score": "fineweb-mixtral-edu-score", + "int_score": "fineweb-mixtral-edu-score-int", + "label": "fineweb-mixtral-edu-score-label", + }, + "fasttext": { + "float_score": "fasttext-quality-score", + }, +} + +FASTTEXT_HQ_MODEL_REPO = "mlfoundations/fasttext-oh-eli5" +FASTTEXT_HQ_MODEL_FILENAME = 
"openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train.bin" + + +# --------------------------------------------------------------------------- +# Threshold / binning helpers +# --------------------------------------------------------------------------- + +def weighted_percentile(data: np.ndarray, percentiles: np.ndarray, weights: np.ndarray) -> np.ndarray: + """Compute weighted percentiles using the inverted-CDF method.""" + sorter = np.argsort(data) + data_sorted = data[sorter] + weights_sorted = weights[sorter] + + cum_weights = np.cumsum(weights_sorted) + total_weight = cum_weights[-1] + normalized = cum_weights / total_weight + + results = [] + for p in percentiles: + idx = np.searchsorted(normalized, p / 100.0, side="left") + results.append(data_sorted[idx]) + return np.array(results) + + +def compute_thresholds(score_ar: np.ndarray, token_ar: np.ndarray) -> dict[int, float]: + """Return {percentile: threshold} for 5th, 10th, …, 95th percentiles.""" + percentiles = np.arange(5, 100, 5) + thresholds = weighted_percentile(score_ar, percentiles, weights=token_ar) + return {int(p): float(t) for p, t in zip(percentiles, thresholds)} + + +def compute_thresholds_for_score_columns( + df: pd.DataFrame, + text_col_name: str, + score_col_names: list[str], +) -> dict[str, dict[int, float]]: + """Compute percentile-based thresholds for each score column. + + ``text_col_name`` should reference a column of pre-computed integer + byte-lengths (e.g. ``"token_length"``). 
+ """ + token_ar = df[text_col_name].to_numpy() + threshold_dict = {} + for score_col in score_col_names: + threshold_dict[score_col] = compute_thresholds( + df[score_col].to_numpy(), token_ar, + ) + return threshold_dict + + +def map_scores( + df: pd.DataFrame, + score_col_name: str, + score_int_name: str, + bins: np.ndarray, +) -> pd.DataFrame: + """Map float scores to integer bins via np.digitize.""" + pred_orig = np.array(df[score_col_name]) + df[score_int_name] = np.digitize(pred_orig, bins) + return df + + +def map_score_columns( + df: pd.DataFrame, + score_col_names: list[str], + threshold_dict: dict[str, dict[int, float]], +) -> pd.DataFrame: + """Apply score→int mapping for every classifier.""" + for score_col_name in score_col_names: + score_int_name = score_col_name + "-int" + thresholds = threshold_dict.get(score_col_name) + if thresholds is None: + msg = f"No thresholds found for score column '{score_col_name}'" + raise ValueError(msg) + sorted_keys = sorted(thresholds.keys(), key=int) + bins = np.array([thresholds[k] for k in sorted_keys]) + df = map_scores(df, score_col_name, score_int_name, bins) + return df + + +def save_thresholds(threshold_dict: dict, file_name: str) -> None: + with open(file_name, "w") as fout: + json.dump(threshold_dict, fout, indent=4) + logger.info(f"Thresholds saved to {file_name}") + + +def _save_metrics(metrics: dict, file_path: str) -> None: + with open(file_path, "w") as f: + json.dump(metrics, f, indent=2) + logger.info(f"Metrics saved to {file_path}") + + +# --------------------------------------------------------------------------- +# Phase 1 — Classify +# --------------------------------------------------------------------------- + +def run_classification(args: argparse.Namespace) -> None: + """Filter to English, run 3 classifiers, write parquet.""" + from huggingface_hub import hf_hub_download + + classification_results_dir = os.path.join(args.output_dir, "classification_results") + 
os.makedirs(classification_results_dir, exist_ok=True) + + # Download the fasttext quality model + fasttext_model_path = hf_hub_download( + repo_id=FASTTEXT_HQ_MODEL_REPO, + filename=FASTTEXT_HQ_MODEL_FILENAME, + ) + logger.info(f"FastText quality model path: {fasttext_model_path}") + + logger.info("Starting quality classification pipeline") + logger.info(f" Input: {args.input_dir}") + logger.info(f" Output: {classification_results_dir}") + start_time = time.perf_counter() + + # --- Build the pipeline --- + pipeline = Pipeline(name="quality-classification") + + # 1. Read deduplicated JSONL + pipeline.add_stage(JsonlReader(args.input_dir)) + + # 2. Filter to English inline + pipeline.add_stage( + Filter( + filter_fn=lambda lang: lang == "EN", + filter_field="language", + ) + ) + + # 3a. Score with FastText quality (CPU, uses Score to add column) + pipeline.add_stage( + Score( + score_fn=FastTextQualityFilter(model_path=fasttext_model_path, label="__label__hq"), + score_field=CLASSIFIER_SCORES["fasttext"]["float_score"], + text_field="text", + ) + ) + + # 3b. Nemotron-4 edu classifier (GPU) + pipeline.add_stage( + FineWebNemotronEduClassifier( + float_score_field=CLASSIFIER_SCORES["nemotron"]["float_score"], + int_score_field=CLASSIFIER_SCORES["nemotron"]["int_score"], + label_field=CLASSIFIER_SCORES["nemotron"]["label"], + ) + ) + + # 3c. Mixtral edu classifier (GPU) + pipeline.add_stage( + FineWebMixtralEduClassifier( + float_score_field=CLASSIFIER_SCORES["mixtral"]["float_score"], + int_score_field=CLASSIFIER_SCORES["mixtral"]["int_score"], + label_field=CLASSIFIER_SCORES["mixtral"]["label"], + ) + ) + + # 4. Drop label and original int_score columns — the ensemble step + # recomputes 0–19 int bins from the float scores. 
+ cols_to_drop = [ + v[k] + for v in CLASSIFIER_SCORES.values() + for k in ("int_score", "label") + if k in v + ] + + @processing_stage(name="drop-unused-classifier-columns") + def drop_unused_cols(batch: DocumentBatch) -> DocumentBatch: + df = batch.to_pandas() + batch.data = df.drop(columns=cols_to_drop, errors="ignore") + return batch + + pipeline.add_stage(drop_unused_cols) + + # 5. Write classification results to parquet + pipeline.add_stage(ParquetWriter(classification_results_dir, mode="overwrite")) + + results = pipeline.run() + elapsed = time.perf_counter() - start_time + + logger.info(f"Classification completed in {elapsed:.1f}s") + logger.info(f" Results written to: {classification_results_dir}") + + metrics = TaskPerfUtils.aggregate_task_metrics(results) + metrics["total_elapsed_s"] = round(elapsed, 2) + _save_metrics(metrics, os.path.join(args.output_dir, "classification_metrics.json")) + + +# --------------------------------------------------------------------------- +# Phase 2 — Ensemble & Bucket +# --------------------------------------------------------------------------- + +def _sample_threshold_data( + classification_results_dir: str, + score_col_names: list[str], + sample_frac: float, +) -> pd.DataFrame: + """Read score columns + token byte-lengths from every classification + result file, sampling a fraction of rows from each. + + Uses Ray to read files in parallel. Computes byte-lengths of the + ``text`` column inside each Ray task so that only the scores and a + small integer ``token_length`` column are transferred to the driver, + keeping driver memory proportional to the number of score columns + rather than the size of the raw text. 
+ """ + import glob + + import ray + + parquet_files = sorted(glob.glob(os.path.join(classification_results_dir, "*.parquet"))) + if not parquet_files: + msg = f"No parquet files found in {classification_results_dir}" + raise FileNotFoundError(msg) + + logger.info(f" Reading {len(parquet_files)} files with sample_frac={sample_frac}") + + columns_to_read = score_col_names + ["text"] + + @ray.remote + def _read_and_sample(path: str, columns: list[str], frac: float) -> pd.DataFrame: + df = pd.read_parquet(path, columns=columns) + if frac < 1.0: + df = df.sample(frac=frac) + # Compute byte-length on the worker and drop text before returning + df["token_length"] = df["text"].str.encode("utf-8").apply(len) + df = df.drop(columns=["text"]) + return df + + futures = [_read_and_sample.remote(f, columns_to_read, sample_frac) for f in parquet_files] + dfs = ray.get(futures) + df = pd.concat(dfs, ignore_index=True) + del dfs + logger.info(f" Sampled {len(df)} documents for threshold computation") + return df + + +def run_ensemble(args: argparse.Namespace) -> None: + """Compute thresholds, map to int bins, ensemble, write bucketed parquet.""" + classification_results_dir = os.path.join(args.output_dir, "classification_results") + thresholds_path = os.path.join(args.output_dir, "classifier_thresholds.json") + bucketed_results_dir = os.path.join(args.output_dir, "bucketed_results") + os.makedirs(bucketed_results_dir, exist_ok=True) + + logger.info("Starting ensemble & bucketing pipeline") + logger.info(f" Classification results: {classification_results_dir}") + logger.info(f" Output: {bucketed_results_dir}") + start_time = time.perf_counter() + + # Float score column names for all three classifiers + score_col_names = [v["float_score"] for v in CLASSIFIER_SCORES.values()] + + # Integer score column names (will be created by map_score_columns) + int_column_names = [col + "-int" for col in score_col_names] + + # --- Step 1: Compute thresholds from a sample of classification results 
--- + logger.info("Computing token-weighted percentile thresholds...") + t_sample_start = time.perf_counter() + df_sample = _sample_threshold_data( + classification_results_dir, score_col_names, args.threshold_sample_frac + ) + num_sampled_docs = len(df_sample) + t_sample_elapsed = time.perf_counter() - t_sample_start + logger.info(f" Sampling completed in {t_sample_elapsed:.1f}s ({num_sampled_docs} docs)") + + t_thresh_start = time.perf_counter() + threshold_dict = compute_thresholds_for_score_columns( + df_sample, text_col_name="token_length", score_col_names=score_col_names + ) + t_thresh_elapsed = time.perf_counter() - t_thresh_start + logger.info(f" Threshold computation completed in {t_thresh_elapsed:.1f}s") + + save_thresholds(threshold_dict, thresholds_path) + del df_sample + # glibc malloc keeps freed pages mapped; trim them back to the OS so the + # subsequent pipeline doesn't start from an inflated RSS baseline. + try: + ctypes.CDLL("libc.so.6").malloc_trim(0) + except (OSError, AttributeError): + pass + + # --- Step 2: Map scores to ints, ensemble, write bucketed --- + logger.info("Running ensemble & bucketing pipeline...") + t_pipeline_start = time.perf_counter() + + pipeline = Pipeline(name="ensemble-quality-scores") + pipeline.add_stage(ParquetReader(classification_results_dir)) + + @processing_stage(name="ensemble-score") + def ensemble_score(batch: DocumentBatch) -> DocumentBatch: + df = batch.to_pandas() + df = map_score_columns(df, score_col_names, threshold_dict) + df["ensemble-max-int"] = df[int_column_names].max(axis=1) + batch.data = df + return batch + + pipeline.add_stage(ensemble_score) + + # Write partitioned by ensemble bucket + pipeline.add_stage( + _PartitionedParquetWriter( + bucketed_results_dir, + write_kwargs={"partition_cols": ["ensemble-max-int"]}, + mode="overwrite", + ) + ) + + results = pipeline.run(executor=RayDataExecutor()) + t_pipeline_elapsed = time.perf_counter() - t_pipeline_start + elapsed = time.perf_counter() - 
start_time + + logger.info(f" Pipeline completed in {t_pipeline_elapsed:.1f}s") + logger.info(f"Ensemble & bucketing completed in {elapsed:.1f}s") + logger.info(f" Bucketed results written to: {bucketed_results_dir}") + + # Log the entries found in the bucketed output directory (names only, no per-bucket counts) + buckets = sorted(os.listdir(bucketed_results_dir)) + logger.info(f" Buckets created: {buckets}") + + metrics = TaskPerfUtils.aggregate_task_metrics(results) + metrics["threshold_sampling_elapsed_s"] = round(t_sample_elapsed, 2) + metrics["threshold_sampling_num_docs"] = num_sampled_docs + metrics["threshold_sampling_frac"] = args.threshold_sample_frac + metrics["threshold_computation_elapsed_s"] = round(t_thresh_elapsed, 2) + metrics["pipeline_elapsed_s"] = round(t_pipeline_elapsed, 2) + metrics["total_elapsed_s"] = round(elapsed, 2) + _save_metrics(metrics, os.path.join(args.output_dir, "bucketing_metrics.json")) + + +class _PartitionedParquetWriter(ParquetWriter): + """ParquetWriter that partitions output by a column (e.g. ensemble-max-int).""" + + def write_data(self, task: DocumentBatch, file_path: str) -> None: + df = task.to_pandas().reset_index(drop=True) + df.to_parquet( + os.path.dirname(file_path), + **{"index": None, **self.write_kwargs}, + ) + + +# --------------------------------------------------------------------------- +# Main & CLI +# --------------------------------------------------------------------------- + +def main(args: argparse.Namespace) -> None: + if not args.classify and not args.ensemble: + raise ValueError("No operation specified.
def main(args: argparse.Namespace) -> None:
    """Run the requested classification and/or ensemble phases on a Ray cluster.

    Args:
        args: Parsed CLI namespace. Reads ``classify``, ``ensemble``,
            ``num_gpus`` and ``num_cpus`` here; the full namespace is forwarded
            to ``run_classification`` / ``run_ensemble``.

    Raises:
        ValueError: If neither ``--classify`` nor ``--ensemble`` was requested.
    """
    # Validate before starting Ray so a bad invocation fails without side effects.
    if not (args.classify or args.ensemble):
        raise ValueError("No operation specified. Use --classify and/or --ensemble flags.")

    ray_client = RayClient(num_gpus=args.num_gpus, num_cpus=args.num_cpus)
    ray_client.start()
    try:
        logger.info("Starting Nemotron-CC quality classification")

        if args.classify:
            run_classification(args)

        if args.ensemble:
            run_ensemble(args)
    finally:
        # Always stop the client, even when a phase raises, so a failed run
        # does not leave the Ray cluster behind.
        ray_client.stop()
def _sample_fraction(raw: str) -> float:
    """Parse a CLI value as a sampling fraction in the interval (0, 1]."""
    try:
        value = float(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {raw!r}") from None
    # The chained comparison is False for NaN, so NaN is rejected as well.
    if not 0.0 < value <= 1.0:
        raise argparse.ArgumentTypeError(f"must be in the range (0, 1], got {raw!r}")
    return value


def attach_args() -> argparse.ArgumentParser:
    """Build the command-line parser for the quality classification script.

    Returns:
        A parser exposing the operation flags (``--classify``, ``--ensemble``),
        the input/output paths, the threshold sampling fraction, and the Ray
        resource limits. ``--threshold-sample-frac`` is validated at parse time
        so an out-of-range value is rejected before any work starts.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Quality classification and bucketing for Nemotron-CC: "
            "filter to English, run an ensemble of quality classifiers, "
            "and write bucketed output (0-19)."
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Operation flags
    parser.add_argument(
        "--classify",
        action="store_true",
        help="Run the classification phase: filter to English and score with 3 quality classifiers.",
    )
    parser.add_argument(
        "--ensemble",
        action="store_true",
        help="Run the ensemble phase: compute thresholds, map to int bins, and write bucketed output.",
    )

    # Paths
    parser.add_argument(
        "--input-dir",
        type=str,
        required=True,
        help="Directory containing deduplicated JSONL input (e.g. output of step 2b fuzzy dedup).",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="./data/quality_labeling",
        help="Base output directory. Sub-directories will be created for classification_results/, "
        "classifier_thresholds.json, and bucketed_results/.",
    )

    # Threshold sampling
    parser.add_argument(
        "--threshold-sample-frac",
        type=_sample_fraction,
        default=0.01,
        help="Fraction of rows to sample per file when computing percentile thresholds. "
        "Use < 1.0 (e.g. 0.01) to reduce memory at large scale.",
    )

    # Ray cluster
    parser.add_argument(
        "--num-gpus",
        type=int,
        default=None,
        help="Number of GPUs for a local Ray cluster (default: all available).",
    )
    parser.add_argument(
        "--num-cpus",
        type=int,
        default=None,
        help="Number of CPUs for a local Ray cluster (default: all available).",
    )

    return parser


if __name__ == "__main__":
    args = attach_args().parse_args()
    main(args)