개발 환경

  • Java 11
  • SpringBoot 2.7.17


도입 배경

현재 진행중인 프로젝트는 협업 툴 제공 서비스입니다.

서비스 구조를 말씀드리면 워크스페이스(Workspace)라는 그룹이 있고 그 안에는 프로젝트(Project)를 생성할 수 있습니다. 그리고 각각의 프로젝트에는 할 일(Task)들을 할당할 수 있습니다.

여기서 A 회원이 C 프로젝트에 있는 Task 목록을 보고있는데, 만약 B 회원이 C 프로젝트의 어떤 Task를 변경한다거나 위치를 이동시킨 후 A 회원에게 해당 Task에 대해서 얘기를 하면 어떻게 될까요?

A 회원이 보고 있는 Task와 B 회원이 보고 있는 Task의 상태가 달라서 대화의 핀트가 어긋날 수 있을 것입니다.

만약 B 회원이 Task의 상태를 변경했을 때, A 회원이 보고 있는 Task 목록에 변경 사항이 바로 반영된다면 위와 같은 상황을 방지할 수 있습니다.

또한, 실시간 서비스를 도입하면 특정 회원에게 워크스페이스에 초대되었다거나 어떤 Task에 대한 담당자로 지정되었다는 등의 알림을 전송할 수 있습니다.

이러한 서비스는 사용자 경험을 향상할 수 있다고 생각하여 도입하게 되었습니다.


왜 SSE 인가?

실시간 통신을 구현하는 데 고려했던 기술들은 Polling, WebSocket, Server-Sent Events (SSE) 이렇게 총 3가지가 있었습니다.

저는 이 중에서 SSE를 선택했는데 그 이유를 말씀드리겠습니다.


Polling

우선 Polling입니다. Polling은 클라이언트가 일정한 간격으로 서버에게 데이터를 주기적으로 요청하는 방식입니다.

클라이언트는 주기적으로 서버에게 “새로운 데이터 있어?” 라고 물어보고, 서버는 데이터가 업데이트되면 응답합니다. 이 방식은 불필요한 HTTP요청이 계속 발생하기 때문에 패스했습니다.

WebSocket

WebSocket은 클라이언트와 서버 간 양방향 실시간 통신을 지원합니다.

한 번 요청을 보내면 연결이 지속되기 때문에 리소스 낭비의 걱정이 덜어집니다.

하지만 저희 프로젝트에는 WebSocket을 사용하지 않고 SSE를 도입했습니다. 그 이유는 아래에 작성하겠습니다.

Server-Sent Events (SSE)

SSEWebSocket과 같이 한 번 요청을 보내면 연결을 지속할 수 있습니다.

WebSocket과 다른 점은 단방향 통신이라는 것입니다. 즉, 클라이언트에서 연결 요청을 보내면 연결이 종료되기 전까지 서버에서 클라이언트로 응답만 보낼 수 있습니다.

저희 프로젝트에 실시간 서비스를 도입하는 데 클라이언트의 요청은 필요하지 않으므로 SSE 기술을 통해 실시간 서비스를 도입했습니다.


저장소(Repository)에 대한 고민

SSE를 사용하려는 서비스는 2가지가 있었습니다.

  1. 워크스페이스 초대, 할 일 담당자 지정 등의 알림 서비스
  2. 서비스를 이용하는 중, 실시간 변경 사항 적용

사실 실시간 변경 사항이 적용되는 부분은 해당 서비스를 이용하는 중이 아니라면 실시간으로 데이터를 응답 받을 필요가 없습니다.

하지만 알림의 경우는 SSE가 연결되어 있지 않더라도 이벤트가 발생하면 알림을 보관해뒀다가 사용자가 서비스에 접속하면 알림들을 전송해 줄 필요가 있었습니다.

따라서 발생되는 event들을 어떻게 저장하고 발송해야 할까에 대한 고민을 하다 다음 방법으로 처리했습니다.

우선 발생되는 모든 event는 메모리에 저장합니다. 그리고 알림 이벤트는 Notification 이라는 테이블을 하나 만들어서 따로 저장합니다.

따라서 갑작스럽게 서버가 종료되거나 네트워크 문제가 발생했을 시, 알림 이벤트를 잃어버리는 문제를 막을 수 있습니다.

여기서 사용자가 서비스를 종료한 뒤에 발생한 알림 이벤트들은 SSE가 종료된 시점과 사용자가 서비스를 다시 연결하여 SSE가 다시 시작된 시점을 비교하여 그 사이의 이벤트들을 전송해 줍니다.

SSE 구현

SSESpringSseEmitter를 사용했습니다.


Controller

@RequiredArgsConstructor
@RequestMapping("/api")
@RestController
public class NotificationController {

    private final NotificationService notificationService;

    @GetMapping(value = "/notifications/subscribe", produces = "text/event-stream")
    public SseEmitter subscribeAlarm(
            @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {

        return notificationService.subscribeAlarm(lastEventId);
    }

    @GetMapping(value = "/subscribe/workspaces/{workspaceId}/type", produces = "text/event-stream")
    public SseEmitter subscribeTasks(@PathVariable Long workspaceId ,
                                     @ModelAttribute TasksNotificationParams params) {

        return notificationService.subscribeTasks(workspaceId, params);
    }

    @GetMapping(value = "/subscribe/workspaces/projects/tasks/{taskId}", produces = "text/event-stream")
    public SseEmitter subscribeTask(@PathVariable Long taskId) {

        return notificationService.subscribeTask(taskId);
    }
}
  • 연결 요청을 처리하기 위해서 MIME Typetext/event-stream으로 설정해야 합니다.
  • Last-Event-Id는 이전에 받지 못한 event가 존재하는 경우, 마지막으로 받은 eventId를 전달받습니다.
  • ProjectTask목록은 내가 담당자인 Task, 북마크한 Task 등 여러 조건에 따라 필터링이 적용됩니다. 따라서 사용자가 어떤 화면을 보고 있는지 구분하기 위해 TaskNotificationParams를 받아와서 처리합니다.


Repository

@NoArgsConstructor
@Repository
public class EmitterRepositoryImpl implements EmitterRepository {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    @Override
    public Map<String, SseEmitter> findAll() {
        return emitters;
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithMemberId(String memberId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, SseEmitter> findAllEmittersStartWithTasks() {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith("tasks"))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithTask() {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith("task_"))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public void deleteById(String id) {
        emitters.remove(id);
    }
}
@Getter
public class TasksNotificationParams {

    private Long projectId;

    private Long boardId;

    private boolean bookmark;

    private boolean my;

    public boolean checkValidRequest() {
        return projectId != null || boardId != null || bookmark || my;
    }

    public String getSuffixByParam() {
        if (bookmark) {
            return "_bookmark_";
        } else if (my) {
            return "_my_";
        } else if (boardId != null) {
            return "_board_" + boardId + "_project_" + projectId + "_";
        } else {
            return "_project_" + projectId + "_";
        }
    }
}
  • 동시성을 고려하여 ConcurrentHashMap을 통해 SSE를 저장해 줍니다.
  • EmitterId는 회원의 식별자를 넣어 구분했습니다. 그리고 알림과 실시간 변경 사항을 분리하기 위해 저만의 알고리즘(?)을 넣어서 startsWith로 위에서 말씀드린 필터링된 할 일 목록을 구분했습니다.

서비스 로직은 다음 게시글에 올리겠습니다.

Categories:

Updated:

Leave a comment