[AINFO] 실시간성을 위한 챗봇 로직 비동기 처리 및 Streaming 추가

⚠️ 안내: 이 글은 학습 기록용입니다. 오류나 보완 의견은 댓글로 알려주세요.

📌 관련 개념 정리

관련된 기술에 대한 개념적 정리는 아래 포스트를 참고해주세요.

🧭 배경

이전에 구현한 WebSocket 기반 통신 구조에서는 서버와 클라이언트가 연결을 유지한 상태로 메시지를 주고받는 흐름이 정상적으로 동작하고 있었습니다. consumer.py의 메시지 처리 로직 또한 비동기 방식으로 구현되어 있어, 여러 사용자가 동시에 접속하더라도 각 WebSocket 연결을 무리 없이 처리할 수 있는 구조였습니다.

그러나 내부 동작을 자세히 살펴보면, WebSocket 통신과 Consumer 레벨에서는 비동기 처리가 이루어지고 있었던 반면, 챗봇 응답을 생성하는 로직은 여전히 동기적으로 동작하고 있었습니다. 이로 인해 사용자 메시지를 수신한 이후 LLM 응답이 완전히 생성될 때까지 해당 WebSocket 연결이 응답을 반환하지 않는 구조였으며, 응답 생성 시간이 길어질수록 하나의 연결이 이벤트 루프를 상대적으로 오래 점유하게 되는 문제가 있었습니다.

이러한 구조에서는

  • 응답이 길어질수록 첫 출력까지의 대기 시간이 증가하고,
  • 사용자는 응답이 생성 중인지 혹은 처리가 중단된 것인지 판단하기 어려웠습니다.
  • 또한 실시간 통신이 제공할 수 있는 사용자 경험상의 이점을 충분히 활용하지 못했으며, 동시 접속자가 증가할 경우 처리 효율과 응답 체감 속도 측면에서도 한계를 가질 수 있는 구조였습니다.

이에 따라 이번 작업에서는 챗봇 응답 생성 로직을 비동기 흐름에 맞게 재구성하고, 응답을 한 번에 전달하는 방식이 아닌 생성되는 즉시 점진적으로 전달하는 스트리밍 방식을 도입하게 되었습니다.

🧩 구현 과정

1. 챗봇 로직 변경

기존 코드

# chatbot/utils.py

def get_chatbot_response(user_message):
    ...
    llm = chatopenai(model_name="gpt-4o-mini", temperature=0.3)

    response = rag_chain.invoke(user_message)

    return {
        "response": response,
        "retrieved_context": retrieved_context,  # rag에서 가져온 문서 같이 출력되도록 함(확인용)
    }

  • 기존 get_chatbot_response()는 내부에서 LLM과 RAG 체인을 실행한 뒤, 응답이 완전히 생성된 이후에만 결과를 반환하는 구조

변경된 코드

# after
async def get_chatbot_response(user_message):
    ...
    llm = chatopenai(model_name="gpt-4o-mini", temperature=0.3, streaming=True)

    async for chunk in rag_chain.astream(user_message):
    yield chunk
  • 단순히 async로 바꾸는 것이 아니라, LLM 응답을 한 번에 return 하지 않고 yield로 흘려보내는 구조로 변경했습니다.
  • streaming=True 옵션을 통해 LLM이 토큰 단위로 응답을 생성하도록 설정했습니다.
  • 응답이 전부 생성될 때까지 기다리는 invoke() 대신 LLM이 토큰을 만들어내는 대로 값을 받는 astream() 사용
  • 결과적으로 이 함수는 응답을 모두 만든 뒤 반환하는 구조에서, 응답 생성과 전달이 동시에 이루어지는 비동기 generator 구조로 변경되었습니다.

2. WebSocket Consumer 변경

기존 코드

class ChatConsumer(AsyncWebsocketconsumer):
    ...


    async def receive(self, text_data):
        ...

        llm_response = await self.llm_response(user_message)
        await self.send({
            "response": llm_response
        })

    async def llm_response(self, user_message):
        llm_response = get_chatbot_response(user_message)
        response = llm_response.get("response")
        return response
  • 기존 Consumer는 메시지를 수신한 뒤, LLM 응답이 완전히 생성될 때까지 기다린 후 한 번만 send() 하는 구조였습니다.
  • 해당 구조의 Consumer는 async로 동작하지만, 내부에서 호출하는 LLM 로직이 끝날 때까지 실질적인 응답 전송이 발생하지 않아 대기 시간이 길어지고, 클라이언트는 처리 진행 여부로 알 수 없었습니다.

변경된 코드

class ChatConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        ...
        # 생성되고 있는 답변의 chunk과 스트리밍 중임을 알림
        async for chunk in get_chatbot_response(user_message):
            await self.send(
                text_data=json.dumps(
                    {"response": chunk, "is_streaming": True},
                    ensure_ascii=False,
                )
            )

        # 완전히 답변 생성이 끝나면 최종 답변과 스트리밍이 끝남을 알림
        await self.send(
            text_data=json.dumps(
                {"response": chunk, "is_streaming": False},
                ensure_ascii=False,
            )
        )
  • 앞서 챗봇 로직을 비동기 generator 형태로 변경했기 때문에, Consumer의 역할도 “응답을 받아서 전송”하는 방식에서 **“응답 생성 흐름을 그대로 중계”하는 방식으로 변경했습니다.
  • async for 루프를 통해 응답 생성과 동시에 전송(send())합니다.
  • 첫 토큰이 생성되는 시점부터 클라이언트로 응답이 전달됩니다.
  • 응답 생성과 전송이 동시에 진행됩니다.
  • 또한, 스트리밍 종료 시점을 명확히 알리기 위해 모든 chunk 전송이 끝난 뒤 한 번 더 메시지를 전송합니다.

🔄 마무리

구현 성과

이번 작업을 통해 WebSocket 기반 챗봇의 응답 처리 구조를 한 단계 더 실시간성에 맞게 개선할 수 있었습니다. LLM 응답이 완전히 생성될 때까지 기다리지 않고, 첫 토큰 생성 시점부터 클라이언트로 응답을 전달할 수 있게 되었습니다. 사용자는 응답이 생성 중임을 즉시 인지할 수 있어, 대기 시간에 대한 체감이 크게 줄어들었습니다.

결과적으로 WebSocket을 사용하고 있음에도 동기 호출에 가까웠던 병목 지점을 제거하고, 실시간 통신 구조의 장점을 응답 생성 단계까지 확장할 수 있었습니다.


© 2024. All rights reserved.

Powered by Hydejack v9.2.1