easySwoole 对 swoole 扩展的 AsyncTask 进行了封装,用户无需关心底层异步逻辑就可以编写高性能的异步代码。example

基本使用

在 swoole 扩展中,如果你要编写 async task,你需要如下步骤 :

  • 编写 onTaskonFinish 两个回调函数来处理异步代码逻辑和处理异步逻辑结果
  • 设置 task 进程数量,使用 task 函数来投递任务到 task 进程池。

那么 easySwoole 在这个基础上进行了封装,主要有 AbstractAsyncTaskAsyncTaskManager 两个类。

AbstractAsyncTask 是一个抽象类,所有异步任务都抽象成一个类,所以的异步类都需要继承 AbstractAsyncTask 这个抽象类,然后实现两个抽象方法 :

  • handle : 异步回调逻辑,对应 onTask 函数
  • finishCallBack : 异步逻辑结束之后执行的函数,对应 onFinish 函数

创建好一个 AsyncTask 之后,使用 AsyncTaskManager 管理类把异步任务投递到 task 进程池中。

AsyncTaskManager::getInstance()->add(AsyncTask::class);

除了通过创建一个 AsyncTask 类来编写异步代码逻辑之外,还可以直接使用闭包函数把任务投递到 task 进程池中。

AsyncTaskManager::getInstance()->add(function () {
    Logger::console("this is async task 2");
    return "test data for async task 2";
});

执行完异步逻辑的代码之后,可以通过以下方式把数据传递给 worker 进程。

  • 如果是使用异步类的 handle 函数来执行异步逻辑,那么可以调用 $this->finish($data) 或者直接使用 return $data 的方式把数据从 task 进程传递到 worker 进程。
  • 如果是闭包函数,则使用 return $data 来返回数据到 worker 进程。

这里要注意两点 :

  • finishCallBack 是在 worker 进程中被执行的,执行 finishCallBack 的 worker 进程和投递 worker 任务的 worker 进程是同一个进程。
  • 如果没有调用 finish 或者 return 返回数据,那么 worker 进程不会执行 finishCallBack 函数。

AsyncTaskManageradd 函数一共有三个参数,其含义与 swoole 扩展中的 task函数一致 :

  • 第一个参数为闭包函数或者继承 AbstractAsyncTask 的异步类。
  • 第二个参数为欲投递的 task 进程的目标 id。如果为 -1 则 swoole 会根据 task 进程的繁忙程度智能投递。
  • 第三个参数为一个闭包函数,如果设置了该参数,那么异步任务结束之后不会调用 onFinish 函数,转而直接执行该闭包函数。该闭包函数的参数与 onFinish 一样。

除了投递异步非阻塞任务之外,还可以使用 AsyncTaskManageraddSyncTask($callable, $timeout = 0.5, $workerId = self::TASK_DISPATCHER_TYPE_RANDOM) 来投递异步阻塞任务。

AsyncTaskManager::getInstance()->addSyncTask(SyncTask::class);

addSyncTaskadd 的不同之处在于,addSyncTask 是阻塞的。当 worker 进程通过 addSyncTask 函数把任务投递到 task 进程之后,worker 进程就会阻塞等待,直到异步任务完成或者超时。addSyncTask 函数的第二个参数为超时时间,如果缺省,那么默认为 0.5 s.

栅栏机制

基于异步任务,easySwoole 实现了栅栏机制。利用栅栏机制,可以投递多个 task 任务并发执行,并且设置超时时间。只有当所有 task 任务都执行完毕返回结果,或者超时才会继续往下执行。栅栏机制常用于多进程下载等功能。

在 easySwoole 中实现栅栏机制很简单 :

  • 创建一个 Barrier 实例 $barrier = new Barrier()
  • 使用 add($taskName, $callable) 函数添加 task 任务。
  • 调用 $barrier->run(1) 设置超时时间并且开始并发执行任务。
$barrier = new Barrier();
$barrier->add("a", function () {
    usleep(50000);
    return time();
});
$barrier->add("b", function () {
    sleep(2);
    return time();
});
$barrier->add("c", function () {
    usleep(50000);
    return time();
});
$this->response()->write($barrier->run(1));

run 函数最后的返回值是一个数组,数组的 key 为所有成功执行完毕的 taskName, value 为对应 task 的执行结果。

除了 addrun 方法,Barrier 还包含如下两个方法 :

  • getResult($taskName) 获取某个 taskName 的执行结果,如果该 taskName 执行超时或者失败,返回 null。
  • getResults() 获取所有 task 的执行结果。

关于栅栏机制的具体使用,easySwoole 给出了多线程下载的案例,可在 这里 查看。

results matching ""

    No results matching ""