@Test public void testNullResponse() throws InterruptedException, ExecutionException { Endpoint endpoint = ServiceEndpoints.toEndpoint(this); Map<String, JsonRpcMethod> methods = ServiceEndpoints.getSupportedMethods(LanguageServer.class); MessageJsonHandler handler = new MessageJsonHandler(methods); List<Message> msgs = new ArrayList<>(); MessageConsumer consumer = (message) -> { msgs.add(message); }; RemoteEndpoint re = new RemoteEndpoint(consumer, endpoint); RequestMessage request = new RequestMessage(); request.setId("1"); request.setMethod("shutdown"); re.consume(request); Assert.assertEquals("{\"jsonrpc\":\"2.0\",\"id\":\"1\",\"result\":null}", handler.serialize(msgs.get(0))); msgs.clear(); shutdownReturn = new Object(); re.consume(request); Assert.assertEquals("{\"jsonrpc\":\"2.0\",\"id\":\"1\",\"result\":{}}", handler.serialize(msgs.get(0))); }
/** * Create a new Launcher for a given local service object, a given remote interface and an input and output stream. * Threads are started with the given executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation and tracing can be included. * The {@code configureGson} function can be used to register additional type adapters in the {@link GsonBuilder} * in order to support protocol classes that cannot be handled by Gson's reflective capabilities. * * @param localService - an object on which classes RPC methods are looked up * @param remoteInterface - an interface on which RPC methods are looked up * @param in - inputstream to listen for incoming messages * @param out - outputstream to send outgoing messages * @param executorService - the executor service used to start threads * @param wrapper - a function for plugging in additional message consumers * @param configureGson - a function for Gson configuration */ static <T> DebugLauncher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper, Consumer<GsonBuilder> configureGson) { Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<String, JsonRpcMethod>(); supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(remoteInterface)); if (localService instanceof JsonRpcMethodProvider) { JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService; supportedMethods.putAll(rpcMethodProvider.supportedMethods()); } else { supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass())); } MessageJsonHandler jsonHandler = new DebugMessageJsonHandler(supportedMethods, configureGson); MessageConsumer outGoingMessageStream = new StreamMessageConsumer(out, jsonHandler); outGoingMessageStream = wrapper.apply(outGoingMessageStream); RemoteEndpoint serverEndpoint = new RemoteEndpoint(outGoingMessageStream, ServiceEndpoints.toEndpoint(localService)); jsonHandler.setMethodProvider(serverEndpoint); // wrap incoming message stream MessageConsumer messageConsumer = wrapper.apply(serverEndpoint); StreamMessageProducer reader = new StreamMessageProducer(in, jsonHandler); T remoteProxy = ServiceEndpoints.toServiceObject(serverEndpoint, remoteInterface); return new DebugLauncher<T> () { @Override public Future<?> startListening() { return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, executorService); } @Override public T getRemoteProxy() { return remoteProxy; } }; }
public static Future<?> startProcessing(MessageProducer messageProducer, MessageConsumer messageConsumer, ExecutorService executorService) { ConcurrentMessageProcessor reader = new ConcurrentMessageProcessor(messageProducer, messageConsumer); final Future<?> result = executorService.submit(reader); return new Future<Object>() { @Override public Object get() throws InterruptedException, ExecutionException { return result.get(); } @Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return result.get(timeout, unit); } @Override public boolean isDone() { return result.isDone(); } @Override public boolean cancel(boolean mayInterruptIfRunning) { if (mayInterruptIfRunning && messageProducer instanceof Closeable) { try { ((Closeable) messageProducer).close(); } catch (IOException e) { throw new RuntimeException(e); } } return result.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return result.isCancelled(); } }; }
static <T> Launcher<T> createSocketLauncher(Object localService, Class<T> remoteInterface, SocketAddress socketAddress, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) throws IOException { AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open().bind(socketAddress); AsynchronousSocketChannel socketChannel; try { socketChannel = serverSocket.accept().get(); return Launcher.createIoLauncher(localService, remoteInterface, Channels.newInputStream(socketChannel), Channels.newOutputStream(socketChannel), executorService, wrapper); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return null; }
static <T> Launcher<T> createSocketLauncher(Object localService, Class<T> remoteInterface, SocketAddress socketAddress, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper, InputStream inputStream, OutputStream outputStream) throws IOException { return Launcher.createIoLauncher(localService, remoteInterface, inputStream, outputStream, executorService, wrapper); }
@Override public MessageConsumer apply(final MessageConsumer consumer) { //inject our own consumer to refresh the timestamp return message -> { lastActivityTime=System.currentTimeMillis(); consumer.consume(message); }; }
/** * Create a new Launcher for a given local service object, a given remote * interface and an input and output stream. Threads are started with the given * executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation * and tracing can be included. The {@code configureGson} function can be used * to register additional type adapters in the {@link GsonBuilder} in order to * support protocol classes that cannot be handled by Gson's reflective * capabilities. * * @param localService * - an object on which classes RPC methods are looked up * @param remoteInterface * - an interface on which RPC methods are looked up * @param in * - inputstream to listen for incoming messages * @param out * - outputstream to send outgoing messages * @param executorService * - the executor service used to start threads * @param wrapper * - a function for plugging in additional message consumers * @param configureGson * - a function for Gson configuration */ static <T> DebugLauncher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper, Consumer<GsonBuilder> configureGson) { Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<String, JsonRpcMethod>(); supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(remoteInterface)); if (localService instanceof JsonRpcMethodProvider) { JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService; supportedMethods.putAll(rpcMethodProvider.supportedMethods()); } else { supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass())); } MessageJsonHandler jsonHandler = new DebugMessageJsonHandler(supportedMethods, configureGson); MessageConsumer outGoingMessageStream = new StreamMessageConsumer(out, jsonHandler); outGoingMessageStream = wrapper.apply(outGoingMessageStream); RemoteEndpoint serverEndpoint = new DebugRemoteEndpoint(outGoingMessageStream, ServiceEndpoints.toEndpoint(localService)); jsonHandler.setMethodProvider(serverEndpoint); // wrap incoming message stream MessageConsumer messageConsumer = wrapper.apply(serverEndpoint); StreamMessageProducer reader = new StreamMessageProducer(in, jsonHandler); T remoteProxy = ServiceEndpoints.toServiceObject(serverEndpoint, remoteInterface); return new DebugLauncher<T>() { @Override public Future<?> startListening() { return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, executorService); } @Override public T getRemoteProxy() { return remoteProxy; } }; }
public static Launcher<LanguageClient> createServerLauncher(LanguageServer server, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { return Launcher.createLauncher(server, LanguageClient.class, in, out, executorService, wrapper); }
public static Launcher<LanguageServer> createClientLauncher(LanguageClient client, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { return Launcher.createLauncher(client, LanguageServer.class, in, out, executorService, wrapper); }
public ReflectiveMessageValidator(final MessageConsumer delegate) { this.delegate = delegate; }
public ConcurrentMessageProcessor(MessageProducer messageProducer, MessageConsumer messageConsumer) { this.messageProducer = messageProducer; this.messageConsumer = messageConsumer; }
public DebugRemoteEndpoint(MessageConsumer out, Endpoint localEndpoint) { super(out, localEndpoint); }
public DebugRemoteEndpoint(MessageConsumer out, Endpoint localEndpoint, Function<Throwable, ResponseError> exceptionHandler) { super(out, localEndpoint, exceptionHandler); }
public static Launcher<IDebugProtocolClient> createServerLauncher(IDebugProtocolServer server, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { return DebugLauncher.createLauncher(server, IDebugProtocolClient.class, in, out, executorService, wrapper); }
public static Launcher<IDebugProtocolServer> createClientLauncher(IDebugProtocolClient client, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { return DebugLauncher.createLauncher(client, IDebugProtocolServer.class, in, out, executorService, wrapper); }
/** * Create a new Launcher for a given local service object, a given remote * interface and an input and output stream. Threads are started with the given * executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation * and tracing can be included. * * @param localService * - an object on which classes RPC methods are looked up * @param remoteInterface * - an interface on which RPC methods are looked up * @param in * - inputstream to listen for incoming messages * @param out * - outputstream to send outgoing messages * @param executorService * - the executor service used to start threads * @param wrapper * - a function for plugging in additional message consumers */ static <T> DebugLauncher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { Consumer<GsonBuilder> configureGson = gsonBuilder -> { }; return createIoLauncher(localService, remoteInterface, in, out, executorService, wrapper, configureGson); }
/** * Create a new Launcher for a given local service object, a given remote interface and an input and output stream. * Threads are started with the given executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation and tracing can be included. * * @param localService - an object on which classes RPC methods are looked up * @param remoteInterface - an interface on which RPC methods are looked up * @param in - inputstream to listen for incoming messages * @param out - outputstream to send outgoing messages * @param executorService - the executor service used to start threads * @param wrapper - a function for plugging in additional message consumers */ static <T> DebugLauncher<T> createLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { return createIoLauncher(localService, remoteInterface, in, out, executorService, wrapper); }
/** * Create a new Launcher for a given local service object, a given remote interface and an input and output stream. * Threads are started with the given executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation and tracing can be included. * * @param localService - an object on which classes RPC methods are looked up * @param remoteInterface - an interface on which RPC methods are looked up * @param in - inputstream to listen for incoming messages * @param out - outputstream to send outgoing messages * @param executorService - the executor service used to start threads * @param wrapper - a function for plugging in additional message consumers */ static <T> DebugLauncher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { Consumer<GsonBuilder> configureGson = gsonBuilder -> {}; return createIoLauncher(localService, remoteInterface, in, out, executorService, wrapper, configureGson); }
/** * Create a new Launcher for a given local service object, a given remote * interface and an input and output stream. Threads are started with the given * executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation * and tracing can be included. * * @param localService * - an object on which classes RPC methods are looked up * @param remoteInterface * - an interface on which RPC methods are looked up * @param in * - inputstream to listen for incoming messages * @param out * - outputstream to send outgoing messages * @param executorService * - the executor service used to start threads * @param wrapper * - a function for plugging in additional message consumers */ static <T> DebugLauncher<T> createLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) { return createIoLauncher(localService, remoteInterface, in, out, executorService, wrapper); }