Queue.php 2.1 KB
<?php

namespace Channel;

use Workerman\Connection\TcpConnection;

/**
 * A named FIFO message queue that hands each queued item to exactly one
 * waiting consumer connection.
 *
 * A connection first registers as a watcher (addWatch) and then announces
 * that it is ready for one message (addConsumer). Every delivered message
 * consumes that readiness: the connection is dropped from the consumer list
 * and has to call addConsumer again to receive the next item.
 */
class Queue
{

    /** @var string Queue (channel) name, sent along with every dispatched message. */
    public $name = 'default';

    /** @var array Connections watching this queue, keyed by connection id. */
    public $watcher = array();

    /** @var array Watchers currently waiting for one message, keyed by connection id. */
    public $consumer = array();

    /** @var \SplQueue Pending messages in arrival order. */
    protected $queue = null;

    /**
     * @param string $name Queue (channel) name.
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->queue = new \SplQueue();
    }

    /**
     * Register a connection as a watcher of this queue.
     *
     * The queue name is also appended to the connection's `watchs` list.
     * Registering the same connection id twice is a no-op.
     *
     * @param TcpConnection $connection
     */
    public function addWatch($connection)
    {
        if (isset($this->watcher[$connection->id])) {
            return;
        }

        $this->watcher[$connection->id] = $connection;
        $connection->watchs[] = $this->name;
    }

    /**
     * Unregister a connection: remove this queue's name from the connection's
     * `watchs` list and drop the connection from both the watcher and the
     * consumer lists.
     *
     * @param TcpConnection $connection
     */
    public function removeWatch($connection)
    {
        if (isset($connection->watchs)) {
            // Strict search: with loose comparison numeric-string names such
            // as '1' and '01' are equal, so the wrong entry could be removed.
            $idx = array_search($this->name, $connection->watchs, true);
            if ($idx !== false) {
                unset($connection->watchs[$idx]);
            }
        }

        // unset() on a missing key is a no-op, so no isset() guards are needed.
        unset($this->watcher[$connection->id], $this->consumer[$connection->id]);
    }

    /**
     * Mark a watching connection as ready to receive one message, then try
     * to deliver pending messages.
     *
     * Connections that are not registered watchers are not added, but a
     * dispatch attempt is still made.
     *
     * @param TcpConnection $connection
     */
    public function addConsumer($connection)
    {
        $id = $connection->id;
        if (isset($this->watcher[$id]) && !isset($this->consumer[$id])) {
            $this->consumer[$id] = $connection;
        }

        $this->dispatch();
    }

    /**
     * Append a message to the queue and try to deliver it immediately.
     *
     * @param mixed $data Payload; it is passed through serialize() on delivery.
     */
    public function enqueue($data)
    {
        $this->queue->enqueue($data);
        $this->dispatch();
    }

    /**
     * Deliver pending messages, one per waiting consumer, until either the
     * queue or the consumer list runs out.
     */
    private function dispatch()
    {
        while (!$this->queue->isEmpty() && count($this->consumer) > 0) {
            // Rewind first: key() follows the array's internal pointer, which
            // is not guaranteed to sit on a valid element of this public
            // array. After reset() it is the oldest waiting consumer.
            reset($this->consumer);
            $idx = key($this->consumer);
            $connection = $this->consumer[$idx];

            // One message per readiness announcement.
            unset($this->consumer[$idx]);

            $data = $this->queue->dequeue();
            $connection->send(serialize(array('type' => 'queue', 'channel' => $this->name, 'data' => $data)));
        }
    }

    /**
     * @return bool True when there are no watchers and no pending messages.
     */
    public function isEmpty()
    {
        return empty($this->watcher) && $this->queue->isEmpty();
    }

}