Ingestion
IR/text → stores. Structured fact ingestion with a batch manifest ledger, narrative chunk ingestion, and an SME human review-queue state machine — all idempotent.
The ingestion domain takes extraction output (StyledGrid IR, narrative text) and writes it into the stores. It has three lanes: structured (numeric facts), narrative (document chunks), and review (the human-in-the-loop queue that catches everything the deterministic path is unsure about). Every lane is idempotent — re-running an ingest must not double-write.
Package: src/ragspine/ingestion/. Contract: src/ragspine/ingestion/CLAUDE.md.
Layout
Structured ingestion
structured/ingestion.py orchestrates one document end to end:
extract → normalize (glossary) → color tags → upsert. There are two entry points:
- ingest_excel(path, store, registry, queue, *, dry_run=False, extractor_version="xlsx_styled@1", manifest=None, batch_id=None): xlsx-only.
- ingest_file(path, store, registry, queue, *, dry_run=False, manifest=None, batch_id=None, valid_as_of=None, grid_extractor=None): the unified multi-format dispatcher. It routes by suffix to the right extractor (xlsx / xlsm / pptx, or PDF via the router) and reuses the shared ingest logic.
Both return an IngestReport (a counts object, not raw numbers). The fields referenced on this page are n_facts_ingested and n_enqueued_review.
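The suffix routing that ingest_file performs can be pictured as a small lookup table. This is a minimal sketch under assumptions: pick_extractor and the extractor labels are hypothetical names for illustration, and sending .xlsm to the same extractor as .xlsx is assumed rather than confirmed.

```python
from pathlib import Path

# Hypothetical labels; the real extractors live in the extraction domain.
_SUFFIX_TO_EXTRACTOR = {
    ".xlsx": "xlsx",
    ".xlsm": "xlsx",        # assumption: macro workbooks share the xlsx extractor
    ".pptx": "pptx",
    ".pdf": "pdf_router",   # PDFs go through the per-page router
}


def pick_extractor(path: str) -> str:
    """Return the extractor label for a path, or raise on an unsupported suffix."""
    suffix = Path(path).suffix.lower()
    try:
        return _SUFFIX_TO_EXTRACTOR[suffix]
    except KeyError:
        raise ValueError(f"unsupported file type: {suffix!r}") from None


print(pick_extractor("report.XLSX"))
print(pick_extractor("deck.pptx"))
```

Lower-casing the suffix before the lookup keeps the routing stable for files named with upper-case extensions.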
Inside, each grid is color-tagged via the active mapping (apply_mapping), turned into
Fact objects, and written with
store.upsert_facts(...). Facts are stamped with their lineage —
source_doc_id, source_locator, source_file_hash, extractor_version,
mapping_version, and review_status=REVIEW_AUTO_APPROVED.
from ragspine.ingestion.structured.ingestion import ingest_file
report = ingest_file("report.xlsx", store, registry, queue)
print(report.n_facts_ingested, report.n_enqueued_review)
When ingestion enqueues review instead
The structured path is conservative. A file routes to the review queue rather than auto-ingesting when:
- no grid resolves an entity yet the file contains extractable data — reason "实体无法解析,需人工指认" (entity unresolvable, needs human identification);
- the file has colored cells but the scope has no active color mapping — reason "颜色映射未确认,需 SME 确认图例" (color mapping unconfirmed, SME must confirm the legend);
- a PDF looks like a PowerPoint export (ask for the pptx source) or is a scan needing OCR.
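The first two conditions above can be sketched as a pure decision function. FileSignals, review_reason, and the order of the checks are assumptions made for illustration; only the two reason strings come from this page. The PDF cases are left out because their reason strings are not given here.

```python
from dataclasses import dataclass
from typing import Optional

# Reason strings as quoted in the list above.
REASON_ENTITY = "实体无法解析,需人工指认"          # entity unresolvable
REASON_COLOR = "颜色映射未确认,需 SME 确认图例"    # color mapping unconfirmed


@dataclass
class FileSignals:
    """Hypothetical summary of what extraction saw in one file."""
    has_extractable_data: bool
    any_entity_resolved: bool
    has_colored_cells: bool
    has_active_color_mapping: bool


def review_reason(sig: FileSignals) -> Optional[str]:
    """Return a review reason, or None when the file may auto-ingest."""
    if sig.has_extractable_data and not sig.any_entity_resolved:
        return REASON_ENTITY
    if sig.has_colored_cells and not sig.has_active_color_mapping:
        return REASON_COLOR
    return None


ok = FileSignals(True, True, True, True)
no_entity = FileSignals(True, False, False, False)
print(review_reason(ok), review_reason(no_entity))
```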
With dry_run=True, extraction and reporting run fully but n_facts_ingested and
n_enqueued_review stay 0 — the store and queue are untouched.
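The dry-run behaviour can be illustrated with a toy ingest function. ToyReport, toy_ingest, and the n_facts_extracted field are hypothetical; the sketch only shows the shape of the guarantee (work is counted, nothing is written), not the package's implementation.

```python
from dataclasses import dataclass


@dataclass
class ToyReport:
    n_facts_extracted: int = 0  # hypothetical field, for illustration only
    n_facts_ingested: int = 0


def toy_ingest(rows, store, *, dry_run=False):
    """Count what would be ingested; write to the store only when not dry-running."""
    report = ToyReport(n_facts_extracted=len(rows))
    if dry_run:
        return report  # store is left untouched
    for key, value in rows:
        store[key] = value
    report.n_facts_ingested = len(rows)
    return report


store = {}
preview = toy_ingest([("revenue", 120.5)], store, dry_run=True)
print(preview.n_facts_ingested, len(store))
```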
The batch manifest ledger
structured/ingestion_manifest.py records what ran. ManifestStore (sqlite,
manifest_batch + manifest_input tables) opens a batch, logs each input file, and closes
the batch with a final status and duration. Each batch is a ManifestRecord:
| field | meaning |
|---|---|
| batch_id | caller-supplied, or auto batch-{uuid4 hex[:12]} |
| status | running → done / failed |
| inputs | per-file {path, hash, format, …} rows |
| n_facts · n_warnings · n_failed | aggregate counts |
| duration_s · failures | timing + per-file errors |
API: open_batch(batch_id=None), record_input(...), close_batch(batch_id, status="done"),
get_batch(id), list_batches(). Two observability helpers ride alongside:
compute_metrics(manifest_store, queue, store) (fact totals, review backlog, confidence
buckets, warning rate) and list_versions(store, registry) (active extractor versions +
color mappings).
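To make the open / record / close lifecycle concrete, here is a toy ledger over an in-memory sqlite database. The table and method names follow this page, but the column layout, the record_input parameters, and the MiniManifest class itself are assumptions for illustration; the real ManifestStore may differ.

```python
import sqlite3
import time
import uuid


class MiniManifest:
    """Toy batch ledger: one row per batch, one row per input file."""

    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS manifest_batch ("
            "batch_id TEXT PRIMARY KEY, status TEXT, started REAL, duration_s REAL)"
        )
        conn.execute(
            "CREATE TABLE IF NOT EXISTS manifest_input ("
            "batch_id TEXT, path TEXT, hash TEXT, n_facts INTEGER)"
        )

    def open_batch(self, batch_id=None):
        # Caller-supplied id, or a random one; never derived from content.
        batch_id = batch_id or f"batch-{uuid.uuid4().hex[:12]}"
        self.conn.execute(
            "INSERT INTO manifest_batch VALUES (?, 'running', ?, NULL)",
            (batch_id, time.monotonic()),
        )
        return batch_id

    def record_input(self, batch_id, path, file_hash, n_facts):
        self.conn.execute(
            "INSERT INTO manifest_input VALUES (?, ?, ?, ?)",
            (batch_id, path, file_hash, n_facts),
        )

    def close_batch(self, batch_id, status="done"):
        (started,) = self.conn.execute(
            "SELECT started FROM manifest_batch WHERE batch_id = ?", (batch_id,)
        ).fetchone()
        self.conn.execute(
            "UPDATE manifest_batch SET status = ?, duration_s = ? WHERE batch_id = ?",
            (status, time.monotonic() - started, batch_id),
        )

    def get_batch(self, batch_id):
        status, duration_s = self.conn.execute(
            "SELECT status, duration_s FROM manifest_batch WHERE batch_id = ?",
            (batch_id,),
        ).fetchone()
        (n_facts,) = self.conn.execute(
            "SELECT COALESCE(SUM(n_facts), 0) FROM manifest_input WHERE batch_id = ?",
            (batch_id,),
        ).fetchone()
        return {"batch_id": batch_id, "status": status,
                "duration_s": duration_s, "n_facts": n_facts}


ledger = MiniManifest(sqlite3.connect(":memory:"))
bid = ledger.open_batch()
ledger.record_input(bid, "report.xlsx", "abc123", 42)
ledger.close_batch(bid)
print(ledger.get_batch(bid))
```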
Where idempotency actually lives. The contract calls the manifest "the guard," and it is the
audit ledger of every run (path / hash / counts / failures). But the literal no-double-write
guarantee comes from the fact store's unique-key upsert (store.upsert_facts, keyed on
dim_key): re-running a batch re-extracts and
re-upserts, and the unique key keeps the store from growing. batch_id is not content-derived
— it is caller-supplied or a random uuid.
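The unique-key upsert can be demonstrated with plain sqlite. This is a sketch of the technique, not the fact store's code: the fact table layout, the dim_key format, and this standalone upsert_facts function are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fact (dim_key TEXT PRIMARY KEY, value REAL, source_file_hash TEXT)"
)


def upsert_facts(conn, facts):
    """Insert new facts; overwrite in place when the dim_key already exists."""
    conn.executemany(
        "INSERT INTO fact (dim_key, value, source_file_hash) VALUES (?, ?, ?) "
        "ON CONFLICT(dim_key) DO UPDATE SET "
        "value = excluded.value, source_file_hash = excluded.source_file_hash",
        facts,
    )


# Hypothetical dim_key format, for illustration only.
batch = [("acme|revenue|2023", 120.5, "h1"), ("acme|cost|2023", 80.0, "h1")]
upsert_facts(conn, batch)
upsert_facts(conn, batch)  # re-running the same batch adds no rows

count = conn.execute("SELECT COUNT(*) FROM fact").fetchone()[0]
print(count)  # 2
```

Because the conflict target is the key rather than a batch identifier, the guarantee holds even when the re-run uses a different batch_id.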
Narrative ingestion
The narrative lane is two modules with a clean split:
Pure, deterministic text extraction — zero OCR, zero LLM, no store. extract_narrative(path)
dispatches by suffix ({.pptx, .pdf}) to extract_pptx_narrative / extract_pdf_narrative
and returns a NarrativeDoc: doc_id, file_hash, a list of NarrativeSegment
(text + source_locator), skipped_pages, and warnings. Locators look like
'slide={N},frame={M}', 'slide={N},notes', or 'page={N}'. NarrativeDoc.to_text()
joins segments with blank lines — that string is the chunking input contract.
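The shape of the extraction result can be sketched with two dataclasses. Field names follow the description above; the field types and the frozen segment are assumptions, and the real classes may carry more than is shown here.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class NarrativeSegment:
    text: str
    source_locator: str  # e.g. 'slide=3,frame=1', 'slide=3,notes', 'page=7'


@dataclass
class NarrativeDoc:
    doc_id: str
    file_hash: str
    segments: List[NarrativeSegment] = field(default_factory=list)
    skipped_pages: List[int] = field(default_factory=list)  # element type assumed
    warnings: List[str] = field(default_factory=list)

    def to_text(self) -> str:
        """Join segment texts with blank lines; this string feeds the chunker."""
        return "\n\n".join(seg.text for seg in self.segments)


doc = NarrativeDoc(
    doc_id="deck-001",
    file_hash="abc123",
    segments=[
        NarrativeSegment("Market overview.", "slide=1,frame=1"),
        NarrativeSegment("Speaker notes here.", "slide=1,notes"),
    ],
)
print(doc.to_text())
```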
Batch orchestration: extract → chunk → write chunk store, idempotent and dry-runnable.
ingest_narrative(inputs, store, *, meta_by_doc=None, dry_run=False) accepts a folder, a
file, or a list, and returns a NarrativeIngestReport (a list of per-file FileReport,
plus counts()). Each file is chunked via chunk_document(doc.to_text(), doc_meta) and
written with store.replace_doc_chunks(...) into the
ChunkStore.
Per-file status is one of ingested / skipped / failed / no_text. Idempotency uses a
narrative_doc table (doc_id → file_hash) in the same sqlite DB: if the recorded hash
matches the file, the file is skipped without re-extracting. meta_by_doc keys are
validated against ALLOWED_META_KEYS (title, topic, entity, geography, period,
language, sensitivity, valid_as_of) — unknown fields raise ValueError. The period is
taken from metadata or inferred from the filename via period_from_filename(name).
Sensitivity is applied here: an explicit meta["sensitivity"] wins, otherwise
classify_sensitivity(...) from common runs. This is what
later lets retrieval enforce RESTRICTED isolation.
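The hash-based skip and the metadata key check can be sketched together. The narrative_doc table and ALLOWED_META_KEYS follow this page; ingest_one, validate_meta, and the use of SHA-256 for file_hash are assumptions, and the extract / chunk / write steps are reduced to a comment.

```python
import hashlib
import sqlite3

ALLOWED_META_KEYS = {
    "title", "topic", "entity", "geography",
    "period", "language", "sensitivity", "valid_as_of",
}


def validate_meta(meta):
    """Reject metadata keys outside the allowed set."""
    unknown = set(meta) - ALLOWED_META_KEYS
    if unknown:
        raise ValueError(f"unknown metadata keys: {sorted(unknown)}")


def ingest_one(conn, doc_id, content: bytes, meta=None) -> str:
    """Return 'skipped' when the recorded hash matches, else 'ingested'."""
    validate_meta(meta or {})
    file_hash = hashlib.sha256(content).hexdigest()  # hash algorithm is assumed
    row = conn.execute(
        "SELECT file_hash FROM narrative_doc WHERE doc_id = ?", (doc_id,)
    ).fetchone()
    if row is not None and row[0] == file_hash:
        return "skipped"
    # A real run would extract, chunk, and call store.replace_doc_chunks(...) here.
    conn.execute(
        "INSERT OR REPLACE INTO narrative_doc (doc_id, file_hash) VALUES (?, ?)",
        (doc_id, file_hash),
    )
    return "ingested"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE narrative_doc (doc_id TEXT PRIMARY KEY, file_hash TEXT)")
print(ingest_one(conn, "deck-001", b"v1", {"title": "Q3 review"}))
print(ingest_one(conn, "deck-001", b"v1"))
```

Recording the hash only after the write step means a file whose content changes is re-ingested in full and replaces its earlier chunks.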
The review queue
review/review_queue.py is the SME human-review state machine that catches everything
the deterministic path is unsure about — low-confidence OCR, cross-channel conflicts,
unconfirmed color mappings, unresolvable entities. It is sqlite-backed (same DB as the fact
store, different tables: review_item + an append-only review_audit).
The state machine has three string states and two transitions:
pending ──approve──▶ approved (terminal)
pending ──reject───▶ rejected (terminal)
STATUS_PENDING = "pending", STATUS_APPROVED = "approved", STATUS_REJECTED = "rejected".
Approved and rejected are terminal — re-processing a terminal item (or acting on a
non-existent one) raises IllegalTransitionError.
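The two legal transitions fit in a small lookup table. The constants and the exception name follow this page; next_status and the table itself are a hypothetical sketch of the rule, not the module's code.

```python
STATUS_PENDING = "pending"
STATUS_APPROVED = "approved"
STATUS_REJECTED = "rejected"


class IllegalTransitionError(Exception):
    """Raised when an action is not allowed from the current state."""


# Only pending items can move; approved and rejected have no outgoing edges.
_TRANSITIONS = {
    (STATUS_PENDING, "approve"): STATUS_APPROVED,
    (STATUS_PENDING, "reject"): STATUS_REJECTED,
}


def next_status(current: str, action: str) -> str:
    """Return the state reached by applying action, or raise if it is illegal."""
    try:
        return _TRANSITIONS[(current, action)]
    except KeyError:
        raise IllegalTransitionError(
            f"cannot {action} an item in state {current!r}"
        ) from None


print(next_status(STATUS_PENDING, "approve"))
```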
A ReviewItem carries reason, payload (JSON), locator, priority (default 100,
lower = reviewed sooner), id, status, actor, note, and corrected_value.
API:
| method | effect |
|---|---|
| enqueue(reason, payload, locator, priority=100) -> int | insert a pending item + write an enqueue audit row |
| list_pending() -> list[ReviewItem] | pending items, ordered priority ASC, id ASC |
| approve(item_id, actor, note=None) | → approved |
| reject(item_id, actor, note=None, corrected_value=None) | → rejected (optionally record a correction) |
| get(item_id) · audit_trail(item_id) | fetch item / append-only AuditRecord history |
Every transition appends an AuditRecord (enqueue / approve / reject) — the trail is
append-only and never mutated, so review history is fully reconstructable. See the
review queue concept.
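A toy version of the queue shows the priority ordering and the append-only audit trail working together. Table and method names follow this page; the column layout and the MiniReviewQueue class are assumptions, reject is omitted for brevity, and list_pending returns bare ids here rather than ReviewItem objects.

```python
import json
import sqlite3


class IllegalTransitionError(Exception):
    """Raised when acting on a missing or already-terminal item."""


class MiniReviewQueue:
    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE review_item (id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "reason TEXT, payload TEXT, locator TEXT, priority INTEGER, status TEXT)"
        )
        conn.execute(
            "CREATE TABLE review_audit (id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "item_id INTEGER, action TEXT, actor TEXT, note TEXT)"
        )

    def _audit(self, item_id, action, actor=None, note=None):
        # Audit rows are only ever inserted, never updated or deleted.
        self.conn.execute(
            "INSERT INTO review_audit (item_id, action, actor, note) VALUES (?, ?, ?, ?)",
            (item_id, action, actor, note),
        )

    def enqueue(self, reason, payload, locator, priority=100):
        cur = self.conn.execute(
            "INSERT INTO review_item (reason, payload, locator, priority, status) "
            "VALUES (?, ?, ?, ?, 'pending')",
            (reason, json.dumps(payload), locator, priority),
        )
        item_id = cur.lastrowid
        self._audit(item_id, "enqueue")
        return item_id

    def list_pending(self):
        rows = self.conn.execute(
            "SELECT id FROM review_item WHERE status = 'pending' "
            "ORDER BY priority ASC, id ASC"
        ).fetchall()
        return [row[0] for row in rows]

    def approve(self, item_id, actor, note=None):
        cur = self.conn.execute(
            "UPDATE review_item SET status = 'approved' "
            "WHERE id = ? AND status = 'pending'",
            (item_id,),
        )
        if cur.rowcount != 1:  # missing item, or already approved / rejected
            raise IllegalTransitionError(f"item {item_id} is not pending")
        self._audit(item_id, "approve", actor, note)

    def audit_trail(self, item_id):
        rows = self.conn.execute(
            "SELECT action FROM review_audit WHERE item_id = ? ORDER BY id",
            (item_id,),
        ).fetchall()
        return [row[0] for row in rows]


queue = MiniReviewQueue(sqlite3.connect(":memory:"))
first = queue.enqueue("low-confidence OCR", {"cell": "B7"}, "page=2")
urgent = queue.enqueue("cross-channel conflict", {"cell": "C3"}, "page=5", priority=10)
print(queue.list_pending())  # urgent item first: [2, 1]
queue.approve(urgent, actor="sme")
print(queue.audit_trail(urgent))  # ['enqueue', 'approve']
```

Guarding the UPDATE with status = 'pending' makes the transition check and the write a single statement, so a terminal item cannot be re-processed.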
Invariants this domain upholds
- Idempotent ingestion: structured re-runs upsert on dim_key; narrative re-runs skip on matching file_hash. Re-ingesting never doubles the store.
- Provenance preserved: every fact and chunk keeps its source_doc_id + locator.
- Conservative auto-ingest: anything ambiguous (entity / mapping / confidence / conflict) goes to a human, not silently into the store.
- Append-only audit: review transitions are recorded, never overwritten.
Related
Extraction
Documents → a frozen StyledGrid IR. Style- and color-aware xlsx/pptx/pdf extractors, per-page PDF routing, a versioned color-semantics registry, and dual-channel cross-checking into a review queue.
Storage
The sqlite persistence layer — a numeric fact store and a narrative chunk store, both with full source lineage. The Fact dataclass, the dim_key upsert key, and deterministic found/not-found reads.