banner
HuggingLLM

HuggingLLM

Make it & Share it!
x

LangGraph | 顧客サポートロボットの構築

顧客サポートロボットは、一般的な問題を処理することでチームの時間を節約できますが、さまざまなタスクを信頼性を持って処理できるロボットを構築するのは簡単ではなく、特にユーザーが混乱したり失望したりしないようにすることが重要です。

このチュートリアルでは、航空会社のために顧客サポートロボットを構築し、ユーザーが旅行計画を調査し、手配するのを助けます。LangGraph の中断機能、チェックポイント、およびより複雑な状態管理を使用してアシスタントのツールを整理し、ユーザーがフライト予約、ホテル予約、レンタカー、旅行アクティビティの手配を行うのを支援する方法を学びます。このチュートリアルは、あなたがLangGraph 入門チュートリアルで紹介されている概念にすでに精通していることを前提としています。

このチュートリアルの終わりまでに、正常に動作するロボットを構築し、LangGraph の重要な概念とアーキテクチャを理解することができます。また、これらのデザインパターンを他の AI プロジェクトにも適用できます。

最終的なチャットボットのアーキテクチャは、以下の図のようになります:

image

さあ、始めましょう!

前提条件#

まず、開発環境を設定します。このチュートリアルに必要な依存関係をインストールし、テストデータベースをダウンロードし、各セクションで繰り返し使用されるツールを定義します。

私たちは Claude を大規模言語モデル(LLM)として使用し、いくつかのカスタムツールを定義します。ほとんどのツールはローカルの sqlite データベースに接続されます(追加の依存関係は不要です)。また、Tavily を通じてエージェントに一般的なウェブ検索機能を提供します。

%%capture --no-stderr
% pip install -U langgraph langchain-community langchain-anthropic tavily-python pandas
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("ANTHROPIC_API_KEY")
_set_env("TAVILY_API_KEY")

データベースの充填#

次のスクリプトを実行して、このチュートリアルのために準備した sqlite データベースを取得し、最新の状態に更新します。具体的な詳細は重要ではありません。

import os
import shutil
import sqlite3

import pandas as pd
import requests

db_url = "https://storage.googleapis.com/benchmarks-artifacts/travel-db/travel2.sqlite"
local_file = "travel2.sqlite"
# バックアップにより、各チュートリアルセクションのために再起動できます
backup_file = "travel2.backup.sqlite"
overwrite = False
if overwrite or not os.path.exists(local_file):
    response = requests.get(db_url)
    response.raise_for_status()  # リクエストが成功したことを確認
    with open(local_file, "wb") as f:
        f.write(response.content)
    # バックアップ - 各セクションでDBを「リセット」するために使用します
    shutil.copy(local_file, backup_file)
# チュートリアルのためにフライトを現在の時間に変換します
def update_dates(file):
    shutil.copy(backup_file, file)
    conn = sqlite3.connect(file)
    cursor = conn.cursor()

    tables = pd.read_sql(
        "SELECT name FROM sqlite_master WHERE type='table';", conn
    ).name.tolist()
    tdf = {}
    for t in tables:
        tdf[t] = pd.read_sql(f"SELECT * from {t}", conn)

    example_time = pd.to_datetime(
        tdf["flights"]["actual_departure"].replace("\\N", pd.NaT)
    ).max()
    current_time = pd.to_datetime("now").tz_localize(example_time.tz)
    time_diff = current_time - example_time

    tdf["bookings"]["book_date"] = (
        pd.to_datetime(tdf["bookings"]["book_date"].replace("\\N", pd.NaT), utc=True)
        + time_diff
    )

    datetime_columns = [
        "scheduled_departure",
        "scheduled_arrival",
        "actual_departure",
        "actual_arrival",
    ]
    for column in datetime_columns:
        tdf["flights"][column] = (
            pd.to_datetime(tdf["flights"][column].replace("\\N", pd.NaT)) + time_diff
        )

    for table_name, df in tdf.items():
        df.to_sql(table_name, conn, if_exists="replace", index=False)
    del df
    del tdf
    conn.commit()
    conn.close()

    return file

db = update_dates(local_file)

ツール#

次に、航空会社のポリシーマニュアルを検索し、フライト、ホテル、レンタカー、旅行アクティビティの予約を検索および管理するためのアシスタントのツールを定義します。これらのツールは、チュートリアル全体で繰り返し使用されます。具体的な実装の詳細は重要ではないため、次のコードを直接実行し、第 1 部に進むことができます。

会社のポリシーを検索#

アシスタントは、ポリシー情報を取得してユーザーの質問に答えることができます。ポリシーの実行は、ツールや API を介して行う必要があることに注意してください。大規模言語モデル(LLM)は、これらの規定を無視する可能性があります。

import re

import numpy as np
import openai
from langchain_core.tools import tool

response = requests.get(
    "https://storage.googleapis.com/benchmarks-artifacts/travel-db/swiss_faq.md"
)
response.raise_for_status()
faq_text = response.text

docs = [{"page_content": txt} for txt in re.split(r"(?=\n##)", faq_text)]


class VectorStoreRetriever:
    def __init__(self, docs: list, vectors: list, oai_client):
        self._arr = np.array(vectors)
        self._docs = docs
        self._client = oai_client

    @classmethod
    def from_docs(cls, docs, oai_client):
        embeddings = oai_client.embeddings.create(
            model="text-embedding-3-small", input=[doc["page_content"] for doc in docs]
        )
        vectors = [emb.embedding for emb in embeddings.data]
        return cls(docs, vectors, oai_client)

    def query(self, query: str, k: int = 5) -> list[dict]:
        embed = self._client.embeddings.create(
            model="text-embedding-3-small", input=[query]
        )
        # "@"はPythonでの行列の掛け算です
        scores = np.array(embed.data[0].embedding) @ self._arr.T
        top_k_idx = np.argpartition(scores, -k)[-k:]
        top_k_idx_sorted = top_k_idx[np.argsort(-scores[top_k_idx])]
        return [
            {**self._docs[idx], "similarity": scores[idx]} for idx in top_k_idx_sorted
        ]


retriever = VectorStoreRetriever.from_docs(docs, openai.Client())


@tool
def lookup_policy(query: str) -> str:
    """特定のオプションが許可されているかどうかを確認するために会社のポリシーを参照します。
    他の「書き込み」イベントを実行する前にこれを使用してください。"""
    docs = retriever.query(query, k=2)
    return "\n\n".join([doc["page_content"] for doc in docs])

フライト#

現在のユーザーのフライト情報を確認するためのツール(fetch_user_flight_information)を定義します。次に、フライトを検索し、SQL データベースに保存されている乗客の予約情報を管理するためのツールを定義します。

RunnableConfig にアクセスすることで、このアプリケーションにアクセスするユーザーのpassenger_idを確認できます。大規模言語モデル(LLM)は、これらの情報を明示的に提供する必要はなく、グラフの各呼び出し時に提供され、各ユーザーが他の乗客の予約情報にアクセスできないようにします。

import sqlite3
from datetime import date, datetime
from typing import Optional

import pytz
from langchain_core.runnables import RunnableConfig


@tool
def fetch_user_flight_information(config: RunnableConfig) -> list[dict]:
    """ユーザーのすべてのチケットを、対応するフライト情報と座席割り当てとともに取得します。

    Returns:
        ユーザーに属する各チケットの詳細、関連するフライトの詳細、および座席割り当てを含む辞書のリスト。
    """
    configuration = config.get("configurable", {})
    passenger_id = configuration.get("passenger_id", None)
    if not passenger_id:
        raise ValueError("No passenger ID configured.")

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = """
    SELECT 
        t.ticket_no, t.book_ref,
        f.flight_id, f.flight_no, f.departure_airport, f.arrival_airport, f.scheduled_departure, f.scheduled_arrival,
        bp.seat_no, tf.fare_conditions
    FROM 
        tickets t
        JOIN ticket_flights tf ON t.ticket_no = tf.ticket_no
        JOIN flights f ON tf.flight_id = f.flight_id
        JOIN boarding_passes bp ON bp.ticket_no = t.ticket_no AND bp.flight_id = f.flight_id
    WHERE 
        t.passenger_id = ?
    """
    cursor.execute(query, (passenger_id,))
    rows = cursor.fetchall()
    column_names = [column[0] for column in cursor.description]
    results = [dict(zip(column_names, row)) for row in rows]

    cursor.close()
    conn.close()

    return results


@tool
def search_flights(
    departure_airport: Optional[str] = None,
    arrival_airport: Optional[str] = None,
    start_time: Optional[date | datetime] = None,
    end_time: Optional[date | datetime] = None,
    limit: int = 20,
) -> list[dict]:
    """出発空港、到着空港、および出発時間範囲に基づいてフライトを検索します。"""
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM flights WHERE 1 = 1"
    params = []

    if departure_airport:
        query += " AND departure_airport = ?"
        params.append(departure_airport)

    if arrival_airport:
        query += " AND arrival_airport = ?"
        params.append(arrival_airport)

    if start_time:
        query += " AND scheduled_departure >= ?"
        params.append(start_time)

    if end_time:
        query += " AND scheduled_departure <= ?"
        params.append(end_time)
    query += " LIMIT ?"
    params.append(limit)
    cursor.execute(query, params)
    rows = cursor.fetchall()
    column_names = [column[0] for column in cursor.description]
    results = [dict(zip(column_names, row)) for row in rows]

    cursor.close()
    conn.close()

    return results


@tool
def update_ticket_to_new_flight(
    ticket_no: str, new_flight_id: int, *, config: RunnableConfig
) -> str:
    """ユーザーのチケットを新しい有効なフライトに更新します。"""
    configuration = config.get("configurable", {})
    passenger_id = configuration.get("passenger_id", None)
    if not passenger_id:
        raise ValueError("No passenger ID configured.")

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "SELECT departure_airport, arrival_airport, scheduled_departure FROM flights WHERE flight_id = ?",
        (new_flight_id,),
    )
    new_flight = cursor.fetchone()
    if not new_flight:
        cursor.close()
        conn.close()
        return "Invalid new flight ID provided."
    column_names = [column[0] for column in cursor.description]
    new_flight_dict = dict(zip(column_names, new_flight))
    timezone = pytz.timezone("Etc/GMT-3")
    current_time = datetime.now(tz=timezone)
    departure_time = datetime.strptime(
        new_flight_dict["scheduled_departure"], "%Y-%m-%d %H:%M:%S.%f%z"
    )
    time_until = (departure_time - current_time).total_seconds()
    if time_until < (3 * 3600):
        return f"Not permitted to reschedule to a flight that is less than 3 hours from the current time. Selected flight is at {departure_time}."

    cursor.execute(
        "SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
    )
    current_flight = cursor.fetchone()
    if not current_flight:
        cursor.close()
        conn.close()
        return "No existing ticket found for the given ticket number."

    # サインインしたユーザーが実際にこのチケットを持っているか確認します
    cursor.execute(
        "SELECT * FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
        (ticket_no, passenger_id),
    )
    current_ticket = cursor.fetchone()
    if not current_ticket:
        cursor.close()
        conn.close()
        return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"

    # 実際のアプリケーションでは、ビジネスロジックを強制するために追加のチェックを追加することが望ましいです。
    # 新しい出発空港が現在のチケットと一致するかどうかなど。
    # LLMにポリシーを「事前に」ヒントを与えることが最善ですが、
    # 結局のところ、間違ったことをすることがあるので、APIが有効な動作を強制することも必要です
    cursor.execute(
        "UPDATE ticket_flights SET flight_id = ? WHERE ticket_no = ?",
        (new_flight_id, ticket_no),
    )
    conn.commit()

    cursor.close()
    conn.close()
    return "Ticket successfully updated to new flight."


@tool
def cancel_ticket(ticket_no: str, *, config: RunnableConfig) -> str:
    """ユーザーのチケットをキャンセルし、データベースから削除します。"""
    configuration = config.get("configurable", {})
    passenger_id = configuration.get("passenger_id", None)
    if not passenger_id:
        raise ValueError("No passenger ID configured.")
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
    )
    existing_ticket = cursor.fetchone()
    if not existing_ticket:
        cursor.close()
        conn.close()
        return "No existing ticket found for the given ticket number."

    # サインインしたユーザーが実際にこのチケットを持っているか確認します
    cursor.execute(
        "SELECT flight_id FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
        (ticket_no, passenger_id),
    )
    current_ticket = cursor.fetchone()
    if not current_ticket:
        cursor.close()
        conn.close()
        return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"

    cursor.execute("DELETE FROM ticket_flights WHERE ticket_no = ?", (ticket_no,))
    conn.commit()

    cursor.close()
    conn.close()
    return "Ticket successfully cancelled."

レンタカーのツール#

ユーザーがフライトを予約した後、通常は交通手段を手配したいと思います。ユーザーが目的地で車両を検索し、予約できるようにする「レンタカー」ツールを定義します。

from datetime import date, datetime
from typing import Optional, Union


@tool
def search_car_rentals(
    location: Optional[str] = None,
    name: Optional[str] = None,
    price_tier: Optional[str] = None,
    start_date: Optional[Union[datetime, date]] = None,
    end_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
    """
    場所、名前、価格帯、開始日、終了日に基づいてレンタカーを検索します。

    Args:
        location (Optional[str]): レンタカーの場所。デフォルトはNone。
        name (Optional[str]): レンタカー会社の名前。デフォルトはNone。
        price_tier (Optional[str]): レンタカーの価格帯。デフォルトはNone。
        start_date (Optional[Union[datetime, date]]): レンタカーの開始日。デフォルトはNone。
        end_date (Optional[Union[datetime, date]]): レンタカーの終了日。デフォルトはNone。

    Returns:
        list[dict]: 検索条件に一致するレンタカーの辞書のリスト。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM car_rentals WHERE 1=1"
    params = []

    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    # このチュートリアルでは、日付や価格帯に基づいて一致させることができます。
    # (おもちゃのデータセットにはあまりデータがないため)
    cursor.execute(query, params)
    results = cursor.fetchall()

    conn.close()

    return [
        dict(zip([column[0] for column in cursor.description], row)) for row in results
    ]


@tool
def book_car_rental(rental_id: int) -> str:
    """
    IDによってレンタカーを予約します。

    Args:
        rental_id (int): 予約するレンタカーのID。

    Returns:
        str: レンタカーが正常に予約されたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute("UPDATE car_rentals SET booked = 1 WHERE id = ?", (rental_id,))
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Car rental {rental_id} successfully booked."
    else:
        conn.close()
        return f"No car rental found with ID {rental_id}."


@tool
def update_car_rental(
    rental_id: int,
    start_date: Optional[Union[datetime, date]] = None,
    end_date: Optional[Union[datetime, date]] = None,
) -> str:
    """
    IDによってレンタカーの開始日と終了日を更新します。

    Args:
        rental_id (int): 更新するレンタカーのID。
        start_date (Optional[Union[datetime, date]]): レンタカーの新しい開始日。デフォルトはNone。
        end_date (Optional[Union[datetime, date]]): レンタカーの新しい終了日。デフォルトはNone。

    Returns:
        str: レンタカーが正常に更新されたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    if start_date:
        cursor.execute(
            "UPDATE car_rentals SET start_date = ? WHERE id = ?",
            (start_date, rental_id),
        )
    if end_date:
        cursor.execute(
            "UPDATE car_rentals SET end_date = ? WHERE id = ?", (end_date, rental_id)
        )

    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Car rental {rental_id} successfully updated."
    else:
        conn.close()
        return f"No car rental found with ID {rental_id}."


@tool
def cancel_car_rental(rental_id: int) -> str:
    """
    IDによってレンタカーをキャンセルします。

    Args:
        rental_id (int): キャンセルするレンタカーのID。

    Returns:
        str: レンタカーが正常にキャンセルされたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute("UPDATE car_rentals SET booked = 0 WHERE id = ?", (rental_id,))
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Car rental {rental_id} successfully cancelled."
    else:
        conn.close()
        return f"No car rental found with ID {rental_id}."

ホテル#

ユーザーは寝る必要があります!ホテルの予約を検索および管理するためのツールを定義します。

@tool
def search_hotels(
    location: Optional[str] = None,
    name: Optional[str] = None,
    price_tier: Optional[str] = None,
    checkin_date: Optional[Union[datetime, date]] = None,
    checkout_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
    """
    場所、名前、価格帯、チェックイン日、チェックアウト日に基づいてホテルを検索します。

    Args:
        location (Optional[str]): ホテルの場所。デフォルトはNone。
        name (Optional[str]): ホテルの名前。デフォルトはNone。
        price_tier (Optional[str]): ホテルの価格帯。デフォルトはNone。例: 中級、上級中級、上級、ラグジュアリー
        checkin_date (Optional[Union[datetime, date]]): ホテルのチェックイン日。デフォルトはNone。
        checkout_date (Optional[Union[datetime, date]]): ホテルのチェックアウト日。デフォルトはNone。

    Returns:
        list[dict]: 検索条件に一致するホテルの辞書のリスト。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM hotels WHERE 1=1"
    params = []

    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    # このチュートリアルのために、日付や価格帯に基づいて一致させることができます。
    cursor.execute(query, params)
    results = cursor.fetchall()

    conn.close()

    return [
        dict(zip([column[0] for column in cursor.description], row)) for row in results
    ]


@tool
def book_hotel(hotel_id: int) -> str:
    """
    IDによってホテルを予約します。

    Args:
        hotel_id (int): 予約するホテルのID。

    Returns:
        str: ホテルが正常に予約されたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute("UPDATE hotels SET booked = 1 WHERE id = ?", (hotel_id,))
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Hotel {hotel_id} successfully booked."
    else:
        conn.close()
        return f"No hotel found with ID {hotel_id}."


@tool
def update_hotel(
    hotel_id: int,
    checkin_date: Optional[Union[datetime, date]] = None,
    checkout_date: Optional[Union[datetime, date]] = None,
) -> str:
    """
    IDによってホテルのチェックイン日とチェックアウト日を更新します。

    Args:
        hotel_id (int): 更新するホテルのID。
        checkin_date (Optional[Union[datetime, date]]): ホテルの新しいチェックイン日。デフォルトはNone。
        checkout_date (Optional[Union[datetime, date]]): ホテルの新しいチェックアウト日。デフォルトはNone。

    Returns:
        str: ホテルが正常に更新されたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    if checkin_date:
        cursor.execute(
            "UPDATE hotels SET checkin_date = ? WHERE id = ?", (checkin_date, hotel_id)
        )
    if checkout_date:
        cursor.execute(
            "UPDATE hotels SET checkout_date = ? WHERE id = ?",
            (checkout_date, hotel_id),
        )

    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Hotel {hotel_id} successfully updated."
    else:
        conn.close()
        return f"No hotel found with ID {hotel_id}."


@tool
def cancel_hotel(hotel_id: int) -> str:
    """
    IDによってホテルをキャンセルします。

    Args:
        hotel_id (int): キャンセルするホテルのID。

    Returns:
        str: ホテルが正常にキャンセルされたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute("UPDATE hotels SET booked = 0 WHERE id = ?", (hotel_id,))
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Hotel {hotel_id} successfully cancelled."
    else:
        conn.close()
        return f"No hotel found with ID {hotel_id}."

旅行アクティビティ#

最後に、ユーザーが目的地に到着した後にできることを検索し(予約も可能)、いくつかのツールを定義します。

@tool
def search_trip_recommendations(
    location: Optional[str] = None,
    name: Optional[str] = None,
    keywords: Optional[str] = None,
) -> list[dict]:
    """
    場所、名前、キーワードに基づいて旅行の推奨を検索します。

    Args:
        location (Optional[str]): 旅行の推奨の場所。デフォルトはNone。
        name (Optional[str]): 旅行の推奨の名前。デフォルトはNone。
        keywords (Optional[str]): 旅行の推奨に関連するキーワード。デフォルトはNone。

    Returns:
        list[dict]: 検索条件に一致する旅行の推奨の辞書のリスト。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM trip_recommendations WHERE 1=1"
    params = []

    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    if keywords:
        keyword_list = keywords.split(",")
        keyword_conditions = " OR ".join(["keywords LIKE ?" for _ in keyword_list])
        query += f" AND ({keyword_conditions})"
        params.extend([f"%{keyword.strip()}%" for keyword in keyword_list])

    cursor.execute(query, params)
    results = cursor.fetchall()

    conn.close()

    return [
        dict(zip([column[0] for column in cursor.description], row)) for row in results
    ]


@tool
def book_excursion(recommendation_id: int) -> str:
    """
    推奨IDによって旅行を予約します。

    Args:
        recommendation_id (int): 予約する旅行の推奨のID。

    Returns:
        str: 旅行の推奨が正常に予約されたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "UPDATE trip_recommendations SET booked = 1 WHERE id = ?", (recommendation_id,)
    )
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully booked."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."


@tool
def update_excursion(recommendation_id: int, details: str) -> str:
    """
    IDによって旅行の推奨の詳細を更新します。

    Args:
        recommendation_id (int): 更新する旅行の推奨のID。
        details (str): 旅行の推奨の新しい詳細。

    Returns:
        str: 旅行の推奨が正常に更新されたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "UPDATE trip_recommendations SET details = ? WHERE id = ?",
        (details, recommendation_id),
    )
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully updated."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."


@tool
def cancel_excursion(recommendation_id: int) -> str:
    """
    IDによって旅行の推奨をキャンセルします。

    Args:
        recommendation_id (int): キャンセルする旅行の推奨のID。

    Returns:
        str: 旅行の推奨が正常にキャンセルされたかどうかを示すメッセージ。
    """
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "UPDATE trip_recommendations SET booked = 0 WHERE id = ?", (recommendation_id,)
    )
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully cancelled."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."

ツール関数#

デバッグ時にグラフ内のメッセージを美化して印刷し、ツールノードにエラーハンドリング機能を提供するための補助関数を定義します(エラーメッセージをチャット履歴に追加します)。

from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda

from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }


def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def _print_event(event: dict, _printed: set, max_length=1500):
    current_state = event.get("dialog_state")
    if current_state:
        print("Currently in: ", current_state[-1])
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=True)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            print(msg_repr)
            _printed.add(message.id)

第 1 部:ゼロショットエージェント#

構築中は、最も簡単な動作実装から始め、LangSmith の評価ツールを使用してその有効性を測定するのが最善です。他の条件が同じであれば、単純で拡張可能なソリューションを優先し、複雑なソリューションを避けるべきです。この例では、単一のグラフアプローチには限界があります。ロボットは、ユーザーの確認なしに不必要な行動をとる可能性があり、複雑なクエリを処理するのが難しく、回答時に特異性が欠けることがあります。これらの問題は、後のセクションで解決します。

このセクションでは、ユーザーにすべてのツールを提供し、これらのツールを慎重に使用するように指示するシンプルなゼロショットエージェントを定義します。

シンプルな二重ノードグラフは以下のようになります:

image

まず、状態を定義します。

状態#

StateGraph 状態を、追加のみが可能なメッセージリストを含む型付き辞書として定義します。これらのメッセージはチャット履歴を構成し、シンプルなアシスタントに必要なすべての状態情報です。

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

アシスタント#

次に、アシスタント関数を定義します。この関数はグラフ状態を受け取り、プロンプトにフォーマットし、大規模言語モデル(LLM)を呼び出して最良の応答を予測します。

from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig


class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            configuration = config.get("configurable", {})
            passenger_id = configuration.get("passenger_id", None)
            state = {**state, "user_info": passenger_id}
            result = self.runnable.invoke(state)
            # LLMが空の応答を返した場合、実際の応答を再度促します。
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}


# Haikuはより速く、安価ですが、正確性が劣ります
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# LLMを入れ替えることもできますが、プロンプトを更新する必要があるでしょう。
# from langchain_openai import ChatOpenAI

# llm = ChatOpenAI(model="gpt-4-turbo-preview")

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            " Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user:\n<User>\n{user_info}\n</User>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

part_1_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    update_ticket_to_new_flight,
    cancel_ticket,
    search_car_rentals,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    search_hotels,
    book_hotel,
    update_hotel,
    cancel_hotel,
    search_trip_recommendations,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
part_1_assistant_runnable = primary_assistant_prompt | llm.bind_tools(part_1_tools)

グラフの定義#

次に、グラフを作成します。このグラフは、このセクションの最終アシスタントです。

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition

builder = StateGraph(State)


# ノードを定義します:これらが作業を行います
builder.add_node("assistant", Assistant(part_1_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_1_tools))
# エッジを定義します:これらが制御フローの移動を決定します
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools", "assistant")

# チェックポインタは、グラフがその状態を永続化できるようにします
# これはグラフ全体の完全なメモリです。
memory = MemorySaver()
part_1_graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display

try:
    display(Image(part_1_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # これは追加の依存関係が必要であり、オプションです
    pass

image

サンプル対話#

今、私たちの強力なチャットボットを試す時が来ました!以下の対話のラウンドを使って実行してみましょう。もし「RecursionLimit」に遭遇した場合、それはロボットが割り当てられたステップ内で答えを得られなかったことを意味します。問題ありません!このチュートリアルの後の部分では、さらに多くのテクニックを使用できます。

import shutil
import uuid

# 各セクションで元の場所から再起動できるようにバックアップファイルで更新します
db = update_dates(db)
thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # passenger_idは、ユーザーのフライト情報を取得するために私たちのフライトツールで使用されます
        "passenger_id": "3442 587242",
        # チェックポイントはthread_idによってアクセスされます
        "thread_id": thread_id,
    }
}


_printed = set()
for question in tutorial_questions:
    events = part_1_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)

第 1 部の振り返り#

私たちのシンプルなアシスタントは良いパフォーマンスを発揮しました!それはすべての質問に合理的に答え、文脈内で迅速に応答し、すべてのタスクを成功裏に実行しました。あなたはLangSmith のサンプルトレースを確認して、上記のインタラクションで LLM がどのようにプロンプトを受け取ったかをよりよく理解できます。

もしこれが単なるシンプルな Q&A ロボットであれば、私たちは上記の結果に満足するかもしれません。しかし、私たちの顧客サポートロボットはユーザーの代わりに行動をとるため、そのいくつかの行動はやや懸念されます:

  1. アシスタントは宿泊に集中しているときにレンタカーを予約し、その後キャンセルして再予約しなければなりませんでした:ああ!不必要な費用を避けるために、予約の前にユーザーに最終的な決定権を与えるべきです。
  2. アシスタントは推奨を検索する際に困難に直面しました。ツールにより詳細な指示と例を追加することで改善できますが、すべてのツールに対してそうすると、プロンプトが冗長になり、ロボットが圧倒される可能性があります。
  3. アシスタントはユーザーの関連情報を取得するために明示的な検索を行う必要がありました。ユーザーの旅行の詳細を即座に取得できれば、アシスタントは直接応答でき、大幅な時間を節約できます。

次のセクションでは、上記の問題の最初の 2 つを解決します。

第 2 部:確認の追加#

アシスタントがユーザーの代わりに行動をとるときは、ほぼ常にユーザーに最終的な決定をさせるべきです。そうでないと、アシスタントが犯す小さなミス(または受けるプロンプト注入)は、ユーザーに実際の損害を与える可能性があります。

このセクションでは、interrupt_beforeを使用して、ツールを実行する前にグラフの実行を一時停止し、制御をユーザーに戻します。

あなたのグラフは、以下のようになります:

image

前回と同様に、状態の定義から始めます:

状態 & アシスタント#

私たちのグラフの状態と LLM の呼び出し方は、第 1 部とほぼ同じですが、以下の点が異なります:

  • user_infoフィールドを追加しました。このフィールドは、グラフによって自動的に埋められます。
  • アシスタントオブジェクト内で状態を直接使用できるようになりました。可配置のパラメータを使用する必要はありません。
from typing import Annotated

from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_info: str


class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # LLMが空の応答を返した場合、実際の応答を再度促します。
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}


# Haikuはより速く、安価ですが、正確性が劣ります
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# LLMを入れ替えることもできますが、プロンプトを更新する必要があるでしょう。
# from langchain_openai import ChatOpenAI

# llm = ChatOpenAI(model="gpt-4-turbo-preview")

assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            " Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user:\n<User>\n{user_info}\n</User>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

part_2_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    update_ticket_to_new_flight,
    cancel_ticket,
    search_car_rentals,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    search_hotels,
    book_hotel,
    update_hotel,
    cancel_hotel,
    search_trip_recommendations,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
part_2_assistant_runnable = assistant_prompt | llm.bind_tools(part_2_tools)

グラフの定義#

次に、グラフを作成します。このグラフは、第 1 部の問題に対処します。

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition

builder = StateGraph(State)


def user_info(state: State):
    return {"user_info": fetch_user_flight_information.invoke({})}


# NEW: fetch_user_infoノードが最初に実行されるため、アシスタントはユーザーのフライト情報をアクションを取らずに見ることができます
builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
builder.add_node("assistant", Assistant(part_2_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_2_tools))
builder.add_edge("fetch_user_info", "assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools", "assistant")

# チェックポイントは、グラフがその状態を永続化できるようにします
# これはグラフ全体の完全なメモリです。
memory = MemorySaver()
part_2_graph = builder.compile(
    checkpointer=memory,
    # NEW: グラフは常に「ツール」ノードを実行する前に停止します。
    # ユーザーは、アシスタントが続行する前に承認または拒否できます。
    interrupt_before=["tools"],
)
from IPython.display import Image, display

try:
    display(Image(part_2_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # これは追加の依存関係が必要であり、オプションです
    pass

image

サンプル対話#

今、私たちの新しく改訂されたチャットボットを試す時が来ました!以下の対話のラウンドを使って実行してみましょう。

import shutil
import uuid

# 各セクションで元の場所から再起動できるようにバックアップファイルで更新します
db = update_dates(db)
thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # passenger_idは、ユーザーのフライト情報を取得するために私たちのフライトツールで使用されます
        "passenger_id": "3442 587242",
        # チェックポイントはthread_idによってアクセスされます
        "thread_id": thread_id,
    }
}


_printed = set()
# 第1部のチュートリアルの質問を再利用して、どのように機能するかを確認できます。
for question in tutorial_questions:
    events = part_2_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
    snapshot = part_2_graph.get_state(config)
    while snapshot.next:
        # 中断があります!エージェントはツールを使用しようとしており、ユーザーはそれを承認または拒否できます。
        # 注意:このコードはすべてグラフの外部にあります。通常、出力をUIにストリーミングします。
        # その後、ユーザーが入力を提供したときに、フロントエンドがAPI呼び出しを介して新しい実行をトリガーします。
        user_input = input(
            "Do you approve of the above actions? Type 'y' to continue;"
            " otherwise, explain your requested changed.\n\n"
        )
        if user_input.strip() == "y":
            # そのまま続行
            result = part_2_graph.invoke(
                None,
                config,
            )
        else:
            # ツール呼び出しを満たすために
            # 要求された変更/考えの変更に関する指示を提供します
            result = part_2_graph.invoke(
                {
                    "messages": [
                        ToolMessage(
                            tool_call_id=event["messages"][-1].tool_calls[0]["id"],
                            content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
                        )
                    ]
                },
                config,
            )
        snapshot = part_2_graph.get_state(config)

第 2 部の振り返り#

私たちのアシスタントは、ユーザーのフライトの詳細を直接応答できるようになりました。また、実行されるすべての操作を完全に制御できるようになりました。すべては LangGraph のinterrupts(中断)とcheckpointers(チェックポイント)に依存しています。中断機能はグラフの実行を一時停止し、その状態は安全に構成されたチェックポイントに保存されます。ユーザーは、正しい構成を実行することでいつでも再起動できます。

このLangSmith の実行例を確認すると、グラフがどのように機能するかをよりよく理解できます。このを確認すると、通常、(None, config)を呼び出すことでプロセスを再開できます。状態はチェックポイントから読み込まれ、まるで中断されていなかったかのように続行されます。

このグラフはかなりうまく機能しています!ただし、私たちは実際にはアシスタントのすべての操作に関与する必要はありません……

次の部分では、グラフ構造を再編成し、データベースに実際に書き込む「センシティブ」操作のみに中断をトリガーするようにします。

第 3 部:条件付き中断#

このセクションでは、ツールを安全(読み取り専用)またはセンシティブ(データ変更)に分類することで、中断戦略を最適化します。センシティブツールに対してのみ中断を適用し、ロボットが単純なクエリを自律的に処理できるようにします。

これにより、ユーザーの制御と対話の流暢さのバランスが取れますが、ツールを増やすにつれて、単一のグラフ構造が過度に複雑になり、「平面」構造を維持するのが難しくなる可能性があります。次のセクションでは、この問題を解決します。

第 3 部のグラフ構造は、以下の図のようになります:

image

状態#

いつものように、グラフの状態を定義します。私たちの状態と LLM の呼び出しは、第 2 部と完全に同じです。

from typing import Annotated

from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_info: str


class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # LLMが空の応答を返した場合、実際の応答を再度促します。
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}


# Haikuはより速く、安価ですが、正確性が劣ります
# llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=1)
# LLMを入れ替えることもできますが、プロンプトを更新する必要があるでしょう。
# from langchain_openai import ChatOpenAI

# llm = ChatOpenAI(model="gpt-4-turbo-preview")

assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Swiss Airlines. "
            " Use the provided tools to search for flights, company policies, and other information to assist the user's queries. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user:\n<User>\n{user_info}\n</User>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

part_3_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    update_ticket_to_new_flight,
    cancel_ticket,
    search_car_rentals,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    search_hotels,
    book_hotel,
    update_hotel,
    cancel_hotel,
    search_trip_recommendations,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
part_3_assistant_runnable = assistant_prompt | llm.bind_tools(part_3_tools)

グラフの定義#

次に、グラフを作成します。私たちのグラフは第 2 部とほぼ同じですが、違いはツールを 2 つの独立したノードに分けたことです。実際にユーザーの予約を変更するツールの前にのみ中断を行います。

from typing import Literal

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition

builder = StateGraph(State)


def user_info(state: State):
    return {"user_info": fetch_user_flight_information.invoke({})}


# NEW: fetch_user_infoノードが最初に実行されるため、アシスタントはユーザーのフライト情報をアクションを取らずに見ることができます
builder.add_node("fetch_user_info", user_info)
builder.add_edge(START, "fetch_user_info")
builder.add_node("assistant", Assistant(part_3_assistant_runnable))
builder.add_node("safe_tools", create_tool_node_with_fallback(part_3_safe_tools))
builder.add_node(
    "sensitive_tools", create_tool_node_with_fallback(part_3_sensitive_tools)
)
# ロジックを定義します
builder.add_edge("fetch_user_info", "assistant")


def route_tools(state: State) -> Literal["safe_tools", "sensitive_tools", "__end__"]:
    next_node = tools_condition(state)
    # ツールが呼び出されていない場合、ユーザーに戻ります
    if next_node == END:
        return END
    ai_message = state["messages"][-1]
    # これは単一のツール呼び出しを前提としています。並列ツール呼び出しを処理するには、ANY条件を使用する必要があります。
    first_tool_call = ai_message.tool_calls[0]
    if first_tool_call["name"] in sensitive_tool_names:
        return "sensitive_tools"
    return "safe_tools"


builder.add_conditional_edges(
    "assistant",
    route_tools,
)
builder.add_edge("safe_tools", "assistant")
builder.add_edge("sensitive_tools", "assistant")

memory = MemorySaver()
part_3_graph = builder.compile(
    checkpointer=memory,
    # NEW: グラフは常に「ツール」ノードを実行する前に停止します。
    # ユーザーは、アシスタントが続行する前に承認または拒否できます。
    interrupt_before=["sensitive_tools"],
)
from IPython.display import Image, display

try:
    display(Image(part_3_graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # これは追加の依存関係が必要であり、オプションです
    pass

image

サンプル対話#

今、私たちの改善されたチャットボットを試す時が来ました!以下の対話のラウンドを使って実行してみましょう。今回は、確認ステップが少なくなります。

import shutil
import uuid

# 各セクションで元の場所から再起動できるようにバックアップファイルで更新します
db = update_dates(db)
thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # passenger_idは、ユーザーのフライト情報を取得するために私たちのフライトツールで使用されます
        "passenger_id": "3442 587242",
        # チェックポイントはthread_idによってアクセスされます
        "thread_id": thread_id,
    }
}

tutorial_questions = [
    "Hi there, what time is my flight?",
    "Am i allowed to update my flight to something sooner? I want to leave later today.",
    "Update my flight to sometime next week then",
    "The next available option is great",
    "what about lodging and transportation?",
    "Yeah i think i'd like an affordable hotel for my week-long stay (7 days). And I'll want to rent a car.",
    "OK could you place a reservation for your recommended hotel? It sounds nice.",
    "yes go ahead and book anything that's moderate expense and has availability.",
    "Now for a car, what are my options?",
    "Awesome let's just get the cheapest option. Go ahead and book for 7 days",
    "Cool so now what recommendations do you have on excursions?",
    "Are they available while I'm there?",
    "interesting - i like the museums, what options are there? ",
    "OK great pick one and book it for my second day there.",
]


_printed = set()
# 第1部のチュートリアルの質問を再利用して、どのように機能するかを確認できます。
for question in tutorial_questions:
    events = part_3_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)

第 3 部の振り返り#

効果が向上しました!私たちのエージェントは現在、非常にうまく機能しています —— LangSmith の実行記録を確認して、その作業を確認してください!このデザインに満足しているかもしれません。コードは整理されており、期待通りのパフォーマンスを発揮しています。

このデザインの 1 つの問題は、単一のプロンプトに過度の負担をかけていることです。ツールを追加したり、各ツールがより複雑になったり(より多くのフィルタリング条件、より多くのビジネスロジックの制限行動など)すると、ツールの使用効果やロボット全体のパフォーマンスが低下し始める可能性があります。

次の部分では、ユーザーの意図に応じて特定のエージェントやサブグラフにルーティングすることで、異なるユーザー体験をより良く制御する方法を示します。

第 4 部:専門的なワークフロー#

前の章では、単一のプロンプトと大規模言語モデル(LLM)に依存してさまざまなユーザーの意図を処理する「広範な」チャットボットがどれだけ進めるかを見てきました。しかし、このアプローチは、既知の意図に対して予測可能な優れたユーザー体験を作成するのが難しいです。

別のアプローチは、グラフがユーザーの意図を検出し、ユーザーのニーズを満たすために適切なワークフローや「スキル」を選択できるようにすることです。各ワークフローはその分野に特化しているため、全体のアシスタントのパフォーマンスを低下させることなく独立して最適化できます。

このセクションでは、ユーザー体験を個別のサブグラフに分割し、最終的な構造は以下のようになります:

image

上の図では、各ボックスが実行能力を持つ専門的なワークフローを含んでいます。主アシスタントはユーザーの初期クエリを処理し、グラフはクエリの内容に基づいてリクエストを適切な「専門家」にルーティングします。

状態#

現在、どのサブグラフが現在のセッションを制御しているかを追跡できるようにしたいと考えています。これを実現するために、メッセージリストをいくつか操作することができますが、より簡単な方法は、専用のスタックとして追跡することです。

以下のStatedialog_stateリストを追加します。ノードが実行され、dialog_state値を返すたびに、update_dialog_stack関数が呼び出され、更新を適用する方法を決定します。

from typing import Annotated, Literal, Optional

from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """状態をプッシュまたはポップします。"""
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_info: str
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "update_flight",
                "book_car_rental",
                "book_hotel",
                "book_excursion",
            ]
        ],
        update_dialog_stack,
    ]

アシスタント#

これで、各ワークフローのためにアシスタントを作成します。つまり:

  1. フライト予約アシスタント
  2. ホテル予約アシスタント
  3. レンタカーアシスタント
  4. 行程アシスタント
  5. 最後に、これらのアシスタント間でルーティングするための「主アシスタント」

お気づきかもしれませんが、これは私たちのマルチエージェントの例における監視者デザインパターンの一例です。

以下に、各アシスタントを駆動するためのRunnableオブジェクトを定義します。各Runnableにはプロンプト、LLM(大規模言語モデル)、およびそのアシスタントに適したツールのセットが含まれます。
専門化された / 委任されたアシスタントは、CompleteOrEscalateツールを呼び出して、制御フローが主アシスタントに戻るべきであることを示すこともできます。これは、アシスタントがそのタスクを正常に完了した場合、またはユーザーが考えを変えた場合、またはそのワークフローの範囲を超える問題を処理する必要がある場合に発生します。

from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig

from pydantic import BaseModel, Field


class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)

            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}


class CompleteOrEscalate(BaseModel):
    """現在のタスクを完了したとマークし、またはダイアログの制御を主アシスタントにエスカレートするためのツールです。
    主アシスタントは、ユーザーのニーズに基づいてダイアログを再ルーティングできます。"""

    cancel: bool = True
    reason: str

    class Config:
        json_schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the user's emails or calendar for more information.",
            },
        }


# フライト予約アシスタント

flight_booking_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling flight updates. "
            " The primary assistant delegates work to you whenever the user needs help updating their bookings. "
            "Confirm the updated flight details with the customer and inform them of any additional fees. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that a booking isn't completed until after the relevant tool has successfully been used."
            "\n\nCurrent user flight information:\n<Flights>\n{user_info}\n</Flights>"
            "\nCurrent time: {time}."
            "\n\nIf the user needs help, and none of your tools are appropriate for it, then"
            ' "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.',
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

update_flight_safe_tools = [search_flights]
update_flight_sensitive_tools = [update_ticket_to_new_flight, cancel_ticket]
update_flight_tools = update_flight_safe_tools + update_flight_sensitive_tools
update_flight_runnable = flight_booking_prompt | llm.bind_tools(
    update_flight_tools + [CompleteOrEscalate]
)

# ホテル予約アシスタント
book_hotel_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling hotel bookings. "
            "The primary assistant delegates work to you whenever the user needs help booking a hotel. "
            "Search for available hotels based on the user's preferences and confirm the booking details with the customer. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that a booking isn't completed until after the relevant tool has successfully been used."
            "\nCurrent time: {time}."
            '\n\nIf the user needs help, and none of your tools are appropriate for it, then "CompleteOrEscalate" the dialog to the host assistant.'
            " Do not waste the user's time. Do not make up invalid tools or functions."
            "\n\nSome examples for which you should CompleteOrEscalate:\n"
            " - 'what's the weather like this time of year?'\n"
            " - 'nevermind i think I'll book separately'\n"
            " - 'i need to figure out transportation while i'm there'\n"
            " - 'Oh wait i haven't booked my flight yet i'll do that first'\n"
            " - 'Hotel booking confirmed'",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now())

book_hotel_safe_tools = [search_hotels]
book_hotel_sensitive_tools = [book_hotel, update_hotel, cancel_hotel]
book_hotel_tools = book_hotel_safe_tools + book_hotel_sensitive_tools
book_hotel_runnable = book_hotel_prompt | llm.bind_tools(
    book_hotel_tools + [CompleteOrEscalate]
)

# レンタカーアシスタント
book_car_rental_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling car rental bookings. "
            "The primary assistant delegates work to you whenever the user needs help booking a car rental. "
            "Search for available car rentals based on the user's preferences and confirm the booking details with the customer. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。