TP5延迟队列
作者:互联网
今天在项目中遇到并发插入的问题。
实际场景
计算用户当当天的累计广告收益,这时候出现了并发问题,一个用户当天出现多条记录。
这时候该如何操作呢?加锁?加缓存?
分析
用户上报的广告数据都是真实有用的,出现多条数据的原因是同一时间用户上报两天记录,这时候我们需要先让一条写入库,后面的数据是修改该条记录
解决
使用延迟队列
// 延迟队列
$is_lock = Cache::store('redis')->handler()->setnx('user_ad_data:' . $this->_userId, 1);
if($is_lock == 0){
$data = [
'user_id' => $this->_userId,
'day' => date('Y-m-d'),
'type' => $type,
'ecpm' => $ecpm,
];
QueueService::pushLaterQueue(3,"app\job\UserAdData", $data, 'UserAdData');
}else{
Cache::store('redis')->handler()->expire('user_ad_data:' . $this->_userId, 60);
UserAdDataModel::create([
'user_id' => $this->_userId,
'ad_total_ecpm' => $ecpm,
'ad_total_show' => 1,
'video_ad_total_ecpm' => $type == 7 ? $ecpm : 0,
'video_ad_total_show' => $type == 7 ? 1 : 0,
'day' => date('Y-m-d'),
'create_time' => time(),
]);
Cache::rm('user_ad_data:' . $this->_userId);
}
- 封装延迟方法
// 延迟队列
public static function pushLaterQueue($delay = 3, $job, $data, $queue)
{
$isPushed = Queue::later($delay, $job, $data, $queue);
if ($isPushed == false) {
Log::error($job . ':Error ' . $queue . ' MQ false');
}
}
- 队列文件UserAdData.php
<?php
namespace app\job;
use app\common\model\UserAdDataModel;
use think\Db;
use think\queue\Job;
class UserAd
{
/**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job, $data)
{
print("<info>Start Time:" . date('Y-m-d H:i:s') . "</info>\n");
// 有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if (!$isJobStillNeedToBeDone) {
$job->delete();
return;
}
//执行任务
$isJobDone = $this->doHelloJob($data);
if ($isJobDone) {
// 如果任务执行成功, 记得删除任务
$job->delete();
// print("<info>Hello Job has been done and deleted" . "</info>\n");
} else {
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
// print("<warn>Hello Job has been retried more than 3 times!" . "</warn>\n");
$job->delete();
// 也可以重新发布这个任务
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
}
}
print("<info>End Time:" . date('Y-m-d H:i:s') . "</info>\n");
}
/**
* 有些消息在到达消费者时,可能已经不再需要执行了
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data)
{
return true;
}
/**
* 根据消息中的数据进行实际的业务处理...
*/
private function doHelloJob($data)
{
/*
* $data = [
'user_id' => $this->_userId,
'day' => date('Y-m-d'),
'type' => $type,
'ecpm' => $ecpm,
];
* */
$ecpm = $data['ecpm'];
$update = [
'ad_total_ecpm' => Db::raw("ad_total_ecpm+{$ecpm}"),
'ad_total_show' => Db::raw("ad_total_show+1"),
];
if ($data['type'] == 7) {
$update['video_ad_total_ecpm'] = Db::raw("video_ad_total_ecpm+{$ecpm}");
$update['video_ad_total_show'] = Db::raw("video_ad_total_show+1");
}
$res = UserAdDataModel::where(['user_id' => $data['user_id'], 'day' => $data['day']])->update($update);
if($res){
return true;
}
}
}
这样前一个进程获取到redis锁,后面进程进来的数据无法持有锁,将要修改的数据插入队列,延迟三秒后,第一个进程的数据已经入口了,然后修改
标签:ad,队列,user,job,TP5,ecpm,total,data,延迟 来源: https://www.cnblogs.com/obeigong/p/16495363.html