Client.php 9.9 KB
<?php
namespace Channel;

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;
use Workerman\Protocols\Frame;

/**
 * Channel/Client
 * @version 1.0.7
 */
class Client
{
    /**
     * onMessage.
     * @var callback
     */
    public static $onMessage = null;

    /**
     * onConnect
     * @var callback
     */
    public static $onConnect = null;

    /**
     * onClose
     * @var callback
     */
    public static $onClose = null;

    /**
     * Connction to channel server.
     * @var \Workerman\Connection\TcpConnection
     */
    protected static $_remoteConnection = null;

    /**
     * Channel server ip.
     * @var string
     */
    protected static $_remoteIp = null;

    /**
     * Channel server port.
     * @var int
     */
    protected static $_remotePort = null;

    /**
     * Reconnect timer.
     * @var Timer
     */
    protected static $_reconnectTimer = null;

    /**
     * Ping timer.
     * @var Timer
     */
    protected static $_pingTimer = null;

    /**
     * All event callback.
     * @var array
     */
    protected static $_events = array();

    /**
     * All queue callback.
     * @var callable
     */
    protected static $_queues = array();

    /**
     * @var bool
     */
    protected static $_isWorkermanEnv = true;

    /**
     * Ping interval.
     * @var int
     */
    public static $pingInterval = 25;

    /**
     * Connect to channel server
     * @param string $ip Channel server ip address or unix domain socket address
     * Ip like (TCP): 192.168.1.100
     * Unix domain socket like: unix:///tmp/workerman-channel.sock
     * @param int $port Port to connect when use tcp
     */
    public static function connect($ip = '127.0.0.1', $port = 2206)
    {
        if (self::$_remoteConnection) {
            return;
        }

        self::$_remoteIp = $ip;
        self::$_remotePort = $port;

        if (PHP_SAPI !== 'cli' || !class_exists('Workerman\Worker', false)) {
            self::$_isWorkermanEnv = false;
        }

        // For workerman environment.
        if (self::$_isWorkermanEnv) {
            if (strpos($ip, 'unix://') === false) {
                $conn = new AsyncTcpConnection('frame://' . self::$_remoteIp . ':' . self::$_remotePort);
            } else {
                $conn = new AsyncTcpConnection($ip);
                $conn->protocol = Frame::class;
            }

            $conn->onClose = [self::class, 'onRemoteClose'];
            $conn->onConnect = [self::class, 'onRemoteConnect'];
            $conn->onMessage = [self::class , 'onRemoteMessage'];
            $conn->connect();

            if (empty(self::$_pingTimer)) {
                self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
            }
            // Not workerman environment.
        } else {
            $remote = strpos($ip, 'unix://') === false ? 'tcp://'.self::$_remoteIp.':'.self::$_remotePort : $ip;
            $conn = stream_socket_client($remote, $code, $message, 5);
            if (!$conn) {
                throw new \Exception($message);
            }
        }

        self::$_remoteConnection = $conn;
    }

    /**
     * onRemoteMessage.
     * @param \Workerman\Connection\TcpConnection $connection
     * @param string $data
     * @throws \Exception
     */
    public static function onRemoteMessage($connection, $data)
    {
        $data = unserialize($data);
        $type = $data['type'];
        $event = $data['channel'];
        $event_data = $data['data'];

        $callback = null;

        if ($type == 'event') {
	        if (!empty(self::$_events[$event])) {
		        call_user_func(self::$_events[$event], $event_data);
	        } elseif (!empty(Client::$onMessage)) {
		        call_user_func(Client::$onMessage, $event, $event_data);
	        } else {
		        throw new \Exception("event:$event have not callback");
	        }
        } else {
	        if (isset(self::$_queues[$event])) {
		        call_user_func(self::$_queues[$event], $event_data);
	        } else {
		        throw new \Exception("queue:$event have not callback");
	        }
        }
    }

    /**
     * Ping.
     * @return void
     */
    public static function ping()
    {
        if(self::$_remoteConnection)
        {
            self::$_remoteConnection->send('');
        }
    }

    /**
     * onRemoteClose.
     * @return void
     */
    public static function onRemoteClose()
    {
        echo "Waring channel connection closed and try to reconnect\n";
        self::$_remoteConnection = null;
        self::clearTimer();
        self::$_reconnectTimer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
        if (self::$onClose) {
            call_user_func(Client::$onClose);
        }
    }

    /**
     * onRemoteConnect.
     * @return void
     */
    public static function onRemoteConnect()
    {
        $all_event_names = array_keys(self::$_events);
        if($all_event_names)
        {
            self::subscribe($all_event_names);
        }
        self::clearTimer();

        if (self::$onConnect) {
            call_user_func(Client::$onConnect);
        }
    }

    /**
     * clearTimer.
     * @return void
     */
    public static function clearTimer()
    {
        if (!self::$_isWorkermanEnv) {
            throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
        }
        if(self::$_reconnectTimer)
        {
            Timer::del(self::$_reconnectTimer);
            self::$_reconnectTimer = null;
        }
    }

    /**
     * On.
     * @param string $event
     * @param callback $callback
     * @throws \Exception
     */
    public static function on($event, $callback)
    {
        if (!is_callable($callback)) {
            throw new \Exception('callback is not callable for event.');
        }
        self::$_events[$event] = $callback;
        self::subscribe($event);
    }

    /**
     * Subscribe.
     * @param string $events
     * @return void
     */
    public static function subscribe($events)
    {
        $events = (array)$events;
        self::send(array('type' => 'subscribe', 'channels'=>$events));
        foreach ($events as $event) {
            if(!isset(self::$_events[$event])) {
                self::$_events[$event] = null;
            }
        }
    }

    /**
     * Unsubscribe.
     * @param string $events
     * @return void
     */
    public static function unsubscribe($events)
    {
        $events = (array)$events;
        self::send(array('type' => 'unsubscribe', 'channels'=>$events));
        foreach($events as $event) {
            unset(self::$_events[$event]);
        }
    }

    /**
     * Publish.
     * @param string $events
     * @param mixed $data
     */
    public static function publish($events, $data)
    {
        self::sendAnyway(array('type' => 'publish', 'channels' => (array)$events, 'data' => $data));
    }

    /**
     * Watch a channel of queue
     * @param string|array $channels
     * @param callable $callback
     * @param boolean $autoReserve Auto reserve after callback finished.
     * But sometime you may don't want reserve immediately, or in some asynchronous job,
     * you want reserve in finished callback, so you should set $autoReserve to false
     * and call Client::reserve() after watch() and in finish callback manually.
     * @throws \Exception
     */
    public static function watch($channels, $callback, $autoReserve=true)
    {
        if (!is_callable($callback)) {
            throw new \Exception('callback is not callable for watch.');
        }

        if ($autoReserve) {
        	$callback = static function($data) use ($callback) {
		        try {
			        call_user_func($callback, $data);
		        } catch (\Exception $e) {
			        throw $e;
		        } catch (\Error $e) {
			        throw $e;
		        } finally {
			        self::reserve();
		        }
	        };
        }

	    $channels = (array)$channels;
        self::send(array('type' => 'watch', 'channels'=>$channels));

        foreach ($channels as $channel) {
        	self::$_queues[$channel] = $callback;
        }

        if ($autoReserve) {
	        self::reserve();
        }
    }

    /**
     * Unwatch a channel of queue
     * @param string $channel
     * @throws \Exception
     */
    public static function unwatch($channels)
    {
	    $channels = (array)$channels;
        self::send(array('type' => 'unwatch', 'channels'=>$channels));
        foreach ($channels as $channel) {
	        if (isset(self::$_queues[$channel])) {
		        unset(self::$_queues[$channel]);
	        }
        }
    }

	/**
	 * Put data to queue
	 * @param string|array $channels
	 * @param mixed $data
	 * @throws \Exception
	 */
    public static function enqueue($channels, $data)
    {
        self::sendAnyway(array('type' => 'enqueue', 'channels' => (array)$channels, 'data' => $data));
    }

	/**
	 * Start reserve queue manual
	 * @throws \Exception
	 */
    public static function reserve()
    {
	    self::send(array('type' => 'reserve'));
    }

    /**
     * Send through workerman environment
     * @param $data
     * @throws \Exception
     */
    protected static function send($data)
    {
        if (!self::$_isWorkermanEnv) {
            throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment.");
        }
        self::connect(self::$_remoteIp, self::$_remotePort);
        self::$_remoteConnection->send(serialize($data));
    }

    /**
     * Send from any environment
     * @param $data
     * @throws \Exception
     */
    protected static function sendAnyway($data)
    {
        self::connect(self::$_remoteIp, self::$_remotePort);
        $body = serialize($data);
        if (self::$_isWorkermanEnv) {
            self::$_remoteConnection->send($body);
        } else {
            $buffer = pack('N', 4+strlen($body)) . $body;
            fwrite(self::$_remoteConnection, $buffer);
        }
    }

}