使用MQTTnet部署MQTT服务

7/11/2020 6:34:47 PM

一. 服务端

1. 创建配置参数

可以使用 `var options = new MqttServerOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttServerOptionsBuilder();` 使代码更简洁美观。

构建器的函数说明:

函数名 功能说明
Build 构建配置参数
WithApplicationMessageInterceptor 允许处理来自客户端的所有已发布消息
WithClientId 服务端发布消息时使用的ClientId
WithConnectionBacklog 设置要保留的连接数
WithConnectionValidator 验证连接
WithDefaultCommunicationTimeout 设置默认的通信超时
WithDefaultEndpoint 使用默认端点
WithDefaultEndpointBoundIPAddress 使用默认端点IPv4地址
WithDefaultEndpointBoundIPV6Address 使用默认端点IPv6地址
WithDefaultEndpointPort 使用默认端点端口
WithEncryptedEndpoint 使用加密的端点
WithEncryptedEndpointBoundIPAddress 使用加密的端点IPv4地址
WithEncryptedEndpointBoundIPV6Address 使用加密的端点IPv6地址
WithEncryptedEndpointPort 使用加密的端点端口
WithEncryptionCertificate 使用证书进行SSL连接
WithEncryptionSslProtocol 使用SSL协议级别
WithMaxPendingMessagesPerClient 每个客户端允许最多未决消息
WithPersistentSessions 保持会话
WithStorage 使用存储
WithSubscriptionInterceptor 允许处理来自客户端的所有订阅
WithoutDefaultEndpoint 禁用默认端点
WithoutEncryptedEndpoint 禁用默认(SSL)端点

验证账号密码

  1. options.WithConnectionValidator(c =>
  2. {
  3. if (c.Username != "seven")
  4. {
  5. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  6. }
  7. });

2. 启动服务端

  1. var server = new MqttFactory().CreateMqttServer();
  2. server.StartAsync(options.Build());

服务启动事件

  1. server.StartedHandler = new MqttServerStartedHandlerDelegate(Started);
  2.  
  3. static async Task Started(EventArgs e)
  4. {
  5. Console.WriteLine("Started");
  6. }

服务关闭事件

  1. server.StoppedHandler = new MqttServerStoppedHandlerDelegate(Stopped);
  2.  
  3. static async Task Stopped(EventArgs e)
  4. {
  5. Console.WriteLine("Stopped");
  6. }

客户端连接事件

  1. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(Connected);
  1. server.UseClientConnectedHandler(c => Connected(c));
  2.  
  3. static async Task Connected(MqttServerClientConnectedEventArgs e)
  4. {
  5. Console.WriteLine($"{e.ClientId} connected");
  6. }

客户端断开事件

  1. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(Disconnected);
  1. server.UseClientDisconnectedHandler(c => Disconnected(c));
  2.  
  3. static async Task Disconnected(MqttServerClientDisconnectedEventArgs e)
  4. {
  5. Console.WriteLine($"{e.ClientId} disconnected");
  6. }

客户端订阅Topic

  1. server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(Subscribed);
  2.  
  3. static async Task Subscribed(MqttServerClientSubscribedTopicEventArgs e)
  4. {
  5. Console.WriteLine($"{e.ClientId} subscribed {e.TopicFilter.Topic}");
  6. }

客户端取消订阅Topic

  1. server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(c => Unsubscribed(c));
  2.  
  3. static async Task Unsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
  4. {
  5. Console.WriteLine($"{e.ClientId} unsubscribed {e.TopicFilter}");
  6. }

消息接收

  1. server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceived);
  1. server.UseApplicationMessageReceivedHandler(c => MessageReceived(c));
  2. static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId} get {e.ApplicationMessage.Topic}");
  5. }

3. 操作

发布消息

  1. var message = new MqttApplicationMessage()
  2. {
  3. Topic = "testTopic",
  4. Payload = Encoding.UTF8.GetBytes("hello seven")
  5. };
  6. server.PublishAsync(message);

查询客户端状态

  1. var list = server.GetClientStatusAsync().Result;

获取会话信息

  1. var sessions = server.GetSessionStatusAsync().Result;

查询Retain的消息

  1. var messages = server.GetRetainedApplicationMessagesAsync().Result;

清空Retain消息

  1. server.ClearRetainedApplicationMessagesAsync();

停止服务

  1. server.StopAsync();

4. 其他

自带的日志跟踪

  1. var logger = new MqttNetLogger();
  2. logger.LogMessagePublished += LogMessagePublished;
  3.  
  4. private static void LogMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  5. {
  6. Console.WriteLine(e.LogMessage);
  7. Console.WriteLine(e.TraceMessage);
  8. }

二. 客户端

1. 创建配置参数

可以使用 `var options = new MqttClientOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttClientOptionsBuilder();` 使代码更简洁美观。

构建器的函数说明:

函数名 功能说明
Build 构建配置参数
WithAuthentication 允许使用不同的身份验证模式
WithCleanSession 将客户端与MQTT干净会话支持一起使用
WithClientId 设置客户端ID
WithCommunicationTimeout 设置通信超时
WithCredentials 设置登录凭证
WithExtendedAuthenticationExchangeHandler 以自定义方式处理身份验证
WithKeepAlivePeriod 设置保持有效期
WithKeepAliveSendInterval 设置保活的发送间隔
WithMaximumPacketSize 设置最大数据包大小
WithNoKeepAlive 不要使用保持活动状态
WithProtocolVersion 设置MQTT协议版本
WithProxy 设置代理
WithTls 客户端使用SSL/TLS
WithTopicAliasMaximum 允许最大数量的主题别名
WithReceiveMaximum 允许最大数量的已接收数据包
WithRequestProblemInformation 显示请求问题信息
WithRequestResponseInformation 显示请求响应问题信息
WithSessionExpiryInterval 一段时间后终止会话
WithTcpServer 告诉客户端(通过TCP)连接到哪个MQTT代理。
WithWebSocketServer 告诉客户端(通过WebSocket)连接到哪个MQTT代理
WithWillMessage 告诉客户端最后一条消息将被发送。
WithWillDelayInterval 告诉客户端最后一个消息得延迟间隔

2. 连接服务端

  1. var client = new MqttFactory().CreateMqttClient();
  2. client.ConnectAsync(options.Build());

客户端连接完成事件

  1. client.UseConnectedHandler(e => Connected(e));
  2.  
  3. static async Task Connected(MqttClientConnectedEventArgs e)
  4. {
  5. Console.WriteLine($"Connected");
  6. }

客户端断开事件

  1. client.UseDisconnectedHandler(e => Disconnected(e));
  2.  
  3. static async Task Disconnected(MqttClientDisconnectedEventArgs e)
  4. {
  5. Console.WriteLine($"Disconnected");
  6. }

订阅主题(必须在成功连接以后才生效)

  1. client.UseApplicationMessageReceivedHandler(e => MessageReceived(e));
  2.  
  3. static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
  4. {
  5. Console.WriteLine($"{e.ClientId}");
  6. }

发布消息( 必须在成功连接以后才生效 )

  1. var message = new MqttApplicationMessageBuilder()
  2. .WithTopic("myTopic")
  3. .WithPayload("seven365.cn")
  4. .WithExactlyOnceQoS()
  5. .WithRetainFlag()
  6. .Build();
  7. client.PublishAsync(message, CancellationToken.None);

3. 操作

客户端重连

  1. client.ReconnectAsync();

客户端断开

  1. client.DisconnectAsync