A customer support bot can save a team time by handling routine inquiries, but building a bot that can reliably handle a variety of tasks, without confusing or frustrating the user, is not easy.
In this tutorial, you will build a customer support bot for an airline that helps users research and plan their travel. You will learn to use LangGraph's interrupts, checkpointers, and more complex state management to organize the assistant's tools and assist users with flight bookings, hotel reservations, car rentals, and travel activities. This tutorial assumes you are already familiar with the concepts introduced in the LangGraph Introduction Tutorial.
By the end of this tutorial, you will have built a fully functional bot and gained an understanding of LangGraph's key concepts and architecture. You can also apply these design patterns to other AI projects.
The final chatbot architecture is roughly illustrated in the figure below:
Now, let's get started!
Prerequisites#
First, set up your development environment. We will install the dependencies required for this tutorial, download the test database, and define the tools that will be reused in each section.
We will use Claude as our large language model (LLM) and define some custom tools. While most tools will connect to a local SQLite database (no additional dependencies required), we will also provide general web search functionality for the agent through Tavily.
%%capture --no-stderr
%pip install -U langgraph langchain-community langchain-anthropic tavily-python pandas openai
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
_set_env("TAVILY_API_KEY")
Populate the Database#
Run the following script to download the SQLite database we prepared for this tutorial and bring it up to date. The specific details are not important.
import os
import shutil
import sqlite3
import pandas as pd
import requests
db_url = "https://storage.googleapis.com/benchmarks-artifacts/travel-db/travel2.sqlite"
local_file = "travel2.sqlite"
# The backup lets us restart for each tutorial section
backup_file = "travel2.backup.sqlite"
overwrite = False
if overwrite or not os.path.exists(local_file):
response = requests.get(db_url)
response.raise_for_status() # Ensure the request was successful
with open(local_file, "wb") as f:
f.write(response.content)
# Backup - we will use this to "reset" our DB in each section
shutil.copy(local_file, backup_file)
# Convert the flights to present time for our tutorial
def update_dates(file):
shutil.copy(backup_file, file)
conn = sqlite3.connect(file)
cursor = conn.cursor()
tables = pd.read_sql(
"SELECT name FROM sqlite_master WHERE type='table';", conn
).name.tolist()
tdf = {}
for t in tables:
tdf[t] = pd.read_sql(f"SELECT * from {t}", conn)
example_time = pd.to_datetime(
tdf["flights"]["actual_departure"].replace("\\N", pd.NaT)
).max()
current_time = pd.to_datetime("now").tz_localize(example_time.tz)
time_diff = current_time - example_time
tdf["bookings"]["book_date"] = (
pd.to_datetime(tdf["bookings"]["book_date"].replace("\\N", pd.NaT), utc=True)
+ time_diff
)
datetime_columns = [
"scheduled_departure",
"scheduled_arrival",
"actual_departure",
"actual_arrival",
]
for column in datetime_columns:
tdf["flights"][column] = (
pd.to_datetime(tdf["flights"][column].replace("\\N", pd.NaT)) + time_diff
)
for table_name, df in tdf.items():
df.to_sql(table_name, conn, if_exists="replace", index=False)
del df
del tdf
conn.commit()
conn.close()
return file
db = update_dates(local_file)
Tools#
Next, we define the tools for the assistant to search the airline's policy manual and to search and manage bookings for flights, hotels, car rentals, and travel activities. These tools will be reused throughout the tutorial. The specific implementation details are not important, so you can run the following code directly and jump to Part 1.
Find Company Policies#
The assistant can retrieve policy information to answer user questions. Note that the enforcement of policies still needs to be done through tools or APIs, as large language models (LLMs) may overlook these regulations.
import re
import numpy as np
import openai
from langchain_core.tools import tool
response = requests.get(
"https://storage.googleapis.com/benchmarks-artifacts/travel-db/swiss_faq.md"
)
response.raise_for_status()
faq_text = response.text
docs = [{"page_content": txt} for txt in re.split(r"(?=\n##)", faq_text)]
class VectorStoreRetriever:
def __init__(self, docs: list, vectors: list, oai_client):
self._arr = np.array(vectors)
self._docs = docs
self._client = oai_client
@classmethod
def from_docs(cls, docs, oai_client):
embeddings = oai_client.embeddings.create(
model="text-embedding-3-small", input=[doc["page_content"] for doc in docs]
)
vectors = [emb.embedding for emb in embeddings.data]
return cls(docs, vectors, oai_client)
def query(self, query: str, k: int = 5) -> list[dict]:
embed = self._client.embeddings.create(
model="text-embedding-3-small", input=[query]
)
# "@" is just a matrix multiplication in python
scores = np.array(embed.data[0].embedding) @ self._arr.T
top_k_idx = np.argpartition(scores, -k)[-k:]
top_k_idx_sorted = top_k_idx[np.argsort(-scores[top_k_idx])]
return [
{**self._docs[idx], "similarity": scores[idx]} for idx in top_k_idx_sorted
]
retriever = VectorStoreRetriever.from_docs(docs, openai.Client())
@tool
def lookup_policy(query: str) -> str:
"""Consult the company policies to check whether certain options are permitted.
Use this before making any flight changes or performing other 'write' events."""
docs = retriever.query(query, k=2)
return "\n\n".join([doc["page_content"] for doc in docs])
Flights#
Define the fetch_user_flight_information tool to allow the agent to view the current user's flight information. Then define tools to search for flights and manage the passenger booking information stored in the SQL database.
We look up the passenger_id of the user accessing this application from the RunnableConfig. The LLM never needs to provide this value explicitly; it is supplied with each invocation of the graph, ensuring that each user cannot access other passengers' booking information.
import sqlite3
from datetime import date, datetime
from typing import Optional
import pytz
from langchain_core.runnables import RunnableConfig
@tool
def fetch_user_flight_information(config: RunnableConfig) -> list[dict]:
"""Fetch all tickets for the user along with corresponding flight information and seat assignments.
Returns:
A list of dictionaries where each dictionary contains the ticket details,
associated flight details, and the seat assignments for each ticket belonging to the user.
"""
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
if not passenger_id:
raise ValueError("No passenger ID configured.")
conn = sqlite3.connect(db)
cursor = conn.cursor()
query = """
SELECT
t.ticket_no, t.book_ref,
f.flight_id, f.flight_no, f.departure_airport, f.arrival_airport, f.scheduled_departure, f.scheduled_arrival,
bp.seat_no, tf.fare_conditions
FROM
tickets t
JOIN ticket_flights tf ON t.ticket_no = tf.ticket_no
JOIN flights f ON tf.flight_id = f.flight_id
JOIN boarding_passes bp ON bp.ticket_no = t.ticket_no AND bp.flight_id = f.flight_id
WHERE
t.passenger_id = ?
"""
cursor.execute(query, (passenger_id,))
rows = cursor.fetchall()
column_names = [column[0] for column in cursor.description]
results = [dict(zip(column_names, row)) for row in rows]
cursor.close()
conn.close()
return results
@tool
def search_flights(
departure_airport: Optional[str] = None,
arrival_airport: Optional[str] = None,
start_time: Optional[date | datetime] = None,
end_time: Optional[date | datetime] = None,
limit: int = 20,
) -> list[dict]:
"""Search for flights based on departure airport, arrival airport, and departure time range."""
conn = sqlite3.connect(db)
cursor = conn.cursor()
query = "SELECT * FROM flights WHERE 1 = 1"
params = []
if departure_airport:
query += " AND departure_airport = ?"
params.append(departure_airport)
if arrival_airport:
query += " AND arrival_airport = ?"
params.append(arrival_airport)
if start_time:
query += " AND scheduled_departure >= ?"
params.append(start_time)
if end_time:
query += " AND scheduled_departure <= ?"
params.append(end_time)
query += " LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
column_names = [column[0] for column in cursor.description]
results = [dict(zip(column_names, row)) for row in rows]
cursor.close()
conn.close()
return results
@tool
def update_ticket_to_new_flight(
ticket_no: str, new_flight_id: int, *, config: RunnableConfig
) -> str:
"""Update the user's ticket to a new valid flight."""
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
if not passenger_id:
raise ValueError("No passenger ID configured.")
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(
"SELECT departure_airport, arrival_airport, scheduled_departure FROM flights WHERE flight_id = ?",
(new_flight_id,),
)
new_flight = cursor.fetchone()
if not new_flight:
cursor.close()
conn.close()
return "Invalid new flight ID provided."
column_names = [column[0] for column in cursor.description]
new_flight_dict = dict(zip(column_names, new_flight))
timezone = pytz.timezone("Etc/GMT-3")
current_time = datetime.now(tz=timezone)
departure_time = datetime.strptime(
new_flight_dict["scheduled_departure"], "%Y-%m-%d %H:%M:%S.%f%z"
)
time_until = (departure_time - current_time).total_seconds()
if time_until < (3 * 3600):
cursor.close()
conn.close()
return f"Not permitted to reschedule to a flight that is less than 3 hours from the current time. Selected flight is at {departure_time}."
cursor.execute(
"SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
)
current_flight = cursor.fetchone()
if not current_flight:
cursor.close()
conn.close()
return "No existing ticket found for the given ticket number."
# Check the signed-in user actually has this ticket
cursor.execute(
"SELECT * FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
(ticket_no, passenger_id),
)
current_ticket = cursor.fetchone()
if not current_ticket:
cursor.close()
conn.close()
return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"
# In a real application, you'd likely add additional checks here to enforce business logic,
# like "does the new departure airport match the current ticket", etc.
# While it's best to try to be *proactive* in 'type-hinting' policies to the LLM
# it's inevitably going to get things wrong, so you **also** need to ensure your
# API enforces valid behavior
cursor.execute(
"UPDATE ticket_flights SET flight_id = ? WHERE ticket_no = ?",
(new_flight_id, ticket_no),
)
conn.commit()
cursor.close()
conn.close()
return "Ticket successfully updated to new flight."
@tool
def cancel_ticket(ticket_no: str, *, config: RunnableConfig) -> str:
"""Cancel the user's ticket and remove it from the database."""
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
if not passenger_id:
raise ValueError("No passenger ID configured.")
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(
"SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
)
existing_ticket = cursor.fetchone()
if not existing_ticket:
cursor.close()
conn.close()
return "No existing ticket found for the given ticket number."
# Check the signed-in user actually has this ticket
cursor.execute(
"SELECT flight_id FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
(ticket_no, passenger_id),
)
current_ticket = cursor.fetchone()
if not current_ticket:
cursor.close()
conn.close()
return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"
cursor.execute("DELETE FROM ticket_flights WHERE ticket_no = ?", (ticket_no,))
conn.commit()
cursor.close()
conn.close()
return "Ticket successfully cancelled."
Car Rental Tools#
After booking flights, users typically want to arrange transportation. Define some "car rental" tools to allow users to search for and book vehicles at their destination.
from datetime import date, datetime
from typing import Optional, Union
@tool
def search_car_rentals(
location: Optional[str] = None,
name: Optional[str] = None,
price_tier: Optional[str] = None,
start_date: Optional[Union[datetime, date]] = None,
end_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
"""
Search for car rentals based on location, name, price tier, start date, and end date.
Args:
location (Optional[str]): The location of the car rental. Defaults to None.
name (Optional[str]): The name of the car rental company. Defaults to None.
price_tier (Optional[str]): The price tier of the car rental. Defaults to None.
start_date (Optional[Union[datetime, date]]): The start date of the car rental. Defaults to None.
end_date (Optional[Union[datetime, date]]): The end date of the car rental. Defaults to None.
Returns:
list[dict]: A list of car rental dictionaries matching the search criteria.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
query = "SELECT * FROM car_rentals WHERE 1=1"
params = []
if location:
query += " AND location LIKE ?"
params.append(f"%{location}%")
if name:
query += " AND name LIKE ?"
params.append(f"%{name}%")
# For our tutorial, we will let you match on any dates and price tier.
# (since our toy dataset doesn't have much data)
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
return [
dict(zip([column[0] for column in cursor.description], row)) for row in results
]
@tool
def book_car_rental(rental_id: int) -> str:
"""
Book a car rental by its ID.
Args:
rental_id (int): The ID of the car rental to book.
Returns:
str: A message indicating whether the car rental was successfully booked or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute("UPDATE car_rentals SET booked = 1 WHERE id = ?", (rental_id,))
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Car rental {rental_id} successfully booked."
else:
conn.close()
return f"No car rental found with ID {rental_id}."
@tool
def update_car_rental(
rental_id: int,
start_date: Optional[Union[datetime, date]] = None,
end_date: Optional[Union[datetime, date]] = None,
) -> str:
"""
Update a car rental's start and end dates by its ID.
Args:
rental_id (int): The ID of the car rental to update.
start_date (Optional[Union[datetime, date]]): The new start date of the car rental. Defaults to None.
end_date (Optional[Union[datetime, date]]): The new end date of the car rental. Defaults to None.
Returns:
str: A message indicating whether the car rental was successfully updated or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
if start_date:
cursor.execute(
"UPDATE car_rentals SET start_date = ? WHERE id = ?",
(start_date, rental_id),
)
if end_date:
cursor.execute(
"UPDATE car_rentals SET end_date = ? WHERE id = ?", (end_date, rental_id)
)
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Car rental {rental_id} successfully updated."
else:
conn.close()
return f"No car rental found with ID {rental_id}."
@tool
def cancel_car_rental(rental_id: int) -> str:
"""
Cancel a car rental by its ID.
Args:
rental_id (int): The ID of the car rental to cancel.
Returns:
str: A message indicating whether the car rental was successfully cancelled or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute("UPDATE car_rentals SET booked = 0 WHERE id = ?", (rental_id,))
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Car rental {rental_id} successfully cancelled."
else:
conn.close()
return f"No car rental found with ID {rental_id}."
Hotels#
Users need a place to sleep! Define some tools to search for and manage hotel bookings.
@tool
def search_hotels(
location: Optional[str] = None,
name: Optional[str] = None,
price_tier: Optional[str] = None,
checkin_date: Optional[Union[datetime, date]] = None,
checkout_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
"""
Search for hotels based on location, name, price tier, check-in date, and check-out date.
Args:
location (Optional[str]): The location of the hotel. Defaults to None.
name (Optional[str]): The name of the hotel. Defaults to None.
price_tier (Optional[str]): The price tier of the hotel. Defaults to None. Examples: Midscale, Upper Midscale, Upscale, Luxury
checkin_date (Optional[Union[datetime, date]]): The check-in date of the hotel. Defaults to None.
checkout_date (Optional[Union[datetime, date]]): The check-out date of the hotel. Defaults to None.
Returns:
list[dict]: A list of hotel dictionaries matching the search criteria.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
query = "SELECT * FROM hotels WHERE 1=1"
params = []
if location:
query += " AND location LIKE ?"
params.append(f"%{location}%")
if name:
query += " AND name LIKE ?"
params.append(f"%{name}%")
# For the sake of this tutorial, we will let you match on any dates and price tier.
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
return [
dict(zip([column[0] for column in cursor.description], row)) for row in results
]
@tool
def book_hotel(hotel_id: int) -> str:
"""
Book a hotel by its ID.
Args:
hotel_id (int): The ID of the hotel to book.
Returns:
str: A message indicating whether the hotel was successfully booked or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute("UPDATE hotels SET booked = 1 WHERE id = ?", (hotel_id,))
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Hotel {hotel_id} successfully booked."
else:
conn.close()
return f"No hotel found with ID {hotel_id}."
@tool
def update_hotel(
hotel_id: int,
checkin_date: Optional[Union[datetime, date]] = None,
checkout_date: Optional[Union[datetime, date]] = None,
) -> str:
"""
Update a hotel's check-in and check-out dates by its ID.
Args:
hotel_id (int): The ID of the hotel to update.
checkin_date (Optional[Union[datetime, date]]): The new check-in date of the hotel. Defaults to None.
checkout_date (Optional[Union[datetime, date]]): The new check-out date of the hotel. Defaults to None.
Returns:
str: A message indicating whether the hotel was successfully updated or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
if checkin_date:
cursor.execute(
"UPDATE hotels SET checkin_date = ? WHERE id = ?", (checkin_date, hotel_id)
)
if checkout_date:
cursor.execute(
"UPDATE hotels SET checkout_date = ? WHERE id = ?",
(checkout_date, hotel_id),
)
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Hotel {hotel_id} successfully updated."
else:
conn.close()
return f"No hotel found with ID {hotel_id}."
@tool
def cancel_hotel(hotel_id: int) -> str:
"""
Cancel a hotel by its ID.
Args:
hotel_id (int): The ID of the hotel to cancel.
Returns:
str: A message indicating whether the hotel was successfully cancelled or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute("UPDATE hotels SET booked = 0 WHERE id = ?", (hotel_id,))
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Hotel {hotel_id} successfully cancelled."
else:
conn.close()
return f"No hotel found with ID {hotel_id}."
Travel Activities#
Finally, define some tools that allow users to search for things to do (and book them) after arriving at their destination.
@tool
def search_trip_recommendations(
location: Optional[str] = None,
name: Optional[str] = None,
keywords: Optional[str] = None,
) -> list[dict]:
"""
Search for trip recommendations based on location, name, and keywords.
Args:
location (Optional[str]): The location of the trip recommendation. Defaults to None.
name (Optional[str]): The name of the trip recommendation. Defaults to None.
keywords (Optional[str]): The keywords associated with the trip recommendation. Defaults to None.
Returns:
list[dict]: A list of trip recommendation dictionaries matching the search criteria.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
query = "SELECT * FROM trip_recommendations WHERE 1=1"
params = []
if location:
query += " AND location LIKE ?"
params.append(f"%{location}%")
if name:
query += " AND name LIKE ?"
params.append(f"%{name}%")
if keywords:
keyword_list = keywords.split(",")
keyword_conditions = " OR ".join(["keywords LIKE ?" for _ in keyword_list])
query += f" AND ({keyword_conditions})"
params.extend([f"%{keyword.strip()}%" for keyword in keyword_list])
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
return [
dict(zip([column[0] for column in cursor.description], row)) for row in results
]
@tool
def book_excursion(recommendation_id: int) -> str:
"""
Book an excursion by its recommendation ID.
Args:
recommendation_id (int): The ID of the trip recommendation to book.
Returns:
str: A message indicating whether the trip recommendation was successfully booked or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(
"UPDATE trip_recommendations SET booked = 1 WHERE id = ?", (recommendation_id,)
)
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Trip recommendation {recommendation_id} successfully booked."
else:
conn.close()
return f"No trip recommendation found with ID {recommendation_id}."
@tool
def update_excursion(recommendation_id: int, details: str) -> str:
"""
Update a trip recommendation's details by its ID.
Args:
recommendation_id (int): The ID of the trip recommendation to update.
details (str): The new details of the trip recommendation.
Returns:
str: A message indicating whether the trip recommendation was successfully updated or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(
"UPDATE trip_recommendations SET details = ? WHERE id = ?",
(details, recommendation_id),
)
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Trip recommendation {recommendation_id} successfully updated."
else:
conn.close()
return f"No trip recommendation found with ID {recommendation_id}."
@tool
def cancel_excursion(recommendation_id: int) -> str:
"""
Cancel a trip recommendation by its ID.
Args:
recommendation_id (int): The ID of the trip recommendation to cancel.
Returns:
str: A message indicating whether the trip recommendation was successfully cancelled or not.
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(
"UPDATE trip_recommendations SET booked = 0 WHERE id = ?", (recommendation_id,)
)
conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Trip recommendation {recommendation_id} successfully cancelled."
else:
conn.close()
return f"No trip recommendation found with ID {recommendation_id}."
Tool Functions#
Define helper functions to pretty-print the messages in the graph while we debug, and to give our tool nodes error handling by appending the error to the chat history.
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
def handle_tool_error(state) -> dict:
error = state.get("error")
tool_calls = state["messages"][-1].tool_calls
return {
"messages": [
ToolMessage(
content=f"Error: {repr(error)}\n please fix your mistakes.",
tool_call_id=tc["id"],
)
for tc in tool_calls
]
}
def create_tool_node_with_fallback(tools: list) -> dict:
return ToolNode(tools).with_fallbacks(
[RunnableLambda(handle_tool_error)], exception_key="error"
)
def _print_event(event: dict, _printed: set, max_length=1500):
current_state = event.get("dialog_state")
if current_state:
print("Currently in: ", current_state[-1])
message = event.get("messages")
if message:
if isinstance(message, list):
message = message[-1]
if message.id not in _printed:
msg_repr = message.pretty_repr(html=True)
if len(msg_repr) > max_length:
msg_repr = msg_repr[:max_length] + " ... (truncated)"
print(msg_repr)
_printed.add(message.id)
Part 1: Zero-Shot Agent#
When building, it's best to start with the simplest working implementation and use an evaluation tool like LangSmith to measure its efficacy. All else being equal, prefer simple, scalable solutions over complicated ones. In this case, the single-graph approach has limitations: the bot may take undesired actions without user confirmation, struggle with complex queries, and lack focus in its responses. We will address these issues in the subsequent sections.
In this part, we will define a simple zero-shot agent as the assistant, giving this agent all the tools and prompting it to use these tools cautiously to assist users.
The simple two-node graph is illustrated below:
First, define the state.
State#
We define our StateGraph state as a typed dictionary containing an append-only list of messages. These messages form the chat history, which is all the state our simple assistant needs.
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
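The add_messages annotation is a reducer: when a node returns {"messages": [...]}, the new messages are appended to the existing list rather than overwriting it (and a message with a matching id replaces the old one). A small illustration of the merge behavior:
from langchain_core.messages import AIMessage, HumanMessage
merged = add_messages([HumanMessage(content="hi")], [AIMessage(content="hello")])
# merged == [HumanMessage("hi"), AIMessage("hello")]: the new message was appended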
Assistant#
Next, define the assistant function. This function takes the graph state, formats it into a prompt, and calls the large language model (LLM) to predict the best response.
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
class Assistant:
def __init__(self, runnable: Runnable):
self.runnable = runnable
def __call__(self, state: State, config: RunnableConfig):
while True:
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
state = {**state, "user_info": passenger_id}
result = self.runnable.invoke(state)
# If the LLM happens to return an empty response, we will re-prompt it
# for an actual response.
if not result.tool_calls and (
not result.content
or isinstance(result.content, list)
and not result.content[0].get("text")
):
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
else:
break
return {"messages": result}
# Haiku is faster and cheaper, but less accurate
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# You could swap LLMs, though you will likely want to update the prompts when
# doing so!
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4-turbo-preview")
primary_assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful customer support assistant for Swiss Airlines. "
" Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
" If a search comes up empty, expand your search before giving up."
"\n\nCurrent user:\n<User>\n{user_info}\n</User>"
"\nCurrent time: {time}.",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
part_1_tools = [
TavilySearchResults(max_results=1),
fetch_user_flight_information,
search_flights,
lookup_policy,
update_ticket_to_new_flight,
cancel_ticket,
search_car_rentals,
book_car_rental,
update_car_rental,
cancel_car_rental,
search_hotels,
book_hotel,
update_hotel,
cancel_hotel,
search_trip_recommendations,
book_excursion,
update_excursion,
cancel_excursion,
]
part_1_assistant_runnable = primary_assistant_prompt | llm.bind_tools(part_1_tools)
Define the Graph#
Now, create the graph. This graph is the final assistant for this section.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition
builder = StateGraph(State)
# Define nodes: these do the work
builder.add_node("assistant", Assistant(part_1_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_1_tools))
# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# tools_condition routes to the "tools" node if the assistant's last message
# contains tool calls; otherwise it routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
# The checkpointer lets the graph persist its state
# this is a complete memory for the entire graph.
memory = MemorySaver()
part_1_graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display
try:
display(Image(part_1_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
Example Conversation#
Now it's time to try our powerful chatbot! Let's run it on the conversation turns below. If you hit a RecursionLimit error, it means the bot couldn't get an answer within the allotted number of steps. That's okay! We have more tricks up our sleeve in later parts of this tutorial.
import shutil
import uuid
# Let's create an example conversation a user might have with the assistant
tutorial_questions = [
"Hi there, what time is my flight?",
"Am I allowed to update my flight to something sooner? I want to leave later today.",
"Update my flight to sometime next week then",
"The next available option is great",
"What about lodging and transportation?",
"Yeah I think I'd like an affordable hotel for my week-long stay (7 days). And I'll want to rent a car.",
"OK could you place a reservation for your recommended hotel? It sounds nice.",
"Yes go ahead and book anything that's moderate expense and has availability.",
"Now for a car, what are my options?",
"Awesome let's just get the cheapest option. Go ahead and book for 7 days",
"Cool so now what recommendations do you have on excursions?",
"Are they available while I'm there?",
"Interesting - I like the museums, what options are there? ",
"OK great pick one and book it for my second day there.",
]
# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
thread_id = str(uuid.uuid4())
config = {
"configurable": {
# The passenger_id is used in our flight tools to
# fetch the user's flight information
"passenger_id": "3442 587242",
# Checkpoints are accessed by thread_id
"thread_id": thread_id,
}
}
_printed = set()
for question in tutorial_questions:
events = part_1_graph.stream(
{"messages": ("user", question)}, config, stream_mode="values"
)
for event in events:
_print_event(event, _printed)
Part 1 Review#
Our simple assistant performed quite well! It answered every question reasonably, responded quickly in context, and successfully carried out all of our tasks. You can view an example LangSmith trace to get a better sense of how the LLM was prompted throughout the interactions above.
If this were just a simple Q&A bot, we might be satisfied with the results above. However, since our customer support bot acts on behalf of the user, some of its behaviors are a bit concerning:
- The assistant booked a rental car while we were focused on lodging, then had to cancel and rebook: oops! To avoid unnecessary costs, users should have the final say before booking.
- The assistant struggled when searching for recommendations. We could improve this by adding more detailed instructions and examples to the tools, but doing so for every tool might lead to overly lengthy prompts that overwhelm the bot.
- The assistant had to perform an explicit search just to obtain the user's relevant information. We could save a lot of time by fetching the user's travel details up front, letting the assistant respond directly.
In the next section, we will address the first two issues mentioned above.
Part 2: Adding Confirmation#
When the assistant takes action on behalf of the user, it should almost always allow the user to have the final say on whether to execute those actions. Otherwise, any small mistakes made by the assistant (or any prompt injection it receives) could cause real harm to the user.
In this section, we will use interrupt_before to pause the graph's execution before any tool action and return control to the user.
Your graph will look similar to the following:
As before, start by defining the state:
State & Assistant#
Our graph state and the way we call the LLM are almost identical to Part 1, with the following differences:
- We have added a user_info field that will be populated by our graph.
- We can use the state directly in the Assistant object instead of relying on configurable parameters.
from typing import Annotated
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
user_info: str
class Assistant:
def __init__(self, runnable: Runnable):
self.runnable = runnable
def __call__(self, state: State, config: RunnableConfig):
while True:
result = self.runnable.invoke(state)
# If the LLM happens to return an empty response, we will re-prompt it
# for an actual response.
if not result.tool_calls and (
not result.content
or isinstance(result.content, list)
and not result.content[0].get("text")
):
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
else:
break
return {"messages": result}
# Haiku is faster and cheaper, but less accurate
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# You could also use OpenAI or another model, though you will likely have
# to adapt the prompts
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4-turbo-preview")
assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful customer support assistant for Swiss Airlines. "
" Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
" If a search comes up empty, expand your search before giving up."
"\n\nCurrent user:\n<User>\n{user_info}\n</User>"
"\nCurrent time: {time}.",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
part_2_tools = [
TavilySearchResults(max_results=1),
fetch_user_flight_information,
search_flights,
lookup_policy,
update_ticket_to_new_flight,
cancel_ticket,
search_car_rentals,
book_car_rental,
update_car_rental,
cancel_car_rental,
search_hotels,
book_hotel,
update_hotel,
cancel_hotel,
search_trip_recommendations,
book_excursion,
update_excursion,
cancel_excursion,
]
part_2_assistant_runnable = assistant_prompt | llm.bind_tools(part_2_tools)
Define the Graph#
Now, create the graph. In response to the issues from the first part, make the following two changes:
- Add an interruption before using tools.
- Explicitly populate the user state in the first node so that the assistant can understand user information without needing to use tools.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START
from langgraph.prebuilt import tools_condition
builder = StateGraph(State)
def user_info(state: State):
return {"user_info": fetch_user_flight_information.invoke({})}
# NEW: The fetch_user_info node runs first, meaning our assistant can see the user's flight information without
# having to take an action
builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
builder.add_node("assistant", Assistant(part_2_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_2_tools))
builder.add_edge("fetch_user_info", "assistant")
builder.add_conditional_edges(
"assistant",
tools_condition,
)
builder.add_edge("tools", "assistant")
memory = MemorySaver()
part_2_graph = builder.compile(
checkpointer=memory,
# NEW: The graph will always halt before executing the "tools" node.
# The user can approve or reject (or even alter the request) before
# the assistant continues
interrupt_before=["tools"],
)
from IPython.display import Image, display
try:
display(Image(part_2_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
Example Conversation#
Now it's time to try our newly revised chatbot! Let's run it based on the conversation rounds below.
import shutil
import uuid
# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
thread_id = str(uuid.uuid4())
config = {
"configurable": {
# The passenger_id is used in our flight tools to
# fetch the user's flight information
"passenger_id": "3442 587242",
# Checkpoints are accessed by thread_id
"thread_id": thread_id,
}
}
_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in tutorial_questions:
events = part_2_graph.stream(
{"messages": ("user", question)}, config, stream_mode="values"
)
for event in events:
_print_event(event, _printed)
snapshot = part_2_graph.get_state(config)
while snapshot.next:
# We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
# Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
# Then, you would have the frontend trigger a new run via an API call when the user has provided input.
user_input = input(
"Do you approve of the above actions? Type 'y' to continue;"
" otherwise, explain your requested changes.\n\n"
)
if user_input.strip() == "y":
# Just continue
result = part_2_graph.invoke(
None,
config,
)
else:
# Satisfy the tool invocation by
# providing instructions on the requested changes / change of mind
result = part_2_graph.invoke(
{
"messages": [
ToolMessage(
tool_call_id=event["messages"][-1].tool_calls[0]["id"],
content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
)
]
},
config,
)
snapshot = part_2_graph.get_state(config)
Part 2 Review#
Now our assistant can skip a step and respond directly with our flight details, and we have full control over every action it executes. All of this relies on LangGraph's interrupts and checkpointers. An interrupt pauses graph execution, its state is safely persisted in the configured checkpointer, and the user can resume at any time by running the graph with the correct configuration.
Check out this example LangSmith trace to get a better sense of how the graph is running. Note that you typically resume a flow by invoking the graph with (None, config); the state is loaded from the checkpoint as if the interruption never happened.
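As shown in the loop above, resuming after an interrupt is a single call; passing None as the input tells LangGraph to continue from the saved checkpoint rather than start a new turn:
result = part_2_graph.invoke(None, config)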
This graph runs quite well! However, we actually do not need to be involved in every single action of the assistant...
In the next part, we will reorganize our graph structure to trigger interruptions only on those "sensitive" operations that actually write to the database.
Part 3: Conditional Interrupts#
In this section, we will optimize our interrupt strategy by categorizing tools as safe (read-only) or sensitive (data-modifying). We will only apply interruptions to sensitive tools, allowing the bot to autonomously handle simple queries.
This approach balances user control with conversational flow, but as we add more tools, a single "flat" graph may grow too complex to maintain. We will address that in the next section.
The graph structure for Part 3 is roughly illustrated in the figure below:
State#
As usual, start by defining the state of the graph. Our state and LLM calls are exactly the same as in Part 2.
from typing import Annotated
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
user_info: str
class Assistant:
def __init__(self, runnable: Runnable):
self.runnable = runnable
def __call__(self, state: State, config: RunnableConfig):
while True:
result = self.runnable.invoke(state)
# If the LLM happens to return an empty response, we will re-prompt it
# for an actual response.
if not result.tool_calls and (
not result.content
or isinstance(result.content, list)
and not result.content[0].get("text")
):
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
else:
break
return {"messages": result}
# Haiku is faster and cheaper, but less accurate
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# You can update the LLMs, though you may need to update the prompts
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4-turbo-preview")
assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful customer support assistant for Swiss Airlines. "
" Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
" If a search comes up empty, expand your search before giving up."
"\n\nCurrent user:\n<User>\n{user_info}\n</User>"
"\nCurrent time: {time}.",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
# "Read"-only tools (such as retrievers) don't need a user confirmation to use
part_3_safe_tools = [
TavilySearchResults(max_results=1),
fetch_user_flight_information,
search_flights,
lookup_policy,
search_car_rentals,
search_hotels,
search_trip_recommendations,
]
# These tools all change the user's reservations.
# The user has the right to control what decisions are made
part_3_sensitive_tools = [
update_ticket_to_new_flight,
cancel_ticket,
book_car_rental,
update_car_rental,
cancel_car_rental,
book_hotel,
update_hotel,
cancel_hotel,
book_excursion,
update_excursion,
cancel_excursion,
]
sensitive_tool_names = {t.name for t in part_3_sensitive_tools}
# Our LLM doesn't have to know which nodes it has to route to. In its 'mind', it's just invoking functions.
part_3_assistant_runnable = assistant_prompt | llm.bind_tools(
part_3_safe_tools + part_3_sensitive_tools
)
Define the Graph#
Now, create the graph. Our graph is almost identical to Part 2, with the difference that we have split the tools into two separate nodes. We will only interrupt before those tools that actually change the user's bookings.
from typing import Literal
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition
builder = StateGraph(State)
def user_info(state: State):
return {"user_info": fetch_user_flight_information.invoke({})}
# NEW: The fetch_user_info node runs first, meaning our assistant can see the user's flight information without
# having to take an action
builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
builder.add_node("assistant", Assistant(part_3_assistant_runnable))
builder.add_node("safe_tools", create_tool_node_with_fallback(part_3_safe_tools))
builder.add_node(
"sensitive_tools", create_tool_node_with_fallback(part_3_sensitive_tools)
)
# Define logic
builder.add_edge("fetch_user_info", "assistant")
def route_tools(state: State) -> Literal["safe_tools", "sensitive_tools", "__end__"]:
next_node = tools_condition(state)
# If no tools are invoked, return to the user
if next_node == END:
return END
ai_message = state["messages"][-1]
# This assumes single tool calls. To handle parallel tool calling, you'd want to
# use an ANY condition
first_tool_call = ai_message.tool_calls[0]
if first_tool_call["name"] in sensitive_tool_names:
return "sensitive_tools"
return "safe_tools"
builder.add_conditional_edges(
"assistant",
route_tools,
)
builder.add_edge("safe_tools", "assistant")
builder.add_edge("sensitive_tools", "assistant")
memory = MemorySaver()
part_3_graph = builder.compile(
checkpointer=memory,
# NEW: The graph will always halt before executing the "sensitive_tools" node.
# The user can approve or reject (or even alter the request) before
# the assistant continues
interrupt_before=["sensitive_tools"],
)
from IPython.display import Image, display
try:
display(Image(part_3_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
# This requires some extra dependencies and is optional
pass
Example Conversation#
Now it's time to try our improved chatbot! Let's run it through the conversation rounds below. This time, we will have fewer confirmation steps.
import shutil
import uuid
# Update with the backup file so we can restart from the original place in each section
db = update_dates(db)
thread_id = str(uuid.uuid4())
config = {
"configurable": {
# The passenger_id is used in our flight tools to
# fetch the user's flight information
"passenger_id": "3442 587242",
# Checkpoints are accessed by thread_id
"thread_id": thread_id,
}
}
tutorial_questions = [
"Hi there, what time is my flight?",
"Am I allowed to update my flight to something sooner? I want to leave later today.",
"Update my flight to sometime next week then",
"The next available option is great",
"What about lodging and transportation?",
"Yeah I think I'd like an affordable hotel for my week-long stay (7 days). And I'll want to rent a car.",
"OK could you place a reservation for your recommended hotel? It sounds nice.",
"Yes go ahead and book anything that's moderate expense and has availability.",
"Now for a car, what are my options?",
"Awesome let's just get the cheapest option. Go ahead and book for 7 days",
"Cool so now what recommendations do you have on excursions?",
"Are they available while I'm there?",
"Interesting - I like the museums, what options are there? ",
"OK great pick one and book it for my second day there.",
]
_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in tutorial_questions:
events = part_3_graph.stream(
{"messages": ("user", question)}, config, stream_mode="values"
)
for event in events:
_print_event(event, _printed)
snapshot = part_3_graph.get_state(config)
while snapshot.next:
# We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
# Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
# Then, you would have the frontend trigger a new run via an API call when the user has provided input.
user_input = input(
"Do you approve of the above actions? Type 'y' to continue;"
" otherwise, explain your requested changes.\n\n"
)
if user_input.strip() == "y":
# Just continue
result = part_3_graph.invoke(
None,
config,
)
else:
# Satisfy the tool invocation by
# providing instructions on the requested changes / change of mind
result = part_3_graph.invoke(
{
"messages": [
ToolMessage(
tool_call_id=event["messages"][-1].tool_calls[0]["id"],
content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
)
]
},
config,
)
snapshot = part_3_graph.get_state(config)
Part 3 Review#
Much better! Our agent is now functioning well. Check out the LangSmith execution trace to see how it works! You might be satisfied with this design: the code is well contained, and its behavior meets expectations.
One issue with this design is that we are putting too much pressure on a single prompt. If we want to add more tools or if each tool becomes more complex (more filters, more business logic constraints on behavior, etc.), then the effectiveness of tool usage and the overall performance of the bot are likely to start declining.
In the next part, we will demonstrate how to better control different user experiences by routing to dedicated agents or subgraphs based on user intent.
Part 4: Dedicated Workflows#
In the previous sections, we saw how far a "broad" chatbot that relies on a single prompt and LLM to handle various user intents can take us. However, this approach makes it difficult to create predictably excellent user experiences for known intents.
Another approach is for your graph to detect user intent and select the appropriate workflow or "skill" to meet the user's needs. Each workflow can focus on its domain, allowing for independent optimization without degrading the overall performance of the assistant.
In this section, we will divide the user experience into separate subgraphs, resulting in the final structure shown below:
In the figure above, each box contains a capable, focused workflow. The main assistant handles the user's initial query, and the graph routes the request to the appropriate "expert" based on the query content.
State#
We want to be able to track which subgraph is controlling the current session at any time. While we could achieve this by performing some operations on the message list, a simpler way is to track it as a dedicated stack.
In the State below, add a dialog_state list. Each time a node runs and returns a dialog_state value, the update_dialog_stack function is called to determine how to apply the update.
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
"""Push or pop the state."""
if right is None:
return left
if right == "pop":
return left[:-1]
return left + [right]
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
user_info: str
dialog_state: Annotated[
list[
Literal[
"assistant",
"update_flight",
"book_car_rental",
"book_hotel",
"book_excursion",
]
],
update_dialog_stack,
]
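To make the reducer's behavior concrete, here is how update_dialog_stack handles each kind of update:
update_dialog_stack(["assistant"], "update_flight")  # ["assistant", "update_flight"]
update_dialog_stack(["assistant", "update_flight"], "pop")  # ["assistant"]
update_dialog_stack(["assistant"], None)  # ["assistant"] (no change)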
Assistant#
This time we will create an assistant for each workflow. That is:
- Flight booking assistant
- Hotel booking assistant
- Car rental assistant
- Excursion assistant
- Finally, a "main assistant" to route between these assistants
If you have been paying attention, you might notice that this is an instance of the supervisor design pattern from our multi-agent example.
Below, define the Runnable objects that will drive each assistant. Each Runnable has a prompt, an LLM, and the tools scoped to that assistant's workflow. Each specialized / delegated assistant can also call the CompleteOrEscalate tool to signal that control should return to the primary assistant: when it has completed its task, when the user changes their mind, or when the request falls outside that workflow's scope.
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from pydantic import BaseModel, Field
class Assistant:
def __init__(self, runnable: Runnable):
self.runnable = runnable
def __call__(self, state: State, config: RunnableConfig):
while True:
result = self.runnable.invoke(state)
if not result.tool_calls and (
not result.content
or isinstance(result.content, list)
and not result.content[0].get("text")
):
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
messages = state["messages"] + [("user", "Respond with a real output.")]
state = {**state, "messages": messages}
else:
break
return {"messages": result}
class CompleteOrEscalate(BaseModel):
"""A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
who can re-route the dialog based on the user's needs."""
cancel: bool = True
reason: str
class Config:
json_schema_extra = {
"example": {
"cancel": True,
"reason": "User changed their mind about the current task.",
},
"example 2": {
"cancel": True,
"reason": "I have fully completed the task.",
},
"example 3": {
"cancel": False,
"reason": "I need to search the user's emails or calendar for more information.",
},
}
# Flight booking assistant
flight_booking_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a specialized assistant for handling flight updates. "
" The primary assistant delegates work to you whenever the user needs help updating their bookings. "
"Confirm the updated flight details with the customer and inform them of any additional fees. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
"If you need more information or the customer changes their mind, escalate the task back to the main assistant."
" Remember that a booking isn't completed until after the relevant tool has successfully been used."
"\n\nCurrent user flight information:\n<Flights>\n{user_info}\n</Flights>"
"\nCurrent time: {time}."
"\n\nIf the user needs help, and none of your tools are appropriate for it, then"
' "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.',
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
update_flight_safe_tools = [search_flights]
update_flight_sensitive_tools = [update_ticket_to_new_flight, cancel_ticket]
update_flight_tools = update_flight_safe_tools + update_flight_sensitive_tools
update_flight_runnable = flight_booking_prompt | llm.bind_tools(
update_flight_tools + [CompleteOrEscalate]
)
# Hotel Booking Assistant
book_hotel_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a specialized assistant for handling hotel bookings. "
"The primary assistant delegates work to you whenever the user needs help booking a hotel. "
"Search for available hotels based on the user's preferences and confirm the booking details with the customer. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
"If you need more information or the customer changes their mind, escalate the task back to the main assistant."
" Remember that a booking isn't completed until after the relevant tool has successfully been used."
"\nCurrent time: {time}."
'\n\nIf the user needs help, and none of your tools are appropriate for it, then "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.'
"\n\nSome examples for which you should CompleteOrEscalate:\n"
" - 'what's the weather like this time of year?'\n"
" - 'nevermind i think I'll book separately'\n"
" - 'i need to figure out transportation while i'm there'\n"
" - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
" - 'Hotel booking confirmed'",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
book_hotel_safe_tools = [search_hotels]
book_hotel_sensitive_tools = [book_hotel, update_hotel, cancel_hotel]
book_hotel_tools = book_hotel_safe_tools + book_hotel_sensitive_tools
book_hotel_runnable = book_hotel_prompt | llm.bind_tools(
book_hotel_tools + [CompleteOrEscalate]
)
# Car Rental Assistant
book_car_rental_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a specialized assistant for handling car rental bookings. "
"The primary assistant delegates work to you whenever the user needs help booking a car rental. "
"Search for available car rentals based on the user's preferences and confirm the booking details with the customer. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
"If you need more information or the customer changes their mind, escalate the task back to the main assistant."
" Remember that a booking isn't completed until after the relevant tool has successfully been used."
"\nCurrent time: {time}."
"\n\nIf the user needs help, and none of your tools are appropriate for it, then "
'"CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.'
"\n\nSome examples for which you should CompleteOrEscalate:\n"
" - 'what's the weather like this time of year?'\n"
" - 'What flights are available?'\n"
" - 'nevermind i think I'll book separately'\n"
" - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
" - 'Car rental booking confirmed'",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
book_car_rental_safe_tools = [search_car_rentals]
book_car_rental_sensitive_tools = [
book_car_rental,
update_car_rental,
cancel_car_rental,
]
book_car_rental_tools = book_car_rental_safe_tools + book_car_rental_sensitive_tools
book_car_rental_runnable = book_car_rental_prompt | llm.bind_tools(
book_car_rental_tools + [CompleteOrEscalate]
)
# Excursion Assistant
book_excursion_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a specialized assistant for handling trip recommendations. "
"The primary assistant delegates work to you whenever the user needs help booking a recommended trip. "
"Search for available trip recommendations based on the user's preferences and confirm the booking details with the customer. "
"If you need more information or the customer changes their mind, escalate the task back to the main assistant."
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
" Remember that a booking isn't completed until after the relevant tool has successfully been used."
"\nCurrent time: {time}."
'\n\nIf the user needs help, and none of your tools are appropriate for it, then "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.'
"\n\nSome examples for which you should CompleteOrEscalate:\n"
" - 'nevermind i think I'll book separately'\n"
" - 'i need to figure out transportation while i'm there'\n"
" - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
" - 'Excursion booking confirmed!'",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
book_excursion_safe_tools = [search_trip_recommendations]
book_excursion_sensitive_tools = [book_excursion, update_excursion, cancel_excursion]
book_excursion_tools = book_excursion_safe_tools + book_excursion_sensitive_tools
book_excursion_runnable = book_excursion_prompt | llm.bind_tools(
book_excursion_tools + [CompleteOrEscalate]
)
# Primary Assistant
class ToFlightBookingAssistant(BaseModel):
"""Transfers work to a specialized assistant to handle flight updates and cancellations."""
request: str = Field(
description="Any necessary followup questions the update flight assistant should clarify before proceeding."
)
class ToBookCarRental(BaseModel):
"""Transfers work to a specialized assistant to handle car rental bookings."""
location: str = Field(
description="The location where the user wants to rent a car."
)
start_date: str = Field(description="The start date of the car rental.")
end_date: str = Field(description="The end date of the car rental.")
request: str = Field(
description="Any additional information or requests from the user regarding the car rental."
)
class Config:
json_schema_extra = {
"example": {
"location": "Basel",
"start_date": "2023-07-01",
"end_date": "2023-07-05",
"request": "I need a compact car with automatic transmission.",
}
}
class ToHotelBookingAssistant(BaseModel):
"""Transfer work to a specialized assistant to handle hotel bookings."""
location: str = Field(
description="The location where the user wants to book a hotel."
)
checkin_date: str = Field(description="The check-in date for the hotel.")
checkout_date: str = Field(description="The check-out date for the hotel.")
request: str = Field(
description="Any additional information or requests from the user regarding the hotel booking."
)
class Config:
json_schema_extra = {
"example": {
"location": "Zurich",
"checkin_date": "2023-08-15",
"checkout_date": "2023-08-20",
"request": "I prefer a hotel near the city center with a room that has a view.",
}
}
class ToBookExcursion(BaseModel):
"""Transfers work to a specialized assistant to handle trip recommendation and other excursion bookings."""
location: str = Field(
description="The location where the user wants to book a recommended trip."
)
request: str = Field(
description="Any additional information or requests from the user regarding the trip recommendation."
)
class Config:
json_schema_extra = {
"example": {
"location": "Lucerne",
"request": "The user is interested in outdoor activities and scenic views.",
}
}
# The top-level assistant performs general Q&A and delegates specialized tasks to other assistants.
# Task delegation is a simple form of semantic routing / simple intent detection
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
primary_assistant_prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful customer support assistant for Swiss Airlines. "
"Your primary role is to search for flight information and company policies to answer customer queries. "
"If a customer requests to update or cancel a flight, book a car rental, book a hotel, or get trip recommendations, "
"delegate the task to the appropriate specialized assistant by invoking the corresponding tool. You are not able to make these types of changes yourself."
" Only the specialized assistants are given permission to do this for the user."
"The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
"Provide detailed information to the customer, and always double-check the database before concluding that information is unavailable. "
" When searching, be persistent. Expand your query bounds if the first search returns no results. "
" If a search comes up empty, expand your search before giving up."
"\n\nCurrent user flight information:\n<Flights>\n{user_info}\n</Flights>"
"\nCurrent time: {time}.",
),
("placeholder", "{messages}"),
]
).partial(time=datetime.now())
primary_assistant_tools = [
TavilySearchResults(max_results=1),
search_flights,
lookup_policy,
]
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
primary_assistant_tools
+ [
ToFlightBookingAssistant,
ToBookCarRental,
ToHotelBookingAssistant,
ToBookExcursion,
]
)
Create the Assistants#
We are almost ready to create the graph. In the previous part, we made the design decision to share the messages state across all nodes. This is powerful: every delegated assistant can see the entire user journey and share context. However, it also means weaker LLMs can easily get confused about their current scope. To mark the "handoff" between the primary assistant and one of its delegated workflows, we will add a ToolMessage to the state.
Tool Functions#
Create a function that generates an "entry" node for each workflow, declaring "the current assistant is assistant_name."
from typing import Callable
from langchain_core.messages import ToolMessage
def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
def entry_node(state: State) -> dict:
tool_call_id = state["messages"][-1].tool_calls[0]["id"]
return {
"messages": [
ToolMessage(
content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember