/** * Create a new Launcher for a given local service object, a given remote interface and an input and output stream. * Threads are started with the given executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation and tracing can be included. * The {@code configureGson} function can be used to register additional type adapters in the {@link GsonBuilder} * in order to support protocol classes that cannot be handled by Gson's reflective capabilities. * * @param localService - an object on which classes RPC methods are looked up * @param remoteInterface - an interface on which RPC methods are looked up * @param in - inputstream to listen for incoming messages * @param out - outputstream to send outgoing messages * @param executorService - the executor service used to start threads * @param wrapper - a function for plugging in additional message consumers * @param configureGson - a function for Gson configuration */ static <T> DebugLauncher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper, Consumer<GsonBuilder> configureGson) { Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<String, JsonRpcMethod>(); supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(remoteInterface)); if (localService instanceof JsonRpcMethodProvider) { JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService; supportedMethods.putAll(rpcMethodProvider.supportedMethods()); } else { supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass())); } MessageJsonHandler jsonHandler = new DebugMessageJsonHandler(supportedMethods, configureGson); MessageConsumer outGoingMessageStream = new StreamMessageConsumer(out, jsonHandler); outGoingMessageStream = wrapper.apply(outGoingMessageStream); RemoteEndpoint serverEndpoint = new RemoteEndpoint(outGoingMessageStream, ServiceEndpoints.toEndpoint(localService)); jsonHandler.setMethodProvider(serverEndpoint); // wrap incoming message stream MessageConsumer messageConsumer = wrapper.apply(serverEndpoint); StreamMessageProducer reader = new StreamMessageProducer(in, jsonHandler); T remoteProxy = ServiceEndpoints.toServiceObject(serverEndpoint, remoteInterface); return new DebugLauncher<T> () { @Override public Future<?> startListening() { return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, executorService); } @Override public T getRemoteProxy() { return remoteProxy; } }; }
/** * Create a new Launcher for a given local service object, a given remote interface and an input and output stream. * Threads are started with the given executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation and tracing can be included. * The {@code configureGson} function can be used to register additional type adapters in the {@link GsonBuilder} * in order to support protocol classes that cannot be handled by Gson's reflective capabilities. * * @param localService - an object on which classes RPC methods are looked up * @param remoteInterface - an interface on which RPC methods are looked up * @param in - inputstream to listen for incoming messages * @param out - outputstream to send outgoing messages * @param executorService - the executor service used to start threads * @param wrapper - a function for plugging in additional message consumers * @param configureGson - a function for Gson configuration */ static <T> Launcher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper, Consumer<GsonBuilder> configureGson) { Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<String, JsonRpcMethod>(); supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(remoteInterface)); if (localService instanceof JsonRpcMethodProvider) { JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService; supportedMethods.putAll(rpcMethodProvider.supportedMethods()); } else { supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass())); } MessageJsonHandler jsonHandler = new MessageJsonHandler(supportedMethods, configureGson); MessageConsumer outGoingMessageStream = new StreamMessageConsumer(out, jsonHandler); outGoingMessageStream = wrapper.apply(outGoingMessageStream); RemoteEndpoint serverEndpoint = new RemoteEndpoint(outGoingMessageStream, ServiceEndpoints.toEndpoint(localService)); jsonHandler.setMethodProvider(serverEndpoint); // wrap incoming message stream MessageConsumer messageConsumer = wrapper.apply(serverEndpoint); StreamMessageProducer reader = new StreamMessageProducer(in, jsonHandler); T remoteProxy = ServiceEndpoints.toServiceObject(serverEndpoint, remoteInterface); return new Launcher<T> () { @Override public Future<?> startListening() { return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, executorService); } @Override public T getRemoteProxy() { return remoteProxy; } }; }
/** * Create a new Launcher for a given local service object, a given remote * interface and an input and output stream. Threads are started with the given * executor service. The wrapper function is applied to the incoming and * outgoing message streams so additional message handling such as validation * and tracing can be included. The {@code configureGson} function can be used * to register additional type adapters in the {@link GsonBuilder} in order to * support protocol classes that cannot be handled by Gson's reflective * capabilities. * * @param localService * - an object on which classes RPC methods are looked up * @param remoteInterface * - an interface on which RPC methods are looked up * @param in * - inputstream to listen for incoming messages * @param out * - outputstream to send outgoing messages * @param executorService * - the executor service used to start threads * @param wrapper * - a function for plugging in additional message consumers * @param configureGson * - a function for Gson configuration */ static <T> DebugLauncher<T> createIoLauncher(Object localService, Class<T> remoteInterface, InputStream in, OutputStream out, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper, Consumer<GsonBuilder> configureGson) { Map<String, JsonRpcMethod> supportedMethods = new LinkedHashMap<String, JsonRpcMethod>(); supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(remoteInterface)); if (localService instanceof JsonRpcMethodProvider) { JsonRpcMethodProvider rpcMethodProvider = (JsonRpcMethodProvider) localService; supportedMethods.putAll(rpcMethodProvider.supportedMethods()); } else { supportedMethods.putAll(ServiceEndpoints.getSupportedMethods(localService.getClass())); } MessageJsonHandler jsonHandler = new DebugMessageJsonHandler(supportedMethods, configureGson); MessageConsumer outGoingMessageStream = new StreamMessageConsumer(out, jsonHandler); outGoingMessageStream = wrapper.apply(outGoingMessageStream); RemoteEndpoint serverEndpoint = new DebugRemoteEndpoint(outGoingMessageStream, ServiceEndpoints.toEndpoint(localService)); jsonHandler.setMethodProvider(serverEndpoint); // wrap incoming message stream MessageConsumer messageConsumer = wrapper.apply(serverEndpoint); StreamMessageProducer reader = new StreamMessageProducer(in, jsonHandler); T remoteProxy = ServiceEndpoints.toServiceObject(serverEndpoint, remoteInterface); return new DebugLauncher<T>() { @Override public Future<?> startListening() { return ConcurrentMessageProcessor.startProcessing(reader, messageConsumer, executorService); } @Override public T getRemoteProxy() { return remoteProxy; } }; }