Mnemosyne MCP Server: Ingest Memory Detail

Level 3 (Detail) — Exact step-by-step flow of the async ingestion pipeline.

Concept

ingest_memory accepts text content and a timestamp, enqueues the work to a background goroutine, and returns immediately. The caller receives a queue confirmation but no success signal.

Handler Flow (MCP Layer)

File: internal/mcp/mcp.gohandleIngest() (lines 145-171)

  1. Parse content (string) and timestamp (RFC 3339) from tool arguments
  2. If timestamp parsing fails, fall back to time.Now()
  3. Call controller.IngestMemory(content, ts, traceID)
  4. If the controller rejects (queue full), return error
  5. Otherwise return {"status": "queued", "request_id": "<trace_id>"}

Async Worker Flow (Logic Layer)

File: internal/logic/logic.go

Producer: IngestMemory() (lines 55-70)

  1. Create an IngestRequest with content, timestamp, trace ID
  2. Send to buffered channel ingestQueue (capacity: 100)
  3. If channel is full, return "ingestion queue is full (Service Unavailable)"

Consumer: processIngestTask() (lines 72-130)

Executed by the background goroutine startWorkerPool() (line 42):

  1. Composite text: format as DATE: <RFC3339>\n<content>
  2. SHA-256 hash: sha256.Sum256([]byte(composite)), hex-encoded
  3. In-memory dedup check: look up hash in cache map (10-minute TTL):
    • If found and within TTL → skip (return early)
    • Otherwise → store hash in cache with current timestamp
  4. DB dedup check: db.CheckMemoryExists(hashStr):
    • If exists → call db.InsertMemory with a zero vector to update last_seen_at (ON CONFLICT), then return
    • If error → log and continue anyway (best-effort insert)
  5. Embedding: call embed.GetEmbedding(composite):
    • HTTP POST to Gemini API with 15-second timeout
    • On failure → log error and return silently (no retry)
  6. DB insert: db.InsertMemory(timestamp, composite, vector, hashStr):
    • On failure → log error and return silently

Deduplication Architecture

Incoming content
       │
       ▼
  SHA-256(content) ──► in-memory cache (10-min TTL, fast path)
       │
       │  cache miss
       ▼
  DB CheckMemoryExists (ON CONFLICT DO UPDATE fallback)
       │
       │  not in DB
       ▼
  Embed → Insert

The in-memory cache is a plain map[string]time.Time with a sync.RWMutex. Entries are never explicitly evicted after TTL expiry — they become stale but remain in memory until overwritten by a new entry with the same hash. Under sustained unique content, the map grows without bound.

Error Handling Weaknesses

Failure PointBehaviorImpact
Gemini API timeout (15s)Log error, returnMemory silently lost
Gemini API non-200Log error with body, returnMemory silently lost
DB insert failsLog error, returnMemory silently lost
DB dedup check failsLog error, continue anywayDuplicate insertion possible
Queue full (100 items)Return error to callerCaller knows, but no retry mechanism

Code Paths

FileFunctionLineRole
internal/mcp/mcp.gohandleIngest()145MCP handler, parses args, returns queued status
internal/logic/logic.goIngestMemory()55Producer, sends to channel
internal/logic/logic.gostartWorkerPool()47Background consumer loop
internal/logic/logic.goprocessIngestTask()72Full pipeline: hash → dedup → embed → insert
internal/logic/logic.gocache26In-memory dedup map (no eviction)

Known Issues

  • TD-002: dimension mismatch risk if fresh database
  • Silent failure: no feedback to caller on embed/DB errors
  • Unbounded cache growth under sustained load
  • No retry mechanism for transient failures

See Also