/**
 * Long-running queue worker: drains the `curl_queue` Redis list with five
 * Swoole coroutines and performs every queued HTTP request through cURL.
 *
 * Each queue item is expected to be a JSON object with:
 *   - `uri`     (string, required) request URL;
 *   - `method`  (string, optional) only the exact value 'POST' is sent as a
 *               POST, anything else goes out as cURL's default request;
 *   - `headers` (array, optional) passed to CURLOPT_HTTPHEADER;
 *   - `body`    (optional) passed to CURLOPT_POSTFIELDS for POST requests.
 *
 * The request and its response (or the cURL error) are written with
 * recordlog() to the 'curls' log. Items that cannot be decoded are dropped
 * silently; decoded items without a usable `uri` are logged and skipped.
 *
 * Every coroutine leaves its loop once $this->file_mtime() differs from the
 * value read at start-up, or once more than 3600 seconds have elapsed.
 */
public function curls()
{
    cli_set_process_title(__FILE__ . ':curls');
    $redis = $this->redis;
    $file_mtime = $this->file_mtime();
    $st = time();
    error_reporting(E_ALL);
    ini_set('swoole.display_errors', 'On');
    // Enable Swoole's runtime hooks so the blocking Redis / cURL calls below
    // can yield to the other coroutines instead of stalling the process.
    \Swoole\Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);

    // Decodes one raw queue payload, runs the request and logs the outcome.
    $handle = static function ($payload): void {
        $json = json_decode((string) $payload, true);
        if (!$json) {
            // Undecodable or empty payload: dropped, as before.
            return;
        }

        $stamp = date('Y-m-d H:i:s') . ": \n";

        // A JSON scalar or an object without a URL cannot be executed; log it
        // instead of letting an offset/type error terminate the worker.
        if (!is_array($json) || !isset($json['uri']) || !is_string($json['uri']) || $json['uri'] === '') {
            recordlog($stamp . "skipped invalid task: $payload\n\n", 'curls');
            return;
        }

        $uri = $json['uri'];
        $method = isset($json['method']) && is_string($json['method']) ? $json['method'] : 'GET';
        $headers = $json['headers'] ?? [];
        if (!is_array($headers)) {
            $headers = [];
        }
        $body = $json['body'] ?? null;

        // NOTE(review): no CURLOPT_TIMEOUT / CURLOPT_CONNECTTIMEOUT is set, so a
        // stalled peer can hold this coroutine (and delay its shutdown checks)
        // indefinitely — consider adding limits.
        $ch = curl_init();
        if ($method === 'POST') {
            curl_setopt($ch, CURLOPT_POST, true);
            curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
        }
        if (count($headers)) {
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        }
        curl_setopt($ch, CURLOPT_URL, $uri);
        curl_setopt($ch, CURLOPT_HEADER, false);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);

        $result = curl_exec($ch);
        if ($result === false) {
            // Keep the failure reason; a bare false would log as an empty string.
            $result = 'curl error (' . curl_errno($ch) . '): ' . curl_error($ch);
        }
        curl_close($ch);

        // Scalars keep their plain string form; arrays are JSON-encoded so the
        // log shows the real payload instead of the literal "Array".
        $bodyText = ($body === null || is_scalar($body)) ? (string) $body : json_encode($body);

        $content = $stamp;
        $content .= "url: $method $uri\n";
        $content .= "headers: " . json_encode($headers) . "\n";
        $content .= "body: $bodyText\n";
        $content .= "result: $result\n";
        $content .= "\n";
        recordlog($content, 'curls');
    };

    \Swoole\Coroutine\run(function () use ($st, $file_mtime, $redis, $handle) {
        // NOTE(review): all five coroutines share the single $this->redis
        // client. Confirm it is safe for concurrent coroutine use (e.g. a
        // pooled / per-coroutine connection) — TODO verify.
        for ($i = 0; $i < 5; $i++) {
            \Swoole\Coroutine::create(function () use ($st, $file_mtime, $redis, $handle) {
                while (true) {
                    // Blocking pop with a timeout of 5; an empty result means
                    // nothing arrived in that window.
                    $task = $redis->blPop('curl_queue', 5);
                    if (empty($task)) {
                        \Swoole\Coroutine::sleep(0.001);
                    } else {
                        // blPop yields [listName, value]; only the value is needed.
                        $handle($task[1] ?? '');
                    }

                    // Stop when file_mtime() changes or after one hour of uptime.
                    if ($file_mtime != $this->file_mtime()) {
                        break;
                    }
                    if (time() - $st > 3600) {
                        break;
                    }
                }
            });
        }
    });
}
php swoole 队列、异步、协程并发curl请求
发表评论

