# Source code for multimodal_fin.processing.pipeline

# multimodal_fin/processing/pipeline.py

from pathlib import Path
import torch
import time

from multimodal_fin.config import Settings
from multimodal_fin.utils.logging import get_logger
from multimodal_fin.utils.files import read_paths_csv, make_processed_path

from multimodal_fin.processing.preprocessing.preprocessor import Preprocessor
from multimodal_fin.processing.processor import Processor

logger = get_logger(__name__)


class ConferencePipeline:
    """
    Orchestrates the full processing pipeline for a financial conference folder.

    Steps performed:
        1. Preprocessing of the transcript and section segmentation.
        2. Text classification and question-answer (Q&A) annotation.
        3. Multimodal embedding extraction (text, audio, video).
        4. Metadata enrichment using LLMs (topics, Q&A analysis, coherence).
        5. Result persistence in CSV and enriched JSON format.
    """

    def __init__(self, settings: Settings) -> None:
        """
        Build the two processing stages from the given settings.

        Args:
            settings (Settings): Configuration parameters for processing.
        """
        self.settings = settings
        # An explicit device in the settings wins; otherwise prefer CUDA when
        # torch reports it as available and fall back to CPU.
        self.device = settings.device or (
            "cuda" if torch.cuda.is_available() else "cpu"
        )

        # Step 1: Text pipeline (sectioning + classification)
        self.preprocessor = Preprocessor(
            qa_model_names=settings.qa_models,
            monologue_model_names=settings.monologue_models,
            num_evaluations=settings.evals,
            verbose=settings.verbose,
        )

        # Step 2: Multimodal pipeline (embedding extraction + enrichment)
        self.multimodal_processor = Processor(
            sec10k_model_names=settings.sec10k_models,
            qa_analyzer_models=settings.qa_analyzer_models,
            audio_model_name=settings.audio_model,
            text_model_name=settings.text_model,
            video_model_name=settings.video_model,
            num_evaluations=settings.evals,
            device=self.device,
            verbose=settings.verbose,
        )

    def run(self) -> None:
        """
        Run the processing pipeline on each conference folder path defined
        in the input CSV.

        A failure in one conference is logged with its traceback and does not
        stop the remaining conferences from being processed.
        """
        for original_path in read_paths_csv(self.settings.input_csv_path):
            try:
                self._process_conference(Path(original_path))
            except Exception:
                # Deliberate best-effort boundary: one bad folder must not
                # abort the whole batch, so log the traceback and move on.
                logger.error(f"Failed to process: {original_path}", exc_info=True)

    def _process_conference(self, original: Path) -> None:
        """
        Process a single conference folder end-to-end.

        Args:
            original (Path): Path to the original conference folder.

        Raises:
            FileNotFoundError: If transcript.csv or LEVEL_4.json is missing.
        """
        logger.info(f"🔄 Starting processing for conference: {original}")
        start_time = time.perf_counter()

        processed_dir = make_processed_path(original)

        # Validate required input files *before* creating any output, so a
        # conference with missing inputs does not leave an empty directory.
        transcript_csv = original / "transcript.csv"
        level4_json = original / "LEVEL_4.json"
        missing = [
            required.name
            for required in (transcript_csv, level4_json)
            if not required.exists()
        ]
        if missing:
            raise FileNotFoundError(
                f"Required files not found in {original}: {', '.join(missing)}"
            )

        # Create output directory
        processed_dir.mkdir(parents=True, exist_ok=True)

        # Step 1: Preprocessing and CSV output. The return value is not
        # needed here: the CSV written to disk is what step 2 consumes.
        output_csv = processed_dir / "transcript.csv"
        self.preprocessor.process_and_save(
            csv_path=str(transcript_csv),
            json_path=str(level4_json),
            output_csv_path=str(output_csv),
        )
        logger.info(f"✅ Transcript classified and saved at: {output_csv}")

        # Step 2: Multimodal processing and JSON output
        output_json = processed_dir / "transcript.json"
        self.multimodal_processor.process_and_save(
            input_csv_path=str(output_csv),
            original_dir=original,
            output_json_path=str(output_json),
        )
        logger.info(f"✅ Enriched JSON saved at: {output_json}")

        elapsed = time.perf_counter() - start_time
        logger.info(
            f"⏱️ Finished processing conference: {original.name} in {elapsed:.2f} seconds"
        )