Java 类org.springframework.http.codec.ServerSentEvent 实例源码

项目:service-block-samples    文件:ViewController.java   
/**
 * Streams the tight-coupling events of one project as server-sent events.
 *
 * <p>If the request carries a {@code Last-Event-Id} header, every event up to and including
 * the one whose id equals the header value is dropped, so only later events are emitted.
 *
 * @param projectId id of the project whose events are queried from the repository
 * @param request   servlet request, read only for its {@code Last-Event-Id} header
 * @return a flux of server-sent events spaced 100 ms apart; each carries the source event as
 *         data, the event id as SSE id and the string form of its created date as SSE event name
 */
@GetMapping(value = "/projects/{projectId}/tightCouplingEvents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<TightCouplingEvent>> streamTightCouplingEvents(@PathVariable Long projectId,
                                                                           HttpServletRequest request) {
    // Reactive query for the project's events
    final Flux<TightCouplingEvent> stored = eventRepository.findByProjectId(projectId);

    // Present only when an SSE client reconnects and reports the last event it saw
    final String resumeAfterId = request.getHeader("Last-Event-Id");

    // On reconnect: drop everything before the reported event, then drop that event itself
    final Flux<TightCouplingEvent> pending = (resumeAfterId == null)
            ? stored
            : stored.skipUntil(candidate -> candidate.getId().equals(resumeAfterId)).skip(1);

    return pending
            .map(item -> ServerSentEvent.builder(item)
                    .event(item.getCreatedDate().toString())
                    .id(item.getId())
                    .build())
            .delayElements(Duration.ofMillis(100));
}
项目:streamdataio-spring-webflux    文件:StreamdataioSpringWebfluxApplication.java   
/**
 * Creates the runner that opens a server-sent-event stream against the URI built by
 * {@code streamdataUri}, pipes the events through {@link PatchTransformer}, prints every
 * resulting item to standard output, and then blocks the calling thread for one day.
 *
 * @return the command-line runner bean
 */
@Bean
public CommandLineRunner myCommandLineRunner() {
    return args -> {
        String api = "http://stockmarket.streamdata.io/prices";
        String token = "[YOUR TOKEN HERE]";

        // Optional request headers forwarded to the target API.
        // Putting ("Accept", "application/json") here would stream plain Json instead of
        // Json-patch; in that mode no 'patch' event is emitted.
        Map<String,String> headers = new HashMap<>();
        URI streamdataUri = streamdataUri(api, token, headers);

        // Element type for decoding the body: ServerSentEvent<JsonNode>
        // (approach taken from Spring's SseIntegrationTests, v5.0.0.RC1)
        ResolvableType type = forClassWithGenerics(ServerSentEvent.class, JsonNode.class);

        // Build the event flux from a fresh web client
        Flux<ServerSentEvent<JsonNode>> events = WebClient.create()
                .get()
                .uri(streamdataUri)
                .accept(TEXT_EVENT_STREAM)
                .exchange()
                .flatMapMany(response -> response.body(toFlux(type)));

        // Transform the raw events, print each item, dump the stack trace on failure
        events.as(new PatchTransformer())
              .subscribe(System.out::println,
                         failure -> failure.printStackTrace());

        // The runner would return right after subscribing, so hold the thread for one day
        Mono<String> keepAlive = Mono.just("That's the end!")
                                     .delayElement(Duration.ofDays(1));
        keepAlive.block();
    };
}
项目:streamdataio-spring-webflux    文件:StreamdataioSpringWebfluxApplication.java   
/**
 * Turns a flux of server-sent events into a flux of JSON payloads.
 *
 * <p>Only events that have data and whose event name is {@code "data"}, {@code "patch"} or
 * {@code "error"} are kept. The mapper object keeps the most recent payload in a field and
 * returns it for every {@code "data"} or {@code "patch"} event; an {@code "error"} event with
 * data raises a {@link RuntimeException}.
 *
 * @param aFlux the incoming server-sent events
 * @return the payload retained after each accepted event
 */
@Override
public Flux<JsonNode> apply(final Flux<ServerSentEvent<JsonNode>> aFlux) {
    final Flux<ServerSentEvent<JsonNode>> accepted = aFlux
            .filter(evt -> evt.data().isPresent())
            .filter(evt -> evt.event()
                              .filter(name -> "data".equals(name)
                                  || "patch".equals(name)
                                  || "error".equals(name))
                              .isPresent());

    return accepted.map(new Function<ServerSentEvent<JsonNode>, JsonNode>() {
        // Last payload seen; lives as long as this mapper instance
        private JsonNode current;

        @Override
        public JsonNode apply(final ServerSentEvent<JsonNode> aEvent) {
            final String type = aEvent.event().get();

            if ("data".equals(type)) {
                current = aEvent.data().get();
            } else if ("patch".equals(type)) {
                // The patch payload replaces the current value as-is; no JSON-patch is applied
                current = aEvent.data().get();
            } else if ("error".equals(type)) {
                aEvent.data()
                      .ifPresent(data -> {
                          throw new RuntimeException("received an error! " + data);
                      });
            } else {
                throw new IllegalArgumentException("Unknown type: " + type);
            }

            return current;
        }
    });
}
项目:spring-reactive-playground    文件:SseController.java   
/**
 * Emits one server-sent event per second.
 *
 * @return a flux whose events carry the data {@code "foo\nbar"}, the comment
 *         {@code "bar\nbaz"} and the interval tick number as id
 */
@GetMapping("/sse/event")
Flux<ServerSentEvent<String>> event() {
    Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
    return ticks.map(tick -> {
        // Multi-line data and comment values, id taken from the tick counter
        return ServerSentEvent
                .builder("foo\nbar")
                .comment("bar\nbaz")
                .id(String.valueOf(tick))
                .build();
    });
}
项目:spring-boot-admin    文件:InstancesController.java   
/**
 * Streams the state of a single instance as server-sent events.
 *
 * <p>Each event from the event store that refers to the given instance id triggers a lookup
 * in the registry; the result is wrapped in a server-sent event. The stream is merged with
 * the flux returned by {@code ping()}.
 *
 * @param id the instance id taken from the request path
 * @return the merged flux of instance events and ping events
 */
@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
    Flux<Instance> matching = Flux.from(eventStore)
            .filter(stored -> stored.getInstance().equals(InstanceId.of(id)))
            .flatMap(stored -> registry.getInstance(stored.getInstance()));

    return matching
            .map(instance -> ServerSentEvent.builder(instance).build())
            .mergeWith(ping());
}
项目:spring-boot-admin    文件:ApplicationsController.java   
/**
 * Streams applications as server-sent events.
 *
 * <p>For every published event the referenced instance is looked up in the registry, passed
 * to {@code getApplicationForInstance}, and the two elements of the resulting tuple are
 * handed to {@code toApplication}. The stream is merged with the flux returned by
 * {@code ping()}.
 *
 * @return the merged flux of application events and ping events
 */
@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
    Flux<Application> applications = Flux.from(eventPublisher)
            .flatMap(published -> registry.getInstance(published.getInstance()))
            .map(instance -> getApplicationForInstance(instance))
            .flatMap(pair -> toApplication(pair.getT1(), pair.getT2()));

    return applications
            .map(app -> ServerSentEvent.builder(app).build())
            .mergeWith(ping());
}
项目:spring-reactive-playground    文件:SseController.java   
/**
 * Wraps the path value in a server-sent event and pushes it into the replay processor.
 *
 * @param s the value of the {@code val} path variable, used as the event data
 */
@PostMapping("/sse/receive/{val}")
public void receive(@PathVariable("val") String s) {
    ServerSentEvent<String> message = ServerSentEvent.builder(s).build();
    replayProcessor.onNext(message);
}
项目:spring-reactive-playground    文件:SseController.java   
/**
 * Exposes the replay processor as a stream of server-sent events.
 *
 * @return the processor's flux with logging enabled under the category {@code "playground"}
 */
@GetMapping("/sse/send")
public Flux<ServerSentEvent<String>> send() {
    Flux<ServerSentEvent<String>> logged = replayProcessor.log("playground");
    return logged;
}
项目:spring-boot-admin    文件:InstancesController.java   
/**
 * Streams every event from the event store as a server-sent event, merged with the flux
 * returned by {@code ping()}.
 *
 * @return the merged flux of instance events and ping events
 */
@GetMapping(path = "/instances/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<InstanceEvent>> eventStream() {
    return Flux.from(eventStore)
               .map(stored -> ServerSentEvent.builder(stored).build())
               .mergeWith(ping());
}
项目:spring-boot-admin    文件:InstancesController.java   
/**
 * Returns the shared {@code PING_FLUX} viewed as a flux of server-sent events with the
 * caller's payload type.
 *
 * <p>The cast goes through the wildcard type {@code Flux<?>} rather than the raw type
 * {@code Flux}, so the only warning produced is the unchecked one suppressed here.
 *
 * @param <T> payload type expected by the caller
 * @return {@code PING_FLUX}, re-typed without copying
 */
@SuppressWarnings("unchecked")
private static <T> Flux<ServerSentEvent<T>> ping() {
    // NOTE(review): presumably safe because ping events carry no typed payload — confirm at PING_FLUX
    return (Flux<ServerSentEvent<T>>) (Flux<?>) PING_FLUX;
}
项目:spring-boot-admin    文件:ApplicationsController.java   
/**
 * Returns the shared {@code PING_FLUX} viewed as a flux of server-sent events with the
 * caller's payload type.
 *
 * <p>The cast goes through the wildcard type {@code Flux<?>} rather than the raw type
 * {@code Flux}, so the only warning produced is the unchecked one suppressed here.
 *
 * @param <T> payload type expected by the caller
 * @return {@code PING_FLUX}, re-typed without copying
 */
@SuppressWarnings("unchecked")
private static <T> Flux<ServerSentEvent<T>> ping() {
    // NOTE(review): presumably safe because ping events carry no typed payload — confirm at PING_FLUX
    return (Flux<ServerSentEvent<T>>) (Flux<?>) PING_FLUX;
}