Mnemosyne MCP Server: Ingest Memory Detail
Level 3 (Detail) — Exact step-by-step flow of the async ingestion pipeline.
Concept
ingest_memory accepts text content and a timestamp, enqueues the work to a background goroutine, and returns immediately. The caller receives a queue confirmation but no success signal.
Handler Flow (MCP Layer)
File: internal/mcp/mcp.go — handleIngest() (lines 145-171)
- Parse
content(string) andtimestamp(RFC 3339) from tool arguments - If timestamp parsing fails, fall back to
time.Now() - Call
controller.IngestMemory(content, ts, traceID) - If the controller rejects (queue full), return error
- Otherwise return
{"status": "queued", "request_id": "<trace_id>"}
Async Worker Flow (Logic Layer)
File: internal/logic/logic.go
Producer: IngestMemory() (lines 55-70)
- Create an
IngestRequestwith content, timestamp, trace ID - Send to buffered channel
ingestQueue(capacity: 100) - If channel is full, return
"ingestion queue is full (Service Unavailable)"
Consumer: processIngestTask() (lines 72-130)
Executed by the background goroutine startWorkerPool() (line 42):
- Composite text: format as
DATE: <RFC3339>\n<content> - SHA-256 hash:
sha256.Sum256([]byte(composite)), hex-encoded - In-memory dedup check: look up hash in
cachemap (10-minute TTL):- If found and within TTL → skip (return early)
- Otherwise → store hash in cache with current timestamp
- DB dedup check:
db.CheckMemoryExists(hashStr):- If exists → call
db.InsertMemorywith a zero vector to updatelast_seen_at(ON CONFLICT), then return - If error → log and continue anyway (best-effort insert)
- If exists → call
- Embedding: call
embed.GetEmbedding(composite):- HTTP POST to Gemini API with 15-second timeout
- On failure → log error and return silently (no retry)
- DB insert:
db.InsertMemory(timestamp, composite, vector, hashStr):- On failure → log error and return silently
Deduplication Architecture
Incoming content
│
▼
SHA-256(content) ──► in-memory cache (10-min TTL, fast path)
│
│ cache miss
▼
DB CheckMemoryExists (ON CONFLICT DO UPDATE fallback)
│
│ not in DB
▼
Embed → Insert
The in-memory cache is a plain map[string]time.Time with a sync.RWMutex. Entries are never explicitly evicted after TTL expiry — they become stale but remain in memory until overwritten by a new entry with the same hash. Under sustained unique content, the map grows without bound.
Error Handling Weaknesses
| Failure Point | Behavior | Impact |
|---|---|---|
| Gemini API timeout (15s) | Log error, return | Memory silently lost |
| Gemini API non-200 | Log error with body, return | Memory silently lost |
| DB insert fails | Log error, return | Memory silently lost |
| DB dedup check fails | Log error, continue anyway | Duplicate insertion possible |
| Queue full (100 items) | Return error to caller | Caller knows, but no retry mechanism |
Code Paths
| File | Function | Line | Role |
|---|---|---|---|
internal/mcp/mcp.go | handleIngest() | 145 | MCP handler, parses args, returns queued status |
internal/logic/logic.go | IngestMemory() | 55 | Producer, sends to channel |
internal/logic/logic.go | startWorkerPool() | 47 | Background consumer loop |
internal/logic/logic.go | processIngestTask() | 72 | Full pipeline: hash → dedup → embed → insert |
internal/logic/logic.go | cache | 26 | In-memory dedup map (no eviction) |
Known Issues
- TD-002: dimension mismatch risk if fresh database
- Silent failure: no feedback to caller on embed/DB errors
- Unbounded cache growth under sustained load
- No retry mechanism for transient failures
See Also
- Parent topic: Async Ingestion
- Sibling details: Retrieve Memories Detail, Deployment Detail, Python Tools Detail
- Debt: Known Issues