基于环信MQTT消息云,Java版MQTT客户端快速实现消息收发
本文介绍Java版MQTT 客户端,如何连接环信MQTT消息云快速实现消息的自收自发。
一、前提条件
1.部署Java开发环境
安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
下载安装JDK。
2.导入项目依赖
在IntelliJ IDEA中创建工程,并确认pom.xml中包含以下依赖。
commons-codec
commons-codec
1.10
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.2
org.apache.httpcomponents
httpclient
4.5.2
com.alibaba
fastjson
1.2.76
二、实现流程
1、获取鉴权
为保障客户安全性需求,环信MQTT消息云服务为客户提供【token+clientID】方式实现鉴权认证,其中AppID(clientID中的关键字段)及token标识获取流程如下:
【登录console】
欢迎您登录环信云console控制台,在此控制台中,为您提供应用列表、解决方案、DEMO体验以及常见问题等功能。
在应用列表中,若您未在APP中开通MQTT业务,可参见APP MQTT开通流程
若APP已开通MQTT业务,可在应用列表中选中Appname,点击【查看】操作,进入应用详情。
【获取AppID及连接地址】
进入【查看】后,点击左侧菜单栏【MQTT】->【服务概览】,在下图红色方框内获取当前AppID及服务器连接地址。
【获取token】
为实现对用户管控及接入安全性,环信云console提供用户认证功能,支持对用户账户的增、删、改、查以及为每个用户账户分配唯一token标识,获取token标识可选择以下两种形式。
形式一:console控制台获取(管理员视角)
* 点击左侧菜单栏【应用概览】->【用户认证】页面,点击【创建IM用户】按钮,增添新的账户信息(包 括用户名及密码)。
* 创建成功后,在【用户ID】列表中选中账户,点击【查看token】按钮获取当前账户token信息。
形式二:客户端代码获取(客户端视角)
* 获取域名:点击左侧菜单栏【即时通讯】->【服务概览】页面,查看下图中token域名、org_name、app_name。
* 拼接URL:获取token URL格式为:http:/ /token域名/org_name/app_name/token。
* 用户名/密码:使用【用户ID】列表中已有账户的用户名及密码,例“用户名:test/密码:test123”。
客户端获取token代码示例如下:
public static void main()
{
// 获取token的URL
http://{token域名}/{org_name}/{app_name}/token
// 获取token
String token = "";
// 取token
try (final CloseableHttpClient httpClient = HttpClients.createDefault())
{
final HttpPost httpPost = new HttpPost("http://{token域名}/{org_name}/{app_name}/token");
Map params = new HashMap<>();
params.put("grant_type", "password");
params.put("username", "test");
params.put("password", "test123");
//设置请求体参数
StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8"));
entity.setContentEncoding("utf-8");
httpPost.setEntity(entity);
//设置请求头部
httpPost.setHeader("Content-Type", "application/json");
//执行请求,返回请求响应
try (final CloseableHttpResponse response = httpClient.execute(httpPost)
{
//请求返回状态码
int statusCode = response.getStatusLine().getStatusCode();
//请求成功
if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT)
{
//取出响应体
final HttpEntity entity2 = response.getEntity();
//从响应体中解析出token
String responseBody = EntityUtils.toString(entity2, "utf-8");
JSONObject jsonObject = JSONObject.parseObject(responseBody);
token = jsonObject.getString("access_token");
}
else
{
//请求失败
throw new ClientProtocolException("请求失败,响应码为:" + statusCode);
}
}
}
catch (IOException e)
{
e.printStackTrace();
}
},>
返回结果
{
"access_token": "YWMtN8a0oqV3EeuF0AmiqRgEh-grzF8zZk2Wp8GS3pF-orDW_F-gj3kR6os3h_oz3ROQAwMAAAF5BxhGlwBPGgAvTR8vDrdVsDPNZMQj0fFjv7EaohgZhzMHM9ncVLE30g",
"expires_in": 5184000,
"user":
{
"uuid": "d6fc5fa0-8f79-11ea-8b37-87fa33dd1390",
"type": "user",
"created": 1588756404898,
"modified": 1588756404898,
"username": "test",
"activated": true
}
}
access_token即为要获取的token
2、初始化
在IntelliJ IDEA工程中创建MQTT客户端,客户端初始配置包括创建clientID,topic名称,QoS质量,连接地址等信息。
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
public class MqttDemoStarter
{
public static void main(String[] args) throws MqttException, InterruptedException {
/**
* 用户指定
* /
String deviceId = "xxxxx-xxxx-xxxxx-xxxxx-xxxxx";
/**
* 从console控制台获取
* /
String appId = "1NQ1E9";
/**
* 设置接入点,进入console控制台获取
*/
String endpoint = "1NQ1E9.sandbox.mqtt.chat";
/**
* MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致之前的连接断开。
* clientId由两部分组成,格式为DeviceID@appId,其中DeviceID由业务方自己设置,appId在console控制台创建,clientId总长度不得超过64个字符。
*/
String clientId = deviceId + "@" + appId;
/**
* 需要订阅或发送消息的topic名称
* 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
*/
final String myTopic = "myTopic";
/**
* QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
*/
final int qosLevel = 0;
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是ws或者wss,使用http://;如果是mqtt或者mqtts,使用tcp://
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endpoint + ":1883", clientId, memoryPersistence);
/**
* 设置客户端发送超时时间,防止无限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
}
3、连接服务器
配置连接密码、cleansession标志、心跳间隔、超时时间等信息,调用connect()函数连接至环信MQTT消息云。
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/**
* 用户名,在console中注册
*/
mqttConnectOptions.setUserName("test");
/**
* 用户密码为第一步中申请的token
*/
mqttConnectOptions.setPassword(token.toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
mqttClient.connect(mqttConnectOptions);
//暂停1秒钟,等待连接订阅完成
Thread.sleep(1000);
4、订阅【subscribe】
【订阅主题】
当客户端成功连接环信MQTT消息云后,需尽快向服务器发送订阅主题消息。
mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 连接完成回调方法
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
/**
* 客户端连接成功后就需要尽快订阅需要的Topic。
*/
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {myTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
e.printStackTrace();
}
});
}
});
【取消订阅】
mqttClient.unsubscribe(new String[]{myTopic});
【接收消息】
配置接收消息回调方法,从环信MQTT消息云接收订阅消息。
mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 接收消息回调方法
* @param s
* @param mqttMessage
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
});
5、发布【publish】
配置发送消息回调方法,向环信MQTT消息云中指定topic发送消息。
for (int i = 0; i < 10; i++) {
/**
* 构建一个Mqtt消息
*/
MqttMessage message = new MqttMessage("hello world pub sub msg".getBytes());
//设置传输质量
message.setQos(qosLevel);
/**
* 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
*/
mqttClient.publish(myTopic, message);
}
6、结果验证
connect success
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
三、更多信息
* 完整demo示例,请参见demo下载。
或直接下载:MQTTChatDemo- Java.zip
* 目前MQTT客户端支持多种语言,请参见 SDK下载。
* 如果您在使用环信MQTT消息云服务中,有任何疑问和建议,欢迎您联系我们。