[Spring] SSE를 통한 실시간 통신 서비스 도입 (2)
Service
서비스는 알림 부분만 설명드리고 전체적인 코드를 하단에 첨부하겠습니다.
/**
 * Subscribes the current session member to alarm events.
 *
 * <p>A new emitter is set up under the id {@code <memberId>_<currentTimeMillis>}. When
 * {@code hasLostData} reports that the client sent a last event id, stored notifications
 * are replayed through the new emitter before it is returned.
 *
 * @param lastEventId id of the last event the client received; empty when there is none
 * @return the emitter created for this subscription
 */
public SseEmitter subscribeAlarm(String lastEventId) {
    final String subscriberId = sessionMemberProvider.getMemberId();
    final String newEmitterId = subscriberId + "_" + System.currentTimeMillis();
    final SseEmitter sseEmitter = emitterInitialSetting(newEmitterId);

    // Nothing to replay: hand the fresh emitter back right away.
    if (!hasLostData(lastEventId)) {
        return sseEmitter;
    }

    sendLostData(lastEventId, subscriberId, newEmitterId, sseEmitter);
    return sseEmitter;
}
/**
 * Tells whether the client reported a previously received event, i.e. whether a replay
 * of missed notifications should be attempted.
 *
 * @param lastEventId last event id supplied by the client; may be {@code null} or empty
 * @return {@code true} only when {@code lastEventId} is non-null and non-empty
 */
private boolean hasLostData(String lastEventId) {
    // A missing header can arrive as null rather than ""; both mean "nothing to replay".
    return lastEventId != null && !lastEventId.isEmpty();
}
/**
 * Replays stored notifications through a freshly created emitter.
 *
 * <p>{@code lastEventId} is expected to have the form {@code <memberId>_<epochMillis>},
 * which is how event ids are built when alarms are sent. The numeric suffix is handed to
 * {@code notificationRepository.findNotSentNotifications} as the cut-off (presumably the
 * repository returns notifications published after it; verify against the query).
 *
 * <p>The id comes from the client, so it is validated first: a value that does not start
 * with {@code <memberId>_} or whose suffix is not a valid {@code long} is ignored and no
 * replay takes place, instead of failing the whole subscription request.
 *
 * @param lastEventId id of the last event the client says it received
 * @param memberId    member whose notifications are replayed
 * @param emitterId   id of the target emitter; also used as the id of each replayed event
 * @param emitter     emitter the notifications are written to
 */
private void sendLostData(String lastEventId, String memberId, String emitterId, SseEmitter emitter) {
    String expectedPrefix = memberId + "_";
    if (lastEventId == null || !lastEventId.startsWith(expectedPrefix)) {
        // Not an id issued for this member: nothing sensible to replay.
        return;
    }

    long lastReceivedAt;
    try {
        lastReceivedAt = Long.parseLong(lastEventId.substring(expectedPrefix.length()));
    } catch (NumberFormatException ignored) {
        // Malformed timestamp suffix in a client-supplied header: skip the replay.
        return;
    }

    List<Notification> missedNotifications =
            notificationRepository.findNotSentNotifications(memberId, lastReceivedAt);
    for (Notification notification : missedNotifications) {
        sendNotification(
                emitter,
                emitterId,
                emitterId,
                notification.getNotificationData(),
                String.valueOf(notification.getNotificationType())
        );
    }
}
- 클라이언트에서 요청을 보내면 구독 처리를 하는 로직입니다. `emitterId`는 회원 식별자와 현재 시간으로 구성했습니다.
- 미발신된 알림 목록은 `emitterId`와 `lastEventId`의 suffix를 비교하여 전송됩니다.
/**
 * Stores an alarm notification and pushes it to the member's emitters.
 *
 * <p>The payload is serialized to JSON and saved together with the publish time. Every
 * emitter returned by {@code emitterRepository.findAllEmitterStartWithMemberId} then
 * receives an event whose id is {@code <memberId>_<publishTimeMillis>}, whose name is the
 * notification type and whose data is a {@code NotificationSummary} of the saved entity.
 *
 * @param type     notification type, also used as the SSE event name
 * @param data     payload to serialize and persist
 * @param memberId member the alarm is addressed to
 * @throws JsonProcessingException if {@code data} cannot be serialized to JSON
 */
public void sendAlarm(NotificationType type, Object data, String memberId) throws JsonProcessingException {
    final long publishedAt = System.currentTimeMillis();
    final String serializedData = objectMapper.writeValueAsString(data);
    final Notification saved = notificationRepository.save(
            createNotification(type, serializedData, memberId, publishedAt));
    final String eventId = memberId + "_" + publishedAt;

    final Map<String, SseEmitter> memberEmitters =
            emitterRepository.findAllEmitterStartWithMemberId(memberId);
    for (Map.Entry<String, SseEmitter> entry : memberEmitters.entrySet()) {
        sendNotification(
                entry.getValue(),
                eventId,
                entry.getKey(),
                new NotificationSummary(saved),
                String.valueOf(type));
    }
}
- 알림은 메모리뿐만 아니라 DB에도 저장해 줍니다. `emitterRepository`에서 발생한 `event`와 관련된 `emitter`들을 조회하여 응답을 전송합니다.
/**
 * Sends a {@code heartbeat} event to every registered emitter and removes the emitters
 * that can no longer be written to. Scheduled at a fixed rate of 130 seconds.
 *
 * <p>{@code SseEmitter.send} reports a broken connection as {@code IOException} and other
 * failures (for example an emitter that has already completed) as
 * {@code IllegalStateException}. Both are handled per emitter, so one dead emitter neither
 * stays registered nor stops the heartbeat from reaching the remaining ones.
 */
@Scheduled(fixedRate = 130_000)
protected void sendHeartbeat() {
    Map<String, SseEmitter> emitterMap = emitterRepository.findAll();
    log.debug("send heartbeat to all emitter... emitter count: {}", emitterMap.size());
    emitterMap.forEach((key, emitter) -> {
        try {
            emitter.send(SseEmitter.event().name("heartbeat").data(""));
        } catch (IOException | IllegalStateException e) {
            // The emitter is unusable; drop it so later broadcasts skip it.
            emitterRepository.deleteById(key);
        }
    });
}
- 스케줄러를 통해 사용하지 않지만 남아있는 `emitter`를 제거해 줍니다.
아래는 `Notification` 테이블과 `NotificationService` 전체 로직입니다.
/**
 * JPA entity for a single notification addressed to a member.
 *
 * <p>Instances are created through the Lombok-generated builder; the protected no-args
 * constructor is generated as well.
 */
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class Notification {
// Primary key, generated by the database (IDENTITY) and stored in column notification_id.
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "notification_id")
private Long id;
// Kind of notification; persisted by enum name and never null.
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private NotificationType notificationType;
// Notification payload as a string (NotificationService.sendAlarm stores serialized JSON here).
private String notificationData;
// Identifier of the member the notification is addressed to.
private String memberId;
// Publish time; NotificationService.sendAlarm passes System.currentTimeMillis().
private long whenEventPublished;
/**
 * Creates a notification with all of its business fields set.
 *
 * @param notificationType   kind of notification
 * @param notificationData   payload string
 * @param memberId           identifier of the addressed member
 * @param whenEventPublished publish time of the event
 */
@Builder
public Notification(NotificationType notificationType, String notificationData, String memberId, long whenEventPublished) {
this.notificationType = notificationType;
this.notificationData = notificationData;
this.memberId = memberId;
this.whenEventPublished = whenEventPublished;
}
}
/**
 * Manages Server-Sent Events subscriptions and pushes events to subscribers.
 *
 * <p>Three kinds of emitters are registered in {@code EmitterRepository}, told apart by id:
 * <ul>
 *   <li>alarm emitters: {@code <memberId>_<subscribeTimeMillis>}</li>
 *   <li>task-list emitters: {@code tasks_<workspaceId><suffix><memberId>}, where the suffix
 *       comes from {@code TasksNotificationParams.getSuffixByParam()}</li>
 *   <li>task-detail emitters: {@code task_<taskId>_<memberId>}</li>
 * </ul>
 *
 * <p>Any failure while writing to an emitter removes that emitter from the repository and
 * never propagates to the caller, so one dead connection cannot abort a broadcast.
 */
@Slf4j
@RequiredArgsConstructor
@Transactional
@Service
public class NotificationService {

    private final SessionMemberProvider sessionMemberProvider;
    private final EmitterRepository emitterRepository;
    private final NotificationRepository notificationRepository;
    private final ObjectMapper objectMapper;

    private static final String TASKS_EMITTER_ID_PREFIX = "tasks_";
    private static final String TASK_EMITTER_ID_PREFIX = "task_";
    private static final String NOTIFICATION_TYPE_MESSAGE = "MESSAGE";

    /**
     * Subscribes the current session member to alarm events.
     *
     * <p>When the client supplies a last event id, stored notifications are replayed
     * through the new emitter before it is returned.
     *
     * @param lastEventId id of the last event the client received; {@code null} or empty
     *                    when there is none
     * @return the emitter created for this subscription
     */
    public SseEmitter subscribeAlarm(String lastEventId) {
        String memberId = sessionMemberProvider.getMemberId();
        String emitterId = memberId + "_" + System.currentTimeMillis();
        SseEmitter emitter = emitterInitialSetting(emitterId);
        if (hasLostData(lastEventId)) {
            sendLostData(lastEventId, memberId, emitterId, emitter);
        }
        return emitter;
    }

    /**
     * Creates an emitter, stores it under {@code emitterId}, wires its lifecycle callbacks
     * and sends an initial {@code CONNECTED} event.
     *
     * @param emitterId repository key for the new emitter
     * @return the emitter as returned by the repository
     */
    private SseEmitter emitterInitialSetting(String emitterId) {
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(Long.MAX_VALUE));
        emitter.onCompletion(() -> {
            emitterRepository.deleteById(emitterId);
            log.debug("emitter completed... emitter-id: {}", emitterId);
        });
        emitter.onTimeout(() -> {
            emitterRepository.deleteById(emitterId);
            log.debug("emitter timeout... emitter-id: {}", emitterId);
        });
        emitter.onError((e) -> emitter.complete());

        // Send one event immediately to avoid a 503 on a connection that has sent no data yet.
        String connectType = "CONNECTED";
        sendNotification(emitter, emitterId, emitterId, null, connectType);
        return emitter;
    }

    /**
     * Writes one event to an emitter and removes the emitter when the write fails.
     *
     * <p>{@code SseEmitter.send} reports a broken connection as {@code IOException} and
     * other failures (for example an emitter that has already completed) as
     * {@code IllegalStateException}. Both are handled here so callers that iterate over
     * many emitters keep going.
     *
     * @param emitter   target emitter
     * @param eventId   value of the SSE {@code id} field
     * @param emitterId repository key of {@code emitter}, used for removal on failure
     * @param data      event payload; {@code null} is sent as an empty string
     * @param type      value of the SSE {@code event} (name) field
     */
    private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data, String type) {
        try {
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event()
                    .id(eventId)
                    .name(type);
            emitter.send(eventBuilder.data(Objects.requireNonNullElse(data, "")));
        } catch (IOException | IllegalStateException e) {
            emitterRepository.deleteById(emitterId);
            log.debug("failed to send event, emitter removed... emitter-id: {}", emitterId);
        }
    }

    /**
     * Tells whether the client reported a previously received event.
     *
     * @param lastEventId last event id supplied by the client; may be {@code null} or empty
     * @return {@code true} only when {@code lastEventId} is non-null and non-empty
     */
    private boolean hasLostData(String lastEventId) {
        // A missing header can arrive as null rather than ""; both mean "nothing to replay".
        return lastEventId != null && !lastEventId.isEmpty();
    }

    /**
     * Replays stored notifications through a freshly created emitter.
     *
     * <p>{@code lastEventId} is expected to have the form {@code <memberId>_<epochMillis>},
     * as produced by {@link #sendAlarm}. The numeric suffix is handed to
     * {@code notificationRepository.findNotSentNotifications} as the cut-off (presumably
     * the repository returns notifications published after it; verify against the query).
     *
     * <p>The id comes from the client, so a value that does not start with
     * {@code <memberId>_} or whose suffix is not a valid {@code long} is ignored and no
     * replay takes place, instead of failing the subscription after the emitter has
     * already been registered.
     *
     * @param lastEventId id of the last event the client says it received
     * @param memberId    member whose notifications are replayed
     * @param emitterId   id of the target emitter; also used as the id of each replayed event
     * @param emitter     emitter the notifications are written to
     */
    private void sendLostData(String lastEventId, String memberId, String emitterId, SseEmitter emitter) {
        String expectedPrefix = memberId + "_";
        if (lastEventId == null || !lastEventId.startsWith(expectedPrefix)) {
            log.debug("ignore last-event-id not issued for this member... emitter-id: {}", emitterId);
            return;
        }

        long lastReceivedAt;
        try {
            lastReceivedAt = Long.parseLong(lastEventId.substring(expectedPrefix.length()));
        } catch (NumberFormatException ignored) {
            // Malformed timestamp suffix in a client-supplied header: skip the replay.
            log.debug("ignore malformed last-event-id... emitter-id: {}", emitterId);
            return;
        }

        List<Notification> notifications = notificationRepository.findNotSentNotifications(memberId, lastReceivedAt);
        notifications.forEach(notification -> sendNotification(
                emitter,
                emitterId,
                emitterId,
                notification.getNotificationData(),
                String.valueOf(notification.getNotificationType())
        ));
    }

    /**
     * Stores an alarm notification and pushes it to the member's emitters.
     *
     * <p>The event id is {@code <memberId>_<publishTimeMillis>}; the same publish time is
     * persisted on the notification.
     *
     * @param type     notification type, also used as the SSE event name
     * @param data     payload; serialized to JSON for persistence
     * @param memberId member the alarm is addressed to
     * @throws JsonProcessingException if {@code data} cannot be serialized to JSON
     */
    public void sendAlarm(NotificationType type, Object data, String memberId) throws JsonProcessingException {
        long whenEventPublished = System.currentTimeMillis();
        String eventId = memberId + "_" + whenEventPublished;
        Notification notification = notificationRepository.save(
                createNotification(type, objectMapper.writeValueAsString(data), memberId, whenEventPublished)
        );
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithMemberId(memberId);
        emitters.forEach(
                (key, emitter) -> sendNotification(
                        emitter,
                        eventId,
                        key,
                        new NotificationSummary(notification),
                        String.valueOf(type)
                )
        );
    }

    /**
     * Builds a (not yet persisted) notification entity.
     *
     * @param notificationType kind of notification
     * @param notificationData payload string
     * @param memberId         addressed member
     * @param now              publish time in epoch milliseconds
     * @return the new entity
     */
    private Notification createNotification(NotificationType notificationType,
                                            String notificationData,
                                            String memberId,
                                            long now) {
        return Notification.builder()
                .notificationType(notificationType)
                .notificationData(notificationData)
                .memberId(memberId)
                .whenEventPublished(now)
                .build();
    }

    /**
     * Subscribes the current session member to task-list change events of a workspace.
     *
     * @param workspaceId workspace whose task lists are observed
     * @param params      which list is observed; must pass {@code checkValidRequest()}
     * @return the emitter created for this subscription
     * @throws TypeNotSpecifiedException if {@code params} is not a valid request
     */
    public SseEmitter subscribeTasks(Long workspaceId, TasksNotificationParams params) {
        if (!params.checkValidRequest()) {
            throw new TypeNotSpecifiedException();
        }
        String memberId = sessionMemberProvider.getMemberId();
        String emitterId = TASKS_EMITTER_ID_PREFIX + workspaceId + params.getSuffixByParam() + memberId;
        return emitterInitialSetting(emitterId);
    }

    /**
     * Notifies task-list subscribers whose subscription matches the changed location.
     *
     * <p>The emitter id is split on {@code _}: index 1 is the workspace id and index 2 the
     * list type. For {@code board} subscriptions index 3 is the board id and index 5 the
     * project id; for {@code project} subscriptions index 3 is the project id;
     * {@code my} and {@code bookmark} subscriptions match on the workspace alone.
     *
     * @param workspaceId workspace in which the change happened
     * @param projectId   project in which the change happened
     * @param boardId     board in which the change happened
     */
    public void sendFindTasksEventNotification(Long workspaceId, Long projectId, Long boardId) {
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmittersStartWithTasks();
        emitters.forEach((key, emitter) -> {
            String[] conditions = key.split("_");
            String paramType = conditions[2];
            Long emitterWorkspaceId = Long.parseLong(conditions[1]);
            // Objects.equals keeps a null argument from throwing; it simply matches nothing.
            switch (paramType) {
                case "board":
                    if (Objects.equals(workspaceId, emitterWorkspaceId) &&
                            Objects.equals(projectId, Long.parseLong(conditions[5])) &&
                            Objects.equals(boardId, Long.parseLong(conditions[3]))) {
                        sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
                    }
                    break;
                case "project":
                    if (Objects.equals(workspaceId, emitterWorkspaceId) &&
                            Objects.equals(projectId, Long.parseLong(conditions[3]))) {
                        sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
                    }
                    break;
                case "my":
                case "bookmark":
                    if (Objects.equals(workspaceId, emitterWorkspaceId)) {
                        sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
                    }
                    break;
                default:
                    // Unknown list type: nothing to notify.
                    break;
            }
        });
    }

    /**
     * Subscribes the current session member to change events of a single task.
     *
     * @param taskId task being viewed
     * @return the emitter created for this subscription
     */
    public SseEmitter subscribeTask(Long taskId) {
        String memberId = sessionMemberProvider.getMemberId();
        String emitterId = TASK_EMITTER_ID_PREFIX + taskId + "_" + memberId;
        return emitterInitialSetting(emitterId);
    }

    /**
     * Notifies every task-detail subscriber that is watching the given task.
     *
     * @param taskId task that changed
     */
    public void sendFindTaskEventNotification(Long taskId) {
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithTask();
        emitters.forEach((key, emitter) -> {
            // The task id sits between the "task_" prefix and the next "_" of the emitter id.
            Long findTaskId = Long.valueOf(StringUtils.substringBetween(key, TASK_EMITTER_ID_PREFIX, "_"));
            if (Objects.equals(taskId, findTaskId)) {
                sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
            }
        });
    }

    /**
     * Sends a {@code heartbeat} event to every registered emitter and removes the emitters
     * that can no longer be written to. Scheduled at a fixed rate of 130 seconds.
     */
    @Scheduled(fixedRate = 130_000)
    protected void sendHeartbeat() {
        Map<String, SseEmitter> emitterMap = emitterRepository.findAll();
        log.debug("send heartbeat to all emitter... emitter count: {}", emitterMap.size());
        emitterMap.forEach((key, emitter) -> {
            try {
                emitter.send(SseEmitter.event().name("heartbeat").data(""));
            } catch (IOException | IllegalStateException e) {
                // The emitter is unusable; drop it so later broadcasts skip it.
                emitterRepository.deleteById(key);
            }
        });
    }
}
Leave a comment