前话 之前在接商业外包的时候,遇到了一个需要服务端主动推送消息的需求,当时由于对Netty的不熟悉以及没有很好的理解Netty的websocket,因此用了SSE
去解决了这个问题,但是缺点就是会占用大量的带宽
Netty的websocket Netty的一个链接就是一个channel
(通道),因此,如果要实现服务端的主动消息推送,那么就要实现对channel的统一管理;幸运的是,Netty其内部已经实现了对channel的统一管理的容器——ChannelGroup
,ChannelGroup其内部用了一个ConcurrentMap<ChannelId, Channel>
来作为channel的承载容器,而channelId是channel的一个属性,但是,如果仅仅依靠ChannelGroup还不能够实现真正可用于生产环境中的消息推送系统,因为channelId与业务是完全没有关联的,因此,我这里又用了一个ConcurrentMap<String, ChannelId>
来实现用户的业务标识与channelId的对应关系:当客户端需要连接消息推送中心时,需要带上自己的业务编号信息,然后,服务端接收消息后将用户业务编号解析并与channel一同存储到自定义的管理容器中
代码实现
Netty的websocket实现端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Slf 4j@Component public class NoticeChannelHandlerImpl extends SimpleChannelInboundHandler <TextWebSocketFrame > implements NoticeChannelHandler { protected static ChannelGroup studentChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); protected static ChannelGroup teacherChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); protected static ChannelGroup globalChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Channel channel = ctx.channel(); String[] contexts = msg.text().split("-" ); int clientContextSplitNum = 2 ; if (contexts.length != clientContextSplitNum) { channel.writeAndFlush(new TextWebSocketFrame(StringsValue.CN.CLIENT_SEND_ERR_MSG)); } else { ChannelIdPool.add(contexts[1 ], channel.id()); addToChannelGroup(contexts[0 ], channel); } } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.info("新接入编号为 [{}] 的客户端" , ctx.channel().id()); super .channelActive(ctx); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { super .channelInactive(ctx); log.info("客户端编号为 [{}] 已断开链接" , ctx.channel().id()); ChannelIdPool.remove(ctx.channel().id()); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { super .exceptionCaught(ctx, cause); log.error("客户端编号为 [{}] 报错:[{}]" , ctx.channel().id(), cause.getMessage()); ctx.close(); } }
ConcurrentMap<String, ChannelId>容器的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ChannelIdPool { private static ConcurrentHashMap<String, ChannelId> channelIdMap; static { channelIdMap = new ConcurrentHashMap<>(); } public ChannelIdPool () {} public static Optional<ChannelId> get (String key) { return Optional.ofNullable(channelIdMap.get(key)); } public static void add (String key, ChannelId channelId) { channelIdMap.put(key, channelId); } public static void remove (String key) { channelIdMap.remove(key); } public static void remove (ChannelId channelId) { Iterator<Map.Entry<String, ChannelId>> iterator = channelIdMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, ChannelId> entry = iterator.next(); if (entry.getValue().compareTo(channelId) == 0 ) { iterator.remove(); } } } }
如何设置Netty服务端相关配置
Netty 服务端的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 @PropertySource (value = "classpath:netty-server.properties" )@Configuration public class NettyServerConfigure { @Value ("${netty.server.tcp.port}" ) private int tcpPort; @Value ("${netty.server.boss.thread.count}" ) private int bossCount; @Value ("${netty.server.worker.thread.count}" ) private int workerCount; @Value ("${netty.server.so.keepalive}" ) private boolean keepAlive; @Value ("${netty.server.so.backlog}" ) private int backlog; @Bean (value = "ServerBootstrap" ) public ServerBootstrap bootstrap () { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup(), workerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(nettyWebSocketChannelInitializer); Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions(); Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet(); for (@SuppressWarnings ("rawtypes" ) ChannelOption option : keySet) { b.option(option, tcpChannelOptions.get(option)); } return b; } @Autowired @Qualifier ("ChannelInitializer" ) private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer; @Bean (name = "tcpChannelOptions" ) public Map<ChannelOption<?>, Object> tcpChannelOptions() { Map<ChannelOption<?>, Object> options = new HashMap<>(); options.put(ChannelOption.SO_KEEPALIVE, keepAlive); options.put(ChannelOption.SO_BACKLOG, backlog); return options; } @Bean (name = "bossGroup" , destroyMethod = "shutdownGracefully" ) public NioEventLoopGroup bossGroup () { return new NioEventLoopGroup(bossCount); } @Bean (name = "workerGroup" , destroyMethod = "shutdownGracefully" ) public NioEventLoopGroup workerGroup () { return new NioEventLoopGroup(workerCount); } @Bean (name = "tcpSocketAddress" ) public InetSocketAddress tcpPort () { return new InetSocketAddress(tcpPort); } }
Netty的websocket的初始化配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component @Qualifier ("ChannelInitializer" )public class NettyWebSocketChannelInitializer extends ChannelInitializer <SocketChannel > { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536 )); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new WebSocketServerProtocolHandler("/notice" )); pipeline.addLast(new NoticeChannelHandlerImpl()); } }
真正的Tcp服务端启动相关配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Component public class TcpServerConfigure { @Autowired @Qualifier ("ServerBootstrap" ) private ServerBootstrap serverBootstrap; @Autowired @Qualifier ("tcpSocketAddress" ) private InetSocketAddress tcpPort; private Channel serverChannel; public void start () throws Exception { serverChannel = serverBootstrap.bind(tcpPort).sync().channel().closeFuture().sync().channel(); } @PreDestroy public void stop () throws Exception { serverChannel.close(); serverChannel.parent().close(); } public ServerBootstrap getServerBootstrap () { return serverBootstrap; } public void setServerBootstrap (ServerBootstrap serverBootstrap) { this .serverBootstrap = serverBootstrap; } public InetSocketAddress getTcpPort () { return tcpPort; } public void setTcpPort (InetSocketAddress tcpPort) { this .tcpPort = tcpPort; } }
消息发布、消费中心
消息发布中心
对于消息发布中心,其职能就只限与接收由发布者需要发布的消息,将消息缓存在一个消息发布容器中,为什么需要缓存起来?因为可能有些用户是这些消息的接收者,但是由于没有登陆,也就没有对应的channel注册,因此需要将消息缓存在容器中,当消息的所有接收者都接收到消息时,将该消息从容器中移除,至此,一个消息在消息发布中心的生命到此结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Component public class NoticePublishCenterImpl extends Observable implements NoticePublishCenter { private static final Object lock = new Object(); @Autowired private NoticeConsumerCenter noticeConsumerCenter; protected static ConcurrentHashMap<String, NoticePackage> noticeCenterPool; static { noticeCenterPool = new ConcurrentHashMap<>(); } public NoticePublishCenterImpl () {} @PostConstruct public void init () { addObserver(noticeConsumerCenter); } @Override public boolean createNoticeGroup (NoticePackage noticePackage) { if (noticeCenterPool.containsKey(noticePackage.getNoticeLabel())) { noticeCenterPool.put(noticePackage.getNoticeLabel(), noticePackage); notifyObservers(noticePackage); return true ; } return false ; } private void finishNotice (String groupName) { synchronized (lock) { noticeCenterPool.remove(groupName); } } @Override public void update (Observable o, Object arg) { NoticePackage noticePackage = (NoticePackage) arg; if (noticePackage.getTotalReceivers() == 0 ) { finishNotice(noticePackage.getNoticeLabel()); } else { noticeCenterPool.put(noticePackage.getNoticeLabel(), noticePackage); } } }
在代码中看到了观察者模式所特有的addObserver
以及notifyObservers
,在这里,我将消息发布中心与消费中心用了观察者模式分解;当消息发布中心接收到发布事件后,通知消息消费中心准备开始执行消息发布任务,通知有,消息发布中心就不需要阻塞等待消息消费中心的作业完成结果,而是继续做自己的事情;而消息消费中心同样也实现了观察者模式,当消息消费中心完了消息发布中心所通知的任务后,将任务完成结果通知消息发布中心,剩下的关于消息消费的结果的处理就完全扔给了消息发布中心,而消息消费中心则可以继续自己的其他消息消费任务
消息消费中心
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 @Slf 4j@Component public class NoticeConsumerCenterImpl extends Observable implements NoticeConsumerCenter { private static ConcurrentLinkedQueue<String> receivers = new ConcurrentLinkedQueue<>(); @Autowired private NoticeChannelHandler noticeChannelHandler; @Autowired private NoticePublishCenter publishCenter; private static ThreadPoolExecutor PublishThreadPool; @PostConstruct private void init () { addObserver(publishCenter); PublishThreadPool = new ThreadPoolExecutor(4 , 12 , 60 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(100 ), new NoticeThreadFactory(), new RejectHandler()); } @Override public void update (Observable o, Object arg) { NoticePackage noticePackage = (NoticePackage) arg; PublishThreadPool.execute(new NoticeCallable(noticePackage)); } @Override public void addReceiver (String receiver) { if (!receivers.contains(receiver)) { receivers.add(receiver); } } @Override public void removeReceiver (String receiver) { receivers.remove(receiver); } private class NoticeCallable implements Runnable { private NoticePackage noticePackage; public NoticeCallable (NoticePackage noticePackage) { this .noticePackage = noticePackage; } @Override public void run () { List<String> tmpReceivers = noticePackage.getReceivers(); tmpReceivers.stream().filter(receiver -> receivers.contains(receiver)).flatMap(receiver -> { noticePackage.setUserId(receiver); noticeChannelHandler.publishMsg(noticePackage); noticePackage.setTotalReceivers(noticePackage.getTotalReceivers() - 1 ); return Stream.of(noticePackage); }).flatMap(noticePackage -> { if (noticePackage.getTotalReceivers() == 0 ) { notifyObservers(noticePackage); } return Stream.empty(); }).count(); } } private class RejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution (Runnable r, ThreadPoolExecutor executor) { log.error("消息推送服务任务被拒绝。 {}" , executor.toString()); } } private class NoticeThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(1 ); NoticeThreadFactory() { namePrefix = "消息推送-工作线程-" ; } @Override public Thread newThread (Runnable r) { String name = namePrefix + nextId.getAndDecrement(); return new Thread(r, name); } } }
哔哩哔哩项目演示视频 哔哩哔哩项目演示视频:利用Netty、SpringBoot以及Dubbo构建一个分布式的消息通知系统(初版)