[Spring] SSE를 통한 실시간 통신 서비스 도입 (1)
개발 환경
- Java 11
- SpringBoot 2.7.17
도입 배경
현재 진행중인 프로젝트는 협업 툴 제공 서비스입니다.
서비스 구조를 말씀드리면 워크스페이스(Workspace)
라는 그룹이 있고 그 안에는 프로젝트(Project)
를 생성할 수 있습니다. 그리고 각각의 프로젝트에는 할 일(Task)
들을 할당할 수 있습니다.
여기서 A 회원이 C 프로젝트에 있는 Task 목록을 보고있는데, 만약 B 회원이 C 프로젝트의 어떤 Task를 변경한다거나 위치를 이동시킨 후 A 회원에게 해당 Task에 대해서 얘기를 하면 어떻게 될까요?
A 회원이 보고 있는 Task와 B 회원이 보고 있는 Task의 상태가 달라서 대화의 핀트가 어긋날 수 있을 것입니다.
만약 B 회원이 Task의 상태를 변경했을 때, A 회원이 보고 있는 Task 목록에 변경 사항이 바로 반영된다면 위와 같은 상황을 방지할 수 있습니다.
또한, 실시간 서비스를 도입하면 특정 회원에게 워크스페이스에 초대되었다거나 어떤 Task에 대한 담당자로 지정되었다는 등의 알림을 전송할 수 있습니다.
이러한 서비스는 사용자 경험을 향상할 수 있다고 생각하여 도입하게 되었습니다.
왜 SSE 인가?
실시간 통신을 구현하는 데 고려했던 기술들은 Polling
, WebSocket
, Server-Sent Events (SSE)
이렇게 총 3가지가 있었습니다.
저는 이 중에서 SSE
를 선택했는데 그 이유를 말씀드리겠습니다.
Polling
우선 Polling
입니다. Polling
은 클라이언트가 일정한 간격으로 서버에게 데이터를 주기적으로 요청하는 방식입니다.
클라이언트는 주기적으로 서버에게 “새로운 데이터 있어?” 라고 물어보고, 서버는 데이터가 업데이트되면 응답합니다. 이 방식은 불필요한 HTTP요청이 계속 발생하기 때문에 패스했습니다.
WebSocket
WebSocket
은 클라이언트와 서버 간 양방향 실시간 통신을 지원합니다.
한 번 요청을 보내면 연결이 지속되기 때문에 리소스 낭비의 걱정이 덜어집니다.
하지만 저희 프로젝트에는 WebSocket
을 사용하지 않고 SSE
를 도입했습니다. 그 이유는 아래에 작성하겠습니다.
Server-Sent Events (SSE)
SSE
는 WebSocket
과 같이 한 번 요청을 보내면 연결을 지속할 수 있습니다.
WebSocket
과 다른 점은 단방향 통신이라는 것입니다. 즉, 클라이언트에서 연결 요청을 보내면 연결이 종료되기 전까지 서버에서 클라이언트로 응답만 보낼 수 있습니다.
저희 프로젝트에 실시간 서비스를 도입하는 데 클라이언트의 요청은 필요하지 않으므로 SSE
기술을 통해 실시간 서비스를 도입했습니다.
저장소(Repository)에 대한 고민
SSE
를 사용하려는 서비스는 2가지가 있었습니다.
- 워크스페이스 초대, 할 일 담당자 지정 등의 알림 서비스
- 서비스를 이용하는 중, 실시간 변경 사항 적용
사실 실시간 변경 사항이 적용되는 부분은 해당 서비스를 이용하는 중이 아니라면 실시간으로 데이터를 응답 받을 필요가 없습니다.
하지만 알림의 경우는 SSE
가 연결되어 있지 않더라도 이벤트가 발생하면 알림을 보관해뒀다가 사용자가 서비스에 접속하면 알림들을 전송해 줄 필요가 있었습니다.
따라서 발생되는 event
들을 어떻게 저장하고 발송해야 할까에 대한 고민을 하다 다음 방법으로 처리했습니다.
우선 발생되는 모든 event
는 메모리에 저장합니다. 그리고 알림 이벤트는 Notification
이라는 테이블을 하나 만들어서 따로 저장합니다.
따라서 갑작스럽게 서버가 종료되거나 네트워크 문제가 발생했을 시, 알림 이벤트를 잃어버리는 문제를 막을 수 있습니다.
여기서 사용자가 서비스를 종료한 뒤에 발생한 알림 이벤트들은 SSE
가 종료된 시점과 사용자가 서비스를 다시 연결하여 SSE
가 다시 시작된 시점을 비교하여 그 사이의 이벤트들을 전송해 줍니다.
SSE 구현
SSE
는 Spring
의 SseEmitter
를 사용했습니다.
Controller
@RequiredArgsConstructor
@RequestMapping("/api")
@RestController
public class NotificationController {
private final NotificationService notificationService;
@GetMapping(value = "/notifications/subscribe", produces = "text/event-stream")
public SseEmitter subscribeAlarm(
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return notificationService.subscribeAlarm(lastEventId);
}
@GetMapping(value = "/subscribe/workspaces/{workspaceId}/type", produces = "text/event-stream")
public SseEmitter subscribeTasks(@PathVariable Long workspaceId ,
@ModelAttribute TasksNotificationParams params) {
return notificationService.subscribeTasks(workspaceId, params);
}
@GetMapping(value = "/subscribe/workspaces/projects/tasks/{taskId}", produces = "text/event-stream")
public SseEmitter subscribeTask(@PathVariable Long taskId) {
return notificationService.subscribeTask(taskId);
}
}
- 연결 요청을 처리하기 위해서
MIME Type
을text/event-stream
으로 설정해야 합니다. Last-Event-Id
는 이전에 받지 못한event
가 존재하는 경우, 마지막으로 받은eventId
를 전달받습니다.Project
의Task
목록은 내가 담당자인Task
, 북마크한Task
등 여러 조건에 따라 필터링이 적용됩니다. 따라서 사용자가 어떤 화면을 보고 있는지 구분하기 위해TaskNotificationParams
를 받아와서 처리합니다.
Repository
@NoArgsConstructor
@Repository
public class EmitterRepositoryImpl implements EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
@Override
public Map<String, SseEmitter> findAll() {
return emitters;
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithMemberId(String memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, SseEmitter> findAllEmittersStartWithTasks() {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("tasks"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithTask() {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("task_"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteById(String id) {
emitters.remove(id);
}
}
@Getter
public class TasksNotificationParams {
private Long projectId;
private Long boardId;
private boolean bookmark;
private boolean my;
public boolean checkValidRequest() {
return projectId != null || boardId != null || bookmark || my;
}
public String getSuffixByParam() {
if (bookmark) {
return "_bookmark_";
} else if (my) {
return "_my_";
} else if (boardId != null) {
return "_board_" + boardId + "_project_" + projectId + "_";
} else {
return "_project_" + projectId + "_";
}
}
}
- 동시성을 고려하여
ConcurrentHashMap
을 통해SSE
를 저장해 줍니다. EmitterId
는 회원의 식별자를 넣어 구분했습니다. 그리고 알림과 실시간 변경 사항을 분리하기 위해 저만의 알고리즘(?)을 넣어서startsWith
로 위에서 말씀드린 필터링된 할 일 목록을 구분했습니다.
서비스 로직은 다음 게시글에 올리겠습니다.
Leave a comment