Initial: Neo4j + FastAPI + APOC import + Azure OpenAI endpoint

This commit is contained in:
johannes 2025-08-28 12:58:41 +03:00
commit 6641a0c5bb
6 changed files with 382 additions and 0 deletions

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
# secrets & local config
.env
.env.*
.envrc
# Neo4j local data (if you add these later, keep them ignored)
neo4j/
neo4j/data/
neo4j/logs/
neo4j/plugins/
neo4j/config/
neo4j/import/
# Python
__pycache__/
*.pyc
# Editor/OS noise
.DS_Store
.idea/
.vscode/

18
README.md Normal file
View File

@ -0,0 +1,18 @@
# Neo4j + FastAPI Demo
## Run (local)
1) Copy `.env.example` to `.env` and fill secrets.
2) `docker compose up -d --build`
3) API: http://localhost:8000/docs
4) Neo4j Browser: http://localhost:7474
## Endpoints
- POST `/echo`
- GET `/messages`
- GET `/messages/{id}`, PUT `/messages/{id}`, DELETE `/messages/{id}`
- POST `/import/messages` (APOC CSV import)
- POST `/ai/echo` (Azure OpenAI; stores reply in Neo4j)
## Notes
- `.env` is **not** committed. Use `.env.example` as template.
- Neo4j data is ignored (`neo4j/data`, `neo4j/logs`, etc).

26
api/Dockerfile Normal file
View File

@ -0,0 +1,26 @@
FROM python:3.12-slim
# OS deps (optional but nice to have)
#RUN apt-get update && apt-get install -y --no-install-recommends tini && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y --no-install-recommends tini wget && rm -rf /var/lib/apt/lists/*
# App env
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
# Install deps first (better layer caching)
COPY api/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
# Copy app
COPY api /app
# Non-root user
RUN useradd -m appuser
USER appuser
EXPOSE 8000
ENTRYPOINT ["/usr/bin/tini","--"]
CMD ["uvicorn","app:app","--host","0.0.0.0","--port","8000"]

256
api/app.py Normal file
View File

@ -0,0 +1,256 @@
from fastapi import FastAPI
from fastapi import HTTPException
from pydantic import BaseModel
from neo4j import GraphDatabase
from typing import Optional
import os, requests
NEO4J_URI = os.getenv("NEO4J_URI", "neo4j://neo4j:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
app = FastAPI()
class Inp(BaseModel):
message: str
@app.on_event("startup")
def startup():
try:
driver.verify_connectivity()
except Exception:
# DB might not be ready yet; endpoints will retry as needed
pass
@app.on_event("shutdown")
def shutdown():
driver.close()
@app.get("/healthz")
def healthz():
return {"ok": True}
@app.post("/echo")
def echo(inp: Inp):
reply = "hi hier gpt5-mini" if inp.message.strip().lower() == "hello world" else inp.message
cypher = """
CREATE (m:Message {input: $input, reply: $reply, created_at: datetime()})
RETURN id(m) AS id
"""
records, _, _ = driver.execute_query(
cypher,
input=inp.message,
reply=reply,
database_="neo4j",
)
return {"reply": reply, "node_id": records[0]["id"]}
@app.get("/messages")
def list_messages(limit: int = 20):
cypher = """
MATCH (m:Message)
RETURN id(m) AS id, m.input AS input, m.reply AS reply, m.created_at AS created_at
ORDER BY m.created_at DESC
LIMIT $limit
"""
try:
records, _, _ = driver.execute_query(cypher, limit=limit, database_="neo4j")
except Exception as e:
# Neo4j not yet available or auth/conn issue
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
items = []
for r in records:
created = r["created_at"]
items.append({
"id": r["id"],
"input": r["input"],
"reply": r["reply"],
"created_at": str(created) if created is not None else None,
})
return {"items": items}
class UpdateMessage(BaseModel):
input: Optional[str] = None
reply: Optional[str] = None
@app.get("/messages/{id}")
def get_message(id: int):
cypher = """
MATCH (m:Message) WHERE id(m) = $id
RETURN id(m) AS id, m.input AS input, m.reply AS reply, m.created_at AS created_at
"""
try:
records, _, _ = driver.execute_query(cypher, id=id, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
if not records:
raise HTTPException(status_code=404, detail="Message not found")
r = records[0]
return {
"id": r["id"],
"input": r["input"],
"reply": r["reply"],
"created_at": str(r["created_at"]) if r["created_at"] is not None else None,
}
@app.put("/messages/{id}")
def update_message(id: int, body: UpdateMessage):
if body.input is None and body.reply is None:
raise HTTPException(status_code=400, detail="No fields to update")
set_clauses = []
params = {"id": id}
if body.input is not None:
set_clauses.append("m.input = $input")
params["input"] = body.input
if body.reply is not None:
set_clauses.append("m.reply = $reply")
params["reply"] = body.reply
cypher = f"""
MATCH (m:Message) WHERE id(m) = $id
SET {", ".join(set_clauses)}
RETURN id(m) AS id, m.input AS input, m.reply AS reply, m.created_at AS created_at
"""
try:
records, _, _ = driver.execute_query(cypher, **params, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
if not records:
raise HTTPException(status_code=404, detail="Message not found")
r = records[0]
return {
"id": r["id"],
"input": r["input"],
"reply": r["reply"],
"created_at": str(r["created_at"]) if r["created_at"] is not None else None,
}
@app.delete("/messages/{id}")
def delete_message(id: int):
cypher = """
MATCH (m:Message) WHERE id(m) = $id
WITH m, properties(m) AS props
DETACH DELETE m
RETURN $id AS id, props AS deleted
"""
try:
records, _, _ = driver.execute_query(cypher, id=id, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
# If no records returned, node didn't exist
if not records:
raise HTTPException(status_code=404, detail="Message not found")
r = records[0]
# props may be None if there was no match; we already guard above
return {
"id": r["id"],
"deleted": r["deleted"],
}
class ImportMessagesRequest(BaseModel):
filename: str = "messages.csv" # must live in /import
batch_size: int = 1000
@app.post("/import/messages")
def import_messages(req: ImportMessagesRequest):
fname = os.path.basename(req.filename)
if not fname:
raise HTTPException(status_code=400, detail="Invalid filename")
if not (1 <= req.batch_size <= 100_000):
raise HTTPException(status_code=400, detail="batch_size out of range (1..100000)")
file_url = f"file:///{fname}"
# IMPORTANT: all literal { } in the Cypher must be doubled inside an f-string
cypher = f"""
CALL apoc.periodic.iterate(
'CALL apoc.load.csv($url,{{header:true}}) YIELD map RETURN map',
'
MERGE (m:Message {{
input: map.input,
reply: map.reply,
created_at: datetime(map.created_at)
}})
ON CREATE SET m.imported_at = datetime()
ON MATCH SET m.last_seen = datetime()
',
{{batchSize: {req.batch_size}, parallel: false, params: $params}}
)
YIELD batches, total, committedOperations, failedOperations, failedBatches, retries, updateStatistics
RETURN batches, total, committedOperations, failedOperations, failedBatches, retries, updateStatistics
"""
try:
records, _, _ = driver.execute_query(
cypher,
params={"url": file_url},
database_="neo4j",
)
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j/APOC error: {type(e).__name__}: {e}")
r = records[0] if records else {}
stats = r.get("updateStatistics", {}) if isinstance(r.get("updateStatistics"), dict) else {}
created = stats.get("nodesCreated", 0)
props = stats.get("propertiesSet", 0)
return {
"file": fname,
"status": "created" if created > 0 else "up-to-date",
"batches": r.get("batches"),
"total": r.get("total"),
"created": created,
"propertiesSet": props,
"updateStatistics": stats,
}
class AiInp(BaseModel):
message: str
@app.post("/ai/echo")
def ai_echo(inp: AiInp):
msg = inp.message.strip()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT")
api_ver = os.getenv("AZURE_OPENAI_API_VERSION", "2024-10-21")
if not all([endpoint, api_key, deployment]):
raise HTTPException(status_code=500, detail="Azure OpenAI env not configured")
url = f"{endpoint}/openai/deployments/{deployment}/chat/completions?api-version={api_ver}"
payload = {
"messages": [
{"role":"system","content":"You are a terse rephraser."},
{"role":"user","content": msg}
],
"temperature": 0.2
}
headers = {"Content-Type":"application/json","api-key": api_key}
try:
r = requests.post(url, json=payload, headers=headers, timeout=30)
r.raise_for_status()
data = r.json()
reply = data["choices"][0]["message"]["content"].strip()
except Exception as e:
raise HTTPException(status_code=502, detail=f"Azure OpenAI request failed: {type(e).__name__}: {e}")
# store in Neo4j (same as /echo)
cypher = """
CREATE (m:Message {input: $input, reply: $reply, created_at: datetime()})
RETURN id(m) AS id
"""
try:
records, _, _ = driver.execute_query(cypher, input=msg, reply=reply, database_="neo4j")
except Exception as e:
raise HTTPException(status_code=503, detail=f"Neo4j unavailable: {type(e).__name__}")
return {"reply": reply, "node_id": records[0]["id"]}

5
api/requirements.txt Normal file
View File

@ -0,0 +1,5 @@
fastapi
uvicorn
neo4j
pydantic
requests

56
compose.yaml Normal file
View File

@ -0,0 +1,56 @@
services:
neo4j:
image: "neo4j:${NEO4J_TAG}"
container_name: neo4j
user: "${UID}:${GID}"
environment:
- "NEO4J_AUTH=neo4j/${NEO4J_PASSWORD}"
- 'NEO4J_PLUGINS=["apoc","apoc-extended"]'
# Allow APOC procedures (5.x: allowlist/unrestricted)
- "NEO4J_dbms_security_procedures_unrestricted=apoc.*"
- "NEO4J_dbms_security_procedures_allowlist=apoc.*"
# Optional but handy for file import/export via APOC
- "NEO4J_apoc_export_file_enabled=true"
- "NEO4J_apoc_import_file_enabled=true"
- "NEO4J_apoc_import_file_use__neo4j__config=true"
- "NEO4J_server_directories_import=/import"
ports:
- "7474:7474" # HTTP / Browser
- "7687:7687" # Bolt
volumes:
- "${HOME}/neo4j/data:/data"
- "${HOME}/neo4j/logs:/logs"
- "${HOME}/neo4j/plugins:/plugins"
- "${HOME}/neo4j/config:/config"
- "${HOME}/neo4j/import:/import"
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:7474"]
interval: 10s
timeout: 5s
retries: 12
api:
build:
context: .
dockerfile: api/Dockerfile
container_name: neo4j-api
environment:
- NEO4J_PASSWORD=${NEO4J_PASSWORD}
- NEO4J_URI=${NEO4J_URI}
- NEO4J_USER=${NEO4J_USER}
- AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
- AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
- AZURE_OPENAI_DEPLOYMENT=${AZURE_OPENAI_DEPLOYMENT}
- AZURE_OPENAI_API_VERSION=${AZURE_OPENAI_API_VERSION}
ports:
- "${API_PORT}:8000"
depends_on:
- neo4j
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "-qO-", "http://localhost:8000/docs"]
interval: 5s
timeout: 3s
retries: 20
start_period: 10s