使用MQTTnet部署MQTT服务
7/11/2020 6:34:47 PM
一. 服务端
1. 创建配置参数
可以使用 `var options = new MqttServerOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttServerOptionsBuilder();` 使代码更简洁美观。
构建器的函数说明:
函数名 |
功能说明 |
Build |
构建配置参数 |
WithApplicationMessageInterceptor |
允许处理来自客户端的所有已发布消息 |
WithClientId |
服务端发布消息时使用的ClientId |
WithConnectionBacklog |
设置要保留的连接数 |
WithConnectionValidator |
验证连接 |
WithDefaultCommunicationTimeout |
设置默认的通信超时 |
WithDefaultEndpoint |
使用默认端点 |
WithDefaultEndpointBoundIPAddress |
使用默认端点IPv4地址 |
WithDefaultEndpointBoundIPV6Address |
使用默认端点IPv6地址 |
WithDefaultEndpointPort |
使用默认端点端口 |
WithEncryptedEndpoint |
使用加密的端点 |
WithEncryptedEndpointBoundIPAddress |
使用加密的端点IPv4地址 |
WithEncryptedEndpointBoundIPV6Address |
使用加密的端点IPv6地址 |
WithEncryptedEndpointPort |
使用加密的端点端口 |
WithEncryptionCertificate |
使用证书进行SSL连接 |
WithEncryptionSslProtocol |
使用SSL协议级别 |
WithMaxPendingMessagesPerClient |
每个客户端允许最多未决消息 |
WithPersistentSessions |
保持会话 |
WithStorage |
使用存储 |
WithSubscriptionInterceptor |
允许处理来自客户端的所有订阅 |
WithoutDefaultEndpoint |
禁用默认端点 |
WithoutEncryptedEndpoint |
禁用默认(SSL)端点 |
验证账号密码
- options.WithConnectionValidator(c =>
- {
- if (c.Username != "seven")
- {
- c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
- }
- });
2. 启动服务端
- var server = new MqttFactory().CreateMqttServer();
- server.StartAsync(options.Build());
服务启动事件
- server.StartedHandler = new MqttServerStartedHandlerDelegate(Started);
-
- static async Task Started(EventArgs e)
- {
- Console.WriteLine("Started");
- }
服务关闭事件
- server.StoppedHandler = new MqttServerStoppedHandlerDelegate(Stopped);
-
- static async Task Stopped(EventArgs e)
- {
- Console.WriteLine("Stopped");
- }
客户端连接事件
- server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(Connected);
- server.UseClientConnectedHandler(c => Connected(c));
-
- static async Task Connected(MqttServerClientConnectedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} connected");
- }
客户端断开事件
- server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(Disconnected);
- server.UseClientDisconnectedHandler(c => Disconnected(c));
-
- static async Task Disconnected(MqttServerClientDisconnectedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} disconnected");
- }
客户端订阅Topic
- server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(Subscribed);
-
- static async Task Subscribed(MqttServerClientSubscribedTopicEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} subscribed {e.TopicFilter.Topic}");
- }
客户端取消订阅Topic
- server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(c => Unsubscribed(c));
-
- static async Task Unsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} unsubscribed {e.TopicFilter}");
- }
消息接收
- server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceived);
- server.UseApplicationMessageReceivedHandler(c => MessageReceived(c));
- static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} get {e.ApplicationMessage.Topic}");
- }
3. 操作
发布消息
- var message = new MqttApplicationMessage()
- {
- Topic = "testTopic",
- Payload = Encoding.UTF8.GetBytes("hello seven")
- };
- server.PublishAsync(message);
查询客户端状态
- var list = server.GetClientStatusAsync().Result;
获取会话信息
- var sessions = server.GetSessionStatusAsync().Result;
查询Retain的消息
- var messages = server.GetRetainedApplicationMessagesAsync().Result;
清空Retain消息
- server.ClearRetainedApplicationMessagesAsync();
停止服务
- server.StopAsync();
4. 其他
自带的日志跟踪
- var logger = new MqttNetLogger();
- logger.LogMessagePublished += LogMessagePublished;
-
- private static void LogMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
- {
- Console.WriteLine(e.LogMessage);
- Console.WriteLine(e.TraceMessage);
- }
二. 客户端
1. 创建配置参数
可以使用 `var options = new MqttClientOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttClientOptionsBuilder();` 使代码更简洁美观。
构建器的函数说明:
函数名 |
功能说明 |
Build |
构建配置参数 |
WithAuthentication |
允许使用不同的身份验证模式 |
WithCleanSession |
将客户端与MQTT干净会话支持一起使用 |
WithClientId |
设置客户端ID |
WithCommunicationTimeout |
设置通信超时 |
WithCredentials |
设置登录凭证 |
WithExtendedAuthenticationExchangeHandler |
以自定义方式处理身份验证 |
WithKeepAlivePeriod |
设置保持有效期 |
WithKeepAliveSendInterval |
设置保活的发送间隔 |
WithMaximumPacketSize |
设置最大数据包大小 |
WithNoKeepAlive |
不要使用保持活动状态 |
WithProtocolVersion |
设置MQTT协议版本 |
WithProxy |
设置代理 |
WithTls |
客户端使用SSL/TLS |
WithTopicAliasMaximum |
允许最大数量的主题别名 |
WithReceiveMaximum |
允许最大数量的已接收数据包 |
WithRequestProblemInformation |
显示请求问题信息 |
WithRequestResponseInformation |
显示请求响应问题信息 |
WithSessionExpiryInterval |
一段时间后终止会话 |
WithTcpServer |
告诉客户端(通过TCP)连接到哪个MQTT代理。 |
WithWebSocketServer |
告诉客户端(通过WebSocket)连接到哪个MQTT代理 |
WithWillMessage |
告诉客户端最后一条消息将被发送。 |
WithWillDelayInterval |
告诉客户端最后一个消息得延迟间隔 |
2. 连接服务端
- var client = new MqttFactory().CreateMqttClient();
- client.ConnectAsync(options.Build());
客户端连接完成事件
- client.UseConnectedHandler(e => Connected(e));
-
- static async Task Connected(MqttClientConnectedEventArgs e)
- {
- Console.WriteLine($"Connected");
- }
客户端断开事件
- client.UseDisconnectedHandler(e => Disconnected(e));
-
- static async Task Disconnected(MqttClientDisconnectedEventArgs e)
- {
- Console.WriteLine($"Disconnected");
- }
订阅主题(必须在成功连接以后才生效)
- client.UseApplicationMessageReceivedHandler(e => MessageReceived(e));
-
- static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId}");
- }
发布消息( 必须在成功连接以后才生效 )
- var message = new MqttApplicationMessageBuilder()
- .WithTopic("myTopic")
- .WithPayload("seven365.cn")
- .WithExactlyOnceQoS()
- .WithRetainFlag()
- .Build();
- client.PublishAsync(message, CancellationToken.None);
3. 操作
客户端重连
- client.ReconnectAsync();
客户端断开
- client.DisconnectAsync