swoole基础-swoole之task初体验

上一文我们介绍了server的简单应用,这两天也有几个小伙伴私下反馈说不知道server有啥用。

确实哈,对于初学者,你现在的确看不出来这玩意能干啥。有过swoole基础的同学,根据上文的原型脑海中映射一个简单的对话应该是没问题。我们还是继续慢慢的打磨基础。

task初体验

在上文和IO模型中我们都对同步和异步进行了详细的解释,可能你们都懂了,可能部分人还是没懂,毕竟异步始终是个抽象的概念。

今天我们再来强化下这个概念,说一说Async Task。

AsyncTask,即异步任务。我们可以利用AsyncTask将一个耗时的任务投递到队列中,由进程池异步去执行。

博主你说人话,啥是异步任务?

总有些人吐槽不知道swoole的应用场景是啥,我们就以实际中遇到的问题为例:

情景一:管理员需要给指定的用户发送邮件,当勾选10封甚至更多封的时候,点击发送,浏览器会一直转圈,直到邮件全部发送完毕。

情景二:大家都爱看小说,我们以某小说网站的一个需求为例:要求作者可以把他事先写好的小说直接批量导入到网站(根据某种规则),这个操作起来同样会比较耗时。

从我们理解的角度思考,这其实都是php线程一直被阻塞,客户端才一直在等待服务端的响应。

对用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

我们的目的就是当用户选了10000封邮件或者提交了他含有500章节的内容之后,及时的通知用户邮件正在发送中或者提示用户章节内容正在上传中,对不对?明白我们今天的重点,额吗?

对,你没理解错,AsyncTask的目的就是这个。下面我们来介绍下AsyncTask的使用。

1、先创建一个server

1
$serv = new swoole_server("127.0.0.1", 9501);

2、开启task功能

task功能默认是关闭的,开启task功能需要满足两个条件

配置task进程的数量
注册task的回调函数onTask和onFinish
配置task进程的数量,即配置task_worker_num这个配置项。比如我们开启一个task进程

1
2
3
$serv->set([
'task_worker_num' => 1,
]);

3、task怎么使用?

task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。

怎么操作呢?我们可以利用swoole_server->task函数把任务数据投递到task进程池中。

swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来100个甚至更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。

如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,这将是我们不期望的结果。

我们写一个例子来解释下上面所说的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$serv->on('Connect', function ($serv, $fd) {
echo "new client connected." . PHP_EOL;
});
$serv->on('Receive', function ($serv, $fd, $fromId, $data) {
echo "worker received data: {$data}" . PHP_EOL;

// 投递一个任务到task进程中
$serv->task($data);

// 通知客户端server收到数据了
$serv->send($fd, 'This is a message from server.');

// 为了校验task是否是异步的,这里和task进程内都输出内容,看看谁先输出
echo "worker continue run." . PHP_EOL;
});

注册onTask回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* $serv swoole_server
* $taskId 投递的任务id,因为task进程是由worker进程发起,所以多worker多task下,该值可能会相同
* $fromId 来自那个worker进程的id
* $data 要投递的任务数据
*/
$serv->on('Task', function ($serv, $taskId, $fromId, $data) {
echo "task start. --- from worker id: {$fromId}." . PHP_EOL;
for ($i=0; $i < 5; $i++) {
sleep(1);
echo "task runing. --- {$i}" . PHP_EOL;
}
echo "task end." . PHP_EOL;
});

为了模拟判断到底是不是异步的,我们在task的回调中循环一个耗时任务,另一个需要注意的地方,我们在task回调内的结尾并没有return任何内容。

注册onFinish回调

1
2
3
4
5
6
/**
* 只有在task进程中调用了finish方法或者return了结果,才会触发finish
*/
$serv->on('Finish', function ($serv, $taskId, $data) {
echo "finish received data '{$data}'" . PHP_EOL;
});

最后,调用server的start方法

1
$serv->start();

整个过程是这样的:我们在worker进程收到数据后,直接调用swoole_server->task函数把数据投递给task进程,随后在swoole_server->task调用后和task进程内都输出内容。

准备就绪之后我们在终端下启动server,执行

1
php server.php

客户端的测试,我们仍然利用上文在client.php写好的代码进行测试,新开一个终端,执行

1
php client.php

一起看下测试结果:

服务端

1
2
3
4
5
6
7
8
9
10
new client connected.
worker received data: hello server.
worker continue run.
task start. --- from worker id: 3.
task runing. --- 0
task runing. --- 1
task runing. --- 2
task runing. --- 3
task runing. --- 4
task end.

客户端

1
This is a message from server.

从测试结果中,我们看到在swoole_server的task函数之后输出的内容“worker continue run”在task进程开始之前输出。第二个应该引起你注意的是在结果中我们并没有看到在onFinish回调中输出的信息,我们把task回调函数的最后一句echo改为return再试一次

1
return "task end." . PHP_EOL;

如果你修改了代码之后,直接去执行client.php,你会发现结果并没有任何变化。

我们在server启动的那个终端下,按Ctrl+C退出,然后再重新启动server

1
php server.php

在client.php所在的终端,重新执行下

1
php client.php

发现了什么?有没有看到server终端下面的最后一行显示的信息变了?

1
finish received data 'task end.

怎么回事,为什么是这样的呢?大白天见鬼啦?为什么要重启下server代码才生效呢?

这个问题跟常驻内存有关,我们准备后面单独增加一个章节说说这个事。

在结果中我们看到了在onFinish回调中打印的信息。为什么这个时候能输出onFinish回调的内容了呢?

这是因为task进程内一旦return或者调用swoole_server->finish方法,就会通知到worker进程该任务已经完成,worker进程会继续触发onFinish回调,进一步对投递的结果进行处理。

这个过程有没有必要呢?讲真话,还真得看自己的业务需求。比如我们以开篇抛出的情境一发送邮件为例,如果我们在task进程内发送完邮件就完事了,不需要关注邮件是否发送成功,反正发不发也无所谓,这个时候就没必要调onFinish回调了。但是如果说我们还需要确认发送的邮件是否成功,没成功还要再继续发,这个时候我们就可以在onFinish回调中继续处理task的结果了。

最后,我们对本篇做个总结:

没有耗时任务的情况下,worker直接运行,无需开启task
对于耗时的任务,可以在worker内调用task函数,把异步任务投递给task进程进行处理,task进程的数量取决于task_worker_num的配置
task进程内可以选择调用finish方法或者return,来通知worker进程此任务已完成,worker进程会在onFinish回调中对task的执行结果进一步处理。如果worker进程不关心任务的结果,finish就不需要了。