欢迎您, 来到 宁时修博客.^_^

Mysql大数据分段处理实战

2019/07/02 林木立 php,MySQL 536
Mysql大数据分段处理实战 -- 作者:宁时修

一、业务需求

    可能在某些时候,你会有这样的需求:在某一分钟内需要处理大量的数据并且操作数据库的增删改。

    如果假设你的业务需要在同一分钟内,同时处理10W用户单日的用户收益,这个收益需要做到收益明细的新增,以及用户金额的更新。

    那么你会怎么做!!!

    首先排除一次性操作的可能,即便你所用的服务可以承受这样的数据量,性能也是相当差劲的。如果服务硬件条件不够可能直接导致的是mysql崩溃,数据丢失。

    那么这个时候,分批处理就好了,10W数据,我可以分批处理,假设一次1W,10次就可以全部完成,这样MySQL的压力会小很多。



二、案例展示

    该案例代码中用到了以下扩展:

    1.redis扩展:使用redis list队列类型, 这里采取的左进右出的原则。

    2.使用了tp5的扩展Hook行为:用于监听动作,执行动作处理。

    3.在大并发的情况需要利用redis锁的机制:确保进程唯一,只能等待进程处理完毕后下一个进程才能进来, redis setnx 可以了解下。


    TP框架 Hook行为的官方介绍:

    快速通道:https://www.kancloud.cn/manual/thinkphp5/118130

    行为就是你应用执行过程中的一个动作或者处理。


    友情提示:

    1.统一监听方法

Hook::listen('send');


    2.如果是使用闭包这种方式来执行监听的,必须将执行代码放入到初始化方法中,否则无法执行该动作。

Hook::add('send',function($params){
    echo 'Hello,world!';
});




                                 timg.jpg

    代码实现,代码以Tp5.0.*为例:

<?php
namespace app\home\controller;
use think\Db;
use think\Cache;
use think\Hook;

class Test
{
    // 分片处理初始值
    protected $initLimit = 10000;

    // 锁的key
    protected $locker;

  // 初始化方法
  public function __construct()
    {
      $redisHandler = $this->getRedis();
        Hook::add('send_failure', function ($popPage, $sendRedisKey) use ($redisHandler) {
            $redisHandler->rPush($sendRedisKey,json_encode($popPage));
        });
        Hook::add('send_exception', function ($popPage, $sendRedisKey) use ($redisHandler) {
            $redisHandler->rPush($sendRedisKey,json_encode($popPage));
        });
    }

    /**
     * MySQL 表
     * user - 用户表 假设10W  用户金额
     * user_income_log - 用户收益明细 用于写入计算后的收益记录
     */

    /**
     * 用户收益计算+分批处理
     * @return bool
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\ModelNotFoundException
     * @throws \think\exception\DbException
     */
    public function handle()
    {
        // 获取用户的数据总数 假设10W
        $total = Db::name('user')
                ->count();
        // 锁的机制 确保进程唯一
        $lock = 'send_lock';
        $isLock = $this->lock($lock,60);
        // 最大错误循环次数 加一个避免死循环导致死循环
        $maxWhileNum = 10;
        $WhileNum = 0;
        try{
          if ($isLock) {
            $this->locker = $lock;
          } else {
            // 等待进程处理完毕
            $msg = '脚本运行中';
            return $msg;
          }
          // 分批处理
          if ($total > 0) {
              // 获取分批次数
              $diff = ceil($total / $this->initLimit);
              // 初始化redis 并且清除该key
              $redisHandler = $this->getRedis();
              $sendRedisKey = 'user_list';
              $redisHandler->delete($sendRedisKey);
              // 此处你也可以监听一下此次操作的时间,避免一分钟操作2次的情况,我这里就不贴代码了。
              // 循环次数,并且将需要的数据压入到redis队列中。
              // 这里采取的左进右出的原则。
              for ($i=1; $i <= $diff; $i++)
              {
                  $n = $i -1;
                  $pages = ['start' => $n * $this->initLimit, 'limit' => $this->initLimit];
                  $redisHandler->lPush($sendRedisKey,json_encode($pages));
              }
              // 使用while 将此次进程挂起,直到处理完毕。
              while (true)
              {
                if ($WhileNum > $maxWhileNum) {
                // 超过了最大错误次数跳出循环
                // 此处你也可以记录一下错误原因,方便后续排查问题
            break;
                }
                  $len = $redisHandler->lLen($sendRedisKey);
                  if ($len > 0)
                  {
                      // 队列存在,执行处理,如发生异常或者mysql短暂失败,会将此次处理在重新push回去。
                      // 借助Hook来完成
                      $bool = $this->send(json_decode($redisHandler->rPop($sendRedisKey),true), $sendRedisKey);
                      if (!$bool) $WhileNum++;
                  } else {
                      // 处理完毕后,跳出循环。
                      break;
                  }
              }
              // 处理成功完毕后 记得释放锁
              $this->unlock($lock);
          }
        } catch (\Exception $e) {
          // 有异常的情况也要将锁释放掉,虽然锁上面加了时间概念已经避免的死锁的情况。
          $this->unlock($lock);
        }
        return true;
    }

    /**
     * 处理结果集
     * @param array $limit
     * @param null $sendRedisKey
     * @return bool
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\ModelNotFoundException
     * @throws \think\exception\DbException
     */
    protected function send($limit = [], $sendRedisKey = null)
    {
        // 获取用户数据 金额
        $userData = Db::name('user')
                    ->limit($limit['start'],$limit['limit'])
                    ->order('uid','asc')
                    ->field('money,uid')
                    ->select();

        if ($userData) {
            // 计算收益,初始化需要操作数据。
            $result = [];
            foreach ($userData as $k => $v)
            {
                $amount = '1314.88';//假设收益计算的值,案例需要而已。
                if ($amount > 0) {
                    // 假设下面字段是收益明细表需要的数据。
                    $result[] = [
                        'uid' => $v['uid'],
                        'amount' => $amount,
                        'send_time' => date('Y-m-d H:i:s')
                    ];
                }
            }
            // 执行数据写入|更新
            if ($result) {
                try{
                    $sendTotal = count($result);
                    // 启动事务
                    Db::startTrans();
                    $isSuccess = true;
                    $updateTotal = 0;
                    // 在性能测试下,循环里使用insert 和 在外部使用一次性插入使用insertAll 对比下, insertAll的性能远远超过循环里使用insert。
                    $insertToatl = Db::name('user_income_log')->insertAll($result);
                    foreach ($result as $data)
                    {
                        // 更新用户金额
                        $offer = Db::name('user')->where(['uid' => $data['uid']])->setInc('money',$data['amount']);
                        if ($offer) {
                            $updateTotal++;
                        } else {
                            $isSuccess = false;
                            break;
                        }
                    }
                    if ($isSuccess && $insertToatl == $sendTotal && $updateTotal == $sendTotal){
                        Db::commit();
                        // 执行监听,事务一次commit 1W,要么全部成功,要么全部失败。
                        Hook::listen('send_success');
                    }else{
                        Db::rollback();
                        // 回滚,你得需要记录错误日志,方便后续遇到问题查看。
                        // 执行监听,回滚之后需要将此次操作的数据集push回去,让他后续再执行。
                        // 之前采用左进右出的原则,那么push回去,自然就是从右边进了。
                        // 让他保留最后执行,先让其他兄弟先执行完毕后再执行后续push进去的数据。
                        if ($sendRedisKey) Hook::listen('send_failure',$limit, $sendRedisKey);
                        return false;
                    }
                } catch (\Exception $e) {
                    // 异常也进行监听,记录日志,也需要重新把当前处理的redis队列值,再重新push回去。
                    //$errorMsg = '程序异常,错误发生行:(' . $e->getLine() .')- 错误原因:' . $e->getMessage();
                    Hook::listen('send_exception',$limit,$sendRedisKey);
                    return false;
                }
            }
        }
        // 处理成功,返回true
        return true;
    }

    /**
     * 获取redis 原生操作对象
     * [getRedis description]
     * @return [type] [description]
     */
    protected function getRedis()
    {
      $options = [
          // 缓存类型
          'type'  =>  'redis',
          // 服务器地址
          'host'  => '127.0.0.1'
    ];
        $cache = Cache::init($options);
    // 获取缓存对象句柄
    return $cache->handler();
    }

    /**
     * 获取锁
     * @param  String  $key    锁标识
     * @param  Int     $expire 锁过期时间
     * @return Boolean
     */
    public function lock($key, $expire=5){
        $redisHandler = $this->getRedis();
        $is_lock = $redisHandler->setnx($key, time()+$expire);
        // 不能获取锁
        if(!$is_lock){
            // 判断锁是否过期
            $lock_time = $redisHandler->get($key);
            // 锁已过期,删除锁,重新获取,避免死锁
            if(time()>$lock_time){
                $this->unlock($key);
                $is_lock = $redisHandler->setnx($key, time()+$expire);
            }
        }
        return $is_lock ? true : false;
    }

    /**
     * 释放锁
     * @param  String  $key 锁标识
     * @return Boolean
     */
    public function unlock($key){
        return $this->getRedis()->del($key);
    }

    // 借助魔术方法来释放锁,每次处理完毕后会自动释放,确保完全释放
    public function __destruct()
    {
        if ($this->locker){
            $this->unlock($this->locker);
        }
    }
}




点赞
说说你的看法

所有评论: (0)

# 加入组织

1、用手机QQ扫左侧二维码

2、搜Q群:1058582137

3、点击 宁时修博客交流群