Java 类org.springframework.web.servlet.mvc.method.annotation.SseEmitter 实例源码

项目:mirrorgate    文件:ServerSentEventsHandler.java   
public synchronized void removeFromSessionsMap(SseEmitter session, String dashboardId){

        LOGGER.debug("Remove SseEmitter {} to sessions map", dashboardId);

        if(!StringUtils.isEmpty(dashboardId)){
            List<SseEmitter> dashboardEmitters = emittersPerDashboard.get(dashboardId);

            if(dashboardEmitters != null){
                dashboardEmitters.remove(session);

                if(dashboardEmitters.isEmpty()){
                    emittersPerDashboard.remove(dashboardId);
                }
            }
        }
    }
项目:sporticus    文件:RestControllerPushImplSse.java   
@GetMapping("/api/notification/feed")
public ResponseEntity<SseEmitter> getResults() {

    if(sseEngine==null){
        LOGGER.warn(()->"No SSE Engine available");
        return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
    }

    final Long fLoggedInUserId = getLoggedInUserId();
    SseEmitter emitter = sseEngine.getEmitters().get(fLoggedInUserId);
    if (emitter == null) {
        LOGGER.debug(() -> "Creating new emitter");
        SseEmitter fEmitter = emitter = new SseEmitter();

        emitter.onCompletion(() -> {
            LOGGER.debug(() -> "Emitter completed");
            sseEngine.getEmitters().remove(fLoggedInUserId);
        });
        emitter.onTimeout(() -> {
            fEmitter.complete();
        });

        sseEngine.getEmitters().put(fLoggedInUserId, emitter);
    }
    return new ResponseEntity<>(emitter, HttpStatus.OK);
}
项目:sse-eventbus    文件:SseEventBus.java   
/**
 * Creates a {@link SseEmitter} and registers the client in the internal database.
 * Client will be subscribed to the provided events if specified.
 *
 * @param clientId unique client identifier
 * @param timeout timeout value in milliseconds
 * @param unsubscribe if true unsubscribes from all events that are not provided with
 * the next parameter
 * @param events events the client wants to subscribe
 * @return a new SseEmitter instance
 */
public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe,
        boolean completeAfterMessage, String... events) {
    SseEmitter emitter = new SseEmitter(timeout);
    emitter.onTimeout(emitter::complete);
    registerClient(clientId, emitter, completeAfterMessage);

    if (events != null && events.length > 0) {
        if (unsubscribe) {
            unsubscribeFromAllEvents(clientId, events);
        }
        for (String event : events) {
            subscribe(clientId, event);
        }
    }

    return emitter;
}
项目:sse-eventbus    文件:ClientEvent.java   
public SseEventBuilder createSseEventBuilder() {

        SseEventBuilder sseBuilder = SseEmitter.event();

        if (!this.event.event().equals(SseEvent.DEFAULT_EVENT)) {
            sseBuilder.name(this.event.event());
        }

        this.event.id().ifPresent(sseBuilder::id);
        this.event.retry().map(Duration::toMillis).ifPresent(sseBuilder::reconnectTime);
        this.event.comment().ifPresent(sseBuilder::comment);

        if (this.convertedValue != null) {
            sseBuilder.data(this.convertedValue);
        }
        else {
            sseBuilder.data(this.event.data());
        }

        return sseBuilder;

    }
项目:proxyprint-kitchen    文件:NotificationManager.java   
public void sendNotification(String username, Notification notification){
    Consumer consumer = this.consumers.findByUsername(username);
    notification = notifications.save(notification);
    consumer.addNotifications(notification);
    consumers.save(consumer);

    if (this.subscriptions.containsKey(username)) {
        for (SseEmitter sse : this.subscriptions.get(username)) {
            try {
                sse.send(notification, MediaType.APPLICATION_JSON);
            } catch (IOException ex) {
                this.logger.warn("Broken SSE Emitter..discarding it...");
            }
        }
    }
}
项目:proxyprint-kitchen    文件:NotificationsController.java   
@Transactional
@ApiOperation(value = "Returns success/insuccess.", notes = "This method allows a consumer to subscribe the SSE.")
@RequestMapping(value = "/consumer/subscribe", produces = "text/event-stream")
public ResponseEntity<SseEmitter> subscribe(WebRequest request){
    String username = request.getParameter("username");
    String password = request.getParameter("password");

    Consumer consumer = this.consumers.findByUsername(username);
    if (consumer == null) {
        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    } else if (consumer.getPassword().equals(password)) {
        return new ResponseEntity<>(this.notificationManager.subscribe(username), HttpStatus.OK);
    } else {
        return new ResponseEntity<>(HttpStatus.UNAUTHORIZED);
    }
}
项目:spring-boot-sandbox    文件:ProgressSampleApplication.java   
@GetMapping("/progress/{name}")
SseEmitter progress(@PathVariable String name) {
    UUID key = UUID.fromString(name);
    SseEmitter emitter = new SseEmitter();
    progress.addListener(key, a -> {
        try {
            emitter.send(a);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (a >= 1) {
            emitter.complete();
        }
    });
    emitter.onCompletion(() -> progress.removeListener(key));
    return emitter;
}
项目:argos-dashboard    文件:SseEmitterUtil.java   
/**
 * Binds the supplied observable to the SseEmitter. onNext will call the send method on SseEmitter, the Observable
 * subscription will automatically be cancelled when the emitter completes or times out.
 * @param emitter SseEmitter
 * @param observable Observable that will supply the data
 * @return Subscription to the Observable
 */
public static <T> Subscription bindObservable(SseEmitter emitter, Observable<T> observable) {
    Subscription subscription = observable.subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            emitter.complete();
        }

        @Override
        public void onError(Throwable e) {
            LOG.warn("Error from stream observable", e);
            emitter.completeWithError(e);
        }

        @Override
        public void onNext(T t) {
            emitSse(emitter, t);
        }
    });
    bindUnsubscribe(emitter, subscription);
    return subscription;
}
项目:voj    文件:SubmissionController.java   
/**
 * 获取实时的评测结果.
 * @param submissionId - 提交记录的唯一标识符
 * @return 包含评测结果信息的StreamingResponseBody对象
 * @throws IOException 
 */
@RequestMapping("/getRealTimeJudgeResult.action")
public SseEmitter getRealTimeJudgeResultAction(
        @RequestParam(value="submissionId") long submissionId,
        @RequestParam(value="csrfToken") String csrfToken,
        HttpServletRequest request, HttpServletResponse response) throws IOException {
    User currentUser = HttpSessionParser.getCurrentUser(request.getSession());
    boolean isCsrfTokenValid = CsrfProtector.isCsrfTokenValid(csrfToken, request.getSession());
    Submission submission = submissionService.getSubmission(submissionId);

    if ( !isCsrfTokenValid || submission == null || 
            !submission.getUser().equals(currentUser) ||
            !submission.getJudgeResult().getJudgeResultSlug().equals("PD") ) {
        throw new ResourceNotFoundException();
    }

    response.addHeader("X-Accel-Buffering", "no");
    SseEmitter sseEmitter = new SseEmitter();
    submissionEventListener.addSseEmitters(submissionId, sseEmitter);
    sseEmitter.send("Established");
    return sseEmitter;
}
项目:voj    文件:ApplicationEventListener.java   
/**
 * 提交事件的处理器.
 * @param event - 提交记录事件
 * @throws IOException 
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
    long submissionId = event.getSubmissionId();
    String judgeResult = event.getJudgeResult();
    String message = event.getMessage();
    boolean isCompleted = event.isCompleted();
    SseEmitter sseEmitter = sseEmitters.get(submissionId);

    if ( sseEmitter == null ) {
        LOGGER.warn(String.format("CANNOT get the SseEmitter for submission #%d.", submissionId));
        return;
    }
    Map<String, String> mapMessage = new HashMap<>(3, 1);
    mapMessage.put("judgeResult", judgeResult);
    mapMessage.put("message", message);
    sseEmitter.send(mapMessage);

    if ( isCompleted ) {
        sseEmitter.complete();
        removeSseEmitters(submissionId);
    }
}
项目:mirrorgate    文件:ServerSentEventsHandler.java   
@Override
public void sendEventUpdateMessage(EventType event, String dashboardId) {

    List<SseEmitter> emitters = emittersPerDashboard.get(dashboardId);

    if(emitters != null){

        if(event != EventType.PING) {
            sendEventUpdateMessage(EventType.PING, dashboardId);
        }

        LOGGER.info("Notifying {} dashboards with name {} and event type {}", emitters.size(), dashboardId, event);

        for(int i = emitters.size(); i > 0; i--) {
            SseEmitter sseEmitter = emitters.get(i-1);

            try {
                Map<String, String> message = new HashMap<>();
                message.put("type", event.getValue());
                String jsonMessage = objectMapper.writeValueAsString(message);
                sseEmitter.send(jsonMessage, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                this.removeFromSessionsMap(sseEmitter, dashboardId);
                LOGGER.error("Exception while sending message to emitter for dashboard {}", dashboardId);
            }

        }

    }
}
项目:sporticus    文件:SseEngine.java   
public void send(long id, Object data) throws IOException {
    SseEmitter emitter = emitters.get(id);
    if (emitter == null) {
        LOGGER.warn(() -> "Failed to locate emitter - id=" + id);
    } else {
        emitter.send(data);
        LOGGER.info(() -> "Sent data via emitter - id=" + id);
    }
}
项目:sporticus    文件:RestControllerPushImplSse.java   
private  void send(SseEmitter emitter, INotification notification) {
    new Thread(() -> {
        try {
            //TODO - Convert to JSON.
            emitter.send(notification);
            emitter.complete();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();
}
项目:nges-sample-tic-tac-toe    文件:SseEventStream.java   
public SseEventStream(MessageGroup messageGroup, EventSupplier eventSupplier) {
    this.eventSupplier = eventSupplier;
    this.messageGroup = messageGroup;

    this.sseEmitter = new SseEmitter();
    this.state = State.READY;
}
项目:nges-sample-tic-tac-toe    文件:SseEventStream.java   
private void sendAndStopOnError(String event, Optional<String> id, Object data) {
    try {
        SseEventBuilder builder = SseEmitter.event().name(event).data(data);
        id.ifPresent(builder::id);
        sseEmitter.send(builder);
    } catch (Exception e) {
        log.warn("Error in SSE channel: " + e);
        stop();
    }
}
项目:microservices-sample-project    文件:SseEmitterApplicationEventListener.java   
@EventListener
  public void submissionEventHandler(SubmissionEvent event) throws IOException {

String key = event.getKey();
      Object message = event.getMessage();

      SseEmitter sseEmitter = sseEmitters.get(key);

      if ( sseEmitter == null ) {
          return;
      }

      sseEmitter.send(message, MediaType.APPLICATION_JSON);
  }
项目:microservices-sample-project    文件:ControllerBase.java   
@RequestMapping("/listener/{id}")
public SseEmitter  listen(@PathVariable("id") String id){

    if(this.listenerType == null)
        throw new ListenerTypeNotFound();

    final SseEmitter sseEmitter = new SseEmitter();
    applicationEventListener.addSseEmitters(this.listenerType , id, sseEmitter);
    return sseEmitter;

}
项目:pokemon    文件:SseEmitterApplicationEventListener.java   
@EventListener
  public void submissionEventHandler(SubmissionEvent event) throws IOException {

String key = event.getKey();
      Object message = event.getMessage();

      SseEmitter sseEmitter = sseEmitters.get(key);

      if ( sseEmitter == null ) {
          return;
      }

      sseEmitter.send(message, MediaType.APPLICATION_JSON);
  }
项目:pokemon    文件:ControllerBase.java   
@RequestMapping("/listener/{id}")
public SseEmitter  listen(@PathVariable("id") String id){

    if(this.listenerType == null)
        throw new ListenerTypeNotFound();

    final SseEmitter sseEmitter = new SseEmitter();
    applicationEventListener.addSseEmitters(this.listenerType , id, sseEmitter);
    return sseEmitter;

}
项目:dashboard1b    文件:ControladorHTML.java   
@RequestMapping( value = "/newSugerence")
@EventListener
public void newSugerence(Sugerencia data){

    System.out.println("Evento escuchado!");
    SseEventBuilder newSugerenceEvent = SseEmitter.event().name("evento").data("{ \"tipo\": \"newSugerence\" , \"title\":\"" + data.getTitulo() + "\"}");
    sendEvent(newSugerenceEvent);
}
项目:dashboard1b    文件:ControladorHTML.java   
@RequestMapping( value = "/newComentary")
@EventListener
public void newComentary(Comentario data){


    SseEventBuilder newComentaryEvent = SseEmitter.event().name("evento").data("{ \"tipo\": \"newComentary\" ,  \"title\":\"" + data.getSugerencia().getTitulo() +"\" }");
    sendEvent(newComentaryEvent);
}
项目:dashboard1b    文件:ControladorHTML.java   
private void sendEvent(SseEventBuilder event){
    synchronized (sseEmitters) {
        for(SseEmitter emitter: sseEmitters){
            try {
                System.out.println("Enviando el evento");
                emitter.send(event);
            } catch (IOException e) {
                e.printStackTrace();

            }
        }
    }
}
项目:dashboard1b    文件:ControladorHTML.java   
@RequestMapping("/userPriv/updates")
SseEmitter updateHTML() {
    SseEmitter sseEmitter = new SseEmitter();
    synchronized (this.sseEmitters) {
        this.sseEmitters.add(sseEmitter);
        sseEmitter.onCompletion(() -> {
            synchronized (this.sseEmitters) {
                this.sseEmitters.remove(sseEmitter);
            }
        });
    }
    return sseEmitter;
}
项目:kfka    文件:EventController.java   
public void emit(KfkaMessage event) throws IOException
{
    try
    {
        this.emitter.send(SseEmitter.event()
            .id(Long.toString(event.getId()))
            .data(event.getPayload(), MediaType.APPLICATION_JSON_UTF8));
    }
    catch (IllegalStateException exc)
    {
        logger.debug(exc.getMessage());
    }
}
项目:spring-twitter-stream    文件:WebSocketEventController.java   
@RequestMapping("/tweetLocation")
public SseEmitter streamTweets() throws InterruptedException{

    SseEmitter sseEmitter = new SseEmitter();
    emitters.add(sseEmitter);
    sseEmitter.onCompletion(() -> emitters.remove(sseEmitter));

    streamTweetEventService.streamTweetEvent(emitters);

    return sseEmitter;
}
项目:sse-eventbus    文件:SseEventBus.java   
public void registerClient(String clientId, SseEmitter emitter,
        boolean completeAfterMessage) {
    Client client = this.clients.get(clientId);
    if (client == null) {
        this.clients.put(clientId,
                new Client(clientId, emitter, completeAfterMessage));
    }
    else {
        client.updateEmitter(emitter);
    }
}
项目:tradeshift-app-samples    文件:WebHooksServiceImpl.java   
/**
 * Send message to angularjs client
 *
 * @return SseEmitter
 * @throws ParserConfigurationException
 * @throws SAXException
 * @throws IOException
 */
@Override
public SseEmitter sendWebhookEvent() throws ParserConfigurationException, SAXException, IOException {
    List<WebhookEventDTO> eventDTOList = messages.get(tokenService.getCurrentUserId());
    SseEmitter emitter = new SseEmitter(60000L);

    if (eventDTOList != null) {
        new Thread(new RunProcess(eventDTOList , emitter, tokenService.getCurrentUserId())).start();
    }

    return emitter;
}
项目:spring-cloud-function    文件:MessageController.java   
private SseEmitter emit(Route route, Message<Collection<Object>> message)
        throws IOException {
    SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
    String path = route.getPath();
    if (!emitters.containsKey(path)) {
        emitters.putIfAbsent(path, new HashSet<>());
    }
    emitters.get(path).add(emitter);
    emitter.onCompletion(() -> emitters.get(path).remove(emitter));
    emitter.onTimeout(() -> emitters.get(path).remove(emitter));
    for (Object body : message.getPayload()) {
        emitter.send(body);
    }
    return emitter;
}
项目:spring-cloud-function    文件:MessageController.java   
private void append(String name, Message<?> message) {
    String key = (String) message.getHeaders().get(ROUTE_KEY);
    if (message.getHeaders().getReplyChannel() instanceof MessageChannel) {
        MessageChannel replyChannel = (MessageChannel) message.getHeaders()
                .getReplyChannel();
        replyChannel.send(message);
        return;
    }
    Route route = new Route(key, name);
    String path = route.getPath();
    if (!queues.containsKey(path)) {
        Bridge<Message<?>> flux = new Bridge<>();
        queues.putIfAbsent(path, flux);
    }
    queues.get(path).send(message);
    if (emitters.containsKey(path)) {
        Set<SseEmitter> list = new HashSet<>(emitters.get(path));
        for (SseEmitter emitter : list) {
            try {
                emitter.send(message.getPayload());
            }
            catch (IOException e) {
                emitters.get(path).remove(emitter);
            }
        }
    }
}
项目:spring-react-isomorphic    文件:CommentController.java   
@RequestMapping(path = "/", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
Comment jsonCreate(Comment comment) throws IOException {
    Comment newComment = this.commentRepository.save(comment);
    synchronized (this.sseEmitters) {
        for (SseEmitter sseEmitter : this.sseEmitters) {
            // Servlet containers don't always detect ghost connection, so we must catch exceptions ...
            try {
                sseEmitter.send(newComment, MediaType.APPLICATION_JSON);
            } catch (Exception e) {}
        }
    }
    return comment;
}
项目:spring-react-isomorphic    文件:CommentController.java   
@RequestMapping("/sse/updates")
SseEmitter subscribeUpdates() {
    SseEmitter sseEmitter = new SseEmitter();
    synchronized (this.sseEmitters) {
        this.sseEmitters.add(sseEmitter);
        sseEmitter.onCompletion(() -> {
            synchronized (this.sseEmitters) {
                this.sseEmitters.remove(sseEmitter);
            }
        });
    }
    return sseEmitter;
}
项目:resilience4j    文件:CircuitBreakerEventEmitter.java   
private CircuitBreakerEventEmitter(Flux<CircuitBreakerEventDTO> eventStream) {
    this.sseEmitter = new SseEmitter();
    this.sseEmitter.onCompletion(this::unsubscribe);
    this.sseEmitter.onTimeout(this::unsubscribe);
    this.disposable = eventStream.subscribe(this::notify,
                    this.sseEmitter::completeWithError,
                    this.sseEmitter::complete);
}
项目:resilience4j    文件:CircuitBreakerEventsEndpoint.java   
@RequestMapping(value = "stream/events/{circuitBreakerName}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByCircuitBreakerName(@PathVariable("circuitBreakerName") String circuitBreakerName) {
    CircuitBreaker circuitBreaker = circuitBreakerRegistry.getAllCircuitBreakers()
            .find(cb -> cb.getName().equals(circuitBreakerName))
            .getOrElseThrow(() ->
                    new IllegalArgumentException(String.format("circuit breaker with name %s not found", circuitBreakerName)));
    return CircuitBreakerEventEmitter.createSseEmitter(toFlux(circuitBreaker.getEventPublisher()));
}
项目:resilience4j    文件:CircuitBreakerEventsEndpoint.java   
@RequestMapping(value = "stream/events/{circuitBreakerName}/{eventType}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByCircuitBreakerNameAndEventType(@PathVariable("circuitBreakerName") String circuitBreakerName,
                                                                    @PathVariable("eventType") String eventType) {
    CircuitBreaker circuitBreaker = circuitBreakerRegistry.getAllCircuitBreakers()
            .find(cb -> cb.getName().equals(circuitBreakerName))
            .getOrElseThrow(() ->
                    new IllegalArgumentException(String.format("circuit breaker with name %s not found", circuitBreakerName)));
    Flux<CircuitBreakerEvent> eventStream = toFlux(circuitBreaker.getEventPublisher())
            .filter(event -> event.getEventType() == CircuitBreakerEvent.Type.valueOf(eventType.toUpperCase()));
    return CircuitBreakerEventEmitter.createSseEmitter(eventStream);
}
项目:resilience4j    文件:RateLimiterEventsEndpoint.java   
@RequestMapping(value = "stream/events/{rateLimiterName}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByRateLimiterName(@PathVariable("rateLimiterName") String rateLimiterName) {
    RateLimiter rateLimiter = rateLimiterRegistry.getAllRateLimiters()
        .find(rL -> rL.getName().equals(rateLimiterName))
        .getOrElseThrow(() ->
            new IllegalArgumentException(String.format("rate limiter with name %s not found", rateLimiterName)));
    return RateLimiterEventsEmitter.createSseEmitter(toFlux(rateLimiter.getEventPublisher()));
}
项目:resilience4j    文件:RateLimiterEventsEndpoint.java   
@RequestMapping(value = "stream/events/{rateLimiterName}/{eventType}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByRateLimiterNameAndEventType(@PathVariable("rateLimiterName") String rateLimiterName,
                                                                       @PathVariable("eventType") String eventType) {
    RateLimiterEvent.Type targetType = RateLimiterEvent.Type.valueOf(eventType.toUpperCase());
    RateLimiter rateLimiter = rateLimiterRegistry.getAllRateLimiters()
        .find(rL -> rL.getName().equals(rateLimiterName))
        .getOrElseThrow(() ->
            new IllegalArgumentException(String.format("rate limiter with name %s not found", rateLimiterName)));
    Flux<RateLimiterEvent> eventStream = toFlux(rateLimiter.getEventPublisher())
        .filter(event -> event.getEventType() == targetType);
    return RateLimiterEventsEmitter.createSseEmitter(eventStream);
}
项目:resilience4j    文件:RateLimiterEventsEmitter.java   
public RateLimiterEventsEmitter(Flux<RateLimiterEventDTO> eventStream) {
    this.sseEmitter = new SseEmitter();
    this.sseEmitter.onCompletion(this::unsubscribe);
    this.sseEmitter.onTimeout(this::unsubscribe);
    this.disposable = eventStream.subscribe(this::notify,
        this.sseEmitter::completeWithError,
        this.sseEmitter::complete);
}
项目:argos-dashboard    文件:TurbineStreamController.java   
@RequestMapping("/turbine-stream/{cluster}")
public ResponseEntity<SseEmitter> streamHystrix(@PathVariable("cluster") String cluster) {
    Optional<HystrixClusterMonitor> clusterMonitor = clusterRegistry.getCluster(cluster);
    if(!clusterMonitor.isPresent()) {
        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    }

    final SseEmitter emitter = new SseEmitter(TimeUnit.DAYS.toMillis(45));

    SseEmitterUtil.bindObservable(emitter, clusterMonitor.get().observeJson().takeUntil(shutdown)
            .subscribeOn(Schedulers.io()));

    return ResponseEntity.ok(emitter);
}
项目:voj    文件:ApplicationEventListener.java   
/**
 * 移除Server Sent Event的发送者对象.
 * @param submissionId - 提交记录的唯一标识符
 */
private void removeSseEmitters(long submissionId) {
    sseEmitters.remove(submissionId);

    for ( Entry<Long, SseEmitter> mapEntry : sseEmitters.entrySet() ) {
        long currentSubmissionId = mapEntry.getKey();
        if ( currentSubmissionId < submissionId ) {
            sseEmitters.remove(currentSubmissionId);
        }
    }
}
项目:mirrorgate    文件:ServerSentEventsController.java   
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(@PathVariable String dashboardId) throws IOException {

    LOGGER.info("Creating SseEmitter for dashboard {}", dashboardId);

    SseEmitter sseEmitter = new NotCachedSseEmitter();

    sseEmitter.onCompletion(() -> {
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        sseEmitter.complete();
    });

    sseEmitter.onTimeout(() -> {
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        sseEmitter.complete();
    });

    handler.addToSessionsMap(sseEmitter, dashboardId);

    sseEmitter.send(SseEmitter.event().reconnectTime(0L));

    return sseEmitter;

}