> 文档中心 > thrift实战《代码》

thrift实战《代码》

简单demo

1,编写IDL文件 User.thrift

// IDL for the demo user service. Java code is generated into package thrift.generated.
namespace java thrift.generated

// Java-flavoured aliases for the Thrift base types used below.
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

// A user record; only the id is declared optional.
struct User {
    1: optional int id,
    2: String name,
    3: boolean married
}

// Declared failure type of getPersonByUserName.
exception DataException {
    1: String message
}

service UserService {
    // Looks a user up by name; may raise DataException.
    User getPersonByUserName(1: required String username) throws (1: DataException datae)

    // Stores the given user.
    void savePerson(1: required User person)

    // Reports whether a user with the given name exists.
    bool isExist(1: String name)
}

2,使用thrift 生成 java接口文件 命令为

thrift-0.16.0.exe -gen java  User.thrift

3,生成的如下三个文件
(原文此处为截图,图片未能保留。)按上面的 IDL,生成的三个文件应为 User.java、DataException.java、UserService.java。
4,编写实现类

@Slf4jpublic class UserServiceImpl implements UserService.Iface {    @Override    public User getPersonByUserName(String username) throws DataException, TException { log.info("getPersonByUserName............."); System.out.println("getPersonByUserName"); User user = new User(); user.setId(123); user.setName(username); user.setMarried(false); return user;    }    @Override    public void savePerson(User person) throws TException { System.out.println("savePerson"); System.out.println("用户保存到数据库。。。。");    }    @Override    public boolean isExist(String name) throws TException { System.out.println("isExist"); if ("tt".equals(name)) {     return true; } return false;    }}

5,客户端编写

 public static void main(String[] args) { TTransport transport = null; try {     transport = new TSocket("localhost", 9999);     transport = new TFramedTransport(transport);     TCompactProtocol tBinaryProtocol = new TCompactProtocol(transport);     UserService.Client client = new UserService.Client(tBinaryProtocol);     transport.open();     Thread.sleep(10000);     //发起rpc调用     User user = client.getPersonByUserName("额威威----" + i);     System.out.println(user);     transport.close(); } catch (Exception e) {     log.info(e.getMessage()); }    }

6,服务端编写

public static void main(String[] args) throws TTransportException { TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(9999); //获取Processor UserService.Processor<UserServiceImpl> processor = new UserService.Processor<>(new UserServiceImpl()); //指导传输层协议 TCompactProtocol.Factory factory = new TCompactProtocol.Factory(); //指定传输层格式 TFramedTransport.Factory transport = new TFramedTransport.Factory(); TThreadedSelectorServer.Args targs = new TThreadedSelectorServer.Args(serverSocket); targs.processor(processor); targs.protocolFactory(factory); targs.transportFactory(transport); //引入线程池处理业务 TServer server = new TThreadedSelectorServer(targs); System.out.println("服务启动。。。。"); server.serve();    }

复杂demo

这里对client进行修改,采用GenericKeyedObjectPool对象池和反射(动态代理)的方式对client进行优化。
场景: 项目中有大量的rpc请求,请求的特点是耗时长。
解决方案: 使用GenericKeyedObjectPool<ServerNode, T>,T为TServiceClient,简单地说就是一个socket连接;ServerNode是服务端的节点信息,包含ip和port。对象池中以每个服务节点作为key保存该节点的连接,这样我们可以控制每个服务端节点的调用策略,而且可以有效避免多次创建socket连接。
GenericKeyedObjectPool: commons-pool 对象池,池中存储的是对象,类似K-V,不过V是集合的方式。特点是可以设置对象池中的对象特征
GenericKeyedObjectPool.Config:
config.minEvictableIdleTimeMillis = 1000L * 60L * 10L;//空闲对象清理之前在池中闲置的最小时间(10分钟)
config.timeBetweenEvictionRunsMillis = 1000L * 60L * 30L;//空闲对象清理运行周期(30分钟)
config.whenExhaustedAction = GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW;//当达到最大容量时直接创建
config.minIdle = 10;//最小空闲对象数
config.maxWait = 10 * 1000L;//获取对象的等待时间
config.testOnBorrow = true;//借出对象时监测对象是否可用
config.maxIdle = 30;//最大空闲对象数
config.maxActive = 100;//每个键分配的最大对象数

1,服务端代码用简单demo里的

2,客户端代码
1)pom

   <!-- Apache Thrift Java runtime used by the client.
        NOTE(review): the code in step 2 of the simple demo is generated with
        thrift-0.16.0.exe while this pins libthrift 0.12.0; confirm that the
        generator and runtime versions are compatible. -->
   <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
       <version>0.12.0</version>
   </dependency>

2)对象池的对象工厂,主要用来定义创建对象、销毁对象、校验对象的逻辑

@Slf4jpublic class MyBeanPooledFactory<T> implements KeyedPoolableObjectFactory<ServerNode, T> {    private final TServiceClientFactory<TServiceClient> clientFactory;    private final int timeout;    public MyBeanPooledFactory(TServiceClientFactory<TServiceClient> clientFactory, int timeout) { this.clientFactory = clientFactory; this.timeout = timeout;    }    @SuppressWarnings("unchecked")    @Override    public T makeObject(ServerNode key) throws Exception { if (key != null) {     TSocket tsocket = new TSocket(key.getIp(), key.getPort(), timeout);     TFramedTransport tFramedTransport = new TFramedTransport(tsocket);     TProtocol protocol = new TCompactProtocol(tFramedTransport);     TServiceClient client = clientFactory.getClient(protocol);     tsocket.open();     log.info("创建一个socket....{}",key.getIp());     return (T) client; } log.error("Not find a server!"); throw new Exception("Not find a server!");    }    @Override    public void destroyObject(ServerNode key, T client) throws Exception { TTransport tp = ((TServiceClient) client).getInputProtocol().getTransport(); tp.close();    }    @Override    public boolean validateObject(ServerNode key, T client) { TTransport tp = ((TServiceClient) client).getInputProtocol().getTransport(); return tp.isOpen();    }    @Override    public void activateObject(ServerNode serverNode, T t) throws Exception {    }    @Override    public void passivateObject(ServerNode serverNode, T t) throws Exception {    }

3)对象池工厂

public class KeyPoolFactory<T> {    /**     * 对象池     */    private GenericKeyedObjectPool<ServerNode, T> pool;    /**     * 配置     */    private static GenericKeyedObjectPool.Config config;    /**     * 对象吃每隔key最大实例化对象数     */    private final static int TOTAL_PERK_KEY = 10;    /**     * 对象池每隔key最大的闲置对象数     */    private static int idle_PER_KEY = 3;    private static int maxActive = 8;    /**     * 链接池中最大空闲的连接数,默认为100     */    private static int maxIdle = 5;    /**     * 连接池中最少空闲的连接数,默认为0     */    private static int minIdle = 3;    /**     * 当连接池资源耗尽时,调用者最大阻塞的时间     */    private static int maxWait = 2000;    /**     * 空闲链接”检测线程,检测的周期,毫秒数,默认位3min,-1表示关闭空闲检测     */    private static int timeBetweenEvictionRunsMillis = 180000;    /**     * 空闲时是否进行连接有效性验证,如果验证失败则移除,默认为false     */    private static boolean testWhileIdle = false;    static { // 设置poolConfig config = new GenericKeyedObjectPool.Config(); config.maxActive = maxActive; config.maxIdle = maxIdle; config.minIdle = minIdle; config.maxWait = maxWait; config.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; config.testWhileIdle = testWhileIdle;    }    /**     * 从对象池中获取对象     */    public T getBean(ServerNode key) throws Exception { if (Objects.isNull(pool)) {     init(); } return pool.borrowObject(key);    }    /**     * 归还对象     */    public void returnBean(ServerNode key, T user) throws Exception { if (pool == null) {     init(); } pool.returnObject(key, user);    }    /**     * 关闭对象池     */    public synchronized void close() throws Exception { if (pool != null) {     pool.close();     pool = null; }    }    public synchronized void init() throws ClassNotFoundException, IllegalAccessException, InstantiationException { ClassLoader classLoader = UserService.Iface.class.getClassLoader(); // 加载Iface接口 Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(findOutClassName() + "$Client$Factory"); TServiceClientFactory<TServiceClient> 
clientFactory = fi.newInstance(); MyBeanPooledFactory<T> clientPool = new MyBeanPooledFactory<T>(clientFactory, 1000); if (pool != null) return; pool = new GenericKeyedObjectPool<ServerNode, T>(clientPool, config);    }    public GenericKeyedObjectPool<ServerNode, T> getGenericKeyedObjectPool() throws IllegalAccessException, InstantiationException, ClassNotFoundException { if (pool == null) {     init(); } return pool;    }    private static String findOutClassName() { if (UserService.Iface.class.getName().contains("$")) {     return UserService.Iface.class.getName().substring(0, UserService.Iface.class.getName().indexOf("$")); } return UserService.Iface.class.getName();    }}

4)反射相关的接口
反射接口定义

/**
 * Strategy for carrying out a reflective method call on behalf of a proxy.
 */
public interface Invoker {

    /**
     * Performs the call described by {@code method} and {@code args}.
     *
     * @param method the method to call
     * @param args   the arguments for the call
     * @return the result of the call
     * @throws Exception if the call fails
     */
    Object invoke(Method method, Object[] args) throws Exception;
}

默认反射调用实现

public class DefaultInvoker<T> implements Invoker {    private final Logger LOGGER = LoggerFactory.getLogger(getClass());    private final GenericKeyedObjectPool<ServerNode, T> pool;    private final int retry;    private final ServerNode clientNode;    public DefaultInvoker(ServerNode clientNode, GenericKeyedObjectPool<ServerNode, T> pool, int retry) { this.clientNode = clientNode; this.pool = pool; this.retry = retry;    }    @Override    public Object invoke(Method method, Object[] args) throws Exception { T client = null; ServerNode serverNode = new ServerNode("127.0.0.1", 9999); Throwable exception = null; for (int i = 0; i == 0 || i < retry + 1; i++) {     try {  if (serverNode == null) {      continue;  }  if (LOGGER.isDebugEnabled()) {      LOGGER.debug("Invoke to {}.", serverNode);  }  client = pool.borrowObject(serverNode);  Object result = method.invoke(client, args);  System.out.println(result);  return result;     } catch (InvocationTargetException ite) {     } } throw new Exception(exception);    }}

创建代理类

/**
 * {@link InvocationHandler} that forwards every proxied call to an
 * {@link Invoker}.
 */
public class DynamicClientHandler implements InvocationHandler {

    private final Invoker invoker;

    /**
     * @param invoker receiver of every call made through a bound proxy
     */
    public DynamicClientHandler(Invoker invoker) {
        this.invoker = invoker;
    }

    /**
     * Creates a dynamic proxy implementing {@code serviceClass} whose calls
     * are handled by this object.
     *
     * @param classLoader class loader that defines the proxy class
     * @param serviceClass the single interface the proxy implements
     * @param <T> type the caller wants the proxy as (unchecked cast)
     * @return the new proxy instance
     * @throws ClassNotFoundException declared for callers; not thrown by this body
     */
    @SuppressWarnings("unchecked")
    public <T> T bind(ClassLoader classLoader, Class<?> serviceClass) throws ClassNotFoundException {
        Class<?>[] exposedInterfaces = {serviceClass};
        Object proxyInstance = Proxy.newProxyInstance(classLoader, exposedInterfaces, this);
        return (T) proxyInstance;
    }

    /**
     * Delegates the call to the configured invoker; the proxy instance
     * itself is not used.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object outcome = invoker.invoke(method, args);
        return outcome;
    }
}

5) 对对象池及配置进行加载

public class ThriftClient {    static final String iface = UserService.Iface.class.getName();    public static UserService.Iface anomalyDetectorService;    static { try {     anomalyDetectorService = (UserService.Iface) createProxy(); } catch (Exception var1) {     var1.printStackTrace(); }    }    private static Object createProxy() throws Exception { ClassLoader classLoader = UserService.Iface.class.getClassLoader(); // 加载Iface接口 Class<?> objectClass = classLoader.loadClass(iface); KeyPoolFactory<T> objectKeyPoolFactory = new KeyPoolFactory<>(); Invoker invoker = new DefaultInvoker<T>(new ServerNode("127.0.0.1", 9999), objectKeyPoolFactory.getGenericKeyedObjectPool(), 3); DynamicClientHandler dynamicClientHandler = new DynamicClientHandler(invoker); return dynamicClientHandler.<T>bind(classLoader, objectClass);    }}

6)测试

/**
 * Smoke test for the pooled Thrift client.
 *
 * @param <T> unused type parameter, kept for source compatibility
 */
public class PoolClient<T> {

    /** Server nodes a key may be drawn from. */
    static ServerNode[] keys = {new ServerNode("127.0.0.1", 9999)};

    /** Source of randomness for {@link #getKey()}. */
    Random r = new Random();

    /**
     * Picks one of the configured nodes at random.
     *
     * @return a randomly chosen element of {@link #keys}
     */
    ServerNode getKey() {
        return keys[r.nextInt(keys.length)];
    }

    /**
     * Calls {@code getPersonByUserName("ccc")} through the pooled proxy and
     * checks that the returned user carries the requested name.
     *
     * <p>Exceptions now propagate to the test runner; the earlier version
     * caught and printed them, so the test could never fail.
     *
     * @throws Exception if the call fails or the thread is interrupted
     */
    @Test
    public void getBean() throws Exception {
        TimeUnit.SECONDS.sleep(1);
        User user = ThriftClient.anomalyDetectorService.getPersonByUserName("ccc");
        System.out.println(user);
        // The demo server copies the requested name into the returned user.
        if (user == null || !"ccc".equals(user.getName())) {
            throw new AssertionError("Unexpected user returned: " + user);
        }
    }
}

后面自己可以实现心跳机制和调用策略
心跳机制:ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("rpc-live", true)); 然后用 scheduled 创建一个周期性调度任务执行 ping,对服务节点进行连接测试。(NamedThreadFactory 不是 JDK 自带的类,需要自行实现或使用第三方库提供的实现。)