neo4j-python/api/app.py

256 lines
8.2 KiB
Python

from fastapi import FastAPI
from fastapi import HTTPException
from pydantic import BaseModel
from neo4j import GraphDatabase
from typing import Optional
import os, requests
NEO4J_URI = os.getenv("NEO4J_URI", "neo4j://neo4j:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
app = FastAPI()
class Inp(BaseModel):
message: str
@app.on_event("startup")
def startup():
try:
driver.verify_connectivity()
except Exception:
# DB might not be ready yet; endpoints will retry as needed
pass
@app.on_event("shutdown")
def shutdown():
driver.close()
@app.get("/healthz")
def healthz():
return {"ok": True}
@app.post("/echo")
def echo(inp: Inp):
reply = "hi hier gpt5-mini" if inp.message.strip().lower() == "hello world" else inp.message
cypher = """
CREATE (m:Message {input: $input, reply: $reply, created_at: datetime()})
RETURN id(m) AS id
"""
records, _, _ = driver.execute_query(
cypher,
input=inp.message,
reply=reply,
database_="neo4j",
)
return {"reply": reply, "node_id": records[0]["id"]}
@app.get("/messages")
def list_messages(limit: int = 20):
cypher = """
MATCH (m:Message)
RETURN id(m) AS id, m.input AS input, m.reply AS reply, m.created_at AS created_at
ORDER BY m.created_at DESC
LIMIT $limit
"""
try:
records, _, _ = driver.execute_query(cypher, limit=limit, database_="neo4j")
except Exception as e:
# Neo4j not yet available or auth/conn issue
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
items = []
for r in records:
created = r["created_at"]
items.append({
"id": r["id"],
"input": r["input"],
"reply": r["reply"],
"created_at": str(created) if created is not None else None,
})
return {"items": items}
class UpdateMessage(BaseModel):
input: Optional[str] = None
reply: Optional[str] = None
@app.get("/messages/{id}")
def get_message(id: int):
cypher = """
MATCH (m:Message) WHERE id(m) = $id
RETURN id(m) AS id, m.input AS input, m.reply AS reply, m.created_at AS created_at
"""
try:
records, _, _ = driver.execute_query(cypher, id=id, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
if not records:
raise HTTPException(status_code=404, detail="Message not found")
r = records[0]
return {
"id": r["id"],
"input": r["input"],
"reply": r["reply"],
"created_at": str(r["created_at"]) if r["created_at"] is not None else None,
}
@app.put("/messages/{id}")
def update_message(id: int, body: UpdateMessage):
if body.input is None and body.reply is None:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = []
params = {"id": id}
if body.input is not None:
set_clauses.append("m.input = $input")
params["input"] = body.input
if body.reply is not None:
set_clauses.append("m.reply = $reply")
params["reply"] = body.reply
cypher = f"""
MATCH (m:Message) WHERE id(m) = $id
SET {", ".join(set_clauses)}
RETURN id(m) AS id, m.input AS input, m.reply AS reply, m.created_at AS created_at
"""
try:
records, _, _ = driver.execute_query(cypher, **params, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
if not records:
raise HTTPException(status_code=404, detail="Message not found")
r = records[0]
return {
"id": r["id"],
"input": r["input"],
"reply": r["reply"],
"created_at": str(r["created_at"]) if r["created_at"] is not None else None,
}
@app.delete("/messages/{id}")
def delete_message(id: int):
cypher = """
MATCH (m:Message) WHERE id(m) = $id
WITH m, properties(m) AS props
DETACH DELETE m
RETURN $id AS id, props AS deleted
"""
try:
records, _, _ = driver.execute_query(cypher, id=id, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
# If no records returned, node didn't exist
if not records:
raise HTTPException(status_code=404, detail="Message not found")
r = records[0]
# props may be None if there was no match; we already guard above
return {
"id": r["id"],
"deleted": r["deleted"],
}
class ImportMessagesRequest(BaseModel):
filename: str = "messages.csv" # must live in /import
batch_size: int = 1000
@app.post("/import/messages")
def import_messages(req: ImportMessagesRequest):
fname = os.path.basename(req.filename)
if not fname:
raise HTTPException(status_code=400, detail="Invalid filename")
if not (1 <= req.batch_size <= 100_000):
raise HTTPException(status_code=400, detail="batch_size out of range (1..100000)")
file_url = f"file:///{fname}"
# IMPORTANT: all literal { } in the Cypher must be doubled inside an f-string
cypher = f"""
CALL apoc.periodic.iterate(
'CALL apoc.load.csv($url,{{header:true}}) YIELD map RETURN map',
'
MERGE (m:Message {{
input: map.input,
reply: map.reply,
created_at: datetime(map.created_at)
}})
ON CREATE SET m.imported_at = datetime()
ON MATCH SET m.last_seen = datetime()
',
{{batchSize: {req.batch_size}, parallel: false, params: $params}}
)
YIELD batches, total, committedOperations, failedOperations, failedBatches, retries, updateStatistics
RETURN batches, total, committedOperations, failedOperations, failedBatches, retries, updateStatistics
"""
try:
records, _, _ = driver.execute_query(
cypher,
params={"url": file_url},
database_="neo4j",
)
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j/APOC error: {type(e).__name__}: {e}")
r = records[0] if records else {}
stats = r.get("updateStatistics", {}) if isinstance(r.get("updateStatistics"), dict) else {}
created = stats.get("nodesCreated", 0)
props = stats.get("propertiesSet", 0)
return {
"file": fname,
"status": "created" if created > 0 else "up-to-date",
"batches": r.get("batches"),
"total": r.get("total"),
"created": created,
"propertiesSet": props,
"updateStatistics": stats,
}
class AiInp(BaseModel):
message: str
@app.post("/ai/echo")
def ai_echo(inp: AiInp):
msg = inp.message.strip()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT")
api_ver = os.getenv("AZURE_OPENAI_API_VERSION", "2024-10-21")
if not all([endpoint, api_key, deployment]):
raise HTTPException(status_code=500, detail="Azure OpenAI env not configured")
url = f"{endpoint}/openai/deployments/{deployment}/chat/completions?api-version={api_ver}"
payload = {
"messages": [
{"role":"system","content":"You are a terse rephraser."},
{"role":"user","content": msg}
],
"temperature": 0.2
}
headers = {"Content-Type":"application/json","api-key": api_key}
try:
r = requests.post(url, json=payload, headers=headers, timeout=30)
r.raise_for_status()
data = r.json()
reply = data["choices"][0]["message"]["content"].strip()
except Exception as e:
raise HTTPException(status_code=502, detail=f"Azure OpenAI request failed: {type(e).__name__}: {e}")
# store in Neo4j (same as /echo)
cypher = """
CREATE (m:Message {input: $input, reply: $reply, created_at: datetime()})
RETURN id(m) AS id
"""
try:
records, _, _ = driver.execute_query(cypher, input=msg, reply=reply, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
return {"reply": reply, "node_id": records[0]["id"]}