import os
import json
from dataclasses import dataclass
from typing import List
import pandas as pd
from multimodal_fin.processing.preprocessing.ensemble_classifier import EnsembleInterventionClassifier
from multimodal_fin.utils.logging import get_logger
logger = get_logger(__name__)
[docs]
@dataclass
class Preprocessor:
"""
Run the transcript preprocessing pipeline for a financial conference call.

Steps:
1. Section segmentation between 'prepared_remarks' and 'q_a'.
2. Classification using an ensemble of Q&A and monologue classifiers.
3. Annotation of question-answer pairs.

The fields below are plain dataclass configuration; the ensemble classifier
itself is built from them in ``__post_init__`` and stored on
``self.classifier``.
"""
# Model names forwarded to EnsembleInterventionClassifier as `qa_model_names`.
qa_model_names: List[str]
# Model names forwarded to EnsembleInterventionClassifier as `monologue_model_names`.
monologue_model_names: List[str]
# Forwarded to the classifier as its `NUM_EVALUATIONS` argument.
num_evaluations: int = 5
# Forwarded to the classifier as its `verbose` argument.
verbose: int = 1
# Name of the column that `divide_conference` writes the section labels to.
section_col: str = "Conf_Section"
# Name of the transcript column that is searched for the Q&A intro text.
text_col: str = "text"
# Not referenced by the methods visible here; presumably the JSON key read
# by `extract_qna_intro` -- TODO confirm against that method.
qna_key: str = "questions_and_answers"
def __post_init__(self):
    """
    Build the ensemble classifier from the dataclass configuration.

    Runs automatically after the generated ``__init__`` and stores the
    resulting ``EnsembleInterventionClassifier`` on ``self.classifier``.
    """
    # Collect the constructor arguments first so the mapping from dataclass
    # fields to classifier keywords is visible in one place.
    ensemble_options = {
        "qa_model_names": self.qa_model_names,
        "monologue_model_names": self.monologue_model_names,
        "NUM_EVALUATIONS": self.num_evaluations,
        "verbose": self.verbose,
    }
    self.classifier = EnsembleInterventionClassifier(**ensemble_options)
[docs]
def divide_conference(self, csv_path: str, json_path: str) -> pd.DataFrame:
    """
    Label every transcript row as 'prepared_remarks' or 'q_a'.

    The first row whose text contains the Q&A intro and every row after it
    are labelled 'q_a'; all earlier rows are labelled 'prepared_remarks'.
    The intro is matched as a case-insensitive *literal* substring, so
    punctuation in the intro sentence is never interpreted as a regular
    expression. If no intro is available, the text column is missing, or no
    row contains the intro, the whole transcript is labelled
    'prepared_remarks'.

    Args:
        csv_path: Path to transcript CSV.
        json_path: Path to LEVEL_4.json with Q&A intro.

    Returns:
        DataFrame read from ``csv_path`` with the section column added.
    """
    df = pd.read_csv(csv_path)
    intro = self.extract_qna_intro(json_path)

    if not intro:
        df[self.section_col] = 'prepared_remarks'
        logger.info("No intro extracted. Entire transcript set as 'prepared_remarks'.")
        return df

    if self.text_col not in df.columns:
        # Distinct message: the intro exists, the transcript column does not.
        df[self.section_col] = 'prepared_remarks'
        logger.info(
            f"Column '{self.text_col}' not found. "
            "Entire transcript set as 'prepared_remarks'."
        )
        return df

    # regex=False: the intro is free text and may contain '?', '(', '.', etc.
    mask = df[self.text_col].str.contains(intro, case=False, na=False, regex=False)
    if not mask.any():
        df[self.section_col] = 'prepared_remarks'
        logger.info("Q&A intro not found. Defaulted to 'prepared_remarks'.")
        return df

    # Split by row position rather than index label so the result does not
    # depend on the index being a sorted integer range.
    first_qa = int(mask.to_numpy().argmax())
    df[self.section_col] = (
        ['prepared_remarks'] * first_qa + ['q_a'] * (len(df) - first_qa)
    )
    logger.info("Q&A section detected. Split applied.")
    return df
[docs]
def process(self, csv_path: str, json_path: str) -> pd.DataFrame:
    """
    Run the three pipeline stages in order and return the result.

    The transcript is first split into sections by ``divide_conference``,
    then passed through the classifier's ``classify_dataframe`` and
    finally through its ``annotate_question_answer_pairs``.

    Args:
        csv_path: Path to transcript CSV.
        json_path: Path to LEVEL_4.json

    Returns:
        The DataFrame returned by the final annotation stage.
    """
    sectioned = self.divide_conference(csv_path, json_path)
    classified = self.classifier.classify_dataframe(sectioned)
    return self.classifier.annotate_question_answer_pairs(classified)
[docs]
def process_and_save(self, csv_path: str, json_path: str, output_csv_path: str) -> pd.DataFrame:
    """
    Run ``process`` and write its result to a CSV file.

    The DataFrame is written without its index column, and the output
    location is logged once the write has completed.

    Args:
        csv_path: Input transcript CSV.
        json_path: LEVEL_4.json.
        output_csv_path: Path to save the processed CSV.

    Returns:
        The same DataFrame that was written to ``output_csv_path``.
    """
    processed = self.process(csv_path, json_path)
    processed.to_csv(output_csv_path, index=False)
    logger.info(f"Processed transcript saved to {output_csv_path}")
    return processed