You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

Manager.php 7.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. <?php
  2. namespace app\webscoket;
  3. use think\Config;
  4. use crmeb\services\CacheService;
  5. use Swoole\Websocket\Frame;
  6. use think\Event;
  7. use think\response\Json;
  8. use think\swoole\Websocket;
  9. use think\swoole\websocket\Room;
  10. use app\webscoket\Room as NowRoom;
  11. use think\swoole\websocket\socketio\Handler;
  12. /**
  13. * Class Manager
  14. * @package app\webscoket
  15. */
  16. class Manager extends Handler
  17. {
  18. /**
  19. * @var
  20. */
  21. protected $manager;
  22. /**
  23. * @var int
  24. */
  25. protected $cache_timeout;
  26. /**
  27. * @var Response
  28. */
  29. protected $response;
  30. /**
  31. * @var \Redis
  32. */
  33. protected $cache;
  34. /**
  35. * @var NowRoom
  36. */
  37. protected $nowRoom;
  38. const USER_TYPE = ['admin', 'user', 'kefu', 'store'];
  39. const KEFU_TYPE_NUM = 2;
  40. const USER_TYPE_NUM = 1;
  41. /**
  42. * Manager constructor.
  43. * @param Websocket $websocket
  44. * @param Config $config
  45. * @param Room $room
  46. * @param Event $event
  47. * @param Response $response
  48. * @param \app\webscoket\Room $nowRoom
  49. */
  50. public function __construct(Event $event, Config $config, Websocket $websocket, Response $response, NowRoom $nowRoom)
  51. {
  52. parent::__construct($event, $config, $websocket);
  53. $this->response = $response;
  54. $this->nowRoom = $nowRoom;
  55. $this->cache = CacheService::redisHandler();
  56. $this->nowRoom->setCache($this->cache);
  57. $this->cache_timeout = intval(app()->config->get('swoole.websocket.ping_timeout', 60000) / 1000) + 2;
  58. }
  59. /**
  60. * @param \think\Request $request
  61. * @return bool|void
  62. */
  63. public function onOpen(\think\Request $request)
  64. {
  65. $fd = $this->websocket->getSender();
  66. $type = $request->get('type');
  67. $token = $request->get('token');
  68. $touristUid = $request->get('tourist_uid', '');
  69. $tourist = !!$touristUid;
  70. if (!$token || !in_array($type, self::USER_TYPE)) {
  71. return $this->websocket->close();
  72. }
  73. // 只有用户模式下才能使用游客模式
  74. if ($type !== self::USER_TYPE[1] && $tourist) {
  75. return $this->websocket->close();
  76. }
  77. $types = self::USER_TYPE;
  78. $this->nowRoom->type(array_flip($types)[$type]);
  79. try {
  80. $data = $this->exec($type, 'login', [$fd, $request->get('form_type', null), ['token' => $token, 'tourist' => $tourist], $this->response])->getData();
  81. } catch (\Throwable $e) {
  82. return $this->websocket->close();
  83. }
  84. if ($tourist) {
  85. $data['status'] = 200;
  86. $data['data']['uid'] = $touristUid;
  87. }
  88. if ($data['status'] != 200 || !($data['data']['uid'] ?? null)) {
  89. return $this->websocket->close();
  90. }
  91. $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
  92. $uid = $data['data']['uid'];
  93. $type = array_search($type, self::USER_TYPE);
  94. $this->login($type, $uid, $fd);
  95. $this->nowRoom->add((string)$fd, $uid, 0, $tourist ? 1 : 0);
  96. $this->send($fd, $this->response->message('ping', ['now' => time()]));
  97. return $this->send($fd, $this->response->success());
  98. }
  99. public function login($type, $uid, $fd)
  100. {
  101. $key = '_ws_' . $type;
  102. $this->cache->sadd($key, $fd);
  103. $this->cache->sadd($key . $uid, $fd);
  104. $this->refresh($type, $uid);
  105. }
  106. public function refresh($type, $uid)
  107. {
  108. $key = '_ws_' . $type;
  109. $this->cache->expire($key, 1800);
  110. $this->cache->expire($key . $uid, 1800);
  111. }
  112. public function logout($type, $uid, $fd)
  113. {
  114. $key = '_ws_' . $type;
  115. $this->cache->srem($key, $fd);
  116. $this->cache->srem($key . $uid, $fd);
  117. }
  118. /**
  119. * 获取当前用户所有的fd
  120. * @param $type
  121. * @param string $uid
  122. * @return array
  123. */
  124. public static function userFd($type, $uid = '')
  125. {
  126. $key = '_ws_' . $type . $uid;
  127. return CacheService::redisHandler()->smembers($key) ?: [];
  128. }
  129. /**
  130. * 执行事件调度
  131. * @param $type
  132. * @param $method
  133. * @param $result
  134. * @return null|Json
  135. */
  136. protected function exec($type, $method, $result)
  137. {
  138. if (!in_array($type, self::USER_TYPE)) {
  139. return null;
  140. }
  141. if (!is_array($result)) {
  142. return null;
  143. }
  144. /** @var Json $response */
  145. return $this->event->until('swoole.websocket.' . $type, [$method, $result, $this, $this->nowRoom]);
  146. }
  147. /**
  148. * @param Frame $frame
  149. * @return bool
  150. */
  151. public function onMessage(Frame $frame)
  152. {
  153. $fd = $this->websocket->getSender();
  154. $info = $this->nowRoom->get($fd);
  155. $result = json_decode($frame->data, true) ?: [];
  156. if (!isset($result['type']) || !$result['type']) return true;
  157. $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
  158. $this->refresh($info['type'], $info['uid']);
  159. if ($result['type'] == 'ping') {
  160. return $this->send($fd, $this->response->message('ping', ['now' => time()]));
  161. }
  162. $data = $result['data'] ?? [];
  163. $frame->uid = $info['uid'];
  164. /** @var Response $res */
  165. $res = $this->exec(self::USER_TYPE[$info['type']], $result['type'], [$fd, $result['form_type'] ?? null, $data, $this->response]);
  166. if ($res) return $this->send($fd, $res);
  167. return true;
  168. }
  169. /**
  170. * @param int $type
  171. * @param int $userId
  172. * @param int $toUserId
  173. * @param string $field
  174. */
  175. public function updateTabelField(int $type, int $userId, int $toUserId, string $field = 'to_uid')
  176. {
  177. $fds = self::userFd($type, $userId);
  178. foreach ($fds as $fd) {
  179. $this->nowRoom->update($fd, $field, $toUserId);
  180. }
  181. }
  182. /**
  183. * 发送文本响应
  184. * @param $fd
  185. * @param Json $json
  186. * @return bool
  187. */
  188. public function send($fd, \think\response\Json $json)
  189. {
  190. return $this->pushing($fd, $json->getData());
  191. }
  192. /**
  193. * 发送
  194. * @param $data
  195. * @return bool
  196. */
  197. public function pushing($fds, $data, $exclude = null)
  198. {
  199. if ($data instanceof \think\response\Json) {
  200. $data = $data->getData();
  201. }
  202. $data = is_array($data) ? json_encode($data) : $data;
  203. $fds = is_array($fds) ? $fds : [$fds];
  204. foreach ($fds as $fd) {
  205. if (!$fd) {
  206. continue;
  207. }
  208. if ($exclude && is_array($exclude) && !in_array($fd, $exclude)) {
  209. continue;
  210. } elseif ($exclude && $exclude == $fd) {
  211. continue;
  212. }
  213. $this->websocket->to($fd)->push($data);
  214. }
  215. return true;
  216. }
  217. /**
  218. * 关闭连接
  219. */
  220. public function onClose()
  221. {
  222. $fd = $this->websocket->getSender();
  223. $tabfd = (string)$fd;
  224. if ($this->nowRoom->exist($fd)) {
  225. $data = $this->nowRoom->get($tabfd);
  226. $this->logout($data['type'], $data['uid'], $fd);
  227. $this->nowRoom->type($data['type'])->del($tabfd);
  228. $this->exec(self::USER_TYPE[$data['type']], 'close', [$fd, null, ['data' => $data], $this->response]);
  229. }
  230. parent::onClose();
  231. }
  232. }