/**
 * Detaches an emitter from the list registered for a dashboard.
 * <p>
 * Nothing is removed when {@code dashboardId} is empty according to
 * {@code StringUtils.isEmpty}, or when no list is registered under it. If the
 * list is empty after the removal, the dashboard's map entry is dropped as well.
 *
 * @param session     emitter to remove
 * @param dashboardId key into {@code emittersPerDashboard}
 */
public synchronized void removeFromSessionsMap(SseEmitter session, String dashboardId) {
    LOGGER.debug("Remove SseEmitter {} to sessions map", dashboardId);
    if (StringUtils.isEmpty(dashboardId)) {
        return;
    }
    final List<SseEmitter> registered = emittersPerDashboard.get(dashboardId);
    if (registered == null) {
        return;
    }
    registered.remove(session);
    if (registered.isEmpty()) {
        emittersPerDashboard.remove(dashboardId);
    }
}
/**
 * Returns the SSE emitter stored for the logged-in user, creating and storing
 * a new one when none exists yet.
 * <p>
 * A newly created emitter removes its own entry from the engine's emitter map
 * on completion and completes itself on timeout.
 *
 * @return 200 with the emitter, or 500 with no body when {@code sseEngine} is null
 */
@GetMapping("/api/notification/feed")
public ResponseEntity<SseEmitter> getResults() {
    if (sseEngine == null) {
        LOGGER.warn(() -> "No SSE Engine available");
        return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
    }

    final Long userId = getLoggedInUserId();
    final SseEmitter existing = sseEngine.getEmitters().get(userId);
    if (existing != null) {
        return new ResponseEntity<>(existing, HttpStatus.OK);
    }

    LOGGER.debug(() -> "Creating new emitter");
    final SseEmitter created = new SseEmitter();
    created.onCompletion(() -> {
        LOGGER.debug(() -> "Emitter completed");
        sseEngine.getEmitters().remove(userId);
    });
    created.onTimeout(() -> created.complete());
    sseEngine.getEmitters().put(userId, created);
    return new ResponseEntity<>(created, HttpStatus.OK);
}
/**
 * Builds an {@link SseEmitter} for a client, registers it through
 * {@code registerClient}, and optionally subscribes the client to events.
 * <p>
 * The emitter completes itself when it times out. When no events are passed,
 * no subscription changes are made.
 *
 * @param clientId             unique client identifier
 * @param timeout              timeout value in milliseconds, handed to the emitter constructor
 * @param unsubscribe          if true, {@code unsubscribeFromAllEvents} is called with the
 *                             provided events before subscribing
 * @param completeAfterMessage forwarded unchanged to {@code registerClient}
 * @param events               events the client wants to subscribe to; may be null or empty
 * @return the newly created emitter
 */
public SseEmitter createSseEmitter(String clientId, Long timeout, boolean unsubscribe,
        boolean completeAfterMessage, String... events) {
    final SseEmitter created = new SseEmitter(timeout);
    created.onTimeout(() -> created.complete());
    registerClient(clientId, created, completeAfterMessage);

    final boolean nothingRequested = events == null || events.length == 0;
    if (nothingRequested) {
        return created;
    }

    if (unsubscribe) {
        unsubscribeFromAllEvents(clientId, events);
    }
    for (int i = 0; i < events.length; i++) {
        subscribe(clientId, events[i]);
    }
    return created;
}
/**
 * Translates the wrapped event into a Spring {@code SseEventBuilder}.
 * <p>
 * The event name is set only when it differs from {@code SseEvent.DEFAULT_EVENT}.
 * Id, retry (as milliseconds) and comment are copied when present. The data is
 * {@code convertedValue} when that is non-null, otherwise the event's own data.
 *
 * @return a populated event builder
 */
public SseEventBuilder createSseEventBuilder() {
    final SseEventBuilder builder = SseEmitter.event();

    final String eventName = this.event.event();
    if (!eventName.equals(SseEvent.DEFAULT_EVENT)) {
        builder.name(eventName);
    }

    this.event.id().ifPresent(idValue -> builder.id(idValue));
    this.event.retry().ifPresent(retry -> builder.reconnectTime(retry.toMillis()));
    this.event.comment().ifPresent(text -> builder.comment(text));

    final Object payload = this.convertedValue != null ? this.convertedValue : this.event.data();
    builder.data(payload);
    return builder;
}
/**
 * Persists a notification for a consumer and pushes it to that consumer's
 * open SSE subscriptions.
 * <p>
 * The notification is saved, attached to the consumer, and the consumer is
 * saved. The saved notification is then sent as JSON to every emitter
 * registered under {@code username}. A failing emitter is logged and skipped
 * so the remaining emitters are still served.
 *
 * @param username     name used to look up the consumer and its subscriptions
 * @param notification notification to store and broadcast
 */
public void sendNotification(String username, Notification notification) {
    // NOTE(review): findByUsername appears able to return null for an unknown user
    // (other callers check for it); that case still fails below — confirm intended handling.
    Consumer consumer = this.consumers.findByUsername(username);
    notification = notifications.save(notification);
    consumer.addNotifications(notification);
    consumers.save(consumer);

    // Single lookup: a containsKey()/get() pair can observe the entry vanish in between.
    Iterable<SseEmitter> emitters = this.subscriptions.get(username);
    if (emitters == null) {
        return;
    }
    for (SseEmitter sse : emitters) {
        try {
            sse.send(notification, MediaType.APPLICATION_JSON);
        } catch (IOException ex) {
            // Keep the cause in the log so broken connections can be diagnosed.
            this.logger.warn("Broken SSE Emitter..discarding it...", ex);
        }
    }
}
/**
 * Subscribes a consumer to the SSE stream after checking its credentials.
 * <p>
 * Reads {@code username} and {@code password} from the request parameters.
 * A missing request password or a missing stored password is treated as a
 * failed login rather than raising an exception.
 *
 * @param request current web request carrying the credentials
 * @return 200 with the emitter from {@code notificationManager.subscribe} on success,
 *         404 when no consumer has that username, 401 when the password does not match
 */
@Transactional
@ApiOperation(value = "Returns success/insuccess.", notes = "This method allows a consumer to subscribe the SSE.")
@RequestMapping(value = "/consumer/subscribe", produces = "text/event-stream")
public ResponseEntity<SseEmitter> subscribe(WebRequest request) {
    String username = request.getParameter("username");
    String password = request.getParameter("password");

    Consumer consumer = this.consumers.findByUsername(username);
    if (consumer == null) {
        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    }

    String expected = consumer.getPassword();
    if (expected == null || password == null) {
        return new ResponseEntity<>(HttpStatus.UNAUTHORIZED);
    }

    // Constant-time comparison so response timing does not reveal how much of the password matched.
    boolean matches = java.security.MessageDigest.isEqual(
            expected.getBytes(java.nio.charset.StandardCharsets.UTF_8),
            password.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    if (!matches) {
        return new ResponseEntity<>(HttpStatus.UNAUTHORIZED);
    }
    return new ResponseEntity<>(this.notificationManager.subscribe(username), HttpStatus.OK);
}
/**
 * Opens an SSE stream that relays progress values for the given key.
 * <p>
 * Each value delivered to the listener is sent to the emitter; a value of 1 or
 * more completes the emitter. The listener is removed when the emitter completes.
 *
 * @param name textual UUID identifying the progress entry
 * @return the emitter bound to the progress listener
 * @throws IllegalArgumentException if {@code name} is not a valid UUID string
 */
@GetMapping("/progress/{name}")
SseEmitter progress(@PathVariable String name) {
    final UUID listenerKey = UUID.fromString(name);
    final SseEmitter stream = new SseEmitter();

    progress.addListener(listenerKey, value -> {
        try {
            stream.send(value);
        } catch (IOException sendFailure) {
            throw new UncheckedIOException(sendFailure);
        }
        if (value >= 1) {
            stream.complete();
        }
    });

    stream.onCompletion(() -> progress.removeListener(listenerKey));
    return stream;
}
/**
 * Connects an observable to an {@link SseEmitter}.
 * <p>
 * Each item is forwarded through {@code emitSse}. Normal completion of the
 * observable completes the emitter; an error is logged and completes the
 * emitter with that error. The resulting subscription is also handed to
 * {@code bindUnsubscribe} together with the emitter.
 *
 * @param emitter    emitter that receives the items
 * @param observable source of the data
 * @param <T>        item type
 * @return the subscription to the observable
 */
public static <T> Subscription bindObservable(SseEmitter emitter, Observable<T> observable) {
    final Subscriber<T> forwarder = new Subscriber<T>() {
        @Override
        public void onNext(T item) {
            emitSse(emitter, item);
        }

        @Override
        public void onError(Throwable e) {
            LOG.warn("Error from stream observable", e);
            emitter.completeWithError(e);
        }

        @Override
        public void onCompleted() {
            emitter.complete();
        }
    };

    final Subscription handle = observable.subscribe(forwarder);
    bindUnsubscribe(emitter, handle);
    return handle;
}
/**
 * Opens a Server-Sent Events stream carrying the real-time judge result of a submission.
 * <p>
 * The request is rejected with {@code ResourceNotFoundException} when the CSRF
 * token is invalid, the submission does not exist, the submission's user is not
 * the current user, or the judge result slug is not {@code "PD"}.
 *
 * @param submissionId unique identifier of the submission
 * @param csrfToken    CSRF token validated against the HTTP session
 * @param request      current HTTP request; its session supplies the current user
 * @param response     current HTTP response; receives the {@code X-Accel-Buffering} header
 * @return the emitter registered with the submission event listener
 * @throws IOException if the initial message cannot be sent
 */
@RequestMapping("/getRealTimeJudgeResult.action")
public SseEmitter getRealTimeJudgeResultAction(
        @RequestParam(value="submissionId") long submissionId,
        @RequestParam(value="csrfToken") String csrfToken,
        HttpServletRequest request, HttpServletResponse response) throws IOException {
    User currentUser = HttpSessionParser.getCurrentUser(request.getSession());
    boolean csrfTokenAccepted = CsrfProtector.isCsrfTokenValid(csrfToken, request.getSession());
    Submission submission = submissionService.getSubmission(submissionId);

    // The checks run in this order so a missing submission is never dereferenced.
    if ( !csrfTokenAccepted ) {
        throw new ResourceNotFoundException();
    }
    if ( submission == null ) {
        throw new ResourceNotFoundException();
    }
    if ( !submission.getUser().equals(currentUser) ) {
        throw new ResourceNotFoundException();
    }
    if ( !submission.getJudgeResult().getJudgeResultSlug().equals("PD") ) {
        throw new ResourceNotFoundException();
    }

    response.addHeader("X-Accel-Buffering", "no");
    SseEmitter stream = new SseEmitter();
    submissionEventListener.addSseEmitters(submissionId, stream);
    stream.send("Established");
    return stream;
}
/**
 * Handles a submission event by pushing it to the emitter registered for that submission.
 * <p>
 * The judge result and message are sent as a two-entry map. When the event is
 * marked completed, the emitter is completed and {@code removeSseEmitters} is called.
 * When no emitter is registered for the submission, a warning is logged and
 * nothing is sent.
 *
 * @param event the submission event
 * @throws IOException if sending to the emitter fails
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
    final long submissionId = event.getSubmissionId();
    final String judgeResult = event.getJudgeResult();
    final String message = event.getMessage();
    final boolean finished = event.isCompleted();

    final SseEmitter target = sseEmitters.get(submissionId);
    if ( target == null ) {
        LOGGER.warn(String.format("CANNOT get the SseEmitter for submission #%d.", submissionId));
        return;
    }

    final Map<String, String> payload = new HashMap<>(3, 1);
    payload.put("judgeResult", judgeResult);
    payload.put("message", message);
    target.send(payload);

    if ( !finished ) {
        return;
    }
    target.complete();
    removeSseEmitters(submissionId);
}
/**
 * Sends an event-type message to every emitter registered for a dashboard.
 * <p>
 * Does nothing when no emitter list exists for {@code dashboardId}. Any event
 * other than {@code EventType.PING} is preceded by a PING to the same
 * dashboard. An emitter whose send raises an {@link IOException} is removed
 * through {@code removeFromSessionsMap}.
 *
 * @param event       event whose value is sent under the {@code "type"} key
 * @param dashboardId key into {@code emittersPerDashboard}
 */
@Override
public void sendEventUpdateMessage(EventType event, String dashboardId) {
    final List<SseEmitter> targets = emittersPerDashboard.get(dashboardId);
    if (targets == null) {
        return;
    }

    if (event != EventType.PING) {
        sendEventUpdateMessage(EventType.PING, dashboardId);
    }

    LOGGER.info("Notifying {} dashboards with name {} and event type {}", targets.size(), dashboardId, event);

    // Walk from the tail so removing the current emitter does not shift unvisited indexes.
    int index = targets.size() - 1;
    while (index >= 0) {
        final SseEmitter target = targets.get(index);
        try {
            final Map<String, String> payload = new HashMap<>();
            payload.put("type", event.getValue());
            target.send(objectMapper.writeValueAsString(payload), MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            this.removeFromSessionsMap(target, dashboardId);
            LOGGER.error("Exception while sending message to emitter for dashboard {}", dashboardId);
        }
        index--;
    }
}
/**
 * Sends data through the emitter stored under the given id.
 * <p>
 * When no emitter is stored for the id, a warning is logged and nothing is sent.
 *
 * @param id   key of the emitter in {@code emitters}
 * @param data payload handed to the emitter
 * @throws IOException if the emitter fails to send
 */
public void send(long id, Object data) throws IOException {
    final SseEmitter target = emitters.get(id);
    if (target != null) {
        target.send(data);
        LOGGER.info(() -> "Sent data via emitter - id=" + id);
        return;
    }
    LOGGER.warn(() -> "Failed to locate emitter - id=" + id);
}
private void send(SseEmitter emitter, INotification notification) { new Thread(() -> { try { //TODO - Convert to JSON. emitter.send(notification); emitter.complete(); } catch (IOException e) { e.printStackTrace(); } }).start(); }
/**
 * Creates an event stream in the {@code READY} state with a new {@code SseEmitter}.
 *
 * @param messageGroup  message group stored on this stream
 * @param eventSupplier event supplier stored on this stream
 */
public SseEventStream(MessageGroup messageGroup, EventSupplier eventSupplier) {
    this.messageGroup = messageGroup;
    this.eventSupplier = eventSupplier;
    this.state = State.READY;
    this.sseEmitter = new SseEmitter();
}
/**
 * Sends a named event through {@code sseEmitter}; on any exception a warning
 * is logged and {@code stop()} is called.
 *
 * @param event event name
 * @param id    optional event id, set on the event only when present
 * @param data  event payload
 */
private void sendAndStopOnError(String event, Optional<String> id, Object data) {
    try {
        final SseEventBuilder outgoing = SseEmitter.event().name(event).data(data);
        if (id.isPresent()) {
            outgoing.id(id.get());
        }
        sseEmitter.send(outgoing);
    } catch (Exception e) {
        log.warn("Error in SSE channel: " + e);
        stop();
    }
}
/**
 * Forwards the event's message as JSON to the emitter registered under the
 * event's key. Events whose key has no registered emitter are ignored.
 *
 * @param event event carrying the lookup key and the message
 * @throws IOException if sending to the emitter fails
 */
@EventListener
public void submissionEventHandler(SubmissionEvent event) throws IOException {
    final String key = event.getKey();
    final Object payload = event.getMessage();

    final SseEmitter target = sseEmitters.get(key);
    if ( target != null ) {
        target.send(payload, MediaType.APPLICATION_JSON);
    }
}
/**
 * Creates an emitter and registers it with the application event listener
 * under this controller's listener type and the given id.
 *
 * @param id identifier passed to {@code addSseEmitters}
 * @return the newly registered emitter
 * @throws ListenerTypeNotFound if {@code listenerType} is null
 */
@RequestMapping("/listener/{id}")
public SseEmitter listen(@PathVariable("id") String id) {
    if (this.listenerType == null) {
        throw new ListenerTypeNotFound();
    }
    final SseEmitter subscriber = new SseEmitter();
    applicationEventListener.addSseEmitters(this.listenerType, id, subscriber);
    return subscriber;
}
/**
 * Broadcasts an {@code "evento"} SSE event announcing a new suggestion.
 * <p>
 * The payload is a JSON-looking string assembled by concatenation; the title
 * is inserted as-is, without escaping.
 *
 * @param data suggestion whose title is placed in the payload
 */
@RequestMapping( value = "/newSugerence")
@EventListener
public void newSugerence(Sugerencia data) {
    System.out.println("Evento escuchado!");
    final String payload = "{ \"tipo\": \"newSugerence\" , \"title\":\"" + data.getTitulo() + "\"}";
    final SseEventBuilder outgoing = SseEmitter.event().name("evento").data(payload);
    sendEvent(outgoing);
}
/**
 * Broadcasts an {@code "evento"} SSE event announcing a new comment.
 * <p>
 * The payload is a JSON-looking string assembled by concatenation; the title of
 * the comment's suggestion is inserted as-is, without escaping.
 *
 * @param data comment whose suggestion title is placed in the payload
 */
@RequestMapping( value = "/newComentary")
@EventListener
public void newComentary(Comentario data) {
    final String payload = "{ \"tipo\": \"newComentary\" , \"title\":\"" + data.getSugerencia().getTitulo() +"\" }";
    final SseEventBuilder outgoing = SseEmitter.event().name("evento").data(payload);
    sendEvent(outgoing);
}
/**
 * Sends the event to every emitter in {@code sseEmitters} while holding the
 * collection's monitor.
 * <p>
 * A failing emitter has its stack trace printed and the loop continues with
 * the next one.
 *
 * @param event event to broadcast
 */
private void sendEvent(SseEventBuilder event) {
    synchronized (sseEmitters) {
        for (SseEmitter target : sseEmitters) {
            try {
                System.out.println("Enviando el evento");
                target.send(event);
            } catch (IOException sendFailure) {
                sendFailure.printStackTrace();
            }
        }
    }
}
/**
 * Creates an emitter, adds it to {@code sseEmitters}, and arranges for it to be
 * removed again when it completes. Both the add and the later remove run while
 * holding the collection's monitor.
 *
 * @return the newly registered emitter
 */
@RequestMapping("/userPriv/updates")
SseEmitter updateHTML() {
    final SseEmitter subscriber = new SseEmitter();
    final Runnable unregister = () -> {
        synchronized (this.sseEmitters) {
            this.sseEmitters.remove(subscriber);
        }
    };

    synchronized (this.sseEmitters) {
        this.sseEmitters.add(subscriber);
        subscriber.onCompletion(unregister);
    }
    return subscriber;
}
/**
 * Sends a message through the emitter as an SSE event whose id is the message
 * id and whose data is the message payload, using
 * {@code MediaType.APPLICATION_JSON_UTF8}.
 * <p>
 * An {@link IllegalStateException} is logged at debug level and not propagated.
 *
 * @param event message to emit
 * @throws IOException if the emitter fails to send
 */
public void emit(KfkaMessage event) throws IOException {
    try {
        final String eventId = Long.toString(event.getId());
        final SseEmitter.SseEventBuilder outgoing = SseEmitter.event()
                .id(eventId)
                .data(event.getPayload(), MediaType.APPLICATION_JSON_UTF8);
        this.emitter.send(outgoing);
    } catch (IllegalStateException exc) {
        logger.debug(exc.getMessage());
    }
}
/**
 * Registers a new emitter in {@code emitters}, removes it again on completion,
 * and passes the whole emitter collection to
 * {@code streamTweetEventService.streamTweetEvent}.
 *
 * @return the newly registered emitter
 * @throws InterruptedException declared by this method's signature
 */
@RequestMapping("/tweetLocation")
public SseEmitter streamTweets() throws InterruptedException {
    final SseEmitter subscriber = new SseEmitter();
    emitters.add(subscriber);

    final Runnable unregister = () -> emitters.remove(subscriber);
    subscriber.onCompletion(unregister);

    streamTweetEventService.streamTweetEvent(emitters);
    return subscriber;
}
/**
 * Registers a client, or swaps in a new emitter for one that already exists.
 * <p>
 * For an existing client only the emitter is updated; {@code completeAfterMessage}
 * is used solely when a new {@code Client} is created.
 *
 * @param clientId             unique client identifier
 * @param emitter              emitter to associate with the client
 * @param completeAfterMessage passed to the {@code Client} constructor for new clients
 */
public void registerClient(String clientId, SseEmitter emitter, boolean completeAfterMessage) {
    final Client existing = this.clients.get(clientId);
    if (existing != null) {
        existing.updateEmitter(emitter);
        return;
    }
    this.clients.put(clientId, new Client(clientId, emitter, completeAfterMessage));
}
/**
 * Creates an emitter with a 60000 ms timeout for the current user.
 * <p>
 * When {@code messages} holds an event list for the current user id, a new
 * thread running a {@code RunProcess} over that list, the emitter and the user
 * id is started. Otherwise the emitter is returned without any thread being started.
 *
 * @return the created emitter
 * @throws ParserConfigurationException declared by this method's signature
 * @throws SAXException declared by this method's signature
 * @throws IOException declared by this method's signature
 */
@Override
public SseEmitter sendWebhookEvent() throws ParserConfigurationException, SAXException, IOException {
    final List<WebhookEventDTO> pendingEvents = messages.get(tokenService.getCurrentUserId());
    final SseEmitter emitter = new SseEmitter(60000L);
    if (pendingEvents == null) {
        return emitter;
    }

    final Thread worker = new Thread(
            new RunProcess(pendingEvents, emitter, tokenService.getCurrentUserId()));
    worker.start();
    return emitter;
}
/**
 * Creates an emitter for a route, registers it under the route's path, and
 * immediately sends every element of the message payload through it.
 * <p>
 * The emitter is created with a timeout of {@code Long.MAX_VALUE} and is
 * removed from the path's emitter set on completion and on timeout.
 *
 * @param route   route whose path keys the emitter set
 * @param message message whose payload elements are sent one by one
 * @return the newly registered emitter
 * @throws IOException if sending a payload element fails
 */
private SseEmitter emit(Route route, Message<Collection<Object>> message) throws IOException {
    final String path = route.getPath();
    final SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

    if (!emitters.containsKey(path)) {
        emitters.putIfAbsent(path, new HashSet<>());
    }
    emitters.get(path).add(emitter);

    final Runnable detach = () -> emitters.get(path).remove(emitter);
    emitter.onCompletion(detach);
    emitter.onTimeout(detach);

    for (Object element : message.getPayload()) {
        emitter.send(element);
    }
    return emitter;
}
/**
 * Routes a message either to its reply channel or to the queue and emitters
 * registered for its route.
 * <p>
 * When the reply-channel header holds a {@code MessageChannel}, the message is
 * sent there and nothing else happens. Otherwise the message is sent to the
 * queue for the route's path (created on first use) and its payload is sent to
 * a snapshot of the emitters registered for that path; an emitter that raises
 * an {@link IOException} is removed from the registered set.
 *
 * @param name    name used together with the {@code ROUTE_KEY} header to build the route
 * @param message message to deliver
 */
private void append(String name, Message<?> message) {
    final String key = (String) message.getHeaders().get(ROUTE_KEY);

    final Object reply = message.getHeaders().getReplyChannel();
    if (reply instanceof MessageChannel) {
        ((MessageChannel) reply).send(message);
        return;
    }

    final String path = new Route(key, name).getPath();
    if (!queues.containsKey(path)) {
        Bridge<Message<?>> bridge = new Bridge<>();
        queues.putIfAbsent(path, bridge);
    }
    queues.get(path).send(message);

    if (!emitters.containsKey(path)) {
        return;
    }
    // Iterate over a copy so failed emitters can be removed from the live set.
    final Set<SseEmitter> snapshot = new HashSet<>(emitters.get(path));
    for (SseEmitter target : snapshot) {
        try {
            target.send(message.getPayload());
        } catch (IOException e) {
            emitters.get(path).remove(target);
        }
    }
}
@RequestMapping(path = "/", method = RequestMethod.POST, produces = "application/json") @ResponseBody Comment jsonCreate(Comment comment) throws IOException { Comment newComment = this.commentRepository.save(comment); synchronized (this.sseEmitters) { for (SseEmitter sseEmitter : this.sseEmitters) { // Servlet containers don't always detect ghost connection, so we must catch exceptions ... try { sseEmitter.send(newComment, MediaType.APPLICATION_JSON); } catch (Exception e) {} } } return comment; }
/**
 * Creates an emitter, adds it to {@code sseEmitters}, and arranges for it to be
 * removed again when it completes. Both the add and the later remove run while
 * holding the collection's monitor.
 *
 * @return the newly registered emitter
 */
@RequestMapping("/sse/updates")
SseEmitter subscribeUpdates() {
    final SseEmitter subscriber = new SseEmitter();
    final Runnable unregister = () -> {
        synchronized (this.sseEmitters) {
            this.sseEmitters.remove(subscriber);
        }
    };

    synchronized (this.sseEmitters) {
        this.sseEmitters.add(subscriber);
        subscriber.onCompletion(unregister);
    }
    return subscriber;
}
private CircuitBreakerEventEmitter(Flux<CircuitBreakerEventDTO> eventStream) { this.sseEmitter = new SseEmitter(); this.sseEmitter.onCompletion(this::unsubscribe); this.sseEmitter.onTimeout(this::unsubscribe); this.disposable = eventStream.subscribe(this::notify, this.sseEmitter::completeWithError, this.sseEmitter::complete); }
/**
 * Streams the events of the circuit breaker with the given name.
 *
 * @param circuitBreakerName name compared against each registered circuit breaker's name
 * @return emitter created from the circuit breaker's event publisher
 * @throws IllegalArgumentException if no registered circuit breaker has that name
 */
@RequestMapping(value = "stream/events/{circuitBreakerName}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByCircuitBreakerName(
        @PathVariable("circuitBreakerName") String circuitBreakerName) {
    final CircuitBreaker selected = circuitBreakerRegistry.getAllCircuitBreakers()
            .find(candidate -> candidate.getName().equals(circuitBreakerName))
            .getOrElseThrow(() -> new IllegalArgumentException(
                    String.format("circuit breaker with name %s not found", circuitBreakerName)));

    final Flux<CircuitBreakerEvent> eventStream = toFlux(selected.getEventPublisher());
    return CircuitBreakerEventEmitter.createSseEmitter(eventStream);
}
/**
 * Streams the events of one type for the circuit breaker with the given name.
 * <p>
 * The event type is parsed once, before the stream is built, so an unknown type
 * is rejected at request time instead of when the first event arrives.
 *
 * @param circuitBreakerName name compared against each registered circuit breaker's name
 * @param eventType          case-insensitive name of a {@code CircuitBreakerEvent.Type} constant
 * @return emitter created from the filtered event stream
 * @throws IllegalArgumentException if the event type is unknown or no registered
 *                                  circuit breaker has that name
 */
@RequestMapping(value = "stream/events/{circuitBreakerName}/{eventType}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByCircuitBreakerNameAndEventType(
        @PathVariable("circuitBreakerName") String circuitBreakerName,
        @PathVariable("eventType") String eventType) {
    // Locale.ROOT keeps the upper-casing independent of the server's default locale.
    CircuitBreakerEvent.Type targetType =
            CircuitBreakerEvent.Type.valueOf(eventType.toUpperCase(java.util.Locale.ROOT));
    CircuitBreaker circuitBreaker = circuitBreakerRegistry.getAllCircuitBreakers()
            .find(cb -> cb.getName().equals(circuitBreakerName))
            .getOrElseThrow(() -> new IllegalArgumentException(
                    String.format("circuit breaker with name %s not found", circuitBreakerName)));
    Flux<CircuitBreakerEvent> eventStream = toFlux(circuitBreaker.getEventPublisher())
            .filter(event -> event.getEventType() == targetType);
    return CircuitBreakerEventEmitter.createSseEmitter(eventStream);
}
/**
 * Streams the events of the rate limiter with the given name.
 *
 * @param rateLimiterName name compared against each registered rate limiter's name
 * @return emitter created from the rate limiter's event publisher
 * @throws IllegalArgumentException if no registered rate limiter has that name
 */
@RequestMapping(value = "stream/events/{rateLimiterName}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByRateLimiterName(
        @PathVariable("rateLimiterName") String rateLimiterName) {
    final RateLimiter selected = rateLimiterRegistry.getAllRateLimiters()
            .find(candidate -> candidate.getName().equals(rateLimiterName))
            .getOrElseThrow(() -> new IllegalArgumentException(
                    String.format("rate limiter with name %s not found", rateLimiterName)));

    final Flux<RateLimiterEvent> eventStream = toFlux(selected.getEventPublisher());
    return RateLimiterEventsEmitter.createSseEmitter(eventStream);
}
/**
 * Streams the events of one type for the rate limiter with the given name.
 * <p>
 * The event type is parsed before the rate limiter is looked up, so an unknown
 * type is rejected first.
 *
 * @param rateLimiterName name compared against each registered rate limiter's name
 * @param eventType       case-insensitive name of a {@code RateLimiterEvent.Type} constant
 * @return emitter created from the filtered event stream
 * @throws IllegalArgumentException if the event type is unknown or no registered
 *                                  rate limiter has that name
 */
@RequestMapping(value = "stream/events/{rateLimiterName}/{eventType}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByRateLimiterNameAndEventType(
        @PathVariable("rateLimiterName") String rateLimiterName,
        @PathVariable("eventType") String eventType) {
    // Locale.ROOT keeps the upper-casing independent of the server's default locale.
    RateLimiterEvent.Type targetType =
            RateLimiterEvent.Type.valueOf(eventType.toUpperCase(java.util.Locale.ROOT));
    RateLimiter rateLimiter = rateLimiterRegistry.getAllRateLimiters()
            .find(rL -> rL.getName().equals(rateLimiterName))
            .getOrElseThrow(() -> new IllegalArgumentException(
                    String.format("rate limiter with name %s not found", rateLimiterName)));
    Flux<RateLimiterEvent> eventStream = toFlux(rateLimiter.getEventPublisher())
            .filter(event -> event.getEventType() == targetType);
    return RateLimiterEventsEmitter.createSseEmitter(eventStream);
}
public RateLimiterEventsEmitter(Flux<RateLimiterEventDTO> eventStream) { this.sseEmitter = new SseEmitter(); this.sseEmitter.onCompletion(this::unsubscribe); this.sseEmitter.onTimeout(this::unsubscribe); this.disposable = eventStream.subscribe(this::notify, this.sseEmitter::completeWithError, this.sseEmitter::complete); }
/**
 * Streams the JSON observable of a cluster monitor over SSE.
 * <p>
 * The emitter is created with a 45-day timeout. The monitor's observable is cut
 * off by {@code shutdown} via {@code takeUntil} and subscribed on the I/O scheduler.
 *
 * @param cluster name used to look up the cluster monitor
 * @return 200 with the emitter, or 404 with no body when the cluster is not registered
 */
@RequestMapping("/turbine-stream/{cluster}")
public ResponseEntity<SseEmitter> streamHystrix(@PathVariable("cluster") String cluster) {
    final Optional<HystrixClusterMonitor> monitor = clusterRegistry.getCluster(cluster);
    if (monitor.isPresent()) {
        final long timeoutMillis = TimeUnit.DAYS.toMillis(45);
        final SseEmitter stream = new SseEmitter(timeoutMillis);
        SseEmitterUtil.bindObservable(
                stream,
                monitor.get().observeJson().takeUntil(shutdown).subscribeOn(Schedulers.io()));
        return ResponseEntity.ok(stream);
    }
    return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
/**
 * Removes the Server-Sent Event emitter registered for a submission, together
 * with every emitter registered under a smaller submission id.
 * <p>
 * Entries are removed through the key iterator, so the purge is safe even when
 * {@code sseEmitters} is a map that does not tolerate modification during a
 * for-each traversal.
 *
 * @param submissionId unique identifier of the submission
 */
private void removeSseEmitters(long submissionId) {
    sseEmitters.remove(submissionId);

    java.util.Iterator<Long> registeredIds = sseEmitters.keySet().iterator();
    while ( registeredIds.hasNext() ) {
        long currentSubmissionId = registeredIds.next();
        if ( currentSubmissionId < submissionId ) {
            // Removing via the iterator also removes the mapping from the backing map.
            registeredIds.remove();
        }
    }
}
/**
 * Creates an emitter for a dashboard and registers it with the handler.
 * <p>
 * On completion and on timeout the emitter is removed from the handler's
 * session map and {@code complete()} is called on it. After registration an
 * initial event carrying a reconnect time of 0 is sent.
 *
 * @param dashboardId dashboard the emitter is registered under
 * @return the registered emitter
 * @throws IOException if the initial event cannot be sent
 */
@GetMapping(value = "/emitter/{dashboardId}")
public SseEmitter serverSideEmitter(@PathVariable String dashboardId) throws IOException {
    LOGGER.info("Creating SseEmitter for dashboard {}", dashboardId);

    final SseEmitter sseEmitter = new NotCachedSseEmitter();
    // Completion and timeout share the same cleanup steps.
    final Runnable release = () -> {
        handler.removeFromSessionsMap(sseEmitter, dashboardId);
        sseEmitter.complete();
    };
    sseEmitter.onCompletion(release);
    sseEmitter.onTimeout(release);

    handler.addToSessionsMap(sseEmitter, dashboardId);
    sseEmitter.send(SseEmitter.event().reconnectTime(0L));
    return sseEmitter;
}