// +----------------------------------------------------------------------
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\command;

use think\Config;
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\Hook;
use think\queue\Job;
use think\queue\Worker;
use Exception;
use Throwable;
use think\Cache;
use think\exception\Handle;
use think\exception\ThrowableError;

class Work extends Command

     * The queue worker instance.
     * @var \think\queue\Worker
    protected $worker;

    protected function initialize(Input $input, Output $output)
        $this->worker = new Worker();

    protected function configure()
            ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on')
            ->addOption('daemon', null, Option::VALUE_NONE, 'Run the worker in daemon mode')
            ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0)
            ->addOption('force', null, Option::VALUE_NONE, 'Force the worker to run even in maintenance mode')
            ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128)
            ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3)
            ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0)
            ->setDescription('Process the next job on a queue');

     * Execute the console command.
     * @param Input  $input
     * @param Output $output
     * @return int|null|void
    public function execute(Input $input, Output $output)
        $queue = $input->getOption('queue');

        $delay = $input->getOption('delay');

        $memory = $input->getOption('memory');

        if ($input->getOption('daemon')) {
            Hook::listen('worker_daemon_start', $queue);
                $queue, $delay, $memory,
                $input->getOption('sleep'), $input->getOption('tries')
        } else {
            $response = $this->worker->pop($queue, $delay, $input->getOption('sleep'), $input->getOption('tries'));

    protected function output($response)
        if (!is_null($response['job'])) {
            /** @var Job $job */
            $job = $response['job'];
            if ($response['failed']) {
                $this->output->writeln('<error>Failed:</error> ' . $job->getName());
            } else {
                $this->output->writeln('<info>Processed:</info> ' . $job->getName());

     * 启动一个守护进程执行任务.
     * @param  string $queue
     * @param  int    $delay
     * @param  int    $memory
     * @param  int    $sleep
     * @param  int    $maxTries
     * @return array
    protected function daemon($queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0)
        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
                $queue, $delay, $sleep, $maxTries

            if ( $this->memoryExceeded($memory) ) {
                Hook::listen('worker_memory_exceeded', $queue);
            if ( $this->queueShouldRestart($lastRestart) ) {
                Hook::listen('worker_queue_restart', $queue);

     * 以守护进程的方式执行下个任务.
     * @param  string $queue
     * @param  int    $delay
     * @param  int    $sleep
     * @param  int    $maxTries
     * @return void
    protected function runNextJobForDaemon($queue, $delay, $sleep, $maxTries)
        try {
            $response = $this->worker->pop($queue, $delay, $sleep, $maxTries);

        } catch (Exception $e) {
        } catch (Throwable $e) {
            $this->getExceptionHandler()->report(new ThrowableError($e));

     * 获取上次重启守护进程的时间
     * @return int|null
    protected function getTimestampOfLastQueueRestart()
        return Cache::get('think:queue:restart');

     * 检查是否要重启守护进程
     * @param  int|null $lastRestart
     * @return bool
    protected function queueShouldRestart($lastRestart)
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;

     * 检查内存是否超出
     * @param  int $memoryLimit
     * @return bool
    protected function memoryExceeded($memoryLimit)
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;

     * 获取异常处理实例
     * @return \think\exception\Handle
    protected function getExceptionHandler()
        static $handle;

        if (!$handle) {

            if ($class = Config::get('exception_handle')) {
                if (class_exists($class) && is_subclass_of($class, "\\think\\exception\\Handle")) {
                    $handle = new $class;
            if (!$handle) {
                $handle = new Handle();

        return $handle;

     * 停止执行任务的守护进程.
     * @return void
    public function stop()
