ChannelAdapter.php
3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?php
namespace PHPSocketIO;
class ChannelAdapter extends DefaultAdapter
{
protected $_channelId = null;
public static $ip = '127.0.0.1';
public static $port = 2206;
public function __construct($nsp)
{
parent::__construct($nsp);
$this->_channelId = (function_exists('random_int') ? random_int(1, 10000000): rand(1, 10000000)) . "-" . (function_exists('posix_getpid') ? posix_getpid(): 1);
\Channel\Client::connect(self::$ip, self::$port);
\Channel\Client::$onMessage = array($this, 'onChannelMessage');
\Channel\Client::subscribe("socket.io#/#");
Debug::debug('ChannelAdapter __construct');
}
public function __destruct()
{
Debug::debug('ChannelAdapter __destruct');
}
public function add($id ,$room)
{
$this->sids[$id][$room] = true;
$this->rooms[$room][$id] = true;
$channel = "socket.io#/#$room#";
\Channel\Client::subscribe($channel);
}
public function del($id, $room)
{
unset($this->sids[$id][$room]);
unset($this->rooms[$room][$id]);
if(empty($this->rooms[$room]))
{
unset($this->rooms[$room]);
$channel = "socket.io#/#$room#";
\Channel\Client::unsubscribe($channel);
}
}
public function delAll($id)
{
$rooms = isset($this->sids[$id]) ? array_keys($this->sids[$id]) : array();
if($rooms)
{
foreach($rooms as $room)
{
if(isset($this->rooms[$room][$id]))
{
unset($this->rooms[$room][$id]);
$channel = "socket.io#/#$room#";
\Channel\Client::unsubscribe($channel);
}
if(isset($this->rooms[$room]) && empty($this->rooms[$room]))
{
unset($this->rooms[$room]);
}
}
}
unset($this->sids[$id]);
}
public function onChannelMessage($channel, $msg)
{
if($this->_channelId === array_shift($msg))
{
//echo "ignore same channel_id \n";
return;
}
$packet = $msg[0];
$opts = $msg[1];
if(!$packet)
{
echo "invalid channel:$channel packet \n";
return;
}
if(empty($packet['nsp']))
{
$packet['nsp'] = '/';
}
if($packet['nsp'] != $this->nsp->name)
{
echo "ignore different namespace {$packet['nsp']} != {$this->nsp->name}\n";
return;
}
$this->broadcast($packet, $opts, true);
}
public function broadcast($packet, $opts, $remote = false)
{
parent::broadcast($packet, $opts);
if (!$remote)
{
$packet['nsp'] = '/';
if(!empty($opts['rooms']))
{
foreach($opts['rooms'] as $room)
{
$chn = "socket.io#/#$room#";
$msg = array($this->_channelId, $packet, $opts);
\Channel\Client::publish($chn, $msg);
}
}
else
{
$chn = "socket.io#/#";
$msg = array($this->_channelId, $packet, $opts);
\Channel\Client::publish($chn, $msg);
}
}
}
}