CSV Bulk Import Automation
Multi-unit restaurant operators require deterministic data ingestion to maintain accurate food cost analytics across distributed locations. When culinary teams distribute weekly recipe updates, ingredient substitutions, or seasonal menu changes via spreadsheet exports, the downstream costing engine must parse, validate, and normalize those files before any margin calculations execute. A structured CSV bulk import workflow eliminates manual reconciliation, prevents unit conversion drift, and ensures that cost-per-portion metrics remain synchronized across enterprise resource planning stacks. This module operates as a critical subsystem within the broader Data Ingestion & Recipe Parsing Workflows architecture, anchoring enterprise-wide costing accuracy through strict, repeatable execution paths.
1. Schema Enforcement & Validation Contracts
Raw CSV exports from culinary management platforms or distributor portals rarely align perfectly with internal costing databases. Missing headers, inconsistent decimal separators, locale-specific number formatting, and unstandardized yield percentages introduce silent calculation errors that compound across thousands of SKUs. To mitigate this, the ingestion layer implements a Pydantic-based validation contract that rejects malformed rows before they enter the transformation stage.
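```python
from decimal import Decimal, InvalidOperation
from typing import Any, Optional

from pydantic import BaseModel, Field, field_validator


class RecipeLineItem(BaseModel):
    location_id: str = Field(..., pattern=r"^[A-Z]{3}-\d{4}$")
    sku: str = Field(..., min_length=4, max_length=20)
    ingredient_name: str
    raw_quantity: Decimal = Field(..., gt=0)
    unit_of_measure: str = Field(..., pattern=r"^(lbs|kg|oz|g|ml|l|case|ea)$")
    vendor_cost: Decimal = Field(..., ge=0)
    # Decimal default because Pydantic does not validate defaults; gt=0 because a
    # 0% yield would divide by zero in downstream cost-per-unit math
    yield_pct: Optional[Decimal] = Field(default=Decimal("100"), gt=0, le=100)

    @field_validator("raw_quantity", "vendor_cost", "yield_pct", mode="before")
    @classmethod
    def normalize_decimals(cls, v: Any) -> Any:
        # Blank cells become None: rejected for required fields, allowed for yield_pct
        if v is None or (isinstance(v, str) and not v.strip()):
            return None
        # Accept comma decimal separators ("12,50" -> "12.50"); values that also use
        # thousands separators ("1,234.50") become invalid and are rejected below
        if isinstance(v, str):
            v = v.strip().replace(",", ".")
        try:
            return Decimal(str(v))
        except InvalidOperation as exc:
            # Pydantic only turns ValueError/AssertionError into a ValidationError
            raise ValueError(f"not a valid decimal: {v!r}") from exc
```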
This validation layer acts as a hard gate. Rows failing type coercion, regex constraints, or logical bounds are quarantined to a structured error log rather than silently coerced. Deterministic rejection prevents downstream food cost inflation and ensures culinary teams receive precise line-item feedback for remediation.
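As a minimal sketch of the quarantine step, the helper below splits parsed rows into accepted records and quarantined entries that keep the row number and error message for culinary-team feedback. It accepts any validator callable that raises `ValueError`; `RecipeLineItem.model_validate` qualifies because Pydantic's `ValidationError` subclasses `ValueError`. The names `partition_rows`, `QuarantinedRow`, and the `check_quantity` stand-in validator are hypothetical, chosen so the example runs with the standard library alone.

```python
import csv
import io
from dataclasses import dataclass
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class QuarantinedRow:
    line_number: int  # 1-based data row number (header not counted)
    raw: dict
    error: str


def partition_rows(
    rows: Iterable[dict], validate: Callable[[dict], T]
) -> tuple[list[T], list[QuarantinedRow]]:
    accepted: list[T] = []
    quarantined: list[QuarantinedRow] = []
    for line_number, raw in enumerate(rows, start=1):
        try:
            accepted.append(validate(raw))
        except ValueError as exc:  # Pydantic's ValidationError is a ValueError subclass
            quarantined.append(QuarantinedRow(line_number, raw, str(exc)))
    return accepted, quarantined


def check_quantity(raw: dict) -> dict:
    # Hypothetical stand-in for RecipeLineItem.model_validate
    qty = float(raw["raw_quantity"].replace(",", "."))  # raises ValueError on non-numeric text
    if qty <= 0:
        raise ValueError("raw_quantity must be greater than 0")
    return {**raw, "raw_quantity": qty}


SAMPLE_CSV = 'sku,raw_quantity\nFLOUR01,"12,5"\nSALT02,abc\nOIL003,0\nBUTR04,2\n'
accepted, quarantined = partition_rows(csv.DictReader(io.StringIO(SAMPLE_CSV)), check_quantity)
```

Persisting `quarantined` to the structured error log or a quarantine table is left to the surrounding pipeline.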
2. Deterministic Unit Normalization
Once schema validation passes, the pipeline must resolve unit discrepancies across multi-unit operations. A single ingredient might appear in the source CSV as lbs, kg, oz, or case, while the costing engine expects a base unit like grams or milliliters. Implementing a deterministic unit normalization script during the import phase prevents downstream margin distortion.
When a regional manager uploads a vendor price sheet containing bulk packaging weights, the automation layer cross-references a centralized conversion matrix before committing the data to the analytics warehouse. This normalization step is equally critical when reconciling structured spreadsheet data against outputs from PDF Recipe Extraction Pipelines, where OCR-derived measurements often require additional semantic mapping to align with standardized recipe cards.
```python
from decimal import Decimal
from typing import Optional

# Centralized conversion matrix (grams/ml base). Decimal literals avoid float
# rounding artifacts; 'case' requires a dynamic per-SKU lookup.
UNIT_CONVERSIONS = {
    "g": Decimal("1"), "kg": Decimal("1000"), "oz": Decimal("28.3495"), "lbs": Decimal("453.592"),
    "ml": Decimal("1"), "l": Decimal("1000"), "ea": Decimal("1"),
}


def normalize_to_base_unit(quantity: Decimal, uom: str, case_multiplier: Optional[int] = None) -> Decimal:
    # Normalize the label first so "Case" or " KG " resolve like "case" and "kg"
    uom = uom.strip().lower()
    if uom == "case":
        # case_multiplier: units contained in one case of this SKU, supplied by the caller
        if not case_multiplier or case_multiplier <= 0:
            raise ValueError("Case UOM requires valid item multiplier")
        return quantity * Decimal(case_multiplier)
    factor = UNIT_CONVERSIONS.get(uom)
    if factor is None:
        raise ValueError(f"Unsupported unit: {uom}")
    return quantity * factor
```
Normalization logic should be vectorized during bulk execution to avoid Python-level loop overhead. The conversion matrix must be version-controlled and audited quarterly to reflect distributor packaging changes or regional measurement standards.
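One way to make the matrix auditable is to store its factors as `Decimal` values and derive a short fingerprint from their canonical form, so each import can log exactly which matrix revision it applied. The sketch below assumes a read-only mapping; `matrix_fingerprint` and `to_base_units` are hypothetical helpers, and the 12-character SHA-256 prefix is an arbitrary choice. It also works through one conversion: a 50 lbs case priced at $80.00 normalizes to 22,679.6 g, or about $0.003527 per gram.

```python
import hashlib
import json
from collections.abc import Mapping
from decimal import Decimal
from types import MappingProxyType

# Read-only conversion matrix (grams/ml base); values mirror UNIT_CONVERSIONS above
UNIT_CONVERSIONS: Mapping[str, Decimal] = MappingProxyType({
    "g": Decimal("1"), "kg": Decimal("1000"), "oz": Decimal("28.3495"),
    "lbs": Decimal("453.592"), "ml": Decimal("1"), "l": Decimal("1000"), "ea": Decimal("1"),
})


def matrix_fingerprint(matrix: Mapping[str, Decimal]) -> str:
    # Canonical form: sorted keys, factors normalized so Decimal("1.0") and Decimal("1") hash alike
    canonical = json.dumps(
        {unit: format(factor.normalize(), "f") for unit, factor in matrix.items()},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


def to_base_units(quantity: Decimal, uom: str) -> Decimal:
    factor = UNIT_CONVERSIONS.get(uom.strip().lower())
    if factor is None:
        raise ValueError(f"Unsupported unit: {uom}")
    return quantity * factor


# Worked example: a 50 lbs case priced at $80.00
grams = to_base_units(Decimal("50"), "lbs")  # Decimal("22679.600")
cost_per_gram = (Decimal("80.00") / grams).quantize(Decimal("0.000001"))  # Decimal("0.003527")
```

Logging `matrix_fingerprint(UNIT_CONVERSIONS)` next to each import's row counts ties every computed cost to the factor set that produced it, which gives the quarterly audit a concrete change history to compare.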
3. Memory-Efficient Chunked Processing & Async Execution
The actual bulk processing routine must leverage chunked reading and asynchronous batch execution to handle files exceeding 50,000 rows without exhausting memory. Using polars for streaming CSV parsing allows the pipeline to maintain a consistent memory footprint while applying row-level transformations. Each chunk undergoes deduplication, cost mapping against the latest vendor price index, and yield-adjusted portioning logic.
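```python
import asyncio
from typing import AsyncIterator

import polars as pl

# `persist_to_staging` is defined in the next block.

# Base-unit factors mirroring UNIT_CONVERSIONS. 'case' rows need a per-SKU multiplier,
# so they drop out of the inner join below and must be reconciled separately.
CONVERSIONS = pl.DataFrame({
    "unit_of_measure": ["g", "kg", "oz", "lbs", "ml", "l", "ea"],
    "base_factor": [1.0, 1000.0, 28.3495, 453.592, 1.0, 1000.0, 1.0],
})


async def process_csv_stream(file_path: str, chunk_size: int = 10_000) -> AsyncIterator[pl.DataFrame]:
    # The batched reader parses roughly `chunk_size` rows at a time instead of
    # materializing the whole file, keeping the memory footprint bounded
    reader = pl.read_csv_batched(file_path, batch_size=chunk_size)
    while True:
        # Parsing is blocking work; run it in a thread so the event loop stays responsive
        batches = await asyncio.to_thread(reader.next_batches, 1)
        if not batches:
            break
        chunk = batches[0].with_columns(
            pl.col("raw_quantity").cast(pl.Float64),
            pl.col("vendor_cost").cast(pl.Float64),
            pl.col("yield_pct").cast(pl.Float64).fill_null(100.0),
        ).filter((pl.col("raw_quantity") > 0) & (pl.col("vendor_cost") >= 0))
        # Attach base-unit factors (unit normalization)
        yield chunk.join(CONVERSIONS, on="unit_of_measure", how="inner")


async def batch_transform_chunks(file_path: str) -> None:
    async for chunk in process_csv_stream(file_path):
        # Deduplicate on the staging key, keeping the last occurrence
        # (vendor price-index lookup is not shown here)
        deduped = chunk.unique(subset=["location_id", "sku"], keep="last", maintain_order=True)
        # Two with_columns calls: expressions in one call cannot reference
        # a column created in that same call
        transformed = deduped.with_columns(
            (pl.col("raw_quantity") * pl.col("base_factor") * pl.col("yield_pct") / 100.0).alias("net_quantity")
        ).with_columns(
            (pl.col("vendor_cost") / pl.col("net_quantity")).alias("cost_per_unit")
        )
        await persist_to_staging(transformed)
```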
Streaming execution aligns with Async Batch Processing Workflows patterns, enabling concurrent I/O operations for vendor API lookups and database writes without blocking the main event loop.
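A minimal sketch of bounded concurrency, assuming several import files (or pre-split chunk files) are processed at once: an `asyncio.Semaphore` limits how many workers run simultaneously, so I/O overlaps without unbounded memory growth. `run_bounded` and `demo_peak_concurrency` are hypothetical names, and the fake worker stands in for something like `batch_transform_chunks`.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def run_bounded(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[None]],
    max_concurrency: int = 4,
) -> None:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: T) -> None:
        # At most `max_concurrency` workers can hold the semaphore at once
        async with semaphore:
            await worker(item)

    # One task per item is created up front; gather re-raises the first failure
    # (the remaining tasks are not cancelled automatically)
    await asyncio.gather(*(guarded(item) for item in items))


async def demo_peak_concurrency() -> tuple[int, list[int]]:
    active = peak = 0
    finished: list[int] = []

    async def fake_persist(chunk_id: int) -> None:
        # Stand-in for a database write that records how many writers overlap
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        finished.append(chunk_id)

    await run_bounded(range(10), fake_persist, max_concurrency=3)
    return peak, sorted(finished)
```

In a pipeline this might be called as `asyncio.run(run_bounded(paths, batch_transform_chunks, max_concurrency=2))`, with the limit chosen from available RAM.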
4. Atomic Staging & Transactional Insertion
After transformation, each validated chunk is staged for atomic insertion into the menu engineering database. Because every chunk is committed inside its own database transaction, POS API Polling Strategies and live sales telemetry can immediately reference updated cost baselines without ever observing a half-written batch. If any row in a chunk violates a constraint, the whole chunk rolls back; conflicts on the (location_id, sku) key are not errors, because the upsert below resolves them by updating the existing row.
```python
import os
from contextlib import asynccontextmanager
from typing import AsyncIterator

import asyncpg
import polars as pl

# Read the DSN from the environment rather than hard-coding credentials
# (MENU_ENGINEERING_DSN is a hypothetical variable name).
DSN = os.environ.get("MENU_ENGINEERING_DSN", "postgresql://user:pass@host/menu_engineering")


@asynccontextmanager
async def atomic_db_transaction(dsn: str) -> AsyncIterator[asyncpg.Connection]:
    conn = await asyncpg.connect(dsn)
    try:
        # conn.transaction() issues BEGIN, then COMMIT on success or ROLLBACK if the body raises
        async with conn.transaction():
            yield conn
    finally:
        await conn.close()


async def persist_to_staging(df: pl.DataFrame) -> None:
    rows = df.select(["location_id", "sku", "net_quantity", "cost_per_unit"]).rows()
    async with atomic_db_transaction(DSN) as conn:
        # ON CONFLICT (location_id, sku) requires a unique index or constraint on those columns
        await conn.executemany(
            """
            INSERT INTO recipe_cost_staging (location_id, sku, net_quantity, cost_per_unit, import_ts)
            VALUES ($1, $2, $3, $4, NOW())
            ON CONFLICT (location_id, sku) DO UPDATE SET
                net_quantity = EXCLUDED.net_quantity,
                cost_per_unit = EXCLUDED.cost_per_unit,
                import_ts = NOW()
            """,
            rows,
        )
```
The upsert pattern guarantees idempotency. Re-running the same import file will overwrite stale costs without creating duplicate rows or triggering cascading margin recalculations prematurely.
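The idempotency property can be checked locally. The sketch below uses Python's built-in `sqlite3` as a stand-in for PostgreSQL (SQLite accepts the same `ON CONFLICT ... DO UPDATE` upsert form from version 3.24); the table mirrors `recipe_cost_staging`, the rows are illustrative sample values, and `import_rows`/`demo_reimport` are hypothetical helpers. Importing the same batch twice leaves one row per `(location_id, sku)`, and a later price change overwrites the stored cost.

```python
import sqlite3

SCHEMA = """
CREATE TABLE recipe_cost_staging (
    location_id   TEXT NOT NULL,
    sku           TEXT NOT NULL,
    net_quantity  REAL NOT NULL,
    cost_per_unit REAL NOT NULL,
    import_ts     TEXT NOT NULL,
    PRIMARY KEY (location_id, sku)
)
"""

UPSERT = """
INSERT INTO recipe_cost_staging (location_id, sku, net_quantity, cost_per_unit, import_ts)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT (location_id, sku) DO UPDATE SET
    net_quantity = excluded.net_quantity,
    cost_per_unit = excluded.cost_per_unit,
    import_ts = CURRENT_TIMESTAMP
"""


def import_rows(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    # The connection context manager commits on success and rolls back if any row fails
    with conn:
        conn.executemany(UPSERT, rows)


def demo_reimport() -> list[tuple]:
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    batch = [("NYC-0001", "FLOUR01", 22679.6, 0.003527), ("NYC-0001", "SALT02", 1000.0, 0.0012)]
    import_rows(conn, batch)
    import_rows(conn, batch)  # re-running the same file adds no rows
    import_rows(conn, [("NYC-0001", "FLOUR01", 22679.6, 0.0036)])  # later price update
    result = conn.execute(
        "SELECT sku, cost_per_unit FROM recipe_cost_staging ORDER BY sku"
    ).fetchall()
    conn.close()
    return result
```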
5. Operational Guardrails & Production Scaling
Production environments require explicit error handling, retry logic, and memory optimization. Implement exponential backoff for transient database failures, log malformed rows to a structured quarantine table for culinary team review, and monitor chunk processing latency via OpenTelemetry metrics. This aligns with Automating Weekly CSV Menu Updates scheduling patterns, ensuring predictable execution windows during off-peak hours when POS traffic is minimal.
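A minimal sketch of exponential backoff with full jitter, using `asyncio` only. `TransientError`, `retry_with_backoff`, `demo_flaky`, and the delay defaults are hypothetical; which exception types count as transient (HTTP 429/503 responses, dropped connections) depends on the client libraries in use.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures (HTTP 429/503, dropped connections)."""


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    *,
    retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await op()
        except retry_on:  # exceptions outside retry_on propagate immediately
            if attempt == max_attempts:
                raise  # out of attempts: surface the last transient error
            # Exponential cap (base * 2^(attempt-1), bounded by max_delay) with full jitter
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, cap))
    raise RuntimeError("unreachable")


async def demo_flaky() -> tuple[str, int]:
    attempts = 0

    async def flaky_commit() -> str:
        # Fails twice with a transient error, then succeeds
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise TransientError("503 Service Unavailable")
        return "committed"

    result = await retry_with_backoff(flaky_commit, base_delay=0.01)
    return result, attempts
```

Wrapping a staging write as `await retry_with_backoff(lambda: persist_to_staging(df))` is safe to repeat because the upsert is idempotent.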
Key operational controls:
- Error Handling & Retry Logic: Wrap external vendor lookups and DB commits in circuit-breaker patterns. Retry only operations that are safe to repeat, such as the idempotent upsert, and only on transient failures (HTTP 429, HTTP 503, dropped database connections).
- Production Scaling & Memory Optimization: Cap concurrent chunk workers based on available RAM. Use `polars` streaming or `pandas` `chunksize` to prevent OOM kills on legacy infrastructure.
- Auditability: Hash each imported CSV, log row counts, validation failure rates, and final committed SKUs. Maintain a 90-day retention policy for raw imports to support financial reconciliation.
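For the auditability control, the sketch below computes a SHA-256 digest of the raw file in fixed-size blocks and then counts its CSV data rows, assuming a seekable binary stream. `ImportAudit` and `audit_csv` are hypothetical names; validation failure rates and committed SKUs would be added from the validation and persistence steps.

```python
import csv
import hashlib
import io
from dataclasses import dataclass
from typing import BinaryIO


@dataclass(frozen=True)
class ImportAudit:
    sha256: str
    data_rows: int


def audit_csv(stream: BinaryIO, encoding: str = "utf-8") -> ImportAudit:
    digest = hashlib.sha256()
    stream.seek(0)
    # Hash in 1 MiB blocks so large files never load fully into memory
    for block in iter(lambda: stream.read(1 << 20), b""):
        digest.update(block)
    stream.seek(0)  # rewind for the row-count pass
    text = io.TextIOWrapper(stream, encoding=encoding, newline="")
    try:
        reader = csv.reader(text)
        next(reader, None)  # skip the header row
        rows = sum(1 for row in reader if row)  # csv yields [] for blank lines; ignore them
    finally:
        text.detach()  # hand the binary stream back to the caller unclosed
    return ImportAudit(sha256=digest.hexdigest(), data_rows=rows)
```

Typical use would be `with open(path, "rb") as fh: audit = audit_csv(fh)`, storing the digest alongside the raw file for the retention window.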
Deterministic CSV ingestion transforms fragmented culinary spreadsheets into a single source of truth. By enforcing strict validation, normalizing units at scale, and committing changes atomically, multi-unit operators eliminate costing drift and maintain margin visibility across every location in the portfolio.