Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,31 @@
<artifactId>nettybook2</artifactId>
<version>0.0.1-SNAPSHOT</version>

<repositories>
<repository>
<id>ali-repo</id>
<name>ali repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>

<repository>
<id>163-repo</id>
<name>ali repository</name>
<url>http://mirrors.163.com/maven/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -147,5 +172,28 @@
<version>1.2.17</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.10.0</version>
</dependency>
<!--log4j2-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>

<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.9</version>
</dependency>

</dependencies>
</project>
57 changes: 57 additions & 0 deletions src/com/phei/netty/frame/msgpack/EchoClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.phei.netty.frame.msgpack;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
* @Author: shanghang
* @Project:nettyStudy
* @description:netty客户端-分隔符解码器-解决TCP粘包
* @Date: 2020.12.26 15:53
**/
public class EchoClient {

private void connect(int port, String ip) {
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("msgpack decoder",new MsgPackDecoder());
socketChannel.pipeline().addLast("msgpack encoder",new MsgPackEncoder());
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});

try {
//发起异步连接操作
ChannelFuture f = b.connect(ip,port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}



public static void main(String[] args) {
int port = 8080;
if(args!=null && args.length>0){
try{
port = Integer.parseInt(args[0]);
}catch (Exception e){

}
}
new EchoClient().connect(port,"127.0.0.1");
}
}
61 changes: 61 additions & 0 deletions src/com/phei/netty/frame/msgpack/EchoClientHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.phei.netty.frame.msgpack;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

/**
* @author shanghang
* @title: EchoClientHandler
* @projectName nettyStudy
* @description: 分隔符解码器
* @date 2020.12.27-15:10
*/
@Slf4j
public class EchoClientHandler extends ChannelHandlerAdapter {
private int counter;
static final String ECHO_REQ = "this is a req $_" ;

EchoClientHandler(){

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UserInfo[] userInfos = UserInfo();
for (UserInfo userInfo : userInfos) {
ctx.write(userInfo);
}
log.error("send msg");
ctx.flush();
}


private UserInfo[] UserInfo(){
UserInfo[] userInfos = new UserInfo[10];
for (int i =0 ;i<10;i++){
UserInfo userInfo = new UserInfo();
userInfo.setUserId(i);
userInfo.setUserName("ABCDEFG---->"+i);
userInfos[i] = userInfo;
}
return userInfos;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.error("this is "+ ++counter +"receiver "+msg);
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
74 changes: 74 additions & 0 deletions src/com/phei/netty/frame/msgpack/EchoServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.phei.netty.frame.msgpack;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* @Author: shanghang
* @Project:nettyStudy
* @description:netty服务端-分隔符解码器-解决粘包
* @Date: 2020.12.26 15:25
**/
public class EchoServer {
public static void main(String[] args) {
int port = 8080;
if(args!=null && args.length>0){
try{
port = Integer.parseInt(args[0]);
}catch (Exception e){

}
}
new EchoServer().bind(port);
}

EchoServer(){

}

private void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.childHandler(new ChildChannelHandler());
try {
//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服务器监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

// @Override
// protected void initChannel(SocketChannel ch) throws Exception {
// //创建分割词
// ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
// ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
// ch.pipeline().addLast(new StringDecoder());
// ch.pipeline().addLast(new EchoServerHandler());
// }

@Override
protected void initChannel(SocketChannel ch) throws Exception {
//创建分割词
ch.pipeline().addLast("msgpack decoder",new MsgPackDecoder());
ch.pipeline().addLast("msgpack encoder",new MsgPackEncoder());
ch.pipeline().addLast(new EchoServerHandler());
}
}
}
28 changes: 28 additions & 0 deletions src/com/phei/netty/frame/msgpack/EchoServerHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.phei.netty.frame.msgpack;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

/**
* @author shanghang
* @title: EchoServerHandler
* @projectName nettyStudy
* @description: 分隔符解码器
* @date 2020.12.27-14:52
*/
@Slf4j
public class EchoServerHandler extends ChannelHandlerAdapter {
int counter = 0;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.error(msg.toString());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
39 changes: 39 additions & 0 deletions src/com/phei/netty/frame/msgpack/MsgPackDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.phei.netty.frame.msgpack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.msgpack.MessagePack;

import java.util.List;

/**
* @author shanghang
* @title: MsgPackDecoder
* @projectName nettyStudy
* @description: msgPack解码器
* @date 2020.12.27-17:23
*/
@Slf4j
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
/**
* 从数据包msg中获取需要节码的byte数组,
* 然后调用MessagePack的read方法将其反序列化为Object,
* 将解码的对象加入到节码列表arg2中
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
log.error("decode---------------------------------------------------");
final byte[] array;
final int length = msg.readableBytes();
array = new byte[length];
msg.getBytes(msg.readerIndex(),array,0,length);
MessagePack msgPack = new MessagePack();
out.add(msgPack.read(array));
}
}
25 changes: 25 additions & 0 deletions src/com/phei/netty/frame/msgpack/MsgPackEncoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.phei.netty.frame.msgpack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
import org.msgpack.MessagePack;

/**
* @author shanghang
* @title: MsgPackEncoder
* @projectName nettyStudy
* @description: msg编码器
* @date 2020.12.27-17:20
*/
@Slf4j
public class MsgPackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
log.error("encode---------------------------------------------------");
MessagePack msgPack = new MessagePack();
byte[] raw = msgPack.write(msg);
out.writeBytes(raw);
}
}
Loading