--- This class contains the MQTT client implementation.
-- @classmod Client

local _M = {}

-------

-- load required stuff
local type = type
local error = error
local select = select
local require = require
local tostring = tostring

local os = require("os")
local os_time = os.time

local string = require("string")
local str_format = string.format
local str_gsub = string.gsub

local table = require("table")
local table_remove = table.remove

local math = require("math")
local math_random = math.random

local luamqtt_VERSION

local protocol = require("mqtt.protocol")
local packet_type = protocol.packet_type
local check_qos = protocol.check_qos
local next_packet_id = protocol.next_packet_id
local packet_id_required = protocol.packet_id_required

local protocol4 = require("mqtt.protocol4")
local make_packet4 = protocol4.make_packet
local parse_packet4 = protocol4.parse_packet

local protocol5 = require("mqtt.protocol5")
local make_packet5 = protocol5.make_packet
local parse_packet5 = protocol5.parse_packet

local log = require "mqtt.log"

-------

--- MQTT client instance metatable
local Client = {}
Client.__index = Client

-- Build a validator that accepts only values of one Lua type.
-- The validator returns the check result followed by the failure message,
-- so its results can be handed straight to `assert`.
local function typed(expected, message)
	return function(_, vtype)
		return vtype == expected, message
	end
end

-- Validators for every key accepted in the client creation options.
-- Each one is called as `validator(value, type(value))`.
-- A key without an entry here is rejected as unexpected.
local opts_validators = {
	uri = typed("string", "expecting uri to be a string"),
	clean = function(value, vtype)
		return vtype == "boolean" or value == "first", "expecting clean to be a boolean, or 'first'"
	end,
	version = function(value, vtype)
		if vtype ~= "number" then
			return false, "expecting version to be a number"
		end
		return value == 4 or value == 5, "expecting version to be a value either 4 or 5"
	end,
	id = typed("string", "expecting id to be a string"),
	username = typed("string", "expecting username to be a string"),
	password = typed("string", "expecting password to be a string"),
	secure = function(_, vtype)
		return vtype == "boolean" or vtype == "table", "expecting secure to be a boolean or table"
	end,
	will = typed("table", "expecting will to be a table"),
	keep_alive = typed("number", "expecting keep_alive to be a number"),
	properties = typed("table", "expecting properties to be a table"),
	user_properties = typed("table", "expecting user_properties to be a table"),
	reconnect = function(_, vtype)
		return vtype == "boolean" or vtype == "number", "expecting reconnect to be a boolean or number"
	end,
	connector = function()
		-- accepted as-is here; the connector content is checked after defaults are applied
		return true
	end,
	ssl_module = typed("string", "expecting ssl_module to be a string"),
	on = typed("table", "expecting 'on' to be a table with events and callbacks"),
}

-- Names of the functions every connector table has to provide
local connector_functions = { "validate", "connect", "shutdown", "send", "receive" }

--- Create and initialize MQTT client instance. Typically this is not called directly,
-- but through `Client.create`.
--
-- NOTE(review): the option validation in this function raises "unexpected key" for
-- `protocol`, `host` and `port`, so these currently have to be supplied through
-- `opts.uri` -- confirm whether they are meant to be accepted as separate options.
-- @tparam table opts MQTT client creation options table
-- @tparam string opts.uri MQTT broker uri to connect. Expected format:
-- `[mqtt[s]://][username[:password]@]hostname[:port]`.
-- Any option specifically added to the options
-- table will take precedence over the option specified in this uri.
-- @tparam boolean|string opts.clean clean session start flag, use "first" to start clean only on first connect
-- @tparam[opt] string opts.protocol either `"mqtt"` or `"mqtts"`
-- @tparam[opt] string opts.username username for authorization on MQTT broker
-- @tparam[opt] string opts.password password for authorization on MQTT broker; not acceptable in absence of username
-- @tparam[opt] string opts.host hostname of the MQTT broker to connect to
-- @tparam[opt] int opts.port port number to connect to on the MQTT broker, defaults to `1883` port for plain or `8883` for secure network connections
-- @tparam[opt=4] number opts.version MQTT protocol version to use, either `4` (for MQTT v3.1.1) or `5` (for MQTT v5.0).
-- Also you may use special values `mqtt.v311` or `mqtt.v50` for this field.
-- @tparam[opt] string opts.id MQTT client ID, will be generated by luamqtt library if absent
-- @tparam[opt=false] boolean|table opts.secure use secure network connection, provided by the lua module set in `opts.ssl_module`.
-- Set to true to select default parameters, check individual `mqtt.connectors` for supported options.
-- @tparam[opt] table opts.will will message table with required fields `{ topic="...", payload="..." }`
-- and optional fields `{ qos=0...2, retain=true/false }`
-- @tparam[opt=60] number opts.keep_alive time interval (in seconds) for client to send PINGREQ packets to the server when network connection is inactive
-- @tparam[opt] table opts.properties properties passed to the CONNECT packet
-- @tparam[opt] table opts.user_properties user properties passed to the CONNECT packet
-- @tparam[opt=false] boolean opts.reconnect force created MQTT client to reconnect on connection close.
-- Set to number value to provide reconnect timeout in seconds (`true` selects 30 seconds).
-- It's not recommended to use values `< 3`. See also `Client:shutdown`.
-- @tparam[opt] table opts.connector connector table to open and send/receive packets over network connection.
-- default is `require("mqtt.connector")` which tries to auto-detect.
-- See `mqtt.connector`.
-- @tparam[opt="ssl"] string opts.ssl_module module name for the luasec-compatible ssl module, default is `"ssl"`
-- may be used in some non-standard lua environments with own luasec-compatible ssl module
-- @tparam[opt] table opts.on List of event-handlers. See `Client:on` for the format.
-- @treturn Client MQTT client instance table
-- @usage
-- local Client = require "mqtt.client"
--
-- local my_client = Client.create {
--     uri = "mqtts://broker.host.com",
--     clean = "first",
--     version = mqtt.v50,
-- }
function Client:__init(opts)
	if not luamqtt_VERSION then
		-- loaded lazily, on first client creation
		luamqtt_VERSION = require("mqtt")._VERSION
	end

	-- copy the caller's options into a table owned by the client, validating each one
	local own = {}
	for key, value in pairs(opts) do
		if type(key) ~= "string" then
			error("expecting string key in opts, got: "..type(key))
		end
		local validate = opts_validators[key]
		if not validate then
			error("unexpected key in client opts: "..key.." = "..tostring(value))
		end
		assert(validate(value, type(value)))
		own[key] = value
	end

	-- required options
	assert(own.uri, 'expecting uri="..." to create MQTT client')
	assert(own.clean ~= nil, "expecting clean=true, clean=false, or clean='first' to create MQTT client")

	-- generate a random client id when none was given
	if not own.id then
		local version_tag = str_gsub(luamqtt_VERSION, "[^%d]", "-")
		own.id = str_format("luamqtt-v%s-%07x", version_tag, math_random(1, 0xFFFFFFF))
	end

	-- default connector
	own.connector = own.connector or require("mqtt.connector")

	-- `reconnect = true` selects the default reconnect interval
	if own.reconnect == true then
		own.reconnect = 30
	end

	-- the connector has to be a table providing the expected functions and signals
	local connector = own.connector
	assert(type(connector) == "table", "expecting connector to be a table")
	for _, name in ipairs(connector_functions) do
		assert(type(connector[name]) == "function", "expecting connector."..name.." to be a function")
	end
	assert(connector.signal_closed, "missing connector.signal_closed signal value")
	assert(connector.signal_idle, "missing connector.signal_idle signal value")

	-- validate connection properties on a throw-away connection object
	local probe = setmetatable({ uri = opts.uri }, connector)
	Client._parse_connection_opts(own, probe)
	probe:validate()

	-- will table content check
	local will = own.will
	if will then
		assert(type(will.topic) == "string", "expecting will.topic to be a string")
		assert(type(will.payload) == "string", "expecting will.payload to be a string")
		if will.qos ~= nil then
			assert(type(will.qos) == "number", "expecting will.qos to be a number")
			assert(check_qos(will.qos), "expecting will.qos to be a valid QoS value")
		end
		if will.retain ~= nil then
			assert(type(will.retain) == "boolean", "expecting will.retain to be a boolean")
		end
	end

	-- default keep_alive
	own.keep_alive = own.keep_alive or 60

	-- client opts
	self.opts = own

	-- event handlers: one list of functions per supported event
	self.handlers = {
		connect = {},
		subscribe = {},
		unsubscribe = {},
		message = {},
		acknowledge = {},
		error = {},
		close = {},
		auth = {},
		shutdown = {},
	}
	self._handling = {} -- events currently being dispatched by Client:handle
	self._to_remove_handlers = {} -- removals postponed by Client:off during dispatch

	-- state
	self.send_time = 0 -- time of the last network send from client side
	self.first_connect = true -- contains true to perform one network connection attempt after client creation
	-- Note: remains true during the connect process. False after success or failure.

	-- packet creation/parse functions according to the protocol version
	own.version = own.version or 4
	if own.version == 4 then
		self._make_packet = make_packet4
		self._parse_packet = parse_packet4
	elseif own.version == 5 then
		self._make_packet = make_packet5
		self._parse_packet = parse_packet5
	end

	-- register event handlers
	if own.on then
		self:on(self.opts.on)
	end

	log:info("MQTT client '%s' created", own.id)
end

--- Add functions as handlers of given events.
-- @tparam table events table with event names as keys and handler functions as values
-- @tparam function events.connect `function(connack_packet, client_obj)`
-- After a connect attempt, after receiving the CONNACK packet from the broker.
-- check `connack_packet.rc == 0` for a successful connect.
-- @tparam function events.error `function(errmsg, client_obj [, packet])`
-- on errors, optional `packet` is only provided if the
-- received `CONNACK.rc ~= 0` when connecting.
-- @tparam function events.close `function(connection_obj, client_obj)`
-- upon closing the connection. `connection_obj.close_reason`
-- (string) will hold the close reason.
-- @tparam function events.shutdown `function(client_obj)`
-- upon shutting down the client (disconnecting and no more reconnects).
-- @tparam function events.subscribe `function(suback_packet, client_obj)`
-- upon a successful subscription, after receiving the SUBACK packet from the broker
-- @tparam function events.unsubscribe `function(unsuback_packet, client_obj)`
-- upon a successful unsubscription, after receiving the UNSUBACK packet from the broker
-- @tparam function events.message `function(publish_packet, client_obj)`
-- upon receiving a PUBLISH packet from the broker
-- @tparam function events.acknowledge `function(ack_packet, client_obj)`
-- upon receiving a PUBACK or PUBREC packet from the broker
-- @tparam function events.auth `function(auth_packet, client_obj)`
-- upon receiving an AUTH packet
-- @raise an error when called with zero or more than two arguments, when an
-- event name is not a string or not a known event, or when a handler is not a
-- function. Nothing is registered when an error is raised.
-- @usage
-- client:on {
--     connect = function(pck, self)
--         if pck.rc ~= 0 then
--             return -- connection failed
--         end
--         -- successfully connected
--     end,
--     message = function(pck, self)
--         -- handle received message
--     end,
-- }
--
-- -- an alternative way to add individual handlers;
-- client:on("message", function(pck, self)
--     -- handle received message
-- end)
function Client:on(...)
	local nargs = select("#", ...)
	local events
	if nargs == 2 then
		-- validate before building the table: a nil handler would otherwise
		-- vanish from the table constructor and be silently ignored
		local event, func = ...
		assert(type(event) == "string", "expecting event to be a string")
		assert(type(func) == "function", "expecting func to be a function")
		events = { [event] = func }
	elseif nargs == 1 then
		events = (...)
		assert(type(events) == "table", "expecting events to be a table")
	else
		error("invalid arguments: expected only one or two arguments")
	end

	-- first pass: validate every entry, so that a bad entry cannot leave the
	-- handlers partially registered (pairs order is unspecified)
	for event, func in pairs(events) do
		assert(type(event) == "string", "expecting event to be a string")
		assert(type(func) == "function", "expecting func to be a function")
		if not self.handlers[event] then
			error("invalid event '"..tostring(event).."' to handle")
		end
	end

	-- second pass: append the handlers to the per-event lists
	for event, func in pairs(events) do
		local handlers = self.handlers[event]
		handlers[#handlers + 1] = func
	end
end

-- Remove the first occurrence of item from the list-table, shifting the
-- following items down. Does nothing when the item is not in the list.
local function remove_item(list, item)
	for i, test in ipairs(list) do
		if test == item then
			table_remove(list, i)
			return
		end
	end
end

--- Remove given function handler for specified event.
-- @tparam string event event name to remove handler
-- @tparam function func handler function to remove
-- @return `true`; raises an error for an unknown event name
-- @usage
-- local handler = function(pck, self)
--     -- handle received message
-- end
--
-- -- add event handler
-- client:on {
--     message = handler
-- }
--
-- -- remove it again
-- client:off("message", handler)
function Client:off(event, func)
	local handlers = self.handlers[event]
	if not handlers then
		error("invalid event '"..tostring(event).."' to handle")
	end
	if self._handling[event] then
		-- this event is being handled right now; postpone the removal until
		-- Client:handle has called all handlers for the event
		local to_remove = self._to_remove_handlers[event] or {}
		to_remove[#to_remove + 1] = func
		self._to_remove_handlers[event] = to_remove
	else
		-- it's ok to remove given function just now
		remove_item(handlers, func)
	end
	return true
end

--- Subscribe to specified topic. Returns the SUBSCRIBE packet id and calls optional callback when subscription will be created on broker
-- @tparam table opts subscription options
-- @tparam string opts.topic topic to subscribe
-- @tparam[opt=0] number opts.qos QoS level for subscription
-- @tparam boolean opts.no_local for MQTT v5.0 only: no_local flag for subscription
-- @tparam boolean opts.retain_as_published for MQTT v5.0 only: retain_as_published flag for subscription
-- @tparam boolean opts.retain_handling for MQTT v5.0 only: retain_handling flag for subscription
-- @tparam[opt] table opts.properties for MQTT v5.0 only: properties for subscribe operation
-- @tparam[opt] table opts.user_properties for MQTT v5.0 only: user properties for subscribe operation
-- @tparam[opt] function opts.callback callback function to be called when subscription is acknowledged by broker
-- @return packet id on success or false and error message on failure
function Client:subscribe(opts)
	-- fetch and validate opts
	assert(type(opts) == "table", "expecting opts to be a table")
	assert(type(opts.topic) == "string", "expecting opts.topic to be a string")
	assert(opts.qos == nil or (type(opts.qos) == "number" and check_qos(opts.qos)), "expecting valid opts.qos value")
	assert(opts.no_local == nil or type(opts.no_local) == "boolean", "expecting opts.no_local to be a boolean")
	assert(opts.retain_as_published == nil or type(opts.retain_as_published) == "boolean", "expecting opts.retain_as_published to be a boolean")
	assert(opts.retain_handling == nil or type(opts.retain_handling) == "boolean", "expecting opts.retain_handling to be a boolean")
	assert(opts.properties == nil or type(opts.properties) == "table", "expecting opts.properties to be a table")
	assert(opts.user_properties == nil or type(opts.user_properties) == "table", "expecting opts.user_properties to be a table")
	assert(opts.callback == nil or type(opts.callback) == "function", "expecting opts.callback to be a function")

	-- check connection is alive
	if not self.connection then
		return false, "network connection is not opened"
	end

	-- create SUBSCRIBE packet
	local pargs = {
		type = packet_type.SUBSCRIBE,
		subscriptions = {
			{
				topic = opts.topic,
				qos = opts.qos,
				no_local = opts.no_local,
				retain_as_published = opts.retain_as_published,
				retain_handling = opts.retain_handling
			},
		},
		properties = opts.properties,
		user_properties = opts.user_properties,
	}
	self:_assign_packet_id(pargs)
	local packet_id = pargs.packet_id
	local subscribe = self._make_packet(pargs)

	log:info("subscribing client '%s' to topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")

	-- send SUBSCRIBE packet
	local ok, err = self:_send_packet(subscribe)
	if not ok then
		err = "failed to send SUBSCRIBE: "..err
		log:error("client '%s': %s", self.opts.id, err)
		self:handle("error", err, self)
		self:close_connection("error")
		return false, err
	end

	-- add subscribe callback: a one-shot "subscribe" handler that fires only
	-- for the SUBACK carrying this packet id
	local callback = opts.callback
	if callback then
		local function handler(suback, ...)
			if suback.packet_id == packet_id then
				self:off("subscribe", handler)
				callback(suback, ...)
			end
		end
		self:on("subscribe", handler)
	end

	-- returns assigned packet id
	return packet_id
end

--- Unsubscribe from specified topic, and calls optional callback when subscription will be removed on broker
-- @tparam table opts subscription options
-- @tparam string opts.topic topic to unsubscribe
-- @tparam[opt] table opts.properties properties for unsubscribe operation
-- @tparam[opt] table opts.user_properties user properties for unsubscribe operation
-- @tparam[opt] function opts.callback callback function to be called when the unsubscribe is acknowledged by the broker
-- @return packet id on success or false and error message on failure
function Client:unsubscribe(opts)
	-- fetch and validate opts
	assert(type(opts) == "table", "expecting opts to be a table")
	assert(type(opts.topic) == "string", "expecting opts.topic to be a string")
	assert(opts.properties == nil or type(opts.properties) == "table", "expecting opts.properties to be a table")
	assert(opts.user_properties == nil or type(opts.user_properties) == "table", "expecting opts.user_properties to be a table")
	assert(opts.callback == nil or type(opts.callback) == "function", "expecting opts.callback to be a function")

	-- check connection is alive
	if not self.connection then
		return false, "network connection is not opened"
	end

	-- create UNSUBSCRIBE packet
	local pargs = {
		type = packet_type.UNSUBSCRIBE,
		subscriptions = {opts.topic},
		properties = opts.properties,
		user_properties = opts.user_properties,
	}
	self:_assign_packet_id(pargs)
	local packet_id = pargs.packet_id
	local unsubscribe = self._make_packet(pargs)

	log:info("unsubscribing client '%s' from topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")

	-- send UNSUBSCRIBE packet
	local ok, err = self:_send_packet(unsubscribe)
	if not ok then
		err = "failed to send UNSUBSCRIBE: "..err
		log:error("client '%s' %s", self.opts.id, err)
		self:handle("error", err, self)
		self:close_connection("error")
		return false, err
	end

	-- add unsubscribe callback: a one-shot "unsubscribe" handler that fires
	-- only for the UNSUBACK carrying this packet id
	local callback = opts.callback
	if callback then
		local function handler(unsuback, ...)
			if unsuback.packet_id == packet_id then
				self:off("unsubscribe", handler)
				callback(unsuback, ...)
			end
		end
		self:on("unsubscribe", handler)
	end

	-- returns assigned packet id
	return packet_id
end

--- Publish message to broker.
-- Note that the given `opts` table itself is used as the packet description:
-- its `type` field is set, and a packet id is stored in `opts.packet_id` when
-- one gets assigned. A `packet_id` already present in `opts` is kept as is.
-- @tparam table opts publish operation options table
-- @tparam string opts.topic topic to publish message
-- @tparam[opt] string opts.payload publish message payload
-- @tparam[opt=0] number opts.qos QoS level for message publication
-- @tparam[opt=false] boolean opts.retain retain message publication flag
-- @tparam[opt=false] boolean opts.dup dup message publication flag
-- @tparam[opt] table opts.properties properties for publishing message
-- @tparam[opt] table opts.user_properties user properties for publishing message
-- @tparam[opt] function opts.callback callback to call when published message has been acknowledged by the broker.
-- When no packet id is assigned it is called immediately as `callback("no ack for QoS 0 message", client_obj)`.
-- @return true or packet id on success or false and error message on failure
function Client:publish(opts)
	-- fetch and validate opts
	assert(type(opts) == "table", "expecting opts to be a table")
	assert(type(opts.topic) == "string", "expecting opts.topic to be a string")
	assert(opts.payload == nil or type(opts.payload) == "string", "expecting opts.payload to be a string")
	assert(opts.qos == nil or type(opts.qos) == "number", "expecting opts.qos to be a number")
	if opts.qos then
		assert(check_qos(opts.qos), "expecting qos to be a valid QoS value")
	end
	assert(opts.retain == nil or type(opts.retain) == "boolean", "expecting opts.retain to be a boolean")
	assert(opts.dup == nil or type(opts.dup) == "boolean", "expecting opts.dup to be a boolean")
	assert(opts.properties == nil or type(opts.properties) == "table", "expecting opts.properties to be a table")
	assert(opts.user_properties == nil or type(opts.user_properties) == "table", "expecting opts.user_properties to be a table")
	assert(opts.callback == nil or type(opts.callback) == "function", "expecting opts.callback to be a function")

	-- check connection is alive
	local conn = self.connection
	if not conn then
		return false, "network connection is not opened"
	end

	-- create PUBLISH packet (this writes `type` and possibly `packet_id` into the caller's table)
	opts.type = packet_type.PUBLISH
	self:_assign_packet_id(opts)
	local packet_id = opts.packet_id
	local publish = self._make_packet(opts)

	log:debug("client '%s' publishing to topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")

	-- send PUBLISH packet
	local ok, err = self:_send_packet(publish)
	if not ok then
		err = "failed to send PUBLISH: "..err
		log:error("client '%s' %s", self.opts.id, err)
		self:handle("error", err, self)
		self:close_connection("error")
		return false, err
	end

	-- record packet id as waited for QoS 2 exchange
	if opts.qos == 2 then
		conn.wait_for_pubrec[packet_id] = true
	end

	-- add acknowledge callback
	local callback = opts.callback
	if callback then
		if packet_id then
			-- one-shot "acknowledge" handler for the ack carrying this packet id
			local function handler(ack, ...)
				if ack.packet_id == packet_id then
					self:off("acknowledge", handler)
					callback(ack, ...)
				end
			end
			self:on("acknowledge", handler)
		else
			-- no packet id means no acknowledge will arrive
			callback("no ack for QoS 0 message", self)
		end
	end

	-- returns assigned packet id
	return packet_id or true
end

--- Acknowledge given received message
-- @tparam packet_mt msg PUBLISH message to acknowledge
-- @tparam[opt=0] number rc The reason code field of PUBACK packet in MQTT v5.0 protocol
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
-- @return true on success or false and error message on failure
function Client:acknowledge(msg, rc, properties, user_properties)
	assert(type(msg) == "table" and msg.type == packet_type.PUBLISH, "expecting msg to be a publish packet")
	assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
	assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
	assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")

	-- check connection is alive
	local conn = self.connection
	if not conn then
		return false, "network connection is not opened"
	end

	-- check packet needs to be acknowledged: without a packet id there is nothing to send
	local packet_id = msg.packet_id
	if not packet_id then
		return true
	end

	log:debug("client '%s' acknowledging packet %s", self.opts.id, packet_id or "n.a.")

	if msg.qos == 1 then
		-- PUBACK should be sent

		-- create PUBACK packet
		local puback = self._make_packet{
			type = packet_type.PUBACK,
			packet_id = packet_id,
			rc = rc or 0,
			properties = properties,
			user_properties = user_properties,
		}

		-- send PUBACK packet
		local ok, err = self:_send_packet(puback)
		if not ok then
			err = "failed to send PUBACK: "..err
			log:error("client '%s' %s", self.opts.id, err)
			self:handle("error", err, self)
			self:close_connection("error")
			return false, err
		end
	elseif msg.qos == 2 then
		-- PUBREC should be sent and packet_id should be remembered for PUBREL+PUBCOMP sequence

		-- create PUBREC packet
		local pubrec = self._make_packet{
			type = packet_type.PUBREC,
			packet_id = packet_id,
			rc = rc or 0,
			properties = properties,
			user_properties = user_properties,
		}

		-- send PUBREC packet
		local ok, err = self:_send_packet(pubrec)
		if not ok then
			err = "failed to send PUBREC: "..err
			log:error("client '%s' %s", self.opts.id, err)
			self:handle("error", err, self)
			self:close_connection("error")
			return false, err
		end

		-- store packet id as waiting for PUBREL
		conn.wait_for_pubrel[packet_id] = true
	end

	return true
end

--- Send DISCONNECT packet to the broker and close the connection.
-- Note: if the client is set to automatically reconnect, it will do so. If you
-- want to disconnect and NOT reconnect, use `Client:shutdown`.
-- @tparam[opt=0] number rc The Disconnect Reason Code value from MQTT v5.0 protocol
-- @tparam[opt] table properties properties for the DISCONNECT packet
-- @tparam[opt] table user_properties user properties for the DISCONNECT packet
-- @return true on success or false and error message on failure
function Client:disconnect(rc, properties, user_properties)
	-- validate opts
	assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
	assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
	assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")

	-- check connection is alive
	if not self.connection then
		return false, "network connection is not opened"
	end

	-- create DISCONNECT packet
	local disconnect = self._make_packet{
		type = packet_type.DISCONNECT,
		rc = rc or 0,
		properties = properties,
		user_properties = user_properties,
	}

	log:info("client '%s' disconnecting (rc = %d)", self.opts.id, rc or 0)

	-- send DISCONNECT packet
	local ok, err = self:_send_packet(disconnect)
	if not ok then
		err = "failed to send DISCONNECT: "..err
		log:error("client '%s' %s", self.opts.id, err)
		self:handle("error", err, self)
		self:close_connection("error")
		return false, err
	end

	-- now close connection
	self:close_connection("connection closed by client")

	return true
end

--- Shuts the client down.
-- Disconnects if still connected, and disables reconnecting. If the client is
-- added to an ioloop, this will prevent an automatic reconnect.
-- Raises the "shutdown" event.
-- @param ... see `Client:disconnect`
-- @return `true`
function Client:shutdown(...)
	log:debug("client '%s' shutting down", self.opts.id)
	self.first_connect = false
	self.opts.reconnect = false
	-- the result is ignored on purpose: the client may already be disconnected
	self:disconnect(...)
	self:handle("shutdown", self)
	return true
end

--- Send AUTH packet to authenticate client on broker, in MQTT v5.0 protocol
-- @tparam[opt=0] number rc Authenticate Reason Code
-- @tparam[opt] table properties properties for the AUTH packet
-- @tparam[opt] table user_properties user properties for the AUTH packet
-- @return true on success or false and error message on failure
function Client:auth(rc, properties, user_properties)
	-- validate opts
	assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
	assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
	assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
	assert(self.opts.version == 5, "allowed only in MQTT v5.0 protocol")

	-- check connection is alive
	if not self.connection then
		return false, "network connection is not opened"
	end

	-- create AUTH packet
	local auth = self._make_packet{
		type = packet_type.AUTH,
		rc = rc or 0,
		properties = properties,
		user_properties = user_properties,
	}

	-- the client id has to be passed for the '%s' placeholder in the message
	log:info("client '%s' authenticating", self.opts.id)

	-- send AUTH packet
	local ok, err = self:_send_packet(auth)
	if not ok then
		err = "failed to send AUTH: "..err
		log:error("client '%s' %s", self.opts.id, err)
		self:handle("error", err, self)
		self:close_connection("error")
		return false, err
	end

	return true
end

--- Immediately close established network connection, without graceful session finishing with DISCONNECT packet
-- @tparam[opt] string reason
-- the reason string of connection close, `"unspecified"` when absent
-- @return `true`
function Client:close_connection(reason)
	if reason then
		assert(type(reason) == "string", "expecting reason to be a string")
	end

	-- nothing to do without an open connection
	local closing = self.connection
	if not closing then
		return true
	end

	local why = reason or "unspecified"
	log:info("client '%s' closing connection (reason: %s)", self.opts.id, why)

	-- shut the connection down and detach it from the client before the
	-- "close" handlers get to see it
	closing:shutdown()
	self.connection = nil
	closing.close_reason = why
	self:handle("close", closing, self)

	return true
end

--- Start connecting to broker: opens the network connection and sends the CONNECT packet.
-- When either step fails the `first_connect` flag of the client is cleared.
-- @return true on success or false and error message on failure
function Client:start_connecting()
	-- open network connection
	local opened, failure = self:open_connection()
	if opened then
		-- send CONNECT packet
		local sent, send_failure = self:send_connect()
		if sent then
			return true
		end
		failure = send_failure
	end

	self.first_connect = false
	return false, failure
end

--- Low-level methods
-- @section low-level

--- Send PINGREQ packet
-- @return true on success or false and error message on failure
function Client:send_pingreq()
	-- a PINGREQ can only go out over an open connection
	if not self.connection then
		return false, "network connection is not opened"
	end

	local pingreq = self._make_packet{
		type = packet_type.PINGREQ,
	}

	log:debug("client '%s' sending PINGREQ", self.opts.id)

	local sent, failure = self:_send_packet(pingreq)
	if sent then
		-- set ping timeout; for now 1 ping-request interval
		self.ping_expire_time = os_time() + self.opts.keep_alive
		return true
	end

	-- sending failed: report the error and drop the connection
	failure = "failed to send PINGREQ: "..failure
	log:error("client '%s' %s", self.opts.id, failure)
	self:handle("error", failure, self)
	self:close_connection("error")
	return false, failure
end

--- Open network connection to the broker.
-- Does nothing when the client already has a connection.
-- @return true on success or false and error message on failure
function Client:open_connection()
	if self.connection then
		return true
	end

	local copts = self.opts
	local connector = assert(copts.connector, "no connector configured in MQTT client")

	-- the connection table uses the connector as its metatable
	local new_conn = setmetatable({
		uri = copts.uri,
		wait_for_pubrec = {}, -- packet ids of sent packets partially acknowledged in a QoS 2 exchange
		wait_for_pubrel = {}, -- packet ids of received packets partially acknowledged in a QoS 2 exchange
	}, connector)
	Client._parse_connection_opts(copts, new_conn)

	log:info("client '%s' connecting to broker '%s' (using: %s)", self.opts.id, copts.uri, new_conn.type or "unknown")

	-- perform connect
	local connected, failure = new_conn:connect()
	if not connected then
		log:error("client '%s' %s", self.opts.id, failure)
		failure = "failed to open network connection: "..failure
		self:handle("error", failure, self)
		return false, failure
	end

	-- keep the connection and reset the ping timeout
	self.connection = new_conn
	self.ping_expire_time = nil

	return true
end

--- Send CONNECT packet into opened network connection
-- @return true on success or false and error message on failure
function Client:send_connect()
	-- a CONNECT can only go out over an open connection
	if not self.connection then
		return false, "network connection is not opened"
	end

	local copts = self.opts

	local connect = self._make_packet{
		type = packet_type.CONNECT,
		id = copts.id,
		clean = not not copts.clean, -- force to boolean, in case "first"
		username = copts.username,
		password = copts.password,
		will = copts.will,
		keep_alive = copts.keep_alive,
		properties = copts.properties,
		user_properties = copts.user_properties,
	}

	log:info("client '%s' sending CONNECT (user '%s')", self.opts.id, copts.username or "not specified")

	local sent, failure = self:_send_packet(connect)
	if not sent then
		-- sending failed: report the error and drop the connection
		failure = "failed to send CONNECT: "..failure
		log:error("client '%s' %s", self.opts.id, failure)
		self:handle("error", failure, self)
		self:close_connection("error")
		return false, failure
	end

	-- reset last packet id
	self._last_packet_id = nil

	return true
end

--- Checks last message send, and sends a PINGREQ if necessary.
-- Use this function to check and send keep-alives when using an external event loop.
-- When using the included modules to add clients (see `mqtt.loop`), this will be
-- taken care of automatically.
-- @treturn[1] number time till next keep_alive (in seconds)
-- @treturn[2] number time till next keep_alive (in seconds)
-- @treturn[2] string in case of errors (eg. not connected) the second return value is an error string
-- @usage
-- -- example using a Copas event loop to send and check keep-alives
-- copas.addthread(function()
--     while true do
--         if not my_client then
--             return -- exiting, client was destroyed
--         end
--         copas.pause(my_client:check_keep_alive())
--     end
-- end)
function Client:check_keep_alive()
	local keep_alive = self.opts.keep_alive
	if not self.connection then
		return keep_alive, "network connection is not opened"
	end

	local now = os_time()
	local next_ping = self.send_time + keep_alive
	local deadline = self.ping_expire_time -- set by send_pingreq, nil when no timeout is pending

	-- a pending ping timeout has expired: report it and close the connection
	if deadline and deadline <= now then
		local err = str_format("failed to receive PINGRESP within %d seconds", keep_alive)
		log:error("client '%s' %s", self.opts.id, err)
		self:handle("error", err, self)
		self:close_connection("error")
		return keep_alive, err
	end

	-- the keep_alive interval since the last send has passed: send PINGREQ
	if now >= next_ping then
		local _, err = self:send_pingreq()
		return keep_alive, err
	end

	-- otherwise report the time till whichever comes first: ping timeout or next ping
	local wake_at = next_ping
	if deadline and deadline < next_ping then
		wake_at = deadline
	end
	return wake_at - now
end

-- Internal methods

-- Send PUBREL acknowledge packet - second phase of QoS 2 exchange
-- Returns true on success or false and error message on failure
function Client:acknowledge_pubrel(packet_id)
	-- a PUBREL can only go out over an open connection
	if not self.connection then
		return false, "network connection is not opened"
	end

	local pubrel = self._make_packet{type=packet_type.PUBREL, packet_id=packet_id, rc=0}

	log:debug("client '%s' sending PUBREL (packet: %s)", self.opts.id, packet_id or "n.a.")

	local sent, failure = self:_send_packet(pubrel)
	if sent then
		return true
	end

	-- sending failed: report the error and drop the connection
	failure = "failed to send PUBREL: "..failure
	log:error("client '%s' %s", self.opts.id, failure)
	self:handle("error", failure, self)
	self:close_connection("error")
	return false, failure
end

-- Send PUBCOMP acknowledge packet - last phase of QoS 2 exchange
-- Returns true on success or false and error message on failure
function Client:acknowledge_pubcomp(packet_id)
	-- a PUBCOMP can only go out over an open connection
	if not self.connection then
		return false, "network connection is not opened"
	end

	local pubcomp = self._make_packet{type=packet_type.PUBCOMP, packet_id=packet_id, rc=0}

	log:debug("client '%s' sending PUBCOMP (packet: %s)", self.opts.id, packet_id or "n.a.")

	local sent, failure = self:_send_packet(pubcomp)
	if sent then
		return true
	end

	-- sending failed: report the error and drop the connection
	failure = "failed to send PUBCOMP: "..failure
	log:error("client '%s' %s", self.opts.id, failure)
	self:handle("error", failure, self)
	self:close_connection("error")
	return false, failure
end

-- Call all handlers registered for the given event, passing them the extra arguments.
-- Raises an error for an unknown event name.
function Client:handle(event, ...)
	local registered = self.handlers[event]
	if not registered then
		error("invalid event '"..tostring(event).."' to handle")
	end

	-- while the flag is set Client:off() postpones removals instead of
	-- modifying the list that is being iterated
	self._handling[event] = true
	for _, callback in ipairs(registered) do
		callback(...)
	end
	self._handling[event] = nil

	-- apply the removals that Client:off() scheduled in the meantime
	local postponed = self._to_remove_handlers[event]
	if postponed then
		for _, callback in ipairs(postponed) do
			remove_item(registered, callback)
		end
		self._to_remove_handlers[event] = nil
	end
end

-- Internal methods

-- Assign next packet id for given packet creation opts.
-- A packet id already present in pargs is left untouched.
function Client:_assign_packet_id(pargs)
	if not pargs.packet_id and packet_id_required(pargs) then
		local assigned = next_packet_id(self._last_packet_id)
		self._last_packet_id = assigned
		pargs.packet_id = assigned
	end
end

-- Handle a single received packet
function Client:handle_received_packet(packet)
	local conn = self.connection
	local err

	log:debug("client '%s' received '%s' (packet: %s)", self.opts.id, packet_type[packet.type], packet.packet_id or "n.a.")

	if not conn.connack then
		-- expecting only CONNACK packet here
		if packet.type ~= packet_type.CONNACK then
			err = "expecting CONNACK but received "..packet.type
			log:error("client '%s' %s", self.opts.id, err)
			self:handle("error", err, self)
			self:close_connection("error")
			self.first_connect = false
			return false, err
		end

		-- store connack packet in connection
		conn.connack = packet

		-- check CONNACK rc
		if packet.rc ~= 0 then
			err = str_format("CONNECT failed with CONNACK [rc=%d]: %s", packet.rc, packet:reason_string())
			log:error("client '%s' %s", self.opts.id, err)
			self:handle("error", err, self, packet)
			self:handle("connect", packet, self)
			self:close_connection("connection failed")
			self.first_connect = false
			return false, err
		end

		log:info("client '%s' connected successfully to '%s:%s'", self.opts.id, conn.host, conn.port)

		-- fire connect event
		if self.opts.clean == "first" then
			self.opts.clean = false -- reset clean flag to false, so next connection resumes previous session
			log:debug("client '%s'; switching clean flag to false (was 'first')", self.opts.id)
		end
		self:handle("connect", packet, self)
		self.first_connect = false
	else
		-- connection authorized, so process usual packets

		-- handle
packet according its type local ptype = packet.type if ptype == packet_type.PINGRESP then -- luacheck: ignore -- PINGREQ answer, clear timeout self.ping_expire_time = nil elseif ptype == packet_type.SUBACK then self:handle("subscribe", packet, self) elseif ptype == packet_type.UNSUBACK then self:handle("unsubscribe", packet, self) elseif ptype == packet_type.PUBLISH then -- check such packet is not waiting for pubrel acknowledge self:handle("message", packet, self) elseif ptype == packet_type.PUBACK then self:handle("acknowledge", packet, self) elseif ptype == packet_type.PUBREC then local packet_id = packet.packet_id if conn.wait_for_pubrec[packet_id] then conn.wait_for_pubrec[packet_id] = nil -- send PUBREL acknowledge if self:acknowledge_pubrel(packet_id) then -- and fire acknowledge event self:handle("acknowledge", packet, self) end end elseif ptype == packet_type.PUBREL then -- second phase of QoS 2 exchange - check we are already acknowledged such packet by PUBREL local packet_id = packet.packet_id if conn.wait_for_pubrel[packet_id] then -- remove packet from waiting for PUBREL packets table conn.wait_for_pubrel[packet_id] = nil -- send PUBCOMP acknowledge return self:acknowledge_pubcomp(packet_id) end elseif ptype == packet_type.PUBCOMP then --luacheck: ignore -- last phase of QoS 2 exchange -- do nothing here elseif ptype == packet_type.DISCONNECT then self:close_connection("disconnect received from broker") elseif ptype == packet_type.AUTH then self:handle("auth", packet, self) else log:warn("client '%s' don't know how to handle %s", self.opts.id, ptype) end end return true end do -- implict (re)connecting when reading local function implicit_connect(self) local reconnect = self.opts.reconnect if not self.first_connect and not reconnect then -- this would be a re-connect, but we're not supposed to auto-reconnect log:debug("client '%s' was disconnected and not set to auto-reconnect", self.opts.id) return false, "network connection is not opened" end -- 
should we wait for a timeout between retries? local t_reconnect = (self.last_connect_time or 0) + (reconnect or 0) local t_now = os_time() if t_reconnect > t_now then -- were delaying before retrying, return remaining delay return t_reconnect - t_now end self.last_connect_time = t_now local ok, err = self:start_connecting() if not ok then -- we failed to connect return reconnect, err end -- connected succesfully, but don't know how long it took, so return now -- to be nice to other clients. Return 0 to indicate ready-for-reading. return 0 end --- Performs a single IO loop step. -- It will connect if not connected, will re-connect if set to. -- This should be called repeatedly in a loop. When using the included modules to -- add clients (see `mqtt.loop`), this will be taken care of automatically. -- -- The return value is the time after which this method must be called again. -- It can be called sooner, but shouldn't be called later. -- @return[1] `-1`: the socket read timed out, so it is idle. This return code is only -- returned with buffered connectors (luasocket), never for yielding sockets -- (Copas or OpenResty) -- @return[2] `0`: a packet was succesfully handled, so retry immediately, no delays, -- in case additional data is waiting to be read on the socket. 
-- @return[3] `>0`: The reconnect timer needs a delay before it can retry (calling -- sooner is not a problem, it will only reconnect when the delay -- has actually passed) -- @return[4] nil -- @return[4] error message function Client:step() local conn = self.connection local reconnect = self.opts.reconnect -- try and connect if not connected yet if not conn then return implicit_connect(self) end local packet, err = self:_receive_packet() if not packet then if err == conn.signal_idle then -- connection was idle, nothing happened return -1 elseif err == conn.signal_closed then self:close_connection("connection closed by broker") return reconnect and 0, err else err = "failed to receive next packet: "..tostring(err) log:error("client '%s' %s", self.opts.id, err) self:handle("error", err, self) self:close_connection("error") return reconnect and 0, err end end local ok ok, err = self:handle_received_packet(packet) if not ok then return reconnect and 0, err end -- succesfully handled packed, maybe there is more, so retry asap return 0 end end -- Fill given connection table with host and port according given opts -- uri: mqtt[s]://[username][:password]@host.domain[:port] function Client._parse_connection_opts(opts, conn) local uri = assert(conn.uri) -- protocol local uriprotocol = uri:match("^([%a%d%-]-)://") if uriprotocol then uriprotocol = uriprotocol:lower() uri = uri:gsub("^[%a%d%-]-://","") if uriprotocol == "mqtts" and opts.secure == nil then opts.secure = true -- NOTE: goes into client 'opts' table, not in 'conn' elseif uriprotocol == "mqtt" and opts.secure == nil then opts.secure = false -- NOTE: goes into client 'opts' table, not in 'conn' elseif uriprotocol == "mqtts" and opts.secure == false then error("cannot use protocol 'mqtts' with 'secure=false'") elseif uriprotocol == "mqtt" and opts.secure then error("cannot use protocol 'mqtt' with 'secure=true|table'") end else -- no protocol info found in uri if opts.secure then uriprotocol = "mqtts" else 
uriprotocol = "mqtt" end end conn.protocol = opts.protocol or uriprotocol assert(type(conn.protocol) == "string", "expected protocol to be a string") assert(conn.protocol:match("mqtts?"), "only 'mqtt(s)' protocol is supported in the uri, got '"..tostring(conn.protocol).."'") -- print("protocol: ", uriprotocol) -- creds, host/port local creds, host_port if uri:find("@") then host_port = uri:match("@(.-)$"):lower() creds = uri:gsub("@.-$", "") else host_port = uri end -- print("creds: ", creds) -- print("host_port:", host_port) -- host-port local host, port = host_port:match("^([^:]+):?([^:]*)$") if port and #port > 0 then port = assert(tonumber(port), "port in uri must be numeric, got: '"..port.."'") else port = nil end -- print("port: ", port) -- print("host: ", host) conn.host = opts.host or host assert(type(conn.host) == "string", "expected host to be a string") -- default port conn.port = opts.port or port if not conn.port then if opts.secure then conn.port = 8883 -- default MQTT secure connection port else conn.port = 1883 -- default MQTT connection port end end assert(type(conn.port) == "number", "expected port to be a number") -- username-password local username, password if creds then username, password = creds:match("^([^:]+):?([^:]*)$") if password and #password == 0 then password = nil end end -- NOTE: these go into client 'opts' table, not in 'conn' opts.username = opts.username or username assert(opts.username == nil or type(opts.username) == "string", "expected username to be a string") opts.password = opts.password or password assert(opts.password == nil or type(opts.password) == "string", "expected password to be a string") assert(not conn.password or conn.username, "password is not accepted in absence of username") -- print("username: ", username) -- print("password: ", password) local secure = opts.secure if secure then conn.secure = true conn.secure_params = secure ~= true and secure or nil conn.ssl_module = opts.ssl_module or "ssl" 
assert(conn.ssl_module == nil or type(conn.ssl_module) == "string", "expected ssl_module to be a string") else -- sanity conn.secure = false conn.secure_params = nil conn.ssl_module = nil end end -- Send given packet to opened network connection function Client:_send_packet(packet) local conn = self.connection if not conn then return false, "network connection is not opened" end local data = tostring(packet) local len = data:len() if len <= 0 then return false, "sending empty packet" end -- and send binary packet to network connection local ok, err = conn:send(data) if not ok then return false, "connector.send failed: "..err end self.send_time = os_time() return true end -- Receive one packet from established network connection function Client:_receive_packet() local conn = self.connection if not conn then return false, "network connection is not opened" end -- read & parse packet local packet, err = self._parse_packet( function(size) return conn:receive(size) end ) if packet then -- succesful packet, clear handled data and return it conn:buffer_clear() return packet end -- check if we need more data, if not, clear the buffer because were done with -- the data in that case if err == conn.signal_idle then -- we need more data, so do not clear buffer, just return the error return false, err end -- some other error, can't use buffered data, dispose of it conn:buffer_clear() return false, err end -- Represent MQTT client as string function Client:__tostring() return str_format("mqtt.client{id=%q}", tostring(self.opts.id)) end -- Garbage collection handler function Client:__gc() -- close network connection if it's available, without sending DISCONNECT packet if self.connection then self:close_connection("garbage") end end --- Exported functions -- @section exported --- Create, initialize and return new MQTT client instance -- @name client.create -- @param ... 
see arguments of `Client:__init` -- @see Client:__init -- @treturn Client MQTT client instance function _M.create(opts) local cl = setmetatable({}, Client) cl:__init(opts) return cl end ------- if _G._TEST then -- export functions for test purposes (different name!) _M.__parse_connection_opts = Client._parse_connection_opts end -- export module table return _M -- vim: ts=4 sts=4 sw=4 noet ft=lua