Service

서비스는 알림 부분만 설명드리고 전체적인 코드를 하단에 첨부하겠습니다.


/**
 * Subscribes the current session member to alarm events.
 *
 * <p>A new emitter is registered under the id {@code memberId_currentTimeMillis}. When the
 * client reports a last received event id, notifications are replayed through
 * {@code sendLostData} before the emitter is returned.
 *
 * @param lastEventId id of the last event the client received
 * @return the emitter registered for this subscription
 */
public SseEmitter subscribeAlarm(String lastEventId) {
    final String memberId = sessionMemberProvider.getMemberId();
    final long subscribedAt = System.currentTimeMillis();
    final String emitterId = memberId + "_" + subscribedAt;

    final SseEmitter emitter = emitterInitialSetting(emitterId);
    if (!hasLostData(lastEventId)) {
        return emitter;
    }

    sendLostData(lastEventId, memberId, emitterId, emitter);
    return emitter;
}
/**
 * Tells whether the client reported a last received event id, i.e. whether events may have
 * been missed while it was disconnected.
 *
 * @param lastEventId id of the last event the client received; may be {@code null} or empty
 * @return {@code true} only when {@code lastEventId} is non-null and non-empty
 */
private boolean hasLostData(String lastEventId) {
    // A missing value is treated like an empty one instead of raising a NullPointerException.
    return lastEventId != null && !lastEventId.isEmpty();
}

/**
 * Replays the notifications returned by
 * {@code notificationRepository.findNotSentNotifications} for the timestamp carried in
 * {@code lastEventId}.
 *
 * <p>{@code lastEventId} is expected to look like {@code memberId_timestamp}. The value comes
 * from the client, so anything else is logged and ignored rather than failing the subscription
 * after the emitter has already been registered.
 *
 * @param lastEventId id of the last event the client received
 * @param memberId    member whose notifications are replayed
 * @param emitterId   repository key of {@code emitter}, used for cleanup when sending fails
 * @param emitter     emitter the replayed notifications are written to
 */
private void sendLostData(String lastEventId, String memberId, String emitterId, SseEmitter emitter) {
    String eventIdPrefix = memberId + "_";
    if (lastEventId == null || !lastEventId.startsWith(eventIdPrefix)) {
        log.warn("skip replay, unexpected last-event-id: {} emitter-id: {}", lastEventId, emitterId);
        return;
    }

    long lastReceivedAt;
    try {
        lastReceivedAt = Long.parseLong(lastEventId.substring(eventIdPrefix.length()));
    } catch (NumberFormatException e) {
        log.warn("skip replay, unexpected last-event-id: {} emitter-id: {}", lastEventId, emitterId);
        return;
    }

    List<Notification> notifications = notificationRepository.findNotSentNotifications(memberId, lastReceivedAt);
    for (Notification notification : notifications) {
        // Use the notification's own event id (same format as live alarm events) so that a
        // connection dropped in the middle of the replay resumes from the right position.
        sendNotification(
                emitter,
                eventIdPrefix + notification.getWhenEventPublished(),
                emitterId,
                notification.getNotificationData(),
                String.valueOf(notification.getNotificationType())
        );
    }
}
  • 클라이언트에서 요청을 보내면 구독 처리를 하는 로직입니다.
  • emitterId는 회원 식별자와 현재 시간으로 구성했습니다.
  • 미발신된 알림 목록은 emitterId와 lastEventId의 suffix(타임스탬프)를 비교하여 전송됩니다.


/**
 * Publishes an alarm event to a member.
 *
 * <p>The payload is serialized to JSON and stored as a {@code Notification}; the saved
 * notification is then sent, wrapped in a {@code NotificationSummary}, to every emitter
 * returned by {@code emitterRepository.findAllEmitterStartWithMemberId}. The event id is
 * {@code memberId_publishTimeMillis}.
 *
 * @param type     notification type, also used as the SSE event name
 * @param data     payload to serialize and store
 * @param memberId recipient member id
 * @throws JsonProcessingException when {@code data} cannot be serialized
 */
public void sendAlarm(NotificationType type, Object data, String memberId) throws JsonProcessingException {
    final long publishedAt = System.currentTimeMillis();
    final String eventId = memberId + "_" + publishedAt;

    final String payload = objectMapper.writeValueAsString(data);
    final Notification saved = notificationRepository.save(
            createNotification(type, payload, memberId, publishedAt)
    );

    final Map<String, SseEmitter> targets = emitterRepository.findAllEmitterStartWithMemberId(memberId);
    for (Map.Entry<String, SseEmitter> target : targets.entrySet()) {
        sendNotification(
                target.getValue(),
                eventId,
                target.getKey(),
                new NotificationSummary(saved),
                String.valueOf(type)
        );
    }
}
  • 알림은 메모리뿐만 아니라 DB에도 저장해 줍니다.
  • emitterRepository에서 발생한 event와 관련된 emitter들을 조회하여 응답을 전송합니다.


/**
 * Periodically sends a {@code heartbeat} event to every registered emitter and removes the
 * emitters that can no longer be written to.
 */
@Scheduled(fixedRate = 130_000)
protected void sendHeartbeat() {
    Map<String, SseEmitter> emitterMap = emitterRepository.findAll();
    log.debug("send heartbeat to all emitter... emitter count: {}", emitterMap.size());

    emitterMap.forEach((key, emitter) -> {
        try {
            emitter.send(SseEmitter.event().name("heartbeat").data(""));
        } catch (IOException | IllegalStateException e) {
            // send() raises IllegalStateException for an emitter that has already completed;
            // left uncaught it would abort the heartbeat for all remaining emitters.
            emitterRepository.deleteById(key);
            log.debug("heartbeat failed, emitter removed... emitter-id: {}", key);
        }
    });
}
  • 스케줄러를 통해 사용하지 않지만 남아있는 emitter를 제거해 줍니다.


아래는 Notification 테이블과 NotificationService 전체 로직입니다.

/**
 * JPA entity holding one notification addressed to a member.
 */
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class Notification {

    /** Primary key, generated with the identity strategy and stored in {@code notification_id}. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "notification_id")
    private Long id;

    /** Notification type, persisted by enum name; never null. */
    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private NotificationType notificationType;

    /** Notification payload; the notification service stores it as a JSON string. */
    private String notificationData;

    /** Id of the member the notification is addressed to. */
    private String memberId;

    /** Publish time; the notification service fills it from {@code System.currentTimeMillis()}. */
    private long whenEventPublished;

    /**
     * Creates a notification; also exposed as a Lombok builder.
     *
     * @param notificationType   type of the notification
     * @param notificationData   payload of the notification
     * @param memberId           id of the recipient member
     * @param whenEventPublished publish time of the event
     */
    @Builder
    public Notification(NotificationType notificationType, String notificationData, String memberId, long whenEventPublished) {
        this.notificationType = notificationType;
        this.notificationData = notificationData;
        this.memberId = memberId;
        this.whenEventPublished = whenEventPublished;
    }
}
/**
 * Server-sent-event notification service.
 *
 * <p>Handles three kinds of subscriptions, each backed by emitters stored in
 * {@code EmitterRepository}: member alarms, task-list changes and single-task changes.
 */
@Slf4j
@RequiredArgsConstructor
@Transactional
@Service
public class NotificationService {

    private final SessionMemberProvider sessionMemberProvider;
    private final EmitterRepository emitterRepository;
    private final NotificationRepository notificationRepository;
    private final ObjectMapper objectMapper;

    private static final String TASKS_EMITTER_ID_PREFIX = "tasks_";
    private static final String TASK_EMITTER_ID_PREFIX = "task_";
    private static final String NOTIFICATION_TYPE_MESSAGE = "MESSAGE";


    /**
     * Subscribes the current session member to alarm events.
     *
     * <p>A new emitter is registered under the id {@code memberId_currentTimeMillis}. When the
     * client reports a last received event id, missed notifications are replayed before the
     * emitter is returned.
     *
     * @param lastEventId id of the last event the client received; may be {@code null} or empty
     * @return the emitter registered for this subscription
     */
    public SseEmitter subscribeAlarm(String lastEventId) {
        String memberId = sessionMemberProvider.getMemberId();
        String emitterId = memberId + "_" + System.currentTimeMillis();

        SseEmitter emitter = emitterInitialSetting(emitterId);

        if (hasLostData(lastEventId)) {
            sendLostData(lastEventId, memberId, emitterId, emitter);
        }

        return emitter;
    }

    /**
     * Creates an emitter, stores it under {@code emitterId}, wires its lifecycle callbacks and
     * sends an initial {@code CONNECTED} event.
     *
     * @param emitterId repository key for the new emitter
     * @return the stored emitter
     */
    private SseEmitter emitterInitialSetting(String emitterId) {
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(Long.MAX_VALUE));
        emitter.onCompletion(() -> {
            emitterRepository.deleteById(emitterId);
            log.debug("emitter completed... emitter-id: {}", emitterId);
        });
        emitter.onTimeout(() -> {
            emitterRepository.deleteById(emitterId);
            log.debug("emitter timeout... emitter-id: {}", emitterId);
        });
        emitter.onError((e) -> emitter.complete());

        // Send a first event right away to prevent a 503 error.
        String connectType = "CONNECTED";
        sendNotification(emitter, emitterId, emitterId, null, connectType);
        return emitter;
    }

    /**
     * Sends one event through {@code emitter}; when sending fails the emitter is removed from
     * the repository.
     *
     * @param emitter   emitter to write to
     * @param eventId   SSE event id
     * @param emitterId repository key of {@code emitter}, used for cleanup on failure
     * @param data      event payload; {@code null} is sent as an empty string
     * @param type      SSE event name
     */
    private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data, String type) {
        try {
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event()
                    .id(eventId)
                    .name(type);
            emitter.send(eventBuilder.data(Objects.requireNonNullElse(data, "")));
        } catch (IOException | IllegalStateException e) {
            // send() raises IllegalStateException for an emitter that has already completed;
            // left uncaught it would abort delivery to the remaining emitters of the caller.
            emitterRepository.deleteById(emitterId);
        }
    }

    /**
     * Tells whether the client reported a last received event id.
     *
     * @param lastEventId id of the last event the client received; may be {@code null} or empty
     * @return {@code true} only when {@code lastEventId} is non-null and non-empty
     */
    private boolean hasLostData(String lastEventId) {
        return lastEventId != null && !lastEventId.isEmpty();
    }

    /**
     * Replays the notifications returned by
     * {@code notificationRepository.findNotSentNotifications} for the timestamp carried in
     * {@code lastEventId}.
     *
     * <p>{@code lastEventId} is expected to look like {@code memberId_timestamp}. The value
     * comes from the client, so anything else is logged and ignored rather than failing the
     * subscription after the emitter has already been registered.
     *
     * @param lastEventId id of the last event the client received
     * @param memberId    member whose notifications are replayed
     * @param emitterId   repository key of {@code emitter}, used for cleanup when sending fails
     * @param emitter     emitter the replayed notifications are written to
     */
    private void sendLostData(String lastEventId, String memberId, String emitterId, SseEmitter emitter) {
        String eventIdPrefix = memberId + "_";
        if (lastEventId == null || !lastEventId.startsWith(eventIdPrefix)) {
            log.warn("skip replay, unexpected last-event-id: {} emitter-id: {}", lastEventId, emitterId);
            return;
        }

        long lastReceivedAt;
        try {
            lastReceivedAt = Long.parseLong(lastEventId.substring(eventIdPrefix.length()));
        } catch (NumberFormatException e) {
            log.warn("skip replay, unexpected last-event-id: {} emitter-id: {}", lastEventId, emitterId);
            return;
        }

        List<Notification> notifications = notificationRepository.findNotSentNotifications(memberId, lastReceivedAt);
        for (Notification notification : notifications) {
            // Use the notification's own event id (same format as sendAlarm) so that a
            // connection dropped in the middle of the replay resumes from the right position.
            sendNotification(
                    emitter,
                    eventIdPrefix + notification.getWhenEventPublished(),
                    emitterId,
                    notification.getNotificationData(),
                    String.valueOf(notification.getNotificationType())
            );
        }
    }

    /**
     * Publishes an alarm event to a member.
     *
     * <p>The payload is serialized to JSON and stored as a {@code Notification}; the saved
     * notification is then sent, wrapped in a {@code NotificationSummary}, to every emitter
     * returned by {@code emitterRepository.findAllEmitterStartWithMemberId}. The event id is
     * {@code memberId_publishTimeMillis}.
     *
     * @param type     notification type, also used as the SSE event name
     * @param data     payload to serialize and store
     * @param memberId recipient member id
     * @throws JsonProcessingException when {@code data} cannot be serialized
     */
    public void sendAlarm(NotificationType type, Object data, String memberId) throws JsonProcessingException {
        long whenEventPublished = System.currentTimeMillis();

        String eventId = memberId + "_" + whenEventPublished;
        Notification notification = notificationRepository.save(
                createNotification(type, objectMapper.writeValueAsString(data), memberId, whenEventPublished)
        );

        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithMemberId(memberId);
        emitters.forEach(
                (key, emitter) -> sendNotification(
                        emitter,
                        eventId,
                        key,
                        new NotificationSummary(notification),
                        String.valueOf(type)
                )
        );
    }

    /**
     * Builds an unsaved {@code Notification} from the given values.
     *
     * @param notificationType type of the notification
     * @param notificationData serialized payload
     * @param memberId         recipient member id
     * @param now              publish time in epoch milliseconds
     * @return the new notification
     */
    private Notification createNotification(NotificationType notificationType,
                                            String notificationData,
                                            String memberId,
                                            long now) {
        return Notification.builder()
                .notificationType(notificationType)
                .notificationData(notificationData)
                .memberId(memberId)
                .whenEventPublished(now)
                .build();
    }

    /**
     * Subscribes the current session member to task-list events of a workspace.
     *
     * <p>The emitter id is {@code "tasks_" + workspaceId + params.getSuffixByParam() + memberId}.
     *
     * @param workspaceId workspace whose task list is observed
     * @param params      request parameters selecting the list type
     * @return the emitter registered for this subscription
     * @throws TypeNotSpecifiedException when {@code params.checkValidRequest()} is false
     */
    public SseEmitter subscribeTasks(Long workspaceId, TasksNotificationParams params) {
        if (!params.checkValidRequest()) {
            throw new TypeNotSpecifiedException();
        }

        String memberId = sessionMemberProvider.getMemberId();
        String emitterId = TASKS_EMITTER_ID_PREFIX + workspaceId + params.getSuffixByParam() + memberId;

        return emitterInitialSetting(emitterId);
    }

    /**
     * Notifies task-list subscribers whose emitter id matches the given workspace, project
     * and board.
     *
     * <p>Each emitter id is split on {@code "_"}: index 1 is read as the workspace id and
     * index 2 as the list type ({@code board}, {@code project}, {@code my} or
     * {@code bookmark}). For {@code board}, index 3 is compared with {@code boardId} and
     * index 5 with {@code projectId}; for {@code project}, index 3 is compared with
     * {@code projectId}.
     * NOTE(review): this layout presumably mirrors {@code getSuffixByParam()} - confirm.
     *
     * @param workspaceId workspace in which tasks changed
     * @param projectId   project in which tasks changed
     * @param boardId     board in which tasks changed
     */
    public void sendFindTasksEventNotification(Long workspaceId, Long projectId, Long boardId) {
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmittersStartWithTasks();
        emitters.forEach((key, emitter) -> {
            String[] conditions = key.split("_");
            String paramType = conditions[2];
            Long emitterWorkspaceId = Long.parseLong(conditions[1]);

            switch (paramType) {
                case "board":
                    if (workspaceId.equals(emitterWorkspaceId) &&
                            projectId.equals(Long.parseLong(conditions[5])) &&
                            boardId.equals(Long.parseLong(conditions[3]))) {
                        sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
                    }
                    break;
                case "project":
                    if (workspaceId.equals(emitterWorkspaceId) && projectId.equals(Long.parseLong(conditions[3]))) {
                        sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
                    }
                    break;
                case "my":
                case "bookmark":
                    if (workspaceId.equals(emitterWorkspaceId)) {
                        sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
                    }
                    break;
            }
        });
    }

    /**
     * Subscribes the current session member to events of a single task.
     *
     * <p>The emitter id is {@code "task_" + taskId + "_" + memberId}.
     *
     * @param taskId task to observe
     * @return the emitter registered for this subscription
     */
    public SseEmitter subscribeTask(Long taskId) {
        String memberId = sessionMemberProvider.getMemberId();
        String emitterId = TASK_EMITTER_ID_PREFIX + taskId + "_" + memberId;

        return emitterInitialSetting(emitterId);
    }

    /**
     * Notifies every single-task subscriber whose emitter id carries the given task id.
     *
     * @param taskId task that changed
     */
    public void sendFindTaskEventNotification(Long taskId) {
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithTask();
        emitters.forEach((key, emitter) -> {
            Long findTaskId = Long.valueOf(StringUtils.substringBetween(key, TASK_EMITTER_ID_PREFIX, "_"));
            if (Objects.equals(taskId, findTaskId)) {
                sendNotification(emitter, key, key, null, NOTIFICATION_TYPE_MESSAGE);
            }
        });
    }

    /**
     * Periodically sends a {@code heartbeat} event to every registered emitter and removes
     * the emitters that can no longer be written to.
     */
    @Scheduled(fixedRate = 130_000)
    protected void sendHeartbeat() {
        Map<String, SseEmitter> emitterMap = emitterRepository.findAll();
        log.debug("send heartbeat to all emitter... emitter count: {}", emitterMap.size());

        emitterMap.forEach((key, emitter) -> {
            try {
                emitter.send(SseEmitter.event().name("heartbeat").data(""));
            } catch (IOException | IllegalStateException e) {
                // An already-completed emitter raises IllegalStateException; it must not stop
                // the heartbeat for the remaining emitters.
                emitterRepository.deleteById(key);
                log.debug("heartbeat failed, emitter removed... emitter-id: {}", key);
            }
        });
    }
}

Categories:

Updated:

Leave a comment