Recap of the previous post
Building a distributed message notification system with Netty, Spring Boot, and Dubbo (first version)
Adding Kafka
NoticePublishServiceImpl
After a NoticePackage is received, it is saved and then published to the Kafka message queue.
    @Override
    public ResultData publish(NoticePackage noticePackage) {
        // Persist the notice first; publish to Kafka only if the save succeeded.
        ResultData<NoticePackage> resultData = noticeDao.save(noticePackage);
        if (resultData.getValue() != null) {
            kafkaProducer.producerMsg(KafkaPackage.builder()
                    .topic(kafkaTopicNotice)
                    .kafkaMsg(KafkaMsg.builder()
                            .id(UUID.randomUUID().toString())
                            .body(JsonUtils.toJson(noticePackage))
                            .sendTime(new Date())
                            .builded())
                    .builded());
            return new ResultData(HttpResponseStatus.OK.code(), null, "Message published successfully");
        }
        return ResultData.builder()
                .code(HttpResponseStatus.BAD_REQUEST.code())
                .errMsg("Failed to publish message")
                .builded();
    }
The KafkaMsg object is then pushed to the Kafka topic named in the package.
KafkaProducerImpl
    protected void publish(String topic, KafkaMsg kafkaMsg) {
        AtomicInteger retry = new AtomicInteger(0);
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, JsonUtils.toJson(kafkaMsg));
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                // Note: this loop only advances the counter up to MAX_RETRY_TIMES;
                // it does not resend the message.
                while (retry.get() != MAX_RETRY_TIMES) {
                    retry.incrementAndGet();
                }
                // Pass the exception without a placeholder so the stack trace is logged.
                log.error("kafka send failure", ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Nothing to do on success.
            }
        });
    }
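The failure callback above counts retries but never sends again. As a rough illustration of what a bounded resend could look like, here is a minimal sketch. It is not the project's implementation: `RetrySketch`, `sendWithRetry`, and the value of `MAX_RETRY_TIMES` are hypothetical, and `CompletableFuture` stands in for the future returned by `kafkaTemplate.send`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch: resend an asynchronous message a bounded number of times. */
class RetrySketch {
    static final int MAX_RETRY_TIMES = 3; // assumed value, not taken from the project

    /**
     * Calls sendOnce; if the returned future fails, calls it again until it
     * succeeds or retriesLeft reaches zero. The returned future carries the
     * first success or the last failure.
     */
    static <T> CompletableFuture<T> sendWithRetry(Supplier<CompletableFuture<T>> sendOnce,
                                                  int retriesLeft) {
        CompletableFuture<T> out = new CompletableFuture<>();
        attempt(sendOnce, retriesLeft, out);
        return out;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> sendOnce,
                                    int retriesLeft,
                                    CompletableFuture<T> out) {
        sendOnce.get().whenComplete((result, ex) -> {
            if (ex == null) {
                out.complete(result);                   // delivered
            } else if (retriesLeft > 0) {
                attempt(sendOnce, retriesLeft - 1, out); // try again
            } else {
                out.completeExceptionally(ex);          // give up
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated sender that fails twice before succeeding.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new RuntimeException("broker unavailable"));
            } else {
                f.complete("sent");
            }
            return f;
        };
        System.out.println(sendWithRetry(flaky, MAX_RETRY_TIMES).join()
                + " after " + calls.get() + " attempts");
    }
}
```

A resend loop like this can deliver the same message more than once, so consumers would need to tolerate duplicates, for example by checking the `id` carried in KafkaMsg.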
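The consumer center reports the consumption result back to the publish center through a notification callback. If the notification task is not yet complete, the publish center pushes the message back onto the Kafka queue.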
NoticePublishCenterImpl
    @Override
    public void update(Observable o, Object arg) {
        NoticePackage noticePackage = (NoticePackage) arg;
        if (noticePackage.getTotalReceivers() == 0) {
            // Every receiver has been notified: mark the task as finished.
            noticePackage.setFinish(true);
            noticeDao.updateStatus(noticePackage);
        } else {
            // Receivers are still pending: push the package back onto the Kafka topic.
            KafkaMsg kafkaMsg = KafkaMsg.builder()
                    .id(UUID.randomUUID().toString())
                    .body(JsonUtils.toJson(noticePackage))
                    .sendTime(new Date())
                    .builded();
            kafkaProducer.producerMsg(KafkaPackage.builder()
                    .topic(kafkaTopicNotice)
                    .kafkaMsg(kafkaMsg)
                    .builded());
        }
    }
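The branch in `update` boils down to a small decision rule: finish when no receivers are pending, otherwise requeue. The sketch below isolates that rule so it can be exercised without Kafka or a database. `RequeueSketch`, `Notice`, and `onConsumed` are hypothetical stand-ins, and an in-memory `Deque` plays the role of the Kafka topic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the finish-or-requeue decision made in update(). */
class RequeueSketch {

    /** Minimal stand-in for NoticePackage: only the fields the decision needs. */
    static final class Notice {
        int totalReceivers; // receivers that still have to be notified
        boolean finish;     // set once nobody is pending

        Notice(int totalReceivers) {
            this.totalReceivers = totalReceivers;
        }
    }

    /**
     * Marks the notice as finished when no receivers are pending; otherwise puts
     * it back on the queue. Returns true if the notice was requeued.
     */
    static boolean onConsumed(Notice notice, Deque<Notice> queue) {
        if (notice.totalReceivers == 0) {
            notice.finish = true;
            return false;
        }
        queue.addLast(notice);
        return true;
    }

    public static void main(String[] args) {
        Deque<Notice> queue = new ArrayDeque<>();
        System.out.println("requeued: " + onConsumed(new Notice(2), queue));
        System.out.println("requeued: " + onConsumed(new Notice(0), queue));
        System.out.println("queue size: " + queue.size());
    }
}
```

Nothing in this rule bounds how often a package circulates while its receivers stay offline, so a delay between rounds or a maximum round count may be worth considering.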
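Optimizations
Removed the ChannelIdPool container; the client-id is now stored directly on the Netty channel through an AttributeKey.
Messages are no longer sent by calling NoticeChannelHandler.publishMsg in a loop. The pending recipients are collected in a Stream into a receivers set, which is passed to NoticeChannelHandler.publishMsg for one batched send. The client-ids whose send failed are then collected and returned to the task thread for handling.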
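The set arithmetic behind the batched send can be sketched in isolation as follows. This is an illustration under assumptions, not the project's code: `BatchSendSketch` and `remainingAfterSend` are hypothetical names, and the `trySend` predicate stands in for the actual channel write, returning false when a send fails.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical sketch of the batched send and its bookkeeping. */
class BatchSendSketch {

    /**
     * Sends to every receiver that is online and returns the receivers that
     * still need the message: those that are offline plus those whose send failed.
     */
    static Set<String> remainingAfterSend(Set<String> allReceivers,
                                          Set<String> online,
                                          Predicate<String> trySend) {
        // 1. Collect the online receivers in a single pass.
        Set<String> targets = allReceivers.stream()
                .filter(online::contains)
                .collect(Collectors.toCollection(HashSet::new));

        // 2. One batched send; remember the client-ids that failed.
        Set<String> failures = targets.stream()
                .filter(id -> !trySend.test(id))
                .collect(Collectors.toSet());

        // 3. delivered = targets - failures; remaining = all - delivered.
        Set<String> delivered = new HashSet<>(targets);
        delivered.removeAll(failures);
        Set<String> remaining = new HashSet<>(allReceivers);
        remaining.removeAll(delivered);
        return remaining;
    }

    public static void main(String[] args) {
        Set<String> all = new HashSet<>(Arrays.asList("a", "b", "c", "d"));
        Set<String> online = new HashSet<>(Arrays.asList("a", "b", "c"));
        // "b" is online but its send fails; "d" is offline.
        Set<String> remaining = remainingAfterSend(all, online, id -> !id.equals("b"));
        System.out.println("still pending: " + remaining);
    }
}
```

The size of the returned set corresponds to the value the consumer task stores through `setTotalReceivers` before notifying its observers.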
Code
NoticeConsumerCenterImpl
    private class NoticeConsumeTask implements Runnable {

        private NoticePackage noticePackage;

        NoticeConsumeTask(NoticePackage noticePackage) {
            this.noticePackage = noticePackage;
        }

        @Override
        public void run() {
            // Collect the online receivers. collect() is safe on a parallel stream,
            // unlike adding to a plain HashSet from peek().
            HashSet<String> receivers = noticePackage.getReceivers()
                    .parallelStream()
                    .filter(receiver -> ONLINE_PEOPLES.contains(receiver))
                    .collect(Collectors.toCollection(HashSet::new));
            // publishMsg returns the client-ids whose send failed. Remove them so
            // that receivers holds only the clients that were actually notified.
            Set<String> failures = noticeChannelHandler.publishMsg(noticePackage, receivers);
            if (failures != null) {
                receivers.removeAll(failures);
            }
            // Whatever remains in the package still has to be notified.
            noticePackage.getReceivers().removeAll(receivers);
            noticePackage.setTotalReceivers(noticePackage.getReceivers().size());
            setChanged();
            notifyObservers(noticePackage);
        }
    }
NoticeChannelHandlerImpl
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        Channel channel = ctx.channel();
        // The client message is split on "-": contexts[0] selects the channel group,
        // contexts[1] is the client-id.
        String[] contexts = msg.text().split("-");
        int clientContextSplitNum = 2;
        if (contexts.length != clientContextSplitNum) {
            channel.writeAndFlush(new TextWebSocketFrame(StringsValue.CN.CLIENT_SEND_ERR_MSG));
        } else {
            channel.writeAndFlush(new TextWebSocketFrame("Client id: " + channel.id()));
            NoticeConsumerCenterImpl.addReceiver(contexts[1]);
            // Remember the client-id on the channel itself.
            channel.attr(ClientChannelAttrManage.CLIENT_ID_ATTRIBUTEKEY).set(contexts[1]);
            addToChannelGroup(contexts[0], channel);
        }
    }

    @Override
    public Set<String> publishMsg(NoticePackage noticePackage, HashSet<String> receiver) {
        // Stays null when the group type matches none of the known groups.
        Set<String> list = null;
        if (noticePackage.getGroupType() == CHANNEL_GROUP_STUDENT.getValue()) {
            list = send(receiver, studentChannels, noticePackage);
        } else if (noticePackage.getGroupType() == CHANNEL_GROUP_TEACHER.getValue()) {
            list = send(receiver, teacherChannels, noticePackage);
        } else if (noticePackage.getGroupType() == CHANNEL_GROUP_GLOBAL.getValue()) {
            list = send(receiver, globalChannels, noticePackage);
        }
        return list;
    }

    private Set<String> send(HashSet<String> receiver, ChannelGroup channels, NoticePackage noticePackage) {
        TextWebSocketFrame frame = new TextWebSocketFrame(noticePackage.getMessage());
        Set<String> failures = new HashSet<>();
        try {
            channels.stream()
                    .filter(client -> receiver.contains(
                            client.attr(ClientChannelAttrManage.CLIENT_ID_ATTRIBUTEKEY).get()))
                    .forEach(client -> {
                        if (client.isWritable()) {
                            // Each write releases the frame it is given, so every
                            // channel gets its own retained duplicate.
                            client.writeAndFlush(frame.retainedDuplicate()).addListener(future -> {
                                if (future.isSuccess()) {
                                    log.info("Notice [{}] pushed to client [{}]",
                                            noticePackage.getMessage(), client.id());
                                }
                            });
                        } else {
                            log.error("Failed to push notice [{}] to client [{}]",
                                    noticePackage.getMessage(), client.id());
                            failures.add(client.attr(ClientChannelAttrManage.CLIENT_ID_ATTRIBUTEKEY).get());
                        }
                    });
        } finally {
            // Release the reference held by this method.
            frame.release();
        }
        return failures;
    }
GitHub source code