利用Netty、SpringBoot以及Dubbo构建一个分布式的消息通知系统(初版)

前话

之前在接商业外包的时候,遇到了一个需要服务端主动推送消息的需求,当时由于对Netty的不熟悉以及没有很好的理解Netty的websocket,因此用了SSE去解决了这个问题,但是缺点就是会占用大量的带宽

Netty的websocket

Netty的一个链接就是一个channel(通道),因此,如果要实现服务端的主动消息推送,那么就要实现对channel的统一管理;幸运的是,Netty其内部已经实现了对channel的统一管理的容器——ChannelGroup,ChannelGroup其内部用了一个ConcurrentMap<ChannelId, Channel>来作为channel的承载容器,而channelId是channel的一个属性,但是,如果仅仅依靠ChannelGroup还不能够实现真正可用于生产环境中的消息推送系统,因为channelId与业务是完全没有关联的,因此,我这里又用了一个ConcurrentMap<String, ChannelId>来实现用户的业务标识与channelId的对应关系:当客户端需要连接消息推送中心时,需要带上自己的业务编号信息,然后,服务端接收消息后将用户业务编号解析并与channel一同存储到自定义的管理容器中

代码实现

Netty的websocket实现端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
@Component
public class NoticeChannelHandlerImpl extends SimpleChannelInboundHandler<TextWebSocketFrame>
implements NoticeChannelHandler {

/**
* 仅仅通知学生用户
*/
protected static ChannelGroup studentChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 仅仅通知教师用户
*/
protected static ChannelGroup teacherChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 全局通知所有用户
*/
protected static ChannelGroup globalChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**
* 客户端首次接入时,需发送用户编号信息
* 客户端发送的信息:{ChannelGroupType}-{user_uuid}
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
Channel channel = ctx.channel();
String[] contexts = msg.text().split("-");
int clientContextSplitNum = 2;
if (contexts.length != clientContextSplitNum) {
channel.writeAndFlush(new TextWebSocketFrame(StringsValue.CN.CLIENT_SEND_ERR_MSG));
} else {
ChannelIdPool.add(contexts[1], channel.id());
addToChannelGroup(contexts[0], channel);
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("新接入编号为 [{}] 的客户端", ctx.channel().id());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
log.info("客户端编号为 [{}] 已断开链接", ctx.channel().id());
ChannelIdPool.remove(ctx.channel().id());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
log.error("客户端编号为 [{}] 报错:[{}]", ctx.channel().id(), cause.getMessage());
ctx.close();
}

}

ConcurrentMap<String, ChannelId>容器的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ChannelIdPool {

private static ConcurrentHashMap<String, ChannelId> channelIdMap;

static {
channelIdMap = new ConcurrentHashMap<>();
}

public ChannelIdPool() {}

public static Optional<ChannelId> get(String key) {
return Optional.ofNullable(channelIdMap.get(key));
}

public static void add(String key, ChannelId channelId) {
channelIdMap.put(key, channelId);
}

public static void remove(String key) {
channelIdMap.remove(key);
}

public static void remove(ChannelId channelId) {
Iterator<Map.Entry<String, ChannelId>> iterator = channelIdMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ChannelId> entry = iterator.next();
if (entry.getValue().compareTo(channelId) == 0) {
iterator.remove();
}
}
}

}

如何设置Netty服务端相关配置

Netty 服务端的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@PropertySource(value = "classpath:netty-server.properties")
@Configuration
public class NettyServerConfigure {

@Value("${netty.server.tcp.port}")
private int tcpPort;

@Value("${netty.server.boss.thread.count}")
private int bossCount;

@Value("${netty.server.worker.thread.count}")
private int workerCount;

@Value("${netty.server.so.keepalive}")
private boolean keepAlive;

@Value("${netty.server.so.backlog}")
private int backlog;

@Bean(value = "ServerBootstrap")
public ServerBootstrap bootstrap() {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(nettyWebSocketChannelInitializer);
Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
b.option(option, tcpChannelOptions.get(option));
}
return b;
}

@Autowired
@Qualifier("ChannelInitializer")
private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;

@Bean(name = "tcpChannelOptions")
public Map<ChannelOption<?>, Object> tcpChannelOptions() {
Map<ChannelOption<?>, Object> options = new HashMap<>();
options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
options.put(ChannelOption.SO_BACKLOG, backlog);
return options;
}

@Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bossCount);
}

@Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerCount);
}

@Bean(name = "tcpSocketAddress")
public InetSocketAddress tcpPort() {
return new InetSocketAddress(tcpPort);
}

}

Netty的websocket的初始化配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@Qualifier("ChannelInitializer")
public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WebSocketServerProtocolHandler("/notice"));
// 心跳机制
// pipeline.addLast("ping", new IdleStateHandler(25, 15, 10, TimeUnit.SECONDS));
pipeline.addLast(new NoticeChannelHandlerImpl());
}

}

真正的Tcp服务端启动相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
public class TcpServerConfigure {

@Autowired
@Qualifier("ServerBootstrap")
private ServerBootstrap serverBootstrap;

@Autowired
@Qualifier("tcpSocketAddress")
private InetSocketAddress tcpPort;

private Channel serverChannel;

public void start() throws Exception {
serverChannel = serverBootstrap.bind(tcpPort).sync().channel().closeFuture().sync().channel();
}

@PreDestroy
public void stop() throws Exception {
serverChannel.close();
serverChannel.parent().close();
}

public ServerBootstrap getServerBootstrap() {
return serverBootstrap;
}

public void setServerBootstrap(ServerBootstrap serverBootstrap) {
this.serverBootstrap = serverBootstrap;
}

public InetSocketAddress getTcpPort() {
return tcpPort;
}

public void setTcpPort(InetSocketAddress tcpPort) {
this.tcpPort = tcpPort;
}

}

消息发布、消费中心

消息发布中心

对于消息发布中心,其职能就只限与接收由发布者需要发布的消息,将消息缓存在一个消息发布容器中,为什么需要缓存起来?因为可能有些用户是这些消息的接收者,但是由于没有登陆,也就没有对应的channel注册,因此需要将消息缓存在容器中,当消息的所有接收者都接收到消息时,将该消息从容器中移除,至此,一个消息在消息发布中心的生命到此结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Component
public class NoticePublishCenterImpl extends Observable implements NoticePublishCenter {

private static final Object lock = new Object();

@Autowired private NoticeConsumerCenter noticeConsumerCenter;

protected static ConcurrentHashMap<String, NoticePackage> noticeCenterPool;

static {
noticeCenterPool = new ConcurrentHashMap<>();
}

public NoticePublishCenterImpl() {}

@PostConstruct
public void init() {
addObserver(noticeConsumerCenter);
}

@Override
public boolean createNoticeGroup(NoticePackage noticePackage) {
if (noticeCenterPool.containsKey(noticePackage.getNoticeLabel())) {
noticeCenterPool.put(noticePackage.getNoticeLabel(), noticePackage);
notifyObservers(noticePackage);
return true;
}
return false;
}

/**
* 该推送任务已完成,从通知消息池中销毁该通知消息
* @param groupName
*/
private void finishNotice(String groupName) {
synchronized (lock) {
noticeCenterPool.remove(groupName);
}
}

/**
* 由消息消费者中心通知回调告知通知发布中心该消息消费结果
* @param o
* @param arg
*/
@Override
public void update(Observable o, Object arg) {
NoticePackage noticePackage = (NoticePackage) arg;
if (noticePackage.getTotalReceivers() == 0) {
finishNotice(noticePackage.getNoticeLabel());
} else {
noticeCenterPool.put(noticePackage.getNoticeLabel(), noticePackage);
}
}

}

在代码中看到了观察者模式所特有的addObserver以及notifyObservers,在这里,我将消息发布中心与消费中心用了观察者模式分解;当消息发布中心接收到发布事件后,通知消息消费中心准备开始执行消息发布任务,通知有,消息发布中心就不需要阻塞等待消息消费中心的作业完成结果,而是继续做自己的事情;而消息消费中心同样也实现了观察者模式,当消息消费中心完了消息发布中心所通知的任务后,将任务完成结果通知消息发布中心,剩下的关于消息消费的结果的处理就完全扔给了消息发布中心,而消息消费中心则可以继续自己的其他消息消费任务

消息消费中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@Slf4j
@Component
public class NoticeConsumerCenterImpl extends Observable implements NoticeConsumerCenter {

private static ConcurrentLinkedQueue<String> receivers = new ConcurrentLinkedQueue<>();

@Autowired
private NoticeChannelHandler noticeChannelHandler;
@Autowired
private NoticePublishCenter publishCenter;

private static ThreadPoolExecutor PublishThreadPool;

@PostConstruct
private void init() {
addObserver(publishCenter);
PublishThreadPool = new ThreadPoolExecutor(4,
12,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new NoticeThreadFactory(),
new RejectHandler());
}

@Override
public void update(Observable o, Object arg) {
NoticePackage noticePackage = (NoticePackage) arg;
PublishThreadPool.execute(new NoticeCallable(noticePackage));
}

@Override
public void addReceiver(String receiver) {
if (!receivers.contains(receiver)) {
receivers.add(receiver);
}
}

@Override
public void removeReceiver(String receiver) {
receivers.remove(receiver);
}

private class NoticeCallable implements Runnable {

private NoticePackage noticePackage;

public NoticeCallable(NoticePackage noticePackage) {
this.noticePackage = noticePackage;
}

@Override
public void run() {
List<String> tmpReceivers = noticePackage.getReceivers();
tmpReceivers.stream().filter(receiver -> receivers.contains(receiver)).flatMap(receiver -> {
noticePackage.setUserId(receiver);
noticeChannelHandler.publishMsg(noticePackage);
noticePackage.setTotalReceivers(noticePackage.getTotalReceivers() - 1);
return Stream.of(noticePackage);
}).flatMap(noticePackage -> {
if (noticePackage.getTotalReceivers() == 0) {
notifyObservers(noticePackage);
}
return Stream.empty();
}).count();
}
}

/**
* 线程池任务拒绝策略
*/
private class RejectHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("消息推送服务任务被拒绝。 {}", executor.toString());
}
}

private class NoticeThreadFactory implements ThreadFactory {

private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);

NoticeThreadFactory() {
namePrefix = "消息推送-工作线程-";
}

@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.getAndDecrement();
return new Thread(r, name);
}
}
}

哔哩哔哩项目演示视频

哔哩哔哩项目演示视频:利用Netty、SpringBoot以及Dubbo构建一个分布式的消息通知系统(初版)