swoole基础-websocket通知案例以及多端口复合协议的使用

最初是打算写个聊天室分享给大家,后来仔细斟酌了一下,还是讲个web通知吧,两个案例都差不多。

当然,在前面两篇介绍websocket的基础之上,相信你一定会觉得web通知这个功能就是一个小case。所以本文我们把重点放在后面多端口复合协议的使用。

websocket通知的实现方式,基本上跟websocket初识一文中最后介绍的案例差不多,只不过我们当时是循环所有的客户端推送消息,此时我们是一对一推送提醒。

需求分析:

我们以评论被回复为例,当一条评论被其他某个用户(假设是用户B)回复,即发一条通知给被回复的评论所属人(假设是用户A),告诉A,他的评论被回复了。

功能分析:

我们不能保证用户B和用户A都处于连接状态,但是通常情况下,用户B至少是连接状态,用户A不一定跟server保持连接;
任一用户都不止对应一个客户端。换言之,用户A和用户B都可能打开了多个tab页,对于一个tab页,就会有一个独立的fd标识,我们这里认为任一用户只有最新的fd有效,或者你可以认为用户所有的tab页的连接都有效;
因为没有用户系统,我们以get传递的参数uid为标识,uid=100视为用户A,uid=101视为用户B;
我们不准备做一个评论系统,我们模拟的tab页包将会包含一个输入内容的文本框、一个输入目标uid的input和一个发送的按钮以满足需求。
流程分析:

用户A($_GET[‘uid’] = 100)在某个tab页的输入框输入”回复xxx的内容”字样后,点击发送
用户B($_GET[‘uid’] = 101)如果处于连接状态,则alert提醒用户B,他的评论被回复了
分析了半天,我们看server端代码的实现,源码可参考CommentServer.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
<?php

class CommentServer
{
private $_serv;
public $key = '^manks.top&swoole$';
// 用户id和fd对应的映射,key => value,key是用户的uid,value是用户的fd
public $user2fd = [];

public function __construct()
{
$this->_serv = new swoole_websocket_server("127.0.0.1", 9501);
$this->_serv->set([
'worker_num' => 1,
'heartbeat_check_interval' => 60,
'heartbeat_idle_time' => 125,
]);
$this->_serv->on('open', [$this, 'onOpen']);
$this->_serv->on('message', [$this, 'onMessage']);
$this->_serv->on('close', [$this, 'onClose']);
}

/**
* @param $serv
* @param $request
* @return mixed
*/
public function onOpen($serv, $request)
{
// 连接授权
$accessResult = $this->checkAccess($serv, $request);
if (!$accessResult) {
return false;
}
// 始终把用户最新的fd跟uid映射在一起
if (array_key_exists($request->get['uid'], $this->user2fd)) {
$existFd = $this->user2fd[$request->get['uid']];
$this->close($existFd, 'uid exists.');
$this->user2fd[$request->get['uid']] = $request->fd;
return false;
} else {
$this->user2fd[$request->get['uid']] = $request->fd;
}
}

/**
* @param $serv
* @param $frame
* @return mixed
*/
public function onMessage($serv, $frame)
{
// 校验数据的有效性,我们认为数据被`json_decode`处理之后是数组并且数组的`event`项非空才是有效数据
// 非有效数据,关闭该连接
$data = $frame->data;
$data = json_decode($data, true);
if (!$data || !is_array($data) || empty($data['event'])) {
$this->close($frame->fd, 'data format invalidate.');
return false;
}
// 根据数据的`event`项,判断要做什么,`event`映射到当前类具体的某一个方法,方法不存在则关闭连接
$method = $data['event'];
if (!method_exists($this, $method)) {
$this->close($frame->fd, 'event is not exists.');
return false;
}
$this->$method($frame->fd, $data);
}
public function onClose($serv, $fd)
{
echo "client {$fd} closed.\n";
}

/**
* 校验客户端连接的合法性,无效的连接不允许连接
* @param $serv
* @param $request
* @return mixed
*/
public function checkAccess($serv, $request)
{
// get不存在或者uid和token有一项不存在,关闭当前连接
if (!isset($request->get) || !isset($request->get['uid']) || !isset($request->get['token'])) {
$this->close($request->fd, 'access faild.');
return false;
}
$uid = $request->get['uid'];
$token = $request->get['token'];
// 校验token是否正确,无效关闭连接
if (md5(md5($uid) . $this->key) != $token) {
$this->close($request->fd, 'token invalidate.');
return false;
}
return true;
}

/**
* @param $fd
* @param $message
* 关闭$fd的连接,并删除该用户的映射
*/
public function close($fd, $message = '')
{
// 关闭连接
$this->_serv->close($fd);
// 删除映射关系
if ($uid = array_search($fd, $this->user2fd)) {
unset($this->user2fd[$uid]);
}
}

public function alertTip($fd, $data)
{
// 推送目标用户的uid非真或者该uid尚无保存的映射fd,关闭连接
if (empty($data['toUid']) || !array_key_exists($data['toUid'], $this->user2fd)) {
$this->close($fd);
return false;
}
$this->push($this->user2fd[$data['toUid']], ['event' => $data['event'], 'msg' => '收到一条新的回复.']);
}
/**
* @param $fd
* @param $message
*/
public function push($fd, $message)
{
if (!is_array($message)) {
$message = [$message];
}
$message = json_encode($message);
// push失败,close
if ($this->_serv->push($fd, $message) == false) {
$this->close($fd);
}
}

public function start()
{
$this->_serv->start();
}
}

$server = new CommentServer;
$server->start();

满眼看下来,代码挺长的,没关系,我们整理了一下代码的逻辑

我们给CommentServer类增加了一个属性 $user2fd,这个是key => value结构,用于保存uid和fd的映射关系
onOpen回调做两件事,验证授权和添加新的映射关系
onMessage回调只接收含有event项的数组,event等同于CommentServer类的方法名,我们这里只有一个用于web通知的alertTip方法
此外,我们封装了该类的close方法和push方法,close方法用于server主动关闭连接,删除uid和fd的映射,push方法用于向指定的fd推送消息
客户端代码如下,详细的见于CommentClient.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<div>
发送内容:<textarea name="content" id="content" cols="30" rows="10"></textarea><br>
发送给谁:<input type="text" name="toUid" value="" id="toUid"><br>
<button onclick="send();">发送</button>
</div>

<script>
var ws = new WebSocket("ws://127.0.0.1:9501?uid=<?php echo $uid ?>&token=<?php echo $token; ?>");
ws.onopen = function(event) {
};
ws.onmessage = function(event) {
var data = event.data;
data = eval("("+data+")");
if (data.event == 'alertTip') {
alert(data.msg);
}
};
ws.onclose = function(event) {
console.log('Client has closed.\n');
};

function send() {
var obj = document.getElementById('content');
var content = obj.value;
var toUid = document.getElementById('toUid').value;
ws.send('{"event":"alertTip", "toUid": '+toUid+'}');
}
</script>

server开启之后,演示的效果我们看下动图

结果中,注意看地址栏,alert弹窗是在哪个tab页弹出的。

上例中,我们模拟的是评论被回复的简单例子。

回顾过去讲的内容,无论是tcp server,http server还是websocket server,server都是独立的,server与server之间并没有太多的交互。

实际上有没有交互的必要呢?

假设现在有这么一个需求,在刚刚评论的案例中,前文用户的回复不是直接发送给被回复的用户,而是评论在后台被人审核成功的一瞬间,再通知被回复的用户呢?

审核操作改为ajax操作,success回调内再new一个websocket客户端,然后send?可以,但是这显然不是一个很好的操作。

在websocket初识的时候我们说过,要想与websocket server通信,客户端只能是websocket客户端!既然我们刚刚否决了new一个websocket客户端,那是要怎么做呢?

从程序的角度出发,如果我们在php的层面上直接就能通知到websocket服务器,换言之,如果我们能够从php的层面上,直接实现alertTip方法的功能是不是就对了?

前文我们介绍tcp server的时候了解到,首先我们要想让web应用同server进行“互撩”,swoole_client少不了,既然有swoole_client,swoole_server肯定也少不了。但是目前server正在跑websocket,难不成我们在单独跑一个tcp server?对,我们就是要在websocket server的基础之上,想办法再跑一个tcp server。

为了使用多端口复合协议,swoole为server提供了listen方法,可以让当前server监听新的端口。

比如我们可以让刚刚创建的websocket server额外监听9502端口,这个端口主要负责tcp的工作。

1
2
3
4
5
6
7
$this->_tcp = $this->_serv->listen('127.0.0.1', 9502, SWOOLE_SOCK_TCP);
$this->_tcp->set([
'open_eof_check' => true, //打开EOF检测
'package_eof' => "\r\n", //设置EOF
'open_eof_split' => true, // 自动分包
]);
$this->_tcp->on('Receive', [$this, 'onReceive']);

listen函数返回的是swoole_server_port对象,需要注意的是swoole_server_port的set函数只能设置一些特定的参数,比如socket参数、协议相关等,像worker_num、log_file、max_request等等这些都是不支持的。就tcp服务器而言,swoole_server_port对象也仅仅对onConnect\onReceive\onClose这三个回调支持,其他的一律不可用,详细可翻阅swoole手册查看。

下面我们就以评论审核通知来看看多端口复合协议的玩法。

再来看下我们现在的流程

1、用户回复某评论 => 评论进入审核状态 ;很明显这个过程我们不需要做什么

2、管理员审核该评论 => 通知被回复的人;这个时候我们要做的就等同于alertTip函数要做的

server端除了刚刚设置的$this->_tcp一段代码之外,我们单独绑定了onReceive回调,下面看onReceive回调的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public function onReceive($serv, $fd, $fromId, $data)
{
try {
$data = json_decode($data, true);
if (!isset($data['event'])) {
throw new \Exception("params error, needs event param.", 1);
}

$method = $data['event'];

// 调起对应的方法
if(!method_exists($this, $method)) {
throw new \Exception("params error, not support method.", 1);
}
$this->$method($fd, $data);

return true;

} catch (\Exception $e) {
$msg = $e->getMessage();
throw new \Exception("{$msg}", 1);
}
}

可以看到,除了进行简单的判断之外,如果tcp客户单携带一个event=alertTip即可

在这之前,websocket客户端的代码我们依然以CommentClient.php为例,假设要回复的用户uid=100,我们运行server之后,先让uid=100的客户端连接到server,运行的客户端地址栏添加uid参数等于100即可

下面我们再写一个tcp client,连接9502端口,我们的tcp server在这个端口监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php

class Client
{
private $client;

public function __construct ()
{
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP);

if (!$this->client->connect('127.0.0.1', 9502)) {
$msg = 'swoole client connect failed.';
throw new \Exception("Error: {$msg}.");
}
}

/**
* @param $data Array
* send data
*/
public function sendData ($data)
{
$data = $this->togetherDataByEof($data);
$this->client->send($data);
}

/**
* 数据末尾拼接EOF标记
* @param Array $data 要处理的数据
* @return String json_encode($data) . EOF
*/
public function togetherDataByEof($data)
{
if (!is_array($data)) {
return false;
}

return json_encode($data) . "\r\n";
}
}

$client = new Client;
$client->sendData([
'event' => 'alertTip',
'toUid' => 100,
]);

现在无论是websocket服务器、tcp 服务器还是websocket客户端 tcp客户端都已经准备就绪了,下面我们浏览器直接访问下tcp client,如果正常的话,websocket客户端所在页面会弹出有新回复的通知。

看动图运行结果

关于swoole的内容差不多就讲这么多,后面会额外再补充一些很多人留言关于swoole如何结合框架的问题。