diff --git a/pom.xml b/pom.xml index 1a92d73..abccee2 100644 --- a/pom.xml +++ b/pom.xml @@ -5,6 +5,31 @@ nettybook2 0.0.1-SNAPSHOT + + + ali-repo + ali repository + http://maven.aliyun.com/nexus/content/groups/public + + true + + + true + + + + + 163-repo + ali repository + http://mirrors.163.com/maven/repository/maven-public/ + + true + + + true + + + UTF-8 @@ -147,5 +172,28 @@ 1.2.17 + + org.apache.logging.log4j + log4j-slf4j-impl + 2.10.0 + + + + org.apache.logging.log4j + log4j-core + 2.10.0 + + + org.projectlombok + lombok + 1.18.0 + + + + org.msgpack + msgpack + 0.6.9 + + \ No newline at end of file diff --git a/src/com/phei/netty/frame/msgpack/EchoClient.java b/src/com/phei/netty/frame/msgpack/EchoClient.java new file mode 100644 index 0000000..ff3c3d0 --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/EchoClient.java @@ -0,0 +1,57 @@ +package com.phei.netty.frame.msgpack; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +/** + * @Author: shanghang + * @Project:nettyStudy + * @description:netty客户端-分隔符解码器-解决TCP粘包 + * @Date: 2020.12.26 15:53 + **/ +public class EchoClient { + + private void connect(int port, String ip) { + //配置客户端NIO线程组 + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap b = new Bootstrap(); + b.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY,true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast("msgpack decoder",new MsgPackDecoder()); + socketChannel.pipeline().addLast("msgpack encoder",new MsgPackEncoder()); + socketChannel.pipeline().addLast(new EchoClientHandler()); + } + }); + + try { + //发起异步连接操作 + ChannelFuture f = b.connect(ip,port).sync(); + //等待客户端链路关闭 + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + }finally { + group.shutdownGracefully(); + } + } + + + + public static void main(String[] args) { + int port = 8080; + if(args!=null && args.length>0){ + try{ + port = Integer.parseInt(args[0]); + }catch (Exception e){ + + } + } + new EchoClient().connect(port,"127.0.0.1"); + } +} diff --git a/src/com/phei/netty/frame/msgpack/EchoClientHandler.java b/src/com/phei/netty/frame/msgpack/EchoClientHandler.java new file mode 100644 index 0000000..d7b61f1 --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/EchoClientHandler.java @@ -0,0 +1,61 @@ +package com.phei.netty.frame.msgpack; + +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +/** + * @author shanghang + * @title: EchoClientHandler + * @projectName nettyStudy + * @description: 分隔符解码器 + * @date 2020.12.27-15:10 + */ +@Slf4j +public class EchoClientHandler extends ChannelHandlerAdapter { + private int counter; + static final String ECHO_REQ = "this is a req $_" ; + + EchoClientHandler(){ + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + UserInfo[] userInfos = UserInfo(); + for (UserInfo userInfo : userInfos) { + ctx.write(userInfo); + } + log.error("send msg"); + ctx.flush(); + } + + + private UserInfo[] UserInfo(){ + UserInfo[] userInfos = new UserInfo[10]; + for (int i =0 ;i<10;i++){ + UserInfo userInfo = new UserInfo(); + userInfo.setUserId(i); + userInfo.setUserName("ABCDEFG---->"+i); + userInfos[i] = userInfo; + } + return userInfos; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.error("this is "+ ++counter +"receiver "+msg); + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/com/phei/netty/frame/msgpack/EchoServer.java b/src/com/phei/netty/frame/msgpack/EchoServer.java new file mode 100644 index 0000000..a27babd --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/EchoServer.java @@ -0,0 +1,74 @@ +package com.phei.netty.frame.msgpack; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +/** + * @Author: shanghang + * @Project:nettyStudy + * @description:netty服务端-分隔符解码器-解决粘包 + * @Date: 2020.12.26 15:25 + **/ +public class EchoServer { + public static void main(String[] args) { + int port = 8080; + if(args!=null && args.length>0){ + try{ + port = Integer.parseInt(args[0]); + }catch (Exception e){ + + } + } + new EchoServer().bind(port); + } + + EchoServer(){ + + } + + private void bind(int port) { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workGroup = new NioEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG,100) + .childHandler(new ChildChannelHandler()); + try { + //绑定端口,同步等待成功 + ChannelFuture f = b.bind(port).sync(); + //等待服务器监听端口关闭 + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + }finally { + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer{ + +// @Override +// protected void initChannel(SocketChannel ch) throws Exception { +// //创建分割词 +// ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); +// ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); +// ch.pipeline().addLast(new StringDecoder()); +// ch.pipeline().addLast(new EchoServerHandler()); +// } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + //创建分割词 + ch.pipeline().addLast("msgpack decoder",new MsgPackDecoder()); + ch.pipeline().addLast("msgpack encoder",new MsgPackEncoder()); + ch.pipeline().addLast(new EchoServerHandler()); + } + } +} diff --git a/src/com/phei/netty/frame/msgpack/EchoServerHandler.java b/src/com/phei/netty/frame/msgpack/EchoServerHandler.java new file mode 100644 index 0000000..d6a6391 --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/EchoServerHandler.java @@ -0,0 +1,28 @@ +package com.phei.netty.frame.msgpack; + +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +/** + * @author shanghang + * @title: EchoServerHandler + * @projectName nettyStudy + * @description: 分隔符解码器 + * @date 2020.12.27-14:52 + */ +@Slf4j +public class EchoServerHandler extends ChannelHandlerAdapter { + int counter = 0; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.error(msg.toString()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/com/phei/netty/frame/msgpack/MsgPackDecoder.java b/src/com/phei/netty/frame/msgpack/MsgPackDecoder.java new file mode 100644 index 0000000..1f39bcd --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/MsgPackDecoder.java @@ -0,0 +1,39 @@ +package com.phei.netty.frame.msgpack; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; +import org.msgpack.MessagePack; + +import java.util.List; + +/** + * @author shanghang + * @title: MsgPackDecoder + * @projectName nettyStudy + * @description: msgPack解码器 + * @date 2020.12.27-17:23 + */ +@Slf4j +public class MsgPackDecoder extends MessageToMessageDecoder { + /** + * 从数据包msg中获取需要节码的byte数组, + * 然后调用MessagePack的read方法将其反序列化为Object, + * 将解码的对象加入到节码列表arg2中 + * @param ctx + * @param msg + * @param out + * @throws Exception + */ + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + log.error("decode---------------------------------------------------"); + final byte[] array; + final int length = msg.readableBytes(); + array = new byte[length]; + msg.getBytes(msg.readerIndex(),array,0,length); + MessagePack msgPack = new MessagePack(); + out.add(msgPack.read(array)); + } +} diff --git a/src/com/phei/netty/frame/msgpack/MsgPackEncoder.java b/src/com/phei/netty/frame/msgpack/MsgPackEncoder.java new file mode 100644 index 0000000..fbd161b --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/MsgPackEncoder.java @@ -0,0 +1,25 @@ +package com.phei.netty.frame.msgpack; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import lombok.extern.slf4j.Slf4j; +import org.msgpack.MessagePack; + +/** + * @author shanghang + * @title: MsgPackEncoder + * @projectName nettyStudy + * @description: msg编码器 + * @date 2020.12.27-17:20 + */ +@Slf4j +public class MsgPackEncoder extends MessageToByteEncoder { + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + log.error("encode---------------------------------------------------"); + MessagePack msgPack = new MessagePack(); + byte[] raw = msgPack.write(msg); + out.writeBytes(raw); + } +} diff --git a/src/com/phei/netty/frame/msgpack/UserInfo.java b/src/com/phei/netty/frame/msgpack/UserInfo.java new file mode 100644 index 0000000..f62edc7 --- /dev/null +++ b/src/com/phei/netty/frame/msgpack/UserInfo.java @@ -0,0 +1,64 @@ +package com.phei.netty.frame.msgpack; + +import lombok.Data; +import org.msgpack.annotation.Message; + +import java.io.Serializable; +import java.nio.ByteBuffer; + +/** + * @author shanghang + * @title: User + * @projectName nettyStudy + * @description: 序列化demo,msgPack pojo类必须实现Serializable和messagez注解 + * @date 2020.12.27-15:46 + */ +@Data +@Message +public class UserInfo implements Serializable { + private static final long serialVersionUID = 1L; + + private String userName; + + private int userId; + + public UserInfo buildUserName(String userName){ + this.userName = userName; + return this; + } + + public UserInfo buildUserId(int userId){ + this.userId = userId; + return this; + } + + public byte[] codeC(){ + ByteBuffer buffer = ByteBuffer.allocate(1024); + return getBytes(buffer); + } + + public byte[] codeC(ByteBuffer buffer){ + buffer.clear(); + return getBytes(buffer); + } + + private byte[] getBytes(ByteBuffer buffer) { + byte[] value = this.userName.getBytes(); + buffer.putInt(value.length); + buffer.put(value); + buffer.putInt(this.userId); + buffer.flip(); + value = null; + byte[] result = new byte[buffer.remaining()]; + buffer.get(result); + return result; + } + + @Override + public String toString() { + return "UserInfo{" + + "userName='" + userName + '\'' + + ", userId=" + userId + + '}'; + } +}