最近公司做的項(xiàng)目中有用到消息推送,經(jīng)過多方面的篩選之后確定了使用MQTT協(xié)議,相對(duì)于XMPP,MQTT更加輕量級(jí),并且占用用戶很少的帶寬。
MQTT是IBM推出的一種針對(duì)移動(dòng)終端設(shè)備的基于TCP/IP的發(fā)布/預(yù)訂協(xié)議,可以連接大量的遠(yuǎn)程傳感器和控制設(shè)備。
MQTT的官網(wǎng)見:http://mqtt.org/。其中http://mqtt.org/software里面提供了官方推薦的各種服務(wù)器和客戶端使用的各種語言版本的API。
下面以服務(wù)器Apollo 1.6為例,之前嘗試過使用ActiveMQ,效果很不理想,只能實(shí)現(xiàn)服務(wù)器和客戶端一對(duì)一的通信,從官網(wǎng)上了解到Apollo屬于activemq的一個(gè)子工程。先不管這些了,言歸正傳,以下在windows環(huán)境下。
1、在這里下載Apollo服務(wù)器,下載后解壓,然后運(yùn)行apache-apollo-1.6\bin\apollo.cmd,輸入create mybroker(名字任意取,這里是根據(jù)官網(wǎng)介紹的來取的)創(chuàng)建服務(wù)器實(shí)例,服務(wù)器實(shí)例包含了所有的配置,運(yùn)行時(shí)數(shù)據(jù)等,并且和一個(gè)服務(wù)器進(jìn)程關(guān)聯(lián)。
2、create mybroker之后會(huì)在bin目錄下生成mybroker文件夾,里面包含有很多信息,其中etc\apollo.xml文件下是配置服務(wù)器信息的文件,etc\users.properties文件包含連接MQTT服務(wù)器時(shí)用到的用戶名和密碼,后面會(huì)介紹,可以修改原始的admin=password,可以接著換行添加新的用戶名密碼。
3、打開cmd,運(yùn)行…apache-apollo-1.6\bin\mybroker\bin\apollo-broker.cmd run 開啟服務(wù)器,可以在瀏覽器中輸入http://127.0.0.1:61680/查看是否安裝成功,該界面展示了topic,連接數(shù)等很多信息。
經(jīng)過上面的簡(jiǎn)單步驟,服務(wù)器基本上就已經(jīng)完成,下一篇將介紹Android客戶端的編寫和注意事項(xiàng)。
客戶端使用的API,開始我使用的是mqtt-client,使用過后發(fā)現(xiàn)問題百出,不能很好的滿足要求,后來使用了官方推薦的Eclipse Paho,下面開始客戶端代碼的編寫,為了方便測(cè)試這里有android和j2se兩個(gè)工程:
1、新建android工程MQTTClient
2、MainActivity代碼如下:
[java] view plaincopyprint?package ldw.mqttclient; import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import android.app.Activity;import android.os.Bundle;import android.os.Handler;import android.os.Message;import android.view.KeyEvent;import android.widget.TextView;import android.widget.Toast; public class MainActivity extends Activity { private TextView resultTv; private String host = "tcp://127.0.0.1:1883"; private String userName = "admin"; private String passWord = "password"; private Handler handler; private MqttClient client; private String myTopic = "test/topic"; private MqttConnectOptions options; private ScheduledExecutorService scheduler; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main); resultTv = (TextView) findViewById(R.id.result); init(); handler = new Handler() { @Override public void handleMessage(Message msg) { super.handleMessage(msg); if(msg.what == 1) { Toast.makeText(MainActivity.this, (String) msg.obj, Toast.LENGTH_SHORT).show(); System.out.println("-----------------------------"); } else if(msg.what == 2) { Toast.makeText(MainActivity.this, "連接成功", Toast.LENGTH_SHORT).show(); try { client.subscribe(myTopic, 1); } catch (Exception e) { e.printStackTrace(); } } else if(msg.what == 3) { Toast.makeText(MainActivity.this, "連接失敗,系統(tǒng)正在重連", Toast.LENGTH_SHORT).show(); } } }; startReconnect(); } private void startReconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if(!client.isConnected()) { connect(); } } }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); } private void init() { try { //host為主機(jī)名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標(biāo)識(shí)符表示,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存 client = new MqttClient(host, "test", new MemoryPersistence()); //MQTT的連接設(shè)置 options = new MqttConnectOptions(); //設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接 options.setCleanSession(true); //設(shè)置連接的用戶名 options.setUserName(userName); //設(shè)置連接的密碼 options.setPassword(passWord.toCharArray()); // 設(shè)置超時(shí)時(shí)間 單位為秒 options.setConnectionTimeout(10); // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制 options.setKeepAliveInterval(20); //設(shè)置回調(diào) client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { //連接丟失后,一般在這里面進(jìn)行重連 System.out.println("connectionLost----------"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //publish后會(huì)執(zhí)行到這里 System.out.println("deliveryComplete---------" + token.isComplete()); } @Override public void messageArrived(String topicName, MqttMessage message) throws Exception { //subscribe后得到的消息會(huì)執(zhí)行到這里面 System.out.println("messageArrived----------"); Message msg = new Message(); msg.what = 1; msg.obj = topicName+"---"+message.toString(); handler.sendMessage(msg); } });// connect(); } catch (Exception e) { e.printStackTrace(); } } private void connect() { new Thread(new Runnable() { @Override public void run() { try { client.connect(options); Message msg = new Message(); msg.what = 2; handler.sendMessage(msg); } catch (Exception e) { e.printStackTrace(); Message msg = new Message(); msg.what = 3; handler.sendMessage(msg); } } }).start(); } @Override public boolean onKeyDown(int keyCode, KeyEvent event) { if(client != null && keyCode == KeyEvent.KEYCODE_BACK) { try { client.disconnect(); } catch (Exception e) { e.printStackTrace(); } } return super.onKeyDown(keyCode, event); } @Override protected void onDestroy() { super.onDestroy(); try { scheduler.shutdown(); client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } }}
由于項(xiàng)目需要,我用到了心跳重連。根據(jù)這里的解釋設(shè)置apollo.xml,主要有設(shè)置主機(jī)連接的地址。另外,options還有個(gè)setWill方法,如果項(xiàng)目中需要知道客戶端是否掉線可以調(diào)用該方法。
3、新建j2se工程MQTTServer
4、Server代碼如下:
[java] view plaincopyprint?import java.awt.Container;import java.awt.event.ActionEvent;import java.awt.event.ActionListener; import javax.swing.JButton;import javax.swing.JFrame;import javax.swing.JPanel; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Server extends JFrame { private static final long serialVersionUID = 1L; private JPanel panel; private JButton button; private MqttClient client; private String host = "tcp://127.0.0.1:1883";// private String host = "tcp://localhost:1883"; private String userName = "test"; private String passWord = "test"; private MqttTopic topic; private MqttMessage message; private String myTopic = "test/topic"; public Server() { try { client = new MqttClient(host, "Server", new MemoryPersistence()); connect(); } catch (Exception e) { e.printStackTrace(); } Container container = this.getContentPane(); panel = new JPanel(); button = new JButton("發(fā)布話題"); button.addActionListener(new ActionListener() { @Override public void actionPerformed(ActionEvent ae) { try { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println(token.isComplete()+"========"); } catch (Exception e) { e.printStackTrace(); } } }); panel.add(button); container.add(panel, "North"); } private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設(shè)置超時(shí)時(shí)間 options.setConnectionTimeout(10); // 設(shè)置會(huì)話心跳時(shí)間 options.setKeepAliveInterval(20); try { client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("connectionLost-----------"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------"+token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage arg1) throws Exception { System.out.println("messageArrived----------"); } }); topic = client.getTopic(myTopic); message = new MqttMessage(); message.setQos(1); message.setRetained(true); System.out.println(message.isRetained()+"------ratained狀態(tài)"); message.setPayload("eeeeeaaaaaawwwwww---".getBytes()); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server s = new Server(); s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); s.setSize(600, 370); s.setLocationRelativeTo(null); s.setVisible(true); }}
上面代碼跟客戶端的代碼差不多,這里就不做解釋了。
沒什么好說的,MQTT就是這么簡(jiǎn)單,但開始在使用的時(shí)候要注意一些參數(shù)的設(shè)置來適應(yīng)項(xiàng)目的需求。
jar包下載地址:https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/