thrift实战《代码》
简单demo
1,编写IDL文件 User.thrift
namespace java thrift.generatedtypedef i16 shorttypedef i32 inttypedef i64 longtypedef bool booleantypedef string Stringstruct User{ 1:optional int id, 2:String name, 3: boolean married;}exception DataException{ 1: String message}service UserService{ User getPersonByUserName(1:required String username) throws(1:DataException datae) void savePerson(1:required User person) bool isExist(1:String name)}
2,使用thrift 生成 java接口文件 命令为
thrift-0.16.0.exe -gen java User.thrift
3,生成的如下三个文件
4,编写实现类
@Slf4jpublic class UserServiceImpl implements UserService.Iface { @Override public User getPersonByUserName(String username) throws DataException, TException { log.info("getPersonByUserName............."); System.out.println("getPersonByUserName"); User user = new User(); user.setId(123); user.setName(username); user.setMarried(false); return user; } @Override public void savePerson(User person) throws TException { System.out.println("savePerson"); System.out.println("用户保存到数据库。。。。"); } @Override public boolean isExist(String name) throws TException { System.out.println("isExist"); if ("tt".equals(name)) { return true; } return false; }}
5,客户端编写
public static void main(String[] args) { TTransport transport = null; try { transport = new TSocket("localhost", 9999); transport = new TFramedTransport(transport); TCompactProtocol tBinaryProtocol = new TCompactProtocol(transport); UserService.Client client = new UserService.Client(tBinaryProtocol); transport.open(); Thread.sleep(10000); //发起rpc调用 User user = client.getPersonByUserName("额威威----" + i); System.out.println(user); transport.close(); } catch (Exception e) { log.info(e.getMessage()); } }
6,服务端编写
public static void main(String[] args) throws TTransportException { TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(9999); //获取Processor UserService.Processor<UserServiceImpl> processor = new UserService.Processor<>(new UserServiceImpl()); //指导传输层协议 TCompactProtocol.Factory factory = new TCompactProtocol.Factory(); //指定传输层格式 TFramedTransport.Factory transport = new TFramedTransport.Factory(); TThreadedSelectorServer.Args targs = new TThreadedSelectorServer.Args(serverSocket); targs.processor(processor); targs.protocolFactory(factory); targs.transportFactory(transport); //引入线程池处理业务 TServer server = new TThreadedSelectorServer(targs); System.out.println("服务启动。。。。"); server.serve(); }
复杂demo
这里对client进行修改,采用GenericKeyedObjectPool线程池和反射的方式对client进行优化。
场景: 项目中有大量的rpc请求,请求的特点时间长。
解决方案: 使用GenericKeyedObjectPool,T为TServiceClient,简单的说就是一个socket,Node是服务端的节点信息,包含ip和port,线程池中存的是每个服务节点作为key的连接,这样我们可以对控制每个服务端节点调用策略,而且可以有效避免多次创建socket连接。
GenericKeyedObjectPool: commons-pool 对象池,池中存储的是对象,类似K-V,不过V是集合的方式。特点是可以设置对象池中的对象特征
GenericKeyedObjectPool.Config:
config.minEvictableIdleTimeMillis = 1000L 60L10L;//空闲对象清理之前在池中闲置的最小时间
config.timeBetweenEvictionRunsMillis = 1000L 60L30L;//空闲对象清理运行周期
config.whenExhaustedAction = GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW;//当达到最大容量时直接创建
config.minIdle = 10;//最小空闲对象数
config.maxWait = 10 * 1000L;//获取对象的等待时间
config.testOnBorrow = true;//借出对象时监测对象是否可用
config.maxIdle = 30;//最大空闲对象数
config.maxActive = 100;//每个键分配的最大对象数
1,服务端代码用简单demo里的
2,客户端代码
1)pom
<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.12.0</version> </dependency>
2)线程池对象工厂 ,主要用来定义创建对象,销毁对象,校验对象逻辑
@Slf4jpublic class MyBeanPooledFactory<T> implements KeyedPoolableObjectFactory<ServerNode, T> { private final TServiceClientFactory<TServiceClient> clientFactory; private final int timeout; public MyBeanPooledFactory(TServiceClientFactory<TServiceClient> clientFactory, int timeout) { this.clientFactory = clientFactory; this.timeout = timeout; } @SuppressWarnings("unchecked") @Override public T makeObject(ServerNode key) throws Exception { if (key != null) { TSocket tsocket = new TSocket(key.getIp(), key.getPort(), timeout); TFramedTransport tFramedTransport = new TFramedTransport(tsocket); TProtocol protocol = new TCompactProtocol(tFramedTransport); TServiceClient client = clientFactory.getClient(protocol); tsocket.open(); log.info("创建一个socket....{}",key.getIp()); return (T) client; } log.error("Not find a server!"); throw new Exception("Not find a server!"); } @Override public void destroyObject(ServerNode key, T client) throws Exception { TTransport tp = ((TServiceClient) client).getInputProtocol().getTransport(); tp.close(); } @Override public boolean validateObject(ServerNode key, T client) { TTransport tp = ((TServiceClient) client).getInputProtocol().getTransport(); return tp.isOpen(); } @Override public void activateObject(ServerNode serverNode, T t) throws Exception { } @Override public void passivateObject(ServerNode serverNode, T t) throws Exception { }
3)线程池工厂
public class KeyPoolFactory<T> { /** * 对象池 */ private GenericKeyedObjectPool<ServerNode, T> pool; /** * 配置 */ private static GenericKeyedObjectPool.Config config; /** * 对象吃每隔key最大实例化对象数 */ private final static int TOTAL_PERK_KEY = 10; /** * 对象池每隔key最大的闲置对象数 */ private static int idle_PER_KEY = 3; private static int maxActive = 8; /** * 链接池中最大空闲的连接数,默认为100 */ private static int maxIdle = 5; /** * 连接池中最少空闲的连接数,默认为0 */ private static int minIdle = 3; /** * 当连接池资源耗尽时,调用者最大阻塞的时间 */ private static int maxWait = 2000; /** * 空闲链接”检测线程,检测的周期,毫秒数,默认位3min,-1表示关闭空闲检测 */ private static int timeBetweenEvictionRunsMillis = 180000; /** * 空闲时是否进行连接有效性验证,如果验证失败则移除,默认为false */ private static boolean testWhileIdle = false; static { // 设置poolConfig config = new GenericKeyedObjectPool.Config(); config.maxActive = maxActive; config.maxIdle = maxIdle; config.minIdle = minIdle; config.maxWait = maxWait; config.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; config.testWhileIdle = testWhileIdle; } /** * 从对象池中获取对象 */ public T getBean(ServerNode key) throws Exception { if (Objects.isNull(pool)) { init(); } return pool.borrowObject(key); } /** * 归还对象 */ public void returnBean(ServerNode key, T user) throws Exception { if (pool == null) { init(); } pool.returnObject(key, user); } /** * 关闭对象池 */ public synchronized void close() throws Exception { if (pool != null) { pool.close(); pool = null; } } public synchronized void init() throws ClassNotFoundException, IllegalAccessException, InstantiationException { ClassLoader classLoader = UserService.Iface.class.getClassLoader(); // 加载Iface接口 Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(findOutClassName() + "$Client$Factory"); TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance(); MyBeanPooledFactory<T> clientPool = new MyBeanPooledFactory<T>(clientFactory, 1000); if (pool != null) return; pool = new GenericKeyedObjectPool<ServerNode, T>(clientPool, config); } public GenericKeyedObjectPool<ServerNode, T> getGenericKeyedObjectPool() throws IllegalAccessException, InstantiationException, ClassNotFoundException { if (pool == null) { init(); } return pool; } private static String findOutClassName() { if (UserService.Iface.class.getName().contains("$")) { return UserService.Iface.class.getName().substring(0, UserService.Iface.class.getName().indexOf("$")); } return UserService.Iface.class.getName(); }}
4)反射相关的接口
反射接口定义
public interface Invoker { /** * 调用 * * * @param method * @param args * @return result * @throws Exception */
Object invoke(Method method, Object[] args) throws Exception;}
默认反射调用实现
public class DefaultInvoker<T> implements Invoker { private final Logger LOGGER = LoggerFactory.getLogger(getClass()); private final GenericKeyedObjectPool<ServerNode, T> pool; private final int retry; private final ServerNode clientNode; public DefaultInvoker(ServerNode clientNode, GenericKeyedObjectPool<ServerNode, T> pool, int retry) { this.clientNode = clientNode; this.pool = pool; this.retry = retry; } @Override public Object invoke(Method method, Object[] args) throws Exception { T client = null; ServerNode serverNode = new ServerNode("127.0.0.1", 9999); Throwable exception = null; for (int i = 0; i == 0 || i < retry + 1; i++) { try { if (serverNode == null) { continue; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Invoke to {}.", serverNode); } client = pool.borrowObject(serverNode); Object result = method.invoke(client, args); System.out.println(result); return result; } catch (InvocationTargetException ite) { } } throw new Exception(exception); }}
创建代理类
public class DynamicClientHandler implements InvocationHandler { private final Invoker invoker; public DynamicClientHandler(Invoker invoker) { this.invoker = invoker; } @SuppressWarnings("unchecked") public <T> T bind(ClassLoader classLoader, Class<?> serviceClass) throws ClassNotFoundException { return (T) Proxy.newProxyInstance(classLoader, new Class[] { serviceClass }, this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { return invoker.invoke(method, args); }}
5) 对线程池及配置进行加载
public class ThriftClient { static final String iface = UserService.Iface.class.getName(); public static UserService.Iface anomalyDetectorService; static { try { anomalyDetectorService = (UserService.Iface) createProxy(); } catch (Exception var1) { var1.printStackTrace(); } } private static Object createProxy() throws Exception { ClassLoader classLoader = UserService.Iface.class.getClassLoader(); // 加载Iface接口 Class<?> objectClass = classLoader.loadClass(iface); KeyPoolFactory<T> objectKeyPoolFactory = new KeyPoolFactory<>(); Invoker invoker = new DefaultInvoker<T>(new ServerNode("127.0.0.1", 9999), objectKeyPoolFactory.getGenericKeyedObjectPool(), 3); DynamicClientHandler dynamicClientHandler = new DynamicClientHandler(invoker); return dynamicClientHandler.<T>bind(classLoader, objectClass); }}
6)测试
public class PoolClient<T> { static ServerNode[] keys = {new ServerNode("127.0.0.1", 9999)}; /** * 随机key */ Random r = new Random(); /** * 获取key值 * * @return */ ServerNode getKey() { return keys[r.nextInt(keys.length)]; } @Test public void getBean() { try { TimeUnit.SECONDS.sleep(1); User user= ThriftClient.anomalyDetectorService.getPersonByUserName("ccc"); System.out.println(user); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }}
后面自己可以实现心跳机制和调用策略
心跳机制:ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory(“rpc-live”, true));然后用scheduled 去创建一个调度延迟线程跑ping任务,对服务节点进行连接测试