[AINFO] LangChain Memory와 Redis를 활용한 멀티턴 대화 구현

⚠️ 안내: 이 글은 학습 기록용입니다. 오류나 보완 의견은 댓글로 알려주세요.

📌 관련 개념 정리

관련된 기술에 대한 개념적 정리는 아래 포스트를 참고해주세요.

🧭 배경

기존에 구현한 챗봇은 각 질문에 대해 독립적으로 응답을 생성하는 싱글턴 방식으로 동작했습니다. 사용자가 이전 대화 내용을 언급하더라도 챗봇은 이를 기억하지 못하고 매번 새로운 대화로 처리했습니다.

이러한 구조에서는

  • 사용자가 “아까 말한 정책 더 알려줘”와 같은 후속 질문을 하더라도 맥락을 파악할 수 없고, 대화의 흐름이 단절되어 자연스러운 상담 경험을 제공하기 어려웠습니다.
  • 또한, 정책 상담이라는 서비스 특성상 여러 번의 질문을 통해 점진적으로 정보를 얻는 경우가 많은데, 이를 지원하지 못하는 한계가 있었습니다.

이에 따라 이번 작업에서는 대화 기록을 저장하고 활용하는 멀티턴(Multi-turn) 기능을 도입하게 되었습니다. 멀티턴 메모리를 구현하기 위해 LangChain Memory의 ConversationSummaryBufferMemory와 Redis를 사용했습니다.

🧩 구현 과정

1. ChatHistoryManager 클래스

사용자별 대화 기록을 Redis에 저장하고 관리하는 핵심 클래스입니다. 이 클래스는 LangChain의 RedisChatMessageHistoryConversationSummaryBufferMemory를 조합하여 대화 저장, 불러오기, 요약, 삭제 기능을 제공합니다.

# chatbot/memory.py

import redis
from django.conf import settings
from langchain.memory import ConversationSummaryBufferMemory
from langchain_community.chat_message_histories import RedisChatMessageHistory


class ChatHistoryManager:
    """
    사용자의 챗봅 대화 기록을 Redis에 저장하고 관리하는 클래스
    Redis를 이용해서 사용자의 대화를 기록 저장하고,
    LangChain의 `ConversationSummaryBufferMemory`를 통해 200 token 이상의 대화는 요약해서 prompt에 전달해서 사용합니다.
    대화가 길어져도 성능이나 경제적으로 효율적인 방식으로 과거의 대화 내용을 반영해서 답변을 얻을 수 있게 됩니다.
    주요기능:
        - Redis를 활용한 사용자별 대화 기록 저장
        - `ConversationSummaryBufferMemory`를 이용한 대화 요약 및 멀티턴 대화 관리
        - Redis에 저장된 대화 기록을 삭제하는 기능
    Args:
        user_id = 사용자의 id
        model = 길어진 대화를 요약하는데 사용하는 LLM 모델
        max_token_limit : 대화 요약을 위한 최대 토큰 제한 (default = 200)
    Methods:
        get_memory_manager(): 사용자의 대화를 관리할 `ConversationSummaryBufferMemory` 객체 반환
        clear_history(): 사용자의 대화 기록을 Redis에서 삭제
    """

    REDIS_DB = 1

    def __init__(self, user_id, model, max_token_limit=200):
        """ChatHistoryManager 클래스 생성자"""
        self.user_id = str(user_id)
        self.model = model
        self.max_token_limit = max_token_limit
        self.redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=self.REDIS_DB,
            decode_responses=True,
        )

        self.chat_history = RedisChatMessageHistory(
            session_id=self.user_id,
            url=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{self.REDIS_DB}",
        )

    def get_memory_manager(self):
        """사용자 대화 기록을 관리할 `ConversationSummaryBufferMemory 인스턴스 생성 매서드"""
        return ConversationSummaryBufferMemory(
            llm=self.model,
            chat_memory=self.chat_history,  # Redis에서 관리하는 대화 기록 객체
            max_token_limit=self.max_token_limit,
            return_messages=True,  # 전체 대화 메시지 반환 여부
            memory_key="chat_history",  # 저장된 대화 기록의 key 값
        )

    def clear_history(self):
        """Redis에서 대화기록 삭제 매서드"""
        self.chat_history.clear()

클래스 구조

구성 요소설명
REDIS_DB = 1멀티턴 메모리 전용 Redis DB 번호. Channel Layer(0번)와 분리하여 관리
__init__()Redis 연결 설정 및 RedisChatMessageHistory 인스턴스 생성
get_memory_manager()ConversationSummaryBufferMemory 인스턴스를 생성하여 반환
clear_history()사용자의 대화 기록을 Redis에서 완전히 삭제
  • 본 프로젝트에서는 WebSocket Channel Layer가 0번을 사용하고 있어, 멀티턴 메모리용으로 1번 DB를 지정

생성자 (__init__) 설명

  • user_id: 문자열로 변환하여 저장. Redis의 session_id로 사용되며, session_id는 문자열 타입을 요구하여 문자열로 변환
  • session_id: {user_id}:{room_id} 형태로 구성하여 사용자와 채팅방 조합별로 독립적인 대화 기록 관리
  • redis.Redis(): Redis 서버와의 직접 연결 객체. decode_responses=True 설정으로 바이트가 아닌 문자열로 데이터를 주고받음
  • RedisChatMessageHistory: LangChain에서 제공하는 Redis 기반 대화 기록 저장소. session_id를 키로 사용하여 대화 내역을 저장/조회

get_memory_manager() 메서드 설명

ConversationSummaryBufferMemory의 주요 파라미터:

  • llm: 대화 요약에 사용할 LLM 모델. 토큰 제한 초과 시 이 모델로 요약 수행
  • chat_memory: 실제 대화 기록이 저장되는 저장소 (RedisChatMessageHistory 인스턴스)
  • max_token_limit: 이 토큰 수를 초과하면 오래된 대화부터 요약. 기본값 200은 약 3~4턴의 대화 분량
  • return_messages=True: 대화 기록을 문자열이 아닌 Message 객체 리스트로 반환. ChatModel과 호환성 유지
  • memory_key: 프롬프트에서 대화 기록을 참조할 때 사용하는 변수명

2. 프롬프트에 대화 히스토리 placeholder 추가

멀티턴 대화를 위해서는 이전 대화 기록이 LLM에 전달되어야 합니다. LangChain의 MessagesPlaceholder를 사용하면 프롬프트 템플릿 내에 동적으로 대화 기록을 삽입할 수 있습니다.

# chatbot/prompt.py

from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
    SystemMessagePromptTemplate,
)

system_message = SystemMessagePromptTemplate.from_template(
    """
    당신은 대한민국 정부 정책 전문 상담 AI 어시스턴트입니다.
    정책 정보, 지원 제도, 규제 등에 대해 친절하고 정확하게 답변하세요.
    ...
    """
)

user_prompt = HumanMessagePromptTemplate.from_template(
    """
    ## 참고 문서:
    {context}

    ## 사용자 질문:
    질문: {question}
    ...
    """
)

CHATBOT_PROMPT = ChatPromptTemplate.from_messages(
    [
        system_message,
        HumanMessagePromptTemplate.from_template(few_shot_prompt_text),
        MessagesPlaceholder(variable_name="chat_history"),  # 대화 히스토리 삽입 위치
        user_prompt,
    ]
)

프롬프트 구조 설명

ChatPromptTemplate.from_messages()는 여러 메시지를 순서대로 조합하여 하나의 프롬프트를 구성합니다.

위 코드에서 메시지는 다음 순서로 LLM에 전달됩니다:

순서구성 요소역할
1system_messageAI의 역할과 응답 지침을 정의하는 시스템 메시지
2few_shot_prompt_text질의 확장을 위한 Few-shot 예제
3MessagesPlaceholder이전 대화 기록이 동적으로 삽입되는 위치
4user_promptRAG로 검색된 문서와 현재 사용자 질문

MessagesPlaceholder 동작 방식

  • variable_name="chat_history": RAG 체인에서 chat_history 키로 전달된 메시지 리스트가 이 위치에 삽입됨
  • 대화 기록은 HumanMessageAIMessage가 번갈아 나타나는 형태로 구성
  • 대화 기록이 없으면(첫 질문) 해당 위치는 비어있게 되어 싱글턴과 동일하게 동작
  • 대화 기록이 시스템 메시지 바로 뒤, 현재 질문 앞에 위치하여 LLM이 맥락을 파악한 후 현재 질문에 답변할 수 있음

3. 챗봇 로직에 멀티턴 적용

기존 챗봇 로직에 ChatHistoryManager를 통합하여 대화 기록을 불러오고 저장하는 흐름을 추가합니다. 핵심은 RAG 체인 실행 전에 이전 대화를 불러오고, 응답 생성 후에 현재 대화를 저장하는 것입니다.

get_chatbot_response() 수정


# 기존
async def get_chatbot_response(user_message):
    ,,,

# 변경
async def get_chatbot_response(user_message, user_id):
    ...
    chat_manager = ChatHistoryManager(user_id, llm)
    memory_manager = chat_manager.get_memory_manager()
    memory = memory_manager.load_memory_variables({})

  • user_id를 추가하여 사용자별 독립적인 대화 기록 관리가 가능하도록 하였습니다.
  • ChatHistoryManager를 통해 사용자 단위의 메모리 매니저 인스턴스를 생성하였습니다.
  • 체인 실행 전에 load_memory_variables를 호출하여 chat_history를 미리 확보하여 이후에 이어지는 RAG체인에 사용하였습니다.

RAG 체인 구성 변경

# 기존
    rag_chain = (
        {"context": lambda _: retrieved_context, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

# 변경
rag_chain = (
    {
        "context": lambda _: retrieved_context,      # RAG로 검색된 문서
        "question": RunnablePassthrough(),           # 사용자 질문 그대로 전달
        "chat_history": lambda _: memory.get("chat_history", []),  # 이전 대화 기록
    }
    | prompt
    | llm
    | StrOutputParser()
)
  • 기존에는 검색 컨텍스트와 사용자 질문만 프롬프트에 주입했지만,
  • chat_history를 추가하여 이전 대화 맥락을 반영한 응답 생성이 가능하도록 프롬프트 입력 구조를 확장하였습니다.

대화 저장 (save_context)

 output_response = ""

    async for chunk in rag_chain.astream(user_message):
        output_response += chunk
        yield chunk

    memory_manager.save_context({"human": user_message}, {"ai": output_response})
  • LLM 응답을 스트리밍(astream) 방식으로 처리하면서도 대화 메모리를 유지하기 위해,
  • 스트리밍 중 전달되는 chunk를 output_response에 누적한 뒤 최종 응답만을 save_context에 기록하도록 구조를 변경하였습니다.
  • save_context 설명
    • 첫 번째 인자: 사용자 메시지 (HumanMessage로 저장)
    • 두 번째 인자: AI 응답 (AIMessage로 저장)
    • 저장 시점에 토큰 수가 max_token_limit을 초과하면 자동으로 오래된 대화를 요약

전체 코드

# chatbot/utils.py

from langchain.schema.runnable import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

from .memory import ChatHistoryManager
from .prompt import CHATBOT_PROMPT
from .retriever import VectorRetriever

# 리트리버 인스턴스 생성 (싱글톤으로 구현된 리트리버에 맞춰 전역 변수로 생성)
vector_retriever = VectorRetriever()
# 멀티쿼리리트리버로 변경함.
retriever = vector_retriever.get_multi_query_retriever()


async def get_chatbot_response(user_message):
async def get_chatbot_response(user_message, user_id):
    """
    비동기 스트리밍 방식 챗봇 응답을 생성할 수 있는 함수
    - 사용자의 입력을 받아 LLM을 실행하고, 단일 응답만 반환 (싱글턴, 대화 기록 저장 X)
    이 함수는 LangChain의 활용해서 검색 증강 생성(RAG) 방식의 챗봇 응답을 생성합니다.
    비동기 스트리밍으로 응답으로 실시간으로 반환하고, 대화 기록을 활용해 대화의 흐름을 이어나갈 수 있습니다.
    주요 기능:
    - 사용자의 입력을 받아 LLM을 실행하고, 비동기 스트리밍 방식으로 응답 바노한
    - 검색된 문서를 기반으로 하는 답변 제공
    - `ChatHistoryManager`를 통해 대화 내용을 관리할 수 있음
    - temperature=0.3: 창의성보다 정확도에 중점을 둠. (추후 조정 가능)
    변동사항 (03/09/25):
    - async, yield를 사용해 비동기 작업을 할 수 있도록 변경
    - LLM 응답을 invoke() 방식이 아닌 astream() 비동기 스트리밍 방식으로 변경
    변동사항 (03/19/25)
    - `ChatHistoryManager`를 통해 대화 내용 관리 및 멀티턴 기능 추가
    """

    # 사용자 질문을 기반으로 문서 검색
    retrieved_docs = retriever.invoke(user_message)
    # 디버깅: 검색된 문서가 있는지 확인
    if not retrieved_docs:
        print("검색된 문서가 없습니다.")
    else:
        pass
    # 검색된 문서를 문자열로 변환
    retrieved_context = VectorRetriever().format_docs(retrieved_docs)
    # RAG 검색 결과가 없으면 기본 context 사용
    if retrieved_context is None:
        retrieved_context = "정부의 지원 정책"
    # llm 실행 (CHATBOT_PROMPT는 utils1.py에서 import)
    # streaming 을 위한 streaming 파라미터 조정
    llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.3, streaming=True)

    chat_manager = ChatHistoryManager(user_id, llm)
    memory_manager = chat_manager.get_memory_manager()
    memory = memory_manager.load_memory_variables({})

    prompt = CHATBOT_PROMPT

    rag_chain = (
        {"context": lambda _: retrieved_context, "question": RunnablePassthrough()}
        {
            "context": lambda _: retrieved_context,
            "question": RunnablePassthrough(),
            "chat_history": lambda _: memory.get("chat_history", []),
        }
        | prompt
        | llm
        | StrOutputParser()
    )

    output_response = ""

    async for chunk in rag_chain.astream(user_message):
        output_response += chunk
        yield chunk

    memory_manager.save_context({"human": user_message}, {"ai": output_response})

4. Consumer에서 멀티턴 통합

WebSocket Consumer에서 멀티턴 기능을 통합합니다. 연결 시 사용자/채팅방 정보를 저장하고, 메시지 수신 시 이를 챗봇 로직에 전달하며, 연결 종료 시 대화 기록을 정리합니다.

disconnect() 수정

# 기존
class ChatConsumer(AsyncWebsocketConsumer):
    ...
    async def disconnect(self, close_code):
        pass

# 변경
class ChatConsumer(AsyncWebsocketConsumer):
    ...
    async def disconnect(self, close_code):
        if self.is_authenticated:
            chat_history_manager = ChatHistoryManager(self.user_id, model=None)
            chat_history_manager.clear_history()
  • 인증된 사용자의 연결이 종료될 때만 대화 기록을 삭제합니다.
  • model=None: 대화 기록 삭제 시에는 요약 기능이 필요 없으므로 LLM 모델을 전달하지 않습니다.
  • clear_history(): Redis에서 해당 사용자/채팅방의 대화 기록을 완전히 삭제합니다.
  • 현재는 연결 종료 시 삭제하지만, 추후 chatroom 기능 구현 시 이 로직을 제거하고 채팅방을 삭제하는 경우 삭제하는 것으로 변경 예정입니다.

receive() 수정

# 기존
class ChatConsumer(AsyncWebsocketConsumer):
    ...
    async def receive(self, text_data):
        ...
        async for chunk in get_chatbot_response(user_message):
# 변경
class ChatConsumer(AsyncWebsocketConsumer):
    ...
    async def receive(self, text_data):
        ...
        async for chunk in get_chatbot_response(user_message, self.user_id):

  • 기존에는 user_message만 전달했으나, user_idroom_id를 추가로 전달
  • 이를 통해 get_chatbot_response() 내부에서 사용자별 대화 기록을 불러오고 저장 가능
# 기존
class ChatConsumer(AsyncWebsocketConsumer):
    ...
    async def receive(self, text_data):
        ...
        data = json.loads(text_data)
        user_message = data["message"]

# 변경
class ChatConsumer(AsyncWebsocketConsumer):
    ...
    async def receive(self, text_data):
        ...
        try:
            data = json.loads(text_data)
            user_message = data["message"]
        except json.JSONDecodeError:
            await self.send(
                text_data=json.dumps(
                    {"error": "잘못된 JSON 형식입니다."},
                    ensure_ascii=False,
                )
            )
            return
  • 클라이언트에서 전달되는 메시지가 손상되거나 잘못된 JSON 형식일 가능성을 고려해,
  • 수신 단계에서 JSON 파싱 예외를 처리하고 정상적인 입력만 이후 로직으로 전달되도록 방어 코드도 추가하였습니다.

전체 코드

# chatbot/consumers.py
import json
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

from accounts.models import User

from .memory import ChatHistoryManager
from .serializers import ChatbotSerializer
from .utils import get_chatbot_response

class ChatConsumer(AsyncWebsocketConsumer):
    """
    WebSocket을 통한 실시간 채팅을 처리하는 comsumer 클래스
    해당 클래스는 Django Channels의 AsyncWebsocketConsumer를 상속받아
    WebSocket 연결을 관리하고, 사용자 인증 및 AI응답을 처리합니다.
    사용 방식:
    - WebSocket 연결 시 사용자를 인증하고, 인증된 사용자만 채팅 이용 가능
    - 클라이언트가 메시지를 전송하면 Chatbot 모델을 호츌하여 점진적인 응답을 생성
    - `is_streaming=True`로 스트리밍 중임을 같이 알림
    - 생성이 끝나면 완전한 메시지와 'is_streaming=False`도 같이 보내 스트리밍이 끝남을 알림
    - 인증되지 않은 사용자는 메시지를 전송할 수 없으며, 에러 메시지를 반환
    매서드(Method)
    - connect(): WebSocket 연결을 초기화하고 사용자 인증을 수행함.
    - disconnect(close_code): Websocket 연결 종료
    - receive(text_data): 클라이언트의 메시지를 받아 Chatbot에 전달
    - get_user(): user_id를 기반으로 데이터베이스에서 사용자 정보 조회
    """
    async def connect(self):
        self.user_id = self.scope.get("user_id")
        self.is_authenticated = await self.get_user() is not None
        if not self.is_authenticated:
            await self.close()
            return
        await self.accept()

    async def disconnect(self, close_code):
        pass
        if self.is_authenticated:
            chat_history_manager = ChatHistoryManager(self.user_id, model=None)
            chat_history_manager.clear_history()

    async def receive(self, text_data):
        if not self.is_authenticated:
            await self.send(
                text_data=json.dumps(
                    {"error": "인증되지 않은 사용자입니다."},
                    ensure_ascii=False,
                )
            )

        data = json.loads(text_data)
        user_message = data["message"]
        try:
            data = json.loads(text_data)
            user_message = data["message"]
        except json.JSONDecodeError:
            await self.send(
                text_data=json.dumps(
                    {"error": "잘못된 JSON 형식입니다."},
                    ensure_ascii=False,
                )
            )
            return

        serializer = ChatbotSerializer(data={"message": user_message})
        if not serializer.is_valid():
            await self.send(
                text_data=json.dumps(
                    {"error": "잘못된 메시지 형식입니다."},
                    ensure_ascii=False,
                )
            )
            return

        # 생성되고 있는 답변의 chunk과 스트리밍 중임을 알림
        async for chunk in get_chatbot_response(user_message):
        async for chunk in get_chatbot_response(user_message, self.user_id):
            await self.send(
                text_data=json.dumps(
                    {"response": chunk, "is_streaming": True},
                    ensure_ascii=False,
                )
            )
        # 완전히 답변 생성이 끝나면 최종 답변과 스트리밍이 끝남을 알림
        await self.send(
            text_data=json.dumps(
                {"response": chunk, "is_streaming": False},
                ensure_ascii=False,
            )
        )
    @sync_to_async
    def get_user(self):
        if self.user_id:
            return User.objects.filter(id=self.user_id).first()
        return None

전체 멀티턴 흐름 요약

1. connect()     → user_id, room_id 저장
2. receive() #1  → Redis에서 대화 기록 불러옴 (비어있음) → LLM 실행 → 대화 저장
3. receive() #2  → Redis에서 대화 기록 불러옴 (1턴 존재) → LLM 실행 → 대화 저장
4. receive() #3  → Redis에서 대화 기록 불러옴 (2턴 존재) → LLM 실행 → 대화 저장
   ...
N. disconnect()  → Redis에서 대화 기록 삭제

🔄 마무리

구현 성과

이번 작업을 통해 AINFO 챗봇에 멀티턴 대화 기능을 안정적으로 도입했습니다.

  • 사용자가 이전 대화를 참조하는 후속 질문을 하더라도 맥락을 유지한 응답을 제공할 수 있게 되었습니다.
  • ConversationSummaryBufferMemory를 활용하여 대화가 길어지더라도 토큰 제한을 넘지 않으면서 중요한 정보를 보존할 수 있습니다.
  • Redis를 저장소로 사용하여 다중 인스턴스 환경에서도 일관된 대화 기록을 유지할 수 있는 확장 가능한 구조를 마련했습니다.

회고

멀티턴 기능을 구현하면서 LangChain에서 제공하는 다양한 Memory 옵션들을 비교해 보고, 각각의 장단점을 정리하는 과정에서 이전 대화를 기억하는 방식에도 여러 설계 선택지가 존재한다는 점을 배울 수 있었습니다.

멀티턴 기능은 단순히 “대화를 저장한다”는 문제를 넘어서, 토큰 제한이라는 제약 속에서 메모리 효율성과 정보 보존 사이의 균형을 어떻게 잡을 것인지에 대한 고민이 필요한 영역이라는 점이 재미있었던 것 같습니다. 여러 선택지 중에서 어떤 방식을 사용할지 결정하고, 그 판단을 실제 구현으로 옮기면서 단순히 기능을 붙이는 것이 아니라 내 생각과 기준을 코드에 녹여낸 코드라 애정이 더 생기는 것 같았습니다.

개선 방향

  • 대화 삭제 시점 개선: 현재는 WebSocket 연결 종료 시 Redis에 저장된 대화 기록을 삭제하고 있으나, 추후 chatroom 개념을 도입하면 chatroom 삭제 시점에 대화 기록을 정리하는 구조로 변경할 예정입니다.
  • 대화 기록 DB 구축: Redis는 실시간 멀티턴 처리를 위한 단기 저장소로 유지하고, 대화 로그 분석 및 이력 관리를 위해 별도의 DB에 대화 내용을 영구 저장하는 구조를 추가할 계획입니다.
  • 메모리 정책 최적화: 현재 max_token_limit을 200으로 설정했는데, 실제 사용 패턴을 분석하여 최적의 값을 찾아갈 예정입니다.

© 2024. All rights reserved.

Powered by Hydejack v9.2.1