http://www.cnblogs.com/yfliufei/p/4386439.html
2015
MQTT,是:
IoT,internet of things,物聯(lián)網(wǎng),MQTT在這方面應用較多。
官方網(wǎng)站:http://mqtt.org/
MQTT協(xié)議是針對如下情況設計的:
MQTT協(xié)議的架構,用一個示例說明。比如有1個溫度傳感器(1個Machine),2個小的顯示屏(2個Machine),顯示屏要顯示溫度傳感器的溫度值。
可通過MQTT V3.1 Protocol Specification查閱詳細規(guī)范的細節(jié)。
顯示器需要先通過MQTT協(xié)議subscribe(訂閱)一個比如叫temperature
的topic(主題):
當溫度傳感器publish(發(fā)布)溫度數(shù)據(jù),顯示器就可以收到了:
注:以上兩張圖,取自MQTT and CoAP, IoT Protocols
協(xié)議里還有2個主要的角色:
它們是通過TCP/IP協(xié)議連接的。
因為MQTT是協(xié)議,所以不能拿來直接用的,就好比HTTP協(xié)議一樣。需要找實現(xiàn)這個協(xié)議的庫或者服務器來運行。
這里是官方的Server support。
我服務器端使用nodejs開發(fā),因此選擇了:
安裝是很簡單的:
npm install mqtt
代碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | var mqtt = require('mqtt'); //{'topicName':[clientObj,clientObj ..]} var subscribeTopics={}; //創(chuàng)建服務器對象 var server = mqtt.createServer(function(client) { //建立連接時觸發(fā) client.on('connect', function(packet) { client.connack({returnCode: 0}); }); //客戶端發(fā)布主題時觸發(fā) client.on('publish', function(packet) { var topic=packet.topic; var payload=packet.payload; //如果沒有創(chuàng)建空的主題對應的client數(shù)組 if(subscribeTopics[topic]==null){ subscribeTopics[topic]=[]; }else{ //遍歷該主題下全部client,并逐一發(fā)送消息 for(var i in subscribeTopics[topic]){ var client=subscribeTopics[topic][i]; client.publish({ topic: topic, payload: payload }); } } }); //當客戶端訂閱時觸發(fā) client.on('subscribe', function(packet) { var topic=packet.subscriptions[0].topic; //如沒有,創(chuàng)建空的主題對應的client數(shù)組 if(subscribeTopics[topic]==null){ subscribeTopics[topic]=[]; } //如果client數(shù)組中沒有當前client,加入 if(subscribeTopics[topic].indexOf(client)==-1){ subscribeTopics[topic].push(client); } }); client.on('pingreq', function(packet) { client.pingresp(); }); client.on('disconnect', function(packet) { //遍歷所有主題,檢查對應的數(shù)組中是否有當前client,從數(shù)組中刪除 for (var topic in subscribeTopics){ var index=subscribeTopics[topic].indexOf(client); if(index>-1){ subscribeTopics[topic].splice(index,1); } } }); }); //監(jiān)聽端口 server.listen(1883); |
這是一個最基本的服務器端,消息的存儲和查詢都需要自己編程處理。
比如你如果需要用redis保存和觸發(fā)數(shù)據(jù),可參考這篇中文文章:node mqtt server (redis pub/sub)。
代碼:
1 2 3 4 5 6 7 8 9 10 11 12 | var mqtt = require('mqtt'); client = mqtt.createClient(1883, 'localhost'); client.subscribe('testMessage'); client.publish('testMessage', '發(fā)布測試信息'); client.on('message', function (topic, message) { console.log(message); client.end(); }); |
寫的很簡易,訂閱了主題,然后向相同主題發(fā)布消息,接收到消息后client停止。
MQTT.js只是實現(xiàn)了最基礎的MQTT協(xié)議部分,對于服務器端的處理需要自己完成。
有關MQTT.js是否實現(xiàn)了MQTT server,詳細的說明,可參見MQTT Server: MQTT.js or Mosca?
正好,Mosca在MQTT基礎上實現(xiàn)了這些,它可以:
安裝很簡單:
npm install mosca bunyan -g
運行:
mosca -v | bunyan
然后,還可以用我上文的客戶端代碼運行測試。
我考慮的后端持久化,是用MongoDB。Mosca另外幾個選項:
首先要安裝mosca的庫:
npm install mosca
然后,在本機將mongodb運行起來,應該就可以執(zhí)行下面的代碼了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | var mosca = require('mosca') var settings = { port: 1883, backend:{ type: 'mongo', url: 'mongodb://localhost:27017/mqtt', pubsubCollection: 'ascoltatori', mongo: {} }, persistence:{ factory: mosca.persistence.Mongo, url: "mongodb://localhost:27017/mosca" } }; var server = new mosca.Server(settings); server.on('ready', function(){ console.log('Mosca server is up and running'); }); server.on('published', function(packet, client) { console.log('Published', packet.payload); }); |
直接運行作者文檔中的代碼會在多次運行客戶端后出現(xiàn)錯誤,我是參考了他2天前加上的示例代碼。
作者Matteo Collina生活在意大利的博洛尼亞,寫代碼很勤奮,這個項目更新很快,是不是說明這個方向(mqtt)很活躍呢?
作者也寫了個幻燈片,MQTT and Node.js
從這篇文章MQTT協(xié)議筆記之連接和心跳:
心跳時間(Keep Alive timer)
以秒為單位,定義服務器端從客戶端接收消息的最大時間間隔。一般應用服務會在業(yè)務層次檢測客戶端網(wǎng)絡是否連接,不是TCP/IP協(xié)議層面的 心跳機制(比如開啟SOCKET的SO_KEEPALIVE選項)。 一般來講,在一個心跳間隔內,客戶端發(fā)送一個PINGREQ消息到服務器,服務器返回PINGRESP消息,完成一次心跳交互,繼而等待下一輪。若客戶端 沒有收到心跳反饋,會關閉掉TCP/IP端口連接,離線。 16位兩個字節(jié),可看做一個無符號的short類型值。最大值,2^16-1 = 65535秒 = 18小時。最小值可以為0,表示客戶端不斷開。一般設為幾分鐘,比如微信心跳周期為300秒。
下面的代碼中我設置的是10秒:
1 2 3 4 5 6 7 8 9 10 11 | var mqtt = require('mqtt'); var settings = { keepalive: 10, protocolId: 'MQIsdp', protocolVersion: 3, clientId: 'client-b', clean: false } client = mqtt.createClient(1883, 'localhost',settings); |
可以使用MQTT.js編寫簡單的服務器代碼,觀察到服務器端接收到PING請求,并發(fā)回PING響應:
1 2 3 4 | client.on('pingreq', function(packet) { client.pingresp(); console.log('pingreq & resp'); }); |
完整代碼上面已經(jīng)貼過,另見Gist
QoS在MQTT中有(摘自MQ 遙測傳輸 (MQTT) V3.1 協(xié)議規(guī)范):
MQTT.js只是支持了MQTT協(xié)議,并沒有支持QoS,也就是說,只支持最低級別的“至多一次”(QoS0)。
Mosca支持QoS0和1,但不支持2,見Add support QOS 2
我在應用中的一個主要場景是,使用MQTT.js+Mosca做聊天服務器。
默認Mosca是不支持離線消息的,表現(xiàn)的現(xiàn)象是,如果是有人(client-a)先在主題上發(fā)布了消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | var mqtt = require('mqtt'); var settings = { keepalive: 10, protocolId: 'MQIsdp', protocolVersion: 3, clientId: 'client-a' } client = mqtt.createClient(1883, 'localhost',settings); client.publish('testMessage', '發(fā)布new測試信息0',{qos:1,retain: true}); client.publish('testMessage', '發(fā)布new測試信息1',{qos:1,retain: true}); client.publish('testMessage', '發(fā)布new測試信息2',{qos:1,retain: true}); client.publish('testMessage', '發(fā)布new測試信息3',{qos:1,retain: true}); setTimeout(function(){ client.end(); },1000); |
那么另外一個人(client-b),隨后訂閱,僅能看到最后一條消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | var mqtt = require('mqtt'); var settings = { keepalive: 10, protocolId: 'MQIsdp', protocolVersion: 3, clientId: 'client-b' } client = mqtt.createClient(1883, 'localhost',settings); client.subscribe('testMessage',{qos:1},function(){ console.log('subscribe ok.'); }); client.on("message", function(topic, payload) { console.log('message: '+payload); }); |
運行結果類似這樣:
subscribe ok.
message: 發(fā)布new測試信息3
離線消息,需要以下幾點:
clean: false
,作用是斷開連接重連的時候服務器端幫助恢復session,不需要再次訂閱用代碼說明以下,先運行這段代碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | var mqtt = require('mqtt'); var settings = { keepalive: 10, protocolId: 'MQIsdp', protocolVersion: 3, clientId: 'client-b', clean: false } client = mqtt.createClient(1883, 'localhost',settings); client.subscribe('testMessage',{qos:1},function(){ console.log('subscribe ok.'); client.end(); }); |
然后執(zhí)行剛才發(fā)布多條消息的代碼。再執(zhí)行下面的代碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | var mqtt = require('mqtt'); var settings = { keepalive: 10, protocolId: 'MQIsdp', protocolVersion: 3, clientId: 'client-b', clean: false } client = mqtt.createClient(1883, 'localhost',settings); client.on("message", function(topic, payload) { console.log('message: '+payload); }); |
運行結果類似這樣:
message: 發(fā)布new測試信息1message: 發(fā)布new測試信息3message: 發(fā)布new測試信息2message: 發(fā)布new測試信息0
收到消息的順序是亂的,為什么會這樣,其實很好理解,為了小型受限設備以及網(wǎng)絡不穩(wěn)定的情況,消息是不好保證順序的。
解決辦法是發(fā)送的消息帶時間戳,接收后再做排序。
另外,擔心客戶端沒有做client.end()
而非正常退出,那么再次連接是否能恢復session,測試了一下,注釋client.end()
,沒有問題,正常收到多條離線消息。
Mosca支持SSL連接,可根據(jù)Nodejs TLS創(chuàng)建公鑰私鑰。
然后類似這樣啟動:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | var mosca = require('mosca') var SECURE_KEY = __dirname + '/../../test/secure/tls-key.pem'; var SECURE_CERT = __dirname + '/../../test/secure/tls-cert.pem'; var settings = { port: 8443, logger: { name: "secureExample", level: 40, }, secure : { keyPath: SECURE_KEY, certPath: SECURE_CERT, } }; var server = new mosca.Server(settings); server.on('ready', setup); // fired when the mqtt server is ready function setup() { console.log('Mosca server is up and running') } |
這部分我沒有測試,直接轉自Mosca Encryption Support。
在Mosca Authentication提供了個簡易的命令行,可創(chuàng)建賬號用于認證并授權。
但是它不適合我的需求場景,我需要自己編寫認證和授權的邏輯。
雖然在作者官方網(wǎng)站上未找到,但在問題管理記錄中提交了這方面的支持:Authentication & Authorization。
有下面兩條支持,應該可以寫出自己的回調,并集成到Mosca中:
不過這塊沒有寫代碼,只是大致能確定。
MQTT.js并不是完整解決方案,不需要考慮它的性能問題。
說一下Mosca,有一個這方面問題作者的答復,what about mosca’s performance,問問題的還是個中國人,我前面還引用了他的文章。作者基本意思是:
It basically depends on the RAM. On an AWS large instance it can reach 10k concurrent connections, with roughly 10k messages/second.