醋醋百科网

Good Luck To You!

springboot-如何集成Netty实现网络通信

一、步骤概览

二、步骤说明

1. 引入依赖包

说明:

  • netty-all:netty-all 是 Netty 的一个打包模块,包含了 Netty 的所有功能和依赖库。
  • protostuff-xx:Java 序列化库,用于将对象序列化为字节流,或将字节流反序列化为对象。protostuff-core 提供了 Protostuff 序列化和反序列化的核心功能,protostuff-runtime 则提供了运行时支持。
  • objenesis实例化对象的 Java 库,用于在不调用构造函数的情况下实例化对象。它通过使用 Java 反射机制来创建类的实例,而无需调用该类的构造函数。这在一些需要动态创建对象的场景中非常有用。
  • guava:Google 提供的一个 Java 工具包。

2.定义交互对象

  • Request:请求对象
  • Response:返回对象
  • 3.实现编解码器

    在网络通信中,编码器(Encoder)和解码器(Decoder)在 Netty 中扮演着重要的角色。它们主要用于将消息对象在网络传输过程中进行编码和解码,以便在不同节点之间进行有效的数据交换。

    ①编码器:将应用程序中的数据或消息对象转化为字节流,以便在网络上传输。它将高级的消息对象转换为底层的字节序列,通常使用协议规定的格式进行编码。编码器负责将数据按照特定的格式打包成字节流,以便发送给远程节点。

    • RpcEncoder:自定义编码器

    ②解码器:它的作用和编码器正好相反,它将接收到的字节流解析为应用程序可理解的消息对象。解码器会根据协议规定的格式,从接收到的字节流中解析出有效的数据,并将其转化为高级的消息对象,以便应用程序能够方便地处理和使用这些数据。

    • RpcDecoder:自定义解码器

    4.封装请求结果

    在 Netty 中进行通信时,需要封装请求结果和进行请求结果缓存的原因与 Netty 异步通信的特性有关。Netty 是一个基于 NIO 的网络通信框架,它采用异步事件驱动的方式处理网络操作。这意味着在 Netty 中,网络请求和响应的处理是非阻塞的,请求和响应之间的关系是不同步的,因此需要对结果进行封装和缓存。

    ①.请求结果

    • WriteFuture:请求结果接口定义
    • SyncWriteFuture:请求结果接口实现
    public class SyncWriteFuture implements WriteFuture<Response>{
        private CountDownLatch latch = new CountDownLatch(1);
        private final long begin = System.currentTimeMillis();
        private final String requestId;
        private boolean writeResult;
        private Response response;
        private boolean isTimeout = false;
        private long timeout;
        private Throwable cause;
        public SyncWriteFuture(String requestId) {
            this.requestId = requestId;
        }
    
        @Override
        public Throwable cause() {
            return this.cause;
        }
        @Override
        public void setCause(Throwable cause) {
            this.cause = cause;
        }
        @Override
        public boolean isWriteSuccess() {
            return writeResult;
        }
        @Override
        public void setWriteResult(boolean result) {
            this.writeResult = writeResult;
        }
        @Override
        public String requestId() {
            return this.requestId;
        }
        @Override
        public Response response() {
            return this.response;
        }
    
        @Override
        public void setResponse(Response response) {
            this.response = response;
            latch.countDown();
        }
    
        @Override
        public boolean isTimeout() {
            if (isTimeout) {
                return isTimeout;
            }
            return System.currentTimeMillis() - begin > timeout;
        }
    
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return true;
        }
    
        @Override
        public boolean isCancelled() {
            return false;
        }
    
        @Override
        public boolean isDone() {
            return false;
        }
    
        @Override
        public Response get() throws InterruptedException, ExecutionException {
            latch.wait();
            return this.response;
        }
    
        @Override
        public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (latch.await(timeout, unit)) {
                return response;
            }
            return null;
        }
    }

    说明,这边获取结果,使用 CountDownLatch 进行阻塞等待,当Netty 接受到事件,通过 setResponse 方法使CountDownLatch 取消等待,这样就可以同步等待获取请求结果;同时也支持超时等待机制,可以设置等待超时时间。

    ②.请求结果缓存

    由于Netty中网络请求和响应的处理是非阻塞的,因此我们需要将请求结果缓存起来,当Netty异步响应时,可以从缓存中获取请求结果,并将异步获取的返回信息设置到请求结果对象中。

    • SyncWriteCache:请求结果缓存

    5.实现服务端监听

    ① 消息处理器

    消息处理器,主要负责完成对接受事件的处理工作。在实现 Netty 的消息处理器,需要继承
    ChannelInboundHandlerAdapter,它是 Netty 中用于处理接受事件的适配器类,通过继承它并重写其中的方法,可以完成对接受事件的处理逻辑。

    • MyServerHandler:自定义服务端消息处理器

    ② 通道初始化器

    通道初始化器,主要负责完成Channel 的初始化配置工作,主要包括编解码器设置、消息处理器设置等。在实现 Netty 的通道初始化器时,我们只需要继承ChannelInitializer,它是 Netty 中用于对新建的 Channel 进行初始化配置的抽象类,通过继承它并实现其中的方法,可以完成对 Channel 的初始化设置,包括添加各种 ChannelHandler、配置 ChannelPipeline 等操作。

    • MyServerInitializer:自定义服务端通道初始化器

    ③.服务端启动类

    服务器端启动类主要负责实现了一个简单的 Netty 服务器,通过使用 ServerBootstrap、EventLoopGroup 和自定义的 Channel 初始化器,在指定的端口上监听客户端连接,并处理每个连接的具体数据。同时,使用了优雅关闭的方式来关闭服务器的线程资源。

    • NettyServer:Netty实现的服务器

    6.封装客户端

    ①.客户端接口

    客户端接口包括通信客户端接口定义和 Netty的具体实现。

    • IClient:Tcp通信客户端接口定义
    • NettyClient:Netty 方式的客户端实现
    @Slf4j
    public class NettyClient implements IClient {
        private final Channel channel;
        private final ServerAddress serverAddress;
        public NettyClient(Channel channel, ServerAddress serverAddress) {
            this.channel = channel;
            this.serverAddress = serverAddress;
        }
    
        @Override
        public boolean asyncReq(Request request) {
            channel.writeAndFlush(request);
            return true;
        }
    
        @Override
        public Response syncReq(Request request) throws ApiException {
            return syncReq(request,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT_UNIT);
        }
    
        @Override
        public Response syncReq(Request request, long timeout, TimeUnit timeUnit) throws ApiException {
            log.info("request:[{}]",request);
            SyncWriteFuture syncWriteFuture = new SyncWriteFuture(request.getRequestId());
            SyncWriteCache.put(request.getRequestId(),syncWriteFuture);
            channel.writeAndFlush(request);
            Response response;
            try {
                response = syncWriteFuture.get(timeout,timeUnit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ApiException("请求失败");
            } catch (ExecutionException e) {
                throw new ApiException("请求失败");
            } catch (TimeoutException e) {
                throw new ApiException("请求超时");
            } finally {
                SyncWriteCache.clear(request.getRequestId());
            }
            return response;
        }
    
        @Override
        public ServerAddress getServerAddress() {
            return this.serverAddress;
        }
    
        @Override
        public void close() {
           if(null != channel && channel.isActive()) {
               channel.close();
           }
        }
    
        @Override
        public boolean isConnected() {
            return channel.isActive();
        }
    
        @Override
        public String getClientId() {
            return channel.id().asLongText();
        }
    }

    ②.客户端工厂

    客户端工厂负责简化客户端的获取方式。

    • IClientFactory:客户端工厂接口定义
    • AbstractClientFactory:客户端工厂接口抽象层实现,提供获取客户端的模板方法,具体则由子类实现。
    @Slf4j
    public abstract class AbstractClientFactory implements IClientFactory {
        public static final Cache<ServerAddress, IClient> LONG_CONNECT_CACHE = CacheBuilder.newBuilder()
                // 单机长连接上限,超过上限采用LRU淘汰
                .maximumSize(65535)
                .expireAfterAccess(360, TimeUnit.DAYS)
                // 设置缓存移除监听器
                .removalListener((RemovalListener<ServerAddress, IClient>) notic -> {
                    log.debug("移除client[{}][{}][{}]", notic.getKey(), notic.getValue(), notic.getCause());
                }).build();
    
        protected abstract IClient createClient(ServerAddress address) throws ApiException;
    
        @Override
        public IClient get(ServerAddress address) throws ApiException {
            IClient client;
            if (address.isLongConnection()) {
                try {
                    client = LONG_CONNECT_CACHE.get(address, () -> createClient(address));
                } catch (ExecutionException e) {
                    log.error(e.getMessage(), e);
                    throw new ApiException("连接失败");
                }
            } else {
                client = createClient(address);
            }
            return client;
        }
    
        @Override
        public List<IClient> getCachedClients() {
            List<IClient> clientList = new ArrayList<>();
            clientList.addAll(LONG_CONNECT_CACHE.asMap().values());
            return clientList;
        }
    
        @Override
        public void remove(ServerAddress address) {
            LONG_CONNECT_CACHE.invalidate(address);
        }
    }
    • NettyClientFactory:Netty 客户端工厂实现方式
    @Slf4j
    public class NettyClientFactory extends AbstractClientFactory{
        private static final String BYTE_BUF_POOL_NAME = "bytebuf.pool";
        private static final String IO_RATIO_NAME = "ioratio";
        public static final NioEventLoopGroup WORKER_GROUP = new NioEventLoopGroup();
        public static final ByteBufAllocator BYTE_BUF_ALLOCATOR;
    
        static {
            WORKER_GROUP.setIoRatio(SystemPropertyUtil.getInt(IO_RATIO_NAME, 100));
            if (SystemPropertyUtil.getBoolean(BYTE_BUF_POOL_NAME, false)) {
                BYTE_BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT;
            } else {
                BYTE_BUF_ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
            }
        }
    
        @Override
        protected IClient createClient(ServerAddress address) throws ApiException {
            //启动引导类
            final Bootstrap bootstrap = new Bootstrap();
            //绑定工作线程组
            bootstrap.group(NettyClientFactory.WORKER_GROUP);
            //设置低延迟
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            //设置让关闭的的端口尽早可以使用
            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
            //若是长连接,开启SOCKET默认的心跳机制,短连接则不不开启
            bootstrap.option(ChannelOption.SO_KEEPALIVE, address.isLongConnection());
            //使用内存池
            bootstrap.option(ChannelOption.ALLOCATOR, NettyClientFactory.BYTE_BUF_ALLOCATOR);
            //设置IO类型
            bootstrap.channel(NioSocketChannel.class);
            //设置初始化连接配置
            bootstrap.handler(InitializerFactory.get(address));
            //若是设置的连接超时时间
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, address.getConnectTimeout());
    
            String targetIp = address.getIp();
            int targetPort = address.getPort();
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIp, targetPort));
            if (future.awaitUninterruptibly(address.getConnectTimeout()) && future.isSuccess() && future.channel().isActive()) {
                log.info("[{}]连接成功,服务器端地址[{}:{}]",targetIp,targetPort);
                Channel channel = future.channel();
                return new NettyClient(channel,address);
            } else {
                future.cancel(true);
                future.channel().close();
                StringBuilder errorMsg = new StringBuilder();
                errorMsg.append("服务器[").append(targetIp).append(":").append(targetPort).append("]连接失败");
                throw new ApiException(errorMsg.toString());
            }
        }
    }

    ③.客户端处理器

    客户端处理器主要负责完成对接收事件的处理工作。

    • AbstractChannelHandler:处理器抽象层,定义接受事件处理的模板方法,具体处理逻辑则由子类实现。
    • ClientHandler:客户端具体处理器,负责对事件的具体逻辑处理。

    ④.通道初始化器

    客户端通道初始化器,主要负责完成Channel 的初始化配置工作,主要包括编解码器设置、消息处理器设置等。

    • AbstractChannelInitializer:通道初始化器抽象层实现,定义通道初始化模板方法,具体初始化哪些组件则从子类获取。

    其中初始化通道模板方法 initChannel 详情如下图所示:

    三、测试

    1.测试代码

    • ClientTest:客户端测试类
    • ServerTest:服务端测试类

    2.测试结果

    先启动服务器端,再启动客户端,看客户端请求是否正常接受到返回数据。

    客户端能请求返回数据,测试成功。


    控制面板
    您好,欢迎到访网站!
      查看权限
    网站分类
    最新留言