Register.php 5.6 KB
<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace GatewayWorker;

use Workerman\Worker;
use Workerman\Lib\Timer;

/**
 *
 * 注册中心,用于注册 Gateway 和 BusinessWorker
 *
 * @author walkor<walkor@workerman.net>
 *
 */
class Register extends Worker
{
    /**
     * {@inheritdoc}
     */
    public $name = 'Register';

    /**
     * {@inheritdoc}
     */
    public $reloadable = false;
    
    /**
     * 秘钥
     * @var string
     */
    public $secretKey = '';

    /**
     * 所有 gateway 的连接
     *
     * @var array
     */
    protected $_gatewayConnections = array();

    /**
     * 所有 worker 的连接
     *
     * @var array
     */
    protected $_workerConnections = array();

    /**
     * 进程启动时间
     *
     * @var int
     */
    protected $_startTime = 0;

    /**
     * {@inheritdoc}
     */
    public function run()
    {
        // 设置 onMessage 连接回调
        $this->onConnect = array($this, 'onConnect');

        // 设置 onMessage 回调
        $this->onMessage = array($this, 'onMessage');

        // 设置 onClose 回调
        $this->onClose = array($this, 'onClose');

        // 记录进程启动的时间
        $this->_startTime = time();
        
        // 强制使用text协议
        $this->protocol = '\Workerman\Protocols\Text';
        
        // 运行父方法
        parent::run();
    }

    /**
     * 设置个定时器,将未及时发送验证的连接关闭
     *
     * @param \Workerman\Connection\ConnectionInterface $connection
     * @return void
     */
    public function onConnect($connection)
    {
        $connection->timeout_timerid = Timer::add(10, function () use ($connection) {
            Worker::log("Register auth timeout (".$connection->getRemoteIp()."). See http://wiki.workerman.net/Error4 for detail");
            $connection->close();
        }, null, false);
    }

    /**
     * 设置消息回调
     *
     * @param \Workerman\Connection\ConnectionInterface $connection
     * @param string                                    $buffer
     * @return void
     */
    public function onMessage($connection, $buffer)
    {
        // 删除定时器
        Timer::del($connection->timeout_timerid);
        $data       = @json_decode($buffer, true);
        if (empty($data['event'])) {
            $error = "Bad request for Register service. Request info(IP:".$connection->getRemoteIp().", Request Buffer:$buffer). See http://wiki.workerman.net/Error4 for detail";
            Worker::log($error);
            return $connection->close($error);
        }
        $event      = $data['event'];
        $secret_key = isset($data['secret_key']) ? $data['secret_key'] : '';
        // 开始验证
        switch ($event) {
            // 是 gateway 连接
            case 'gateway_connect':
                if (empty($data['address'])) {
                    echo "address not found\n";
                    return $connection->close();
                }
                if ($secret_key !== $this->secretKey) {
                    Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
                    return $connection->close();
                }
                $this->_gatewayConnections[$connection->id] = $data['address'];
                $this->broadcastAddresses();
                break;
            // 是 worker 连接
            case 'worker_connect':
                if ($secret_key !== $this->secretKey) {
                    Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
                    return $connection->close();
                }
                $this->_workerConnections[$connection->id] = $connection;
                $this->broadcastAddresses($connection);
                break;
            case 'ping':
                break;
            default:
                Worker::log("Register unknown event:$event IP: ".$connection->getRemoteIp()." Buffer:$buffer. See http://wiki.workerman.net/Error4 for detail");
                $connection->close();
        }
    }

    /**
     * 连接关闭时
     *
     * @param \Workerman\Connection\ConnectionInterface $connection
     */
    public function onClose($connection)
    {
        if (isset($this->_gatewayConnections[$connection->id])) {
            unset($this->_gatewayConnections[$connection->id]);
            $this->broadcastAddresses();
        }
        if (isset($this->_workerConnections[$connection->id])) {
            unset($this->_workerConnections[$connection->id]);
        }
    }

    /**
     * 向 BusinessWorker 广播 gateway 内部通讯地址
     *
     * @param \Workerman\Connection\ConnectionInterface $connection
     */
    public function broadcastAddresses($connection = null)
    {
        $data   = array(
            'event'     => 'broadcast_addresses',
            'addresses' => array_unique(array_values($this->_gatewayConnections)),
        );
        $buffer = json_encode($data);
        if ($connection) {
            $connection->send($buffer);
            return;
        }
        foreach ($this->_workerConnections as $con) {
            $con->send($buffer);
        }
    }
}