[AINFO] LangChain Memory와 Redis를 활용한 멀티턴 대화 구현
⚠️ 안내: 이 글은 학습 기록용입니다. 오류나 보완 의견은 댓글로 알려주세요.
📌 관련 개념 정리
관련된 기술에 대한 개념적 정리는 아래 포스트를 참고해주세요.
🧭 배경
기존에 구현한 챗봇은 각 질문에 대해 독립적으로 응답을 생성하는 싱글턴 방식으로 동작했습니다. 사용자가 이전 대화 내용을 언급하더라도 챗봇은 이를 기억하지 못하고 매번 새로운 대화로 처리했습니다.
이러한 구조에서는
- 사용자가 “아까 말한 정책 더 알려줘”와 같은 후속 질문을 하더라도 맥락을 파악할 수 없고, 대화의 흐름이 단절되어 자연스러운 상담 경험을 제공하기 어려웠습니다.
- 또한, 정책 상담이라는 서비스 특성상 여러 번의 질문을 통해 점진적으로 정보를 얻는 경우가 많은데, 이를 지원하지 못하는 한계가 있었습니다.
이에 따라 이번 작업에서는 대화 기록을 저장하고 활용하는 멀티턴(Multi-turn) 기능을 도입하게 되었습니다. 멀티턴 메모리를 구현하기 위해 LangChain Memory의 ConversationSummaryBufferMemory와 Redis를 사용했습니다.
🧩 구현 과정
1. ChatHistoryManager 클래스
사용자별 대화 기록을 Redis에 저장하고 관리하는 핵심 클래스입니다. 이 클래스는 LangChain의 RedisChatMessageHistory와 ConversationSummaryBufferMemory를 조합하여 대화 저장, 불러오기, 요약, 삭제 기능을 제공합니다.
# chatbot/memory.py
import redis
from django.conf import settings
from langchain.memory import ConversationSummaryBufferMemory
from langchain_community.chat_message_histories import RedisChatMessageHistory
class ChatHistoryManager:
"""
사용자의 챗봅 대화 기록을 Redis에 저장하고 관리하는 클래스
Redis를 이용해서 사용자의 대화를 기록 저장하고,
LangChain의 `ConversationSummaryBufferMemory`를 통해 200 token 이상의 대화는 요약해서 prompt에 전달해서 사용합니다.
대화가 길어져도 성능이나 경제적으로 효율적인 방식으로 과거의 대화 내용을 반영해서 답변을 얻을 수 있게 됩니다.
주요기능:
- Redis를 활용한 사용자별 대화 기록 저장
- `ConversationSummaryBufferMemory`를 이용한 대화 요약 및 멀티턴 대화 관리
- Redis에 저장된 대화 기록을 삭제하는 기능
Args:
user_id = 사용자의 id
model = 길어진 대화를 요약하는데 사용하는 LLM 모델
max_token_limit : 대화 요약을 위한 최대 토큰 제한 (default = 200)
Methods:
get_memory_manager(): 사용자의 대화를 관리할 `ConversationSummaryBufferMemory` 객체 반환
clear_history(): 사용자의 대화 기록을 Redis에서 삭제
"""
REDIS_DB = 1
def __init__(self, user_id, model, max_token_limit=200):
"""ChatHistoryManager 클래스 생성자"""
self.user_id = str(user_id)
self.model = model
self.max_token_limit = max_token_limit
self.redis_client = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=self.REDIS_DB,
decode_responses=True,
)
self.chat_history = RedisChatMessageHistory(
session_id=self.user_id,
url=f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{self.REDIS_DB}",
)
def get_memory_manager(self):
"""사용자 대화 기록을 관리할 `ConversationSummaryBufferMemory 인스턴스 생성 매서드"""
return ConversationSummaryBufferMemory(
llm=self.model,
chat_memory=self.chat_history, # Redis에서 관리하는 대화 기록 객체
max_token_limit=self.max_token_limit,
return_messages=True, # 전체 대화 메시지 반환 여부
memory_key="chat_history", # 저장된 대화 기록의 key 값
)
def clear_history(self):
"""Redis에서 대화기록 삭제 매서드"""
self.chat_history.clear()
클래스 구조
| 구성 요소 | 설명 |
|---|---|
REDIS_DB = 1 | 멀티턴 메모리 전용 Redis DB 번호. Channel Layer(0번)와 분리하여 관리 |
__init__() | Redis 연결 설정 및 RedisChatMessageHistory 인스턴스 생성 |
get_memory_manager() | ConversationSummaryBufferMemory 인스턴스를 생성하여 반환 |
clear_history() | 사용자의 대화 기록을 Redis에서 완전히 삭제 |
- 본 프로젝트에서는 WebSocket Channel Layer가 0번을 사용하고 있어, 멀티턴 메모리용으로 1번 DB를 지정
생성자 (__init__) 설명
user_id: 문자열로 변환하여 저장. Redis의 session_id로 사용되며, session_id는 문자열 타입을 요구하여 문자열로 변환session_id:{user_id}:{room_id}형태로 구성하여 사용자와 채팅방 조합별로 독립적인 대화 기록 관리redis.Redis(): Redis 서버와의 직접 연결 객체.decode_responses=True설정으로 바이트가 아닌 문자열로 데이터를 주고받음RedisChatMessageHistory: LangChain에서 제공하는 Redis 기반 대화 기록 저장소.session_id를 키로 사용하여 대화 내역을 저장/조회
get_memory_manager() 메서드 설명
ConversationSummaryBufferMemory의 주요 파라미터:
llm: 대화 요약에 사용할 LLM 모델. 토큰 제한 초과 시 이 모델로 요약 수행chat_memory: 실제 대화 기록이 저장되는 저장소 (RedisChatMessageHistory인스턴스)max_token_limit: 이 토큰 수를 초과하면 오래된 대화부터 요약. 기본값 200은 약 3~4턴의 대화 분량return_messages=True: 대화 기록을 문자열이 아닌Message객체 리스트로 반환. ChatModel과 호환성 유지memory_key: 프롬프트에서 대화 기록을 참조할 때 사용하는 변수명
2. 프롬프트에 대화 히스토리 placeholder 추가
멀티턴 대화를 위해서는 이전 대화 기록이 LLM에 전달되어야 합니다. LangChain의 MessagesPlaceholder를 사용하면 프롬프트 템플릿 내에 동적으로 대화 기록을 삽입할 수 있습니다.
# chatbot/prompt.py
from langchain.prompts import (
ChatPromptTemplate,
HumanMessagePromptTemplate,
MessagesPlaceholder,
SystemMessagePromptTemplate,
)
system_message = SystemMessagePromptTemplate.from_template(
"""
당신은 대한민국 정부 정책 전문 상담 AI 어시스턴트입니다.
정책 정보, 지원 제도, 규제 등에 대해 친절하고 정확하게 답변하세요.
...
"""
)
user_prompt = HumanMessagePromptTemplate.from_template(
"""
## 참고 문서:
{context}
## 사용자 질문:
질문: {question}
...
"""
)
CHATBOT_PROMPT = ChatPromptTemplate.from_messages(
[
system_message,
HumanMessagePromptTemplate.from_template(few_shot_prompt_text),
MessagesPlaceholder(variable_name="chat_history"), # 대화 히스토리 삽입 위치
user_prompt,
]
)
프롬프트 구조 설명
ChatPromptTemplate.from_messages()는 여러 메시지를 순서대로 조합하여 하나의 프롬프트를 구성합니다.
위 코드에서 메시지는 다음 순서로 LLM에 전달됩니다:
| 순서 | 구성 요소 | 역할 |
|---|---|---|
| 1 | system_message | AI의 역할과 응답 지침을 정의하는 시스템 메시지 |
| 2 | few_shot_prompt_text | 질의 확장을 위한 Few-shot 예제 |
| 3 | MessagesPlaceholder | 이전 대화 기록이 동적으로 삽입되는 위치 |
| 4 | user_prompt | RAG로 검색된 문서와 현재 사용자 질문 |
MessagesPlaceholder 동작 방식
variable_name="chat_history": RAG 체인에서chat_history키로 전달된 메시지 리스트가 이 위치에 삽입됨- 대화 기록은
HumanMessage와AIMessage가 번갈아 나타나는 형태로 구성 - 대화 기록이 없으면(첫 질문) 해당 위치는 비어있게 되어 싱글턴과 동일하게 동작
- 대화 기록이 시스템 메시지 바로 뒤, 현재 질문 앞에 위치하여 LLM이 맥락을 파악한 후 현재 질문에 답변할 수 있음
3. 챗봇 로직에 멀티턴 적용
기존 챗봇 로직에 ChatHistoryManager를 통합하여 대화 기록을 불러오고 저장하는 흐름을 추가합니다. 핵심은 RAG 체인 실행 전에 이전 대화를 불러오고, 응답 생성 후에 현재 대화를 저장하는 것입니다.
get_chatbot_response() 수정
# 기존
async def get_chatbot_response(user_message):
,,,
# 변경
async def get_chatbot_response(user_message, user_id):
...
chat_manager = ChatHistoryManager(user_id, llm)
memory_manager = chat_manager.get_memory_manager()
memory = memory_manager.load_memory_variables({})
user_id를 추가하여 사용자별 독립적인 대화 기록 관리가 가능하도록 하였습니다.- ChatHistoryManager를 통해 사용자 단위의 메모리 매니저 인스턴스를 생성하였습니다.
- 체인 실행 전에 load_memory_variables를 호출하여 chat_history를 미리 확보하여 이후에 이어지는 RAG체인에 사용하였습니다.
RAG 체인 구성 변경
# 기존
rag_chain = (
{"context": lambda _: retrieved_context, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 변경
rag_chain = (
{
"context": lambda _: retrieved_context, # RAG로 검색된 문서
"question": RunnablePassthrough(), # 사용자 질문 그대로 전달
"chat_history": lambda _: memory.get("chat_history", []), # 이전 대화 기록
}
| prompt
| llm
| StrOutputParser()
)
- 기존에는 검색 컨텍스트와 사용자 질문만 프롬프트에 주입했지만,
- chat_history를 추가하여 이전 대화 맥락을 반영한 응답 생성이 가능하도록 프롬프트 입력 구조를 확장하였습니다.
대화 저장 (save_context)
output_response = ""
async for chunk in rag_chain.astream(user_message):
output_response += chunk
yield chunk
memory_manager.save_context({"human": user_message}, {"ai": output_response})
- LLM 응답을 스트리밍(astream) 방식으로 처리하면서도 대화 메모리를 유지하기 위해,
- 스트리밍 중 전달되는 chunk를 output_response에 누적한 뒤 최종 응답만을 save_context에 기록하도록 구조를 변경하였습니다.
save_context설명- 첫 번째 인자: 사용자 메시지 (
HumanMessage로 저장) - 두 번째 인자: AI 응답 (
AIMessage로 저장) - 저장 시점에 토큰 수가
max_token_limit을 초과하면 자동으로 오래된 대화를 요약
- 첫 번째 인자: 사용자 메시지 (
전체 코드
# chatbot/utils.py
from langchain.schema.runnable import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from .memory import ChatHistoryManager
from .prompt import CHATBOT_PROMPT
from .retriever import VectorRetriever
# 리트리버 인스턴스 생성 (싱글톤으로 구현된 리트리버에 맞춰 전역 변수로 생성)
vector_retriever = VectorRetriever()
# 멀티쿼리리트리버로 변경함.
retriever = vector_retriever.get_multi_query_retriever()
async def get_chatbot_response(user_message):
async def get_chatbot_response(user_message, user_id):
"""
비동기 스트리밍 방식 챗봇 응답을 생성할 수 있는 함수
- 사용자의 입력을 받아 LLM을 실행하고, 단일 응답만 반환 (싱글턴, 대화 기록 저장 X)
이 함수는 LangChain의 활용해서 검색 증강 생성(RAG) 방식의 챗봇 응답을 생성합니다.
비동기 스트리밍으로 응답으로 실시간으로 반환하고, 대화 기록을 활용해 대화의 흐름을 이어나갈 수 있습니다.
주요 기능:
- 사용자의 입력을 받아 LLM을 실행하고, 비동기 스트리밍 방식으로 응답 바노한
- 검색된 문서를 기반으로 하는 답변 제공
- `ChatHistoryManager`를 통해 대화 내용을 관리할 수 있음
- temperature=0.3: 창의성보다 정확도에 중점을 둠. (추후 조정 가능)
변동사항 (03/09/25):
- async, yield를 사용해 비동기 작업을 할 수 있도록 변경
- LLM 응답을 invoke() 방식이 아닌 astream() 비동기 스트리밍 방식으로 변경
변동사항 (03/19/25)
- `ChatHistoryManager`를 통해 대화 내용 관리 및 멀티턴 기능 추가
"""
# 사용자 질문을 기반으로 문서 검색
retrieved_docs = retriever.invoke(user_message)
# 디버깅: 검색된 문서가 있는지 확인
if not retrieved_docs:
print("검색된 문서가 없습니다.")
else:
pass
# 검색된 문서를 문자열로 변환
retrieved_context = VectorRetriever().format_docs(retrieved_docs)
# RAG 검색 결과가 없으면 기본 context 사용
if retrieved_context is None:
retrieved_context = "정부의 지원 정책"
# llm 실행 (CHATBOT_PROMPT는 utils1.py에서 import)
# streaming 을 위한 streaming 파라미터 조정
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.3, streaming=True)
chat_manager = ChatHistoryManager(user_id, llm)
memory_manager = chat_manager.get_memory_manager()
memory = memory_manager.load_memory_variables({})
prompt = CHATBOT_PROMPT
rag_chain = (
{"context": lambda _: retrieved_context, "question": RunnablePassthrough()}
{
"context": lambda _: retrieved_context,
"question": RunnablePassthrough(),
"chat_history": lambda _: memory.get("chat_history", []),
}
| prompt
| llm
| StrOutputParser()
)
output_response = ""
async for chunk in rag_chain.astream(user_message):
output_response += chunk
yield chunk
memory_manager.save_context({"human": user_message}, {"ai": output_response})
4. Consumer에서 멀티턴 통합
WebSocket Consumer에서 멀티턴 기능을 통합합니다. 연결 시 사용자/채팅방 정보를 저장하고, 메시지 수신 시 이를 챗봇 로직에 전달하며, 연결 종료 시 대화 기록을 정리합니다.
disconnect() 수정
# 기존
class ChatConsumer(AsyncWebsocketConsumer):
...
async def disconnect(self, close_code):
pass
# 변경
class ChatConsumer(AsyncWebsocketConsumer):
...
async def disconnect(self, close_code):
if self.is_authenticated:
chat_history_manager = ChatHistoryManager(self.user_id, model=None)
chat_history_manager.clear_history()
- 인증된 사용자의 연결이 종료될 때만 대화 기록을 삭제합니다.
model=None: 대화 기록 삭제 시에는 요약 기능이 필요 없으므로 LLM 모델을 전달하지 않습니다.clear_history(): Redis에서 해당 사용자/채팅방의 대화 기록을 완전히 삭제합니다.- 현재는 연결 종료 시 삭제하지만, 추후 chatroom 기능 구현 시 이 로직을 제거하고 채팅방을 삭제하는 경우 삭제하는 것으로 변경 예정입니다.
receive() 수정
# 기존
class ChatConsumer(AsyncWebsocketConsumer):
...
async def receive(self, text_data):
...
async for chunk in get_chatbot_response(user_message):
# 변경
class ChatConsumer(AsyncWebsocketConsumer):
...
async def receive(self, text_data):
...
async for chunk in get_chatbot_response(user_message, self.user_id):
- 기존에는
user_message만 전달했으나,user_id와room_id를 추가로 전달 - 이를 통해
get_chatbot_response()내부에서 사용자별 대화 기록을 불러오고 저장 가능
# 기존
class ChatConsumer(AsyncWebsocketConsumer):
...
async def receive(self, text_data):
...
data = json.loads(text_data)
user_message = data["message"]
# 변경
class ChatConsumer(AsyncWebsocketConsumer):
...
async def receive(self, text_data):
...
try:
data = json.loads(text_data)
user_message = data["message"]
except json.JSONDecodeError:
await self.send(
text_data=json.dumps(
{"error": "잘못된 JSON 형식입니다."},
ensure_ascii=False,
)
)
return
- 클라이언트에서 전달되는 메시지가 손상되거나 잘못된 JSON 형식일 가능성을 고려해,
- 수신 단계에서 JSON 파싱 예외를 처리하고 정상적인 입력만 이후 로직으로 전달되도록 방어 코드도 추가하였습니다.
전체 코드
# chatbot/consumers.py
import json
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from accounts.models import User
from .memory import ChatHistoryManager
from .serializers import ChatbotSerializer
from .utils import get_chatbot_response
class ChatConsumer(AsyncWebsocketConsumer):
"""
WebSocket을 통한 실시간 채팅을 처리하는 comsumer 클래스
해당 클래스는 Django Channels의 AsyncWebsocketConsumer를 상속받아
WebSocket 연결을 관리하고, 사용자 인증 및 AI응답을 처리합니다.
사용 방식:
- WebSocket 연결 시 사용자를 인증하고, 인증된 사용자만 채팅 이용 가능
- 클라이언트가 메시지를 전송하면 Chatbot 모델을 호츌하여 점진적인 응답을 생성
- `is_streaming=True`로 스트리밍 중임을 같이 알림
- 생성이 끝나면 완전한 메시지와 'is_streaming=False`도 같이 보내 스트리밍이 끝남을 알림
- 인증되지 않은 사용자는 메시지를 전송할 수 없으며, 에러 메시지를 반환
매서드(Method)
- connect(): WebSocket 연결을 초기화하고 사용자 인증을 수행함.
- disconnect(close_code): Websocket 연결 종료
- receive(text_data): 클라이언트의 메시지를 받아 Chatbot에 전달
- get_user(): user_id를 기반으로 데이터베이스에서 사용자 정보 조회
"""
async def connect(self):
self.user_id = self.scope.get("user_id")
self.is_authenticated = await self.get_user() is not None
if not self.is_authenticated:
await self.close()
return
await self.accept()
async def disconnect(self, close_code):
pass
if self.is_authenticated:
chat_history_manager = ChatHistoryManager(self.user_id, model=None)
chat_history_manager.clear_history()
async def receive(self, text_data):
if not self.is_authenticated:
await self.send(
text_data=json.dumps(
{"error": "인증되지 않은 사용자입니다."},
ensure_ascii=False,
)
)
data = json.loads(text_data)
user_message = data["message"]
try:
data = json.loads(text_data)
user_message = data["message"]
except json.JSONDecodeError:
await self.send(
text_data=json.dumps(
{"error": "잘못된 JSON 형식입니다."},
ensure_ascii=False,
)
)
return
serializer = ChatbotSerializer(data={"message": user_message})
if not serializer.is_valid():
await self.send(
text_data=json.dumps(
{"error": "잘못된 메시지 형식입니다."},
ensure_ascii=False,
)
)
return
# 생성되고 있는 답변의 chunk과 스트리밍 중임을 알림
async for chunk in get_chatbot_response(user_message):
async for chunk in get_chatbot_response(user_message, self.user_id):
await self.send(
text_data=json.dumps(
{"response": chunk, "is_streaming": True},
ensure_ascii=False,
)
)
# 완전히 답변 생성이 끝나면 최종 답변과 스트리밍이 끝남을 알림
await self.send(
text_data=json.dumps(
{"response": chunk, "is_streaming": False},
ensure_ascii=False,
)
)
@sync_to_async
def get_user(self):
if self.user_id:
return User.objects.filter(id=self.user_id).first()
return None
전체 멀티턴 흐름 요약
1. connect() → user_id, room_id 저장
2. receive() #1 → Redis에서 대화 기록 불러옴 (비어있음) → LLM 실행 → 대화 저장
3. receive() #2 → Redis에서 대화 기록 불러옴 (1턴 존재) → LLM 실행 → 대화 저장
4. receive() #3 → Redis에서 대화 기록 불러옴 (2턴 존재) → LLM 실행 → 대화 저장
...
N. disconnect() → Redis에서 대화 기록 삭제
🔄 마무리
구현 성과
이번 작업을 통해 AINFO 챗봇에 멀티턴 대화 기능을 안정적으로 도입했습니다.
- 사용자가 이전 대화를 참조하는 후속 질문을 하더라도 맥락을 유지한 응답을 제공할 수 있게 되었습니다.
ConversationSummaryBufferMemory를 활용하여 대화가 길어지더라도 토큰 제한을 넘지 않으면서 중요한 정보를 보존할 수 있습니다.- Redis를 저장소로 사용하여 다중 인스턴스 환경에서도 일관된 대화 기록을 유지할 수 있는 확장 가능한 구조를 마련했습니다.
회고
멀티턴 기능을 구현하면서 LangChain에서 제공하는 다양한 Memory 옵션들을 비교해 보고, 각각의 장단점을 정리하는 과정에서 이전 대화를 기억하는 방식에도 여러 설계 선택지가 존재한다는 점을 배울 수 있었습니다.
멀티턴 기능은 단순히 “대화를 저장한다”는 문제를 넘어서, 토큰 제한이라는 제약 속에서 메모리 효율성과 정보 보존 사이의 균형을 어떻게 잡을 것인지에 대한 고민이 필요한 영역이라는 점이 재미있었던 것 같습니다. 여러 선택지 중에서 어떤 방식을 사용할지 결정하고, 그 판단을 실제 구현으로 옮기면서 단순히 기능을 붙이는 것이 아니라 내 생각과 기준을 코드에 녹여낸 코드라 애정이 더 생기는 것 같았습니다.
개선 방향
- 대화 삭제 시점 개선: 현재는 WebSocket 연결 종료 시 Redis에 저장된 대화 기록을 삭제하고 있으나, 추후 chatroom 개념을 도입하면 chatroom 삭제 시점에 대화 기록을 정리하는 구조로 변경할 예정입니다.
- 대화 기록 DB 구축: Redis는 실시간 멀티턴 처리를 위한 단기 저장소로 유지하고, 대화 로그 분석 및 이력 관리를 위해 별도의 DB에 대화 내용을 영구 저장하는 구조를 추가할 계획입니다.
- 메모리 정책 최적화: 현재 max_token_limit을 200으로 설정했는데, 실제 사용 패턴을 분석하여 최적의 값을 찾아갈 예정입니다.