easySwoole 对 swoole 扩展的 AsyncTask 进行了封装,用户无需关心底层异步逻辑就可以编写高性能的异步代码。example
基本使用
在 swoole 扩展中,如果你要编写 async task,你需要如下步骤 :
- 编写
onTask和onFinish两个回调函数来处理异步代码逻辑和处理异步逻辑结果 - 设置 task 进程数量,使用
task函数来投递任务到 task 进程池。
那么 easySwoole 在这个基础上进行了封装,主要有 AbstractAsyncTask 和 AsyncTaskManager 两个类。
AbstractAsyncTask 是一个抽象类,所有异步任务都抽象成一个类,所以的异步类都需要继承 AbstractAsyncTask 这个抽象类,然后实现两个抽象方法 :
handle: 异步回调逻辑,对应onTask函数finishCallBack: 异步逻辑结束之后执行的函数,对应onFinish函数
创建好一个 AsyncTask 之后,使用 AsyncTaskManager 管理类把异步任务投递到 task 进程池中。
AsyncTaskManager::getInstance()->add(AsyncTask::class);
除了通过创建一个 AsyncTask 类来编写异步代码逻辑之外,还可以直接使用闭包函数把任务投递到 task 进程池中。
AsyncTaskManager::getInstance()->add(function () {
Logger::console("this is async task 2");
return "test data for async task 2";
});
执行完异步逻辑的代码之后,可以通过以下方式把数据传递给 worker 进程。
- 如果是使用异步类的
handle函数来执行异步逻辑,那么可以调用$this->finish($data)或者直接使用return $data的方式把数据从 task 进程传递到 worker 进程。 - 如果是闭包函数,则使用
return $data来返回数据到 worker 进程。
这里要注意两点 :
finishCallBack是在 worker 进程中被执行的,执行finishCallBack的 worker 进程和投递 worker 任务的 worker 进程是同一个进程。- 如果没有调用
finish或者return返回数据,那么 worker 进程不会执行finishCallBack函数。
AsyncTaskManager 的 add 函数一共有三个参数,其含义与 swoole 扩展中的 task函数一致 :
- 第一个参数为闭包函数或者继承
AbstractAsyncTask的异步类。 - 第二个参数为欲投递的 task 进程的目标 id。如果为 -1 则 swoole 会根据 task 进程的繁忙程度智能投递。
- 第三个参数为一个闭包函数,如果设置了该参数,那么异步任务结束之后不会调用
onFinish函数,转而直接执行该闭包函数。该闭包函数的参数与onFinish一样。
除了投递异步非阻塞任务之外,还可以使用 AsyncTaskManager 的 addSyncTask($callable, $timeout = 0.5, $workerId = self::TASK_DISPATCHER_TYPE_RANDOM) 来投递异步阻塞任务。
AsyncTaskManager::getInstance()->addSyncTask(SyncTask::class);
addSyncTask 和 add 的不同之处在于,addSyncTask 是阻塞的。当 worker 进程通过 addSyncTask 函数把任务投递到 task 进程之后,worker 进程就会阻塞等待,直到异步任务完成或者超时。addSyncTask 函数的第二个参数为超时时间,如果缺省,那么默认为 0.5 s.
栅栏机制
基于异步任务,easySwoole 实现了栅栏机制。利用栅栏机制,可以投递多个 task 任务并发执行,并且设置超时时间。只有当所有 task 任务都执行完毕返回结果,或者超时才会继续往下执行。栅栏机制常用于多进程下载等功能。
在 easySwoole 中实现栅栏机制很简单 :
- 创建一个
Barrier实例$barrier = new Barrier()。 - 使用
add($taskName, $callable)函数添加 task 任务。 - 调用
$barrier->run(1)设置超时时间并且开始并发执行任务。
$barrier = new Barrier();
$barrier->add("a", function () {
usleep(50000);
return time();
});
$barrier->add("b", function () {
sleep(2);
return time();
});
$barrier->add("c", function () {
usleep(50000);
return time();
});
$this->response()->write($barrier->run(1));
run 函数最后的返回值是一个数组,数组的 key 为所有成功执行完毕的 taskName, value 为对应 task 的执行结果。
除了 add 和 run 方法,Barrier 还包含如下两个方法 :
getResult($taskName)获取某个 taskName 的执行结果,如果该 taskName 执行超时或者失败,返回 null。getResults()获取所有 task 的执行结果。
关于栅栏机制的具体使用,easySwoole 给出了多线程下载的案例,可在 这里 查看。