Building a Distributed Message Notification System with Netty, SpringBoot, Kafka, and Dubbo (Intermediate Version)

Recap of the Previous Post

Building a Distributed Message Notification System with Netty, SpringBoot, and Dubbo (Initial Version)

Adding Kafka

NoticePublishServiceImpl

Once the NoticePackage has been received and saved, the message is published to the Kafka message queue.

@Override
public ResultData publish(NoticePackage noticePackage) {
    // Persist the notice first; only a successfully saved notice is forwarded to Kafka.
    ResultData<NoticePackage> resultData = noticeDao.save(noticePackage);
    if (resultData.getValue() != null) {
        // Wrap the notice as JSON in a KafkaMsg envelope and publish it to the notice topic.
        kafkaProducer.producerMsg(KafkaPackage.builder()
                .topic(kafkaTopicNotice)
                .kafkaMsg(KafkaMsg.builder()
                        .id(UUID.randomUUID().toString())
                        .body(JsonUtils.toJson(noticePackage))
                        .sendTime(new Date())
                        .builded())
                .builded());
        return new ResultData(HttpResponseStatus.OK.code(), null, "Notice published successfully");
    }
    return ResultData.builder().code(HttpResponseStatus.BAD_REQUEST.code()).errMsg("Failed to publish notice").builded();
}

The producer pushes each KafkaMsg object to the Kafka topic named by its topic field.

KafkaProducerImpl

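protected void publish(String topic, KafkaMsg kafkaMsg) {
    send(topic, JsonUtils.toJson(kafkaMsg), new AtomicInteger(0));
}

// Sends the payload and resends it on failure, up to MAX_RETRY_TIMES extra attempts.
private void send(String topic, String payload, AtomicInteger retry) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, payload);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onFailure(Throwable ex) {
            if (retry.getAndIncrement() < MAX_RETRY_TIMES) {
                send(topic, payload, retry);
                return;
            }
            // Pass the exception as the last argument so that the stack trace is logged.
            log.error("kafka send failure after {} retries", MAX_RETRY_TIMES, ex);
        }

        @Override
        public void onSuccess(SendResult<String, String> result) {
            // Nothing to do on success.
        }
    });
}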
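
The retry counter in publish is meant to bound the number of resend attempts. The sketch below isolates that idea from Spring Kafka, using only CompletableFuture from the JDK. RetryingSender and sendWithRetry are hypothetical names that do not exist in the project, and sendOnce stands in for a call such as kafkaTemplate.send. It assumes sendOnce reports failure through the returned future rather than by throwing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, not part of the original project.
final class RetryingSender {

    private RetryingSender() {
    }

    /**
     * Calls sendOnce, and calls it again after each failure, for at most
     * maxRetries extra attempts. The returned future completes with the first
     * successful result, or with the last failure once the retries run out.
     */
    static <T> CompletableFuture<T> sendWithRetry(Supplier<CompletableFuture<T>> sendOnce, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(sendOnce, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> sendOnce, int retriesLeft,
                                    CompletableFuture<T> result) {
        sendOnce.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                // Resend instead of only counting the failure.
                attempt(sendOnce, retriesLeft - 1, result);
            } else {
                result.completeExceptionally(error);
            }
        });
    }
}
```

A caller would wrap the real send in the supplier and attach logging to the returned future, so the error is logged once, after the last attempt has failed.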

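The notice consumer center reports the consumption result of each message back to the notice publish center through a callback. If the notification task has not finished, the publish center pushes the message back onto the Kafka message queue.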

NoticePublishCenterImpl

@Override
public void update(Observable o, Object arg) {
    NoticePackage noticePackage = (NoticePackage) arg;
    if (noticePackage.getTotalReceivers() == 0) {
        // Every receiver has been notified: mark the notice as finished.
        noticePackage.setFinish(true);
        noticeDao.updateStatus(noticePackage);
    } else {
        // Some receivers are still pending: push the notice back to Kafka for another round.
        KafkaMsg kafkaMsg = KafkaMsg.builder()
                .id(UUID.randomUUID().toString())
                .body(JsonUtils.toJson(noticePackage))
                .sendTime(new Date())
                .builded();
        kafkaProducer.producerMsg(KafkaPackage.builder().topic(kafkaTopicNotice).kafkaMsg(kafkaMsg).builded());
    }
}
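
To make the publish, consume, and requeue cycle easier to follow, here is a small in-memory simulation. It is only a sketch: an ArrayDeque stands in for the Kafka topic, and RequeueSimulation, Notice, deliver, and the maxRounds limit are hypothetical names introduced for illustration. The original update method has no round limit, so the limit here is an added assumption to keep the loop bounded while some receivers stay offline.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// In-memory stand-in for the Kafka round trip; all names here are hypothetical.
final class RequeueSimulation {

    /** Minimal stand-in for NoticePackage: the receivers still waiting for the notice. */
    static final class Notice {
        final Set<String> pendingReceivers;
        boolean finished;

        Notice(Set<String> pendingReceivers) {
            this.pendingReceivers = new HashSet<>(pendingReceivers);
        }
    }

    private RequeueSimulation() {
    }

    /**
     * Consumes the notice once per round. onlinePerRound.get(i) lists the receivers
     * that are online in round i (nobody is online once the list is exhausted).
     * Returns the number of rounds consumed, stopping after maxRounds.
     */
    static int deliver(Notice notice, List<Set<String>> onlinePerRound, int maxRounds) {
        Queue<Notice> queue = new ArrayDeque<>();
        queue.add(notice); // initial publish
        int rounds = 0;
        while (!queue.isEmpty() && rounds < maxRounds) {
            Notice current = queue.poll();
            Set<String> online = rounds < onlinePerRound.size()
                    ? onlinePerRound.get(rounds)
                    : Collections.<String>emptySet();
            // Consumer side: receivers that are online get the notice.
            current.pendingReceivers.removeAll(online);
            rounds++;
            // Publisher side: finish the notice or push it back for another round.
            if (current.pendingReceivers.isEmpty()) {
                current.finished = true;
            } else {
                queue.add(current);
            }
        }
        return rounds;
    }
}
```

In a real deployment a delay between rounds, or a separate retry topic, would likely be needed as well, since an immediately requeued notice is consumed again right away.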

Optimizations

  • The ChannelIdPool container has been removed; the client-id is now stored directly on the Netty channel through an AttributeKey.

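  • Messages are no longer sent by calling NoticeChannelHandler.publishMsg in a loop. Instead, a Stream collects the intended recipients into a receivers set, which is passed to NoticeChannelHandler.publishMsg for a single batched send. The client-ids whose send failed are then collected and returned to the task thread for handling.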
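
The set arithmetic behind the second optimization can be sketched on its own, without Netty. BatchDelivery and deliverOnce are hypothetical names, and batchSend is a stand-in for NoticeChannelHandler.publishMsg: it takes the target client-ids and returns the ones whose send failed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the batched send; not part of the original project.
final class BatchDelivery {

    private BatchDelivery() {
    }

    /**
     * Sends to every requested receiver that is online, in one batch,
     * and returns the receivers that were actually delivered to.
     */
    static Set<String> deliverOnce(Set<String> requested, Set<String> online,
                                   Function<Set<String>, Set<String>> batchSend) {
        // 1. Collect the online receivers into one set instead of sending one by one.
        Set<String> targets = requested.stream()
                .filter(online::contains)
                .collect(Collectors.toCollection(HashSet::new));
        // 2. One batched send; the result is the set of client-ids that failed.
        Set<String> failed = batchSend.apply(targets);
        // 3. Delivered = targets minus failures; everything else stays pending.
        Set<String> delivered = new HashSet<>(targets);
        delivered.removeAll(failed);
        return delivered;
    }
}
```

The caller can then remove the delivered set from the notice's receivers and hand the remainder back to the publish center.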

Code

NoticeConsumerCenterImpl

private class NoticeConsumeTask implements Runnable {

    private final NoticePackage noticePackage;

    NoticeConsumeTask(NoticePackage noticePackage) {
        this.noticePackage = noticePackage;
    }

    @Override
    public void run() {
        // Collect the receivers that are currently online. Collecting the stream avoids
        // mutating a non-thread-safe HashSet from inside a parallel stream.
        HashSet<String> receivers = noticePackage.getReceivers()
                .parallelStream()
                .filter(ONLINE_PEOPLES::contains)
                .collect(Collectors.toCollection(HashSet::new));
        // publishMsg returns the client-ids whose push failed; those stay pending.
        Set<String> failures = noticeChannelHandler.publishMsg(noticePackage, receivers);
        if (failures != null) {
            receivers.removeAll(failures);
        }
        noticePackage.getReceivers().removeAll(receivers);
        noticePackage.setTotalReceivers(noticePackage.getReceivers().size());
        // Report the result to the publish center, which observes this consumer center.
        setChanged();
        notifyObservers(noticePackage);
    }
}

NoticeChannelHandlerImpl


@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    Channel channel = ctx.channel();
    // The client registers with a text frame of the form "<group>-<client-id>".
    String[] contexts = msg.text().split("-");
    int clientContextSplitNum = 2;
    if (contexts.length != clientContextSplitNum) {
        channel.writeAndFlush(new TextWebSocketFrame(StringsValue.CN.CLIENT_SEND_ERR_MSG));
    } else {
        channel.writeAndFlush(new TextWebSocketFrame("Client id: " + channel.id()));
        NoticeConsumerCenterImpl.addReceiver(contexts[1]);
        // Remember the client-id on the channel itself instead of in a separate pool.
        channel.attr(ClientChannelAttrManage.CLIENT_ID_ATTRIBUTEKEY).set(contexts[1]);
        addToChannelGroup(contexts[0], channel);
    }
}

@Override
public Set<String> publishMsg(NoticePackage noticePackage, HashSet<String> receiver) {
    // Pick the channel group by group type; an unknown group type reports no failures.
    Set<String> list = Collections.emptySet();
    if (noticePackage.getGroupType() == CHANNEL_GROUP_STUDENT.getValue()) {
        list = send(receiver, studentChannels, noticePackage);
    } else if (noticePackage.getGroupType() == CHANNEL_GROUP_TEACHER.getValue()) {
        list = send(receiver, teacherChannels, noticePackage);
    } else if (noticePackage.getGroupType() == CHANNEL_GROUP_GLOBAL.getValue()) {
        list = send(receiver, globalChannels, noticePackage);
    }
    return list;
}

// Writes the notice to every channel whose client-id is in receiver and
// returns the client-ids of the channels that could not be written to.
private Set<String> send(HashSet<String> receiver, ChannelGroup channels, NoticePackage noticePackage) {
    TextWebSocketFrame frame = new TextWebSocketFrame(noticePackage.getMessage());
    Set<String> failures = new HashSet<>();
    try {
        channels.stream()
                .filter(client -> receiver.contains(client.attr(ClientChannelAttrManage.CLIENT_ID_ATTRIBUTEKEY).get()))
                .forEach(client -> {
                    if (client.isWritable()) {
                        // Each write releases one reference, so every client gets its own retained duplicate.
                        client.writeAndFlush(frame.retainedDuplicate()).addListener(future -> {
                            if (future.isSuccess()) {
                                log.info("Notice with subject [{}] was pushed to client [{}]", noticePackage.getMessage(), client.id());
                            }
                        });
                    } else {
                        log.error("Notice with subject [{}] could not be pushed to client [{}]", noticePackage.getMessage(), client.id());
                        failures.add(client.attr(ClientChannelAttrManage.CLIENT_ID_ATTRIBUTEKEY).get());
                    }
                });
    } finally {
        // Release the reference held by this method.
        frame.release();
    }
    return failures;
}
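
Judging from channelRead0, a client registers by sending a text frame of the form group-clientId, where the first part selects the channel group and the second part is the client-id. The sketch below parses that frame in isolation. RegistrationMessage and parse are hypothetical names, and splitting with a limit of 2 is a deliberate variation on the original: it lets a client-id contain hyphens (for example a UUID), whereas the original split rejects any message with more than one hyphen.

```java
import java.util.Optional;

// Hypothetical value type for the "<group>-<client-id>" registration frame.
final class RegistrationMessage {

    final String group;
    final String clientId;

    private RegistrationMessage(String group, String clientId) {
        this.group = group;
        this.clientId = clientId;
    }

    /** Returns the parsed message, or empty if the text is not "<group>-<client-id>". */
    static Optional<RegistrationMessage> parse(String text) {
        if (text == null) {
            return Optional.empty();
        }
        // Split on the first hyphen only, so the client-id may itself contain hyphens.
        String[] parts = text.split("-", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new RegistrationMessage(parts[0], parts[1]));
    }
}
```

An empty result corresponds to the branch in channelRead0 that replies with CLIENT_SEND_ERR_MSG.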


GitHub source code