Building a lab report processing pipeline that transforms PDFs and images into structured clinical data combines document extraction, natural language processing, medical terminologies, and interoperability standards. This tutorial walks step by step through building a functional pipeline in Python that delegates the heavy lifting of extraction and mapping to the MedExtract API and adds custom logic for integration with EHR systems.
Pipeline architecture
The complete pipeline consists of five phases:
- Ingestion: receive the document (PDF or image) from an external source.
- Extraction: send the document to the MedExtract API for OCR extraction and LOINC mapping.
- Validation: verify result quality and handle low-confidence cases.
- FHIR transformation: convert validated results into FHIR R4 resources.
- Delivery: send FHIR resources to the destination system.
PDF/Image → MedExtract API → Validation → FHIR R4 → EHR System
Prerequisites
To follow this tutorial you need Python 3.10 or higher and the following dependencies:
```bash
pip install httpx fhir.resources pydantic python-dotenv
```
You also need a MedExtract API key. You can get a free one for development on the pricing page.
Create a .env file to store your key securely:
```
MEDEXTRACT_API_KEY=your_key_here
MEDEXTRACT_API_URL=https://api.medextract.io/v1
```
Phase 1: document ingestion
The first phase of the pipeline receives documents from a source and prepares them for processing. In a real environment, documents may arrive via email, from a document management system, a shared directory, or a message queue.
```python
import os
from pathlib import Path

from dotenv import load_dotenv

load_dotenv()

SUPPORTED_EXTENSIONS = {".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"}


def discover_reports(input_dir: str) -> list[Path]:
    """Discover lab reports in an input directory."""
    input_path = Path(input_dir)
    reports = []
    for file in input_path.iterdir():
        if file.suffix.lower() in SUPPORTED_EXTENSIONS:
            reports.append(file)
    # Oldest first, so reports are processed in arrival order
    return sorted(reports, key=lambda f: f.stat().st_mtime)


def validate_file(file_path: Path) -> bool:
    """Verify that the file is valid for processing."""
    if not file_path.exists():
        return False
    # Check maximum size (50 MB)
    if file_path.stat().st_size > 50 * 1024 * 1024:
        return False
    if file_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
        return False
    return True
```
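To see the discovery logic in action, here is a small standalone sketch of the same pattern (suffix filtering plus modification-time sort) against a temporary directory. The file names are purely illustrative:

```python
import tempfile
from pathlib import Path

SUPPORTED_EXTENSIONS = {".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"}

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "cbc_panel.pdf").write_bytes(b"%PDF-1.4")
    (root / "scan.png").write_bytes(b"\x89PNG")
    (root / "notes.txt").write_text("not a report")  # ignored: unsupported suffix

    reports = sorted(
        (f for f in root.iterdir() if f.suffix.lower() in SUPPORTED_EXTENSIONS),
        key=lambda f: f.stat().st_mtime,
    )
    names = sorted(f.name for f in reports)
    print(names)  # ['cbc_panel.pdf', 'scan.png']
```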
Phase 2: extraction with the MedExtract API
The extraction phase sends each document to the MedExtract API and receives structured results. The API internally handles OCR, table detection, LOINC mapping, and plausibility validation.
```python
import httpx
from dataclasses import dataclass

API_KEY = os.getenv("MEDEXTRACT_API_KEY")
API_URL = os.getenv("MEDEXTRACT_API_URL", "https://api.medextract.io/v1")


@dataclass
class LabResult:
    test_name: str
    loinc_code: str
    loinc_display: str
    value: float | str
    unit: str
    reference_range: str | None
    confidence: float
    flag: str | None  # "H", "L", "N", or None


@dataclass
class ExtractionResult:
    patient_name: str | None
    patient_id: str | None
    lab_name: str | None
    report_date: str | None
    results: list[LabResult]
    overall_confidence: float


async def extract_report(file_path: Path) -> ExtractionResult:
    """Send a report to the MedExtract API and parse the response."""
    async with httpx.AsyncClient(timeout=60.0) as client:
        with open(file_path, "rb") as f:
            response = await client.post(
                f"{API_URL}/extract",
                headers={"Authorization": f"Bearer {API_KEY}"},
                files={"file": (file_path.name, f, _mime_type(file_path))},
                data={"output_format": "json", "include_confidence": "true"},
            )
        response.raise_for_status()
        data = response.json()

    results = []
    for item in data.get("results", []):
        results.append(LabResult(
            test_name=item["test_name"],
            loinc_code=item["loinc"]["code"],
            loinc_display=item["loinc"]["display"],
            value=item["value"],
            unit=item.get("unit", ""),
            reference_range=item.get("reference_range"),
            confidence=item.get("confidence", 0.0),
            flag=item.get("flag"),
        ))

    return ExtractionResult(
        patient_name=data.get("patient", {}).get("name"),
        patient_id=data.get("patient", {}).get("id"),
        lab_name=data.get("laboratory", {}).get("name"),
        report_date=data.get("report_date"),
        results=results,
        overall_confidence=data.get("overall_confidence", 0.0),
    )


def _mime_type(path: Path) -> str:
    ext = path.suffix.lower()
    return {
        ".pdf": "application/pdf",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".tiff": "image/tiff",
        ".bmp": "image/bmp",
    }.get(ext, "application/octet-stream")
```
Phase 3: result validation
The validation phase checks extraction quality and manages problematic cases. Low-confidence results are flagged for human review rather than being automatically processed.
```python
from enum import Enum


class ValidationStatus(Enum):
    APPROVED = "approved"
    REVIEW_REQUIRED = "review_required"
    REJECTED = "rejected"


@dataclass
class ValidationResult:
    extraction: ExtractionResult
    status: ValidationStatus
    issues: list[str]
    approved_results: list[LabResult]
    flagged_results: list[LabResult]


def validate_extraction(
    extraction: ExtractionResult,
    min_confidence: float = 0.85,
    min_overall_confidence: float = 0.80,
) -> ValidationResult:
    """Validate extraction results and separate approved ones."""
    issues = []
    approved = []
    flagged = []

    # Check overall confidence
    if extraction.overall_confidence < min_overall_confidence:
        issues.append(
            f"Low overall confidence: {extraction.overall_confidence:.2%}"
        )

    # Validate each result individually
    for result in extraction.results:
        result_issues = _validate_single_result(result, min_confidence)
        if result_issues:
            flagged.append(result)
            issues.extend(result_issues)
        else:
            approved.append(result)

    # Determine overall status
    if not approved:
        status = ValidationStatus.REJECTED
    elif flagged:
        status = ValidationStatus.REVIEW_REQUIRED
    else:
        status = ValidationStatus.APPROVED

    return ValidationResult(
        extraction=extraction,
        status=status,
        issues=issues,
        approved_results=approved,
        flagged_results=flagged,
    )


def _validate_single_result(
    result: LabResult, min_confidence: float
) -> list[str]:
    """Validate an individual result."""
    issues = []
    if result.confidence < min_confidence:
        issues.append(
            f"{result.test_name}: low confidence ({result.confidence:.2%})"
        )
    if not result.loinc_code:
        issues.append(f"{result.test_name}: no LOINC code")
    if result.value is None or result.value == "":
        issues.append(f"{result.test_name}: no value")
    return issues
```
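The overall status follows a simple decision table: no usable results means rejection, any flagged result forces human review, and only a fully clean extraction is auto-approved. A condensed standalone restatement of that logic:

```python
from enum import Enum


class ValidationStatus(Enum):
    APPROVED = "approved"
    REVIEW_REQUIRED = "review_required"
    REJECTED = "rejected"


def decide(approved_count: int, flagged_count: int) -> ValidationStatus:
    # Nothing usable → reject; anything flagged → human review;
    # otherwise every result passed the confidence gate.
    if approved_count == 0:
        return ValidationStatus.REJECTED
    if flagged_count > 0:
        return ValidationStatus.REVIEW_REQUIRED
    return ValidationStatus.APPROVED


print(decide(0, 3).value)  # rejected
print(decide(5, 1).value)  # review_required
print(decide(5, 0).value)  # approved
```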
Phase 4: FHIR R4 transformation
The transformation phase converts validated results into standard FHIR R4 resources. We generate a transaction Bundle containing a DiagnosticReport and its associated Observations.
```python
from datetime import datetime, timezone
import uuid
import json


def build_fhir_bundle(validation: ValidationResult) -> dict:
    """Build a FHIR R4 transaction Bundle from the results."""
    extraction = validation.extraction
    bundle_id = str(uuid.uuid4())
    report_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    observations = []
    observation_refs = []
    for result in validation.approved_results:
        obs_id = str(uuid.uuid4())
        # Reference each entry by its urn:uuid fullUrl: since the Observations
        # are POSTed in the same transaction, the server rewrites these
        # references to the ids it assigns.
        observation_refs.append({"reference": f"urn:uuid:{obs_id}"})
        obs = _build_observation(
            obs_id=obs_id,
            result=result,
            patient_ref=extraction.patient_id,
            effective_date=extraction.report_date or now,
        )
        observations.append({
            "fullUrl": f"urn:uuid:{obs_id}",
            "resource": obs,
            "request": {
                "method": "POST",
                "url": "Observation",
            },
        })

    # DiagnosticReport grouping all Observations
    diagnostic_report = {
        "resourceType": "DiagnosticReport",
        "id": report_id,
        "status": "final",
        "category": [{
            "coding": [{
                "system": "http://terminology.hl7.org/CodeSystem/v2-0074",
                "code": "LAB",
                "display": "Laboratory",
            }]
        }],
        "code": {
            "coding": [{
                "system": "http://loinc.org",
                "code": "11502-2",
                "display": "Laboratory report",
            }]
        },
        "effectiveDateTime": extraction.report_date or now,
        "issued": now,
        "result": observation_refs,
    }
    if extraction.patient_id:
        diagnostic_report["subject"] = {
            "reference": f"Patient/{extraction.patient_id}"
        }
    if extraction.lab_name:
        diagnostic_report["performer"] = [{
            "display": extraction.lab_name
        }]

    entries = [{
        "fullUrl": f"urn:uuid:{report_id}",
        "resource": diagnostic_report,
        "request": {
            "method": "POST",
            "url": "DiagnosticReport",
        },
    }] + observations

    return {
        "resourceType": "Bundle",
        "id": bundle_id,
        "type": "transaction",
        "timestamp": now,
        "entry": entries,
    }
```
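One subtlety worth calling out: inside a transaction Bundle whose entries are POSTed, cross-references between entries must use the target entry's `urn:uuid:` fullUrl, not a `ResourceType/id` path, because those server ids do not exist yet. The server rewrites the urn references when it assigns real ids. A minimal standalone sketch of the pattern:

```python
import uuid

obs_id = str(uuid.uuid4())

observation_entry = {
    "fullUrl": f"urn:uuid:{obs_id}",
    "resource": {"resourceType": "Observation", "status": "final"},
    "request": {"method": "POST", "url": "Observation"},
}

# The report points at the observation via the same urn:uuid,
# not via "Observation/<id>", which does not exist on the server yet.
report_entry = {
    "fullUrl": f"urn:uuid:{uuid.uuid4()}",
    "resource": {
        "resourceType": "DiagnosticReport",
        "status": "final",
        "result": [{"reference": f"urn:uuid:{obs_id}"}],
    },
    "request": {"method": "POST", "url": "DiagnosticReport"},
}

bundle = {
    "resourceType": "Bundle",
    "type": "transaction",
    "entry": [report_entry, observation_entry],
}

# The internal reference matches the observation entry's fullUrl
print(report_entry["resource"]["result"][0]["reference"] == observation_entry["fullUrl"])  # True
```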
```python
def _build_observation(
    obs_id: str,
    result: LabResult,
    patient_ref: str | None,
    effective_date: str,
) -> dict:
    """Build an individual FHIR Observation resource."""
    obs: dict = {
        "resourceType": "Observation",
        "id": obs_id,
        "status": "final",
        "category": [{
            "coding": [{
                "system": (
                    "http://terminology.hl7.org/CodeSystem"
                    "/observation-category"
                ),
                "code": "laboratory",
            }]
        }],
        "code": {
            "coding": [{
                "system": "http://loinc.org",
                "code": result.loinc_code,
                "display": result.loinc_display,
            }],
            "text": result.test_name,
        },
        "effectiveDateTime": effective_date,
    }
    if patient_ref:
        obs["subject"] = {"reference": f"Patient/{patient_ref}"}

    # Quantitative or textual value
    if isinstance(result.value, (int, float)):
        obs["valueQuantity"] = {
            "value": result.value,
            "unit": result.unit,
            "system": "http://unitsofmeasure.org",
            "code": result.unit,  # assumes the printed unit is a valid UCUM code
        }
    else:
        obs["valueString"] = str(result.value)

    # Reference range
    if result.reference_range:
        ref_range = _parse_reference_range(result.reference_range, result.unit)
        if ref_range:
            obs["referenceRange"] = [ref_range]

    # Interpretation (flag)
    if result.flag:
        interpretation_map = {
            "H": ("H", "High"),
            "L": ("L", "Low"),
            "N": ("N", "Normal"),
        }
        if result.flag in interpretation_map:
            code, display = interpretation_map[result.flag]
            obs["interpretation"] = [{
                "coding": [{
                    "system": (
                        "http://terminology.hl7.org/CodeSystem"
                        "/v3-ObservationInterpretation"
                    ),
                    "code": code,
                    "display": display,
                }]
            }]
    return obs


def _parse_reference_range(range_str: str, unit: str) -> dict:
    """Parse a reference range like '4.0 - 10.0' into FHIR format.

    Falls back to a text-only range when the string is not numeric.
    """
    import re

    match = re.match(r"([\d.]+)\s*[-–]\s*([\d.]+)", range_str)
    if not match:
        return {"text": range_str}
    low, high = float(match.group(1)), float(match.group(2))
    return {
        "low": {
            "value": low,
            "unit": unit,
            "system": "http://unitsofmeasure.org",
        },
        "high": {
            "value": high,
            "unit": unit,
            "system": "http://unitsofmeasure.org",
        },
    }
```
Phase 5: delivery to the destination system
The final phase sends the FHIR Bundle to the destination server. This could be a FHIR server, an EHR with a FHIR interface, or integration middleware.
```python
async def submit_fhir_bundle(
    bundle: dict,
    fhir_server_url: str,
    auth_token: str | None = None,
) -> dict:
    """Submit a FHIR transaction Bundle to the destination server."""
    headers = {"Content-Type": "application/fhir+json"}
    if auth_token:
        headers["Authorization"] = f"Bearer {auth_token}"
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            fhir_server_url,
            json=bundle,
            headers=headers,
        )
    response.raise_for_status()
    return response.json()
```
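On success, a FHIR server answers a transaction with a `transaction-response` Bundle carrying one HTTP status per submitted resource, so it is worth checking every entry rather than only the outer status code. A minimal standalone check (the sample response is fabricated for illustration):

```python
def all_entries_succeeded(response_bundle: dict) -> bool:
    """True if every entry in a transaction-response reports a 2xx status."""
    entries = response_bundle.get("entry", [])
    statuses = [e.get("response", {}).get("status", "") for e in entries]
    return bool(statuses) and all(s.startswith("2") for s in statuses)


sample = {
    "resourceType": "Bundle",
    "type": "transaction-response",
    "entry": [
        {"response": {"status": "201 Created", "location": "DiagnosticReport/42/_history/1"}},
        {"response": {"status": "201 Created", "location": "Observation/43/_history/1"}},
    ],
}

print(all_entries_succeeded(sample))  # True
```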
Complete pipeline orchestrator
With all phases implemented, the orchestrator connects them into a coherent flow:
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def process_report(
    file_path: Path,
    fhir_server_url: str | None = None,
    min_confidence: float = 0.85,
) -> dict:
    """Process a lab report end to end."""
    logger.info(f"Processing: {file_path.name}")

    # Phase 1: validate input file
    if not validate_file(file_path):
        raise ValueError(f"Invalid file: {file_path}")

    # Phase 2: extraction with MedExtract
    extraction = await extract_report(file_path)
    logger.info(
        f"Extracted {len(extraction.results)} results "
        f"(confidence: {extraction.overall_confidence:.2%})"
    )

    # Phase 3: validation
    validation = validate_extraction(extraction, min_confidence)
    logger.info(
        f"Validation: {validation.status.value} "
        f"({len(validation.approved_results)} approved, "
        f"{len(validation.flagged_results)} flagged)"
    )
    if validation.issues:
        for issue in validation.issues:
            logger.warning(f"  Issue: {issue}")

    # Phase 4: FHIR transformation
    bundle = build_fhir_bundle(validation)
    logger.info(f"FHIR Bundle generated: {len(bundle['entry'])} resources")

    # Phase 5: delivery (optional)
    if fhir_server_url:
        response = await submit_fhir_bundle(bundle, fhir_server_url)
        logger.info("Bundle submitted to FHIR server")
        return response
    return bundle


async def process_batch(
    input_dir: str,
    fhir_server_url: str | None = None,
) -> list[dict]:
    """Process a batch of reports from a directory."""
    reports = discover_reports(input_dir)
    logger.info(f"Found {len(reports)} reports to process")
    results = []
    for report in reports:
        try:
            result = await process_report(report, fhir_server_url)
            results.append({"file": report.name, "status": "success", "data": result})
        except Exception as e:
            logger.error(f"Error processing {report.name}: {e}")
            results.append({"file": report.name, "status": "error", "error": str(e)})
    return results


# Entry point
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(process_batch("./lab_reports"))
```
Error handling and retries
In a production environment, the pipeline must handle network errors, API timeouts, and transient failures. A retry strategy with exponential backoff is essential:
```python
import asyncio
from functools import wraps


def with_retry(max_attempts: int = 3, base_delay: float = 1.0):
    """Decorator for retries with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except httpx.HTTPStatusError as e:
                    if e.response.status_code < 500:
                        raise  # Don't retry client errors
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(
                        f"Attempt {attempt + 1} failed, "
                        f"retrying in {delay}s..."
                    )
                    await asyncio.sleep(delay)
                except httpx.TimeoutException:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
        return wrapper
    return decorator
```
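To see the backoff shape without touching the network, here is a simplified standalone variant that retries any exception (rather than distinguishing httpx error types) on a deliberately flaky coroutine, with delays shortened so it runs instantly:

```python
import asyncio
from functools import wraps


def with_retry(max_attempts: int = 3, base_delay: float = 0.01):
    """Simplified retry decorator: retries any exception with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
                    # 0.01s, 0.02s, 0.04s, ... between attempts
                    await asyncio.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator


calls = {"count": 0}


@with_retry(max_attempts=3)
async def flaky_extract():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("transient failure")
    return "extracted"


result = asyncio.run(flaky_extract())
print(result, calls["count"])  # extracted 3
```

The production version should keep the distinction between retryable failures (5xx, timeouts) and client errors, as in the decorator above.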
Logging and observability
For a production pipeline, observability is crucial. Every pipeline execution should record:
- The input file and its type
- Timing for each phase
- The number of extracted results and their average confidence
- Validation issues found
- The FHIR delivery result
```python
import time


async def process_report_with_metrics(
    file_path: Path,
    fhir_server_url: str | None = None,
) -> dict:
    """Instrumented version of report processing."""
    metrics = {"file": file_path.name, "phases": {}}
    t0 = time.monotonic()

    # Extraction phase
    t1 = time.monotonic()
    extraction = await extract_report(file_path)
    metrics["phases"]["extraction"] = time.monotonic() - t1
    metrics["results_count"] = len(extraction.results)
    metrics["overall_confidence"] = extraction.overall_confidence

    # Validation phase
    t2 = time.monotonic()
    validation = validate_extraction(extraction)
    metrics["phases"]["validation"] = time.monotonic() - t2
    metrics["approved_count"] = len(validation.approved_results)
    metrics["flagged_count"] = len(validation.flagged_results)

    # FHIR transformation phase
    t3 = time.monotonic()
    bundle = build_fhir_bundle(validation)
    metrics["phases"]["fhir_transform"] = time.monotonic() - t3

    metrics["total_time"] = time.monotonic() - t0
    logger.info(f"Metrics: {json.dumps(metrics, indent=2)}")
    return bundle
```
Security and compliance considerations
When building a pipeline that processes health data, GDPR compliance is mandatory. Key considerations include:
- Data minimization: do not store original documents longer than necessary for processing.
- Encryption in transit: all communication with the MedExtract API is over HTTPS/TLS.
- Logs without personal data: ensure log records do not contain patient identifying information.
- Record of processing activities: document what data is processed, under what legal basis, and for how long.
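For the "logs without personal data" point, one lightweight option is a `logging.Filter` that scrubs identifier-like strings before records reach any handler. This is only a sketch: the regex below (eight digits followed by a letter) is an illustrative placeholder, and a real deployment needs patterns matched to its own identifier formats (names, dates of birth, record numbers):

```python
import logging
import re


class RedactIds(logging.Filter):
    """Redact identifier-like tokens from log messages (illustrative pattern)."""
    ID_PATTERN = re.compile(r"\b\d{8}[A-Z]\b")

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = self.ID_PATTERN.sub("[REDACTED]", str(record.msg))
        return True  # keep the record, just scrubbed


scrubbed = []

# Capture-only handler so the demo is self-contained
handler = logging.Handler()
handler.emit = lambda record: scrubbed.append(record.getMessage())

logger = logging.getLogger("pipeline.demo")
logger.addHandler(handler)
logger.addFilter(RedactIds())
logger.propagate = False
logger.warning("Low confidence for patient 12345678Z")

print(scrubbed[0])  # Low confidence for patient [REDACTED]
```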
Next steps
This tutorial covers the fundamentals of a lab report processing pipeline. To take the pipeline to production, consider:
- Parallel processing: use `asyncio.gather()` to process multiple reports simultaneously.
- Message queue: integrate with RabbitMQ, Redis Streams, or SQS for asynchronous processing.
- Intermediate storage: save intermediate results in a database for traceability and reprocessing.
- Monitoring dashboard: build a panel showing processed volume, success rates, and cases pending review.
- SNOMED CT integration: add the clinical interpretation layer to the generated FHIR resources.
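For the parallel-processing point, the usual pattern is `asyncio.gather()` behind a semaphore so that only a bounded number of reports hit the API at once. A standalone sketch with a stubbed processing step standing in for the real `process_report`:

```python
import asyncio

MAX_CONCURRENT = 3


async def process_all(reports: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    async def process_one(name: str) -> str:
        async with semaphore:  # at most MAX_CONCURRENT requests in flight
            await asyncio.sleep(0.01)  # stand-in for the real API call
            return f"{name}:done"

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(process_one(r) for r in reports))


results = asyncio.run(process_all([f"report_{i}.pdf" for i in range(5)]))
print(results)
```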
The MedExtract API greatly simplifies the most complex phase of the pipeline: extraction and mapping. By delegating advanced OCR, LOINC mapping, and plausibility validation to a specialized API, your team can focus on the integration logic specific to your organization.