Skip to content

Commit

Permalink
MQTT connect configuration (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler authored Nov 3, 2023
1 parent e074c5c commit 345ef16
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 24 deletions.
38 changes: 32 additions & 6 deletions Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extension MQTTClient {
/// - queue: Dispatch Queue to run shutdown on
public func shutdown(queue: DispatchQueue = .global()) async throws {
return try await withUnsafeThrowingContinuation { cont in
shutdown(queue: queue) { error in
self.shutdown(queue: queue) { error in
if let error = error {
cont.resume(throwing: error)
} else {
Expand All @@ -39,15 +39,16 @@ extension MQTTClient {

/// Connect to MQTT server
///
/// Completes when CONNACK is received
///
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the Client
/// and Server MUST discard any previous Session and start a new one
///
/// - Parameters:
/// - cleanSession: should we start with a new session
/// - will: Publish message to be posted as soon as connection is made
/// - Returns: EventLoopFuture to be updated with whether server holds a session for this client
/// - Returns: Whether server held a session for this client and has restored it.
@discardableResult public func connect(
cleanSession: Bool = true,
Expand All @@ -56,6 +57,31 @@ extension MQTTClient {
return try await self.connect(cleanSession: cleanSession, will: will).get()
}

/// Connect to MQTT server
///
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the Client
/// and Server MUST discard any previous Session and start a new one
///
/// - Parameters:
/// - cleanSession: should we start with a new session
/// - will: Publish message to be posted as soon as connection is made
/// - Returns: EventLoopFuture to be updated with whether server holds a session for this client
/// - Returns: Whether server held a session for this client and has restored it.
@discardableResult public func connect(
cleanSession: Bool = true,
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil,
connectConfiguration: ConnectConfiguration
) async throws -> Bool {
return try await self.connect(
cleanSession: cleanSession,
will: will, connectConfiguration:
connectConfiguration
).get()
}

/// Publish message to topic
///
/// Depending on QoS completes when message is sent, when PUBACK is received or when PUBREC
Expand Down
43 changes: 39 additions & 4 deletions Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import NIOCore
extension MQTTClient.V5 {
/// Connect to MQTT server
///
/// If `cleanStart` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
/// If `cleanStart` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the
/// Client and Server MUST discard any previous Session and start a new one
///
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
///
Expand All @@ -29,7 +31,7 @@ extension MQTTClient.V5 {
/// - properties: properties to attach to connect message
/// - will: Publish message to be posted as soon as connection is made
/// - authWorkflow: The authentication workflow. This is currently unimplemented.
/// - Returns: EventLoopFuture to be updated with connack
/// - Returns: CONNACK response
public func connect(
cleanStart: Bool = true,
properties: MQTTProperties = .init(),
Expand All @@ -39,6 +41,39 @@ extension MQTTClient.V5 {
return try await self.connect(cleanStart: cleanStart, properties: properties, will: will, authWorkflow: authWorkflow).get()
}

/// Connect to MQTT server
///
/// If `cleanStart` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the
/// Client and Server MUST discard any previous Session and start a new one
///
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
///
/// - Parameters:
/// - cleanStart: should we start with a new session
/// - properties: properties to attach to connect message
/// - will: Publish message to be posted as soon as connection is made
/// - authWorkflow: The authentication workflow. This is currently unimplemented.
/// - connectConfiguration: Override client configuration during connection
/// - Returns: CONNACK response
public func connect(
cleanStart: Bool = true,
properties: MQTTProperties = .init(),
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil,
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil,
connectConfiguration: MQTTClient.ConnectConfiguration
) async throws -> MQTTConnackV5 {
return try await self.connect(
cleanStart: cleanStart,
properties: properties,
will: will,
authWorkflow: authWorkflow,
connectConfiguration: connectConfiguration
).get()
}

/// Publish message to topic
/// - Parameters:
/// - topicName: Topic name on which the message is published
Expand Down
42 changes: 36 additions & 6 deletions Sources/MQTTNIO/MQTTClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,11 @@ public final class MQTTClient {

/// Connect to MQTT server
///
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the Client
/// and Server MUST discard any previous Session and start a new one
///
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
///
Expand All @@ -255,6 +257,33 @@ public final class MQTTClient {
public func connect(
cleanSession: Bool = true,
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil
) -> EventLoopFuture<Bool> {
self.connect(
cleanSession: cleanSession,
will: will,
connectConfiguration: .init()
)
}

/// Connect to MQTT server
///
/// If `cleanSession` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the Client
/// and Server MUST discard any previous Session and start a new one
///
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
///
/// - Parameters:
/// - cleanSession: should we start with a new session
/// - will: Publish message to be posted as soon as connection is made
/// - connectConfiguration: Override client configuration during connection
/// - Returns: EventLoopFuture to be updated with whether server holds a session for this client
public func connect(
cleanSession: Bool = true,
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil,
connectConfiguration: ConnectConfiguration
) -> EventLoopFuture<Bool> {
let publish = will.map {
MQTTPublishInfo(
Expand All @@ -270,12 +299,13 @@ public final class MQTTClient {
if self.configuration.version == .v5_0, cleanSession == false {
properties.append(.sessionExpiryInterval(0xFFFF_FFFF))
}
let keepAliveInterval = connectConfiguration.keepAliveInterval ?? self.configuration.keepAliveInterval
let packet = MQTTConnectPacket(
cleanSession: cleanSession,
keepAliveSeconds: UInt16(configuration.keepAliveInterval.nanoseconds / 1_000_000_000),
keepAliveSeconds: UInt16(keepAliveInterval.nanoseconds / 1_000_000_000),
clientIdentifier: self.identifier,
userName: self.configuration.userName,
password: self.configuration.password,
userName: connectConfiguration.userName ?? self.configuration.userName,
password: connectConfiguration.password ?? self.configuration.password,
properties: properties,
will: publish
)
Expand Down
52 changes: 44 additions & 8 deletions Sources/MQTTNIO/MQTTClientV5.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ extension MQTTClient {

/// Connect to MQTT server
///
/// If `cleanStart` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier).
/// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session
/// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one
/// If `cleanStart` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the
/// Client and Server MUST discard any previous Session and start a new one
///
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
///
Expand All @@ -37,6 +39,39 @@ extension MQTTClient {
properties: MQTTProperties = .init(),
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil,
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil
) -> EventLoopFuture<MQTTConnackV5> {
self.connect(
cleanStart: cleanStart,
properties: properties,
will: will,
authWorkflow: authWorkflow,
connectConfiguration: .init()
)
}

/// Connect to MQTT server
///
/// If `cleanStart` is set to false the Server MUST resume communications with the Client based on
/// state from the current Session (as identified by the Client identifier). If there is no Session
/// associated with the Client identifier the Server MUST create a new Session. The Client and Server
/// MUST store the Session after the Client and Server are disconnected. If set to true then the
/// Client and Server MUST discard any previous Session and start a new one
///
/// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client.
///
/// - Parameters:
/// - cleanStart: should we start with a new session
/// - properties: properties to attach to connect message
/// - will: Publish message to be posted as soon as connection is made
/// - authWorkflow: The authentication workflow. This is currently unimplemented.
/// - connectConfiguration: Override client configuration during connection
/// - Returns: EventLoopFuture to be updated with connack
public func connect(
cleanStart: Bool = true,
properties: MQTTProperties = .init(),
will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil,
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil,
connectConfiguration: ConnectConfiguration
) -> EventLoopFuture<MQTTConnackV5> {
let publish = will.map {
MQTTPublishInfo(
Expand All @@ -48,17 +83,18 @@ extension MQTTClient {
properties: $0.properties
)
}
let keepAliveInterval = connectConfiguration.keepAliveInterval ?? self.client.configuration.keepAliveInterval
let packet = MQTTConnectPacket(
cleanSession: cleanStart,
keepAliveSeconds: UInt16(client.configuration.keepAliveInterval.nanoseconds / 1_000_000_000),
keepAliveSeconds: UInt16(keepAliveInterval.nanoseconds / 1_000_000_000),
clientIdentifier: self.client.identifier,
userName: self.client.configuration.userName,
password: self.client.configuration.password,
userName: connectConfiguration.userName ?? self.client.configuration.userName,
password: connectConfiguration.password ?? self.client.configuration.password,
properties: properties,
will: publish
)

return self.client.connect(packet: packet).map {
return self.client.connect(packet: packet, authWorkflow: authWorkflow).map {
.init(
sessionPresent: $0.sessionPresent,
reason: MQTTReasonCode(rawValue: $0.returnCode) ?? .unrecognisedReason,
Expand Down Expand Up @@ -152,7 +188,7 @@ extension MQTTClient {
return eventLoop.makeSucceededFuture(auth)
}
guard let authWorkflow = authWorkflow else { return eventLoop.makeFailedFuture(MQTTError.authWorkflowRequired) }
return client.processAuth(authPacket, authWorkflow: authWorkflow, on: eventLoop)
return self.client.processAuth(authPacket, authWorkflow: authWorkflow, on: eventLoop)
}
.flatMapThrowing { response -> MQTTAuthV5 in
guard let auth = response as? MQTTAuthPacket else { throw MQTTError.unexpectedMessage }
Expand Down
26 changes: 26 additions & 0 deletions Sources/MQTTNIO/MQTTConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,30 @@ extension MQTTClient {
/// WebSocket configuration
public let webSocketConfiguration: WebSocketConfiguration?
}

/// Configuration used at connection time to override values stored in the MQTTClient.Configuration
public struct ConnectConfiguration {
/// MQTT user name.
public let userName: String?
/// MQTT password.
public let password: String?
/// MQTT keep alive period.
public let keepAliveInterval: TimeAmount?

/// Initialize MQTTClient connect configuration struct
///
/// - Parameters:
/// - keepAliveInterval: MQTT keep alive period.
/// - userName: MQTT user name
/// - password: MQTT password
public init(
keepAliveInterval: TimeAmount? = nil,
userName: String? = nil,
password: String? = nil
) {
self.keepAliveInterval = keepAliveInterval
self.userName = userName
self.password = password
}
}
}

0 comments on commit 345ef16

Please sign in to comment.