/**
 * Builds the native transport for {@code instance}, opens it and wraps it in a
 * {@code ManagedTransport}.
 *
 * <p>When opening fails with a {@code TException}, a warning is logged, the instance is
 * passed to {@code markError} and the same exception is rethrown.
 *
 * @param instance service instance to connect to
 * @return the opened transport wrapped together with its instance
 * @throws Exception if the transport cannot be opened
 */
@Override
public TTransport create(ServiceInstance<RpcPayload> instance) throws Exception {
    final TTransport nativeTransport = this.createNativeTransport(instance);
    try {
        nativeTransport.open();
    } catch (TException openFailure) {
        final String endpoint = instance.getAddress() + ":" + instance.getPort();
        LOG.warn("Error when creating new transport on server: " + endpoint, openFailure);
        markError(instance);
        throw openFailure;
    }
    return new ManagedTransport(nativeTransport, instance);
}
/** * 根据rc的设置来确定创建什么类型的transport; * * @param instance * @return */ protected TTransport createNativeTransport( ServiceInstance<RpcPayload> instance) { TSocket socket = new TSocket(instance.getAddress(), instance.getPort()); socket.setTimeout(socketTimeout); RpcPayload server = instance.getPayload(); if ((server == null) || (server.getTransport() == null) || (server.getTransport().equals("socket"))) { return socket; } else if ("framed-transport".equals(server.getTransport())) { return new TFramedTransport(socket); } // for default, use TSocket; return socket; }
/** * Creates a Model and a Thrift Bridge to this model. */ protected static EBMI createModelBridge(String host, String pythonExecutable, File opendaPythonPath, File modelPythonPath, String modelPythonModuleName, String modelPythonClassName, File modelRunDir) throws IOException { // start local server. int port = getFreePort(); if (host == null) { //localhost host = "127.0.0.1"; } Process process = startModelProcess(host, port, pythonExecutable, opendaPythonPath, modelPythonPath, modelPythonModuleName, modelPythonClassName, modelRunDir); // connect to server. TTransport transport = connectToCode(host, port, process); // create client. TProtocol protocol = new TBinaryProtocol(transport); BMIService.Client client = new BMIService.Client(protocol); return new ThriftBmiBridge(client, process, transport); }
@Test public void testScribeMessage() throws Exception { TTransport transport = new TFramedTransport(new TSocket("localhost", port)); TProtocol protocol = new TBinaryProtocol(transport); Scribe.Client client = new Scribe.Client(protocol); transport.open(); LogEntry logEntry = new LogEntry("INFO", "Sending info msg to scribe source"); List<LogEntry> logEntries = new ArrayList<LogEntry>(1); logEntries.add(logEntry); client.Log(logEntries); // try to get it from Channels Transaction tx = memoryChannel.getTransaction(); tx.begin(); Event e = memoryChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("Sending info msg to scribe source", new String(e.getBody())); tx.commit(); tx.close(); }
@Override public Pair<TTransport, Bmv2DeviceThriftClient> load(DeviceId deviceId) throws TTransportException { log.debug("Instantiating new client... > deviceId={}", deviceId); // Make the expensive call Bmv2Device device = Bmv2Device.of(deviceId); TTransport transport = new TSocket(device.thriftServerHost(), device.thriftServerPort()); TProtocol protocol = new TBinaryProtocol(transport); // Our BMv2 device implements multiple Thrift services, create a client for each one on the same transport. Standard.Client standardClient = new Standard.Client( new TMultiplexedProtocol(protocol, "standard")); SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client( new TMultiplexedProtocol(protocol, "simple_switch")); // Wrap clients so to automatically have synchronization and resiliency to connectivity errors Standard.Iface safeStandardClient = SafeThriftClient.wrap(standardClient, Standard.Iface.class, options); SimpleSwitch.Iface safeSimpleSwitchClient = SafeThriftClient.wrap(simpleSwitch, SimpleSwitch.Iface.class, options); Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId, transport, safeStandardClient, safeSimpleSwitchClient); return Pair.of(transport, client); }
/**
 * Checks whether a pooled transport is still usable.
 *
 * <p>Without a {@code failoverChecker} the transport only has to be open. With one, the
 * transport must be open and, if the checker supplies a connection validator, also pass
 * that validator. Any throwable raised during the check counts as "invalid". An invalid
 * result is reported to the failover strategy when a checker is configured.
 *
 * @param thriftServer server the pooled transport belongs to
 * @param pooledObject pooled wrapper around the transport
 * @return {@code true} if the transport passed validation
 */
@Override
public boolean validateObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) {
    boolean usable;
    try {
        if (failoverChecker != null) {
            ConnectionValidator check = failoverChecker.getConnectionValidator();
            if (!pooledObject.getObject().isOpen()) {
                usable = false;
            } else {
                usable = (check == null) || check.isValid(pooledObject.getObject());
            }
        } else {
            usable = pooledObject.getObject().isOpen();
        }
    } catch (Throwable problem) {
        logger.warn("Fail to validate tsocket: {}:{}",
                new Object[]{thriftServer.getHost(), thriftServer.getPort(), problem});
        usable = false;
    }

    if (!usable && failoverChecker != null) {
        failoverChecker.getFailoverStrategy().fail(thriftServer);
    }
    logger.info("ValidateObject isValidate:{}", usable);
    return usable;
}
/**
 * Probes every server returned by {@code getAvailableServers(true)}.
 *
 * <p>For each server a connection is borrowed from the pool and validated. A valid
 * connection is returned to the pool. Otherwise the server is reported to the failover
 * strategy and, if a connection was obtained, it is handed back as broken.
 */
@Override
public void run() {
    for (ThriftServer server : getAvailableServers(true)) {
        TTransport probe = null;
        boolean healthy = false;
        try {
            probe = poolProvider.getConnection(server);
            healthy = connectionValidator.isValid(probe);
        } catch (Exception checkFailure) {
            healthy = false;
            logger.warn(checkFailure.getMessage(), checkFailure);
        } finally {
            if (probe != null && healthy) {
                poolProvider.returnConnection(server, probe);
            } else {
                // either no connection could be obtained or it did not validate
                failoverStrategy.fail(server);
                if (probe != null) {
                    poolProvider.returnBrokenConnection(server, probe);
                }
            }
        }
    }
}
/**
 * Opens a plain-socket binary-protocol client and calls {@code getID("")} 2,000,000 times.
 *
 * <p>The transport is closed in a {@code finally} block so the socket is released even
 * when a call fails part-way through.
 *
 * @param ip server address
 * @param port server port
 * @param timeout timeout value passed to the {@code TSocket} constructor
 * @throws Exception if the connection cannot be opened or a call fails
 */
public static void startClient(String ip, int port, int timeout) throws Exception {
    TTransport transport = new TSocket(ip, port, timeout);
    TProtocol protocol = new TBinaryProtocol(transport);
    leafrpc.Client client = new leafrpc.Client(protocol);
    try {
        transport.open();
        for (int i = 0; i < 2000000; i++) {
            client.getID("");
        }
    } finally {
        transport.close();
    }
}
public static void startClient2(String ip ,int port ,int timeout) throws Exception { TTransport transport = new TFramedTransport(new TSocket(ip,port,timeout)); TProtocol protocol = new TBinaryProtocol(transport); leafrpc.Client client = new leafrpc.Client(protocol); transport.open(); for(int i = 0; i< 1000000; i++) { client.getID(""); if (i % 100000 == 0) { System.out.println(Thread.currentThread().getName() + " " + client.getID("")); } //ai.incrementAndGet(); } transport.close(); }
/**
 * Creates a new pooled Thrift client connected to an address chosen by
 * {@code serverAddressProvider}.
 *
 * <p>The client uses a framed transport over a {@code TSocket} with a binary protocol.
 * After the transport is opened, the optional {@code callback} is notified; a failure
 * inside the callback is logged and does not prevent the client from being returned.
 *
 * @return a client whose transport has been opened
 * @throws ThriftException if the address provider yields no address
 * @throws Exception if the transport cannot be opened
 */
@Override
public TServiceClient makeObject() throws Exception {
    InetSocketAddress address = serverAddressProvider.selector();
    if (address == null) {
        // The exception used to be constructed but never thrown, which led straight
        // into a NullPointerException on the next line.
        throw new ThriftException("No provider available for remote service");
    }
    TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
    TTransport transport = new TFramedTransport(tsocket);
    TProtocol protocol = new TBinaryProtocol(transport);
    TServiceClient client = this.clientFactory.getClient(protocol);
    transport.open();
    if (callback != null) {
        try {
            callback.make(client);
        } catch (Exception e) {
            // best effort: a failing callback must not discard the new client
            logger.warn("makeObject:{}", e);
        }
    }
    return client;
}
/** * Encodes a thrift object into a DEFLATE-compressed binary array. * * @param tBase Object to encode. * @return Deflated, encoded object. * @throws CodingException If the object could not be encoded. */ public static byte[] deflateNonNull(TBase<?, ?> tBase) throws CodingException { requireNonNull(tBase); // NOTE: Buffering is needed here for performance. // There are actually 2 buffers in play here - the BufferedOutputStream prevents thrift from // causing a call to deflate() on every encoded primitive. The DeflaterOutputStream buffer // allows the underlying Deflater to operate on a larger chunk at a time without stopping to // copy the intermediate compressed output to outBytes. // See http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4986239 ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); TTransport transport = new TIOStreamTransport( new BufferedOutputStream( new DeflaterOutputStream(outBytes, new Deflater(DEFLATE_LEVEL), DEFLATER_BUFFER_SIZE), DEFLATER_BUFFER_SIZE)); try { TProtocol protocol = PROTOCOL_FACTORY.getProtocol(transport); tBase.write(protocol); transport.close(); // calls finish() on the underlying stream, completing the compression return outBytes.toByteArray(); } catch (TException e) { throw new CodingException("Failed to serialize: " + tBase, e); } finally { transport.close(); } }
/**
 * Inflates a DEFLATE-compressed byte array and deserializes it into a new instance of
 * the requested thrift type.
 *
 * @param clazz Class to instantiate and deserialize to; must not be {@code null}.
 * @param buffer Compressed buffer to decode; must not be {@code null}.
 * @param <T> Thrift type being decoded.
 * @return A populated message.
 * @throws CodingException If the message could not be decoded.
 */
public static <T extends TBase<T, ?>> T inflateNonNull(Class<T> clazz, byte[] buffer)
        throws CodingException {
    requireNonNull(clazz);
    requireNonNull(buffer);

    T decoded = newInstance(clazz);
    InflaterInputStream inflated = new InflaterInputStream(new ByteArrayInputStream(buffer));
    TTransport transport = new TIOStreamTransport(inflated);
    try {
        decoded.read(PROTOCOL_FACTORY.getProtocol(transport));
        return decoded;
    } catch (TException e) {
        throw new CodingException("Failed to deserialize: " + e, e);
    } finally {
        transport.close();
    }
}
/** * {@inheritDoc} */ @Override public ServerContext createContext(final TProtocol input, final TProtocol output) { final String userName = "metacat-thrift-interface"; String clientHost = null; //requestContext.getHeaderString("X-Forwarded-For"); final long requestThreadId = Thread.currentThread().getId(); final TTransport transport = input.getTransport(); if (transport instanceof TSocket) { final TSocket thriftSocket = (TSocket) transport; clientHost = thriftSocket.getSocket().getInetAddress().getHostAddress(); } final CatalogServerRequestContext context = new CatalogServerRequestContext( userName, null, clientHost, null, "hive", requestThreadId ); MetacatContextManager.setContext(context); return context; }
/**
 * Captures everything needed to serve one client connection.
 *
 * <p>When an event handler is supplied, its {@code createContext} is invoked with the
 * two protocols and the result is stored as the connection context; without a handler
 * the context is {@code null}. The instance starts out marked as idle.
 *
 * @param socket client socket
 * @param connectionNumber number assigned to this connection
 * @param proc processor handling requests
 * @param in input transport
 * @param out output transport
 * @param inp input protocol
 * @param outp output protocol
 * @param eventHandler optional server event handler, may be {@code null}
 */
protected ClientProcessData(GfxdTSocket socket, int connectionNumber, TProcessor proc,
        TTransport in, TTransport out, TProtocol inp, TProtocol outp,
        TServerEventHandler eventHandler) {
    this.clientSocket = socket;
    this.connectionNumber = connectionNumber;
    this.processor = proc;

    this.inputTransport = in;
    this.outputTransport = out;

    this.inputProtocol = inp;
    this.outputProtocol = outp;

    this.eventHandler = eventHandler;
    this.connectionContext = (eventHandler == null) ? null : eventHandler.createContext(inp, outp);

    this.idle = true;
}
protected TProtocol requestTransport(String url) throws TTransportException { // probably not thread safe, but we need it? Not atm. TTransport act; if (!activeTransports.containsKey(url)) { logger.log(Level.DEBUG ,"Creating new transport for: " + url); activeTransports.put(url, new THttpClient(url)); } act = activeTransports.get(url); if (!act.isOpen()) { act.open(); } // THINK: always create new protocol? return new TJSONProtocol(act); }
/**
 * Creates a worker, connects it to the replica and opens its log file.
 *
 * <p>The local path is {@code path + "/f" + id} and the global path is {@code "/f" + id}.
 * {@code replicaAddr} is split on {@code ':'} into host and port. A failure to open the
 * socket is rethrown as a {@code RuntimeException}. Output goes to the file named
 * {@code logPrefix + id}.
 *
 * @param id worker id
 * @param durationMillis stored as the worker duration
 * @param path base directory for the worker's local path
 * @param globals stored as-is
 * @throws IOException if the log file cannot be opened
 */
public Worker(int id, long durationMillis, String path, int globals) throws IOException {
    final String fileSuffix = "/f" + Integer.toString(id);

    this.id = id;
    this.workerDuration = durationMillis;
    this.localPath = path + fileSuffix;
    this.globalPath = fileSuffix;
    this.instanceMap = new HashMap<>();
    this.globals = globals;

    final String[] hostAndPort = replicaAddr.split(":");
    final String replicaHost = hostAndPort[0];
    final int replicaPort = Integer.parseInt(hostAndPort[1]);

    final TTransport socket = new TSocket(replicaHost, replicaPort);
    try {
        socket.open();
    } catch (TTransportException connectFailure) {
        throw new RuntimeException(connectFailure);
    }
    c = new FuseOps.Client(new TBinaryProtocol(socket));

    out = new BufferedWriter(new FileWriter(new File(logPrefix + this.id)));
}
/**
 * Creates a worker, connects it to the replica and opens its log file.
 *
 * <p>{@code replicaAddr} is split on {@code ':'} into host and port. A failure to open
 * the socket is rethrown as a {@code RuntimeException}. Output goes to the file named
 * {@code logPrefix + id}.
 *
 * @param id worker id
 * @param durationMillis stored as the worker duration
 * @param path stored as-is
 * @param globals stored as-is
 * @throws IOException if the log file cannot be opened
 */
public Worker(int id, long durationMillis, String path, int globals) throws IOException {
    this.id = id;
    this.workerDuration = durationMillis;
    this.path = path;
    this.globals = globals;

    final String[] hostAndPort = replicaAddr.split(":");
    final String replicaHost = hostAndPort[0];
    final int replicaPort = Integer.parseInt(hostAndPort[1]);

    final TTransport socket = new TSocket(replicaHost, replicaPort);
    try {
        socket.open();
    } catch (TTransportException connectFailure) {
        throw new RuntimeException(connectFailure);
    }
    c = new FuseOps.Client(new TBinaryProtocol(socket));

    out = new BufferedWriter(new FileWriter(new File(logPrefix + this.id)));
}
private static void connectToCMD() { QueryInput query_input = new QueryInput(); query_input.type = "ensemble"; query_input.data = new ArrayList<String>(); query_input.data.add("localhost"); query_input.tags = new ArrayList<String>(); query_input.tags.add("9090"); QuerySpec spec = new QuerySpec(); spec.content = new ArrayList<QueryInput>(); spec.content.add(query_input); // Initialize thrift objects. TTransport transport = new TSocket("localhost", 8080); TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport)); LucidaService.Client client = new LucidaService.Client(protocol); try { transport.open(); System.out.println("Connecting to CMD at port " + 8080); // Register itself to CMD. client.create("", spec); transport.close(); System.out.println("Successfully connected to CMD"); } catch (TException x) { x.printStackTrace(); } }
/**
 * Starts the server produced by {@code factory.getThriftServer} on port 39001, checks
 * its name, port and configuration, and verifies that a client call to {@code get}
 * returns the string that was sent.
 *
 * <p>An unexpected exception fails the test with the exception attached as cause, so the
 * reason for the failure shows up in the test report. The server is always stopped.
 */
@Test
public void testDefaultThriftServerImpl() {
    int serverPort = 39001;
    ThriftServer defaultThriftServer = factory.getThriftServer(serverName, serverPort,
            new ThriftSimpleService.Processor<Iface>(new ThriftSimpleServiceImpl()));
    assertEquals(defaultThriftServer.getServerName(), serverName);
    assertEquals(defaultThriftServer.getServerPort(), serverPort);
    assertEquals(defaultThriftServer.getThriftServerConfiguration(), configuration);
    defaultThriftServer.run();
    try (ThriftClient thriftClient = factory.getThriftClient(LOCAL_HOST, serverPort);
            TTransport transport = thriftClient.getTransport()) {
        // pause before connecting (presumably to let the server finish starting)
        Thread.sleep(500);
        transport.open();
        ThriftSimpleService.Client client =
                new ThriftSimpleService.Client(thriftClient.getProtocol(transport));
        assertEquals(testString, client.get(testString));
    } catch (Exception e) {
        throw new AssertionError("Unexpected exception while calling the thrift server", e);
    } finally {
        defaultThriftServer.stop();
    }
}
/**
 * Starts the server produced by {@code factory.getNonblockingThriftServer} on port
 * 39002, checks its name, port and configuration, and verifies that a client call to
 * {@code get} returns the string that was sent.
 *
 * <p>An unexpected exception fails the test with the exception attached as cause
 * (instead of printing the stack trace separately). The server is always stopped.
 */
@Test
public void testNonblockingThriftServerImpl() {
    int serverPort = 39002;
    ThriftServer nioThriftServer = factory.getNonblockingThriftServer(serverName, serverPort,
            new ThriftSimpleService.Processor<Iface>(new ThriftSimpleServiceImpl()));
    assertEquals(nioThriftServer.getServerName(), serverName);
    assertEquals(nioThriftServer.getServerPort(), serverPort);
    assertEquals(nioThriftServer.getThriftServerConfiguration(), configuration);
    nioThriftServer.run();
    try (ThriftClient thriftClient = factory.getThriftClient(LOCAL_HOST, serverPort);
            TTransport transport = thriftClient.getTransport()) {
        // pause before connecting (presumably to let the server finish starting)
        Thread.sleep(500);
        transport.open();
        ThriftSimpleService.Client client =
                new ThriftSimpleService.Client(thriftClient.getProtocol(transport));
        assertEquals(testString, client.get(testString));
    } catch (Exception e) {
        throw new AssertionError("Unexpected exception while calling the thrift server", e);
    } finally {
        nioThriftServer.stop();
    }
}
/**
 * Starts a {@code DefaultThriftServerImpl} on port 39101 with a multiplexed processor
 * that registers the simple service under the name {@code "testServer"}, checks the
 * server's name, port and configuration, and verifies that a client using the
 * {@code "testServer"} protocol gets back the string it sent.
 *
 * <p>An unexpected exception fails the test with the exception attached as cause, so the
 * reason for the failure shows up in the test report. The server is always stopped.
 */
@Test
public void testMultiplexedThriftServerImpl() {
    int serverPort = 39101;
    Map<String, TProcessor> processorMap = new HashMap<>();
    processorMap.put("testServer",
            new ThriftSimpleService.Processor<Iface>(new ThriftSimpleServiceImpl()));
    MultiplexedProcessor processor = new MultiplexedProcessor(processorMap);
    ThriftServer defaultThriftServer =
            new DefaultThriftServerImpl(serverName, serverPort, configuration, processor);
    assertEquals(defaultThriftServer.getServerName(), serverName);
    assertEquals(defaultThriftServer.getServerPort(), serverPort);
    assertEquals(defaultThriftServer.getThriftServerConfiguration(), configuration);
    defaultThriftServer.run();
    try (ThriftClient thriftClient = factory.getThriftClient(LOCAL_HOST, serverPort);
            TTransport transport = thriftClient.getTransport()) {
        // pause before connecting (presumably to let the server finish starting)
        Thread.sleep(500);
        transport.open();
        ThriftSimpleService.Client client = new ThriftSimpleService.Client(
                thriftClient.getProtocol(transport, "testServer"));
        assertEquals(testString, client.get(testString));
    } catch (Exception e) {
        throw new AssertionError("Unexpected exception while calling the thrift server", e);
    } finally {
        defaultThriftServer.stop();
    }
}
/**
 * Runs a server/client round trip on port 39201 using a {@code GeneralFactory} built
 * from the compact server and client configurations, and verifies that {@code get}
 * returns the string that was sent.
 *
 * <p>An unexpected exception fails the test with the exception attached as cause, so the
 * reason for the failure shows up in the test report. The server is always stopped.
 */
@Test
public void testCompactDefaultThriftServerImpl() {
    int serverPort = 39201;
    Factory factory = new GeneralFactory(new CompactThriftServerConfiguration(),
            new CompactThriftClientConfiguration());
    ThriftServer defaultThriftServer = factory.getThriftServer(serverName, serverPort,
            new ThriftSimpleService.Processor<Iface>(new ThriftSimpleServiceImpl()));
    defaultThriftServer.run();
    try (ThriftClient thriftClient = factory.getThriftClient(LOCAL_HOST, serverPort);
            TTransport transport = thriftClient.getTransport()) {
        // pause before connecting (presumably to let the server finish starting)
        Thread.sleep(500);
        transport.open();
        ThriftSimpleService.Client client =
                new ThriftSimpleService.Client(thriftClient.getProtocol(transport));
        assertEquals(testString, client.get(testString));
    } catch (Exception e) {
        throw new AssertionError("Unexpected exception while calling the thrift server", e);
    } finally {
        defaultThriftServer.stop();
    }
}
/**
 * Runs a server/client round trip on port 39202 using a {@code GeneralFactory} built
 * from the tuple server and client configurations, and verifies that {@code get}
 * returns the string that was sent.
 *
 * <p>An unexpected exception fails the test with the exception attached as cause, so the
 * reason for the failure shows up in the test report. The server is always stopped.
 */
@Test
public void testTupleDefaultThriftServerImpl() {
    int serverPort = 39202;
    Factory factory = new GeneralFactory(new TupleThriftServerConfiguration(),
            new TupleThriftClientConfiguration());
    ThriftServer defaultThriftServer = factory.getThriftServer(serverName, serverPort,
            new ThriftSimpleService.Processor<Iface>(new ThriftSimpleServiceImpl()));
    defaultThriftServer.run();
    try (ThriftClient thriftClient = factory.getThriftClient(LOCAL_HOST, serverPort);
            TTransport transport = thriftClient.getTransport()) {
        // pause before connecting (presumably to let the server finish starting)
        Thread.sleep(500);
        transport.open();
        ThriftSimpleService.Client client =
                new ThriftSimpleService.Client(thriftClient.getProtocol(transport));
        assertEquals(testString, client.get(testString));
    } catch (Exception e) {
        throw new AssertionError("Unexpected exception while calling the thrift server", e);
    } finally {
        defaultThriftServer.stop();
    }
}
/**
 * Runs a server/client round trip on port 39203 where both sides use a
 * {@code DESCompactProtocol.Factory} created from the same key, and verifies that
 * {@code get} returns the string that was sent.
 *
 * <p>An unexpected exception fails the test with the exception attached as cause, so the
 * reason for the failure shows up in the test report. The server is always stopped.
 */
@Test
public void testDESCompactDefaultThriftServerImpl() {
    int serverPort = 39203;
    String key = "12345678";

    ThriftServerConfiguration serverConfiguration = new ThriftServerConfiguration();
    serverConfiguration.setProtocolFactory(new DESCompactProtocol.Factory(key));
    ThriftClientConfiguration clientConfiguration = new ThriftClientConfiguration();
    clientConfiguration.setProtocolFactory(new DESCompactProtocol.Factory(key));

    Factory factory = new GeneralFactory(serverConfiguration, clientConfiguration);
    ThriftServer defaultThriftServer = factory.getThriftServer(serverName, serverPort,
            new ThriftSimpleService.Processor<Iface>(new ThriftSimpleServiceImpl()));
    defaultThriftServer.run();
    try (ThriftClient thriftClient = factory.getThriftClient(LOCAL_HOST, serverPort);
            TTransport transport = thriftClient.getTransport()) {
        // pause before connecting (presumably to let the server finish starting)
        Thread.sleep(500);
        transport.open();
        ThriftSimpleService.Client client =
                new ThriftSimpleService.Client(thriftClient.getProtocol(transport));
        assertEquals(testString, client.get(testString));
    } catch (Exception e) {
        throw new AssertionError("Unexpected exception while calling the thrift server", e);
    } finally {
        defaultThriftServer.stop();
    }
}
/**
 * Serves one Thrift call carried in an HTTP POST.
 *
 * <p>Non-POST requests are rejected with {@code HttpRequestMethodNotSupportedException}.
 * For a POST, the request and response streams are wrapped in a
 * {@code TIOStreamTransport} and a processor built for the exported service handles the
 * call. Any throwable raised while processing switches the response to
 * {@code text/plain}, sets status 500, writes the stack trace into the response body
 * and logs the error. {@code ThriftContextHolder} is reset in all cases.
 *
 * <p>NOTE(review): the stack trace is sent to the caller; confirm that this is acceptable
 * for the deployment.
 *
 * @param request incoming HTTP request
 * @param response HTTP response to write the Thrift reply (or the error text) to
 * @throws ServletException if the request method is not POST
 * @throws IOException if the request or response stream cannot be obtained
 */
@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    final String method = request.getMethod();
    if (!"POST".equals(method)) {
        throw new HttpRequestMethodNotSupportedException(method, new String[]{"POST"},
                "ThriftServiceExporter only supports POST requests");
    }

    final InputStream requestBody = request.getInputStream();
    final OutputStream responseBody = response.getOutputStream();
    try {
        ThriftContextHolder.init();
        response.setContentType("application/x-thrift");

        final TTransport ioTransport = new TIOStreamTransport(requestBody, responseBody);
        final TProtocol protocol = getProtocolFactory().getProtocol(ioTransport);
        final TProcessor processor =
                ThriftUtil.buildProcessor(getServiceInterface(), getProxyForService());
        processor.process(protocol, protocol);
    } catch (Throwable failure) {
        response.setContentType("text/plain; charset=UTF-8");
        response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
        failure.printStackTrace(new PrintWriter(responseBody, true));
        if (LOGGER.isErrorEnabled()) {
            LOGGER.error("Thrift server direct error", failure);
        }
    } finally {
        ThriftContextHolder.reset();
    }
}
/**
 * Extracts the peer address from the protocol's transport.
 *
 * @param in protocol whose transport is inspected
 * @return the socket's host address with every {@code '.'} replaced by {@code ':'} when
 *         the transport is a {@code TSocket}; {@code UN_KNOWN_IP} otherwise
 */
private String getInetAddress(TProtocol in) {
    final TTransport transport = in.getTransport();
    // instanceof is false for null, so a missing transport also lands here
    if (!(transport instanceof TSocket)) {
        return UN_KNOWN_IP;
    }
    final Socket socket = ((TSocket) transport).getSocket();
    final String hostAddress = socket.getInetAddress().getHostAddress();
    return hostAddress.replace('.', ':');
}
/**
 * Validates a pooled transport for the given instance.
 *
 * <p>The checks run in order and stop at the first failure: the superclass validation,
 * then {@code isInstanceAvailable}, then whether the {@code ManagedTransport} is open.
 *
 * @param instance service instance the transport belongs to
 * @param transport pooled wrapper holding a {@code ManagedTransport}
 * @return {@code true} only if all three checks pass
 */
@Override
public boolean validateObject(ServiceInstance<RpcPayload> instance, PooledObject<TTransport> transport) {
    return super.validateObject(instance, transport)
            && this.isInstanceAvailable(instance)
            && ((ManagedTransport) transport.getObject()).isOpen();
}
/** * Creates and returns a connection to the given server process. */ private static TTransport connectToCode(String host, int port, Process process) throws IOException { Exception exception = null; for (int i = 0; i < MAX_CONNECT_ATTEMPTS; i++) { // first check if the process is still alive try { int exitValue = process.exitValue(); throw new IOException("Process has ended while waiting for Thrift connection with exit code " + exitValue); } catch (IllegalThreadStateException e) { // We are hoping to end up here, because it means the process is // still running. // Note: Java 8 allows a smarter way of implementing this. } // then try connecting to the code try { TTransport transport = new TSocket(host, port); transport.open(); LOGGER.info("obtained connection on the " + i + "th attempt"); return transport; } catch (TTransportException e) { exception = e; LOGGER.info("could not connect to code on the " + i + "th attempt, retrying..."); } // finally, wait a certain time before trying again try { Thread.sleep(CONNECT_TIMEOUT); } catch (InterruptedException e) { // IGNORE } } assert exception != null; throw new IOException("Failed to connect to model, message was: " + exception.getMessage(), exception); }
/**
 * Wraps the socket in a GSSAPI SASL client transport.
 *
 * <p>{@code serverPrincipal} is split with
 * {@code FlumeAuthenticationUtil.splitKerberosName}; the first two parts are passed as
 * protocol and server name. The SASL quality of protection is set to {@code "auth"}.
 *
 * @param tsocket underlying socket
 * @return a {@code UgiSaslClientTransport} on top of {@code tsocket}
 * @throws FlumeException if the principal name cannot be resolved
 * @throws Exception declared by the overridden signature
 */
@Override
protected TTransport getTransport(TSocket tsocket) throws Exception {
    final String[] principalParts;
    try {
        principalParts = FlumeAuthenticationUtil.splitKerberosName(serverPrincipal);
    } catch (IOException e) {
        throw new FlumeException(
                "Error while trying to resolve Principal name - " + serverPrincipal, e);
    }

    final Map<String, String> saslProperties = new HashMap<String, String>();
    saslProperties.put(Sasl.QOP, "auth");

    return new UgiSaslClientTransport(
            "GSSAPI",
            null,
            principalParts[0],
            principalParts[1],
            saslProperties,
            null,
            tsocket,
            privilegedExecutor);
}
/**
 * Creates the transport by forwarding all SASL arguments to the superclass constructor
 * and remembering the executor.
 *
 * @param mechanism forwarded to the superclass
 * @param authorizationId forwarded to the superclass
 * @param protocol forwarded to the superclass
 * @param serverName forwarded to the superclass
 * @param props forwarded to the superclass
 * @param cbh forwarded to the superclass
 * @param transport underlying transport, forwarded to the superclass
 * @param privilegedExecutor executor stored on this instance
 * @throws IOException declared by the signature
 */
public UgiSaslClientTransport(
        String mechanism,
        String authorizationId,
        String protocol,
        String serverName,
        Map<String, String> props,
        CallbackHandler cbh,
        TTransport transport,
        PrivilegedExecutor privilegedExecutor) throws IOException {
    super(mechanism, authorizationId, protocol, serverName, props, cbh, transport);
    this.privilegedExecutor = privilegedExecutor;
}
@Test public void testScribeMultipleMessages() throws Exception { TTransport transport = new TFramedTransport(new TSocket("localhost", port)); TProtocol protocol = new TBinaryProtocol(transport); Scribe.Client client = new Scribe.Client(protocol); transport.open(); List<LogEntry> logEntries = new ArrayList<LogEntry>(10); for (int i = 0; i < 10; i++) { LogEntry logEntry = new LogEntry("INFO", String.format("Sending info msg# %d to scribe source", i)); logEntries.add(logEntry); } client.Log(logEntries); // try to get it from Channels Transaction tx = memoryChannel.getTransaction(); tx.begin(); for (int i = 0; i < 10; i++) { Event e = memoryChannel.take(); Assert.assertNotNull(e); Assert.assertEquals(String.format("Sending info msg# %d to scribe source", i), new String(e.getBody())); } tx.commit(); tx.close(); }
/**
 * Serializes a thrift object to bytes using the compact protocol.
 *
 * @param obj thrift object; must not be {@code null}
 * @param <T> Class type
 * @return the serialized bytes
 * @throws IllegalStateException if writing the object raises a {@code TException}
 */
public static <T extends TBase<T, ?>> byte[] toBytes(T obj) {
    Validate.notNull(obj);

    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    final TCompactProtocol compact = new TCompactProtocol(new TIOStreamTransport(sink));
    try {
        obj.write(compact);
    } catch (TException e) {
        throw new IllegalStateException("unexpected", e);
    }
    return sink.toByteArray();
}
/**
 * Drops the {@code JANUSGRAPH} keyspace through the Cassandra Thrift interface at
 * {@code localhost:9160}.
 *
 * <p>The transport is closed in a {@code finally} block so the connection is released
 * even when the drop call fails.
 *
 * @throws InvalidRequestException propagated from {@code system_drop_keyspace}
 * @throws SchemaDisagreementException propagated from {@code system_drop_keyspace}
 * @throws TException if the connection cannot be opened or the call fails
 */
private void dropOldKeyspace() throws InvalidRequestException, SchemaDisagreementException, TException {
    TTransport tr = new TFramedTransport(new TSocket("localhost", 9160));
    TProtocol proto = new TBinaryProtocol(tr);
    Cassandra.Client client = new Cassandra.Client(proto);
    try {
        tr.open();
        client.system_drop_keyspace(JANUSGRAPH);
        LOGGER.info("DROPPED keyspace janusgraph");
    } finally {
        tr.close();
    }
}
@Override public void onRemoval(RemovalNotification<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> notification) { // close the transport connection Bmv2DeviceThriftClient client = notification.getValue().getRight(); TTransport transport = notification.getValue().getLeft(); // Locking here is ugly, but needed (see SafeThriftClient). synchronized (transport) { if (transport.isOpen()) { transport.close(); } } }
/**
 * Creates a client bundle for one device by storing the given collaborators.
 *
 * @param deviceId identifier of the device this client talks to
 * @param transport transport handed in together with the two service clients
 * @param standardClient client for the {@code Standard} service
 * @param simpleSwitchClient client for the {@code SimpleSwitch} service
 */
protected Bmv2DeviceThriftClient(
        DeviceId deviceId,
        TTransport transport,
        Standard.Iface standardClient,
        SimpleSwitch.Iface simpleSwitchClient) {
    this.standardClient = standardClient;
    this.simpleSwitchClient = simpleSwitchClient;
    this.transport = transport;
    this.deviceId = deviceId;
}
public TMultiUdpClient(SocketAddress[] socketAddresses) throws SocketException { if (socketAddresses == null || socketAddresses.length == 0) { throw new IllegalArgumentException("Must provide at least one SocketAddress"); } transports = new TTransport[socketAddresses.length]; for (int i = 0; i < socketAddresses.length; i++) { transports[i] = new TUdpClient(socketAddresses[i]); } }
/**
 * Reports whether every underlying transport is open.
 *
 * @return {@code false} as soon as one transport reports not being open; {@code true}
 *         otherwise (including when there are no transports)
 */
@Override
public boolean isOpen() {
    boolean allOpen = true;
    for (TTransport candidate : transports) {
        if (!candidate.isOpen()) {
            allOpen = false;
            break;
        }
    }
    return allOpen;
}
/**
 * Builds the processor that serves the given connection.
 *
 * <p>A fresh handler is obtained from {@code federatedHMSHandlerFactory}, wrapped by
 * {@code ExceptionWrappingHMSHandler} and then by {@code newRetryingHMSHandler}. The
 * transport and the base handler are registered with {@code transportMonitor} before
 * the processor is returned.
 *
 * @param transport transport of the client connection
 * @return a {@code TSetIpAddressProcessor} around the wrapped handler
 * @throws RuntimeException with message {@code "Error creating TProcessor"} if any step
 *         fails
 */
@Override
public TProcessor getProcessor(TTransport transport) {
    try {
        final CloseableIHMSHandler federatedHandler = federatedHMSHandlerFactory.create();
        final IHMSHandler retryingHandler = newRetryingHMSHandler(
                ExceptionWrappingHMSHandler.newProxyInstance(federatedHandler),
                hiveConf,
                false);
        transportMonitor.monitor(transport, federatedHandler);
        return new TSetIpAddressProcessor<>(retryingHandler);
    } catch (MetaException | ReflectiveOperationException | RuntimeException cause) {
        throw new RuntimeException("Error creating TProcessor", cause);
    }
}
/**
 * Verifies that {@code getProcessor} registers the connection with the transport
 * monitor: the captured transport must be the one passed in and the captured handler
 * must be a {@code FederatedHMSHandler}.
 *
 * @throws Exception declared by the signature
 */
@Test
public void connectionIsMonitored() throws Exception {
    ArgumentCaptor<TTransport> monitoredTransport = ArgumentCaptor.forClass(TTransport.class);
    ArgumentCaptor<Closeable> monitoredHandler = ArgumentCaptor.forClass(Closeable.class);

    factory.getProcessor(transport);

    verify(transportMonitor).monitor(monitoredTransport.capture(), monitoredHandler.capture());
    assertThat(monitoredTransport.getValue(), is(transport));
    assertThat(monitoredHandler.getValue(), is(instanceOf(FederatedHMSHandler.class)));
}