public <X extends TServiceClient> X iface(Class<X> ifaceClass) { if (this.loadBalance == Constant.LoadBalance.HASH) { throw new ValidationException("Can not use HASH without a key."); } switch (this.loadBalance) { case Constant.LoadBalance.RANDOM: return getRandomClient(ifaceClass); case Constant.LoadBalance.ROUND_ROBIN: return getRRClient(ifaceClass); case Constant.LoadBalance.WEIGHT: return getWeightClient(ifaceClass); default: return getRandomClient(ifaceClass); } }
@Override public void afterPropertiesSet() throws Exception { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 加载Iface接口 objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface"); // 加载Client.Factory类 Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader. loadClass(serverAddressProvider.getService() + "$Client$Factory"); TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance(); ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback); pool = new GenericObjectPool<TServiceClient>(clientPool, makePoolConfig()); // InvocationHandler handler = makeProxyHandler();//方式1 InvocationHandler handler = makeProxyHandler2();//方式2 proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, handler); }
@Override public TServiceClient makeObject() throws Exception { InetSocketAddress address = serverAddressProvider.selector(); if(address==null){ new ThriftException("No provider available for remote service"); } TSocket tsocket = new TSocket(address.getHostName(), address.getPort()); TTransport transport = new TFramedTransport(tsocket); TProtocol protocol = new TBinaryProtocol(transport); TServiceClient client = this.clientFactory.getClient(protocol); transport.open(); if (callback != null) { try { callback.make(client); } catch (Exception e) { logger.warn("makeObject:{}", e); } } return client; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { TServiceClient client = pool.borrowObject(); boolean flag = true; try { return method.invoke(client, args); } catch (Exception e) { flag = false; throw e; } finally { if(flag){ pool.returnObject(client); }else{ pool.invalidateObject(client); } } }
@Override public T getObject() throws Exception { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Server server = serverManager.getService(innerClass.getName()); // 加载Iface接口 innerClass = classLoader.loadClass(server.getName()); String temp = server.getName().replace("$Iface", ""); // 加载Client.Factory类 Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader. loadClass(temp + "$Client$Factory"); TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance(); ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(server, clientFactory, callback, proccessName); pool = new GenericObjectPool<TServiceClient>(clientPool, makePoolConfig()); ServiceProxy serviceProxy = new ServiceProxy(pool); return (T)Proxy.newProxyInstance(innerClass.getClassLoader(),new Class[]{innerClass}, serviceProxy); }
@Override public TServiceClient makeObject() throws Exception { TSocket tsocket = new TSocket(server.getHost(), server.getPort()); tsocket.open(); // TTransport transport = new TFramedTransport(tsocket); TJSONProtocol protocol = new TJSONProtocol(tsocket); TMultiplexedProtocol uProtocol=new TMultiplexedProtocol(protocol, proccessName); TServiceClient client = this.clientFactory.getClient(uProtocol); if (callback != null) { try { callback.make(client); } catch (Exception e) { logger.warn("makeObject:{}", e); } } return client; }
/** * 创建原始连接的方法 * * @throws ThriftConnectionPoolException * 创建连接出现问题时抛出该异常 */ @SuppressWarnings("unchecked") private void createConnection() throws ThriftConnectionPoolException { try { transport = new TSocket(host, port, connectionTimeOut); transport.open(); TProtocol protocol = createTProtocol(transport); // 反射实例化客户端对象 Constructor<? extends TServiceClient> clientConstructor = clientClass.getConstructor(TProtocol.class); client = (T) clientConstructor.newInstance(protocol); if (logger.isDebugEnabled()) { logger.debug("创建新连接成功:" + host + " 端口:" + port); } } catch (Exception e) { throw new ThriftConnectionPoolException("无法连接服务器:" + host + " 端口:" + port); } }
private void returnToPool(TServiceClient client, boolean broken) { if(!thriftConfiguration.actuallyPoolClients()) { ThriftUtils.quietlyClose(client); } String key = getReverseLookup(client); if (client != null && key != null) { try { // Deal with simple server not really being a pool if(thriftConfiguration.getServerMode().isBlocking() || broken) { ThriftUtils.quietlyClose(client); connectionPool.get(key).returnBrokenClient(client); } else { connectionPool.get(key).returnClient(client); } } catch (Exception e) { // close since the object isn't going back to the pool ThriftUtils.quietlyClose(client); logger.warn("Didn't actually return to pool", e); } } else if (client != null) { // close since the object isn't going back to the pool ThriftUtils.quietlyClose(client); logger.warn("Didn't find the client key in the lookup. Nothing returned to pool"); } }
/** * If the reverse lookup contains the client, it will remove it from the lookup and return the key * @param client The TServiceClient * @return The key, if the client exists, otherwise null */ private String getReverseLookup(TServiceClient client) { if (client == null) { return null; } int foundIndex = -1; String key = null; synchronized (reverseLookup) { for(int i = 0; i < reverseLookup.size(); i++) { if (reverseLookup.get(i).client.equals(client)) { foundIndex = i; break; } } if (foundIndex >= 0) { key = reverseLookup.get(foundIndex).key; } } return key; }
/** * Reflectively wraps an already existing Thrift client. * * @param baseClient the client to wrap * @param options options that control behavior of the reconnecting client * @param <T> * @param <C> * @return */ public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) { Class<?>[] interfaces = baseClient.getClass().getInterfaces(); for (Class<?> iface : interfaces) { if (iface.getSimpleName().equals("Iface") && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) { return (C) wrap(baseClient, iface, options); } } throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead."); }
public <X extends TServiceClient> X iface(Class<X> ifaceClass, String key) { if (this.loadBalance != Constant.LoadBalance.HASH) { throw new ValidationException("Must use other load balance strategy."); } return getHashIface(ifaceClass, key); }
protected <X extends TServiceClient> X getWeightClient(Class<X> ifaceClass) { List<ThriftServer> servers = getAvaliableServers(); if (servers == null || servers.isEmpty()) { throw new NoServerAvailableException("No server available."); } int[] weights = new int[servers.size()]; for (int i = 0; i < servers.size(); i++) { weights[i] = servers.get(i).getWeight(); } return iface(ifaceClass, servers.get(ThriftClientUtil.chooseWithWeight(weights))); }
protected <X extends TServiceClient> X iface(Class<X> ifaceClass, int index) { List<ThriftServer> serverList = getAvaliableServers(); if (serverList == null || serverList.isEmpty()) { throw new NoServerAvailableException("No server available."); } index = Math.abs(index); final ThriftServer selected = serverList.get(index % serverList.size()); return iface(ifaceClass, selected); }
private InvocationHandler makeProxyHandler() throws Exception{ ThriftServiceClient2Proxy handler = null; TServiceClient client = pool.borrowObject(); try { handler = new ThriftServiceClient2Proxy(client); pool.returnObject(client); }catch (Exception e){ pool.invalidateObject(client); throw new ThriftException("获取代理对象失败"); } return handler; }
@Override public void destroyObject(TServiceClient client) throws Exception { if (callback != null) { try { callback.destroy(client); } catch (Exception e) { logger.warn("destroyObject:{}", e); } } logger.info("destroyObject:{}", client); TTransport pin = client.getInputProtocol().getTransport(); pin.close(); TTransport pout = client.getOutputProtocol().getTransport(); pout.close(); }
@Override public boolean validateObject(TServiceClient client) { TTransport pin = client.getInputProtocol().getTransport(); logger.info("validateObject input:{}", pin.isOpen()); TTransport pout = client.getOutputProtocol().getTransport(); logger.info("validateObject output:{}", pout.isOpen()); return pin.isOpen() && pout.isOpen(); }
public ThriftClientPoolFactory(Server server, TServiceClientFactory<TServiceClient> clientFactory, PoolOperationCallBack callback, String proccessName) throws Exception { this.server = server; this.clientFactory = clientFactory; this.callback = callback; this.proccessName = proccessName; }
@Override @SuppressWarnings("unchecked") public <C extends TServiceClient> C getClient(final Class<C> clazz) { return (C) super.clients.computeIfAbsent(ClassNameUtils.getOuterClassName(clazz), className -> { TMultiplexedProtocol tmp = new TMultiplexedProtocol(this.protocol, className); try { return clazz.getConstructor(TProtocol.class).newInstance(tmp); } catch (Exception e) { log.error("never execute"); return null; } }); }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { int remainRetryTimes = clientConfig.getRetryTimes(); String exceptionMsg = null; do { ObjectPool<TTransport> connPool = null; TTransport transport = null; InvokeConn conn = null; try { Invocation invocation = new Invocation(serviceInfo.getInterfaceClazz().getName(), method.getName()); conn = clientConfig.getLoadBalanceStrategy().select(PROVIDER_CONN_LIST, invocation); connPool = conn.getConnPool(); transport = connPool.borrowObject(); TProtocol protocol = new TBinaryProtocol(transport); TServiceClient client = serviceClientConstructor.newInstance(protocol); return method.invoke(client, args); } catch (Exception e) { // 服务多次重试连接不上,则直接将该服务对应信息移除 if (e instanceof OureaConnCreateException) { if (PROVIDER_CONN_LIST.remove(conn) && conn != null && conn.getConnPool() != null){ conn.getConnPool().close(); conn = null;//help GC } } LOGGER.warn("invoke thrift rpc provider fail.e:", e); exceptionMsg = e.getMessage(); } finally { if (connPool != null && transport != null) { connPool.invalidateObject(transport); } } } while (remainRetryTimes-- > 0); throw new OureaException("invoke fail.msg:" + exceptionMsg); }
private Constructor<TServiceClient> getClientConstructorClazz() { String parentClazzName = StringUtils.substringBeforeLast(serviceInfo.getInterfaceClazz().getCanonicalName(), ".Iface"); String clientClazzName = parentClazzName + "$Client"; try { return ((Class<TServiceClient>) Class.forName(clientClazzName)).getConstructor(TProtocol.class); } catch (Exception e) { // LOGGER.error("get thrift client class constructor fail.e:", e); throw new IllegalArgumentException("invalid iface implement"); } }
public DefaultThriftConnection(String host, int port, int connectionTimeOut, TProtocolType tProtocolType, Class<? extends TServiceClient> clientClass) throws ThriftConnectionPoolException { this.host = host; this.port = port; this.connectionTimeOut = connectionTimeOut; this.tProtocolType = tProtocolType; this.clientClass = clientClass; // 创建连接 createConnection(); }
public MulitServiceThriftConnecion(String host, int port, int connectionTimeOut, TProtocolType tProtocolType, Map<String, Class<? extends TServiceClient>> thriftClientClasses) throws ThriftConnectionPoolException { this.host = host; this.port = port; this.connectionTimeOut = connectionTimeOut; this.tProtocolType = tProtocolType; this.thriftClientClasses = thriftClientClasses; // 创建连接 createConnection(); }
/** * 创建原始连接的方法 * * @throws ThriftConnectionPoolException * 创建连接出现问题时抛出该异常 */ @SuppressWarnings("unchecked") private void createConnection() throws ThriftConnectionPoolException { try { transport = new TSocket(host, port, connectionTimeOut); transport.open(); TProtocol protocol = createTProtocol(transport); Iterator<Entry<String, Class<? extends TServiceClient>>> iterator = thriftClientClasses.entrySet() .iterator(); while (iterator.hasNext()) { Entry<String, Class<? extends TServiceClient>> entry = iterator.next(); String serviceName = entry.getKey(); Class<? extends TServiceClient> clientClass = entry.getValue(); TMultiplexedProtocol multiProtocol = new TMultiplexedProtocol(protocol, serviceName); // 反射实例化客户端对象 Constructor<? extends TServiceClient> clientConstructor = clientClass.getConstructor(TProtocol.class); T client = (T) clientConstructor.newInstance(multiProtocol); clients.put(serviceName, client); if (logger.isDebugEnabled()) { logger.debug("创建新连接成功:" + host + " 端口:" + port); } } } catch (Exception e) { e.printStackTrace(); throw new ThriftConnectionPoolException("无法连接服务器:" + host + " 端口:" + port, e); } }
@SuppressWarnings("unchecked") @Override public <K extends TServiceClient> K getClient(String serviceName, Class<K> clazz) { T serviceClient = this.clients.get(serviceName); if (serviceClient == null) { throw new IllegalArgumentException("未知的服务名称:" + serviceName); } return (K) serviceClient; }
@Override public <K extends TServiceClient> K getClient(String serviceName, Class<K> clazz) { if (thriftConnection != null) { return thriftConnection.getClient(serviceName, clazz); } throw new IllegalStateException("连接代理类没有绑定的原始连接信息"); }
public void testMultiplexedService() throws Exception { ThriftConnectionPoolConfig config = new ThriftConnectionPoolConfig(ThriftServiceType.MULTIPLEXED_INTERFACE); config.setConnectTimeout(3000); config.setThriftProtocol(TProtocolType.BINARY); // 该端口不存在 for (ThriftServerInfo thriftServerInfo : servers) { config.addThriftServer(thriftServerInfo.getHost(), thriftServerInfo.getPort()); } config.addThriftClientClass("example", Example.Client.class); config.addThriftClientClass("other", Other.Client.class); config.setMaxConnectionPerServer(2); config.setMinConnectionPerServer(1); config.setIdleMaxAge(2, TimeUnit.SECONDS); config.setMaxConnectionAge(2); config.setLazyInit(false); config.setAcquireIncrement(2); config.setAcquireRetryDelay(2000); config.setAcquireRetryAttempts(1); config.setMaxConnectionCreateFailedCount(1); config.setConnectionTimeoutInMs(5000); config.check(); ThriftConnectionPool<TServiceClient> pool = new ThriftConnectionPool<TServiceClient>(config); ThriftConnection<TServiceClient> connection = pool.getConnection(); // example service com.wmz7year.thrift.pool.example.Example.Client exampleServiceClient = connection.getClient("example", Example.Client.class); exampleServiceClient.ping(); // other service com.wmz7year.thrift.pool.example.Other.Client otherServiceClient = connection.getClient("other", Other.Client.class); otherServiceClient.ping(); pool.close(); }
public void testThriftConnectionPoolRemoveServerStopThread() throws Exception { ThriftConnectionPoolConfig config = new ThriftConnectionPoolConfig(ThriftServiceType.MULTIPLEXED_INTERFACE); config.setConnectTimeout(3000); config.setThriftProtocol(TProtocolType.BINARY); // 该端口不存在 ThriftServerInfo thriftServerInfo = servers.get(0); config.addThriftServer(thriftServerInfo); config.addThriftClientClass("example", Example.Client.class); config.addThriftClientClass("other", Other.Client.class); config.setMaxConnectionPerServer(2); config.setMinConnectionPerServer(1); config.setIdleMaxAge(2, TimeUnit.SECONDS); config.setMaxConnectionAge(2); config.setLazyInit(false); config.setAcquireIncrement(2); config.setAcquireRetryDelay(2000); config.setAcquireRetryAttempts(1); config.setMaxConnectionCreateFailedCount(1); config.setConnectionTimeoutInMs(5000); ThriftConnectionPool<TServiceClient> pool = new ThriftConnectionPool<TServiceClient>(config); pool.getConnection().close(); // 移除并且停止服务 pool.removeThriftServer(thriftServerInfo); stopAllServers(); pool.close(); }
@SuppressWarnings("unchecked") public static <Y extends TServiceClient> Y getClient(Class<Y> clazz, HostAndPort hostAndPort, Properties properties) throws NoSuchMethodException, TException, Exception { final Constructor<?> constructor = clazz.getConstructor(TProtocol.class); final Object ds = constructor.newInstance(getProtocol(hostAndPort, properties)); return (Y) ds; }
@SuppressWarnings("unchecked") public static <Y extends TServiceClient> Y getClient(Class<Y> clazz, HostAndPort hostAndPort, String securityId, Properties properties) throws NoSuchMethodException, TException, Exception { final Constructor<?> constructor = clazz.getConstructor(TProtocol.class); final Object ds = constructor.newInstance(getProtocol(hostAndPort, securityId, properties)); return (Y) ds; }
@SuppressWarnings("unchecked") public static <Y extends TServiceClient> Y getClient(Class<Y> clazz, HostAndPort hostAndPort, String securityId, Properties properties, TTransportFactory transportFactory) throws NoSuchMethodException, TException, Exception { final Constructor<?> constructor = clazz.getConstructor(TProtocol.class); final Object ds = constructor.newInstance(getProtocol(hostAndPort, securityId, properties, transportFactory)); return (Y) ds; }
public synchronized void clearPool() { reverseLookup.clear(); for(Entry<String, ThriftConnectionPool<TServiceClient>> entry : connectionPool.entrySet()) { entry.getValue().close(); } connectionPool.clear(); }
@Test public void testGetClient() throws Exception { final HostAndPort hostandPort = HostAndPort.fromParts("localhost", portNum); final TServiceClient client = ThriftUtils.getClient(SampleService.Client.class, hostandPort, config.getProperties()); try { assertNotNull(client); } finally { client.getInputProtocol().getTransport().close(); } }
@Test public void testGetCommonClient() throws Exception { TServiceClient client = clientPool.getClient(COMMON_SERVICE, SampleService.Client.class); try { assertNotNull(client); } finally { clientPool.returnToPool(client); } }
@Test public void testGetAppClient() throws Exception { TServiceClient client = clientPool.getClient(APP_SERVICE, SampleService.Client.class); try { assertNotNull(client); } finally { clientPool.returnToPool(client); } }
public <T extends NiftyClientChannel> TNiftyClientChannelTransport connectSync(Class<? extends TServiceClient> clientClass, NiftyClientConnector<T> clientChannelConnector, @Nullable Duration connectTimeout, @Nullable Duration receiveTimeout, @Nullable Duration readTimeout, @Nullable Duration sendTimeout, int maxFrameSize, @Nullable HostAndPort socksProxyAddress) throws TTransportException, InterruptedException { try { T channel = connectAsync(clientChannelConnector, connectTimeout, receiveTimeout, readTimeout, sendTimeout, maxFrameSize, socksProxyAddress).get(); return new TNiftyClientChannelTransport(clientClass, channel); } catch (ExecutionException e) { Throwables.propagateIfInstanceOf(e, TTransportException.class); throw new TTransportException(TTransportException.UNKNOWN, "Failed to establish client connection", e); } }
@Override public void verifyTraces(PluginTestVerifier verifier, String expectedMessage) throws Exception { final InetSocketAddress actualServerAddress = this.environment.getServerAddress(); // ********** Asynchronous Traces // SpanEvent - Asynchronous Invocation ExpectedTrace asyncInvocationTrace = event("ASYNC", "Asynchronous Invocation"); // SpanEvent - TAsyncMethodCall.cleanUpAndFireCallback Method cleanUpAndFireCallback = TAsyncMethodCall.class.getDeclaredMethod("cleanUpAndFireCallback", SelectionKey.class); ExpectedTrace cleanUpAndFireCallbackTrace = event("THRIFT_CLIENT_INTERNAL", cleanUpAndFireCallback); // SpanEvent - TServiceClient.receiveBase Method receiveBase = TServiceClient.class.getDeclaredMethod("receiveBase", TBase.class, String.class); ExpectedAnnotation thriftResult = Expectations.annotation("thrift.result", "echo_result(success:" + expectedMessage + ")"); ExpectedTrace receiveBaseTrace = event("THRIFT_CLIENT_INTERNAL", // ServiceType receiveBase, // Method thriftResult // Annotation("thrift.result") ); // ********** Root trace for Asynchronous traces // SpanEvent - TAsyncClientManager.call Method call = TAsyncClientManager.class.getDeclaredMethod("call", TAsyncMethodCall.class); ExpectedAnnotation thriftUrl = Expectations.annotation("thrift.url", actualServerAddress.getHostName() + ":" + actualServerAddress.getPort() + "/com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo_call"); ExpectedTrace callTrace = event("THRIFT_CLIENT", // ServiceType call, // Method null, // rpc null, // endPoint actualServerAddress.getHostName() + ":" + actualServerAddress.getPort(), // destinationId thriftUrl // Annotation("thrift.url") ); verifier.verifyTrace(async(callTrace, asyncInvocationTrace, cleanUpAndFireCallbackTrace, receiveBaseTrace)); }
@Override public void verifyTraces(PluginTestVerifier verifier, String expectedMessage) throws Exception { final InetSocketAddress actualServerAddress = this.environment.getServerAddress(); // SpanEvent - TServiceClient.sendBase Method sendBase = TServiceClient.class.getDeclaredMethod("sendBase", String.class, TBase.class); // refer to com.navercorp.pinpoint.plugin.thrift.ThriftUtils#getClientServiceName ExpectedAnnotation thriftUrl = Expectations.annotation("thrift.url", actualServerAddress.getHostName() + ":" + actualServerAddress.getPort() + "/com/navercorp/pinpoint/plugin/thrift/dto/EchoService/echo"); ExpectedAnnotation thriftArgs = Expectations.annotation("thrift.args", "echo_args(message:" + expectedMessage + ")"); // SpanEvent - TServiceClient.receiveBase Method receiveBase = TServiceClient.class.getDeclaredMethod("receiveBase", TBase.class, String.class); ExpectedAnnotation thriftResult = Expectations.annotation("thrift.result", "echo_result(success:" + expectedMessage + ")"); verifier.verifyDiscreteTrace(event("THRIFT_CLIENT", // ServiceType sendBase, // Method null, // rpc null, // endPoint HostAndPort.toHostAndPortString(actualServerAddress.getHostName(), actualServerAddress.getPort()), // destinationId thriftUrl, // Annotation("thrift.url") thriftArgs), // Annotation("thrift.args") event("THRIFT_CLIENT_INTERNAL", // ServiceType receiveBase, // Method thriftResult // Annotation("thrift.result") )); }