基于Workerman实现异步任务

websocket处理客户端信息(如批量采集、批量发邮件等任务)时会发生长时间的阻塞,现可基于Workerman实现异步任务来解决此问题

实现原理

首先了解Workerman

Workerman如何实现异步任务

具体实现

  • 下载workerman
  • 创建websocket主服务
  • 创建任务进程服务
  • 测试

下载workerman

composer workerman的依赖包 常用composer命令、依赖包

composer require workerman/workerman

创建websocket主服务

新建文件"websocket.php"作为websocket主服务

<?php
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// websocket服务
$worker = new Worker('websocket://0.0.0.0:7273');
// $ws_worker->count = 4;
$worker->onMessage = function($ws_connection, $message)
{
	echo "收到客户端消息:{$message}\r\n";
	$ws_connection->send('服务端收到您的消息:' . $message);
    // 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
    $task_connection = new AsyncTcpConnection('Text://XXXXX:7274');
    // 任务及参数数据
    $task_data = "任务:{$message}";
    // 发送数据
    $task_connection->send($task_data);
    // 异步获得结果
    $task_connection->onMessage = function($task_connection, $task_result)use($ws_connection)
    {
    	print_r($task_result);
    	 // 通知对应的websocket客户端
		$ws_connection->send($task_result);
		
    	// 获得结果后记得关闭异步连接
    	if($task_result == 'taskFinished'){
    		$task_connection->close();
    	}
    };
    //onError必须在connect之前,否则无法捕获异步连接失败事件
    $task_connection->onError = function($task_connection, $err_code, $err_msg)use($ws_connection)
	{
		$ws_connection->send('任务服务器连接失败');
		echo "任务服务器连接失败";
	    // $ws_connection->send('任务下发失败:任务服务器连接失败  '.$err_msg);
	};
    // 执行异步连接
    $task_connection->connect();
};
// 运行worker
Worker::runAll();

创建任务服务进程

新建文件“task.php”作为任务服务进程

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:7274');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
//只有php7才支持task->reusePort,可以让每个task进程均衡的接收任务
//$task->reusePort = true;
$task_worker->onMessage = function($connection, $task_data)
{
	$connection->send('getTask');
	echo "收到   ".$task_data;
	echo "\r\n执行  ".$task_data;
	$connection->send('开始执行'.$task_data);
	//睡眠 模拟执行任务
	sleep(5);
    
    echo "\r\n完成:{$task_data}";
    $connection->send('taskFinished');
};
Worker::runAll();

启动服务、测试

1.启动主服务

php websocket.php start
1.启动主服务

2.启动任务服务进程

php task.php start
2.启动任务服务进程

3.客户端连接测试

客户端连接websocket,并发送"发邮件"

客户端连接websocket

websocket服务端接收到客户端消息,并转给任务进程

websocket服务端

任务进程端收到websocket服务端任务,并执行与反馈

任务进程端

推荐:

常用composer命令、依赖包

vatfs (影视全搜索)代码分享

admin

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: