12 changes: 5 additions & 7 deletions docs/en/notes/api/operators/pdf2vqa/generate/LLMOutputParser.md
@@ -1,7 +1,7 @@
---
title: LLMOutputParser
createTime: 2026/01/20 20:15:00
permalink: /en/api/operators/core_text/parse/llmoutputparser/
permalink: /en/api/operators/pdf2vqa/generate/llmoutputparser/
---

## 📘 Overview
@@ -16,8 +16,7 @@ The core functionalities of this operator include:
## `__init__` Function

```python
def __init__(self,
mode: Literal['question', 'answer'],
def __init__(self,
output_dir: str,
intermediate_dir: str = "intermediate"
)
@@ -28,7 +27,6 @@ def __init__(self,

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| **mode** | str | Required | Parsing mode. Options are `'question'` or `'answer'`, which affects the output filename and the image subdirectory name. |
| **output_dir** | str | Required | The final root directory for structured data and images. |
| **intermediate_dir** | str | "intermediate" | The intermediate directory where original image resources processed by MinerU are located. |

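For reference, a minimal instantiation and call might look like the following (a sketch only; the key names follow the quickstart pipeline, and `storage` is assumed to be a `FileStorage` instance):

```python
# Sketch: key names mirror the PDF2VQA quickstart pipeline; adapt them to your setup.
parser = LLMOutputParser(output_dir="./cache", intermediate_dir="intermediate")
parser.run(
    storage=storage.step(),                                    # assumed FileStorage instance
    input_response_path_key="extracted_llm_vqa_path",          # raw LLM responses
    input_converted_layout_path_key="converted_vqa_layout_path",
    input_name_key="name",
    output_qalist_path_key="extracted_vqa_path",               # structured QA output
)
```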
@@ -76,7 +74,7 @@ Suppose the LLM returns: `<question>1, 3</question>`
The operator looks up entries with `id` 1 and 3 in the layout JSON:

* If `id: 1` is the text "What is AI?" and `id: 3` is the image `path/to/img.png`.
* The restored content will be: `What is AI?\n![image](images/img.png)`.
* The restored content will be: `What is AI?\n![image](vqa_images/img.png)`.

### 2. Output File Structure

@@ -86,7 +84,7 @@ After execution, the directory structure under `output_dir` (referenced as `cach
output_dir/
└── {name}/
├── extracted_questions.jsonl # Structured data
└── question_images/ # Automatically synchronized images
└── vqa_images/ # Automatically synchronized images
├── img1.png
└── ...

@@ -96,7 +94,7 @@

```json
{
"question": "Please analyze the image below:\n![image](question_images/fig1.png)",
"question": "Please analyze the image below:\n![image](vqa_images/img1.png)",
"answer": "This is the parsed answer text.",
"solution": "Detailed step-by-step solution...",
"label": "1",
docs/en/notes/api/operators/pdf2vqa/generate/MinerU2LLMInputOperator.md
@@ -1,7 +1,7 @@
---
title: MinerU2LLMInputOperator
createTime: 2026/01/20 20:10:00
permalink: /en/api/operators/core_text/convert/mineru2llminputoperator/
permalink: /en/api/operators/pdf2vqa/generate/mineru2llminputoperator/
---

## 📘 Overview
2 changes: 1 addition & 1 deletion docs/en/notes/api/operators/pdf2vqa/generate/QAMerger.md
@@ -1,7 +1,7 @@
---
title: QA_Merger
createTime: 2026/01/20 20:25:00
permalink: /en/api/operators/core_text/merge/qamerger/
permalink: /en/api/operators/pdf2vqa/generate/qamerger/
---

## 📘 Overview
165 changes: 83 additions & 82 deletions docs/en/notes/guide/quickstart/PDFVQAExtract.md
@@ -22,7 +22,7 @@ Major stages:

## 2. Quick Start

### Step 1: Install Dataflow (and MinerU)
### Step 1: Install Dataflow
Install Dataflow:
```shell
pip install "open-dataflow[pdf2vqa]"
@@ -35,12 +35,6 @@ cd Dataflow
pip install -e ".[pdf2vqa]"
```

Then install MinerU and download models:
```shell
pip install "mineru[vllm]>=2.5.0,<2.7.0"
mineru-models-download
```

### Step 2: Create a workspace
```shell
cd /your/working/directory
@@ -55,13 +49,18 @@ dataflow init
You can then add your pipeline script under `pipelines/` or any custom path.

### Step 4: Configure API credentials
`DF_API_KEY` is used for calling the LLM API, and `MINERU_API_KEY` is used for calling MinerU for layout analysis.
`MINERU_API_KEY` can be obtained from https://mineru.net/apiManage/token, and `DF_API_KEY` can be obtained from your LLM provider (e.g., OpenAI, Google Gemini, etc.). Set them as environment variables:

Linux / macOS:
```shell
export DF_API_KEY="sk-xxxxx"
export MINERU_API_KEY="sk2-xxxxx"
```
Windows PowerShell:
```powershell
$env:DF_API_KEY = "sk-xxxxx"
$env:MINERU_API_KEY = "sk2-xxxxx"
```
In the pipeline script, set your API endpoint:
```python
@@ -72,12 +71,7 @@ self.llm_serving = APILLMServing_request(
max_workers=100,
)
```
and set MinerU backend ('vlm-vllm-engine' or 'vlm-transformers') and LLM max token length (recommended not to exceed 128000 to avoid LLM forgetting details).
**Caution: The pipeline was only tested with the `vlm` backend; compatibility with the `pipeline` backend is uncertain due to format differences. Using the `vlm` backend is recommended.**
The `vlm-vllm-engine` backend requires GPU support.
```python
self.mineru_executor = FileOrURLToMarkdownConverterBatch(intermediate_dir = "intermediate", mineru_backend="vlm-vllm-engine")
```
and set the LLM max token length (recommended not to exceed 128000, to avoid the LLM forgetting details).

```python
self.vqa_extractor = ChunkedPromptedGenerator(
@@ -97,16 +91,12 @@ You can also import the operators into other workflows; the remainder of this do

### 1. Input data

Each job is defined by a JSONL row. Two modes are supported:
Each job is defined by a JSONL row. `input_pdf_paths` can be a single PDF or a list of PDFs (questions appear before answers). `name` is an identifier for the job. Questions and answers can be interleaved or separated; they can come from the same PDF or different PDFs.

- **QA-Separated PDFs**
```jsonl
{"question_pdf_path": "/abs/path/questions.pdf", "answer_pdf_path": "/abs/path/answers.pdf", "subject": "math", "output_dir": "./output/math"}
```
- **QA-Interleaved PDFs**
```jsonl
{"question_pdf_path": "/abs/path/qa.pdf", "answer_pdf_path": "/abs/path/qa.pdf", "name": "math2"}
```
```jsonl
{"input_pdf_paths": "./example_data/PDF2VQAPipeline/questionextract_test.pdf", "name": "math1"}
{"input_pdf_paths": ["./example_data/PDF2VQAPipeline/math_question.pdf", "./example_data/PDF2VQAPipeline/math_answer.pdf"], "name": "math2"}
```
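If you prefer to generate this manifest programmatically, a minimal sketch is shown below (the filename `vqa_jobs.jsonl` is only an example):

```python
import json

# Write one job per line, matching the rows shown above.
jobs = [
    {"input_pdf_paths": "./example_data/PDF2VQAPipeline/questionextract_test.pdf", "name": "math1"},
    {"input_pdf_paths": ["./example_data/PDF2VQAPipeline/math_question.pdf",
                         "./example_data/PDF2VQAPipeline/math_answer.pdf"], "name": "math2"},
]
with open("vqa_jobs.jsonl", "w", encoding="utf-8") as f:
    for job in jobs:
        f.write(json.dumps(job, ensure_ascii=False) + "\n")
```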

`FileStorage` handles batching/cache management:
```python
Expand All @@ -120,23 +110,56 @@ self.storage = FileStorage(

### 2. Document layout extraction (MinerU)

For each PDF (question, answer, or mixed), the pipeline calls `_parse_file_with_mineru` inside `FileOrURLToMarkdownConverterBatch`. MinerU outputs:
For each PDF (question, answer, or mixed), the pipeline calls `_parse_file_with_mineru` inside `FileOrURLToMarkdownConverterAPI`. MinerU outputs:

- `<book>/<backend>/<book>_content_list.json`: structured layout tokens (texts, figures, tables, IDs)
- `<book>/<backend>/images/`: cropped page images
- `*_content_list.json`: structured layout tokens (texts, figures, tables, IDs)
- `images/`: cropped page images

The backend can be:
---
**Note**:
If you want to use a locally deployed MinerU model, you can replace the operator with `FileOrURLToMarkdownConverterLocal` (original version from opendatalab) or `FileOrURLToMarkdownConverterFlash` (our accelerated version), and provide the corresponding model path and deployment parameters.

- `vlm-transformers`: CPU/GPU compatible
- `vlm-vllm-engine`: high-throughput GPU mode (requires CUDA)
For example:

```python
self.mineru_executor = FileOrURLToMarkdownConverterAPI(intermediate_dir = "intermediate")
```

can be replaced with

```python
self.mineru_executor = FileOrURLToMarkdownConverterLocal(
intermediate_dir = "intermediate",
mineru_model_path = "path/to/mineru/model",
)
```

or

```python
self.mineru_executor = FileOrURLToMarkdownConverterFlash(
intermediate_dir = "intermediate",
mineru_model_path = "path/to/mineru/model",
batch_size = 4,
replicas = 1,
num_gpus_per_replica = 1,
engine_gpu_util_rate_to_ray_cap = 0.9
)
```

You can refer to https://github.com/OpenDCAI/DataFlow/blob/main/dataflow/operators/knowledge_cleaning/generate/mineru_operators.py for specific parameters and usage.

---

Afterwards, the `MinerU2LLMInputOperator` flattens list items and re-indexes them to create LLM-friendly input.
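As a rough illustration of that flattening (assumed shapes only; the exact MinerU `content_list` schema may differ from this sketch):

```python
# Illustrative only: assumed token shapes, not the exact MinerU content_list schema.
# Before: nested/typed layout tokens produced by MinerU.
mineru_tokens = [
    {"type": "text", "text": "What is AI?", "page_idx": 0},
    {"type": "list", "items": ["Option A", "Option B"], "page_idx": 0},
    {"type": "image", "img_path": "images/img.png", "page_idx": 0},
]

# After: flat entries with sequential ids that the prompt and LLMOutputParser
# can reference when restoring question/answer content.
converted_layout = [
    {"id": 1, "type": "text", "text": "What is AI?"},
    {"id": 2, "type": "text", "text": "Option A"},
    {"id": 3, "type": "text", "text": "Option B"},
    {"id": 4, "type": "image", "img_path": "images/img.png"},
]
```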

### 3. QA extraction (VQAExtractor)

`ChunkedPromptedGenerator` chunks the layout JSON to respect token limits, builds prompts (`QAExtractPrompt`), and batches LLM calls via `APILLMServing_request`. Key behaviors:

- Groups and pairs Q&A items and inserts images at the proper positions.
- Supports QA separated or interleaved PDFs.
- Copies rendered images into `output_dir/question_images` and/or `answer_images`.
- Copies rendered images into `cache_path/name/vqa_images`.
- Parses `<qa_pair>`, `<question>`, `<answer>`, `<solution>`, `<chapter>`, `<label>` tags from the LLM response.
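As a rough illustration (the exact tag layout is defined by `QAExtractPrompt` and may differ from this sketch), a response fragment using these tags could look like:

```python
# Hypothetical fragment: the numbers inside the tags refer to layout-element ids,
# as in the id-based restoration example in the LLMOutputParser documentation.
llm_response_fragment = (
    "<qa_pair>"
    "<chapter>Chapter 1</chapter>"
    "<label>1</label>"
    "<question>1, 3</question>"
    "<answer>5</answer>"
    "<solution>6</solution>"
    "</qa_pair>"
)
```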

### 4. Post-processing and outputs
@@ -155,11 +178,10 @@ This operator includes a `strict_title_match` parameter:

For each `output_dir` (under cache_path/name/), the pipeline writes:

1. `vqa_extracted_questions.jsonl`
2. `vqa_extracted_answers.jsonl`
3. `vqa_merged_qa_pairs.jsonl`
4. `vqa_merged_qa_pairs.md`
5. `question_images/`, `answer_images/` (depending on mode)
1. `extracted_vqa.jsonl` (extracted questions and answers, could be separate or interleaved depending on input)
2. `merged_qa_pairs.jsonl` (fully merged question-answer pairs)
3. `merged_qa_pairs.md` (markdown version of the merged QA pairs)
4. `vqa_images/` (containing all images extracted for the QA pairs)

Furthermore, the final step of the main cache file contains all extracted QA pairs, making it easier to connect subsequent operators for downstream post-processing.
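For instance, a downstream script could read the merged pairs directly (a sketch assuming `output_dir="./cache"` and a job named `math1`, as in the examples):

```python
import json

# Load the merged QA pairs produced for one job; adjust the path to your setup.
with open("./cache/math1/merged_qa_pairs.jsonl", "r", encoding="utf-8") as f:
    qa_pairs = [json.loads(line) for line in f]

print(f"{len(qa_pairs)} QA pairs loaded")
print(qa_pairs[0]["question"][:200])  # question text, including markdown image refs
```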

@@ -185,17 +207,19 @@ Example:
## 5. Pipeline Example

```python
from dataflow.operators.knowledge_cleaning import FileOrURLToMarkdownConverterBatch
from dataflow.operators.knowledge_cleaning import FileOrURLToMarkdownConverterAPI

from dataflow.serving import APILLMServing_request
from dataflow.utils.storage import FileStorage
from dataflow.operators.pdf2vqa import MinerU2LLMInputOperator, LLMOutputParser, QA_Merger
from dataflow.operators.pdf2vqa import MinerU2LLMInputOperator, LLMOutputParser, QA_Merger, PDF_Merger
from dataflow.operators.core_text import ChunkedPromptedGenerator

from dataflow.pipeline import PipelineABC
from dataflow.prompts.pdf2vqa import QAExtractPrompt

class VQA_extract_optimized_pipeline(PipelineABC):
from pypdf import PdfWriter

class PDF_VQA_extract_optimized_pipeline(PipelineABC):
def __init__(self):
super().__init__()
self.storage = FileStorage(
@@ -214,82 +238,59 @@ class VQA_extract_optimized_pipeline(PipelineABC):

self.vqa_extract_prompt = QAExtractPrompt()

self.mineru_executor = FileOrURLToMarkdownConverterBatch(intermediate_dir = "intermediate", mineru_backend="vlm-vllm-engine")
self.pdf_merger = PDF_Merger(output_dir="./cache")
self.mineru_executor = FileOrURLToMarkdownConverterAPI(intermediate_dir = "intermediate")
self.input_formatter = MinerU2LLMInputOperator()
self.vqa_extractor = ChunkedPromptedGenerator(
llm_serving=self.llm_serving,
system_prompt = self.vqa_extract_prompt.build_prompt(),
max_chunk_len=128000,
)
self.llm_output_question_parser = LLMOutputParser(mode="question", output_dir="./cache", intermediate_dir="intermediate")
self.llm_output_answer_parser = LLMOutputParser(mode="answer", output_dir="./cache", intermediate_dir="intermediate")
self.llm_output_parser = LLMOutputParser(output_dir="./cache", intermediate_dir="intermediate")
self.qa_merger = QA_Merger(output_dir="./cache", strict_title_match=False)
def forward(self):
# The current processing logic is: MinerU processes questions -> MinerU processes answers -> Format question text -> Format answer text -> Input question text into LLM -> Input answer text into LLM -> Parse question output -> Parse answer output -> Merge QA pairs.
# Since QA pairs may originate from the same PDF or different PDFs, and DataFlow currently does not support branching, both question and answer PDFs must be processed even when they are the same PDF.
# This means if they come from the same PDF, it will be processed twice before the final QA merging step.
# Future optimizations will be considered to refine this workflow, avoid redundant processing of the same PDF, and improve performance.

self.mineru_executor.run(
self.pdf_merger.run(
storage=self.storage.step(),
input_key="question_pdf_path",
output_key="question_markdown_path",
input_pdf_list_key="input_pdf_paths",
input_name_key="name",
output_pdf_path_key="merged_pdf_path",
)
self.mineru_executor.run(
storage=self.storage.step(),
input_key="answer_pdf_path",
output_key="answer_markdown_path",
input_key="merged_pdf_path",
output_key="vqa_markdown_path",
)
self.input_formatter.run(
storage=self.storage.step(),
input_markdown_path_key="question_markdown_path",
output_converted_layout_key="converted_question_layout_path",
)
self.input_formatter.run(
storage=self.storage.step(),
input_markdown_path_key="answer_markdown_path",
output_converted_layout_key="converted_answer_layout_path",
input_markdown_path_key="vqa_markdown_path",
output_converted_layout_key="converted_vqa_layout_path",
)
self.vqa_extractor.run(
storage=self.storage.step(),
input_path_key="converted_question_layout_path",
output_path_key="vqa_extracted_questions_path",
)
self.vqa_extractor.run(
storage=self.storage.step(),
input_path_key="converted_answer_layout_path",
output_path_key="vqa_extracted_answers_path",
)
self.llm_output_question_parser.run(
storage=self.storage.step(),
input_response_path_key="vqa_extracted_questions_path",
input_converted_layout_path_key="converted_question_layout_path",
input_name_key="name",
output_qalist_path_key="extracted_questions_path",
input_path_key="converted_vqa_layout_path",
output_path_key="extracted_llm_vqa_path",
)
self.llm_output_answer_parser.run(
self.llm_output_parser.run(
storage=self.storage.step(),
input_response_path_key="vqa_extracted_answers_path",
input_converted_layout_path_key="converted_answer_layout_path",
input_response_path_key="extracted_llm_vqa_path",
input_converted_layout_path_key="converted_vqa_layout_path",
input_name_key="name",
output_qalist_path_key="extracted_answers_path",
output_qalist_path_key="extracted_vqa_path",
)
self.qa_merger.run(
storage=self.storage.step(),
input_question_qalist_path_key="extracted_questions_path",
input_answer_qalist_path_key="extracted_answers_path",
input_qalist_path_key="extracted_vqa_path",
input_name_key="name",
output_merged_qalist_path_key="output_merged_qalist_path",
output_merged_qalist_path_key="output_merged_vqalist_path",
output_merged_md_path_key="output_merged_md_path",
output_qa_item_key="qa_pair",
output_qa_item_key="vqa_pair",
)



if __name__ == "__main__":
# Each line in the JSONL file contains `question_pdf_path`, `answer_pdf_path`, and `name` (e.g., math1, math2, physics1, chemistry1, ...).
# If the questions and answers are located within the same PDF, set both question_pdf_path and answer_pdf_path to the same file path.
pipeline = VQA_extract_optimized_pipeline()
# Each line in the jsonl contains input_pdf_paths, name (math1, math2, physics1, chemistry1, ...)
pipeline = PDF_VQA_extract_optimized_pipeline()
pipeline.compile()
pipeline.forward()
```