"""FastAPI service that stores ``Message`` nodes in Neo4j.

Endpoints:

* ``GET  /healthz``           - liveness probe (does not touch Neo4j).
* ``POST /echo``              - echo a message and persist it.
* ``GET  /messages``          - list stored messages, newest first.
* ``GET/PUT/DELETE /messages/{id}`` - read, edit or remove one message.
* ``POST /import/messages``   - bulk-import messages from a CSV via APOC.
* ``POST /ai/echo``           - rephrase a message through Azure OpenAI and
  persist the result.

Configuration is read from the environment: ``NEO4J_URI``, ``NEO4J_USER``,
``NEO4J_PASSWORD`` and, for ``/ai/echo``, ``AZURE_OPENAI_ENDPOINT``,
``AZURE_OPENAI_API_KEY``, ``AZURE_OPENAI_DEPLOYMENT`` and
``AZURE_OPENAI_API_VERSION``.
"""
import logging
import os
from typing import Optional

import requests
from fastapi import FastAPI, HTTPException
from neo4j import GraphDatabase
from pydantic import BaseModel

logger = logging.getLogger(__name__)

NEO4J_URI = os.getenv("NEO4J_URI", "neo4j://neo4j:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
# Every query in this module targets this database.
NEO4J_DATABASE = "neo4j"

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
app = FastAPI()

# Shared by /echo and /ai/echo so both persist messages identically.
_CREATE_MESSAGE_CYPHER = """
CREATE (m:Message {input: $input, reply: $reply, created_at: datetime()})
RETURN id(m) AS id
"""

# Projection shared by the single-message read and update queries.
_MESSAGE_RETURN = (
    "RETURN id(m) AS id, m.input AS input, m.reply AS reply, "
    "m.created_at AS created_at"
)


def _run_query(cypher: str, parameters: dict):
    """Run *cypher* with *parameters* and return the result records.

    Raises:
        HTTPException: 503 when the driver call raises for any reason
            (connection, auth or query failure); only the exception type
            name is exposed in the detail.
    """
    try:
        records, _, _ = driver.execute_query(
            cypher,
            parameters_=parameters,
            database_=NEO4J_DATABASE,
        )
    except Exception as exc:
        raise HTTPException(
            status_code=503,
            detail=f"Neo4j unavailable: {type(exc).__name__}",
        ) from exc
    return records


def _store_message(input_text: str, reply: str):
    """Create a ``Message`` node and return its Neo4j internal id."""
    records = _run_query(
        _CREATE_MESSAGE_CYPHER,
        {"input": input_text, "reply": reply},
    )
    return records[0]["id"]


def _serialize_message(record) -> dict:
    """Convert a message record into the JSON shape used by the API.

    ``created_at`` is rendered with ``str()`` (or left as ``None``).
    """
    created = record["created_at"]
    return {
        "id": record["id"],
        "input": record["input"],
        "reply": record["reply"],
        "created_at": str(created) if created is not None else None,
    }


def _json_safe(value):
    """Return *value* if it is a plain JSON type, otherwise ``str(value)``.

    Lists and dicts are processed recursively. Used for raw node properties,
    which can hold driver-specific values such as ``created_at``.
    """
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    if isinstance(value, dict):
        return {key: _json_safe(item) for key, item in value.items()}
    return str(value)


class Inp(BaseModel):
    """Request body for ``POST /echo``."""

    message: str


@app.on_event("startup")
def startup():
    """Probe Neo4j once at startup without blocking the app from starting."""
    try:
        driver.verify_connectivity()
    except Exception as exc:
        # DB might not be ready yet; endpoints report 503 until it is.
        # Log instead of silently swallowing so the condition is visible.
        logger.warning(
            "Neo4j not reachable at startup: %s", type(exc).__name__
        )


@app.on_event("shutdown")
def shutdown():
    """Close the shared Neo4j driver."""
    driver.close()


@app.get("/healthz")
def healthz():
    """Liveness probe; always reports ok and does not query Neo4j."""
    return {"ok": True}


@app.post("/echo")
def echo(inp: Inp):
    """Echo the message back (with one canned reply) and persist it.

    The input ``"hello world"`` (case-insensitive, surrounding whitespace
    ignored) gets a fixed greeting; anything else is echoed unchanged.

    Raises:
        HTTPException: 503 when the message cannot be written to Neo4j.
    """
    if inp.message.strip().lower() == "hello world":
        reply = "hi hier gpt5-mini"
    else:
        reply = inp.message
    node_id = _store_message(inp.message, reply)
    return {"reply": reply, "node_id": node_id}


@app.get("/messages")
def list_messages(limit: int = 20):
    """Return up to *limit* stored messages, newest first.

    Raises:
        HTTPException: 400 when *limit* is negative, 503 when Neo4j fails.
    """
    # Cypher rejects a negative LIMIT; report that as a client error rather
    # than letting it surface as "Neo4j unavailable".
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be >= 0")
    cypher = """
    MATCH (m:Message)
    RETURN id(m) AS id, m.input AS input, m.reply AS reply,
           m.created_at AS created_at
    ORDER BY m.created_at DESC
    LIMIT $limit
    """
    records = _run_query(cypher, {"limit": limit})
    return {"items": [_serialize_message(record) for record in records]}


class UpdateMessage(BaseModel):
    """Request body for ``PUT /messages/{id}``; omitted fields stay as-is."""

    input: Optional[str] = None
    reply: Optional[str] = None


@app.get("/messages/{id}")
def get_message(id: int):
    """Return the message whose Neo4j internal id is *id*.

    Raises:
        HTTPException: 404 when no such message exists, 503 when Neo4j fails.
    """
    cypher = f"""
    MATCH (m:Message)
    WHERE id(m) = $id
    {_MESSAGE_RETURN}
    """
    records = _run_query(cypher, {"id": id})
    if not records:
        raise HTTPException(status_code=404, detail="Message not found")
    return _serialize_message(records[0])


@app.put("/messages/{id}")
def update_message(id: int, body: UpdateMessage):
    """Update ``input`` and/or ``reply`` of one message and return it.

    Raises:
        HTTPException: 400 when the body sets neither field, 404 when the
            message does not exist, 503 when Neo4j fails.
    """
    updates = {
        field: value
        for field, value in (("input", body.input), ("reply", body.reply))
        if value is not None
    }
    if not updates:
        raise HTTPException(status_code=400, detail="No fields to update")

    # Property names come from the fixed tuple above, never from the request,
    # so interpolating them into the query text is safe; values stay
    # parameterized.
    set_clause = ", ".join(f"m.{field} = ${field}" for field in updates)
    cypher = f"""
    MATCH (m:Message)
    WHERE id(m) = $id
    SET {set_clause}
    {_MESSAGE_RETURN}
    """
    records = _run_query(cypher, {"id": id, **updates})
    if not records:
        raise HTTPException(status_code=404, detail="Message not found")
    return _serialize_message(records[0])


@app.delete("/messages/{id}")
def delete_message(id: int):
    """Delete one message and return the properties it had.

    Raises:
        HTTPException: 404 when the message does not exist, 503 when Neo4j
            fails.
    """
    # Properties are captured before the delete so they can be returned.
    cypher = """
    MATCH (m:Message)
    WHERE id(m) = $id
    WITH m, properties(m) AS props
    DETACH DELETE m
    RETURN $id AS id, props AS deleted
    """
    records = _run_query(cypher, {"id": id})
    # No rows means the MATCH found nothing, i.e. the node did not exist.
    if not records:
        raise HTTPException(status_code=404, detail="Message not found")
    record = records[0]
    return {
        "id": record["id"],
        # Stringify non-JSON values (e.g. created_at) the same way the other
        # endpoints render created_at.
        "deleted": _json_safe(record["deleted"]),
    }


class ImportMessagesRequest(BaseModel):
    """Request body for ``POST /import/messages``."""

    filename: str = "messages.csv"  # must live in /import
    batch_size: int = 1000


@app.post("/import/messages")
def import_messages(req: ImportMessagesRequest):
    """Bulk-import messages from a CSV file using ``apoc.periodic.iterate``.

    Each CSV row is MERGEd on ``input``, ``reply`` and ``created_at``, so
    re-importing the same file creates no new nodes.

    Raises:
        HTTPException: 400 for an empty filename or a ``batch_size`` outside
            1..100000, 503 when the Neo4j/APOC call fails.
    """
    # basename() drops any directory part so the request cannot point
    # outside the directory the file URL is resolved against.
    fname = os.path.basename(req.filename)
    if not fname:
        raise HTTPException(status_code=400, detail="Invalid filename")
    if not (1 <= req.batch_size <= 100_000):
        raise HTTPException(
            status_code=400,
            detail="batch_size out of range (1..100000)",
        )

    file_url = f"file:///{fname}"

    # batch_size is passed as a query parameter rather than interpolated, so
    # this is a plain string and the Cypher braces need no escaping.
    cypher = """
    CALL apoc.periodic.iterate(
      'CALL apoc.load.csv($url,{header:true}) YIELD map RETURN map',
      '
      MERGE (m:Message {
        input: map.input,
        reply: map.reply,
        created_at: datetime(map.created_at)
      })
      ON CREATE SET m.imported_at = datetime()
      ON MATCH SET m.last_seen = datetime()
      ',
      {batchSize: $batch_size, parallel: false, params: $params}
    )
    YIELD batches, total, committedOperations, failedOperations,
          failedBatches, retries, updateStatistics
    RETURN batches, total, committedOperations, failedOperations,
           failedBatches, retries, updateStatistics
    """
    try:
        records, _, _ = driver.execute_query(
            cypher,
            # "params" is forwarded by APOC to the inner statements as $url.
            parameters_={
                "params": {"url": file_url},
                "batch_size": req.batch_size,
            },
            database_=NEO4J_DATABASE,
        )
    except Exception as exc:
        raise HTTPException(
            status_code=503,
            detail=f"Neo4j/APOC error: {type(exc).__name__}: {exc}",
        ) from exc

    row = records[0] if records else {}
    raw_stats = row.get("updateStatistics")
    stats = raw_stats if isinstance(raw_stats, dict) else {}
    created = stats.get("nodesCreated", 0)
    return {
        "file": fname,
        "status": "created" if created > 0 else "up-to-date",
        "batches": row.get("batches"),
        "total": row.get("total"),
        "created": created,
        "propertiesSet": stats.get("propertiesSet", 0),
        "updateStatistics": stats,
    }


class AiInp(BaseModel):
    """Request body for ``POST /ai/echo``."""

    message: str


@app.post("/ai/echo")
def ai_echo(inp: AiInp):
    """Rephrase the message via Azure OpenAI, persist it and return the reply.

    Raises:
        HTTPException: 500 when the Azure OpenAI environment variables are
            missing, 502 when the Azure request or response parsing fails,
            503 when the result cannot be written to Neo4j.
    """
    msg = inp.message.strip()

    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT")
    api_ver = os.getenv("AZURE_OPENAI_API_VERSION", "2024-10-21")
    if not all([endpoint, api_key, deployment]):
        raise HTTPException(
            status_code=500,
            detail="Azure OpenAI env not configured",
        )

    # Tolerate a trailing slash on the configured endpoint so the URL does
    # not end up containing "//openai".
    base = endpoint.rstrip("/")
    url = (
        f"{base}/openai/deployments/{deployment}"
        f"/chat/completions?api-version={api_ver}"
    )
    payload = {
        "messages": [
            {"role": "system", "content": "You are a terse rephraser."},
            {"role": "user", "content": msg},
        ],
        "temperature": 0.2,
    }
    headers = {"Content-Type": "application/json", "api-key": api_key}

    try:
        response = requests.post(
            url, json=payload, headers=headers, timeout=30
        )
        response.raise_for_status()
        data = response.json()
        reply = data["choices"][0]["message"]["content"].strip()
    except Exception as exc:
        # Covers transport errors, non-2xx statuses and unexpected payloads.
        raise HTTPException(
            status_code=502,
            detail=f"Azure OpenAI request failed: {type(exc).__name__}: {exc}",
        ) from exc

    # Store in Neo4j (same as /echo).
    node_id = _store_message(msg, reply)
    return {"reply": reply, "node_id": node_id}