App消息推送的简单实现

App消息推送的简单实现

现在手机App消息推送一般用极光之类的第三方服务来实现。但有一些消息,前端没有展示需求,一条数据的长度有限,但数量很大。比如硬件设备产生的消息,需要推送到前端,这时使用第三方服务如果免费的容易被限流,要不就得使用付费服务。

这里用在服务端与App端建立WebSocket连接的方式实现了简单的消息推送,这里服务端用的是Java,App端用的是Flutter。

服务端实现

添加依赖

io.netty

netty-all

用Netty来管理WebSocket连接

实现ChannelHandler

继承SimpleChannelInboundHandler来处理WebSocket消息

public class PushWebsocketHandler extends SimpleChannelInboundHandler

在这个类里面重载二个方法

@Override

protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {

if (frame instanceof PingWebSocketFrame){

ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));

} else if (frame instanceof TextWebSocketFrame) {

// resolveTextFrame(ctx, (TextWebSocketFrame) frame);

} else if (frame instanceof CloseWebSocketFrame) {

var user = pool.removeChannel(ctx.channel());

if (user != null){

log.info("用户{}({})下线", user.getUserId(),user.getToken());

}

ctx.close();

}

}

上面channelRead0方法中处理消息,PingWebSocketFrame处理ping消息,CloseWebSocketFrame处理关闭消息,因为当前设计中消息是单向,所以代码中TextWebSocketFrame的部分注释掉了。

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

if (msg instanceof FullHttpRequest){

var request = (FullHttpRequest) msg;

String uri = request.uri();

if (uri.contains(websocketPath) && "websocket".equals(request.headers().get("Upgrade"))){

int index = uri.lastIndexOf("/");

if (index >= uri.length() - 1) {

closeClient(ctx, "无效的连接");

} else {

String token = uri.substring(index + 1);

if (tokenUtil.hasAccount(token)) {

var account = tokenUtil.getAccount(token);

log.info("用户{}({})上线",account.getUserId(), token);

pool.addChannel(account, ctx.channel());

request.setUri(websocketPath);

} else {

closeClient(ctx, "token无效:" + token);

}

}

}else{

closeClient(ctx,"无效的连接");

}

}

super.channelRead(ctx, msg);

}

上面的channelRead方法处理连接请求,如果是个WebSocket请求,则从url中取得token,token验证通过则把当前连接加入池中。closeClient关闭当前连接

private void closeClient(ChannelHandlerContext ctx,String reason){

log.info(reason);

ctx.writeAndFlush(new CloseWebSocketFrame(400,reason)).addListener(ChannelFutureListener.CLOSE);

}

下面则是对于一些异常情况的处理,连接断开、连接空闲、连接发生异常等

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

var user = pool.removeChannel(ctx.channel());

if (user != null){

log.info("用户{}({})连接断了", user.getUserId(),user.getToken());

}

super.channelInactive(ctx);

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {

IdleStateEvent event = (IdleStateEvent) evt;

if (event.state() == IdleState.READER_IDLE) {

var user = pool.removeChannel(ctx.channel());

if (user != null){

log.info("用户{}({})长时间未响应,断开连接", user.getUserId(),user.getToken());

}

ctx.close();

}

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

if (ctx.channel().isActive()) {

var userId = pool.removeChannel(ctx.channel());

log.info("发生异常:{},用户{}连接释放",cause.getLocalizedMessage(), userId);

ctx.close();

}

}

View Code

启动Netty服务

启动的主体代码

bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel channel) {

channel.pipeline()

.addLast(new IdleStateHandler(605,

0, 0, TimeUnit.SECONDS))

.addLast(new HttpServerCodec())

.addLast(new ChunkedWriteHandler())

.addLast(new HttpObjectAggregator(1024*64))

.addLast(handler)

.addLast(new WebSocketServerProtocolHandler(websocketPath))

;

}

}).option(ChannelOption.SO_BACKLOG, 1024)

.option(ChannelOption.SO_REUSEADDR, true)

.childOption(ChannelOption.TCP_NODELAY, true)

.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.bind(port).sync();

if (future.isSuccess()) {

log.info("服务器端口【{}】bind成功", port);

} else {

log.error("服务器端口【{}】bind失败", port);

}

future.channel().closeFuture().sync();

倒数第2个addLast里的handler就是我们上一步定义的PushWebsocketHandler

推送消息

下面是给应用层调用的推送方法

public void pushMessage(PushBody body) throws JsonProcessingException {

if (body.getUsers().size() == 0) return;

for(var user : body.getUsers()){

var channel = pool.getChannel(user);

if (channel.size() > 0){

for(var item : channel){

var id = UUID.randomUUID().toString();

var key = item.getChannelId() + "-" + id;

var msgPackage = new MessagePackage()

.setMessageId(key)

.setContent(body.getMessage());

ObjectMapper objectMapper = new ObjectMapper();

String text = objectMapper.writeValueAsString(msgPackage);

item.getChannel().writeAndFlush(new TextWebSocketFrame(text));

log.info("给用户【{}】推送了{}消息",user,body.getTitle());

}

}

}

}

同一消息可以推送给多个用户,一个用户可以有多个活跃连接

nginx配置

要让服务器正常地处理WebSocket请求,nginx必须作一下配置

location /wss {

proxy_pass http://127.0.0.1:7001/;

proxy_http_version 1.1;

proxy_set_header Upgrade $http_upgrade;

proxy_set_header Connection "Upgrade";

proxy_read_timeout 900;

proxy_send_timeout 900;

}

App端实现

App端处理WebSocket需引入一个web_socket_client

dependencies:

web_socket_client: ^0.1.1

在App启动,用户登录之后,把消息推送的地址拼接出来

static WebSocket? socket;

static Future refreshToken(String token, {reconnect = false}) async {

final url = AppConfig.serverAddress.replaceAll('https', 'wss');

wsUrl = Uri.parse('$url/wss/push/$token');

if (reconnect) {

//关掉原来的,重新连接

health?.cancel();

pushListener?.cancel();

socket?.close(1001, 'TOKEN_REFRESH');

socketConnect();

}

}

url里面/wss对应的是nginx配置里的 location, /push对应的是channelRead方法里的websocketPath,跟服务端保持一致即可

reconnect参数,用于重新刷新token后调用时用,首次启动时传false即可。

下面就是建立连接的方法

static void socketConnect() {

final backoff = LinearBackoff(

initial: Duration.zero,

increment: const Duration(seconds: 3),

maximum: const Duration(seconds: 3),

);

socket = WebSocket(wsUrl!, backoff: backoff);

pushListener = socket?.messages.listen((message) {

final json = convert.jsonDecode(message);

resolveMessage(json["content"]);

});

// connectListener = socket?.connection.listen((state)async {

// if (state is Disconnected){ //断了不用自己重连

// if (kDebugMode) {

// print('跟推送服务连接断了..................');

// }

// }

// });

socket?.connection.firstWhere((state) => state is Connected).then((value) {

if (kDebugMode) {

print('跟推送服务建立了连接..................');

}

health = TimerUtil()

..setInterval(30 * 1000) //30秒保活

..setOnTimerTickCallback((millisUntilFinished) {

socket?.send('--');

})

..startTimer();

});

}

resolveMessage是解析推送消息的方法,这里就不引用了。这个方案不管客户端有没有收到,几分钟或几秒后就推送新的数据过去了,数据丢就丢了,也没大不了的。这种简单实现刚好能满足我们的实际需要。

相关推荐

注意!天涯明月刀佳节令获得方法!(佳节令有什么用处?)
倩女手游卖号,账号交易攻略与注意事项
365bet电子游戏

倩女手游卖号,账号交易攻略与注意事项

📅 10-11 👀 8617
【弹弹堂手游攻略】新版宝珠攻略1/3 概况篇
365bet娱乐app

【弹弹堂手游攻略】新版宝珠攻略1/3 概况篇

📅 06-11 👀 8369
苹果手机拍照九宫格设置与使用指南
365网站平台网址

苹果手机拍照九宫格设置与使用指南

📅 09-03 👀 6658
自保用的日常行車記錄器要保存多久?
365bet娱乐app

自保用的日常行車記錄器要保存多久?

📅 02-02 👀 3410
黄金左脚成害群之马 卡洛斯国脚生涯未获
虚拟机如何设置从u盘启动?虚拟机u盘启动教程
365bet电子游戏

虚拟机如何设置从u盘启动?虚拟机u盘启动教程

📅 08-06 👀 4792
王者荣耀几岁可以玩?游戏年龄限制详解
365bet电子游戏

王者荣耀几岁可以玩?游戏年龄限制详解

📅 06-11 👀 8813
破解薪資密碼,3分鐘看懂調查報告!【人資數據解密系列】