@RequestMapping(value = "/{job:.*}/logStream", method = GET, produces = MediaType.APPLICATION_JSON_VALUE) public ResponseBodyEmitter getJobLogStream(@PathVariable("job") String job) { JobInstance ji = jobsManager.getJob(job); ExtendedAssert.notFound(ji, "Job was not found by id: " + job); ResponseBodyEmitter emitter = new ResponseBodyEmitter(TimeUnit.MINUTES.toMillis(10L)); JobEventConsumer consumer = new JobEventConsumer(this.jobsManager, emitter, ji); ji.atEnd().addListener(() -> { // it need for job which finish before request emitter.complete(); }, ExecutorUtils.DIRECT); // TODO we may want to consume history, also. Subscription subs = jobsManager.getSubscriptions().openSubscriptionOnKey(consumer, ji.getInfo()); emitter.onCompletion(() -> { // Emitter not invoke this at client disconnect, // may be it will be fix in future versions subs.close(); }); return emitter; }
private void fillEmitterByBamTrack(final Track<Read> track, final BamQueryOption options, ResponseBodyEmitter emitter) throws IOException { final BamTrackEmitter bamTrackEmitter = new BamTrackEmitter(emitter); // TODO: track.getEndIndex() - track.getStartIndex() > maxCoverageRange if (options.getMode() == BamTrackMode.REGIONS) { taskExecutorService.executeTrackTask( bamTrackEmitter, SEQUENTIAL, () -> bamTrackEmitter.writeTrackAndFinish(bamHelper.getRegionsFromFile(track)) ); } else { taskExecutorService.executeTrackTask( bamTrackEmitter, ASYNC, () -> bamHelper.getReadsFromFile(track, options, bamTrackEmitter) ); } }
private void fillEmitterByBamTrackFromURL(final Track<Read> track, String bamUrl, String indexUrl, final BamQueryOption options, ResponseBodyEmitter emitter) throws IOException { final BamTrackEmitter bamTrackEmitter = new BamTrackEmitter(emitter); if (track.getEndIndex() - track.getStartIndex() > maxCoverageRange) { taskExecutorService.executeTrackTask( bamTrackEmitter, SEQUENTIAL, () -> bamTrackEmitter.writeTrackAndFinish(bamHelper.getRegionsFromUrl(track, bamUrl, indexUrl)) ); } else { taskExecutorService.executeTrackTask( bamTrackEmitter, ASYNC, () -> bamHelper.fillEmitterByReadsFromUrl(track, bamUrl, indexUrl, options, bamTrackEmitter) ); } }
@RequestMapping(value = "/ws/event") @ResponseBody public ResponseEntity<ResponseBodyEmitter> handle() { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); executorService.execute(() -> { try { for (int i = 0 ; i<10 ;i++) { emitter.send("event: datetime\n"); emitter.send("data: " + LocalDateTime.now().toString() + "\n\n"); Thread.sleep(1000); } emitter.complete(); } catch (Exception e) { LOG.error(e.getMessage(), e); } }); return ResponseEntity.ok().contentType(MediaType.parseMediaType("text/event-stream")).body(emitter); }
public ResponseBodyEmitterObserver(MediaType mediaType, Observable<T> observable, ResponseBodyEmitter responseBodyEmitter) { this.mediaType = mediaType; this.responseBodyEmitter = responseBodyEmitter; this.responseBodyEmitter.onTimeout(this); this.responseBodyEmitter.onCompletion(this); observable.subscribe(this); }
@RequestMapping(value = "/clusters/{cluster}/containers/update", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) public ResponseBodyEmitter update(@PathVariable("cluster") String cluster, @RequestBody UiUpdateContainers req) { log.info("got scale update request: {}", req); JobParameters params = createParametersString(cluster, req); ResponseBodyEmitter emitter = new ResponseBodyEmitter(TimeUnit.MINUTES.toMillis(10L)); JobInstance jobInstance = jobsManager.create(params); JobApi.JobEventConsumer consumer = new JobApi.JobEventConsumer(this.jobsManager, emitter, jobInstance); jobsManager.getSubscriptions().subscribeOnKey(consumer, jobInstance.getInfo()); log.info("Try start job: {}", params); jobInstance.start(); return emitter; }
/** * Returns {@code Track} filled with BAM data from a specified BAM file in the server's file system * @param track input track * @param option BAM track options * @param emitter where to write data * @throws IOException on resource reading errors */ public void sendBamTrackToEmitter(final Track<Read> track, BamQueryOption option, ResponseBodyEmitter emitter) throws IOException { final Chromosome chromosome = trackHelper.validateTrack(track); BamQueryOption currentOptions = option == null ? new BamQueryOption() : option; BamUtil.validateOptions(currentOptions, chromosome); fillEmitterByBamTrack(track, currentOptions, emitter); }
public BamTrackEmitter(ResponseBodyEmitter emitter) throws IOException { this.emitter = emitter; this.stringBuffer = new StringBuilder(BUFFER_SIZE); this.jsonMapper = new JsonMapper(); writeHeader(); }
public ResponseBodyEmitterSubscriber(HttpHeaders request, MediaType mediaType, Publisher<?> observable, ResponseBodyEmitter responseBodyEmitter, boolean json) { this.request = request; this.mediaType = mediaType; this.responseBodyEmitter = responseBodyEmitter; this.json = json; this.responseBodyEmitter.onTimeout(new Timeout()); this.responseBodyEmitter.onCompletion(new Complete()); this.single = observable instanceof Mono; observable.subscribe(this); }
private ResponseBodyEmitter getEmitter(Long timeout, Publisher<?> flux, MediaType mediaType, HttpHeaders request) { Publisher<?> exported = flux instanceof Mono ? Mono.from(flux) : Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty()); if (!MediaType.ALL.equals(mediaType) && EVENT_STREAM.isCompatibleWith(mediaType)) { // TODO: more subtle content negotiation return new FluxResponseSseEmitter(request, MediaType.APPLICATION_JSON, exported); } return new FluxResponseBodyEmitter(request, mediaType, exported); }
public JobEventConsumer(JobsManager jobsManager, ResponseBodyEmitter emitter, JobInstance jobInstance) { this.jobsManager = jobsManager; this.emitter = emitter; this.jobInstance = jobInstance; }
@RequestMapping(value = "/bam/track/get", method = RequestMethod.POST) @ApiOperation( value = "Returns data (chunked) matching the given query to fill in a bam track. Returns all information " + "about reads.", notes = NOTES + "<br/>option:<br/>" + "all the following params are <b>optional</b>, if any of the params is incorrect, " + "it will be set to default value:<br/><br/>" + "1) <b>trackDirection</b> - can be 'LEFT', 'MIDDLE' or 'RIGHT', responsible for direction <br/>" + "default - MIDDLE" + "2) <b>showClipping</b> - if true handles the track with soft clipping, default is false;<br/>" + "3) <b>showSpliceJunction</b> - return a information about splice junction, " + "default is false;<br/>" + "4) <b>frame</b> - size of frame for downsampling, default is null;<br/>" + "5) <b>count</b> - count of read in frame, default is null ;<br/>" + "if frame or count default or incorrect, return track without downsampling<br/>" + "6) <b>mode</b> controls BAM display mode: REGIONS - return only regions of possible read " + "location; <br/>" + "COVERAGE - return only BAM coverage;<br/>" + "FULL - return both reads and coverage", produces = MediaType.APPLICATION_JSON_VALUE) @ApiResponses( value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION) }) public final ResponseEntity<ResponseBodyEmitter> loadTrackStream( @RequestBody final TrackQuery query, @RequestParam(required = false) final String fileUrl, @RequestParam(required = false) final String indexUrl) throws IOException { final ResponseBodyEmitter emitter = new ResponseBodyEmitter(EMITTER_TIMEOUT); if (fileUrl == null) { bamManager.sendBamTrackToEmitter(convertToTrack(query), query.getOption(), emitter); } else { bamManager.sendBamTrackToEmitterFromUrl(convertToTrack(query), query.getOption(), fileUrl, indexUrl, emitter); } HttpHeaders responseHeaders = new HttpHeaders(); responseHeaders.setContentType(MediaType.APPLICATION_JSON_UTF8); return new ResponseEntity<>(emitter, responseHeaders, HttpStatus.OK); }
/** * Returns {@code Track} filled with BAM data from a specified URL * @param track input track * @param option BAM track options * @param bamUrl path to BAM file * @param indexUrl path to Bam index file * @param emitter where to write data * @throws IOException on resource reading errors */ public void sendBamTrackToEmitterFromUrl(final Track<Read> track, BamQueryOption option, String bamUrl, String indexUrl, ResponseBodyEmitter emitter) throws IOException { final Chromosome chromosome = trackHelper.validateUrlTrack(track, bamUrl, indexUrl); BamQueryOption currentOptions = option == null ? new BamQueryOption() : option; BamUtil.validateOptions(currentOptions, chromosome); fillEmitterByBamTrackFromURL(track, bamUrl, indexUrl, currentOptions, emitter); }