389 lines
15 KiB
Python
389 lines
15 KiB
Python
# Import necessary libraries
|
||
import os
|
||
|
||
from google.adk.agents import Agent
|
||
from google.adk.models.lite_llm import LiteLlm # For OpenAI support
|
||
from google.adk.sessions import InMemorySessionService
|
||
from google.adk.runners import Runner
|
||
from google.adk.tools.tool_context import ToolContext
|
||
from google.genai import types # For creating message Content/Parts
|
||
|
||
from typing import Optional, Dict, Any
|
||
|
||
import warnings
|
||
warnings.filterwarnings("ignore")
|
||
|
||
import logging
|
||
logging.basicConfig(level=logging.CRITICAL)
|
||
|
||
from helper import make_agent_caller # use the shared factory
|
||
|
||
# Convenience libraries for working with Neo4j inside of Google ADK
|
||
from neo4j_for_adk import graphdb, tool_success, tool_error
|
||
|
||
print("Libraries imported.")
|
||
|
||
# Define Model Constants for easier use
|
||
MODEL_GPT = "openai/gpt-4o"
|
||
|
||
llm = LiteLlm(model=MODEL_GPT)
|
||
|
||
# Test LLM with a direct call
|
||
print(llm.llm_client.completion(model=llm.model,
|
||
messages=[{"role": "user",
|
||
"content": "Are you ready?"}],
|
||
tools=[]))
|
||
|
||
print("\nOpenAI is ready for use.")
|
||
|
||
|
||
|
||
# Sending a simple query to the database
|
||
neo4j_is_ready = graphdb.send_query("RETURN 'Neo4j is Ready!' as message")
|
||
print(neo4j_is_ready)
|
||
|
||
# Define a basic tool -- send a parameterized cypher query
|
||
def say_hello(person_name: str) -> dict:
|
||
"""Formats a welcome message to a named person.
|
||
|
||
Args:
|
||
person_name (str): the name of the person saying hello
|
||
|
||
Returns:
|
||
dict: A dictionary containing the results of the query.
|
||
Includes a 'status' key ('success' or 'error').
|
||
If 'success', includes a 'query_result' key with an array of result rows.
|
||
If 'error', includes an 'error_message' key.
|
||
"""
|
||
return graphdb.send_query("RETURN 'Hello to you, ' + $person_name AS reply",
|
||
{
|
||
"person_name": person_name
|
||
})
|
||
|
||
# Example tool usage (optional test)
|
||
print(say_hello("ABK"))
|
||
|
||
# Example tool usage (optional test)
|
||
print(say_hello("RETURN 'injection attack avoided'"))
|
||
|
||
# Define the new goodbye tool
|
||
def say_goodbye() -> dict:
|
||
"""Provides a simple farewell message to conclude the conversation."""
|
||
return graphdb.send_query("RETURN 'Goodbye from Cypher!' as farewell")
|
||
|
||
|
||
|
||
def say_hello_stateful(user_name: str, tool_context: ToolContext):
|
||
"""Says hello to the user, recording their name into state."""
|
||
tool_context.state["user_name"] = user_name
|
||
print("\ntool_context.state['user_name']:", tool_context.state["user_name"])
|
||
return graphdb.send_query(
|
||
"RETURN 'Hello to you, ' + $user_name + '.' AS reply",
|
||
{"user_name": user_name}
|
||
)
|
||
|
||
def say_goodbye_stateful(tool_context: ToolContext) -> dict:
|
||
"""Says goodbye to the user, reading their name from state."""
|
||
user_name = tool_context.state.get("user_name", "stranger")
|
||
print("\ntool_context.state['user_name']:", user_name)
|
||
return graphdb.send_query(
|
||
"RETURN 'Goodbye, ' + $user_name + ', nice to chat with you!' AS reply",
|
||
{"user_name": user_name}
|
||
)
|
||
|
||
|
||
# Define the Cypher Agent
|
||
hello_agent = Agent(
|
||
name="hello_agent_v1",
|
||
model=llm, # defined earlier in a variable
|
||
description="Has friendly chats with a user.",
|
||
instruction="""You are a helpful assistant, chatting with a user.
|
||
Be polite and friendly, introducing yourself and asking who the user is.
|
||
|
||
If the user provides their name, use the 'say_hello' tool to get a custom greeting.
|
||
If the tool returns an error, inform the user politely.
|
||
If the tool is successful, present the reply.
|
||
""",
|
||
tools=[say_hello], # Pass the function directly
|
||
)
|
||
|
||
# --- Greeting Agent ---
|
||
greeting_subagent = Agent(
|
||
model=llm,
|
||
name="greeting_subagent_v1",
|
||
instruction="You are the Greeting Agent. Your ONLY task is to provide a friendly greeting to the user. "
|
||
"Use the 'say_hello' tool to generate the greeting. "
|
||
"If the user provides their name, make sure to pass it to the tool. "
|
||
"Do not engage in any other conversation or tasks.",
|
||
description="Handles simple greetings and hellos using the 'say_hello' tool.",
|
||
tools=[say_hello],
|
||
)
|
||
|
||
# --- Farewell Agent ---
|
||
farewell_subagent = Agent(
|
||
model=llm,
|
||
name="farewell_subagent_v1",
|
||
instruction="You are the Farewell Agent. Your ONLY task is to provide a polite goodbye message. "
|
||
"Use the 'say_goodbye' tool when the user indicates they are leaving or ending the conversation "
|
||
"(e.g., using words like 'bye', 'goodbye', 'thanks bye', 'see you'). "
|
||
"Do not perform any other actions.",
|
||
description="Handles simple farewells and goodbyes using the 'say_goodbye' tool.",
|
||
tools=[say_goodbye],
|
||
)
|
||
|
||
root_agent = Agent(
|
||
name="friendly_agent_team_v1",
|
||
model=llm,
|
||
description="The main coordinator agent. Delegates greetings/farewells to specialists.",
|
||
instruction="""You are the main Agent coordinating a team. Your primary responsibility is to be friendly.
|
||
|
||
You have specialized sub-agents:
|
||
1. 'greeting_subagent_v1': Handles simple greetings like 'Hi', 'Hello'. Delegate to it for these.
|
||
2. 'farewell_subagent_v1': Handles simple farewells like 'Bye', 'See you'. Delegate to it for these.
|
||
|
||
Analyze the user's query. If it's a greeting, delegate to 'greeting_subagent_v1'.
|
||
If it's a farewell, delegate to 'farewell_subagent_v1'.
|
||
|
||
For anything else, respond appropriately or state you cannot handle it.
|
||
""",
|
||
|
||
tools=[],
|
||
sub_agents=[greeting_subagent, farewell_subagent],
|
||
)
|
||
|
||
|
||
# Test delegation
|
||
root_agent_caller = await make_agent_caller(root_agent)
|
||
|
||
async def run_team_conversation():
|
||
await root_agent_caller.call("Hello I'm ABK", True)
|
||
await root_agent_caller.call("Thanks, bye!", True)
|
||
|
||
await run_team_conversation()
|
||
|
||
print(f"Agent '{hello_agent.name}' created.")
|
||
|
||
|
||
app_name = hello_agent.name + "_app"
|
||
user_id = hello_agent.name + "_user"
|
||
session_id = hello_agent.name + "_session_01"
|
||
|
||
# Initialize a session service and a session
|
||
session_service = InMemorySessionService()
|
||
await session_service.create_session(
|
||
app_name=app_name,
|
||
user_id=user_id,
|
||
session_id=session_id
|
||
)
|
||
|
||
runner = Runner(
|
||
agent=hello_agent,
|
||
app_name=app_name,
|
||
session_service=session_service
|
||
)
|
||
|
||
user_message = "Hello, I'm ABK"
|
||
print(f"\n>>> User Message: {user_message}")
|
||
|
||
# Prepare the user's message in ADK format
|
||
content = types.Content(role='user', parts=[types.Part(text=user_message)])
|
||
|
||
final_response_text = "Agent did not produce a final response." # Default will be replaced if the agent produces a final response.
|
||
|
||
|
||
# We iterate through events to find the final answer.
|
||
verbose = False
|
||
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
|
||
if verbose:
|
||
print(f" [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")
|
||
|
||
# Key Concept: is_final_response() marks the concluding message for the turn.
|
||
if event.is_final_response():
|
||
if event.content and event.content.parts:
|
||
final_response_text = event.content.parts[0].text # Assuming text response in the first part
|
||
elif event.actions and event.actions.escalate: # Handle potential errors/escalations
|
||
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
|
||
break # Stop processing events once the final response is found
|
||
|
||
print(f"<<< Agent Response: {final_response_text}")
|
||
|
||
|
||
# Stateful greeting agent
|
||
greeting_agent_stateful = Agent(
|
||
model=llm,
|
||
name="greeting_agent_stateful_v1",
|
||
instruction="You are the Greeting Agent. Your ONLY task is to provide a friendly greeting using the 'say_hello_stateful' tool. Do nothing else.",
|
||
description="Handles simple greetings and hellos using the 'say_hello_stateful' tool.",
|
||
tools=[say_hello_stateful],
|
||
)
|
||
|
||
# Stateful farewell agent
|
||
farewell_agent_stateful = Agent(
|
||
model=llm,
|
||
name="farewell_agent_stateful_v1",
|
||
instruction="You are the Farewell Agent. Your ONLY task is to provide a polite goodbye message using the 'say_goodbye_stateful' tool. Do not perform any other actions.",
|
||
description="Handles simple farewells and goodbyes using the 'say_goodbye_stateful' tool.",
|
||
tools=[say_goodbye_stateful],
|
||
)
|
||
|
||
# Stateful root
|
||
root_agent_stateful = Agent(
|
||
name="friendly_team_stateful",
|
||
model=llm,
|
||
description="The main coordinator agent. Delegates greetings/farewells to specialists.",
|
||
instruction="""You are the main Agent coordinating a team. Your primary responsibility is to be friendly.
|
||
|
||
You have specialized sub-agents:
|
||
1. 'greeting_agent_stateful': Handles simple greetings like 'Hi', 'Hello'. Delegate to it for these.
|
||
2. 'farewell_agent_stateful': Handles simple farewells like 'Bye', 'See you'. Delegate to it for these.
|
||
|
||
Analyze the user's query. If it's a greeting, delegate to 'greeting_agent_stateful'. If it's a farewell, delegate to 'farewell_agent_stateful'.
|
||
|
||
For anything else, respond appropriately or state you cannot handle it.
|
||
""",
|
||
tools=[],
|
||
sub_agents=[greeting_agent_stateful, farewell_agent_stateful],
|
||
)
|
||
|
||
# Initialize caller and show initial state
|
||
root_stateful_caller = await make_agent_caller(root_agent_stateful)
|
||
session = await root_stateful_caller.get_session()
|
||
print(f"Initial State: {session.state}")
|
||
|
||
# Run stateful conversation
|
||
async def run_stateful_conversation():
|
||
await root_stateful_caller.call("Hello, I'm ABK!")
|
||
await root_stateful_caller.call("Thanks, bye!")
|
||
|
||
await run_stateful_conversation()
|
||
|
||
# Show final state
|
||
session = await root_stateful_caller.get_session()
|
||
print(f"\nFinal State: {session.state}")
|
||
|
||
|
||
# === Lesson 4: User Intent Agent ===================================================
|
||
# 4.1–4.3.1 Agent Instructions
|
||
agent_role_and_goal = """
|
||
You are an expert at knowledge graph use cases.
|
||
Your primary goal is to help the user come up with a knowledge graph use case.
|
||
"""
|
||
|
||
agent_conversational_hints = """
|
||
If the user is unsure what to do, make some suggestions based on classic use cases like:
|
||
- social network involving friends, family, or professional relationships
|
||
- logistics network with suppliers, customers, and partners
|
||
- recommendation system with customers, products, and purchase patterns
|
||
- fraud detection over multiple accounts with suspicious patterns of transactions
|
||
- pop-culture graphs with movies, books, or music
|
||
"""
|
||
|
||
agent_output_definition = """
|
||
A user goal has two components:
|
||
- kind_of_graph: at most 3 words describing the graph, for example "social network" or "USA freight logistics"
|
||
- description: a few sentences about the intention of the graph, for example
|
||
"A dynamic routing and delivery system for cargo." or
|
||
"Analysis of product dependencies and supplier alternatives."
|
||
"""
|
||
|
||
agent_chain_of_thought_directions = """
|
||
Think carefully and collaborate with the user:
|
||
1. Understand the user's goal, which is a kind_of_graph with description
|
||
2. Ask clarifying questions as needed
|
||
3. When you think you understand their goal, use the 'set_perceived_user_goal' tool to record your perception
|
||
4. Present the perceived user goal to the user for confirmation
|
||
5. If the user agrees, use the 'approve_perceived_user_goal' tool to approve the user goal.
|
||
This will save the goal in state under the 'approved_user_goal' key.
|
||
"""
|
||
|
||
complete_agent_instruction = f"""
|
||
{agent_role_and_goal}
|
||
{agent_conversational_hints}
|
||
{agent_output_definition}
|
||
{agent_chain_of_thought_directions}
|
||
"""
|
||
|
||
print(complete_agent_instruction)
|
||
|
||
# 4.3.2 Tools
|
||
PERCEIVED_USER_GOAL = "perceived_user_goal"
|
||
APPROVED_USER_GOAL = "approved_user_goal"
|
||
|
||
def set_perceived_user_goal(kind_of_graph: str, graph_description:str, tool_context: ToolContext):
|
||
"""Sets the perceived user's goal, including the kind of graph and its description."""
|
||
user_goal_data = {"kind_of_graph": kind_of_graph, "graph_description": graph_description}
|
||
tool_context.state[PERCEIVED_USER_GOAL] = user_goal_data
|
||
return tool_success(PERCEIVED_USER_GOAL, user_goal_data)
|
||
|
||
def approve_perceived_user_goal(tool_context: ToolContext):
|
||
"""Upon approval from user, record the perceived user goal as the approved user goal."""
|
||
if PERCEIVED_USER_GOAL not in tool_context.state:
|
||
return tool_error("perceived_user_goal not set. Set perceived user goal first, or ask clarifying questions if you are unsure.")
|
||
tool_context.state[APPROVED_USER_GOAL] = tool_context.state[PERCEIVED_USER_GOAL]
|
||
return tool_success(APPROVED_USER_GOAL, tool_context.state[APPROVED_USER_GOAL])
|
||
|
||
user_intent_agent_tools = [set_perceived_user_goal, approve_perceived_user_goal]
|
||
|
||
# 4.3.3 Agent Definition
|
||
user_intent_agent = Agent(
|
||
name="user_intent_agent_v1",
|
||
model=llm,
|
||
description="Helps the user ideate on a knowledge graph use case.",
|
||
instruction=complete_agent_instruction,
|
||
tools=user_intent_agent_tools,
|
||
)
|
||
print(f"Agent '{user_intent_agent.name}' created.")
|
||
|
||
# 4.4 Interact with the Agent
|
||
# (helper import is already at top; safe to repeat if needed)
|
||
from helper import make_agent_caller
|
||
|
||
user_intent_caller = await make_agent_caller(user_intent_agent)
|
||
|
||
session_start = await user_intent_caller.get_session()
|
||
print(f"Session Start: {session_start.state}") # expect empty
|
||
|
||
async def run_user_intent_conversation():
|
||
# 1) Initial goal
|
||
await user_intent_caller.call(
|
||
"I'd like a bill of materials graph (BOM graph) which includes all levels from suppliers to finished product, which can support root-cause analysis."
|
||
)
|
||
|
||
# 2) If agent asked clarifying Qs, provide more info
|
||
if PERCEIVED_USER_GOAL not in session_start.state:
|
||
await user_intent_caller.call("I'm concerned about possible manufacturing or supplier issues.")
|
||
|
||
# 3) Approve the perceived goal
|
||
await user_intent_caller.call("Approve that goal.", True)
|
||
|
||
await run_user_intent_conversation()
|
||
|
||
session_end = await user_intent_caller.get_session()
|
||
print(f"Session End: {session_end.state}")
|
||
# ================================================================================
|
||
|
||
|
||
|
||
|
||
hello_agent_caller = await make_agent_caller(hello_agent)
|
||
|
||
# We need an async function to await our interaction helper
|
||
async def run_conversation():
|
||
await hello_agent_caller.call("Hello I'm ABK")
|
||
|
||
await hello_agent_caller.call("I am excited")
|
||
|
||
# Execute the conversation using await
|
||
await run_conversation()
|
||
|
||
|
||
async def run_interactive_conversation():
|
||
while True:
|
||
user_query = input("Ask me something (or type 'exit' to quit): ")
|
||
if user_query.lower() == 'exit':
|
||
break
|
||
response = await root_stateful_caller.call(user_query)
|
||
print(f"Response: {response}")
|
||
|
||
await run_interactive_conversation()
|