Construir un pipeline de procesamiento de informes de laboratorio que transforme PDFs e imágenes en datos clínicos estructurados es un proyecto que combina extracción de documentos, procesamiento de lenguaje natural, terminologías médicas y estándares de interoperabilidad. Este tutorial muestra paso a paso cómo construir un pipeline funcional en Python que utiliza la API de MedExtract para el trabajo pesado de extracción y mapeo, y añade lógica personalizada para la integración con sistemas HCE.
Arquitectura del pipeline
El pipeline completo consta de cinco fases:
- Ingesta: recibir el documento (PDF o imagen) desde una fuente externa.
- Extracción: enviar el documento a la API de MedExtract para la extracción OCR y el mapeo LOINC.
- Validación: verificar la calidad de los resultados y gestionar casos de baja confianza.
- Transformación FHIR: convertir los resultados validados en recursos FHIR R4.
- Entrega: enviar los recursos FHIR al sistema destino.
PDF/Imagen → API MedExtract → Validación → FHIR R4 → Sistema HCE
Requisitos previos
Para seguir este tutorial necesitas Python 3.10 o superior y las siguientes dependencias:
pip install httpx fhir.resources pydantic python-dotenv
También necesitas una clave de API de MedExtract. Puedes obtener una gratuita para desarrollo en la página de precios.
Crea un archivo .env para almacenar tu clave de forma segura:
MEDEXTRACT_API_KEY=tu_clave_aqui
MEDEXTRACT_API_URL=https://api.medextract.io/v1
Fase 1: ingesta de documentos
La primera fase del pipeline recibe documentos de una fuente y los prepara para el procesamiento. En un entorno real, los documentos pueden llegar por correo electrónico, desde un sistema de gestión documental, un directorio compartido o una cola de mensajes.
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
SUPPORTED_EXTENSIONS = {".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"}
def discover_reports(input_dir: str) -> list[Path]:
"""Descubre informes de laboratorio en un directorio de entrada."""
input_path = Path(input_dir)
reports = []
for file in input_path.iterdir():
if file.suffix.lower() in SUPPORTED_EXTENSIONS:
reports.append(file)
return sorted(reports, key=lambda f: f.stat().st_mtime)
def validate_file(file_path: Path) -> bool:
"""Verifica que el archivo sea válido para procesamiento."""
if not file_path.exists():
return False
# Verificar tamaño máximo (50 MB)
if file_path.stat().st_size > 50 * 1024 * 1024:
return False
if file_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
return False
return True
Fase 2: extracción con la API de MedExtract
La fase de extracción envía cada documento a la API de MedExtract y recibe los resultados estructurados. La API maneja internamente el OCR, la detección de tablas, el mapeo LOINC y la validación de plausibilidad.
import httpx
from dataclasses import dataclass
API_KEY = os.getenv("MEDEXTRACT_API_KEY")
API_URL = os.getenv("MEDEXTRACT_API_URL", "https://api.medextract.io/v1")
@dataclass
class LabResult:
test_name: str
loinc_code: str
loinc_display: str
value: float | str
unit: str
reference_range: str | None
confidence: float
flag: str | None # "H", "L", "N", None
@dataclass
class ExtractionResult:
patient_name: str | None
patient_id: str | None
lab_name: str | None
report_date: str | None
results: list[LabResult]
overall_confidence: float
async def extract_report(file_path: Path) -> ExtractionResult:
"""Envía un informe a la API de MedExtract y parsea la respuesta."""
async with httpx.AsyncClient(timeout=60.0) as client:
with open(file_path, "rb") as f:
response = await client.post(
f"{API_URL}/extract",
headers={"Authorization": f"Bearer {API_KEY}"},
files={"file": (file_path.name, f, _mime_type(file_path))},
data={"output_format": "json", "include_confidence": "true"},
)
response.raise_for_status()
data = response.json()
results = []
for item in data.get("results", []):
results.append(LabResult(
test_name=item["test_name"],
loinc_code=item["loinc"]["code"],
loinc_display=item["loinc"]["display"],
value=item["value"],
unit=item.get("unit", ""),
reference_range=item.get("reference_range"),
confidence=item.get("confidence", 0.0),
flag=item.get("flag"),
))
return ExtractionResult(
patient_name=data.get("patient", {}).get("name"),
patient_id=data.get("patient", {}).get("id"),
lab_name=data.get("laboratory", {}).get("name"),
report_date=data.get("report_date"),
results=results,
overall_confidence=data.get("overall_confidence", 0.0),
)
def _mime_type(path: Path) -> str:
ext = path.suffix.lower()
return {
".pdf": "application/pdf",
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".tiff": "image/tiff",
".bmp": "image/bmp",
}.get(ext, "application/octet-stream")
Fase 3: validación de resultados
La fase de validación verifica la calidad de la extracción y gestiona los casos problemáticos. Los resultados con baja confianza se marcan para revisión humana en lugar de procesarse automáticamente.
from enum import Enum
class ValidationStatus(Enum):
APPROVED = "approved"
REVIEW_REQUIRED = "review_required"
REJECTED = "rejected"
@dataclass
class ValidationResult:
extraction: ExtractionResult
status: ValidationStatus
issues: list[str]
approved_results: list[LabResult]
flagged_results: list[LabResult]
def validate_extraction(
extraction: ExtractionResult,
min_confidence: float = 0.85,
min_overall_confidence: float = 0.80,
) -> ValidationResult:
"""Valida los resultados de la extracción y separa los aprobados."""
issues = []
approved = []
flagged = []
# Verificar confianza global
if extraction.overall_confidence < min_overall_confidence:
issues.append(
f"Confianza global baja: {extraction.overall_confidence:.2%}"
)
# Verificar cada resultado individualmente
for result in extraction.results:
result_issues = _validate_single_result(result, min_confidence)
if result_issues:
flagged.append(result)
issues.extend(result_issues)
else:
approved.append(result)
# Determinar estado global
if not approved:
status = ValidationStatus.REJECTED
elif flagged:
status = ValidationStatus.REVIEW_REQUIRED
else:
status = ValidationStatus.APPROVED
return ValidationResult(
extraction=extraction,
status=status,
issues=issues,
approved_results=approved,
flagged_results=flagged,
)
def _validate_single_result(
result: LabResult, min_confidence: float
) -> list[str]:
"""Valida un resultado individual."""
issues = []
if result.confidence < min_confidence:
issues.append(
f"{result.test_name}: confianza baja ({result.confidence:.2%})"
)
if not result.loinc_code:
issues.append(f"{result.test_name}: sin código LOINC")
if result.value is None or result.value == "":
issues.append(f"{result.test_name}: sin valor")
return issues
Fase 4: transformación a FHIR R4
La fase de transformación convierte los resultados validados en recursos FHIR R4 estándar. Generamos un Bundle de tipo transaction que contiene un DiagnosticReport y sus Observations asociadas.
from datetime import datetime
import uuid
import json
def build_fhir_bundle(validation: ValidationResult) -> dict:
"""Construye un Bundle FHIR R4 Transaction a partir de los resultados."""
extraction = validation.extraction
bundle_id = str(uuid.uuid4())
report_id = str(uuid.uuid4())
now = datetime.utcnow().isoformat() + "Z"
observations = []
observation_refs = []
for result in validation.approved_results:
obs_id = str(uuid.uuid4())
observation_refs.append({"reference": f"Observation/{obs_id}"})
obs = _build_observation(
obs_id=obs_id,
result=result,
patient_ref=extraction.patient_id,
effective_date=extraction.report_date or now,
)
observations.append({
"fullUrl": f"urn:uuid:{obs_id}",
"resource": obs,
"request": {
"method": "POST",
"url": "Observation",
},
})
# DiagnosticReport que agrupa todas las Observations
diagnostic_report = {
"resourceType": "DiagnosticReport",
"id": report_id,
"status": "final",
"category": [{
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/v2-0074",
"code": "LAB",
"display": "Laboratory",
}]
}],
"code": {
"coding": [{
"system": "http://loinc.org",
"code": "11502-2",
"display": "Laboratory report",
}]
},
"effectiveDateTime": extraction.report_date or now,
"issued": now,
"result": observation_refs,
}
if extraction.patient_id:
diagnostic_report["subject"] = {
"reference": f"Patient/{extraction.patient_id}"
}
if extraction.lab_name:
diagnostic_report["performer"] = [{
"display": extraction.lab_name
}]
entries = [{
"fullUrl": f"urn:uuid:{report_id}",
"resource": diagnostic_report,
"request": {
"method": "POST",
"url": "DiagnosticReport",
},
}] + observations
return {
"resourceType": "Bundle",
"id": bundle_id,
"type": "transaction",
"timestamp": now,
"entry": entries,
}
def _build_observation(
obs_id: str,
result: LabResult,
patient_ref: str | None,
effective_date: str,
) -> dict:
"""Construye un recurso FHIR Observation individual."""
obs: dict = {
"resourceType": "Observation",
"id": obs_id,
"status": "final",
"category": [{
"coding": [{
"system": (
"http://terminology.hl7.org/CodeSystem"
"/observation-category"
),
"code": "laboratory",
}]
}],
"code": {
"coding": [{
"system": "http://loinc.org",
"code": result.loinc_code,
"display": result.loinc_display,
}],
"text": result.test_name,
},
"effectiveDateTime": effective_date,
}
if patient_ref:
obs["subject"] = {"reference": f"Patient/{patient_ref}"}
# Valor cuantitativo o textual
if isinstance(result.value, (int, float)):
obs["valueQuantity"] = {
"value": result.value,
"unit": result.unit,
"system": "http://unitsofmeasure.org",
"code": result.unit,
}
else:
obs["valueString"] = str(result.value)
# Rango de referencia
if result.reference_range:
ref_range = _parse_reference_range(result.reference_range, result.unit)
if ref_range:
obs["referenceRange"] = [ref_range]
# Interpretación (flag)
if result.flag:
interpretation_map = {
"H": ("H", "High"),
"L": ("L", "Low"),
"N": ("N", "Normal"),
}
if result.flag in interpretation_map:
code, display = interpretation_map[result.flag]
obs["interpretation"] = [{
"coding": [{
"system": (
"http://terminology.hl7.org/CodeSystem"
"/v3-ObservationInterpretation"
),
"code": code,
"display": display,
}]
}]
return obs
def _parse_reference_range(
range_str: str, unit: str
) -> dict | None:
"""Parsea un rango de referencia como '4.0 - 10.0' a FHIR."""
import re
match = re.match(
r"([\d.]+)\s*[-–]\s*([\d.]+)", range_str
)
if not match:
return {"text": range_str}
low, high = float(match.group(1)), float(match.group(2))
return {
"low": {
"value": low,
"unit": unit,
"system": "http://unitsofmeasure.org",
},
"high": {
"value": high,
"unit": unit,
"system": "http://unitsofmeasure.org",
},
}
Fase 5: entrega al sistema destino
La fase final envía el Bundle FHIR al servidor destino. Esto puede ser un servidor FHIR, un HCE con interfaz FHIR, o un middleware de integración.
async def submit_fhir_bundle(
bundle: dict,
fhir_server_url: str,
auth_token: str | None = None,
) -> dict:
"""Envía un Bundle FHIR Transaction al servidor destino."""
headers = {"Content-Type": "application/fhir+json"}
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
fhir_server_url,
json=bundle,
headers=headers,
)
response.raise_for_status()
return response.json()
Orquestador completo del pipeline
Con todas las fases implementadas, el orquestador las conecta en un flujo coherente:
import asyncio
import logging
logger = logging.getLogger(__name__)
async def process_report(
file_path: Path,
fhir_server_url: str | None = None,
min_confidence: float = 0.85,
) -> dict:
"""Procesa un informe de laboratorio de extremo a extremo."""
logger.info(f"Procesando: {file_path.name}")
# Fase 1: validar archivo de entrada
if not validate_file(file_path):
raise ValueError(f"Archivo no válido: {file_path}")
# Fase 2: extracción con MedExtract
extraction = await extract_report(file_path)
logger.info(
f"Extraídos {len(extraction.results)} resultados "
f"(confianza: {extraction.overall_confidence:.2%})"
)
# Fase 3: validación
validation = validate_extraction(extraction, min_confidence)
logger.info(
f"Validación: {validation.status.value} "
f"({len(validation.approved_results)} aprobados, "
f"{len(validation.flagged_results)} marcados)"
)
if validation.issues:
for issue in validation.issues:
logger.warning(f" Problema: {issue}")
# Fase 4: transformación FHIR
bundle = build_fhir_bundle(validation)
logger.info(
f"Bundle FHIR generado: {len(bundle['entry'])} recursos"
)
# Fase 5: entrega (opcional)
if fhir_server_url:
response = await submit_fhir_bundle(bundle, fhir_server_url)
logger.info("Bundle enviado al servidor FHIR")
return response
return bundle
async def process_batch(
input_dir: str,
fhir_server_url: str | None = None,
) -> list[dict]:
"""Procesa un lote de informes desde un directorio."""
reports = discover_reports(input_dir)
logger.info(f"Encontrados {len(reports)} informes para procesar")
results = []
for report in reports:
try:
result = await process_report(report, fhir_server_url)
results.append({"file": report.name, "status": "success", "data": result})
except Exception as e:
logger.error(f"Error procesando {report.name}: {e}")
results.append({"file": report.name, "status": "error", "error": str(e)})
return results
# Punto de entrada
if __name__ == "__main__":
asyncio.run(process_batch("./lab_reports"))
Gestión de errores y reintentos
En un entorno de producción, el pipeline debe manejar errores de red, timeouts de API y fallos transitorios. Una estrategia de reintentos con backoff exponencial es esencial:
import asyncio
from functools import wraps
def with_retry(max_attempts: int = 3, base_delay: float = 1.0):
"""Decorador para reintentos con backoff exponencial."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except httpx.HTTPStatusError as e:
if e.response.status_code < 500:
raise # No reintentar errores del cliente
if attempt == max_attempts - 1:
raise
delay = base_delay * (2 ** attempt)
logger.warning(
f"Intento {attempt + 1} fallido, "
f"reintentando en {delay}s..."
)
await asyncio.sleep(delay)
except httpx.TimeoutException:
if attempt == max_attempts - 1:
raise
delay = base_delay * (2 ** attempt)
await asyncio.sleep(delay)
return wrapper
return decorator
Logging y observabilidad
Para un pipeline de producción, la observabilidad es crucial. Cada ejecución del pipeline debe registrar:
- El archivo de entrada y su tipo
- Los tiempos de cada fase
- El número de resultados extraídos y su confianza media
- Los problemas de validación encontrados
- El resultado de la entrega FHIR
import time
async def process_report_with_metrics(
file_path: Path,
fhir_server_url: str | None = None,
) -> dict:
"""Versión instrumentada del procesamiento de informes."""
metrics = {"file": file_path.name, "phases": {}}
t0 = time.monotonic()
# Fase de extracción
t1 = time.monotonic()
extraction = await extract_report(file_path)
metrics["phases"]["extraction"] = time.monotonic() - t1
metrics["results_count"] = len(extraction.results)
metrics["overall_confidence"] = extraction.overall_confidence
# Fase de validación
t2 = time.monotonic()
validation = validate_extraction(extraction)
metrics["phases"]["validation"] = time.monotonic() - t2
metrics["approved_count"] = len(validation.approved_results)
metrics["flagged_count"] = len(validation.flagged_results)
# Fase de transformación FHIR
t3 = time.monotonic()
bundle = build_fhir_bundle(validation)
metrics["phases"]["fhir_transform"] = time.monotonic() - t3
metrics["total_time"] = time.monotonic() - t0
logger.info(f"Métricas: {json.dumps(metrics, indent=2)}")
return bundle
Consideraciones de seguridad y cumplimiento
Al construir un pipeline que procesa datos de salud, el cumplimiento del RGPD es obligatorio. Las consideraciones clave incluyen:
- Minimización de datos: no almacenar los documentos originales más tiempo del necesario para el procesamiento.
- Cifrado en tránsito: toda la comunicación con la API de MedExtract se realiza sobre HTTPS/TLS.
- Logs sin datos personales: asegurarse de que los registros de log no contengan datos identificativos del paciente.
- Registro de actividades de tratamiento: documentar qué datos se procesan, con qué base legal y durante cuánto tiempo.
Próximos pasos
Este tutorial cubre los fundamentos de un pipeline de procesamiento de informes de laboratorio. Para llevar el pipeline a producción, considera:
- Procesamiento paralelo: usar
asyncio.gather()para procesar múltiples informes simultáneamente. - Cola de mensajes: integrar con RabbitMQ, Redis Streams o SQS para procesamiento asíncrono.
- Almacenamiento intermedio: guardar los resultados intermedios en una base de datos para trazabilidad y re-procesamiento.
- Dashboard de monitorización: construir un panel que muestre el volumen procesado, las tasas de éxito y los casos pendientes de revisión.
- Integración con SNOMED CT: añadir la capa de interpretación clínica a los recursos FHIR generados.
La API de MedExtract simplifica enormemente la fase más compleja del pipeline: la extracción y el mapeo. Al delegar el OCR avanzado, el mapeo LOINC y la validación de plausibilidad a una API especializada, tu equipo puede centrarse en la lógica de integración específica de tu organización.
Artículos relacionados
Guía de integración FHIR R4 para sistemas HCE
Una visión práctica de la integración de recursos FHIR R4 en sistemas HCE, centrada en bundles DiagnosticReport y Observation de datos de laboratorio.
OCR para informes de laboratorio médicos: guía completa
Guía exhaustiva sobre extracción óptica de caracteres en informes médicos de laboratorio: tecnologías, desafíos y mejores prácticas.
Cómo mapear pruebas de laboratorio en español a códigos LOINC
Los retos específicos del mapeo de nombres de pruebas en español a LOINC y técnicas para resolverlos.