easySwoole 对 swoole 扩展的 AsyncTask 进行了封装,用户无需关心底层异步逻辑就可以编写高性能的异步代码。example
基本使用
在 swoole 扩展中,如果你要编写 async task,你需要如下步骤 :
- 编写
onTask
和onFinish
两个回调函数来处理异步代码逻辑和处理异步逻辑结果 - 设置 task 进程数量,使用
task
函数来投递任务到 task 进程池。
那么 easySwoole 在这个基础上进行了封装,主要有 AbstractAsyncTask
和 AsyncTaskManager
两个类。
AbstractAsyncTask
是一个抽象类,所有异步任务都抽象成一个类,所以的异步类都需要继承 AbstractAsyncTask
这个抽象类,然后实现两个抽象方法 :
handle
: 异步回调逻辑,对应onTask
函数finishCallBack
: 异步逻辑结束之后执行的函数,对应onFinish
函数
创建好一个 AsyncTask
之后,使用 AsyncTaskManager
管理类把异步任务投递到 task 进程池中。
AsyncTaskManager::getInstance()->add(AsyncTask::class);
除了通过创建一个 AsyncTask
类来编写异步代码逻辑之外,还可以直接使用闭包函数把任务投递到 task 进程池中。
AsyncTaskManager::getInstance()->add(function () {
Logger::console("this is async task 2");
return "test data for async task 2";
});
执行完异步逻辑的代码之后,可以通过以下方式把数据传递给 worker 进程。
- 如果是使用异步类的
handle
函数来执行异步逻辑,那么可以调用$this->finish($data)
或者直接使用return $data
的方式把数据从 task 进程传递到 worker 进程。 - 如果是闭包函数,则使用
return $data
来返回数据到 worker 进程。
这里要注意两点 :
finishCallBack
是在 worker 进程中被执行的,执行finishCallBack
的 worker 进程和投递 worker 任务的 worker 进程是同一个进程。- 如果没有调用
finish
或者return
返回数据,那么 worker 进程不会执行finishCallBack
函数。
AsyncTaskManager
的 add
函数一共有三个参数,其含义与 swoole 扩展中的 task
函数一致 :
- 第一个参数为闭包函数或者继承
AbstractAsyncTask
的异步类。 - 第二个参数为欲投递的 task 进程的目标 id。如果为 -1 则 swoole 会根据 task 进程的繁忙程度智能投递。
- 第三个参数为一个闭包函数,如果设置了该参数,那么异步任务结束之后不会调用
onFinish
函数,转而直接执行该闭包函数。该闭包函数的参数与onFinish
一样。
除了投递异步非阻塞任务之外,还可以使用 AsyncTaskManager
的 addSyncTask($callable, $timeout = 0.5, $workerId = self::TASK_DISPATCHER_TYPE_RANDOM)
来投递异步阻塞任务。
AsyncTaskManager::getInstance()->addSyncTask(SyncTask::class);
addSyncTask
和 add
的不同之处在于,addSyncTask
是阻塞的。当 worker 进程通过 addSyncTask
函数把任务投递到 task 进程之后,worker 进程就会阻塞等待,直到异步任务完成或者超时。addSyncTask
函数的第二个参数为超时时间,如果缺省,那么默认为 0.5 s.
栅栏机制
基于异步任务,easySwoole 实现了栅栏机制。利用栅栏机制,可以投递多个 task 任务并发执行,并且设置超时时间。只有当所有 task 任务都执行完毕返回结果,或者超时才会继续往下执行。栅栏机制常用于多进程下载等功能。
在 easySwoole 中实现栅栏机制很简单 :
- 创建一个
Barrier
实例$barrier = new Barrier()
。 - 使用
add($taskName, $callable)
函数添加 task 任务。 - 调用
$barrier->run(1)
设置超时时间并且开始并发执行任务。
$barrier = new Barrier();
$barrier->add("a", function () {
usleep(50000);
return time();
});
$barrier->add("b", function () {
sleep(2);
return time();
});
$barrier->add("c", function () {
usleep(50000);
return time();
});
$this->response()->write($barrier->run(1));
run
函数最后的返回值是一个数组,数组的 key 为所有成功执行完毕的 taskName, value 为对应 task 的执行结果。
除了 add
和 run
方法,Barrier
还包含如下两个方法 :
getResult($taskName)
获取某个 taskName 的执行结果,如果该 taskName 执行超时或者失败,返回 null。getResults()
获取所有 task 的执行结果。
关于栅栏机制的具体使用,easySwoole 给出了多线程下载的案例,可在 这里 查看。