開源項目:用環信MQTT實現"世界頻道"只需5分鐘【附源碼】

2022-04-26 17:29:43

來源:晶報網

說到“世界頻道”想必大家都不陌生,常見的如王者榮耀的世界廣播搖人組隊以及最近興起的Discord社區交友等等。究其目的就是在應用內讓海量用戶可以實時互動。有些開發者為了實現這種場景會選擇聊天室方案來實現,但是這種方式存在一定的局限性,比如聊天室人數上限、海量消息處理等各種情況。

當然如果有錢有顏,可以直接選擇云廠商產品(比如環信的聊天室方案和超級社區),如果有才有time,也可以選擇平替版MQTT實現方案。今天小猿將介紹用環信MQTT消息云實現應用內的世界頻道,滿滿干貨,不要錯過~~

使用MQTT實現世界頻道-Demo效果演示

協議優勢:

在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協議。

輕量級:MQTT本身是物聯網的連接協議,專為受限設備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應用領域(比如:游戲領域)。

易集成:MQTT作為標準開放的消息協議,經過多年演進,已支持30多種開發語言,10余種SDK,無論何種開發環境,都可以快速找到開源SDK。

高并發:MQTT是輕量級的消息傳輸協議,2字節心跳報文,最小化傳輸和連接成本,云廠商broker產品都可支持千萬級并發接入,適用于高并發連接場景。

低成本:MQTT是基于客戶端-服務器的訂閱/發布模型,通過服務器中間件實現消息分發,減少消息復制成本,快速實現一對多在線推送。

靈活性:MQTT協議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實現多種業務場景。

衍生功能:隨著MQTT云服務的發展,部分服務器廠商已支持消息存儲、獲取在線設備列表、查看歷史消息等衍生功能,降低開發工作量與消息存儲成本。

實現方案:

言歸正傳,上干貨。本次技術實現方案包含:移動客戶端(Android)、后端服務(Java)以及MQTT服務器。這里提一下,MQTT服務器使用環信MQTT消息云,使用三方云服務比較省心,既節省開發時間,產品性能也不需要擔心,現在注冊可以直接使用環信MQTT消息云超高額度的免費版:每月100并發連接、300萬消息,完全滿足功能開發使用。

客戶端實現:

客戶端實現主要包含以下兩部分:

底層MQTT業務集成:包含引入SDK、MQTT方法封裝、業務交互(消息收發)。

APP上層交互:在APP首頁提供世界頻道入口,實現心情彈幕飄窗(接收)和發送。

接下來上底層MQTT業務集成代碼。

引入SDK:

這一步環信官方文檔比較明確,就是根據自己的平臺引入相應的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置

1// 在根目錄 build.gradle repositories 下加入配置

2maven

3...

4// 然后加入 MQTT 依賴

5// MQTT sdk

6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

方法封裝

這里貼一下對mqtt相關方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:

1 /**

2 * Create by lzan13 on 2022/3/22

3 * 描述:MQTT 幫助類

4 */

5 object MQTTHelper {

6

7 private var mqttClient: MqttAndroidClient? = null

8

9 // 緩存主題集合

10 private val topicList = mutableListOf()

11

12 /**

13 * 鏈接MQTT

14 * @param id 用戶 Id

15 * @param token 用戶鏈接 MQTT 的 Token

16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱

17 */

18 fun connect(id: String, token: String, topic: String = "") {

19 // 處理訂閱主題

20 if (topic.isNotEmpty()) topicList.add(topic)

21

22 // 拼接鏈接地址

23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

24 // 拼接 clientId

25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

27

28 //連接參數

29 val options = MqttConnectOptions()

30 options.isAutomaticReconnect = true //設置自動重連

31 options.isCleanSession = true // 緩存

32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒

33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒

34 options.userName = id // 用戶名

35 options.password = token.toCharArray() // 密碼

36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

37 // 設置MQTT監聽

38 mqttClient?.setCallback(object : MqttCallback {

39 override fun connectionLost(t: Throwable) {

40 // 通知鏈接斷開

41 VMLog.d("MQTT 鏈接斷開 $t")

42 }

43

44 @Throws(Exception::class)

45 override fun messageArrived(topic: String, message: MqttMessage) {

46 // 通知收到消息

47 VMLog.d("MQTT 收到消息:$message")

48 // 如果未訂閱則直接丟棄

49 if (!topicList.contains(topic)) return

50 notifyEvent(topic, String(message.payload))

51 }

52

53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

54 })

55 //進行連接

56 mqttClient?.connect(options, null, object : IMqttActionListener {

57 override fun onSuccess(token: IMqttToken) {

58 VMLog.d("MQTT 鏈接成功")

59 // 鏈接成功,循環訂閱緩存的主題

60 topicList.forEach { subscribe(it) }

61 }

62

63 override fun onFailure(token: IMqttToken, t: Throwable) {

64 VMLog.d("MQTT 鏈接失敗 $t")

65 }

66 })

67 }

68

69 /**

70 * 訂閱主題

71 * @param topic 主題

72 */

73 fun subscribe(topic: String) {

74 if (!topicList.contains(topic)) {

75 topicList.add(topic)

76 }

77 try {

78 //連接成功后訂閱主題

79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

80 override fun onSuccess(token: IMqttToken) {

81 VMLog.d("MQTT 訂閱成功 $topic")

82 }

83

84 override fun onFailure(token: IMqttToken, t: Throwable) {

85 VMLog.d("MQTT 訂閱失敗 $topic $t")

86 }

87 })

88 } catch (e: MqttException) {

89 e.printStackTrace()

90 }

91 }

92

93 /**

94 * 取消訂閱

95 * @param topic 主題

96 */

97 fun unsubscribe(topic: String) {

98 if (topicList.contains(topic)) {

99 topicList.remove(topic)

100 }

101 try {

102 mqttClient?.unsubscribe(topic)

103 } catch (e: MqttException) {

104 e.printStackTrace()

105 }

106 }

107

108 /**

109 * 發送 MQTT 消息

110 * @param topic 主題

111 * @param content 內容

112 */

113 fun sendMsg(topic: String, content: String) {

114 val msg = MqttMessage()

115 msg.payload = content.encodeToByteArray() // 設置消息內容

116 msg.qos = 0 //設置消息發送質量,可為0,1,2.

117 // 設置消息的topic,并發送。

118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

119 override fun onSuccess(asyncActionToken: IMqttToken) {

120 VMLog.d("MQTT 消息發送成功")

121 }

122

123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

124 VMLog.d("MQTT 消息發送失敗 ${exception.message}")

125 }

126 })

127 }

128

129 /**

130 * 通知 MQTT 事件

131 */

132 private fun notifyEvent(topic: String, data: String) {

133 LDEventBus.post(topic, data)

134 }

135 }

業務交互

和業務相關的就是在啟動APP后,使用后端服務器返回的鑒權token信息及連接封裝接口登錄環信通MQTT服務器,登錄成功后訂閱主題并監聽消息。

1// 請求 token 成功后,調用MQTTHelper.connect()鏈接 MQTT 服務器,這里會同時傳遞監聽的主題

2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)

3

4/**

5 * 發送匹配信息

6 */

7private fun sendMatchInfo() {

8 if (selfMatch.user.nickname.isEmpty()) return

9 // 提交自己的匹配信息到服務器

10 mViewModel.submitMatch(selfMatch)

11 val json = JSONObject()

12 json.put("content", selfMatch.content)

13 json.put("emotion", selfMatch.emotion)

14 json.put("gender", selfMatch.gender)

15 json.put("type", selfMatch.type)

16 val jsonUser = JSONObject()

17 jsonUser.put("avatar", mUser.avatar)

18 jsonUser.put("id", mUser.id)

19 jsonUser.put("nickname", mUser.nickname)

20 jsonUser.put("username", mUser.username)

21 json.put("user", jsonUser)

22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())

23}

24

25// 監聽消息這里使用了一個事件總線進行通知,在上邊封裝 MQTTHelper 發送消息也使用了這個,

26// 訂閱 MQTT 事件

27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {

28 val match = JsonUtils.fromJson(it, Match::class.java)

29 // 這里收到匹配信息之后就增加一條彈幕

30 addBarrage(match)

31}

后端服務實現

接下來介紹后端服務實現,主要包含以下兩部分:

配置連接信息:配置環信MQTT消息云連接信息。

獲取鑒權信息:獲取客戶端連接需要的鑒權信息。

配置連接信息

配置部分只需要按照環信后臺配置信息進行替換就好,配置在config目錄下的config.xxx.json文件內

1/**

2 * Easemob MQTT 配置

3 */

4config.mqtt = {

5 host: 'mqtt host', // MQTT 鏈接地址

6 appId: 'appId', // MQTT AppId

7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)

8 restHost:  // MQTT 服務 API 地址

9 clientId: 'client id', // 替換環信后臺 clientId

10 clientSecret: 'client secret', // 替換環信后臺 clientSecret

11};

獲取鑒權信息

這里主要是獲取客戶端連接所需要的鑒權信息token,為了安全token肯定是要放在服務器端生成的,廢話不多說,上代碼:

1/**

2 * Create by lzan13 on 2022/3/22

3 * 描述:MQTT 幫助類

4 */

5object MQTTHelper {

6

7 private var mqttClient: MqttAndroidClient? = null

8

9 // 緩存主題集合

10 private val topicList = mutableListOf()

11

12 /**

13 * 鏈接MQTT

14 * @param id 用戶 Id

15 * @param token 用戶鏈接 MQTT 的 Token

16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱

17 */

18 fun connect(id: String, token: String, topic: String = "") {

19 // 處理訂閱主題

20 if (topic.isNotEmpty()) topicList.add(topic)

21

22 // 拼接鏈接地址

23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

24 // 拼接 clientId

25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

27

28 //連接參數

29 val options = MqttConnectOptions()

30 options.isAutomaticReconnect = true //設置自動重連

31 options.isCleanSession = true // 緩存

32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設置超時時間,單位:秒

33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發送間隔,單位:秒

34 options.userName = id // 用戶名

35 options.password = token.toCharArray() // 密碼

36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

37 // 設置MQTT監聽

38 mqttClient?.setCallback(object : MqttCallback {

39 override fun connectionLost(t: Throwable) {

40 // 通知鏈接斷開

41 VMLog.d("MQTT 鏈接斷開 $t")

42 }

43

44 @Throws(Exception::class)

45 override fun messageArrived(topic: String, message: MqttMessage) {

46 // 通知收到消息

47 VMLog.d("MQTT 收到消息:$message")

48 // 如果未訂閱則直接丟棄

49 if (!topicList.contains(topic)) return

50 notifyEvent(topic, String(message.payload))

51 }

52

53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

54 })

55 //進行連接

56 mqttClient?.connect(options, null, object : IMqttActionListener {

57 override fun onSuccess(token: IMqttToken) {

58 VMLog.d("MQTT 鏈接成功")

59 // 鏈接成功,循環訂閱緩存的主題

60 topicList.forEach { subscribe(it) }

61 }

62

63 override fun onFailure(token: IMqttToken, t: Throwable) {

64 VMLog.d("MQTT 鏈接失敗 $t")

65 }

66 })

67 }

68

69 /**

70 * 訂閱主題

71 * @param topic 主題

72 */

73 fun subscribe(topic: String) {

74 if (!topicList.contains(topic)) {

75 topicList.add(topic)

76 }

77 try {

78 //連接成功后訂閱主題

79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

80 override fun onSuccess(token: IMqttToken) {

81 VMLog.d("MQTT 訂閱成功 $topic")

82 }

83

84 override fun onFailure(token: IMqttToken, t: Throwable) {

85 VMLog.d("MQTT 訂閱失敗 $topic $t")

86 }

87 })

88 } catch (e: MqttException) {

89 e.printStackTrace()

90 }

91 }

92

93 /**

94 * 取消訂閱

95 * @param topic 主題

96 */

97 fun unsubscribe(topic: String) {

98 if (topicList.contains(topic)) {

99 topicList.remove(topic)

100 }

101 try {

102 mqttClient?.unsubscribe(topic)

103 } catch (e: MqttException) {

104 e.printStackTrace()

105 }

106 }

107

108 /**

109 * 發送 MQTT 消息

110 * @param topic 主題

111 * @param content 內容

112 */

113 fun sendMsg(topic: String, content: String) {

114 val msg = MqttMessage()

115 msg.payload = content.encodeToByteArray() // 設置消息內容

116 msg.qos = 0 //設置消息發送質量,可為0,1,2.

117 // 設置消息的topic,并發送。

118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

119 override fun onSuccess(asyncActionToken: IMqttToken) {

120 VMLog.d("MQTT 消息發送成功")

121 }

122

123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

124 VMLog.d("MQTT 消息發送失敗 ${exception.message}")

125 }

126 })

127 }

128

129 /**

130 * 通知 MQTT 事件

131 */

132 private fun notifyEvent(topic: String, data: String) {

133 LDEventBus.post(topic, data)

134 }

135}

源碼地址

核心代碼就這么多,不超過500行,這里沒有直接調用環信歷史消息接口獲取消息存儲記錄,后續可以在進行改良,簡化實現流程。源碼鏈接附上,配合使用效果更佳。

服務端github源碼:

客戶端github源碼:

寫在最后

MQTT協議資源占用小,并發連接高,集成簡單,特別適用于高頻數據交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務實現更多的業務場景,享受技術帶來的便利與快樂。

關鍵詞: