update
This commit is contained in:
66
module/Application/src/Command/Swoole/Client/SwLogClient.php
Normal file
66
module/Application/src/Command/Swoole/Client/SwLogClient.php
Normal file
@ -0,0 +1,66 @@
|
||||
<?php
|
||||
/**
|
||||
*
|
||||
* @author:llbjj
|
||||
* @DateTime:2022/5/19 8:28
|
||||
* @Description:
|
||||
*
|
||||
*/
|
||||
|
||||
namespace Application\Command\Swoole\Client;
|
||||
|
||||
use Application\Service\BaseService;
|
||||
use Interop\Container\ContainerInterface;
|
||||
|
||||
class SwLogClient extends BaseService
|
||||
{
|
||||
protected $swClient;
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
parent::__construct($container);
|
||||
$this->swClient = new \Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
|
||||
self::connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* @files.saveConflictResolution: overwriteFileOnDisk
|
||||
* @Description: 连接swoole服务器
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-04 20:23:15
|
||||
* @return {*}
|
||||
*/
|
||||
private function connect() {
|
||||
$svConfigData = [
|
||||
'ip' => '127.0.0.1',
|
||||
'port' => 9509,
|
||||
'timeout' => -1
|
||||
];
|
||||
$isConnect = $this->swClient->connect($svConfigData['ip'], $svConfigData['port'], $svConfigData['timeout']);
|
||||
if(!$isConnect) {
|
||||
throw new \Exception("connect failed. Error: {".$this->swClient->errCode."}\n");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @files.saveConflictResolution: overwriteFileOnDisk
|
||||
* @Description: 关闭swoole客户端
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-04 20:25:50
|
||||
* @return {*}
|
||||
*/
|
||||
private function close() {
|
||||
$this->swClient->close();
|
||||
}
|
||||
/**
|
||||
* Notes: 投递日志异步任务
|
||||
* User: llbjj
|
||||
* DateTime: 2022/5/19 8:54
|
||||
*
|
||||
*/
|
||||
public function send($data){
|
||||
if(is_array($data)) $data = json_encode($data);
|
||||
$data .= '|PHENOL|';
|
||||
$this->swClient->send($data);
|
||||
self::close();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,74 @@
|
||||
<?php
|
||||
/**
|
||||
*
|
||||
* @author:llbjj
|
||||
* @DateTime:2022/5/19 8:28
|
||||
* @Description:
|
||||
*
|
||||
*/
|
||||
|
||||
namespace Application\Command\Swoole\Client;
|
||||
|
||||
use Application\Service\BaseService;
|
||||
use Interop\Container\ContainerInterface;
|
||||
|
||||
class SwTaskClient extends BaseService
|
||||
{
|
||||
protected $swClient;
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
parent::__construct($container);
|
||||
}
|
||||
|
||||
/**
|
||||
* @files.saveConflictResolution: overwriteFileOnDisk
|
||||
* @Description: 连接swoole服务器
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-04 20:23:15
|
||||
* @return {*}
|
||||
*/
|
||||
private function connect() {
|
||||
$this->swClient = new \Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
|
||||
$svConfigData = [
|
||||
'ip' => '127.0.0.1',
|
||||
'port' => 9511,
|
||||
'timeout' => -1
|
||||
];
|
||||
$isConnect = $this->swClient->connect($svConfigData['ip'], $svConfigData['port'], $svConfigData['timeout']);
|
||||
if(!$isConnect) {
|
||||
throw new \Exception("connect failed. Error: {".$this->swClient->errCode."}\n");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @files.saveConflictResolution: overwriteFileOnDisk
|
||||
* @Description: 关闭swoole客户端
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-04 20:25:50
|
||||
* @return {*}
|
||||
*/
|
||||
private function close() {
|
||||
$this->swClient->close();
|
||||
}
|
||||
/**
|
||||
* Notes: 投递日志异步任务
|
||||
* User: llbjj
|
||||
* DateTime: 2022/5/19 8:54
|
||||
*
|
||||
*/
|
||||
public function send(array $data){
|
||||
// 是否开启了异步任务
|
||||
if($this->LocalService()->config['swAsyncTask']['task']) {
|
||||
$data['token'] = $this->LocalService()->identity->getToken();
|
||||
$data['domainName'] = $_SERVER['HTTP_HOST'];
|
||||
|
||||
$this->connect();
|
||||
$data = json_encode($data, true);
|
||||
$data .= '|PHENOL|';
|
||||
$this->swClient->send($data);
|
||||
self::close();
|
||||
}else{
|
||||
$this->LocalService()->{$data['svName']}->{$data['methodName']}($data['params']);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,55 @@
|
||||
<?php
|
||||
/*
|
||||
* @files.saveConflictResolution: overwriteFileOnDisk
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-05 08:26:57
|
||||
* @LastEditTime: 2022-07-05 14:16:46
|
||||
* @LastEditors: llbjj
|
||||
* @Description:
|
||||
* @FilePath: /RemoteWorking/module/Application/src/Command/Swoole/Server/SwItemTipCommand.php
|
||||
*/
|
||||
namespace Application\Command\Swoole\Server;
|
||||
|
||||
use Application\Command\BasicCommand;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
class SwItemTipCommand extends BasicCommand {
|
||||
protected function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
// 开启websock服务
|
||||
$wsSv = new \Swoole\WebSocket\Server('0.0.0.0', 9590);
|
||||
|
||||
// 监听WebSocket连接打开事件
|
||||
$wsSv->on('open', function(\Swoole\WebSocket\Server $server, $request) use($output) {
|
||||
// $output->writeln("server: handshake success with fd{$request->fd}".PHP_EOL);
|
||||
});
|
||||
|
||||
// 监听客户端消息事件
|
||||
$wsSv->on('message', function(\Swoole\WebSocket\Server $server, $frame) use($output) {
|
||||
$output->writeln("receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}".PHP_EOL);
|
||||
});
|
||||
|
||||
// 利用swoole的定时器,定时请求数据实时推送到客户端;timer的简单用法
|
||||
$addProcess = new \Swoole\Process( function($process) use($wsSv, $output) {
|
||||
\Swoole\Timer::tick(5000, function (int $timer_id, $wsSv) {
|
||||
foreach ($wsSv->connections as $fd){
|
||||
// 根据实际情况获取数据,发送给客户端(目前只是测试数据)
|
||||
$wsSv->push($fd, $fd.":项目总数量:".$this->LocalService()->itemInfo->getCount().' 个 '.time());
|
||||
}
|
||||
}, $wsSv);
|
||||
});
|
||||
$wsSv->addProcess($addProcess);
|
||||
|
||||
|
||||
// 监听客户端断开链接
|
||||
$wsSv->on('close', function($server, $fd) use($output) {
|
||||
$output->writeln("client {$fd} closed".PHP_EOL);
|
||||
});
|
||||
|
||||
// 启动websock服务
|
||||
$wsSv->start();
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,65 @@
|
||||
<?php
|
||||
/**
|
||||
*
|
||||
* @author:llbjj
|
||||
* @DateTime:2022/5/12 7:13
|
||||
* @Description:
|
||||
*
|
||||
*/
|
||||
|
||||
namespace Application\Command\Swoole\Server;
|
||||
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Process;
|
||||
use Swoole\Server;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Swoole\Coroutine\Server\Connection;
|
||||
|
||||
class SwLogCommand extends \Application\Command\BasicCommand
|
||||
{
|
||||
protected function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
$output->writeln('APP_PATH:'.APP_PATH);
|
||||
//开启LogTcp服务
|
||||
$tcpSv = new Server('127.0.0.1', 9509);
|
||||
$tcpSv->set([
|
||||
'worker_num' => 2, // 工作进程数量
|
||||
'task_worker_num' => 4, // 异步任务进程数
|
||||
'open_eof_check' => true, //打开EOF检测
|
||||
'package_eof' => '|PHENOL|', //设置EOF
|
||||
'package_max_length' => 1024 * 1024 * 5
|
||||
]);
|
||||
|
||||
//接收数据
|
||||
$tcpSv->on('receive', function ($sv, $fd, $reactorId, $data) use($output) {
|
||||
$sendDataArr = array_filter(explode('|PHENOL|', $data));
|
||||
if(!empty($sendDataArr)) {
|
||||
foreach($sendDataArr as $sendData) {
|
||||
$sv->task($sendData);
|
||||
}
|
||||
}
|
||||
|
||||
$output->writeln('oprt Logs writing ...'.PHP_EOL);
|
||||
});
|
||||
|
||||
// 定义异步任务
|
||||
$tcpSv->on('task', function($sv, $task_id, $src_work_id, $data) use ($output){
|
||||
$output->writeln('start write Log'.PHP_EOL);
|
||||
$logObj = $this->LocalService()->log;
|
||||
if(!$logObj->saveLogToDb($data)){
|
||||
//日志落库失败,将日志数据存缓存文件中
|
||||
$logObj->saveLogToFile($data);
|
||||
}
|
||||
$sv->finish("{$data} -> OK");
|
||||
});
|
||||
//处理异步任务的结果(此回调函数在worker进程中执行)
|
||||
$tcpSv->on('Finish', function ($serv, $task_id, $data) {
|
||||
echo "AsyncTask[{$task_id}] Finish: {$data}".PHP_EOL;
|
||||
});
|
||||
|
||||
$tcpSv->start();
|
||||
//parent::execute($input, $output); // TODO: Change the autogenerated stub
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,84 @@
|
||||
<?php
|
||||
/**
|
||||
*
|
||||
* @author:llbjj
|
||||
* @DateTime:2022/9/1 15:00
|
||||
* @Description:
|
||||
*
|
||||
*/
|
||||
namespace Application\Command\Swoole\Server;
|
||||
|
||||
use Application\Command\BasicCommand;
|
||||
use Application\Service\Extension\ErrorHandler;
|
||||
use Application\Service\Extension\Laminas;
|
||||
use Swoole\Server;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
class SwTaskCommand extends BasicCommand{
|
||||
function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
error_reporting(0);
|
||||
$tcpSv = new Server('127.0.0.1', 9511);
|
||||
$tcpSv->set([
|
||||
'worker_num' => 4, // 工作进程数量
|
||||
'task_worker_num' => 8, // 异步任务进程数
|
||||
'open_eof_check' => true, //打开EOF检测
|
||||
'package_eof' => '|PHENOL|', //设置EOF
|
||||
'package_max_length' => 1024 * 1024 * 5,
|
||||
'dispatch_mode' => 3,
|
||||
'task_ipc_mode' => 2,
|
||||
|
||||
// 'daemonize' => 1, //以守护进程 1或0
|
||||
// 'open_eof_split' => true, //swoole底层实现自动分包。比较消耗cpu资源
|
||||
// 'package_eof' => "|PHENOL|", //设置后缀,一般为"
|
||||
]);
|
||||
|
||||
//接收数据
|
||||
$tcpSv->on('receive', function ($sv, $fd, $reactorId, $data) use($output) {
|
||||
$sendDataArr = array_filter(explode('|PHENOL|', $data));
|
||||
if(!empty($sendDataArr)) {
|
||||
foreach($sendDataArr as $sendData) {
|
||||
$sv->task($sendData);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 定义异步任务
|
||||
$tcpSv->on('task', function($sv, $task_id, $src_work_id, $data) use ($output){
|
||||
$taskData = json_decode($data, true);
|
||||
try {
|
||||
// 设置Token
|
||||
Laminas::$token = $taskData['token'] ?? '';
|
||||
// 调用方法,
|
||||
$result = $this->LocalService()->{$taskData['svName']}->{$taskData['methodName']}($taskData['params']);
|
||||
|
||||
$sv->finish("{$data}");
|
||||
}catch (\Throwable $throwable) {
|
||||
$this->saveErrLog($throwable, $taskData);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
//处理异步任务的结果(此回调函数在worker进程中执行)
|
||||
$tcpSv->on('Finish', function ($serv, $task_id, $data) {
|
||||
$taskData = json_decode($data, true);
|
||||
// 发送完成通知
|
||||
$msgData = [
|
||||
'title' => "【{$taskData['domainName']}】异步任务 {$taskData['svName']}->{$taskData['methodName']}() 执行完成!",
|
||||
'description' => "AsyncTask[{$task_id}] Finish: {$data}",
|
||||
'url' => '#'
|
||||
];
|
||||
|
||||
$this->LocalService()->wechatWork->sendMesToUser($msgData);
|
||||
echo "AsyncTask[{$task_id}] Finish: {$data} -> OK".PHP_EOL;
|
||||
});
|
||||
|
||||
$tcpSv->start();
|
||||
return 0;
|
||||
}
|
||||
|
||||
public function saveErrLog(&$throwable, &$taskData) {
|
||||
ErrorHandler::log2txt($throwable, '', $taskData);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,80 @@
|
||||
<?php
|
||||
/*
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-06 16:18:19
|
||||
* @LastEditTime: 2022-07-12 18:04:12
|
||||
* @LastEditors: 863465124 863465124@qq.com
|
||||
* @Description:
|
||||
*/
|
||||
namespace Application\Command\Swoole\Server;
|
||||
|
||||
use Application\Command\BasicCommand;
|
||||
use Application\Common\Com;
|
||||
use Swoole\Process;
|
||||
use Swoole\Server;
|
||||
use Swoole\Timer;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
class SwTimerCommand extends BasicCommand {
|
||||
protected static string $commandName = 'sw:timer';
|
||||
protected InputInterface $input;
|
||||
protected OutputInterface $output;
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
$this->setName(self::$commandName);
|
||||
$this->addOption('sv', '', InputOption::VALUE_REQUIRED, 'Server Name');
|
||||
$this->addOption('method', '', InputOption::VALUE_REQUIRED, 'Server Method');
|
||||
$this->addOption('second', '', InputOption::VALUE_REQUIRED, '任务间隔时间');
|
||||
$this->addOption('startSecond', '', InputOption::VALUE_REQUIRED, '任务启动时间');
|
||||
}
|
||||
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$this->input = $input;
|
||||
$this->output = $output;
|
||||
|
||||
if($this->input->getOption('startSecond')) {
|
||||
$this->afterTimer();
|
||||
}else {
|
||||
$this->swTimer();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected function afterTimer() {
|
||||
swoole_timer_after($this->input->getOption('startSecond'), function() {
|
||||
$this->swTimer();
|
||||
});
|
||||
}
|
||||
|
||||
public function swTimer() {
|
||||
swoole_timer_tick($this->input->getOption('second'), function() {
|
||||
echo date('Y-m-d H:i:s') . ' -> ' . $this->input->getOption('sv') . ' : ' . $this->input->getOption('method') . PHP_EOL;
|
||||
$this->LocalService()->{$this->input->getOption('sv')}->{$this->input->getOption('method')}();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @Description: 开启一个定时任务信息
|
||||
* @Author: llbjj
|
||||
* @Date: 2022-07-11 14:47:30
|
||||
* @param {*} $timer
|
||||
* @param {*} $timerName
|
||||
*/
|
||||
protected function setTimerFun(array &$timer, string $timerName) {
|
||||
\Swoole\Timer::tick($timer['msec'], function(int $timer_id) use($timer, $timerName){
|
||||
$timerContent = date('Y-m-d h:i:s')." 开始定时任务【{$timerName}】".PHP_EOL;
|
||||
Com::writeFile("{$timerName}.log", APP_PATH."/data/log/swooleTimer/{$timerName}/", $timerContent);
|
||||
$this->LocalService()->{$timer['serviceName']}->{$timer['funName']}();
|
||||
$nowTime = time();
|
||||
if(!empty($timer['endTime']) and $nowTime > strtotime($timer['endTime'])){
|
||||
\Swoole\Timer::clear($timer_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,294 @@
|
||||
<?php
|
||||
|
||||
namespace Application\Command\Swoole\Server;
|
||||
|
||||
use Application\Command\BasicCommand;
|
||||
|
||||
use Application\Service\Extension\ErrorHandler;
|
||||
use Application\Service\Extension\Helper\StringHelper;
|
||||
use Application\Service\Extension\Laminas;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Http\Request;
|
||||
use Swoole\Http\Response;
|
||||
use Swoole\Process;
|
||||
use Swoole\WebSocket\CloseFrame;
|
||||
use Swoole\Coroutine\Http\Server;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use function Swoole\Coroutine\run;
|
||||
|
||||
/**
|
||||
* https://wiki.swoole.com/#/coroutine/ws_server
|
||||
*/
|
||||
class WebSocketCommand extends BasicCommand
|
||||
{
|
||||
/** @var int 监听的端口 */
|
||||
const LISTEN_PORT = 9502;
|
||||
|
||||
/**
|
||||
* 监听host
|
||||
* @var string
|
||||
*/
|
||||
public string $host = '0.0.0.0';
|
||||
|
||||
public array $serverConfig = [
|
||||
'daemonize' => true,
|
||||
'heartbeat_idle_time' => 60 * 20, //允许最大空闲时间
|
||||
'heartbeat_check_interval' => 60 * 20, //心跳检测间隔
|
||||
'ssl_cert_file' => APP_PATH . '/config/autoload/ssl.crt',
|
||||
'ssl_key_file' => APP_PATH . '/config/autoload/ssl.key',
|
||||
];
|
||||
|
||||
/**
|
||||
* 用来存储 消息模板。用来做缓存
|
||||
* @var array
|
||||
*/
|
||||
protected static array $messageTemplateContext = [];
|
||||
|
||||
protected static array $onlineList = [
|
||||
'user' => [], // 用户对应的socket连接
|
||||
];
|
||||
|
||||
public function execute(InputInterface $input, OutputInterface $output)
|
||||
{
|
||||
set_time_limit(0);// 设置超时时间为无限,防止超时
|
||||
|
||||
// Process::daemon();
|
||||
|
||||
run(function () {
|
||||
try {
|
||||
$server = new Server($this->host, self::LISTEN_PORT, true);
|
||||
$server->set($this->serverConfig);
|
||||
$server->handle('/', $this->onHandle());
|
||||
|
||||
echo "ws服务已开启 {$this->host}:" . self::LISTEN_PORT . PHP_EOL;
|
||||
$server->start();
|
||||
} catch (\Throwable $e) {
|
||||
echo "ws开启失败. {$e->getMessage()}" . PHP_EOL;
|
||||
}
|
||||
});
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理首次请求。验证token.
|
||||
* @param Request $request
|
||||
* @param Response $ws
|
||||
*/
|
||||
public function handleRequest(Request $request, Response $ws)
|
||||
{
|
||||
$header = $request->header;
|
||||
$token = $header['token'] ?? '';
|
||||
// 设置token
|
||||
Laminas::setToken($token);
|
||||
|
||||
// if ($token) {
|
||||
// // 用户id => socket连接
|
||||
// self::$onlineList['user'][$this->getIdentityId()][] = $ws;
|
||||
// \Co\defer(function() {
|
||||
// unset(self::$onlineList['user'][$this->getIdentityId()]);
|
||||
// });
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理事件
|
||||
* @return \Closure
|
||||
*/
|
||||
public function onHandle(): \Closure
|
||||
{
|
||||
return function (Request $request, Response $ws) {
|
||||
try {
|
||||
// 首次连接
|
||||
$this->handleRequest($request, $ws);
|
||||
// post请求
|
||||
if (isset($request->post)) {
|
||||
$this->handleConnect($request, $ws, null);
|
||||
$ws->close();
|
||||
} else {
|
||||
// websocket
|
||||
$ws->upgrade();
|
||||
while (true) {
|
||||
$frame = $ws->recv();
|
||||
if ($frame === '') {
|
||||
$ws->close();
|
||||
break;
|
||||
} else if ($frame === false && !isset($request->post)) {
|
||||
// echo 'errorCode: ' . swoole_last_error() . "\n";
|
||||
$ws->close();
|
||||
break;
|
||||
} else {
|
||||
$this->handleConnect($request, $ws, $frame);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (\Throwable $exception) {
|
||||
ErrorHandler::log2txt(print_r([
|
||||
'type' => get_class($exception),
|
||||
'file' => method_exists($exception, 'getFile') ? $exception->getFile() : '',
|
||||
'errorMessage' => $exception->getMessage(),
|
||||
'executeSql' => method_exists($exception, 'getExecuteSql') ? $exception->getExecuteSql() : null,
|
||||
'line' => $exception->getLine(),
|
||||
'stack-trace' => explode("\n", $exception->getTraceAsString()),
|
||||
]), true);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private function handleConnect($request, $ws, $frame)
|
||||
{
|
||||
$d = $frame->data ?? $request->post['data'];
|
||||
// 心跳
|
||||
if ($d === 'ping') {
|
||||
$ws->push('pong');
|
||||
}
|
||||
|
||||
if (($data = StringHelper::jsonDecode($d, false)) !== false) {
|
||||
if ($response = $this->handleMessage($data, $request, $ws)) {
|
||||
$ws->push(json_encode($response));
|
||||
}
|
||||
}
|
||||
|
||||
if ($d == 'close' || $request->post || (is_object($frame) && get_class($frame) === CloseFrame::class)) {
|
||||
$ws->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ```
|
||||
* 小程序端拉取所有消息提示 {"action":"pull", "item_id":"项目id"}
|
||||
* 推送消息 {"action":"publish", "to":"用户id", "item_id":"123"}
|
||||
* ```
|
||||
* @param $data
|
||||
* @param Request $request
|
||||
* @return array|void
|
||||
*/
|
||||
private function handleMessage($data, Request $request, $ws)
|
||||
{
|
||||
$action = $data['action'] ?? '';
|
||||
if ($action === 'pull') { // 小程序主动拉取
|
||||
self::$onlineList['user'][$this->getIdentityId()][$data['item_id']][] = $ws;
|
||||
\Co\defer(function() {
|
||||
unset(self::$onlineList['user'][$this->getIdentityId()]);
|
||||
});
|
||||
|
||||
$allMessage = $this->getAllMessage($data, $request->fd);
|
||||
return [
|
||||
'code' => 0,
|
||||
'data' => $allMessage
|
||||
];
|
||||
} elseif ($action === 'publish') { // 广播
|
||||
if (stripos($data['to'], ',') !== false) {
|
||||
$list = explode(',', $data['to']);
|
||||
} else {
|
||||
$list = (array)$data['to'];
|
||||
}
|
||||
|
||||
foreach ($list as $userId) {
|
||||
if (isset(self::$onlineList['user'][$userId][$data['item_id']])) {
|
||||
foreach (self::$onlineList['user'][$userId][$data['item_id']] as $wsConnect) {
|
||||
if ($tmp = $this->getUserMessageTemplate($userId, $data['item_id'])) {
|
||||
$wsConnect->push(json_encode([
|
||||
'code' => 0,
|
||||
'data' => $tmp
|
||||
]));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有消息提醒
|
||||
* @param $request
|
||||
* @param $fd
|
||||
* @return array|mixed|\phpDocumentor\Reflection\Types\Mixed
|
||||
*/
|
||||
private function getAllMessage($request, $fd)
|
||||
{
|
||||
$tmp = $this->getMessageTemplate($request, $fd);
|
||||
foreach ($tmp as &$item) {
|
||||
$item['is_message'] = $this->getIsMessage($item['id'], $this->getIdentityId(), $request['item_id']);
|
||||
}
|
||||
return $tmp;
|
||||
}
|
||||
|
||||
private function getUserMessageTemplate($userId, $itemId)
|
||||
{
|
||||
$template = self::$messageTemplateContext[$userId] ?? null;
|
||||
if (!$template) {
|
||||
return null;
|
||||
}
|
||||
|
||||
foreach ($template as &$item) {
|
||||
$item['is_message'] = $this->getIsMessage($item['id'], $userId, $itemId);
|
||||
}
|
||||
|
||||
return $template;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取用户都有哪些模块
|
||||
* @param $request
|
||||
* @return array|mixed|\phpDocumentor\Reflection\Types\Mixed
|
||||
*/
|
||||
private function getMessageTemplate($request)
|
||||
{
|
||||
if (!isset(self::$messageTemplateContext[$this->getIdentityId()])) {
|
||||
//获取参数
|
||||
$user_id = Laminas::$serviceManager->identity->getId();
|
||||
|
||||
//var_dump('userId', $user_id);
|
||||
$item_id = $request['item_id'] ?? 0;
|
||||
//var_dump('$item_id', $item_id);
|
||||
$user_id = $this->LocalService()->signatoryUser->findsignatoryUser($user_id);
|
||||
//查询项目
|
||||
$itemInfoID = $this->LocalService()->itemInfo->getOneFieldVal('fid', "is_del = 0 and id = " . $item_id) ?: 0;
|
||||
//查询角色下的角色管理用户信息
|
||||
$roleSignatoryRelationDatas = $this->LocalService()->roleSignatoryRelation->changeUser($itemInfoID, $user_id);
|
||||
//var_dump('//查询角色下的角色管理用户信息', $roleSignatoryRelationDatas);
|
||||
if (!empty($roleSignatoryRelationDatas)) {
|
||||
$role_id_arr = array_unique(array_column($roleSignatoryRelationDatas, 'role_id'));
|
||||
//获取当前模块在当前用户的角色中是有操作的权限
|
||||
$whereroleMenuRelation['where'] = "status = 0 and item_id =" . $itemInfoID . " and role_id in (" . implode(',', $role_id_arr) . ")";
|
||||
$whereroleMenuRelation['columns'] = ['id', 'module_id'];
|
||||
$roleMenuRelationDatas = $this->LocalService()->realRolemodulerelation->fetchAll($whereroleMenuRelation);
|
||||
//var_dump('//获取当前模块在当前用户的角色中是有操作的权限', $roleSignatoryRelationDatas);
|
||||
if (!empty($roleMenuRelationDatas)) {
|
||||
$module_id_arr = array_unique(array_column($roleMenuRelationDatas, 'module_id'));
|
||||
//获取模块菜单
|
||||
|
||||
if (!empty($type)) {
|
||||
$whereadminMenu['where'] = "is_del = 0 and menu_name != '' and status = 0 and menu_type = 1 and id in (" . implode(',', $module_id_arr) . ")";
|
||||
$whereadminMenu['columns'] = ['id', 'applets_menu' => 'menu_name', 'icon', 'url'];
|
||||
|
||||
} else {
|
||||
$whereadminMenu['where'] = "is_del = 0 and applets_menu != '' and status = 0 and menu_type = 1 and id in (" . implode(',', $module_id_arr) . ")";
|
||||
$whereadminMenu['columns'] = ['id'];
|
||||
}
|
||||
$whereadminMenu['order'] = ['menu_order'];
|
||||
//var_dump($this->LocalService()->adminMenu->fetchAll($whereadminMenu));
|
||||
self::$messageTemplateContext[$this->getIdentityId()] = $this->LocalService()->adminMenu->fetchAll($whereadminMenu);
|
||||
\Co\defer(function () {
|
||||
unset(self::$messageTemplateContext[$this->getIdentityId()]);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return self::$messageTemplateContext[$this->getIdentityId()] ?? [];
|
||||
}
|
||||
|
||||
private function getIdentityId(): ?int
|
||||
{
|
||||
return Laminas::$serviceManager->identity->getId();
|
||||
}
|
||||
|
||||
private function getIsMessage($menuId, $userId, $itemId): int
|
||||
{
|
||||
return Laminas::$serviceManager->redisExtend->setDatabase(6)->getRedisInstance()->getBit("menu:{$itemId}:{$menuId}", $userId) ?: 0;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user