ByteDance's Dolphin breaks through the document parsing bottleneck: its two-stage design balances accuracy and efficiency, handling complex layouts with ease. Key points: 1. The two main technical routes of existing document parsing solutions and their limitations 2. The design rationale behind Dolphin's analyze-then-parse two-stage architecture 3. How the approach performs in practice on complex elements such as tables and formulas
 
Existing document image parsing solutions fall into two broad categories: integration-based methods, which chain multiple specialized models together, and end-to-end methods, which parse the whole page with a single model.
Dolphin (Document Image Parsing via Heterogeneous Anchor Prompting) adopts an analyze-then-parse paradigm that splits document parsing into two stages: page-level layout analysis first generates the sequence of layout elements L = {l1, l2, ..., ln} in natural reading order; element-level parsing then uses these elements as anchors and decodes their contents in parallel, each with a prompt specific to its type (text, table, or formula), as sketched below.
This two-stage design avoids the multi-model coordination complexity of traditional integration-based approaches, overcomes the efficiency bottleneck that end-to-end methods hit on complex layouts and long documents, and delivers strong runtime efficiency through a lightweight architecture and parallel decoding.
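A minimal sketch of this flow, reusing the prompts from the demo code below (crop_element is a hypothetical helper; figure handling, image padding, and coordinate adjustment are glossed over):

# Stage 1: a single pass over the full page yields the layout elements in reading order
layout = model.chat("Parse the reading order of this document.", page_image)
elements = parse_layout_string(layout)  # L = [(bbox_1, label_1), ..., (bbox_n, label_n)]
# Stage 2: every element crop is decoded with a type-specific prompt, batched in parallel
crops = [crop_element(page_image, bbox) for bbox, label in elements]
prompts = ["Parse the table in the image." if label == "tab"
           else "Read text in the image." for bbox, label in elements]
contents = model.chat(prompts, crops)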
You can try Dolphin for free via this link:
http://115.190.42.15:8888/dolphin/
Dolphin provides two inference frameworks, supporting two parsing granularities:
Page-level parsing: parse an entire document image into structured JSON and Markdown
Element-level parsing: parse a single document element (text, table, or formula)
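Both granularities are exercised by the two demo scripts reproduced below. A minimal invocation sketch, assuming they are saved as demo_page_hf.py and demo_element_hf.py (the filenames and input paths are placeholders; the flags match the argparse definitions in the code):

python demo_page_hf.py --model_path ./hf_model --input_path ./demo --max_batch_size 16
python demo_element_hf.py --model_path ./hf_model --input_path ./demo/table.png --element_type table --print_results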
import argparse
import glob
import os
import cv2
import torch
from PIL import Image
from transformers import AutoProcessor, VisionEncoderDecoderModel
from utils.utils import *
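# Note: prepare_image, parse_layout_string, process_coordinates, setup_output_dirs
# and save_outputs used below are helpers provided by the repo's utils module.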
class DOLPHIN:
    def __init__(self, model_id_or_path):
        """Initialize the Hugging Face model
        
        Args:
            model_id_or_path: Path to local model or Hugging Face model ID
        """
        # Load model from local path or Hugging Face hub
        self.processor = AutoProcessor.from_pretrained(model_id_or_path)
        self.model = VisionEncoderDecoderModel.from_pretrained(model_id_or_path)
        self.model.eval()
        
        # Set device and precision
        self.device = "cuda"if torch.cuda.is_available() else"cpu"
        self.model.to(self.device)
        self.model = self.model.half()  # Always use half precision by default (note: assumes a CUDA device; fp16 on CPU may be unsupported)
        
        # set tokenizer
        self.tokenizer = self.processor.tokenizer
        
    def chat(self, prompt, image):
        """Process an image or batch of images with the given prompt(s)
        
        Args:
            prompt: Text prompt or list of prompts to guide the model
            image: PIL Image or list of PIL Images to process
            
        Returns:
            Generated text or list of texts from the model
        """
        # Check if we're dealing with a batch
        is_batch = isinstance(image, list)
        
        if not is_batch:
            # Single image, wrap it in a list for consistent processing
            images = [image]
            prompts = [prompt]
        else:
            # Batch of images
            images = image
            prompts = prompt if isinstance(prompt, list) else [prompt] * len(images)
        
        # Prepare image
        batch_inputs = self.processor(images, return_tensors="pt", padding=True)
        batch_pixel_values = batch_inputs.pixel_values.half().to(self.device)
        
        # Prepare prompt
        prompts = [f"<s>{p} <Answer/>"for p in prompts]
        batch_prompt_inputs = self.tokenizer(
            prompts,
            add_special_tokens=False,
            return_tensors="pt"
        )
        batch_prompt_ids = batch_prompt_inputs.input_ids.to(self.device)
        batch_attention_mask = batch_prompt_inputs.attention_mask.to(self.device)
        
        # Generate text
        outputs = self.model.generate(
            pixel_values=batch_pixel_values,
            decoder_input_ids=batch_prompt_ids,
            decoder_attention_mask=batch_attention_mask,
            min_length=1,
            max_length=4096,
            pad_token_id=self.tokenizer.pad_token_id,
            eos_token_id=self.tokenizer.eos_token_id,
            use_cache=True,
            bad_words_ids=[[self.tokenizer.unk_token_id]],
            return_dict_in_generate=True,
            do_sample=False,
            num_beams=1,
            repetition_penalty=1.1
        )
        
        # Process output
        sequences = self.tokenizer.batch_decode(outputs.sequences, skip_special_tokens=False)
        
        # Clean prompt text from output
        results = []
        for i, sequence in enumerate(sequences):
            cleaned = sequence.replace(prompts[i], "").replace("<pad>", "").replace("</s>", "").strip()
            results.append(cleaned)
            
        # Return a single result for single image input
        if not is_batch:
            return results[0]
        return results
def process_page(image_path, model, save_dir, max_batch_size=None):
    """Parse document images with two stages"""
    # Stage 1: Page-level layout and reading order parsing
    pil_image = Image.open(image_path).convert("RGB")
    layout_output = model.chat("Parse the reading order of this document.", pil_image)
    # Stage 2: Element-level content parsing
    padded_image, dims = prepare_image(pil_image)
    recognition_results = process_elements(layout_output, padded_image, dims, model, max_batch_size)
    # Save outputs
    json_path = save_outputs(recognition_results, image_path, save_dir)
    return json_path, recognition_results
def process_elements(layout_results, padded_image, dims, model, max_batch_size=None):
    """Parse all document elements with parallel decoding"""
    layout_results = parse_layout_string(layout_results)
    # Store text and table elements separately
    text_elements = []  # Text elements
    table_elements = []  # Table elements
    figure_results = []  # Image elements (no processing needed)
    previous_box = None
    reading_order = 0
    # Collect elements to process and group by type
    for bbox, label in layout_results:
        try:
            # Adjust coordinates
            x1, y1, x2, y2, orig_x1, orig_y1, orig_x2, orig_y2, previous_box = process_coordinates(
                bbox, padded_image, dims, previous_box
            )
            # Crop and parse element
            cropped = padded_image[y1:y2, x1:x2]
            if cropped.size > 0:
                if label == "fig":
                    # For figure regions, add empty text result immediately
                    figure_results.append(
                        {
                            "label": label,
                            "bbox": [orig_x1, orig_y1, orig_x2, orig_y2],
                            "text": "",
                            "reading_order": reading_order,
                        }
                    )
                else:
                    # Prepare element for parsing
                    pil_crop = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
                    element_info = {
                        "crop": pil_crop,
                        "label": label,
                        "bbox": [orig_x1, orig_y1, orig_x2, orig_y2],
                        "reading_order": reading_order,
                    }
                    
                    # Group by type
                    if label == "tab":
                        table_elements.append(element_info)
                    else:  # Text elements
                        text_elements.append(element_info)
            reading_order += 1
        except Exception as e:
            print(f"Error processing bbox with label {label}: {str(e)}")
            continue
    # Initialize results list
    recognition_results = figure_results.copy()
    
    # Process text elements (in batches)
    if text_elements:
        text_results = process_element_batch(text_elements, model, "Read text in the image.", max_batch_size)
        recognition_results.extend(text_results)
    
    # Process table elements (in batches)
    if table_elements:
        table_results = process_element_batch(table_elements, model, "Parse the table in the image.", max_batch_size)
        recognition_results.extend(table_results)
    # Sort elements by reading order
    recognition_results.sort(key=lambda x: x.get("reading_order", 0))
    return recognition_results
def process_element_batch(elements, model, prompt, max_batch_size=None):
    """Process elements of the same type in batches"""
    results = []
    
    # Determine batch size
    batch_size = len(elements)
    if max_batch_size is not None and max_batch_size > 0:
        batch_size = min(batch_size, max_batch_size)
    
    # Process in batches
    for i in range(0, len(elements), batch_size):
        batch_elements = elements[i:i+batch_size]
        crops_list = [elem["crop"] for elem in batch_elements]
        
        # Use the same prompt for all elements in the batch
        prompts_list = [prompt] * len(crops_list)
        
        # Batch inference
        batch_results = model.chat(prompts_list, crops_list)
        
        # Add results
        for j, result in enumerate(batch_results):
            elem = batch_elements[j]
            results.append({
                "label": elem["label"],
                "bbox": elem["bbox"],
                "text": result.strip(),
                "reading_order": elem["reading_order"],
            })
    
    return results
def main():
    parser = argparse.ArgumentParser(description="Document processing tool using DOLPHIN model")
    parser.add_argument("--model_path", default="./hf_model", help="Path to Hugging Face model")
    parser.add_argument("--input_path", type=str, default="./demo", help="Path to input image or directory of images")
    parser.add_argument(
        "--save_dir",
        type=str,
        default=None,
        help="Directory to save parsing results (default: same as input directory)",
    )
    parser.add_argument(
        "--max_batch_size",
        type=int,
        default=16,
        help="Maximum number of document elements to parse in a single batch (default: 16)",
    )
    args = parser.parse_args()
    # Load Model
    model = DOLPHIN(args.model_path)
    # Collect Document Images
    if os.path.isdir(args.input_path):
        image_files = []
        for ext in [".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG"]:
            image_files.extend(glob.glob(os.path.join(args.input_path, f"*{ext}")))
        image_files = sorted(image_files)
    else:
        if not os.path.exists(args.input_path):
            raise FileNotFoundError(f"Input path {args.input_path} does not exist")
        image_files = [args.input_path]
    save_dir = args.save_dir or (
        args.input_path if os.path.isdir(args.input_path) else os.path.dirname(args.input_path)
    )
    setup_output_dirs(save_dir)
    total_samples = len(image_files)
    print(f"\nTotal samples to process: {total_samples}")
    # Process All Document Images
    for image_path in image_files:
        print(f"\nProcessing {image_path}")
        try:
            json_path, recognition_results = process_page(
                image_path=image_path,
                model=model,
                save_dir=save_dir,
                max_batch_size=args.max_batch_size,
            )
            print(f"Processing completed. Results saved to {save_dir}")
        except Exception as e:
            print(f"Error processing {image_path}: {str(e)}")
            continue
if __name__ == "__main__":
    main()
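The element-level script is nearly identical. Its chat method handles only a single image (no batching is needed at this granularity), and the OpenCV-based cropping is replaced by a simple crop_margin preprocessing step: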
import argparse
import glob
import os
import torch
from PIL import Image
from transformers import AutoProcessor, VisionEncoderDecoderModel
from utils.utils import *
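# Note: crop_margin, setup_output_dirs and save_outputs used below are helpers
# provided by the repo's utils module.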
class DOLPHIN:
    def __init__(self, model_id_or_path):
        """Initialize the Hugging Face model
        
        Args:
            model_id_or_path: Path to local model or Hugging Face model ID
        """
        # Load model from local path or Hugging Face hub
        self.processor = AutoProcessor.from_pretrained(model_id_or_path)
        self.model = VisionEncoderDecoderModel.from_pretrained(model_id_or_path)
        self.model.eval()
        
        # Set device and precision
        self.device = "cuda"if torch.cuda.is_available() else"cpu"
        self.model.to(self.device)
        self.model = self.model.half()  # Always use half precision by default (note: assumes a CUDA device; fp16 on CPU may be unsupported)
        
        # set tokenizer
        self.tokenizer = self.processor.tokenizer
        
    def chat(self, prompt, image):
        """Process an image with the given prompt
        
        Args:
            prompt: Text prompt to guide the model
            image: PIL Image to process
            
        Returns:
            Generated text from the model
        """
        # Prepare image
        pixel_values = self.processor(image, return_tensors="pt").pixel_values
        pixel_values = pixel_values.half()
            
        # Prepare prompt
        prompt = f"<s>{prompt} <Answer/>"
        prompt_ids = self.tokenizer(
            prompt, 
            add_special_tokens=False, 
            return_tensors="pt"
        ).input_ids.to(self.device)
        
        decoder_attention_mask = torch.ones_like(prompt_ids)
        
        # Generate text
        outputs = self.model.generate(
            pixel_values=pixel_values.to(self.device),
            decoder_input_ids=prompt_ids,
            decoder_attention_mask=decoder_attention_mask,
            min_length=1,
            max_length=4096,
            pad_token_id=self.tokenizer.pad_token_id,
            eos_token_id=self.tokenizer.eos_token_id,
            use_cache=True,
            bad_words_ids=[[self.tokenizer.unk_token_id]],
            return_dict_in_generate=True,
            do_sample=False,
            num_beams=1,
        )
        
        # Process the output
        sequence = self.tokenizer.batch_decode(outputs.sequences, skip_special_tokens=False)[0]
        sequence = sequence.replace(prompt, "").replace("<pad>", "").replace("</s>", "").strip()
        
        return sequence
def process_element(image_path, model, element_type, save_dir=None):
    """Process a single element image (text, table, formula)
    
    Args:
        image_path: Path to the element image
        model: DOLPHIN model instance
        element_type: Type of element ('text', 'table', 'formula')
        save_dir: Directory to save results (default: same as input directory)
        
    Returns:
        Parsed content of the element and recognition results
    """
    # Load and prepare image
    pil_image = Image.open(image_path).convert("RGB")
    pil_image = crop_margin(pil_image)
    
    # Select appropriate prompt based on element type
    if element_type == "table":
        prompt = "Parse the table in the image."
        label = "tab"
    elif element_type == "formula":
        prompt = "Read text in the image."
        label = "formula"
    else:  # Default to text
        prompt = "Read text in the image."
        label = "text"
    
    # Process the element
    result = model.chat(prompt, pil_image)
    
    # Create recognition result in the same format as the document parser
    recognition_result = [
        {
            "label": label,
            "text": result.strip(),
        }
    ]
    
    # Save results if save_dir is provided
    if save_dir:
        save_outputs(recognition_result, image_path, save_dir)
        print(f"Results saved to {save_dir}")
    
    return result, recognition_result
def main():
    parser = argparse.ArgumentParser(description="Element-level processing using DOLPHIN model")
    parser.add_argument("--model_path", default="./hf_model", help="Path to Hugging Face model")
    parser.add_argument("--input_path", type=str, required=True, help="Path to input image or directory of images")
    parser.add_argument(
        "--element_type",
        type=str,
        choices=["text", "table", "formula"],
        default="text",
        help="Type of element to process (text, table, formula)",
    )
    parser.add_argument(
        "--save_dir",
        type=str,
        default=None,
        help="Directory to save parsing results (default: same as input directory)",
    )
    parser.add_argument("--print_results", action="store_true", help="Print recognition results to console")
    args = parser.parse_args()
    
    # Load Model
    model = DOLPHIN(args.model_path)
    
    # Set save directory
    save_dir = args.save_dir or (
        args.input_path if os.path.isdir(args.input_path) else os.path.dirname(args.input_path)
    )
    setup_output_dirs(save_dir)
    
    # Collect Images
    if os.path.isdir(args.input_path):
        image_files = []
        for ext in [".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG"]:
            image_files.extend(glob.glob(os.path.join(args.input_path, f"*{ext}")))
        image_files = sorted(image_files)
    else:
        if not os.path.exists(args.input_path):
            raise FileNotFoundError(f"Input path {args.input_path} does not exist")
        image_files = [args.input_path]
    
    total_samples = len(image_files)
    print(f"\nTotal samples to process: {total_samples}")
    
    # Process images one by one
    for image_path in image_files:
        print(f"\nProcessing {image_path}")
        try:
            result, recognition_result = process_element(
                image_path=image_path,
                model=model,
                element_type=args.element_type,
                save_dir=save_dir,
            )
            if args.print_results:
                print("\nRecognition result:")
                print(result)
                print("-" * 40)
        except Exception as e:
            print(f"Error processing {image_path}: {str(e)}")
            continue
if __name__ == "__main__":
    main()