@GetMapping(value = "/projects/{projectId}/tightCouplingEvents", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<TightCouplingEvent>> streamTightCouplingEvents(@PathVariable Long projectId, HttpServletRequest request) { // Stream the events from MongoDB Flux<TightCouplingEvent> events = eventRepository.findByProjectId(projectId); // Check if this is an SSE reconnection from a client String lastEventId = request.getHeader("Last-Event-Id"); // On SSE client reconnect, skip ahead in the stream to play back only new events if (lastEventId != null) events = events.skipUntil(e -> e.getId().equals(lastEventId)).skip(1); // Subscribe to the tailing events from the reactive repository query return events.map(s -> ServerSentEvent.builder(s) .event(s.getCreatedDate().toString()) .id(s.getId()) .build()) .delayElements(Duration.ofMillis(100)); }
@Bean public CommandLineRunner myCommandLineRunner() { return args -> { String api = "http://stockmarket.streamdata.io/prices"; String token = "[YOUR TOKEN HERE]"; // If ever you want to pass some headers to your API // specify a header map associated with the request Map<String,String> headers = new HashMap<>(); // add this header if you wish to stream Json rather than Json-patch // NOTE: no 'patch' event will be emitted. // headers.put("Accept", "application/json"); URI streamdataUri = streamdataUri(api, token, headers); // source: https://github.com/spring-projects/spring-framework/blob/v5.0.0.RC1/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java ResolvableType type = forClassWithGenerics(ServerSentEvent.class, JsonNode.class); // Create the web client and the flux of events WebClient client = WebClient.create(); Flux<ServerSentEvent<JsonNode>> events = client.get() .uri(streamdataUri) .accept(TEXT_EVENT_STREAM) .exchange() .flatMapMany(response -> response.body(toFlux(type))); // use of a transformer to apply the patches events.as(new PatchTransformer()) // Subscribe to the flux with a consumer that applies patches .subscribe(System.out::println, Throwable::printStackTrace); // Add a block here because CommandLineRunner returns after the execution of the code // ... and make the code run 1 day. Mono.just("That's the end!") .delayElement(Duration.ofDays(1)) .block(); }; }
@Override public Flux<JsonNode> apply(final Flux<ServerSentEvent<JsonNode>> aFlux) { return aFlux.filter(evt -> evt.data().isPresent()) .filter(evt -> evt.event() .map(evtType -> "data".equals(evtType) || "patch".equals(evtType) || "error".equals(evtType)) .orElse(FALSE)) .map(new Function<ServerSentEvent<JsonNode>, JsonNode>() { private JsonNode current; @Override public JsonNode apply(final ServerSentEvent<JsonNode> aEvent) { String type = aEvent.event().get(); switch (type) { case "data": current = aEvent.data().get(); break; case "patch": // current = JsonPatch.apply(aEvent.data().get(), current); current = aEvent.data().get(); break; case "error": aEvent.data() .ifPresent(data -> { throw new RuntimeException("received an error! " + data); }); break; default: throw new IllegalArgumentException("Unknown type: " + type); } return current; } }); }
/**
 * Emits one server-sent event per second.
 *
 * <p>Every event carries the fixed two-line data {@code "foo\nbar"}, the fixed two-line
 * comment {@code "bar\nbaz"}, and the interval tick number as its id.
 *
 * @return the stream of server-sent events
 */
@GetMapping("/sse/event")
Flux<ServerSentEvent<String>> event() {
    final Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
    return ticks.map(tick -> {
        final String eventId = Long.toString(tick);
        return ServerSentEvent
                .builder("foo\nbar")
                .comment("bar\nbaz")
                .id(eventId)
                .build();
    });
}
/**
 * Streams the registry state of a single instance as server-sent events.
 *
 * <p>Events from the event store are narrowed to those whose instance id equals the
 * requested one; for each of them the instance is looked up in the registry and emitted.
 * The result is merged with the ping stream.
 *
 * @param id the instance id taken from the request path
 * @return instance updates for the given id, merged with ping events
 */
@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
    final Flux<ServerSentEvent<Instance>> updates = Flux.from(eventStore)
            .filter(candidate -> candidate.getInstance().equals(InstanceId.of(id)))
            .flatMap(matched -> registry.getInstance(matched.getInstance()))
            .map(instance -> ServerSentEvent.builder(instance).build());

    return updates.mergeWith(ping());
}
/**
 * Streams application updates as server-sent events.
 *
 * <p>For every published event the affected instance is looked up in the registry, passed
 * to {@code getApplicationForInstance}, and the two tuple components of that result are
 * handed to {@code toApplication}. Each resulting application is emitted, and the stream
 * is merged with the ping stream.
 *
 * @return application updates merged with ping events
 */
@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
    final Flux<ServerSentEvent<Application>> updates = Flux.from(eventPublisher)
            .flatMap(published -> registry.getInstance(published.getInstance()))
            .map(instance -> getApplicationForInstance(instance))
            .flatMap(pair -> toApplication(pair.getT1(), pair.getT2()))
            .map(app -> ServerSentEvent.builder(app).build());

    return updates.mergeWith(ping());
}
/**
 * Wraps the path value in a server-sent event and pushes it into the replay processor.
 *
 * @param s the value taken from the {@code val} path segment; used as the event data
 */
@PostMapping("/sse/receive/{val}")
public void receive(@PathVariable("val") String s) {
    final ServerSentEvent<String> outgoing = ServerSentEvent.builder(s).build();
    replayProcessor.onNext(outgoing);
}
/**
 * Exposes the replay processor as a stream of server-sent events, with Reactor signal
 * logging enabled under the {@code playground} category.
 *
 * @return the logged view of the replay processor
 */
@GetMapping("/sse/send")
public Flux<ServerSentEvent<String>> send() {
    final Flux<ServerSentEvent<String>> logged = replayProcessor.log("playground");
    return logged;
}
/**
 * Streams every event from the event store as a server-sent event, merged with the
 * ping stream.
 *
 * @return instance events merged with ping events
 */
@GetMapping(path = "/instances/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<InstanceEvent>> eventStream() {
    final Flux<ServerSentEvent<InstanceEvent>> wrapped = Flux.from(eventStore)
            .map(stored -> ServerSentEvent.builder(stored).build());

    return wrapped.mergeWith(ping());
}
/**
 * Returns the shared ping flux viewed as a flux of server-sent events of any payload type.
 *
 * <p>The same {@code PING_FLUX} instance is returned for every {@code T}; only the static
 * type changes, via an unchecked cast.
 *
 * @param <T> payload type expected by the caller's stream
 * @return {@code PING_FLUX}, re-typed to {@code Flux<ServerSentEvent<T>>}
 */
@SuppressWarnings("unchecked")
private static <T> Flux<ServerSentEvent<T>> ping() {
    final Flux<?> shared = PING_FLUX;
    return (Flux<ServerSentEvent<T>>) shared;
}