一、步骤概览
二、步骤说明
1. 引入依赖包
说明:
- netty-all:netty-all 是 Netty 的一个打包模块,包含了 Netty 的所有功能和依赖库。
- protostuff-xx:Java 序列化库,用于将对象序列化为字节流,或将字节流反序列化为对象。protostuff-core 提供了 Protostuff 序列化和反序列化的核心功能,protostuff-runtime 则提供了运行时支持。
- objenesis:实例化对象的 Java 库,用于在不调用构造函数的情况下实例化对象。它通过使用 Java 反射机制来创建类的实例,而无需调用该类的构造函数。这在一些需要动态创建对象的场景中非常有用。
- guava:Google 提供的一个 Java 工具包。
2.定义交互对象
- Request:请求对象
3.实现编解码器
在网络通信中,编码器(Encoder)和解码器(Decoder)在 Netty 中扮演着重要的角色。它们主要用于将消息对象在网络传输过程中进行编码和解码,以便在不同节点之间进行有效的数据交换。
①编码器:将应用程序中的数据或消息对象转化为字节流,以便在网络上传输。它将高级的消息对象转换为底层的字节序列,通常使用协议规定的格式进行编码。编码器负责将数据按照特定的格式打包成字节流,以便发送给远程节点。
- RpcEncoder:自定义编码器
②解码器:它的作用和编码器正好相反,它将接收到的字节流解析为应用程序可理解的消息对象。解码器会根据协议规定的格式,从接收到的字节流中解析出有效的数据,并将其转化为高级的消息对象,以便应用程序能够方便地处理和使用这些数据。
- RpcDecoder:自定义解码器
4.封装请求结果
在 Netty 中进行通信时,需要封装请求结果和进行请求结果缓存的原因与 Netty 异步通信的特性有关。Netty 是一个基于 NIO 的网络通信框架,它采用异步事件驱动的方式处理网络操作。这意味着在 Netty 中,网络请求和响应的处理是非阻塞的,请求和响应之间的关系是不同步的,因此需要对结果进行封装和缓存。
①.请求结果
- WriteFuture:请求结果接口定义
- SyncWriteFuture:请求结果接口实现
public class SyncWriteFuture implements WriteFuture<Response>{
private CountDownLatch latch = new CountDownLatch(1);
private final long begin = System.currentTimeMillis();
private final String requestId;
private boolean writeResult;
private Response response;
private boolean isTimeout = false;
private long timeout;
private Throwable cause;
public SyncWriteFuture(String requestId) {
this.requestId = requestId;
}
@Override
public Throwable cause() {
return this.cause;
}
@Override
public void setCause(Throwable cause) {
this.cause = cause;
}
@Override
public boolean isWriteSuccess() {
return writeResult;
}
@Override
public void setWriteResult(boolean result) {
this.writeResult = writeResult;
}
@Override
public String requestId() {
return this.requestId;
}
@Override
public Response response() {
return this.response;
}
@Override
public void setResponse(Response response) {
this.response = response;
latch.countDown();
}
@Override
public boolean isTimeout() {
if (isTimeout) {
return isTimeout;
}
return System.currentTimeMillis() - begin > timeout;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Response get() throws InterruptedException, ExecutionException {
latch.wait();
return this.response;
}
@Override
public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (latch.await(timeout, unit)) {
return response;
}
return null;
}
}
说明,这边获取结果,使用 CountDownLatch 进行阻塞等待,当Netty 接受到事件,通过 setResponse 方法使CountDownLatch 取消等待,这样就可以同步等待获取请求结果;同时也支持超时等待机制,可以设置等待超时时间。
②.请求结果缓存
由于Netty中网络请求和响应的处理是非阻塞的,因此我们需要将请求结果缓存起来,当Netty异步响应时,可以从缓存中获取请求结果,并将异步获取的返回信息设置到请求结果对象中。
- SyncWriteCache:请求结果缓存
5.实现服务端监听
① 消息处理器
消息处理器,主要负责完成对接受事件的处理工作。在实现 Netty 的消息处理器,需要继承
ChannelInboundHandlerAdapter,它是 Netty 中用于处理接受事件的适配器类,通过继承它并重写其中的方法,可以完成对接受事件的处理逻辑。
- MyServerHandler:自定义服务端消息处理器
② 通道初始化器
通道初始化器,主要负责完成Channel 的初始化配置工作,主要包括编解码器设置、消息处理器设置等。在实现 Netty 的通道初始化器时,我们只需要继承ChannelInitializer,它是 Netty 中用于对新建的 Channel 进行初始化配置的抽象类,通过继承它并实现其中的方法,可以完成对 Channel 的初始化设置,包括添加各种 ChannelHandler、配置 ChannelPipeline 等操作。
- MyServerInitializer:自定义服务端通道初始化器
③.服务端启动类
服务器端启动类主要负责实现了一个简单的 Netty 服务器,通过使用 ServerBootstrap、EventLoopGroup 和自定义的 Channel 初始化器,在指定的端口上监听客户端连接,并处理每个连接的具体数据。同时,使用了优雅关闭的方式来关闭服务器的线程资源。
- NettyServer:Netty实现的服务器
6.封装客户端
①.客户端接口
客户端接口包括通信客户端接口定义和 Netty的具体实现。
- IClient:Tcp通信客户端接口定义
- NettyClient:Netty 方式的客户端实现
@Slf4j
public class NettyClient implements IClient {
private final Channel channel;
private final ServerAddress serverAddress;
public NettyClient(Channel channel, ServerAddress serverAddress) {
this.channel = channel;
this.serverAddress = serverAddress;
}
@Override
public boolean asyncReq(Request request) {
channel.writeAndFlush(request);
return true;
}
@Override
public Response syncReq(Request request) throws ApiException {
return syncReq(request,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT_UNIT);
}
@Override
public Response syncReq(Request request, long timeout, TimeUnit timeUnit) throws ApiException {
log.info("request:[{}]",request);
SyncWriteFuture syncWriteFuture = new SyncWriteFuture(request.getRequestId());
SyncWriteCache.put(request.getRequestId(),syncWriteFuture);
channel.writeAndFlush(request);
Response response;
try {
response = syncWriteFuture.get(timeout,timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ApiException("请求失败");
} catch (ExecutionException e) {
throw new ApiException("请求失败");
} catch (TimeoutException e) {
throw new ApiException("请求超时");
} finally {
SyncWriteCache.clear(request.getRequestId());
}
return response;
}
@Override
public ServerAddress getServerAddress() {
return this.serverAddress;
}
@Override
public void close() {
if(null != channel && channel.isActive()) {
channel.close();
}
}
@Override
public boolean isConnected() {
return channel.isActive();
}
@Override
public String getClientId() {
return channel.id().asLongText();
}
}
②.客户端工厂
客户端工厂负责简化客户端的获取方式。
- IClientFactory:客户端工厂接口定义
- AbstractClientFactory:客户端工厂接口抽象层实现,提供获取客户端的模板方法,具体则由子类实现。
@Slf4j
public abstract class AbstractClientFactory implements IClientFactory {
public static final Cache<ServerAddress, IClient> LONG_CONNECT_CACHE = CacheBuilder.newBuilder()
// 单机长连接上限,超过上限采用LRU淘汰
.maximumSize(65535)
.expireAfterAccess(360, TimeUnit.DAYS)
// 设置缓存移除监听器
.removalListener((RemovalListener<ServerAddress, IClient>) notic -> {
log.debug("移除client[{}][{}][{}]", notic.getKey(), notic.getValue(), notic.getCause());
}).build();
protected abstract IClient createClient(ServerAddress address) throws ApiException;
@Override
public IClient get(ServerAddress address) throws ApiException {
IClient client;
if (address.isLongConnection()) {
try {
client = LONG_CONNECT_CACHE.get(address, () -> createClient(address));
} catch (ExecutionException e) {
log.error(e.getMessage(), e);
throw new ApiException("连接失败");
}
} else {
client = createClient(address);
}
return client;
}
@Override
public List<IClient> getCachedClients() {
List<IClient> clientList = new ArrayList<>();
clientList.addAll(LONG_CONNECT_CACHE.asMap().values());
return clientList;
}
@Override
public void remove(ServerAddress address) {
LONG_CONNECT_CACHE.invalidate(address);
}
}
- NettyClientFactory:Netty 客户端工厂实现方式
@Slf4j
public class NettyClientFactory extends AbstractClientFactory{
private static final String BYTE_BUF_POOL_NAME = "bytebuf.pool";
private static final String IO_RATIO_NAME = "ioratio";
public static final NioEventLoopGroup WORKER_GROUP = new NioEventLoopGroup();
public static final ByteBufAllocator BYTE_BUF_ALLOCATOR;
static {
WORKER_GROUP.setIoRatio(SystemPropertyUtil.getInt(IO_RATIO_NAME, 100));
if (SystemPropertyUtil.getBoolean(BYTE_BUF_POOL_NAME, false)) {
BYTE_BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT;
} else {
BYTE_BUF_ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
}
}
@Override
protected IClient createClient(ServerAddress address) throws ApiException {
//启动引导类
final Bootstrap bootstrap = new Bootstrap();
//绑定工作线程组
bootstrap.group(NettyClientFactory.WORKER_GROUP);
//设置低延迟
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//设置让关闭的的端口尽早可以使用
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
//若是长连接,开启SOCKET默认的心跳机制,短连接则不不开启
bootstrap.option(ChannelOption.SO_KEEPALIVE, address.isLongConnection());
//使用内存池
bootstrap.option(ChannelOption.ALLOCATOR, NettyClientFactory.BYTE_BUF_ALLOCATOR);
//设置IO类型
bootstrap.channel(NioSocketChannel.class);
//设置初始化连接配置
bootstrap.handler(InitializerFactory.get(address));
//若是设置的连接超时时间
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, address.getConnectTimeout());
String targetIp = address.getIp();
int targetPort = address.getPort();
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIp, targetPort));
if (future.awaitUninterruptibly(address.getConnectTimeout()) && future.isSuccess() && future.channel().isActive()) {
log.info("[{}]连接成功,服务器端地址[{}:{}]",targetIp,targetPort);
Channel channel = future.channel();
return new NettyClient(channel,address);
} else {
future.cancel(true);
future.channel().close();
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("服务器[").append(targetIp).append(":").append(targetPort).append("]连接失败");
throw new ApiException(errorMsg.toString());
}
}
}
③.客户端处理器
客户端处理器主要负责完成对接收事件的处理工作。
- AbstractChannelHandler:处理器抽象层,定义接受事件处理的模板方法,具体处理逻辑则由子类实现。
- ClientHandler:客户端具体处理器,负责对事件的具体逻辑处理。
④.通道初始化器
客户端通道初始化器,主要负责完成Channel 的初始化配置工作,主要包括编解码器设置、消息处理器设置等。
- AbstractChannelInitializer:通道初始化器抽象层实现,定义通道初始化模板方法,具体初始化哪些组件则从子类获取。
其中初始化通道模板方法 initChannel 详情如下图所示:
三、测试
1.测试代码
- ClientTest:客户端测试类
- ServerTest:服务端测试类
2.测试结果
先启动服务器端,再启动客户端,看客户端请求是否正常接受到返回数据。
客户端能请求返回数据,测试成功。