其他分享
首页 > 其他分享> > TP5延迟队列

TP5延迟队列

作者:互联网

今天在项目中遇到并发插入的问题。

实际场景

计算用户当当天的累计广告收益,这时候出现了并发问题,一个用户当天出现多条记录。
这时候该如何操作呢?加锁?加缓存?

分析

用户上报的广告数据都是真实有用的,出现多条数据的原因是同一时间用户上报两天记录,这时候我们需要先让一条写入库,后面的数据是修改该条记录

解决

使用延迟队列

   // 延迟队列
    $is_lock = Cache::store('redis')->handler()->setnx('user_ad_data:' . $this->_userId, 1);
    if($is_lock == 0){
        $data = [
            'user_id'             => $this->_userId,
            'day'                 => date('Y-m-d'),
            'type'                => $type,
            'ecpm'                => $ecpm,
        ];
        QueueService::pushLaterQueue(3,"app\job\UserAdData", $data, 'UserAdData');
    }else{
        Cache::store('redis')->handler()->expire('user_ad_data:' . $this->_userId, 60);
        UserAdDataModel::create([
            'user_id'             => $this->_userId,
            'ad_total_ecpm'       => $ecpm,
            'ad_total_show'       => 1,
            'video_ad_total_ecpm' => $type == 7 ? $ecpm : 0,
            'video_ad_total_show' => $type == 7 ? 1 : 0,
            'day'                 => date('Y-m-d'),
            'create_time'         => time(),
        ]);
        Cache::rm('user_ad_data:' . $this->_userId);
    }
  // 延迟队列
    public static function pushLaterQueue($delay = 3, $job, $data, $queue)
    {
        $isPushed = Queue::later($delay, $job, $data, $queue);
        if ($isPushed == false) {
            Log::error($job . ':Error ' . $queue . ' MQ false');
        }
    }
<?php

namespace app\job;

use app\common\model\UserAdDataModel;
use think\Db;
use think\queue\Job;

class UserAd
{
    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
     */
    public function fire(Job $job, $data)
    {
        print("<info>Start Time:" . date('Y-m-d H:i:s') . "</info>\n");

        // 有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if (!$isJobStillNeedToBeDone) {
            $job->delete();
            return;
        }

        //执行任务
        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            // 如果任务执行成功, 记得删除任务
            $job->delete();
//            print("<info>Hello Job has been done and deleted" . "</info>\n");
        } else {
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
//                print("<warn>Hello Job has been retried more than 3 times!" . "</warn>\n");
                $job->delete();
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
        print("<info>End Time:" . date('Y-m-d H:i:s') . "</info>\n");
    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data)
    {
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理...
     */
    private function doHelloJob($data)
    {
        /*
         *  $data = [
                    'user_id'             => $this->_userId,
                    'day'                 => date('Y-m-d'),
                    'type'                => $type,
                    'ecpm'                => $ecpm,
                ];
         * */

        $ecpm = $data['ecpm'];

        $update = [
            'ad_total_ecpm' => Db::raw("ad_total_ecpm+{$ecpm}"),
            'ad_total_show' => Db::raw("ad_total_show+1"),
        ];
        if ($data['type'] == 7) {
            $update['video_ad_total_ecpm'] = Db::raw("video_ad_total_ecpm+{$ecpm}");
            $update['video_ad_total_show'] = Db::raw("video_ad_total_show+1");
        }

        $res = UserAdDataModel::where(['user_id' => $data['user_id'], 'day' => $data['day']])->update($update);
        if($res){
            return true;
        }
    }
}

这样前一个进程获取到redis锁,后面进程进来的数据无法持有锁,将要修改的数据插入队列,延迟三秒后,第一个进程的数据已经入口了,然后修改

标签:ad,队列,user,job,TP5,ecpm,total,data,延迟
来源: https://www.cnblogs.com/obeigong/p/16495363.html