客户支持机器人可以通过处理常规问题来节省团队的时间,但构建一个能够可靠处理各种任务的机器人却不容易,尤其是要避免让用户感到困惑或失望。
在本教程中,你将为一家航空公司构建一个客户支持机器人,帮助用户研究和安排旅行计划。你将学习如何使用 LangGraph 的中断功能、检查点和更复杂的状态管理来组织助手的工具,并帮助用户进行航班预订、酒店预订、租车以及旅行活动安排。本教程假设你已经熟悉 LangGraph 入门教程中介绍的概念。
到本教程结束时,你将构建一个能够正常运行的机器人,并理解 LangGraph 的关键概念和架构。你还可以将这些设计模式应用到其他 AI 项目中。
最终的聊天机器人架构大致如下图所示:
现在让我们开始吧!
先决条件#
首先,设置你的开发环境。我们将安装本教程所需的依赖项,下载测试数据库,并定义在每个部分中会重复使用的工具。
我们将使用 Claude 作为我们的大型语言模型(LLM),并定义一些自定义工具。虽然大部分工具将连接到本地的 sqlite 数据库(不需要额外的依赖),但我们也会通过 Tavily 为代理提供通用的网页搜索功能。
%%capture --no-stderr
% pip install -U langgraph langchain-community langchain-anthropic tavily-python pandas
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
# Prompt once for each API key that is not already present in the environment.
_set_env("ANTHROPIC_API_KEY")
_set_env("TAVILY_API_KEY")
填充数据库#
运行以下脚本来获取我们为本教程准备的 sqlite 数据库,并将其更新为最新状态。具体细节并不重要。
import os
import shutil
import sqlite3
import pandas as pd
import requests
# Remote copy of the demo travel database used throughout the tutorial.
db_url = "https://storage.googleapis.com/benchmarks-artifacts/travel-db/travel2.sqlite"
local_file = "travel2.sqlite"
# The backup lets us restart for each tutorial section
backup_file = "travel2.backup.sqlite"
# Set to True to force a fresh download even when the file already exists.
overwrite = False
if overwrite or not os.path.exists(local_file):
    response = requests.get(db_url)
    response.raise_for_status()  # Ensure the request was successful
    with open(local_file, "wb") as f:
        f.write(response.content)
    # Backup - we will use this to "reset" our DB in each section
    shutil.copy(local_file, backup_file)
# Convert the flights to present time for our tutorial
# Convert the flights to present time for our tutorial
def update_dates(file):
    """Reset `file` from the pristine backup and shift all timestamps to "now".

    The shipped demo database contains historical flight data. To make the
    tutorial feel current, we compute the gap between the present time and the
    latest actual departure in the backup, then add that offset to every
    datetime column in the `flights` and `bookings` tables.

    Args:
        file: Path of the sqlite database to (re)create from the module-level
            `backup_file`.

    Returns:
        The same `file` path, for convenient chaining.
    """
    # Start from a clean copy so each tutorial section sees the same data.
    shutil.copy(backup_file, file)
    conn = sqlite3.connect(file)

    # Load every table into a DataFrame, keyed by table name.
    tables = pd.read_sql(
        "SELECT name FROM sqlite_master WHERE type='table';", conn
    ).name.tolist()
    tdf = {}
    for t in tables:
        tdf[t] = pd.read_sql(f"SELECT * from {t}", conn)

    # "\\N" is this dataset's NULL marker; treat it as NaT when parsing dates.
    example_time = pd.to_datetime(
        tdf["flights"]["actual_departure"].replace("\\N", pd.NaT)
    ).max()
    current_time = pd.to_datetime("now").tz_localize(example_time.tz)
    time_diff = current_time - example_time

    tdf["bookings"]["book_date"] = (
        pd.to_datetime(tdf["bookings"]["book_date"].replace("\\N", pd.NaT), utc=True)
        + time_diff
    )

    datetime_columns = [
        "scheduled_departure",
        "scheduled_arrival",
        "actual_departure",
        "actual_arrival",
    ]
    for column in datetime_columns:
        tdf["flights"][column] = (
            pd.to_datetime(tdf["flights"][column].replace("\\N", pd.NaT)) + time_diff
        )

    # Write the shifted frames back, replacing the original tables.
    for table_name, df in tdf.items():
        df.to_sql(table_name, conn, if_exists="replace", index=False)
    # Free the (potentially large) frames before committing.
    del df
    del tdf
    conn.commit()
    conn.close()

    return file
# Materialize the working database (shifted to present time); tools read this path.
db = update_dates(local_file)
工具#
接下来,我们定义助手的工具,用于搜索航空公司的政策手册,以及搜索和管理航班、酒店、租车和旅行活动的预订。这些工具将在整个教程中重复使用。具体的实现细节并不重要,因此你可以直接运行以下代码并跳到第 1 部分。
查找公司政策#
助手可以检索政策信息来回答用户的问题。请注意,政策的执行仍然必须通过工具或 API 来完成,因为大型语言模型(LLM)可能会忽略这些规定。
import re
import numpy as np
import openai
from langchain_core.tools import tool
# Download the Swiss Airlines FAQ document that backs the policy lookup tool.
response = requests.get(
    "https://storage.googleapis.com/benchmarks-artifacts/travel-db/swiss_faq.md"
)
response.raise_for_status()
faq_text = response.text

# Split on markdown "##" headings so each policy section becomes one document.
docs = [{"page_content": txt} for txt in re.split(r"(?=\n##)", faq_text)]
class VectorStoreRetriever:
    """A tiny in-memory vector store over a fixed document set.

    Embeddings are computed once up front (see `from_docs`); queries embed the
    incoming text and rank documents by dot-product similarity.
    """

    def __init__(self, docs: list, vectors: list, oai_client):
        self._arr = np.array(vectors)
        self._docs = docs
        self._client = oai_client

    @classmethod
    def from_docs(cls, docs, oai_client):
        """Embed every document's `page_content` and build a retriever."""
        contents = [doc["page_content"] for doc in docs]
        embeddings = oai_client.embeddings.create(
            model="text-embedding-3-small", input=contents
        )
        vectors = [emb.embedding for emb in embeddings.data]
        return cls(docs, vectors, oai_client)

    def query(self, query: str, k: int = 5) -> list[dict]:
        """Return the `k` documents most similar to `query`, best match first."""
        embed = self._client.embeddings.create(
            model="text-embedding-3-small", input=[query]
        )
        query_vec = np.array(embed.data[0].embedding)
        # "@" is just a matrix multiplication in python:
        # one dot product per stored document.
        scores = query_vec @ self._arr.T
        # argpartition finds the top-k in O(n); then sort only those k hits.
        top_k_idx = np.argpartition(scores, -k)[-k:]
        top_k_idx_sorted = top_k_idx[np.argsort(-scores[top_k_idx])]
        return [
            {**self._docs[idx], "similarity": scores[idx]}
            for idx in top_k_idx_sorted
        ]
# Build the retriever once at import time using OpenAI embeddings.
retriever = VectorStoreRetriever.from_docs(docs, openai.Client())
@tool
def lookup_policy(query: str) -> str:
    """Consult the company policies to check whether certain options are permitted.
    Use this before making any flight changes or performing other 'write' events."""
    # Return the two most relevant FAQ sections, separated by blank lines.
    docs = retriever.query(query, k=2)
    return "\n\n".join([doc["page_content"] for doc in docs])
航班#
定义 fetch_user_flight_information 工具,以便让代理查看当前用户的航班信息。然后定义工具来搜索航班并管理存储在 SQL 数据库中的乘客预订信息。
我们可以通过访问 RunnableConfig 来获取访问此应用程序的用户的 passenger_id。大型语言模型(LLM)不需要显式提供这些信息,它们会在图的每次调用时提供,确保每个用户无法访问其他乘客的预订信息。
import sqlite3
from datetime import date, datetime
from typing import Optional
import pytz
from langchain_core.runnables import RunnableConfig
@tool
def fetch_user_flight_information(config: RunnableConfig) -> list[dict]:
    """Fetch all tickets for the user along with corresponding flight information and seat assignments.

    Returns:
        A list of dictionaries where each dictionary contains the ticket details,
        associated flight details, and the seat assignments for each ticket belonging to the user.
    """
    # The passenger_id is injected via the graph's config, never by the LLM,
    # so one user can never read another passenger's bookings.
    configuration = config.get("configurable", {})
    passenger_id = configuration.get("passenger_id", None)
    if not passenger_id:
        raise ValueError("No passenger ID configured.")

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    # Parameterized "?" placeholder keeps the query safe from SQL injection.
    query = """
    SELECT
        t.ticket_no, t.book_ref,
        f.flight_id, f.flight_no, f.departure_airport, f.arrival_airport, f.scheduled_departure, f.scheduled_arrival,
        bp.seat_no, tf.fare_conditions
    FROM
        tickets t
        JOIN ticket_flights tf ON t.ticket_no = tf.ticket_no
        JOIN flights f ON tf.flight_id = f.flight_id
        JOIN boarding_passes bp ON bp.ticket_no = t.ticket_no AND bp.flight_id = f.flight_id
    WHERE
        t.passenger_id = ?
    """
    cursor.execute(query, (passenger_id,))
    rows = cursor.fetchall()
    # Convert each row tuple into a {column_name: value} dict.
    column_names = [column[0] for column in cursor.description]
    results = [dict(zip(column_names, row)) for row in rows]

    cursor.close()
    conn.close()

    return results
@tool
def search_flights(
    departure_airport: Optional[str] = None,
    arrival_airport: Optional[str] = None,
    start_time: Optional[date | datetime] = None,
    end_time: Optional[date | datetime] = None,
    limit: int = 20,
) -> list[dict]:
    """Search for flights based on departure airport, arrival airport, and departure time range."""
    # Collect a (sql_fragment, value) pair for every filter the caller supplied.
    filters = [
        (" AND departure_airport = ?", departure_airport),
        (" AND arrival_airport = ?", arrival_airport),
        (" AND scheduled_departure >= ?", start_time),
        (" AND scheduled_departure <= ?", end_time),
    ]
    query = "SELECT * FROM flights WHERE 1 = 1"
    params = []
    for clause, value in filters:
        if value:
            query += clause
            params.append(value)
    query += " LIMIT ?"
    params.append(limit)

    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute(query, params)
    rows = cursor.fetchall()
    column_names = [column[0] for column in cursor.description]
    results = [dict(zip(column_names, row)) for row in rows]
    cursor.close()
    conn.close()
    return results
@tool
def update_ticket_to_new_flight(
    ticket_no: str, new_flight_id: int, *, config: RunnableConfig
) -> str:
    """Update the user's ticket to a new valid flight."""
    # The passenger_id comes from the graph config, never from the LLM.
    configuration = config.get("configurable", {})
    passenger_id = configuration.get("passenger_id", None)
    if not passenger_id:
        raise ValueError("No passenger ID configured.")

    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute(
        "SELECT departure_airport, arrival_airport, scheduled_departure FROM flights WHERE flight_id = ?",
        (new_flight_id,),
    )
    new_flight = cursor.fetchone()
    if not new_flight:
        cursor.close()
        conn.close()
        return "Invalid new flight ID provided."
    column_names = [column[0] for column in cursor.description]
    new_flight_dict = dict(zip(column_names, new_flight))

    # Policy: only allow reschedules to flights at least 3 hours out.
    timezone = pytz.timezone("Etc/GMT-3")
    current_time = datetime.now(tz=timezone)
    departure_time = datetime.strptime(
        new_flight_dict["scheduled_departure"], "%Y-%m-%d %H:%M:%S.%f%z"
    )
    time_until = (departure_time - current_time).total_seconds()
    if time_until < (3 * 3600):
        # BUGFIX: this early return previously leaked the open connection.
        cursor.close()
        conn.close()
        return f"Not permitted to reschedule to a flight that is less than 3 hours from the current time. Selected flight is at {departure_time}."

    cursor.execute(
        "SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
    )
    current_flight = cursor.fetchone()
    if not current_flight:
        cursor.close()
        conn.close()
        return "No existing ticket found for the given ticket number."

    # Check the signed-in user actually has this ticket
    cursor.execute(
        "SELECT * FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
        (ticket_no, passenger_id),
    )
    current_ticket = cursor.fetchone()
    if not current_ticket:
        cursor.close()
        conn.close()
        return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"

    # In a real application, you'd likely add additional checks here to enforce business logic,
    # like "does the new departure airport match the current ticket", etc.
    # While it's best to try to be *proactive* in 'type-hinting' policies to the LLM
    # it's inevitably going to get things wrong, so you **also** need to ensure your
    # API enforces valid behavior
    cursor.execute(
        "UPDATE ticket_flights SET flight_id = ? WHERE ticket_no = ?",
        (new_flight_id, ticket_no),
    )
    conn.commit()

    cursor.close()
    conn.close()
    return "Ticket successfully updated to new flight."
@tool
def cancel_ticket(ticket_no: str, *, config: RunnableConfig) -> str:
    """Cancel the user's ticket and remove it from the database."""
    # The passenger_id is injected via the graph config, never supplied by the LLM.
    configuration = config.get("configurable", {})
    passenger_id = configuration.get("passenger_id", None)
    if not passenger_id:
        raise ValueError("No passenger ID configured.")
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute(
        "SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
    )
    existing_ticket = cursor.fetchone()
    if not existing_ticket:
        cursor.close()
        conn.close()
        return "No existing ticket found for the given ticket number."

    # Check the signed-in user actually has this ticket
    cursor.execute(
        "SELECT flight_id FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
        (ticket_no, passenger_id),
    )
    current_ticket = cursor.fetchone()
    if not current_ticket:
        cursor.close()
        conn.close()
        return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"

    # Only the ticket -> flight association is removed in this toy schema.
    cursor.execute("DELETE FROM ticket_flights WHERE ticket_no = ?", (ticket_no,))
    conn.commit()

    cursor.close()
    conn.close()
    return "Ticket successfully cancelled."
租车工具#
用户预订航班后,通常会希望安排交通工具。定义一些 “租车” 工具,让用户可以在目的地搜索并预订车辆。
from datetime import date, datetime
from typing import Optional, Union
@tool
def search_car_rentals(
    location: Optional[str] = None,
    name: Optional[str] = None,
    price_tier: Optional[str] = None,
    start_date: Optional[Union[datetime, date]] = None,
    end_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
    """
    Search for car rentals based on location, name, price tier, start date, and end date.

    Args:
        location (Optional[str]): The location of the car rental. Defaults to None.
        name (Optional[str]): The name of the car rental company. Defaults to None.
        price_tier (Optional[str]): The price tier of the car rental. Defaults to None.
        start_date (Optional[Union[datetime, date]]): The start date of the car rental. Defaults to None.
        end_date (Optional[Union[datetime, date]]): The end date of the car rental. Defaults to None.

    Returns:
        list[dict]: A list of car rental dictionaries matching the search criteria.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM car_rentals WHERE 1=1"
    params = []
    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    # For our tutorial, we will let you match on any dates and price tier.
    # (since our toy dataset doesn't have much data)
    cursor.execute(query, params)
    results = cursor.fetchall()
    # ROBUSTNESS FIX: read column metadata *before* closing the connection;
    # the original touched cursor.description after conn.close().
    column_names = [column[0] for column in cursor.description]
    conn.close()
    return [dict(zip(column_names, row)) for row in results]
@tool
def book_car_rental(rental_id: int) -> str:
    """
    Book a car rental by its ID.

    Args:
        rental_id (int): The ID of the car rental to book.

    Returns:
        str: A message indicating whether the car rental was successfully booked or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    # rowcount tells us whether any row matched the given ID.
    cursor.execute("UPDATE car_rentals SET booked = 1 WHERE id = ?", (rental_id,))
    conn.commit()
    matched = cursor.rowcount > 0
    conn.close()
    if matched:
        return f"Car rental {rental_id} successfully booked."
    return f"No car rental found with ID {rental_id}."
@tool
def update_car_rental(
    rental_id: int,
    start_date: Optional[Union[datetime, date]] = None,
    end_date: Optional[Union[datetime, date]] = None,
) -> str:
    """
    Update a car rental's start and end dates by its ID.

    Args:
        rental_id (int): The ID of the car rental to update.
        start_date (Optional[Union[datetime, date]]): The new start date of the car rental. Defaults to None.
        end_date (Optional[Union[datetime, date]]): The new end date of the car rental. Defaults to None.

    Returns:
        str: A message indicating whether the car rental was successfully updated or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    # BUGFIX: cursor.rowcount only reflects the *last* execute (and is -1 when
    # no UPDATE ran at all), so track matches across both statements.
    updated = False
    if start_date:
        cursor.execute(
            "UPDATE car_rentals SET start_date = ? WHERE id = ?",
            (start_date, rental_id),
        )
        updated = updated or cursor.rowcount > 0
    if end_date:
        cursor.execute(
            "UPDATE car_rentals SET end_date = ? WHERE id = ?", (end_date, rental_id)
        )
        updated = updated or cursor.rowcount > 0
    conn.commit()
    conn.close()

    if updated:
        return f"Car rental {rental_id} successfully updated."
    else:
        return f"No car rental found with ID {rental_id}."
@tool
def cancel_car_rental(rental_id: int) -> str:
    """
    Cancel a car rental by its ID.

    Args:
        rental_id (int): The ID of the car rental to cancel.

    Returns:
        str: A message indicating whether the car rental was successfully cancelled or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    # rowcount tells us whether any row matched the given ID.
    cursor.execute("UPDATE car_rentals SET booked = 0 WHERE id = ?", (rental_id,))
    conn.commit()
    matched = cursor.rowcount > 0
    conn.close()
    if matched:
        return f"Car rental {rental_id} successfully cancelled."
    return f"No car rental found with ID {rental_id}."
酒店#
用户需要睡觉!定义一些工具来搜索和管理酒店预订。
@tool
def search_hotels(
    location: Optional[str] = None,
    name: Optional[str] = None,
    price_tier: Optional[str] = None,
    checkin_date: Optional[Union[datetime, date]] = None,
    checkout_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
    """
    Search for hotels based on location, name, price tier, check-in date, and check-out date.

    Args:
        location (Optional[str]): The location of the hotel. Defaults to None.
        name (Optional[str]): The name of the hotel. Defaults to None.
        price_tier (Optional[str]): The price tier of the hotel. Defaults to None. Examples: Midscale, Upper Midscale, Upscale, Luxury
        checkin_date (Optional[Union[datetime, date]]): The check-in date of the hotel. Defaults to None.
        checkout_date (Optional[Union[datetime, date]]): The check-out date of the hotel. Defaults to None.

    Returns:
        list[dict]: A list of hotel dictionaries matching the search criteria.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM hotels WHERE 1=1"
    params = []
    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    # For the sake of this tutorial, we will let you match on any dates and price tier.
    cursor.execute(query, params)
    results = cursor.fetchall()
    # ROBUSTNESS FIX: read column metadata *before* closing the connection;
    # the original touched cursor.description after conn.close().
    column_names = [column[0] for column in cursor.description]
    conn.close()
    return [dict(zip(column_names, row)) for row in results]
@tool
def book_hotel(hotel_id: int) -> str:
    """
    Book a hotel by its ID.

    Args:
        hotel_id (int): The ID of the hotel to book.

    Returns:
        str: A message indicating whether the hotel was successfully booked or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    # rowcount tells us whether any row matched the given ID.
    cursor.execute("UPDATE hotels SET booked = 1 WHERE id = ?", (hotel_id,))
    conn.commit()
    matched = cursor.rowcount > 0
    conn.close()
    if matched:
        return f"Hotel {hotel_id} successfully booked."
    return f"No hotel found with ID {hotel_id}."
@tool
def update_hotel(
    hotel_id: int,
    checkin_date: Optional[Union[datetime, date]] = None,
    checkout_date: Optional[Union[datetime, date]] = None,
) -> str:
    """
    Update a hotel's check-in and check-out dates by its ID.

    Args:
        hotel_id (int): The ID of the hotel to update.
        checkin_date (Optional[Union[datetime, date]]): The new check-in date of the hotel. Defaults to None.
        checkout_date (Optional[Union[datetime, date]]): The new check-out date of the hotel. Defaults to None.

    Returns:
        str: A message indicating whether the hotel was successfully updated or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    # BUGFIX: cursor.rowcount only reflects the *last* execute (and is -1 when
    # no UPDATE ran at all), so track matches across both statements.
    updated = False
    if checkin_date:
        cursor.execute(
            "UPDATE hotels SET checkin_date = ? WHERE id = ?", (checkin_date, hotel_id)
        )
        updated = updated or cursor.rowcount > 0
    if checkout_date:
        cursor.execute(
            "UPDATE hotels SET checkout_date = ? WHERE id = ?",
            (checkout_date, hotel_id),
        )
        updated = updated or cursor.rowcount > 0
    conn.commit()
    conn.close()

    if updated:
        return f"Hotel {hotel_id} successfully updated."
    else:
        return f"No hotel found with ID {hotel_id}."
@tool
def cancel_hotel(hotel_id: int) -> str:
    """
    Cancel a hotel by its ID.

    Args:
        hotel_id (int): The ID of the hotel to cancel.

    Returns:
        str: A message indicating whether the hotel was successfully cancelled or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    # rowcount tells us whether any row matched the given ID.
    cursor.execute("UPDATE hotels SET booked = 0 WHERE id = ?", (hotel_id,))
    conn.commit()
    matched = cursor.rowcount > 0
    conn.close()
    if matched:
        return f"Hotel {hotel_id} successfully cancelled."
    return f"No hotel found with ID {hotel_id}."
旅行活动#
最后,定义一些工具,允许用户在到达目的地后搜索可以做的事情(并进行预订)。
@tool
def search_trip_recommendations(
    location: Optional[str] = None,
    name: Optional[str] = None,
    keywords: Optional[str] = None,
) -> list[dict]:
    """
    Search for trip recommendations based on location, name, and keywords.

    Args:
        location (Optional[str]): The location of the trip recommendation. Defaults to None.
        name (Optional[str]): The name of the trip recommendation. Defaults to None.
        keywords (Optional[str]): The keywords associated with the trip recommendation. Defaults to None.

    Returns:
        list[dict]: A list of trip recommendation dictionaries matching the search criteria.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM trip_recommendations WHERE 1=1"
    params = []
    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    if keywords:
        # Comma-separated keywords; matching any one of them is enough (OR).
        keyword_list = keywords.split(",")
        keyword_conditions = " OR ".join(["keywords LIKE ?" for _ in keyword_list])
        query += f" AND ({keyword_conditions})"
        params.extend([f"%{keyword.strip()}%" for keyword in keyword_list])
    cursor.execute(query, params)
    results = cursor.fetchall()
    # ROBUSTNESS FIX: read column metadata *before* closing the connection;
    # the original touched cursor.description after conn.close().
    column_names = [column[0] for column in cursor.description]
    conn.close()
    return [dict(zip(column_names, row)) for row in results]
@tool
def book_excursion(recommendation_id: int) -> str:
    """
    Book an excursion by its recommendation ID.

    Args:
        recommendation_id (int): The ID of the trip recommendation to book.

    Returns:
        str: A message indicating whether the trip recommendation was successfully booked or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute(
        "UPDATE trip_recommendations SET booked = 1 WHERE id = ?", (recommendation_id,)
    )
    conn.commit()
    # A positive rowcount means the UPDATE matched an existing recommendation.
    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully booked."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."
@tool
def update_excursion(recommendation_id: int, details: str) -> str:
    """
    Update a trip recommendation's details by its ID.

    Args:
        recommendation_id (int): The ID of the trip recommendation to update.
        details (str): The new details of the trip recommendation.

    Returns:
        str: A message indicating whether the trip recommendation was successfully updated or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute(
        "UPDATE trip_recommendations SET details = ? WHERE id = ?",
        (details, recommendation_id),
    )
    conn.commit()
    # A positive rowcount means the UPDATE matched an existing row.
    matched = cursor.rowcount > 0
    conn.close()
    if matched:
        return f"Trip recommendation {recommendation_id} successfully updated."
    return f"No trip recommendation found with ID {recommendation_id}."
@tool
def cancel_excursion(recommendation_id: int) -> str:
    """
    Cancel a trip recommendation by its ID.

    Args:
        recommendation_id (int): The ID of the trip recommendation to cancel.

    Returns:
        str: A message indicating whether the trip recommendation was successfully cancelled or not.
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute(
        "UPDATE trip_recommendations SET booked = 0 WHERE id = ?", (recommendation_id,)
    )
    conn.commit()
    # A positive rowcount means the UPDATE matched an existing row.
    matched = cursor.rowcount > 0
    conn.close()
    if matched:
        return f"Trip recommendation {recommendation_id} successfully cancelled."
    return f"No trip recommendation found with ID {recommendation_id}."
工具函数#
定义辅助函数,以便在调试时美化打印图中的消息,并为我们的工具节点提供错误处理功能(通过将错误添加到聊天记录中)。
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
def handle_tool_error(state) -> dict:
    """Surface a tool exception back to the LLM as ToolMessages so it can retry."""
    error = state.get("error")
    # Every pending tool call on the last AI message must receive a reply.
    tool_calls = state["messages"][-1].tool_calls
    error_messages = [
        ToolMessage(
            content=f"Error: {repr(error)}\n please fix your mistakes.",
            tool_call_id=tc["id"],
        )
        for tc in tool_calls
    ]
    return {"messages": error_messages}
def create_tool_node_with_fallback(tools: list):
    """Build a ToolNode whose exceptions become chat messages instead of crashes.

    FIX: the original annotated the return type as `dict`, but this returns a
    Runnable (a ToolNode with a fallback attached), so the wrong annotation
    has been removed.
    """
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )
def _print_event(event: dict, _printed: set, max_length=1500):
current_state = event.get("dialog_state")
if current_state:
print("Currently in: ", current_state[-1])
message = event.get("messages")
if message:
if isinstance(message, list):
message = message[-1]
if message.id not in _printed:
msg_repr = message.pretty_repr(html=True)
if len(msg_repr) > max_length:
msg_repr = msg_repr[:max_length] + " ... (truncated)"
print(msg_repr)
_printed.add(message.id)
第 1 部分:零样本(Zero-shot)代理#
在构建时,最好从最简单的可工作实现开始,并使用类似LangSmith 的评估工具来衡量其有效性。如果其他条件相同,应优先选择简单且可扩展的解决方案,而不是复杂的方案。在本例中,单一图表方法有其局限性。机器人可能会在没有用户确认的情况下采取不必要的行动,难以处理复杂查询,且在回答时缺乏针对性。我们将在后续部分解决这些问题。
在本部分中,我们将定义一个简单的零样本代理作为助手,给该代理所有的工具,并提示其谨慎使用这些工具来协助用户。
简单的双节点图表如下所示:
首先定义状态。
状态#
将我们的 StateGraph
状态定义为一个类型化字典,包含一个只能追加的消息列表。这些消息构成了聊天记录,也是我们简单助手所需的全部状态信息。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class State(TypedDict):
    # Chat history; `add_messages` makes this an append-only reducer channel.
    messages: Annotated[list[AnyMessage], add_messages]
代理#
接下来,定义助手函数。该函数接受图状态,将其格式化为提示,然后调用大型语言模型(LLM)来预测最佳响应。
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
class Assistant:
    """Graph node that calls the LLM runnable, retrying until it produces real output."""

    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            # Inject the signed-in passenger from the run config into the prompt.
            configuration = config.get("configurable", {})
            passenger_id = configuration.get("passenger_id", None)
            state = {**state, "user_info": passenger_id}
            result = self.runnable.invoke(state)
            # Accept the response unless it is empty: no tool calls and no text.
            has_text = bool(result.content) and not (
                isinstance(result.content, list)
                and not result.content[0].get("text")
            )
            if result.tool_calls or has_text:
                break
            # Otherwise nudge the model to produce an actual answer and retry.
            messages = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": messages}
        return {"messages": result}
# Haiku is faster and cheaper, but less accurate
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# You could swap LLMs, though you will likely want to update the prompts when
# doing so!
# from langchain_openai import ChatOpenAI

# llm = ChatOpenAI(model="gpt-4-turbo-preview")

# System prompt: {user_info} and {time} are template variables filled in at
# invocation; the conversation history fills the "messages" placeholder.
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            " Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user:\n<User>\n{user_info}\n</User>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

# The zero-shot assistant gets every tool; later sections get more selective.
part_1_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    update_ticket_to_new_flight,
    cancel_ticket,
    search_car_rentals,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    search_hotels,
    book_hotel,
    update_hotel,
    cancel_hotel,
    search_trip_recommendations,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
part_1_assistant_runnable = primary_assistant_prompt | llm.bind_tools(part_1_tools)
定义图#
现在,创建图。这个图是本节的最终助手。
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition
builder = StateGraph(State)

# Define nodes: these do the work
builder.add_node("assistant", Assistant(part_1_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_1_tools))
# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools", "assistant")

# The checkpointer lets the graph persist its state
# this is a complete memory for the entire graph.
memory = MemorySaver()
part_1_graph = builder.compile(checkpointer=memory)

from IPython.display import Image, display

# Rendering the diagram needs extra (mermaid) dependencies, so failures are ignored.
try:
    display(Image(part_1_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass
示例对话#
现在是时候尝试我们强大的聊天机器人了!让我们用下面的对话轮次来运行它。如果遇到 “RecursionLimit”,那意味着机器人无法在分配的步骤内获得答案。这没关系!在本教程的后续部分,我们还有更多技巧可以使用。
import shutil
import uuid

# Let's create an example conversation a user might have with the assistant
tutorial_questions = [
    "Hi there, what time is my flight?",
    "Am i allowed to update my flight to something sooner? I want to leave later today.",
    "Update my flight to sometime next week then",
    "The next available option is great",
    "what about lodging and transportation?",
    "Yeah i think i'd like an affordable hotel for my week-long stay (7 days). And I'll want to rent a car.",
    "OK could you place a reservation for your recommended hotel? It sounds nice.",
    "yes go ahead and book anything that's moderate expense and has availability.",
    "Now for a car, what are my options?",
    "Awesome let's just get the cheapest option. Go ahead and book for 7 days",
    "Cool so now what recommendations do you have on excursions?",
    "Are they available while I'm there?",
    "interesting - i like the museums, what options are there? ",
    "OK great pick one and book it for my second day there.",
]

# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
# A fresh thread id gives this conversation its own checkpoint history.
thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # The passenger_id is used in our flight tools to
        # fetch the user's flight information
        "passenger_id": "3442 587242",
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}

# Stream each question through the graph, printing newly produced messages.
_printed = set()
for question in tutorial_questions:
    events = part_1_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
第 1 部分回顾#
我们的简单助手表现还不错!它能够合理地回答所有问题,快速在上下文中响应,并成功执行了所有任务。你可以查看一个 LangSmith 的示例跟踪,以更好地了解 LLM 在上述互动中的提示方式。
如果这只是一个简单的问答机器人,我们对上述结果可能会感到满意。然而,由于我们的客户支持机器人是代表用户采取行动的,它的一些行为有点令人担忧:
- 助手在我们专注于住宿时预订了租车,然后不得不取消并重新预订:哎呀!为了避免不必要的费用,应该让用户在预订之前有最终的决定权。
- 助手在搜索推荐时遇到困难。我们可以通过为工具添加更详细的指令和示例来改进这一点,但如果对每个工具都这么做,可能会导致提示变得冗长,压垮机器人。
- 助手不得不进行显式搜索才能获取用户的相关信息。如果我们立即获取用户的旅行详情,助手就能直接做出响应,从而节省大量时间。
在下一节中,我们将解决上述问题中的前两个。
第 2 部分:添加确认#
当助手代表用户采取行动时,几乎总是应该让用户最终决定是否执行这些操作。否则,助手所犯的任何小错误(或任何它受到的提示注入)都可能对用户造成实际损害。
在本节中,我们将使用 interrupt_before
来在执行任何工具之前暂停图表的运行,并将控制权交还给用户。
你的图表将类似于以下内容:
与之前一样,从定义状态开始:
状态 & 助手#
我们的图表状态和调用 LLM 的方式与第 1 部分几乎相同,除了以下几点不同:
- 我们增加了一个 user_info 字段,该字段将由我们的图表主动填充
- 我们可以直接在 Assistant 对象中使用状态,而不是使用可配置的参数
from typing import Annotated
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class State(TypedDict):
    # Append-only chat history (reducer: add_messages).
    messages: Annotated[list[AnyMessage], add_messages]
    # NEW: populated by the fetch_user_info node so the assistant starts with
    # the user's flight details already in state.
    user_info: str
class Assistant:
    """Graph node that calls the LLM runnable, retrying until it produces real output.

    Unlike part 1, user info now lives in graph state, so nothing is read
    from the run config here.
    """

    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # Accept the response unless it is empty: no tool calls and no text.
            has_text = bool(result.content) and not (
                isinstance(result.content, list)
                and not result.content[0].get("text")
            )
            if result.tool_calls or has_text:
                break
            # Otherwise nudge the model to produce an actual answer and retry.
            messages = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": messages}
        return {"messages": result}
# Haiku is faster and cheaper, but less accurate
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# You could also use OpenAI or another model, though you will likely have
# to adapt the prompts
# from langchain_openai import ChatOpenAI

# llm = ChatOpenAI(model="gpt-4-turbo-preview")

# Same system prompt as part 1; {user_info} is now filled from graph state
# (populated by the fetch_user_info node) rather than inside the Assistant.
assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            " Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user:\n<User>\n{user_info}\n</User>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

part_2_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    update_ticket_to_new_flight,
    cancel_ticket,
    search_car_rentals,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    search_hotels,
    book_hotel,
    update_hotel,
    cancel_hotel,
    search_trip_recommendations,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
part_2_assistant_runnable = assistant_prompt | llm.bind_tools(part_2_tools)
定义图#
现在,创建图表。针对第一部分中的问题,做出以下两项改动:
- 在使用工具之前添加一个中断。
- 在第一个节点中明确填充用户状态,以便助手不必使用工具就能了解用户信息。
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition
builder = StateGraph(State)


def user_info(state: State):
    # Eagerly fetch the signed-in user's flight info. The empty dict is fine
    # because the tool reads passenger_id from the run's config, not its args.
    return {"user_info": fetch_user_flight_information.invoke({})}


# NEW: The fetch_user_info node runs first, meaning our assistant can see the user's flight information without
# having to take an action
builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
builder.add_node("assistant", Assistant(part_2_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_2_tools))
builder.add_edge("fetch_user_info", "assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools", "assistant")

memory = MemorySaver()
part_2_graph = builder.compile(
    checkpointer=memory,
    # NEW: The graph will always halt before executing the "tools" node.
    # The user can approve or reject (or even alter the request) before
    # the assistant continues
    interrupt_before=["tools"],
)

from IPython.display import Image, display

# Rendering the diagram needs extra (mermaid) dependencies, so failures are ignored.
try:
    display(Image(part_2_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass
示例对话#
现在是时候尝试我们新修订的聊天机器人了!让我们根据以下对话轮次来运行它。
import shutil
import uuid

# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # The passenger_id is used in our flight tools to
        # fetch the user's flight information
        "passenger_id": "3442 587242",
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}

_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in tutorial_questions:
    events = part_2_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
    snapshot = part_2_graph.get_state(config)
    while snapshot.next:
        # We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
        # Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
        # Then, you would have the frontend trigger a new run via an API call when the user has provided input.
        user_input = input(
            "Do you approve of the above actions? Type 'y' to continue;"
            " otherwise, explain your requested changed.\n\n"
        )
        if user_input.strip() == "y":
            # Just continue
            result = part_2_graph.invoke(
                None,
                config,
            )
        else:
            # Satisfy the tool invocation by
            # providing instructions on the requested changes / change of mind
            # NOTE(review): `event` is the last event left over from the stream
            # loop above; it holds the AI message whose tool call is being denied.
            result = part_2_graph.invoke(
                {
                    "messages": [
                        ToolMessage(
                            tool_call_id=event["messages"][-1].tool_calls[0]["id"],
                            content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
                        )
                    ]
                },
                config,
            )
        snapshot = part_2_graph.get_state(config)
第二部分回顾#
现在我们的助手已经能够通过跳过一个步骤来直接回复我们的航班详情了。我们也完全掌控了执行的每一个操作。这一切都依赖于 LangGraph 的interrupts
(中断)和checkpointers
(检查点)。中断功能会暂停图的执行,而它的状态会安全地保存在你配置的检查点中。用户可以通过运行正确的配置随时重新启动。
查看这个LangSmith 执行示例,你可以更好地理解图是如何运行的。从这个示例中可以注意到,通常你可以通过调用(None, config)
来恢复一个流程。状态会从检查点加载,就像从未被中断一样。
这个图运行得相当不错!不过我们其实并不需要参与每一个助手的操作……
在下一部分中,我们将重新组织我们的图结构,以便仅在那些实际写入数据库的 “敏感” 操作上触发中断。
第三部分:条件中断#
在本节中,我们将通过将工具分类为安全(只读)或敏感(数据修改)来优化我们的中断策略。我们只会对敏感工具应用中断,允许机器人自主处理简单查询。
这样做平衡了用户控制与对话流畅性,但随着我们增加更多工具,单一的图结构可能会变得过于复杂,难以维持 “平面” 结构。我们将在下一节解决这个问题。
第三部分的图结构大致如下图所示:
状态#
与往常一样,首先定义图的状态。我们的状态和 LLM 调用与第二部分完全相同。
from typing import Annotated

from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    """Graph state shared by every node in the part-3 graph."""

    # Conversation history; add_messages appends/merges new messages by id
    # instead of overwriting the list.
    messages: Annotated[list[AnyMessage], add_messages]
    # Pre-fetched flight information for the current passenger.
    user_info: str
class Assistant:
    """Wraps an assistant runnable and re-prompts until it yields real output."""

    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                # Bug fix: these two lines were duplicated, so every empty
                # response appended TWO identical re-prompt user messages.
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}
# Haiku is faster and cheaper, but less accurate
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# You can update the LLMs, though you may need to update the prompts
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4-turbo-preview")

# System prompt for the single part-3 assistant. {user_info} and {time} are
# template variables; {messages} carries the running conversation.
assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            " Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user:\n<User>\n{user_info}\n</User>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

# "Read"-only tools (such as retrievers) don't need a user confirmation to use
part_3_safe_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    search_car_rentals,
    search_hotels,
    search_trip_recommendations,
]

# These tools all change the user's reservations.
# The user has the right to control what decisions are made
part_3_sensitive_tools = [
    update_ticket_to_new_flight,
    cancel_ticket,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    book_hotel,
    update_hotel,
    cancel_hotel,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
# Tool names used by the router to pick the safe vs. sensitive tool node.
sensitive_tool_names = {t.name for t in part_3_sensitive_tools}
# Our LLM doesn't have to know which nodes it has to route to. In its 'mind', it's just invoking functions.
part_3_assistant_runnable = assistant_prompt | llm.bind_tools(
    part_3_safe_tools + part_3_sensitive_tools
)
定义图#
现在,创建图。我们的图与第二部分几乎相同,区别在于我们将工具分成了两个独立的节点。我们只会在那些实际对用户预订进行更改的工具之前进行中断。
from typing import Literal

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition

builder = StateGraph(State)


def user_info(state: State):
    """Prefetch the current passenger's flight info into graph state."""
    return {"user_info": fetch_user_flight_information.invoke({})}


# NEW: The fetch_user_info node runs first, meaning our assistant can see the user's flight information without
# having to take an action
builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
builder.add_node("assistant", Assistant(part_3_assistant_runnable))
# Two separate tool nodes so only sensitive tools trigger an interrupt.
builder.add_node("safe_tools", create_tool_node_with_fallback(part_3_safe_tools))
builder.add_node(
    "sensitive_tools", create_tool_node_with_fallback(part_3_sensitive_tools)
)
# Define logic
builder.add_edge("fetch_user_info", "assistant")
def route_tools(state: State) -> Literal["safe_tools", "sensitive_tools", "__end__"]:
    """Send the assistant's tool calls to the safe or the sensitive tool node."""
    # No tool calls at all: hand the turn back to the user.
    if tools_condition(state) == END:
        return END
    # This assumes single tool calls. To handle parallel tool calling, you'd want to
    # use an ANY condition
    first_call = state["messages"][-1].tool_calls[0]
    return (
        "sensitive_tools"
        if first_call["name"] in sensitive_tool_names
        else "safe_tools"
    )
builder.add_conditional_edges(
    "assistant",
    route_tools,
)
# After either tool node runs, control always returns to the assistant.
builder.add_edge("safe_tools", "assistant")
builder.add_edge("sensitive_tools", "assistant")

memory = MemorySaver()
part_3_graph = builder.compile(
    checkpointer=memory,
    # NEW: The graph will always halt before executing the "tools" node.
    # The user can approve or reject (or even alter the request) before
    # the assistant continues
    interrupt_before=["sensitive_tools"],
)

from IPython.display import Image, display

try:
    display(Image(part_3_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass
示例对话#
现在是时候尝试我们刚刚改进的聊天机器人了!让我们通过以下对话轮次来运行它。这次,我们会有更少的确认步骤。
import shutil
import uuid

# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
thread_id = str(uuid.uuid4())
config = {
    "configurable": {
        # The passenger_id is used in our flight tools to
        # fetch the user's flight information
        "passenger_id": "3442 587242",
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}

# Scripted user turns exercising search, booking, and multi-step changes.
tutorial_questions = [
    "Hi there, what time is my flight?",
    "Am i allowed to update my flight to something sooner? I want to leave later today.",
    "Update my flight to sometime next week then",
    "The next available option is great",
    "what about lodging and transportation?",
    "Yeah i think i'd like an affordable hotel for my week-long stay (7 days). And I'll want to rent a car.",
    "OK could you place a reservation for your recommended hotel? It sounds nice.",
    "yes go ahead and book anything that's moderate expense and has availability.",
    "Now for a car, what are my options?",
    "Awesome let's just get the cheapest option. Go ahead and book for 7 days",
    "Cool so now what recommendations do you have on excursions?",
    "Are they available while I'm there?",
    "interesting - i like the museums, what options are there? ",
    "OK great pick one and book it for my second day there.",
]
_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in tutorial_questions:
    events = part_3_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
    snapshot = part_3_graph.get_state(config)
    while snapshot.next:
        # We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
        # Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
        # Then, you would have the frontend trigger a new run via an API call when the user has provided input.
        user_input = input(
            "Do you approve of the above actions? Type 'y' to continue;"
            # Fix: "requested changed" -> "requested changes" (typo shown to the user).
            " otherwise, explain your requested changes.\n\n"
        )
        if user_input.strip() == "y":
            # Just continue from the checkpoint; None means "resume as-is".
            result = part_3_graph.invoke(
                None,
                config,
            )
        else:
            # Satisfy the tool invocation by
            # providing instructions on the requested changes / change of mind
            result = part_3_graph.invoke(
                {
                    "messages": [
                        ToolMessage(
                            tool_call_id=event["messages"][-1].tool_calls[0]["id"],
                            content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
                        )
                    ]
                },
                config,
            )
        snapshot = part_3_graph.get_state(config)
第三部分回顾#
效果更好了!我们的代理现在运行得很好 —— 查看 LangSmith 的执行记录,来检查它的工作!你可能对这个设计已经感到满意。代码是封装好的,并且它的表现符合预期。
这个设计的一个问题是,我们把太多压力放在了单个提示词上。如果我们想添加更多工具,或者每个工具变得更加复杂(更多筛选条件、更多业务逻辑限制行为等),那么工具的使用效果以及机器人的整体表现很可能会开始下降。
在接下来的部分中,我们将展示如何通过根据用户意图路由到专门的代理或子图,来更好地控制不同的用户体验。
第 4 部分:专门的工作流程#
在前面的章节中,我们看到了依赖于单一提示和大型语言模型(LLM)处理各种用户意图的 “广泛” 聊天机器人能够带我们走得多远。然而,这种方法很难为已知的意图创建可预测的出色用户体验。
另一种方法是,您的图可以检测用户意图并选择合适的工作流程或 “技能” 来满足用户的需求。每个工作流程可以专注于其领域,从而可以进行独立优化,而不会降低整个助手的整体性能。
在本节中,我们将用户体验划分为单独的子图,最终结构如下所示:
在上图中,每个方块都包含一个具备执行能力、专注的工作流程。主助手处理用户的初始查询,图根据查询内容将请求路由到相应的 “专家”。
状态#
我们希望能够随时跟踪哪个子图在控制当前会话。虽然我们_可以_通过对消息列表进行一些运算来实现这一点,但更简单的方法是将其作为专门的堆栈进行跟踪。
在下面的 State
中添加一个 dialog_state
列表。每当一个 node
运行并返回一个 dialog_state
值时,将调用 update_dialog_stack
函数来确定如何应用更新。
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Push or pop the state."""
    if right is None:
        # No update for this step: keep the stack unchanged.
        return left
    # "pop" discards the top entry; any other value is pushed onto the stack.
    return left[:-1] if right == "pop" else left + [right]
class State(TypedDict):
    """Part-4 graph state; adds a stack tracking the active workflow."""

    # Conversation history; add_messages appends/merges new messages by id.
    messages: Annotated[list[AnyMessage], add_messages]
    # Pre-fetched flight information for the current passenger.
    user_info: str
    # Stack of active assistants: the last entry names the workflow currently
    # in control. update_dialog_stack pushes/pops entries between node runs.
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "update_flight",
                "book_car_rental",
                "book_hotel",
                "book_excursion",
            ]
        ],
        update_dialog_stack,
    ]
助手#
这次我们将为每个工作流程创建一个助手。也就是说:
- 航班预订助手
- 酒店预订助手
- 租车助手
- 行程安排助手
- 最后,一个 “主助手” 用于在这些助手之间进行路由
如果你留意了,你可能会发现这是我们多代理示例中的监督者设计模式的一个实例。
下面定义用于驱动每个助手的 Runnable
对象。每个 Runnable
都有一个提示、LLM(大型语言模型)以及适用于该助手的工具的模式。
每个专门化的 / 委派的助手还可以调用 CompleteOrEscalate
工具,来指示控制流应回到主助手。这发生在助手成功完成其任务,或用户改变主意,或者需要处理超出该工作流程范围的问题时。
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from pydantic import BaseModel, Field
class Assistant:
    """Wraps an assistant runnable and re-prompts until it yields real output."""

    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # Re-prompt if the LLM returned neither tool calls nor actual text.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                # Bug fix: these two lines were duplicated, so every empty
                # response appended TWO identical re-prompt user messages.
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}
class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
    who can re-route the dialog based on the user's needs."""

    # True when the delegated task is finished or abandoned; False when the
    # assistant only needs the host's broader capabilities.
    cancel: bool = True
    # Short explanation the primary assistant can act on.
    reason: str

    class Config:
        # Example payloads surfaced to the LLM via the tool's JSON schema.
        json_schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the user's emails or calendar for more information.",
            },
        }
# Flight booking assistant
flight_booking_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a specialized assistant for handling flight updates. "
" The primary assistant delegates work to you whenever the user needs help updating their bookings. "
"Confirm the updated flight details with the customer and inform them of any additional fees. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
"If you need more information or the customer changes their mind, escalate the task back to the main assistant."
" Remember that a booking isn't completed until after the relevant tool has successfully been used."
"\n\nCurrent user flight information:\n<Flights>\n{user_info}\n</Flights>"
"\nCurrent time: {time}."
"\n\nIf the user needs help, and none of your tools are appropriate for it, then"
' "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.',
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
update_flight_safe_tools = [search_flights]
update_flight_sensitive_tools = [update_ticket_to_new_flight, cancel_ticket]
update_flight_tools = update_flight_safe_tools + update_flight_sensitive_tools
update_flight_runnable = flight_booking_prompt | llm.bind_tools(
update_flight_tools + [CompleteOrEscalate]
)
# Hotel Booking Assistant
book_hotel_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling hotel bookings. "
            "The primary assistant delegates work to you whenever the user needs help booking a hotel. "
            "Search for available hotels based on the user's preferences and confirm the booking details with the customer. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that a booking isn't completed until after the relevant tool has successfully been used."
            "\nCurrent time: {time}."
            '\n\nIf the user needs help, and none of your tools are appropriate for it, then "CompleteOrEscalate" the dialog to the host assistant.'
            " Do not waste the user's time. Do not make up invalid tools or functions."
            "\n\nSome examples for which you should CompleteOrEscalate:\n"
            " - 'what's the weather like this time of year?'\n"
            " - 'nevermind i think I'll book separately'\n"
            " - 'i need to figure out transportation while i'm there'\n"
            " - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
            " - 'Hotel booking confirmed'",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())
# Searching is safe; booking/updating/cancelling requires user approval.
book_hotel_safe_tools = [search_hotels]
book_hotel_sensitive_tools = [book_hotel, update_hotel, cancel_hotel]
book_hotel_tools = book_hotel_safe_tools + book_hotel_sensitive_tools
book_hotel_runnable = book_hotel_prompt | llm.bind_tools(
    book_hotel_tools + [CompleteOrEscalate]
)
# Car Rental Assistant
book_car_rental_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling car rental bookings. "
            "The primary assistant delegates work to you whenever the user needs help booking a car rental. "
            "Search for available car rentals based on the user's preferences and confirm the booking details with the customer. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that a booking isn't completed until after the relevant tool has successfully been used."
            "\nCurrent time: {time}."
            "\n\nIf the user needs help, and none of your tools are appropriate for it, then "
            '"CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.'
            "\n\nSome examples for which you should CompleteOrEscalate:\n"
            " - 'what's the weather like this time of year?'\n"
            " - 'What flights are available?'\n"
            " - 'nevermind i think I'll book separately'\n"
            " - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
            " - 'Car rental booking confirmed'",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())
# Searching is safe; booking/updating/cancelling requires user approval.
book_car_rental_safe_tools = [search_car_rentals]
book_car_rental_sensitive_tools = [
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
]
book_car_rental_tools = book_car_rental_safe_tools + book_car_rental_sensitive_tools
book_car_rental_runnable = book_car_rental_prompt | llm.bind_tools(
    book_car_rental_tools + [CompleteOrEscalate]
)
# Excursion Assistant
book_excursion_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling trip recommendations. "
            "The primary assistant delegates work to you whenever the user needs help booking a recommended trip. "
            "Search for available trip recommendations based on the user's preferences and confirm the booking details with the customer. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " Remember that a booking isn't completed until after the relevant tool has successfully been used."
            "\nCurrent time: {time}."
            '\n\nIf the user needs help, and none of your tools are appropriate for it, then "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.'
            "\n\nSome examples for which you should CompleteOrEscalate:\n"
            " - 'nevermind i think I'll book separately'\n"
            " - 'i need to figure out transportation while i'm there'\n"
            " - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
            " - 'Excursion booking confirmed!'",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())
# Searching is safe; booking/updating/cancelling requires user approval.
book_excursion_safe_tools = [search_trip_recommendations]
book_excursion_sensitive_tools = [book_excursion, update_excursion, cancel_excursion]
book_excursion_tools = book_excursion_safe_tools + book_excursion_sensitive_tools
book_excursion_runnable = book_excursion_prompt | llm.bind_tools(
    book_excursion_tools + [CompleteOrEscalate]
)
# Primary Assistant
class ToFlightBookingAssistant(BaseModel):
    """Transfers work to a specialized assistant to handle flight updates and cancellations."""

    # Filled in by the primary assistant when delegating; becomes context for
    # the flight assistant's first turn.
    request: str = Field(
        description="Any necessary followup questions the update flight assistant should clarify before proceeding."
    )
class ToBookCarRental(BaseModel):
    """Transfers work to a specialized assistant to handle car rental bookings."""

    location: str = Field(
        description="The location where the user wants to rent a car."
    )
    start_date: str = Field(description="The start date of the car rental.")
    end_date: str = Field(description="The end date of the car rental.")
    request: str = Field(
        description="Any additional information or requests from the user regarding the car rental."
    )

    class Config:
        # Example payload surfaced to the LLM via the tool's JSON schema.
        json_schema_extra = {
            "example": {
                "location": "Basel",
                "start_date": "2023-07-01",
                "end_date": "2023-07-05",
                "request": "I need a compact car with automatic transmission.",
            }
        }
class ToHotelBookingAssistant(BaseModel):
    """Transfer work to a specialized assistant to handle hotel bookings."""

    location: str = Field(
        description="The location where the user wants to book a hotel."
    )
    checkin_date: str = Field(description="The check-in date for the hotel.")
    checkout_date: str = Field(description="The check-out date for the hotel.")
    request: str = Field(
        description="Any additional information or requests from the user regarding the hotel booking."
    )

    class Config:
        # Example payload surfaced to the LLM via the tool's JSON schema.
        json_schema_extra = {
            "example": {
                "location": "Zurich",
                "checkin_date": "2023-08-15",
                "checkout_date": "2023-08-20",
                "request": "I prefer a hotel near the city center with a room that has a view.",
            }
        }
class ToBookExcursion(BaseModel):
    """Transfers work to a specialized assistant to handle trip recommendation and other excursion bookings."""

    location: str = Field(
        description="The location where the user wants to book a recommended trip."
    )
    request: str = Field(
        description="Any additional information or requests from the user regarding the trip recommendation."
    )

    class Config:
        # Example payload surfaced to the LLM via the tool's JSON schema.
        json_schema_extra = {
            "example": {
                "location": "Lucerne",
                "request": "The user is interested in outdoor activities and scenic views.",
            }
        }
# The top-level assistant performs general Q&A and delegates specialized tasks to other assistants.
# The task delegation is a simple form of semantic routing / does simple intent detection
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            "Your primary role is to search for flight information and company policies to answer customer queries. "
            "If a customer requests to update or cancel a flight, book a car rental, book a hotel, or get trip recommendations, "
            "delegate the task to the appropriate specialized assistant by invoking the corresponding tool. You are not able to make these types of changes yourself."
            " Only the specialized assistants are given permission to do this for the user."
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
            "Provide detailed information to the customer, and always double-check the database before concluding that information is unavailable. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user flight information:\n<Flights>\n{user_info}\n</Flights>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())
# Tools the primary assistant may execute directly (all read-only).
primary_assistant_tools = [
    TavilySearchResults(max_results=1),
    search_flights,
    lookup_policy,
]
# The To* entries below are pydantic schemas, not executable functions: the
# router turns calls to them into graph transitions to the delegate workflows.
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
    primary_assistant_tools
    + [
        ToFlightBookingAssistant,
        ToBookCarRental,
        ToHotelBookingAssistant,
        ToBookExcursion,
    ]
)
创建助手#
我们差不多准备好创建图结构了。在前一部分中,我们做了一个设计决策,即在所有节点之间共享 messages
状态。这很有用,因为每个分配的助手都可以看到整个用户旅程,并共享上下文。然而,这也意味着较弱的语言模型(LLMs)很容易在特定范围内混淆。为了标记主要助手与其中一个分配工作流之间的 “交接”(并完成路由器的工具调用),我们将向状态中添加一个 ToolMessage
。
工具函数#
创建一个函数,为每个工作流生成一个 “入口” 节点,声明 “当前助手是 assistant_name
”。
from typing import Callable
from langchain_core.messages import ToolMessage
def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    """Build an entry node announcing the hand-off to a specialized assistant.

    The returned node appends a ToolMessage (completing the router's tool call)
    that tells the delegate which persona it now is, and pushes
    `new_dialog_state` onto the dialog stack.
    """

    def entry_node(state: State) -> dict:
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
                    f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
                    # Fix: "other other action" -> "or other action" (typo in the prompt).
                    " and the booking, update, or other action is not complete until after you have successfully invoked the appropriate tool."
                    " If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
                    " Do not mention who you are - just act as the proxy for the assistant.",
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node
定义图表#
现在是时候开始构建我们的图表了。和之前一样,我们将从一个节点开始,用用户的当前信息预先填充状态。
from typing import Literal

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition

builder = StateGraph(State)


def user_info(state: State):
    """Prefetch the current passenger's flight info into graph state."""
    return {"user_info": fetch_user_flight_information.invoke({})}


builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
现在我们将开始添加专门的工作流。每个小型工作流的结构与我们在第三部分中完整的图形非常相似,包含以下五个节点:
enter_*
: 使用你上面定义的create_entry_node
工具添加一个 ToolMessage,表示新的专用助手已经接管。
- 助手:由提示和 LLM 组合,处理当前状态,决定是使用工具、向用户提问,还是结束工作流(返回到主助手)。
*_safe_tools
: 助手可以使用的 “只读” 工具,不需要用户确认。*_sensitive_tools
: 具有 “写入” 权限的工具,使用这些工具需要用户确认(当我们编译图形时,会为这些工具分配一个interrupt_before
中断)。leave_skill
: 弹出dialog_state
,表示主助手重新接管控制。
由于它们之间的相似性,我们可以定义一个工厂函数来生成这些节点。不过,鉴于这是一个教程,我们会明确地定义每个节点。
首先,创建专门用于管理用户更新和取消航班的航班预订助手。
# Flight booking assistant
# Entry node announces the hand-off and pushes "update_flight" onto the stack.
builder.add_node(
    "enter_update_flight",
    create_entry_node("Flight Updates & Booking Assistant", "update_flight"),
)
builder.add_node("update_flight", Assistant(update_flight_runnable))
builder.add_edge("enter_update_flight", "update_flight")
# Separate tool nodes so only the sensitive one is interrupted before running.
builder.add_node(
    "update_flight_sensitive_tools",
    create_tool_node_with_fallback(update_flight_sensitive_tools),
)
builder.add_node(
    "update_flight_safe_tools",
    create_tool_node_with_fallback(update_flight_safe_tools),
)
def route_update_flight(
    state: State,
) -> Literal[
    "update_flight_sensitive_tools",
    "update_flight_safe_tools",
    "leave_skill",
    "__end__",
]:
    """Pick the next node for the flight assistant's latest message."""
    # No tool calls: respond to the user directly.
    if tools_condition(state) == END:
        return END
    calls = state["messages"][-1].tool_calls
    # CompleteOrEscalate hands control back to the primary assistant.
    if any(tc["name"] == CompleteOrEscalate.__name__ for tc in calls):
        return "leave_skill"
    safe_names = {t.name for t in update_flight_safe_tools}
    every_call_is_safe = all(tc["name"] in safe_names for tc in calls)
    return (
        "update_flight_safe_tools"
        if every_call_is_safe
        else "update_flight_sensitive_tools"
    )


# Tool results always flow back to the flight assistant for the next decision.
builder.add_edge("update_flight_sensitive_tools", "update_flight")
builder.add_edge("update_flight_safe_tools", "update_flight")
builder.add_conditional_edges("update_flight", route_update_flight)
# This node will be shared for exiting all specialized assistants
def pop_dialog_state(state: State) -> dict:
    """Pop the dialog stack and return to the main assistant.

    This lets the full graph explicitly track the dialog flow and delegate control
    to specific sub-graphs.
    """
    last_message = state["messages"][-1]
    exit_messages = []
    if last_message.tool_calls:
        # Note: Doesn't currently handle the edge case where the llm performs parallel tool calls
        exit_messages = [
            ToolMessage(
                content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
                tool_call_id=last_message.tool_calls[0]["id"],
            )
        ]
    return {"dialog_state": "pop", "messages": exit_messages}


builder.add_node("leave_skill", pop_dialog_state)
builder.add_edge("leave_skill", "primary_assistant")
接下来,创建 租车助手 图表,用于处理所有租车需求。
# Car rental assistant
# Entry node announces the hand-off and pushes "book_car_rental" onto the stack.
builder.add_node(
    "enter_book_car_rental",
    create_entry_node("Car Rental Assistant", "book_car_rental"),
)
builder.add_node("book_car_rental", Assistant(book_car_rental_runnable))
builder.add_edge("enter_book_car_rental", "book_car_rental")
# Separate tool nodes so only the sensitive one is interrupted before running.
builder.add_node(
    "book_car_rental_safe_tools",
    create_tool_node_with_fallback(book_car_rental_safe_tools),
)
builder.add_node(
    "book_car_rental_sensitive_tools",
    create_tool_node_with_fallback(book_car_rental_sensitive_tools),
)
def route_book_car_rental(
    state: State,
) -> Literal[
    "book_car_rental_safe_tools",
    "book_car_rental_sensitive_tools",
    "leave_skill",
    "__end__",
]:
    """Pick the next node for the car-rental assistant's latest message."""
    # No tool calls: respond to the user directly.
    if tools_condition(state) == END:
        return END
    calls = state["messages"][-1].tool_calls
    # CompleteOrEscalate hands control back to the primary assistant.
    if any(tc["name"] == CompleteOrEscalate.__name__ for tc in calls):
        return "leave_skill"
    safe_names = {t.name for t in book_car_rental_safe_tools}
    every_call_is_safe = all(tc["name"] in safe_names for tc in calls)
    return (
        "book_car_rental_safe_tools"
        if every_call_is_safe
        else "book_car_rental_sensitive_tools"
    )


# Tool results always flow back to the car-rental assistant.
builder.add_edge("book_car_rental_sensitive_tools", "book_car_rental")
builder.add_edge("book_car_rental_safe_tools", "book_car_rental")
builder.add_conditional_edges("book_car_rental", route_book_car_rental)
然后定义酒店预订的工作流程。
# Hotel booking assistant
# Entry node announces the hand-off and pushes "book_hotel" onto the stack.
builder.add_node(
    "enter_book_hotel", create_entry_node("Hotel Booking Assistant", "book_hotel")
)
builder.add_node("book_hotel", Assistant(book_hotel_runnable))
builder.add_edge("enter_book_hotel", "book_hotel")
# Separate tool nodes so only the sensitive one is interrupted before running.
builder.add_node(
    "book_hotel_safe_tools",
    create_tool_node_with_fallback(book_hotel_safe_tools),
)
builder.add_node(
    "book_hotel_sensitive_tools",
    create_tool_node_with_fallback(book_hotel_sensitive_tools),
)
def route_book_hotel(
    state: State,
) -> Literal[
    "leave_skill", "book_hotel_safe_tools", "book_hotel_sensitive_tools", "__end__"
]:
    """Pick the next node for the hotel assistant's latest message."""
    # No tool calls: respond to the user directly.
    if tools_condition(state) == END:
        return END
    calls = state["messages"][-1].tool_calls
    # CompleteOrEscalate hands control back to the primary assistant.
    if any(tc["name"] == CompleteOrEscalate.__name__ for tc in calls):
        return "leave_skill"
    safe_names = {t.name for t in book_hotel_safe_tools}
    every_call_is_safe = all(tc["name"] in safe_names for tc in calls)
    return "book_hotel_safe_tools" if every_call_is_safe else "book_hotel_sensitive_tools"


# Tool results always flow back to the hotel assistant.
builder.add_edge("book_hotel_sensitive_tools", "book_hotel")
builder.add_edge("book_hotel_safe_tools", "book_hotel")
builder.add_conditional_edges("book_hotel", route_book_hotel)
接下来,定义旅行助手。
# Excursion assistant
# Entry node announces the hand-off and pushes "book_excursion" onto the stack.
builder.add_node(
    "enter_book_excursion",
    create_entry_node("Trip Recommendation Assistant", "book_excursion"),
)
builder.add_node("book_excursion", Assistant(book_excursion_runnable))
builder.add_edge("enter_book_excursion", "book_excursion")
# Separate tool nodes so only the sensitive one is interrupted before running.
builder.add_node(
    "book_excursion_safe_tools",
    create_tool_node_with_fallback(book_excursion_safe_tools),
)
builder.add_node(
    "book_excursion_sensitive_tools",
    create_tool_node_with_fallback(book_excursion_sensitive_tools),
)
def route_book_excursion(
    state: State,
) -> Literal[
    "book_excursion_safe_tools",
    "book_excursion_sensitive_tools",
    "leave_skill",
    "__end__",
]:
    """Pick the next node for the excursion assistant's latest message."""
    # No tool calls: respond to the user directly.
    if tools_condition(state) == END:
        return END
    calls = state["messages"][-1].tool_calls
    # CompleteOrEscalate hands control back to the primary assistant.
    if any(tc["name"] == CompleteOrEscalate.__name__ for tc in calls):
        return "leave_skill"
    safe_names = {t.name for t in book_excursion_safe_tools}
    every_call_is_safe = all(tc["name"] in safe_names for tc in calls)
    return (
        "book_excursion_safe_tools"
        if every_call_is_safe
        else "book_excursion_sensitive_tools"
    )


# Tool results always flow back to the excursion assistant.
builder.add_edge("book_excursion_sensitive_tools", "book_excursion")
builder.add_edge("book_excursion_safe_tools", "book_excursion")
builder.add_conditional_edges("book_excursion", route_book_excursion)
最后,创建主要助手。
# Primary assistant
builder.add_node("primary_assistant", Assistant(assistant_runnable))
# Executable (read-only) tools the primary assistant may call directly.
builder.add_node(
    "primary_assistant_tools", create_tool_node_with_fallback(primary_assistant_tools)
)
def route_primary_assistant(
    state: State,
) -> Literal[
    "primary_assistant_tools",
    "enter_update_flight",
    "enter_book_car_rental",
    "enter_book_hotel",
    "enter_book_excursion",
    "__end__",
]:
    """Route the primary assistant to a delegate entry node, its own tools, or END.

    Fix: "enter_book_car_rental" was missing from the Literal return annotation
    even though the function can return it (see the ToBookCarRental branch).
    """
    route = tools_condition(state)
    # No tool calls: respond to the user and end the turn.
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    if tool_calls:
        # This assumes a single (non-parallel) tool call.
        name = tool_calls[0]["name"]
        if name == ToFlightBookingAssistant.__name__:
            return "enter_update_flight"
        elif name == ToBookCarRental.__name__:
            return "enter_book_car_rental"
        elif name == ToHotelBookingAssistant.__name__:
            return "enter_book_hotel"
        elif name == ToBookExcursion.__name__:
            return "enter_book_excursion"
        # Any other tool call is one of the primary assistant's own tools.
        return "primary_assistant_tools"
    raise ValueError("Invalid route")
# The assistant can route to one of the delegated assistants,
# directly use a tool, or directly respond to the user
builder.add_conditional_edges(
    "primary_assistant",
    route_primary_assistant,
    # Explicit path map: router return value -> destination node.
    {
        "enter_update_flight": "enter_update_flight",
        "enter_book_car_rental": "enter_book_car_rental",
        "enter_book_hotel": "enter_book_hotel",
        "enter_book_excursion": "enter_book_excursion",
        "primary_assistant_tools": "primary_assistant_tools",
        END: END,
    },
)
builder.add_edge("primary_assistant_tools", "primary_assistant")
# Each delegated workflow can directly respond to the user
# When the user responds, we want to return to the currently active workflow
def route_to_workflow(
    state: State,
) -> Literal[
    "primary_assistant",
    "update_flight",
    "book_car_rental",
    "book_hotel",
    "book_excursion",
]:
    """If we are in a delegated state, route directly to the appropriate assistant."""
    stack = state.get("dialog_state")
    # An empty stack means no specialized assistant is active.
    return stack[-1] if stack else "primary_assistant"
# After fetching user info, resume whichever workflow was active (if any).
builder.add_conditional_edges("fetch_user_info", route_to_workflow)
# Compile graph
memory = MemorySaver()  # in-memory checkpointer; per-thread state is keyed by thread_id
part_4_graph = builder.compile(
    checkpointer=memory,
    # Let the user approve or deny the use of sensitive tools
    interrupt_before=[
        "update_flight_sensitive_tools",
        "book_car_rental_sensitive_tools",
        "book_hotel_sensitive_tools",
        "book_excursion_sensitive_tools",
    ],
)
from IPython.display import Image, display

try:
    # Render the compiled graph (xray=True expands subgraphs) as a PNG.
    png_bytes = part_4_graph.get_graph(xray=True).draw_mermaid_png()
    display(Image(png_bytes))
except Exception:
    # Rendering needs optional extra dependencies; visualization is not
    # required for the tutorial, so failures are ignored.
    pass
对话#
内容真不少!让我们在接下来的对话轮次列表中运行它。这次,我们将减少很多确认步骤。
import shutil
import uuid

# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
# A fresh thread_id gives this run its own checkpoint history.
thread_id = str(uuid.uuid4())
config = {
    "configurable": {
        # The passenger_id is used in our flight tools to
        # fetch the user's flight information
        "passenger_id": "3442 587242",
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}

_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in tutorial_questions:
    events = part_4_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
    snapshot = part_4_graph.get_state(config)
    while snapshot.next:
        # We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
        # Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
        # Then, you would have the frontend trigger a new run via an API call when the user has provided input.
        user_input = input(
            "Do you approve of the above actions? Type 'y' to continue;"
            # Fixed typo in the user-facing prompt: "changed" -> "changes".
            " otherwise, explain your requested changes.\n\n"
        )
        if user_input.strip() == "y":
            # Approved: resume the graph from the interrupt with no new input.
            result = part_4_graph.invoke(
                None,
                config,
            )
        else:
            # Denied: satisfy the pending tool invocation with a ToolMessage
            # explaining the requested changes / change of mind, then resume.
            result = part_4_graph.invoke(
                {
                    "messages": [
                        ToolMessage(
                            # `event` still holds the last streamed state,
                            # whose final message carries the pending tool call.
                            tool_call_id=event["messages"][-1].tool_calls[0]["id"],
                            content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
                        )
                    ]
                },
                config,
            )
        snapshot = part_4_graph.get_state(config)
结论:#
现在你已经开发了一个能够处理多种任务的客户支持机器人,使用了专注的工作流程。更重要的是,你学会了如何使用 LangGraph 的一些核心功能,根据产品需求设计和重构应用程序。
以上示例并未针对你的独特需求进行优化——LLM 会犯错误,每个流程都可以通过更好的提示词和实验来提高可靠性。一旦你创建了初始的支持机器人,下一步就是开始添加评估,这样你就可以自信地改进系统。查看相关文档以及我们的其他教程,了解更多信息吧!