Java 类org.springframework.util.concurrent.ListenableFuture 实例源码

项目:mtgo-best-bot    文件:LifecycleEventSender.java   
/**
 * Publishes a lifecycle event to the given Kafka topic and logs the outcome.
 *
 * <p>The send itself is asynchronous: this method returns right after handing the
 * message to the template, and the result is reported through the registered callbacks.
 *
 * @param topic   name of the topic to publish to
 * @param message the event to publish
 */
public void send(String topic, LifecycleEvent message) {
    // KafkaTemplate#send hands back a future instead of blocking on the broker
    ListenableFuture<SendResult<String, Object>> pendingSend = kafkaTemplate.send(topic, message);

    // success: log the record offset; failure: log the message together with the cause
    pendingSend.addCallback(
            sendResult -> LOGGER.info("sent message='{}' with offset={}", message,
                    sendResult.getRecordMetadata().offset()),
            failure -> LOGGER.error("unable to send message='{}'", message, failure));
}
项目:Spring-5.0-Cookbook    文件:DeptAsyncController.java   
/**
 * Looks up one department by id and returns it through a {@link WebAsyncTask}
 * with a 500 ms timeout.
 *
 * <p>The callable waits for the service's {@link ListenableFuture} to complete before
 * returning. The previous version only registered a callback and immediately returned a
 * shared {@code result} field, so the response raced with the lookup and could carry a
 * stale or {@code null} value (possibly one written by a different request).
 * The value is now kept in a local variable and the shared field is no longer touched.
 *
 * @param id department id taken from the request path
 * @return async task producing the department, or an empty {@code Department} when the
 *         lookup fails
 */
@GetMapping(value="/webSyncDept/{id}.json", produces ="application/json", headers = {"Accept=text/xml, application/json"})
public WebAsyncTask<Department> websyncDeptList(@PathVariable("id") Integer id){

    Callable<Department> callable = new Callable<Department>() {
        @Override
        public Department call() throws Exception {
            ListenableFuture<Department> listenFuture = departmentServiceImpl.findAllFirstById(id);
            try {
                // Block this worker thread (not the request thread) until the lookup is done.
                return listenFuture.get();
            } catch (java.util.concurrent.ExecutionException ex) {
                // Same fallback the former onFailure callback produced.
                return new Department();
            }
        }
    };
    return new WebAsyncTask<Department>(500, callable);
}
项目:dhus-core    文件:FairThreadPoolTaskExecutor.java   
/**
 * Wraps the given {@link Runnable} in a {@link ListenableFutureTask} and hands it to the
 * underlying thread pool.
 *
 * @param task the work to run
 * @return the listenable future tracking the task (its result value is {@code null})
 * @throws TaskRejectedException if the pool refuses the task
 */
@Override
public ListenableFuture<?> submitListenable (Runnable task)
{
   final ExecutorService pool = getThreadPoolExecutor ();
   final ListenableFutureTask<Object> listenableTask =
      new ListenableFutureTask<Object> (task, null);
   try
   {
      pool.execute (listenableTask);
   }
   catch (RejectedExecutionException rejected)
   {
      // translate the JDK rejection into Spring's task exception, keeping the cause
      throw new TaskRejectedException ("Executor [" + pool +
         "] did not accept task: " + task, rejected);
   }
   return listenableTask;
}
项目:dhus-core    文件:FairThreadPoolTaskExecutor.java   
/**
 * Wraps the given {@link Callable} in a {@link ListenableFutureTask} and hands it to the
 * underlying thread pool.
 *
 * @param task the work to run
 * @param <T>  type of the value produced by the task
 * @return the listenable future tracking the task
 * @throws TaskRejectedException if the pool refuses the task
 */
@Override
public <T> ListenableFuture<T> submitListenable (Callable<T> task)
{
   final ExecutorService pool = getThreadPoolExecutor ();
   final ListenableFutureTask<T> listenableTask = new ListenableFutureTask<T> (task);
   try
   {
      pool.execute (listenableTask);
   }
   catch (RejectedExecutionException rejected)
   {
      // translate the JDK rejection into Spring's task exception, keeping the cause
      throw new TaskRejectedException ("Executor [" + pool +
         "] did not accept task: " + task, rejected);
   }
   return listenableTask;
}
项目:ReactiveTest    文件:SpringTobyTv009Application.java   
/**
 * Runs three dependent asynchronous steps and publishes the final value through a
 * {@link DeferredResult}: a GET against {@code URL1}, a GET against {@code URL2} using
 * the first response body, then {@code myService.work} on the second response body.
 *
 * <p>If any step fails, the failure's message is set as the error result.
 *
 * @param idx request parameter appended to {@code "h"} for the first call
 * @return the deferred result completed by the last callback in the chain
 */
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    DeferredResult<String> deferred = new DeferredResult<String>();

    // step 1: first remote call
    rt.getForEntity(URL1, String.class, "h" + idx).addCallback(first -> {
        // step 2: second remote call, fed with the first body
        rt.getForEntity(URL2, String.class, first.getBody()).addCallback(second -> {
            // step 3: local async service, fed with the second body
            ListenableFuture<String> worked = myService.work(second.getBody());
            worked.addCallback(
                    value -> deferred.setResult(value),
                    workFailure -> deferred.setErrorResult(workFailure.getMessage()));
        }, secondFailure -> deferred.setErrorResult(secondFailure.getMessage()));
    }, firstFailure -> deferred.setErrorResult(firstFailure.getMessage()));

    return deferred;
}
项目:ReactiveTest    文件:SpringTobyTv010Application.java   
/**
 * Chains two asynchronous GET requests and one asynchronous service call, exposing the
 * final string through a {@link DeferredResult}.
 *
 * <p>Each stage starts only from the success callback of the previous one; a failure at
 * any stage sets that failure's message as the error result.
 *
 * @param idx request parameter appended to {@code "h"} for the first call
 * @return the deferred result completed by the callback chain
 */
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
    DeferredResult<String> deferred = new DeferredResult<String>();

    rt.getForEntity(URL1, String.class, "h" + idx).addCallback(firstResponse -> {
        rt.getForEntity(URL2, String.class, firstResponse.getBody()).addCallback(secondResponse -> {
            ListenableFuture<String> serviceCall = myService.work(secondResponse.getBody());
            serviceCall.addCallback(
                    finalValue -> deferred.setResult(finalValue),
                    serviceError -> deferred.setErrorResult(serviceError.getMessage()));
        }, secondError -> deferred.setErrorResult(secondError.getMessage()));
    }, firstError -> deferred.setErrorResult(firstError.getMessage()));

    return deferred;
}
项目:IPPR2016    文件:Caller.java   
/**
 * Fires an asynchronous HTTP request against another service.
 *
 * <p>When a header object is supplied its content type is set to JSON (the passed-in
 * object itself is modified). A fresh {@code AsyncRestTemplate} with the default
 * response error handler is created for every call.
 *
 * @param uri         target address
 * @param method      HTTP method to use
 * @param body        request payload, may be {@code null}
 * @param returnClazz type the response body is converted to
 * @param header      request headers, may be {@code null}
 * @return future holding the response entity
 */
public default <I, O> ListenableFuture<ResponseEntity<O>> createRequest(final URIBuilder uri,
    final HttpMethod method, final I body, final Class<O> returnClazz, final HttpHeaders header) {
  if (header != null) {
    header.setContentType(MediaType.APPLICATION_JSON);
  }

  // body-less requests still carry the headers
  final HttpEntity<?> entity = (body == null)
      ? new HttpEntity<String>(null, header)
      : new HttpEntity<I>(body, header);

  final AsyncRestTemplate client = new AsyncRestTemplate();
  client.setErrorHandler(new DefaultResponseErrorHandler());
  return client.exchange(uri.toString(), method, entity, returnClazz);
}
项目:IPPR2016    文件:ProcessEngineCallerImpl.java   
/**
 * Requests the state of a process instance from the process engine and enriches its
 * subjects with user information via {@code getUser}.
 *
 * <p>The returned future is completed from the HTTP callback. Anything thrown while
 * handling a successful response (for example a missing body) is forwarded to the
 * future; previously such an exception left the future uncompleted, so callers waiting
 * on it would never return.
 *
 * @param piId id of the process instance
 * @return future holding the process state, or completed exceptionally on failure
 * @throws URISyntaxException if the process engine address cannot be parsed
 */
@Async
public Future<ProcessStateDTO> getProcessState(final Long piId) throws URISyntaxException {
  final CompletableFuture<ProcessStateDTO> future = new CompletableFuture<>();

  final URIBuilder uri =
      new URIBuilder(gatewayConfig.getProcessEngineAddress()).setPath("processes/state/" + piId);

  final ListenableFuture<ResponseEntity<ProcessStateDTO>> responseFuture =
      createRequest(uri, HttpMethod.GET, null, ProcessStateDTO.class, null);

  responseFuture.addCallback(result -> {
    try {
      final ProcessStateDTO state = result.getBody();
      final List<UserContainer> container = Lists.newArrayList(state.getSubjects());
      getUser(container);
      future.complete(state);
    } catch (final RuntimeException e) {
      // never leave the future pending when post-processing fails
      future.completeExceptionally(e);
    }
  }, error -> {
    future.completeExceptionally(error);
  });

  return future;
}
项目:IPPR2016    文件:ProcessEngineCallerImpl.java   
/**
 * Requests one page of process infos in the given state from the process engine.
 *
 * <p>Completion of the returned future is delegated to {@code appendUserInformation},
 * which receives both the HTTP response future and the result future.
 *
 * @param state process state used as the last path segment
 * @param page  page index sent as the {@code page} query parameter
 * @param size  page size sent as the {@code size} query parameter
 * @return future that will hold the process infos
 * @throws URISyntaxException if the process engine address cannot be parsed
 */
@Async
public Future<List<ProcessInfoDTO>> getProcessesInfoOfState(final String state, final int page,
    final int size) throws URISyntaxException {
  final URIBuilder target =
      new URIBuilder(gatewayConfig.getProcessEngineAddress()).setPath("processes/" + state)
          .addParameter("page", Integer.toString(page))
          .addParameter("size", Integer.toString(size));

  final ListenableFuture<ResponseEntity<ProcessInfoDTO[]>> pendingResponse =
      createRequest(target, HttpMethod.GET, null, ProcessInfoDTO[].class, null);

  final CompletableFuture<List<ProcessInfoDTO>> infos = new CompletableFuture<>();
  appendUserInformation(pendingResponse, infos);
  return infos;
}
项目:lams    文件:SimpleBufferingAsyncClientHttpRequest.java   
/**
 * Submits the buffered request to the task executor and returns the pending response.
 *
 * <p>The submitted task copies all headers onto the connection, optionally enables
 * fixed-length streaming, connects, writes the buffered body when the connection does
 * output, and wraps the connection in a {@code SimpleClientHttpResponse}.
 *
 * @param headers        headers to add to the connection
 * @param bufferedOutput the complete request body
 * @return future for the HTTP response
 * @throws IOException declared by the overridden method
 */
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(
        final HttpHeaders headers, final byte[] bufferedOutput) throws IOException {

    final Callable<ClientHttpResponse> exchange = new Callable<ClientHttpResponse>() {
        @Override
        public ClientHttpResponse call() throws Exception {
            for (Map.Entry<String, List<String>> header : headers.entrySet()) {
                final String name = header.getKey();
                for (String value : header.getValue()) {
                    connection.addRequestProperty(name, value);
                }
            }
            if (connection.getDoOutput()) {
                if (outputStreaming) {
                    connection.setFixedLengthStreamingMode(bufferedOutput.length);
                }
            }
            connection.connect();
            if (connection.getDoOutput()) {
                FileCopyUtils.copy(bufferedOutput, connection.getOutputStream());
            }
            return new SimpleClientHttpResponse(connection);
        }
    };
    return this.taskExecutor.submitListenable(exchange);
}
项目:sctalk    文件:MessageServerCluster.java   
/**
 * Queries the online status of a list of users.
 *
 * <p>Users without a registered client info entry are reported as offline.
 *
 * @param fromUserId id of the requesting user (not used by this method)
 * @param userIdList ids of the users to look up
 * @return an already-completed result holding one status entry per requested id
 * @since  1.0
 */
@Async
public ListenableFuture<List<IMBaseDefine.UserStat>> userStatusReq(Long fromUserId, List<Long> userIdList) {

    logger.debug("查询用户在线状态, user_cnt={}", userIdList.size());

    List<IMBaseDefine.UserStat> stats = new ArrayList<>();
    for (Long queriedId : userIdList) {
        UserClientInfoManager.UserClientInfo clientInfo = userClientInfoManager.getUserInfo(queriedId);

        IMBaseDefine.UserStat.Builder stat = IMBaseDefine.UserStat.newBuilder();
        stat.setUserId(queriedId);
        if (clientInfo == null) {
            // no client info known for this user
            stat.setStatus(IMBaseDefine.UserStatType.USER_STATUS_OFFLINE);
        } else {
            stat.setStatus(clientInfo.getStatus());
        }
        stats.add(stat.build());
    }

    return new AsyncResult<>(stats);
}
项目:spring-cloud-gcp    文件:PubSubTemplateTests.java   
/**
 * Verifies that a failure of the underlying API future surfaces from the published
 * future as an {@link ExecutionException} whose cause carries the original message.
 */
@Test
public void testSend_onFailure() {
    ListenableFuture<String> published =
            this.pubSubTemplate.publish("testTopic", this.pubsubMessage);
    this.settableApiFuture.setException(new Exception("future failed."));

    try {
        published.get();
    }
    catch (InterruptedException ie) {
        fail("get() should fail with an ExecutionException.");
        return;
    }
    catch (ExecutionException ee) {
        assertEquals("future failed.", ee.getCause().getMessage());
        return;
    }
    // reaching this point means get() returned normally
    fail("Test should fail.");
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Executes the given method on the provided URI. The request is prepared by the
 * {@link AsyncRequestCallback} (when one is given) and the response is handed to the
 * {@link ResponseExtractor} through the returned future.
 * @param url the fully-expanded URL to connect to
 * @param method the HTTP method to execute (GET, POST, etc.)
 * @param requestCallback object that prepares the request (can be {@code null})
 * @param responseExtractor object that extracts the return value from the response (can
 * be {@code null})
 * @return a future for an arbitrary object, as returned by the {@link ResponseExtractor}
 * @throws ResourceAccessException if creating or executing the request raises an
 * {@link IOException}
 */
protected <T> ListenableFuture<T> doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback,
        ResponseExtractor<T> responseExtractor) throws RestClientException {

    Assert.notNull(url, "'url' must not be null");
    Assert.notNull(method, "'method' must not be null");
    try {
        AsyncClientHttpRequest asyncRequest = createAsyncRequest(url, method);
        if (requestCallback != null) {
            requestCallback.doWithRequest(asyncRequest);
        }
        return new ResponseExtractorFuture<T>(method, url, asyncRequest.executeAsync(),
                responseExtractor);
    }
    catch (IOException ioFailure) {
        String detail = "I/O error on " + method.name() +
                " request for \"" + url + "\":" + ioFailure.getMessage();
        throw new ResourceAccessException(detail, ioFailure);
    }
}
项目:Spring-5.0-Cookbook    文件:SendAsyncEventLogin.java   
/**
 * Sends the content to the request queue through the async Rabbit template and returns
 * a {@link DeferredResult} that the registered {@code LoginHandlerResponse} callback
 * is given to complete.
 *
 * <p>Also prints the template's auto-startup and running flags to standard output.
 *
 * @param content payload to send
 * @return the deferred login details
 */
public DeferredResult<LoginDetails> send(String content) {
    System.out.println("send request");

    final DeferredResult<LoginDetails> deferred = new DeferredResult<>();
    final String queueName = requestQueue.getName();
    ListenableFuture<LoginDetails> reply = asyncRabbitTemplate.convertSendAndReceive(queueName,
            content);
    reply.addCallback(new LoginHandlerResponse(deferred));

    // diagnostic output of the template state
    System.out.println(asyncRabbitTemplate.isAutoStartup());
    System.out.println(asyncRabbitTemplate.isRunning());

    return deferred;
}
项目:Spring-5.0-Cookbook    文件:SendAsyncLogin.java   
/**
 * Sends the content to the exchange with routing key {@code "packt.async"} through the
 * async Rabbit template and returns a {@link DeferredResult} that the registered
 * {@code LoginHandlerResponse} callback is given to complete.
 *
 * <p>Also prints the template's auto-startup and running flags to standard output.
 *
 * @param content payload to send
 * @return the deferred login details
 */
public DeferredResult<LoginDetails> send(String content) {
    System.out.println("send request");

    final DeferredResult<LoginDetails> deferred = new DeferredResult<>();
    final String exchangeName = exchange.getName();
    ListenableFuture<LoginDetails> reply =
            asyncRabbitTemplate.convertSendAndReceive(exchangeName, "packt.async", content);
    reply.addCallback(new LoginHandlerResponse(deferred));

    // diagnostic output of the template state
    System.out.println(asyncRabbitTemplate.isAutoStartup());
    System.out.println(asyncRabbitTemplate.isRunning());

    return deferred;
}
项目:sctalk    文件:IMBuddyListHandlerImpl.java   
/**
 * Handles a users-status request (CID_BUDDY_LIST_USERS_STATUS_REQUEST): asks the
 * message server cluster for the status of the requested users and, once available,
 * writes a users-status response back on the channel.
 *
 * @param header header of the incoming request; a clone of it is used for the response
 * @param body   the request, cast to {@code IMUsersStatReq}
 * @param ctx    channel context used to resolve the user id and to write the response
 */
@Override
public void userStatusReq(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {

    logger.debug("Send the users status request to router");
    long userId = super.getUserId(ctx);
    IMUsersStatReq request = (IMUsersStatReq) body;

    ListenableFuture<List<IMBaseDefine.UserStat>> pendingStats =
            messageServerCluster.userStatusReq(userId, request.getUserIdListList());

    pendingStats.addCallback(stats -> {
        // build the response once the statuses are known
        IMBuddy.IMUsersStatRsp.Builder response = IMBuddy.IMUsersStatRsp.newBuilder();
        response.addAllUserStatList(stats);
        response.setUserId(userId);
        response.setAttachData(request.getAttachData());

        IMHeader responseHeader = header.clone();
        responseHeader.setCommandId(
                (short) BuddyListCmdID.CID_BUDDY_LIST_USERS_STATUS_RESPONSE_VALUE);
        ctx.writeAndFlush(new IMProtoMessage<>(responseHeader, response.build()));
    }, failure -> {
        // failure path: only logged, nothing is written to the channel
        logger.warn("处理推送异常", failure);
    });
}
项目:java-spring-web    文件:TracingAsyncRestTemplateTest.java   
/**
 * Sets up the test fixture: an {@code AsyncRestTemplate} with a single tracing
 * interceptor (backed by {@code mockTracer} and the standard-tags span decorator), a
 * {@code Client} adapter around it, and a mock server bound to that template with
 * expectation ordering ignored.
 */
public TracingAsyncRestTemplateTest() {
    final AsyncRestTemplate tracedTemplate = new AsyncRestTemplate();
    tracedTemplate.setInterceptors(Collections.<AsyncClientHttpRequestInterceptor>singletonList(
            new TracingAsyncRestTemplateInterceptor(mockTracer,
                    Collections.<RestTemplateSpanDecorator>singletonList(new RestTemplateSpanDecorator.StandardTags()))));

    client = new Client<AsyncRestTemplate>() {
        @Override
        public AsyncRestTemplate template() {
            return tracedTemplate;
        }

        @Override
        public <T> ResponseEntity<T> getForEntity(String url, Class<T> clazz) {
            ListenableFuture<ResponseEntity<T>> pending = tracedTemplate.getForEntity(url, clazz);
            try {
                // wait for the asynchronous call so the test can treat it as synchronous
                return pending.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                Assert.fail();
                return null;
            }
        }
    };

    mockServer = MockRestServiceServer.bindTo(client.template()).ignoreExpectOrder(true).build();
}
项目:ReactiveTest    文件:Spring5Application.java   
/**
 * Provides an {@code ApplicationRunner} that calls {@code myService.hello()} and logs
 * its outcome from callbacks: the value on success, the exception message on failure.
 * "run" is logged before the call and "exit" right after the callbacks are registered.
 */
@Bean
ApplicationRunner run() {
    return args -> {
        log.debug("run");
        ListenableFuture<String> greeting = myService.hello();
        greeting.addCallback(
                value -> {
                    log.debug(value);
                },
                failure -> {
                    log.debug(failure.getMessage());
                });
        log.debug("exit");
    };

}
项目:lams    文件:AbstractBufferingAsyncClientHttpRequest.java   
/**
 * Turns the buffered output into a byte array, fills in the content length when the
 * headers report it as {@code -1}, and delegates to the two-argument
 * {@code executeInternal}. The buffer reference is cleared once the delegate returned.
 *
 * @param headers request headers; the content length may be set on them
 * @return future for the HTTP response
 * @throws IOException if the delegate throws it
 */
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers) throws IOException {
    final byte[] body = this.bufferedOutput.toByteArray();
    final boolean lengthUnset = headers.getContentLength() == -1;
    if (lengthUnset) {
        headers.setContentLength(body.length);
    }
    final ListenableFuture<ClientHttpResponse> pendingResponse = executeInternal(headers, body);
    this.bufferedOutput = null;
    return pendingResponse;
}
项目:IPPR2016    文件:ProcessEngineCallerImpl.java   
/**
 * Bridges an HTTP response future to a {@link CompletableFuture}: once the response
 * arrives, its body array is copied into a list, {@code getUser} is invoked on a second
 * copy of that list, and the target future is completed with the first copy.
 *
 * <p>Anything thrown while handling a successful response (for example a missing body)
 * is forwarded to the target future; previously such an exception left the future
 * uncompleted, so callers waiting on it would never return.
 *
 * <p>NOTE(review): {@code @Async} on a private method invoked from within the same class
 * is presumably not applied by Spring's proxy-based async support; confirm and consider
 * removing the annotation.
 *
 * @param responseFuture pending HTTP response carrying an array of entries
 * @param future         future to complete with the entries or with the failure
 */
@Async
private <T extends UserContainer> void appendUserInformation(
    final ListenableFuture<ResponseEntity<T[]>> responseFuture,
    final CompletableFuture<List<T>> future) {

  responseFuture.addCallback(result -> {
    try {
      final List<T> entries = Lists.newArrayList(result.getBody());
      getUser(Lists.newArrayList(result.getBody()));
      future.complete(entries);
    } catch (final RuntimeException e) {
      // never leave the future pending when post-processing fails
      future.completeExceptionally(e);
    }
  }, error -> future.completeExceptionally(error));
}
项目:lams    文件:ThreadPoolTaskExecutor.java   
/**
 * Wraps the given {@link Runnable} in a {@link ListenableFutureTask} and executes it on
 * the underlying thread pool.
 *
 * @param task the work to run
 * @return the listenable future tracking the task (its result value is {@code null})
 * @throws TaskRejectedException if the pool refuses the task
 */
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
    final ExecutorService pool = getThreadPoolExecutor();
    final ListenableFutureTask<Object> listenableTask = new ListenableFutureTask<Object>(task, null);
    try {
        pool.execute(listenableTask);
    }
    catch (RejectedExecutionException rejected) {
        throw new TaskRejectedException("Executor [" + pool + "] did not accept task: " + task, rejected);
    }
    return listenableTask;
}
项目:participationSystem3b    文件:KafkaProducer.java   
/**
 * Sends a string message to the given Kafka topic and logs the outcome from the
 * asynchronous callback.
 *
 * <p>On failure the throwable itself is now passed to the logger, so the stack trace
 * and cause chain are recorded; before, only {@code ex.getMessage()} was logged.
 *
 * @param topic target topic
 * @param data  message payload
 */
public void send(String topic, String data) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            logger.info("Success on sending message \"" + data + "\" to topic " + topic);
        }

        @Override
        public void onFailure(Throwable ex) {
            // pass the throwable so the logger prints the full stack trace
            logger.error("Error on sending message \"" + data + "\", stacktrace " + ex.getMessage(), ex);
        }
    });
}
项目:lams    文件:ThreadPoolTaskScheduler.java   
/**
 * Wraps the given {@link Runnable} in a {@link ListenableFutureTask} and executes it,
 * decorated by {@code errorHandlingTask}, on the scheduled executor.
 *
 * @param task the work to run
 * @return the listenable future tracking the task (its result value is {@code null})
 * @throws TaskRejectedException if the executor refuses the task
 */
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
    final ExecutorService scheduler = getScheduledExecutor();
    try {
        final ListenableFutureTask<Object> listenableTask = new ListenableFutureTask<Object>(task, null);
        scheduler.execute(errorHandlingTask(listenableTask, false));
        return listenableTask;
    }
    catch (RejectedExecutionException rejected) {
        final String reason = "Executor [" + scheduler + "] did not accept task: " + task;
        throw new TaskRejectedException(reason, rejected);
    }
}
项目:lams    文件:ThreadPoolTaskScheduler.java   
/**
 * Wraps the given {@link Callable} in a {@link ListenableFutureTask} and executes it,
 * decorated by {@code errorHandlingTask}, on the scheduled executor.
 *
 * @param task the work to run
 * @param <T>  type of the value produced by the task
 * @return the listenable future tracking the task
 * @throws TaskRejectedException if the executor refuses the task
 */
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
    final ExecutorService scheduler = getScheduledExecutor();
    try {
        final ListenableFutureTask<T> listenableTask = new ListenableFutureTask<T>(task);
        scheduler.execute(errorHandlingTask(listenableTask, false));
        return listenableTask;
    }
    catch (RejectedExecutionException rejected) {
        final String reason = "Executor [" + scheduler + "] did not accept task: " + task;
        throw new TaskRejectedException(reason, rejected);
    }
}
项目:spring-cloud-gcp    文件:PubSubTemplate.java   
/**
 * Publishes a message to the given topic and adapts the resulting {@code ApiFuture}
 * to a Spring {@link ListenableFuture}.
 *
 * <p>A successful publish sets the callback's result string on the returned future
 * (logged as the message ID at debug level); a failure is logged at warn level and set
 * as the future's exception.
 *
 * @param topic         name of the topic to publish to
 * @param pubsubMessage the message to publish
 * @return future mirroring the outcome of the publish call
 */
@Override
public ListenableFuture<String> publish(final String topic, PubsubMessage pubsubMessage) {
    final SettableListenableFuture<String> outcome = new SettableListenableFuture<>();

    ApiFuture<String> pendingPublish =
            this.publisherFactory.createPublisher(topic).publish(pubsubMessage);

    ApiFutures.addCallback(pendingPublish, new ApiFutureCallback<String>() {

        @Override
        public void onSuccess(String messageId) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(
                        "Publishing to " + topic + " was successful. Message ID: " + messageId);
            }
            outcome.set(messageId);
        }

        @Override
        public void onFailure(Throwable cause) {
            LOGGER.warn("Publishing to " + topic + " topic failed.", cause);
            outcome.setException(cause);
        }

    });

    return outcome;
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously performs a GET on the URL template, expanded with the given variables,
 * and exposes the response as a {@link ResponseEntity}.
 *
 * @param url          the URL template
 * @param responseType type of the response body
 * @param uriVariables values used to expand the template
 * @return future for the response entity
 */
@Override
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Object... uriVariables)
        throws RestClientException {

    final AsyncRequestCallback acceptCallback = acceptHeaderRequestCallback(responseType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor = responseEntityExtractor(responseType);
    return execute(url, HttpMethod.GET, acceptCallback, entityExtractor, uriVariables);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously performs a GET on the URL template, expanded with the given variable
 * map, and exposes the response as a {@link ResponseEntity}.
 *
 * @param url          the URL template
 * @param responseType type of the response body
 * @param urlVariables map of values used to expand the template
 * @return future for the response entity
 */
@Override
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
        Map<String, ?> urlVariables) throws RestClientException {

    final AsyncRequestCallback acceptCallback = acceptHeaderRequestCallback(responseType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor = responseEntityExtractor(responseType);
    return execute(url, HttpMethod.GET, acceptCallback, entityExtractor, urlVariables);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously POSTs the given entity to the URL template, expanded with the given
 * variable map, and passes the response headers future to
 * {@code extractLocationHeader}.
 *
 * @param url          the URL template
 * @param request      the entity to post
 * @param uriVariables map of values used to expand the template
 * @return future produced by {@code extractLocationHeader}
 */
@Override
public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request,
        Map<String, ?> uriVariables) throws RestClientException {
    final AsyncRequestCallback entityCallback = httpEntityCallback(request);
    final ResponseExtractor<HttpHeaders> headerReader = headersExtractor();
    final ListenableFuture<HttpHeaders> pendingHeaders =
            execute(url, HttpMethod.POST, entityCallback, headerReader, uriVariables);
    return extractLocationHeader(pendingHeaders);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously POSTs the given entity to the URI and passes the response headers
 * future to {@code extractLocationHeader}.
 *
 * @param url     the target URI
 * @param request the entity to post
 * @return future produced by {@code extractLocationHeader}
 */
@Override
public ListenableFuture<URI> postForLocation(URI url, HttpEntity<?> request)
        throws RestClientException {
    final AsyncRequestCallback entityCallback = httpEntityCallback(request);
    final ResponseExtractor<HttpHeaders> headerReader = headersExtractor();
    final ListenableFuture<HttpHeaders> pendingHeaders =
            execute(url, HttpMethod.POST, entityCallback, headerReader);
    return extractLocationHeader(pendingHeaders);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously executes the HTTP method against the URL template, expanded with the
 * given variables, sending the request entity and exposing the response as a
 * {@link ResponseEntity}.
 *
 * @param url           the URL template
 * @param method        the HTTP method
 * @param requestEntity entity (headers and/or body) to send
 * @param responseType  type of the response body
 * @param uriVariables  values used to expand the template
 * @return future for the response entity
 */
@Override
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
        HttpEntity<?> requestEntity, Class<T> responseType, Object... uriVariables)
        throws RestClientException {
    final AsyncRequestCallback entityCallback = httpEntityCallback(requestEntity, responseType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor = responseEntityExtractor(responseType);
    return execute(url, method, entityCallback, entityExtractor, uriVariables);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously executes the HTTP method against the URL template, expanded with the
 * given variable map, sending the request entity and exposing the response as a
 * {@link ResponseEntity}.
 *
 * @param url           the URL template
 * @param method        the HTTP method
 * @param requestEntity entity (headers and/or body) to send
 * @param responseType  type of the response body
 * @param uriVariables  map of values used to expand the template
 * @return future for the response entity
 */
@Override
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
        HttpEntity<?> requestEntity, Class<T> responseType,
        Map<String, ?> uriVariables) throws RestClientException {
    final AsyncRequestCallback entityCallback = httpEntityCallback(requestEntity, responseType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor = responseEntityExtractor(responseType);
    return execute(url, method, entityCallback, entityExtractor, uriVariables);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously executes the HTTP method against the URL template, expanded with the
 * given variables. The response type is taken from the
 * {@link ParameterizedTypeReference}.
 *
 * @param url           the URL template
 * @param method        the HTTP method
 * @param requestEntity entity (headers and/or body) to send
 * @param responseType  reference carrying the type of the response body
 * @param uriVariables  values used to expand the template
 * @return future for the response entity
 */
@Override
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
        HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType,
        Object... uriVariables) throws RestClientException {
    final Type bodyType = responseType.getType();
    final AsyncRequestCallback entityCallback = httpEntityCallback(requestEntity, bodyType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor = responseEntityExtractor(bodyType);
    return execute(url, method, entityCallback, entityExtractor, uriVariables);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Asynchronously executes the HTTP method against the given URI. The response type is
 * taken from the {@link ParameterizedTypeReference}.
 *
 * @param url           the target URI
 * @param method        the HTTP method
 * @param requestEntity entity (headers and/or body) to send
 * @param responseType  reference carrying the type of the response body
 * @return future for the response entity
 */
@Override
public <T> ListenableFuture<ResponseEntity<T>> exchange(URI url, HttpMethod method,
        HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType)
        throws RestClientException {
    final Type bodyType = responseType.getType();
    final AsyncRequestCallback entityCallback = httpEntityCallback(requestEntity, bodyType);
    final ResponseExtractor<ResponseEntity<T>> entityExtractor = responseEntityExtractor(bodyType);
    return execute(url, method, entityCallback, entityExtractor);
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Expands the URL template with the given variables and delegates to
 * {@code doExecute}.
 *
 * @param url               the URL template
 * @param method            the HTTP method
 * @param requestCallback   object that prepares the request
 * @param responseExtractor object that extracts the return value from the response
 * @param urlVariables      values used to expand the template
 * @return future returned by {@code doExecute}
 */
@Override
public <T> ListenableFuture<T> execute(String url, HttpMethod method,
        AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor,
        Object... urlVariables) throws RestClientException {

    final UriTemplate template = new UriTemplate(url);
    return doExecute(template.expand(urlVariables), method, requestCallback, responseExtractor);
}
项目:participationSystem3a    文件:KafkaProducerImpl.java   
/**
 * Sends a string message to the given Kafka topic and logs the outcome from the
 * asynchronous callback.
 *
 * <p>On failure the throwable itself is now passed to the logger, so the stack trace
 * and cause chain are recorded; before, only {@code ex.getMessage()} was logged.
 *
 * @param topic target topic
 * @param data  message payload
 */
public void send(String topic, String data) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            logger.info("Success on sending message \"" + data + "\" to topic " + topic);
        }

        @Override
        public void onFailure(Throwable ex) {
            // pass the throwable so the logger prints the full stack trace
            logger.error("Error on sending message \"" + data + "\", stacktrace " + ex.getMessage(), ex);
        }
    });
}
项目:lams    文件:AsyncRestTemplate.java   
/**
 * Creates a future that adapts the given client response future.
 *
 * @param method                   HTTP method of the request, stored on this instance
 * @param url                      URI of the request, stored on this instance
 * @param clientHttpResponseFuture the future passed to the superclass as the one to adapt
 * @param responseExtractor        extractor stored on this instance
 */
public ResponseExtractorFuture(HttpMethod method, URI url,
        ListenableFuture<ClientHttpResponse> clientHttpResponseFuture,
        ResponseExtractor<T> responseExtractor) {
    super(clientHttpResponseFuture);
    this.responseExtractor = responseExtractor;
    this.url = url;
    this.method = method;
}
项目:dashboard1b    文件:KafkaProducer.java   
/**
 * Sends a string message to the given Kafka topic and logs the outcome from the
 * asynchronous callback (the send result itself is logged on success as well).
 *
 * <p>On failure the throwable itself is now passed to the logger, so the stack trace
 * and cause chain are recorded; before, only {@code ex.getMessage()} was logged.
 *
 * @param topic target topic
 * @param data  message payload
 */
public void send(String topic, String data) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            logger.info(result);
            logger.info("Success on sending message \"" + data + "\" to topic " + topic);
        }

        @Override
        public void onFailure(Throwable ex) {
            // pass the throwable so the logger prints the full stack trace
            logger.error("Error on sending message \"" + data + "\", stacktrace " + ex.getMessage(), ex);
        }
    });
}
项目:dashboard1b    文件:KafkaProducer.java   
/**
 * Core method for publishing information to Kafka: sends a string message to the given
 * topic and logs the outcome from the asynchronous callback.
 *
 * <p>On failure the throwable itself is now passed to the logger, so the stack trace
 * and cause chain are recorded; before, only {@code ex.getMessage()} was logged.
 *
 * @param topic Kafka topic.
 * @param data  data to send.
 */
public void send(String topic, String data) {
    // The send call alone is enough to deliver the data; the callback below only logs.
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            logger.info("Success on sending message \"" + data + "\" to topic " + topic);
        }

        @Override
        public void onFailure(Throwable ex) {
            // pass the throwable so the logger prints the full stack trace
            logger.error("Error on sending message \"" + data + "\", stacktrace " + ex.getMessage(), ex);
        }
    });
}
项目:spring-cloud-gcp    文件:PubSubTemplate.java   
/**
 * Encodes the string payload with the given charset and delegates to the byte-array
 * {@code publish} overload.
 *
 * @param topic   name of the topic to publish to
 * @param payload text to publish
 * @param headers headers passed through to the delegate
 * @param charset charset used to encode the payload
 * @return future returned by the delegate
 */
@Override
public ListenableFuture<String> publish(final String topic, String payload,
        Map<String, String> headers, Charset charset) {
    final byte[] encodedPayload = payload.getBytes(charset);
    return publish(topic, encodedPayload, headers);
}
项目:Spring-5.0-Cookbook    文件:DepartmentServiceImpl.java   
/**
 * Delegates the lookup to {@code departmentRepository.findDeptById}.
 *
 * @param id department id
 * @return the future returned by the repository
 */
@Override
public ListenableFuture<Department> findAllFirstById(Integer id) {
    final ListenableFuture<Department> pendingDepartment = departmentRepository.findDeptById(id);
    return pendingDepartment;
}