阿里云物联网通讯测试

1.软件准备

mqtt.fx下载:www.jensd.de/apps/mqttfx

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。
MQTT.fx 是目前主流的mqtt客户端,可以快速验证是否可以与IoT Hub 服务交流发布或订阅消息。

2.使用模式

阿里云物联网通讯测试
连接属性详解
阿里云物联网通讯测试

服务端代码

//1. 接收消息package com.WulianwangTest;import java.net.URI;import java.util.Hashtable;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import javax.crypto.Mac;import javax.crypto.spec.SecretKeySpec;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.naming.Context;import javax.naming.InitialContext;import com.aliyuncs.DefaultAcsClient;import com.aliyuncs.IAcsClient;import com.aliyuncs.iot.model.v20180120.PubRequest;import com.aliyuncs.iot.model.v20180120.PubResponse;import com.aliyuncs.profile.DefaultProfile;import org.apache.commons.codec.EncoderException;import org.apache.qpid.jms.JmsConnection;import org.apache.qpid.jms.JmsConnectionListener;import org.apache.qpid.jms.message.JmsInboundMessageDispatch;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.apache.commons.codec.binary.Base64;public class AmqpJavaClientDemo {    private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);    //业务处理异步线程池,线程池参数可以根据您的业务特点调整;或者您也可以用其他异步方式处理接收到的消息    private final static ExecutorService executorService = new ThreadPoolExecutor(    Runtime.getRuntime().availableProcessors(),    Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,    new LinkedBlockingQueue>(50000));    public static void main(String[] args) throws Exception {//参数说明,请参见文档:AMQP客户端接入说明。String accessKey = "${YourAccessKeyID}";String accessSecret = "${YourAccessKeySecret}";String consumerGroupId = "${YourConsumerGroupId}";  //iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。String iotInstanceId = "";long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5,hmacsha1和hmacsha256String signMethod = "hmacsha1";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。String clientId =  "ecs_"+System.currentTimeMillis();//UserName组装方法,请参见文档:AMQP客户端接入说明。String userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod + ",timestamp=" + timeStamp + ",authId=" + accessKey + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|";//password组装方法,请参见文档:AMQP客户端接入说明。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = doSign(signContent,accessSecret, signMethod);//按照qpid-jms的规范,组装连接URL。   //     String connectionUrl = "failover:(amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671mqp.idleTimeout=80000)"   //             + "ailover.reconnectDelay=30";String connectionUrl = "failover:(amqps://1402577367912554.iot-amqp.cn-shanghai.aliyuncs.com:5671mqp.idleTimeout=80000)" + "ailover.reconnectDelay=30";HashtableString, String> hashtable = new Hashtable>();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// Create ConnectionConnection connection = cf.createConnection(userName, password);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// Create Session// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// Create Receiver LinkMessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);  }    private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {    try { //1.收到消息之后一定要ACK // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。 // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。 // message.acknowledge(); //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。 // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 executorService.submit(() -> processMessage(message));    } catch (来源:allen_swj
                                                        

声明:本站部分文章及图片转载于互联网,内容版权归原作者所有,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2020年4月12日
下一篇 2020年4月12日

相关推荐