workerman-mqtt整合gateway使用

workerman\mqtt

官网:http://doc.workerman.net/components/workemran-mqtt.html#unsubscribe


在gateway的项目中composer worker-mqtt。直接上代码。

<?php


/**
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */
//declare(ticks=1);

use \GatewayWorker\Lib\Gateway;
use \Workerman\Autoloader; 

/**
 * 主逻辑
 * 主要是处理 onConnect onMessage onClose 三个方法
 * onConnect 和 onClose 如果不需要可以不用实现并删除
 */
class Events
{

    public static function onWorkerStart($businessWorker)
    {  
       
       global $mem;
       global $mqtt;
       $mem = new memcached;
       // $mem->addServer("121.42.47.32",1570);
       $mem->addServer("127.0.0.1",1560);
       
           // mqtt实例配置
      $config = array(
                      'keepalive'=>50,                //  默认50秒,设置成0代表禁用
                      'client_id'=>'gatewaycliId_001',     //  客户端id,如果没设置默认是 "workerman-mqtt-client-".mt_rand()
                      'protocol_name'=>'MQTT',        //  协议名,MQTT(3.1.1版本)或者 MQIsdp(3.1版本),默认是MQTT
                      'protocol_level'=>'4',          //  协议等级, protocol_name是MQTT 时值为4 ,protocol_name是MQIsdp 时值是 3
                      'clean_session'=>false,         //  清理会话,默认为true。设置为false可以接收到QoS 1和QoS 2级别的离线消息
                      'reconnect_period'=>0,          //  重连时间间隔,默认 1 秒,0代表不重连
                      'connect_timeout'=>30,          //  连接mqtt超时时间,默认30 秒
                      'username'=>'kangge',           //  用户名,可选
                      'password'=>'123456',           //  密码,可选
                      'will'=>array(                  //  遗嘱消息,当客户端断线后Broker会自动发送遗嘱消息给其它客户端. 格式为:
                              'topic'=>'cesi',                //  主题
                              'content'=>'中文测试',          //  内容
                              'qos'=>1,                       //  QoS 等级
                              'retain'=>'false'               //  retain标记
                      ),                
                      'resubscribe'=>true,              //  当连接异常断开并重连后,是否重新订阅之前的主题,默认为true
                      'bindto'=>'',                     //  用来指定本地以哪个ip和端口向Broker发起连接,默认值为''
                      'ssl'=>false,                     //  ssl选项,默认是 false,如果设置为true,则以ssl方式连接。
                      'debug'=>true                     //  是否开启debug模式,debug模式可以输出与Broker通讯的详细信息,默认为false
                      );



      $mqtt = new Workerman\Mqtt\Client('mqtt://hechy.cn:1883',$config);
      // $mqtt->onConnect = function($mqtt) {
      //     // $mqtt->subscribe('kang test');
      //     // $mqtt->subscribe('kang test1');
      //     // $mqtt->subscribe('kang test2');
      //     $mqtt->subscribe(array('topic1'=>0,'topic2'=>0));
      // };
      // $mqtt->onMessage = function($topic, $content){
      //     var_dump($topic, $content);
      // };
      // $mqtt->connect();

    }
    
    * 当客户端发来消息时触发
    * @param int $client_id 连接id
    * @param mixed $message 具体消息
    */
   public static function onMessage($client_id, $message) {
        // 向所有人发送 
        global $mem;
        global $mqtt;

        self::outlog($client_id,$message,"allmessage.log");
       
       // Gateway::sendToClient($client_id,"登录成功");
        $msg = json_decode($message,true);

        // $place_id = $msg['roomid'];

        // $_SESSION[$place_id] = $client_id;



        if (!empty($msg)) {
          $type = $msg['type'];
          switch ($type) {
            case 'login':

// self::outlog($client_id,json_encode($_SESSION),"msglog.log");

              $place = $msg['roomid'];
              $res = $mem -> set('place_'.$place,$client_id);

              $_SESSION[$place] = $client_id;

              // if ($res) {
                Gateway::sendToClient($client_id,"{$place}登录成功");
              // }

              break;

            case 'getopenid':

              // return "$callback({\"result\":\"500\"})";
            // Gateway::sendToClient($client_id,"300");
            // break;

              $openid = $msg['openid'];
              $place = $msg['roomid'];
              $placesocket = $mem->get('place_'.$place);

              if ($placesocket) {
                $todata['type'] = 'getopnid';
                $todata['openid'] = $openid;

                Gateway::sendToClient($placesocket,json_encode($todata));
                Gateway::sendToClient($client_id,"200");
              }else{
                Gateway::sendToClient($client_id,"300");
              }

              
              break;


              case 'barragemsg':
              $openid = $msg['openid'];
              $place = $msg['roomid'];
              $placesocket = $mem->get('place_'.$place);

              if ($placesocket) {
                $todata['type'] = 'barragemsg';
                $todata['openid'] = $openid;

                Gateway::sendToClient($placesocket,json_encode($todata));
                Gateway::sendToClient($client_id,"200");
              }else{
                Gateway::sendToClient($client_id,"300");
              }

              
              break;


              case 'mqtt':
              
                  $mqtt->onConnect = function($mqtt) {
                     $mqtt->publish('topic2', 'hello GatewayWorker mqtt');
                  };
                  $mqtt->connect();
              break;
          }

        }



        // Gateway::sendToAll("$client_id said $message");
   }
   
   /**
    * 当用户断开连接时触发
    * @param int $client_id 连接id
    */
   public static function onClose($client_id) {
       
      global $mem;

      // if (is_array($_SESSION)) {
      //   $placeid = array_search($client_id,$_SESSION);
      // }else{
      //   $placeid = array_search($client_id,$_SESSION);
      // }
      $placeid='';

      if (is_array($_SESSION)) {
        foreach ($_SESSION as $key => $val) {
          if ($val == $client_id) {
            $placeid = $key;
          }
        }
      }

      
      
      if ($placeid) {
        $mem->delete('place_'.$placeid);

        unset($_SESSION[$placeid]);

        GateWay::sendToAll("$placeid logout");
        exit();
      }
      GateWay::sendToAll("$client_id logout");
   }



  public static function outlog($client_id,$message,$name="outlog.log")
  {
    $stream = fopen("/tmp/".$name, "a+");
    $time = date("Y-m-d H:i:s");
    $log = "[{$time}]{$client_id}:{$message}\r\n\r\n";
    fwrite($stream, $log);
    fclose($stream);
  }



}


这样就可以在业务中直接使用mqtt,mqtt的主题设置可以直接在后台配置,也可以通过代码实现。


看效果

  1. 给gateway服务推送一条消息。

    Mr.kang博客

  2. gateway将消息pub到mqtt服务器,服务器开了监听的窗口。效果:


     Mr.kang博客

 web页面js监听的效果如下:

     Mr.kang博客








白俊遥博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论