php swoole 队列、异步、协程并发curl请求

  public function curls()
    {
        cli_set_process_title(__FILE__ . ':curls');
        $file_mtime = $this->file_mtime();
        $st = time();

        error_reporting(E_ALL);
        ini_set('swoole.display_errors', 'On');

        \Swoole\Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
        \Swoole\Coroutine\run(function () use ($st, $file_mtime) {
            $channel = new \Swoole\Coroutine\Channel(1);
            $exit = null;
            \Swoole\Coroutine::create(function () use ($channel, $st, $file_mtime, &$exit) {
                $redis = \services\iRedis::getInstance();
                while (true) {
                    $task = $redis->blPop('curl_queue', 5);
                    if (!empty($task)) $channel->push($task[1]);
                    if ($file_mtime != $this->file_mtime()) break;
                    if (time() - $st > 3600) break;
                }
                $exit = true;
            });
            for ($i = 0; $i < 5; $i++) {
                \Swoole\Coroutine::create(function () use ($channel, &$exit) {
                    while (true) {
                        $task = $channel->pop(5);
                        if (!empty($task)) {
                            $json = json_decode($task, true);
                            if ($json) {
                                $method = $json['method'];
                                $uri = $json['uri'];
                                $headers = $json['headers'] ?? [];
                                $body = $json['body'] ?? null;
                                $result = \services\Tools::curl($method, $uri, $body, $headers);
                            }
                        }
                        if ($exit && $channel->isEmpty()) break;
                    }
                });
            }
        });
    }