国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
MQTT協(xié)議的簡(jiǎn)單介紹和服務(wù)器的安裝

MQTT協(xié)議的簡(jiǎn)單介紹和服務(wù)器的安裝

  最近公司做的項(xiàng)目中有用到消息推送,經(jīng)過多方面的篩選之后確定了使用MQTT協(xié)議,相對(duì)于XMPP,MQTT更加輕量級(jí),并且占用用戶很少的帶寬。

MQTT是IBM推出的一種針對(duì)移動(dòng)終端設(shè)備的基于TCP/IP的發(fā)布/預(yù)訂協(xié)議,可以連接大量的遠(yuǎn)程傳感器和控制設(shè)備。

  MQTT的官網(wǎng)見:http://mqtt.org/。其中http://mqtt.org/software里面提供了官方推薦的各種服務(wù)器和客戶端使用的各種語言版本的API。

  下面以服務(wù)器Apollo 1.6為例,之前嘗試過使用ActiveMQ,效果很不理想,只能實(shí)現(xiàn)服務(wù)器和客戶端一對(duì)一的通信,從官網(wǎng)上了解到Apollo屬于activemq的一個(gè)子工程。先不管這些了,言歸正傳,以下在windows環(huán)境下。

  1、在這里下載Apollo服務(wù)器,下載后解壓,然后運(yùn)行apache-apollo-1.6\bin\apollo.cmd,輸入create mybroker(名字任意取,這里是根據(jù)官網(wǎng)介紹的來取的)創(chuàng)建服務(wù)器實(shí)例,服務(wù)器實(shí)例包含了所有的配置,運(yùn)行時(shí)數(shù)據(jù)等,并且和一個(gè)服務(wù)器進(jìn)程關(guān)聯(lián)。

  2、create mybroker之后會(huì)在bin目錄下生成mybroker文件夾,里面包含有很多信息,其中etc\apollo.xml文件下是配置服務(wù)器信息的文件,etc\users.properties文件包含連接MQTT服務(wù)器時(shí)用到的用戶名和密碼,后面會(huì)介紹,可以修改原始的admin=password,可以接著換行添加新的用戶名密碼。

  3、打開cmd,運(yùn)行…apache-apollo-1.6\bin\mybroker\bin\apollo-broker.cmd run 開啟服務(wù)器,可以在瀏覽器中輸入http://127.0.0.1:61680/查看是否安裝成功,該界面展示了topic,連接數(shù)等很多信息。

  經(jīng)過上面的簡(jiǎn)單步驟,服務(wù)器基本上就已經(jīng)完成,下一篇將介紹Android客戶端的編寫和注意事項(xiàng)。

  客戶端使用的API,開始我使用的是mqtt-client,使用過后發(fā)現(xiàn)問題百出,不能很好的滿足要求,后來使用了官方推薦的Eclipse Paho,下面開始客戶端代碼的編寫,為了方便測(cè)試這里有android和j2se兩個(gè)工程:

  1、新建android工程MQTTClient

  2、MainActivity代碼如下:

[java] view plaincopyprint?package ldw.mqttclient; import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import android.app.Activity;import android.os.Bundle;import android.os.Handler;import android.os.Message;import android.view.KeyEvent;import android.widget.TextView;import android.widget.Toast; public class MainActivity extends Activity {        private TextView resultTv;        private String host = "tcp://127.0.0.1:1883";       private String userName = "admin";       private String passWord = "password";        private Handler handler;        private MqttClient client;        private String myTopic = "test/topic";        private MqttConnectOptions options;        private ScheduledExecutorService scheduler;        @Override       protected void onCreate(Bundle savedInstanceState) {              super.onCreate(savedInstanceState);              setContentView(R.layout.main);               resultTv = (TextView) findViewById(R.id.result);               init();               handler = new Handler() {                     @Override                     public void handleMessage(Message msg) {                            super.handleMessage(msg);                            if(msg.what == 1) {                                   Toast.makeText(MainActivity.this, (String) msg.obj,                                                 Toast.LENGTH_SHORT).show();                                   System.out.println("-----------------------------");                            } else if(msg.what == 2) {                                   Toast.makeText(MainActivity.this, "連接成功", Toast.LENGTH_SHORT).show();                                   try {                                          client.subscribe(myTopic, 1);                                   } catch (Exception e) {                                          e.printStackTrace();                                   }                            } else if(msg.what == 3) {                                   Toast.makeText(MainActivity.this, "連接失敗,系統(tǒng)正在重連", Toast.LENGTH_SHORT).show();                            }                     }              };               startReconnect();        }        private void startReconnect() {              scheduler = Executors.newSingleThreadScheduledExecutor();              scheduler.scheduleAtFixedRate(new Runnable() {                      @Override                     public void run() {                            if(!client.isConnected()) {                                   connect();                            }                     }              }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);       }        private void init() {              try {                       //host為主機(jī)名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標(biāo)識(shí)符表示,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存                     client = new MqttClient(host, "test",                                   new MemoryPersistence());                       //MQTT的連接設(shè)置                     options = new MqttConnectOptions();                       //設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接                     options.setCleanSession(true);                       //設(shè)置連接的用戶名                     options.setUserName(userName);                       //設(shè)置連接的密碼                     options.setPassword(passWord.toCharArray());                     // 設(shè)置超時(shí)時(shí)間 單位為秒                     options.setConnectionTimeout(10);                     // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制                     options.setKeepAliveInterval(20);                        //設(shè)置回調(diào)                     client.setCallback(new MqttCallback() {                             @Override                            public void connectionLost(Throwable cause) {                                        //連接丟失后,一般在這里面進(jìn)行重連                                   System.out.println("connectionLost----------");                            }                             @Override                            public void deliveryComplete(IMqttDeliveryToken token) {                                        //publish后會(huì)執(zhí)行到這里                                   System.out.println("deliveryComplete---------"                                                 + token.isComplete());                            }                             @Override                            public void messageArrived(String topicName, MqttMessage message)                                          throws Exception {                                        //subscribe后得到的消息會(huì)執(zhí)行到這里面                                   System.out.println("messageArrived----------");                                   Message msg = new Message();                                   msg.what = 1;                                   msg.obj = topicName+"---"+message.toString();                                   handler.sendMessage(msg);                            }                     });//                   connect();              } catch (Exception e) {                     e.printStackTrace();              }       }        private void connect() {              new Thread(new Runnable() {                      @Override                     public void run() {                            try {                                   client.connect(options);                                   Message msg = new Message();                                   msg.what = 2;                                   handler.sendMessage(msg);                            } catch (Exception e) {                                   e.printStackTrace();                                   Message msg = new Message();                                   msg.what = 3;                                   handler.sendMessage(msg);                            }                     }              }).start();       }        @Override       public boolean onKeyDown(int keyCode, KeyEvent event) {              if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {                     try {                            client.disconnect();                     } catch (Exception e) {                            e.printStackTrace();                     }              }              return super.onKeyDown(keyCode, event);       }        @Override       protected void onDestroy() {              super.onDestroy();              try {                     scheduler.shutdown();                     client.disconnect();              } catch (MqttException e) {                     e.printStackTrace();              }       }}

  

  由于項(xiàng)目需要,我用到了心跳重連。根據(jù)這里的解釋設(shè)置apollo.xml,主要有設(shè)置主機(jī)連接的地址。另外,options還有個(gè)setWill方法,如果項(xiàng)目中需要知道客戶端是否掉線可以調(diào)用該方法。

  3、新建j2se工程MQTTServer

  4、Server代碼如下:

[java] view plaincopyprint?import java.awt.Container;import java.awt.event.ActionEvent;import java.awt.event.ActionListener; import javax.swing.JButton;import javax.swing.JFrame;import javax.swing.JPanel; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Server extends JFrame {       private static final long serialVersionUID = 1L;       private JPanel panel;       private JButton button;        private MqttClient client;       private String host = "tcp://127.0.0.1:1883";//     private String host = "tcp://localhost:1883";       private String userName = "test";       private String passWord = "test";       private MqttTopic topic;       private MqttMessage message;        private String myTopic = "test/topic";        public Server() {               try {                     client = new MqttClient(host, "Server",                                   new MemoryPersistence());                     connect();              } catch (Exception e) {                     e.printStackTrace();              }               Container container = this.getContentPane();              panel = new JPanel();              button = new JButton("發(fā)布話題");              button.addActionListener(new ActionListener() {                      @Override                     public void actionPerformed(ActionEvent ae) {                            try {                                   MqttDeliveryToken token = topic.publish(message);                                   token.waitForCompletion();                                   System.out.println(token.isComplete()+"========");                            } catch (Exception e) {                                   e.printStackTrace();                            }                     }              });              panel.add(button);              container.add(panel, "North");        }        private void connect() {               MqttConnectOptions options = new MqttConnectOptions();              options.setCleanSession(false);              options.setUserName(userName);              options.setPassword(passWord.toCharArray());              // 設(shè)置超時(shí)時(shí)間              options.setConnectionTimeout(10);              // 設(shè)置會(huì)話心跳時(shí)間              options.setKeepAliveInterval(20);              try {                     client.setCallback(new MqttCallback() {                             @Override                            public void connectionLost(Throwable cause) {                                   System.out.println("connectionLost-----------");                            }                             @Override                            public void deliveryComplete(IMqttDeliveryToken token) {                                   System.out.println("deliveryComplete---------"+token.isComplete());                            }                             @Override                            public void messageArrived(String topic, MqttMessage arg1)                                          throws Exception {                                   System.out.println("messageArrived----------");                             }                     });                      topic = client.getTopic(myTopic);                      message = new MqttMessage();                     message.setQos(1);                     message.setRetained(true);                     System.out.println(message.isRetained()+"------ratained狀態(tài)");                     message.setPayload("eeeeeaaaaaawwwwww---".getBytes());                      client.connect(options);              } catch (Exception e) {                     e.printStackTrace();              }        }        public static void main(String[] args) {              Server s = new Server();              s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);              s.setSize(600, 370);              s.setLocationRelativeTo(null);              s.setVisible(true);       }}

  上面代碼跟客戶端的代碼差不多,這里就不做解釋了。

  沒什么好說的,MQTT就是這么簡(jiǎn)單,但開始在使用的時(shí)候要注意一些參數(shù)的設(shè)置來適應(yīng)項(xiàng)目的需求。

  jar包下載地址:https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/

  轉(zhuǎn)自:http://www.longdw.com/mqtt-server-client-android/

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
MQTT協(xié)議之訂閱及發(fā)布
基于MQTT的消息推送
mosquitto與paho實(shí)現(xiàn)推送服務(wù)
如何在 Django 項(xiàng)目中使用 MQTT
MQTT(EMQX) - SpringBoot 整合MQTT 連接池 Demo - 附源代碼
activeMQ 推送之mqtt客戶端
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服