1219 lines
38 KiB
Lua
1219 lines
38 KiB
Lua
--- MQTT client module
|
|
-- @module mqtt.client
|
|
-- @alias client
|
|
local client = {}
|
|
|
|
-- TODO: list event names
|
|
|
|
-------
|
|
|
|
-- load required stuff
|
|
local type = type
|
|
local error = error
|
|
local select = select
|
|
local require = require
|
|
local tostring = tostring
|
|
|
|
local os = require("os")
|
|
local os_time = os.time
|
|
|
|
local string = require("string")
|
|
local str_format = string.format
|
|
local str_gsub = string.gsub
|
|
local str_match = string.match
|
|
|
|
local table = require("table")
|
|
local table_remove = table.remove
|
|
|
|
local coroutine = require("coroutine")
|
|
local coroutine_create = coroutine.create
|
|
local coroutine_resume = coroutine.resume
|
|
local coroutine_yield = coroutine.yield
|
|
|
|
local math = require("math")
|
|
local math_random = math.random
|
|
|
|
local luamqtt_VERSION
|
|
|
|
local protocol = require("mqtt.protocol")
|
|
local packet_type = protocol.packet_type
|
|
local check_qos = protocol.check_qos
|
|
local next_packet_id = protocol.next_packet_id
|
|
local packet_id_required = protocol.packet_id_required
|
|
|
|
local protocol4 = require("mqtt.protocol4")
|
|
local make_packet4 = protocol4.make_packet
|
|
local parse_packet4 = protocol4.parse_packet
|
|
|
|
local protocol5 = require("mqtt.protocol5")
|
|
local make_packet5 = protocol5.make_packet
|
|
local parse_packet5 = protocol5.parse_packet
|
|
|
|
local ioloop = require("mqtt.ioloop")
|
|
local ioloop_get = ioloop.get
|
|
|
|
-------
|
|
|
|
--- MQTT client instance metatable
|
|
-- @type client_mt
|
|
local client_mt = {}
|
|
client_mt.__index = client_mt
|
|
|
|
--- Create and initialize MQTT client instance
|
|
-- @tparam table args MQTT client creation arguments table
|
|
-- @tparam string args.uri MQTT broker uri to connect.
|
|
-- Expecting "host:port" or "host" format, in second case the port will be selected automatically:
|
|
-- 1883 port for plain or 8883 for secure network connections
|
|
-- @tparam string args.clean clean session start flag
|
|
-- @tparam[opt=4] number args.version MQTT protocol version to use, either 4 (for MQTT v3.1.1) or 5 (for MQTT v5.0).
|
|
-- Also you may use special values mqtt.v311 or mqtt.v50 for this field.
|
|
-- @tparam[opt] string args.id MQTT client ID, will be generated by luamqtt library if absent
|
|
-- @tparam[opt] string args.username username for authorization on MQTT broker
|
|
-- @tparam[opt] string args.password password for authorization on MQTT broker; not acceptable in absence of username
|
|
-- @tparam[opt=false] boolean,table args.secure use secure network connection, provided by luasec lua module;
|
|
-- set to true to select default params: { mode="client", protocol="tlsv1_2", verify="none", options="all" }
|
|
-- or set to luasec-compatible table, for example with cafile="...", certificate="...", key="..."
|
|
-- @tparam[opt] table args.will will message table with required fields { topic="...", payload="..." }
|
|
-- and optional fields { qos=1...3, retain=true/false }
|
|
-- @tparam[opt=60] number args.keep_alive time interval for client to send PINGREQ packets to the server when network connection is inactive
|
|
-- @tparam[opt=false] boolean args.reconnect force created MQTT client to reconnect on connection close.
|
|
-- Set to number value to provide reconnect timeout in seconds
|
|
-- It's not recommended to use values < 3
|
|
-- @tparam[opt] table args.connector connector table to open and send/receive packets over network connection.
|
|
-- default is require("mqtt.luasocket"), or require("mqtt.luasocket_ssl") if secure argument is set
|
|
-- @tparam[opt="ssl"] string args.ssl_module module name for the luasec-compatible ssl module, default is "ssl"
|
|
-- may be used in some non-standard lua environments with own luasec-compatible ssl module
|
|
-- @treturn client_mt MQTT client instance table
|
|
function client_mt:__init(args)
|
|
if not luamqtt_VERSION then
|
|
luamqtt_VERSION = require("mqtt")._VERSION
|
|
end
|
|
|
|
-- fetch and validate client args
|
|
local a = {} -- own client copy of args
|
|
|
|
for key, value in pairs(args) do
|
|
if type(key) ~= "string" then
|
|
error("expecting string key in args, got: "..type(key))
|
|
end
|
|
|
|
local value_type = type(value)
|
|
if key == "uri" then
|
|
assert(value_type == "string", "expecting uri to be a string")
|
|
a.uri = value
|
|
elseif key == "clean" then
|
|
assert(value_type == "boolean", "expecting clean to be a boolean")
|
|
a.clean = value
|
|
elseif key == "version" then
|
|
assert(value_type == "number", "expecting version to be a number")
|
|
assert(value == 4 or value == 5, "expecting version to be a value either 4 or 5")
|
|
a.version = value
|
|
elseif key == "id" then
|
|
assert(value_type == "string", "expecting id to be a string")
|
|
a.id = value
|
|
elseif key == "username" then
|
|
assert(value_type == "string", "expecting username to be a string")
|
|
a.username = value
|
|
elseif key == "password" then
|
|
assert(value_type == "string", "expecting password to be a string")
|
|
a.password = value
|
|
elseif key == "secure" then
|
|
assert(value_type == "boolean" or value_type == "table", "expecting secure to be a boolean or table")
|
|
a.secure = value
|
|
elseif key == "will" then
|
|
assert(value_type == "table", "expecting will to be a table")
|
|
a.will = value
|
|
elseif key == "keep_alive" then
|
|
assert(value_type == "number", "expecting keep_alive to be a number")
|
|
a.keep_alive = value
|
|
elseif key == "properties" then
|
|
assert(value_type == "table", "expecting properties to be a table")
|
|
a.properties = value
|
|
elseif key == "user_properties" then
|
|
assert(value_type == "table", "expecting user_properties to be a table")
|
|
a.user_properties = value
|
|
elseif key == "reconnect" then
|
|
assert(value_type == "boolean" or value_type == "number", "expecting reconnect to be a boolean or number")
|
|
a.reconnect = value
|
|
elseif key == "connector" then
|
|
a.connector = value
|
|
elseif key == "ssl_module" then
|
|
assert(value_type == "string", "expecting ssl_module to be a string")
|
|
a.ssl_module = value
|
|
else
|
|
error("unexpected key in client args: "..key.." = "..tostring(value))
|
|
end
|
|
end
|
|
|
|
-- check required arguments
|
|
assert(a.uri, 'expecting uri="..." to create MQTT client')
|
|
assert(a.clean ~= nil, "expecting clean=true or clean=false to create MQTT client")
|
|
assert(not a.password or a.username, "password is not accepted in absence of username")
|
|
|
|
if not a.id then
|
|
-- generate random client id
|
|
a.id = str_format("luamqtt-v%s-%07x", str_gsub(luamqtt_VERSION, "[^%d]", "-"), math_random(1, 0xFFFFFFF))
|
|
end
|
|
|
|
-- default connector
|
|
if a.connector == nil then
|
|
if a.secure then
|
|
a.connector = require("mqtt.luasocket_ssl")
|
|
else
|
|
a.connector = require("mqtt.luasocket")
|
|
end
|
|
end
|
|
-- validate connector content
|
|
assert(type(a.connector) == "table", "expecting connector to be a table")
|
|
assert(type(a.connector.connect) == "function", "expecting connector.connect to be a function")
|
|
assert(type(a.connector.shutdown) == "function", "expecting connector.shutdown to be a function")
|
|
assert(type(a.connector.send) == "function", "expecting connector.send to be a function")
|
|
assert(type(a.connector.receive) == "function", "expecting connector.receive to be a function")
|
|
|
|
-- will table content check
|
|
if a.will then
|
|
assert(type(a.will.topic) == "string", "expecting will.topic to be a string")
|
|
assert(type(a.will.payload) == "string", "expecting will.payload to be a string")
|
|
if a.will.qos ~= nil then
|
|
assert(type(a.will.qos) == "number", "expecting will.qos to be a number")
|
|
assert(check_qos(a.will.qos), "expecting will.qos to be a valid QoS value")
|
|
end
|
|
if a.will.retain ~= nil then
|
|
assert(type(a.will.retain) == "boolean", "expecting will.retain to be a boolean")
|
|
end
|
|
end
|
|
|
|
-- default keep_alive
|
|
if not a.keep_alive then
|
|
a.keep_alive = 60
|
|
end
|
|
|
|
-- client args
|
|
self.args = a
|
|
|
|
-- event handlers
|
|
self.handlers = {
|
|
connect = {},
|
|
subscribe = {},
|
|
unsubscribe = {},
|
|
message = {},
|
|
acknowledge = {},
|
|
error = {},
|
|
close = {},
|
|
auth = {},
|
|
}
|
|
self._handling = {}
|
|
self._to_remove_handlers = {}
|
|
|
|
-- state
|
|
self.first_connect = true -- contains true to perform one network connection attemt after client creation
|
|
self.send_time = 0 -- time of the last network send from client side
|
|
|
|
-- packet creation/parse functions according version
|
|
if not a.version then
|
|
a.version = 4
|
|
end
|
|
if a.version == 4 then
|
|
self._make_packet = make_packet4
|
|
self._parse_packet = parse_packet4
|
|
elseif a.version == 5 then
|
|
self._make_packet = make_packet5
|
|
self._parse_packet = parse_packet5
|
|
end
|
|
|
|
-- automatically add client to default ioloop, if it's available and running, then start connecting
|
|
local loop = ioloop_get(false)
|
|
if loop and loop.running then
|
|
loop:add(self)
|
|
self:start_connecting()
|
|
end
|
|
end
|
|
|
|
--- Add functions as handlers of given events
|
|
-- @param ... (event_name, function) or { event1 = func1, event2 = func2 } table
|
|
function client_mt:on(...)
|
|
local nargs = select("#", ...)
|
|
local events
|
|
if nargs == 2 then
|
|
events = { [select(1, ...)] = select(2, ...) }
|
|
elseif nargs == 1 then
|
|
events = select(1, ...)
|
|
else
|
|
error("invalid args: expected only one or two arguments")
|
|
end
|
|
for event, func in pairs(events) do
|
|
assert(type(event) == "string", "expecting event to be a string")
|
|
assert(type(func) == "function", "expecting func to be a function")
|
|
local handlers = self.handlers[event]
|
|
if not handlers then
|
|
error("invalid event '"..tostring(event).."' to handle")
|
|
end
|
|
handlers[#handlers + 1] = func
|
|
end
|
|
end
|
|
|
|
-- Remove one item from the list-table with full-iteration
|
|
local function remove_item(list, item)
|
|
for i, test in ipairs(list) do
|
|
if test == item then
|
|
table_remove(list, i)
|
|
return
|
|
end
|
|
end
|
|
end
|
|
|
|
--- Remove given function handler for specified event
|
|
-- @tparam string event event name to remove handler
|
|
-- @tparam function func handler function to remove
|
|
function client_mt:off(event, func)
|
|
local handlers = self.handlers[event]
|
|
if not handlers then
|
|
error("invalid event '"..tostring(event).."' to handle")
|
|
end
|
|
if self._handling[event] then
|
|
-- this event is handling now, schedule the function removing to the moment after all handlers will be called for the event
|
|
local to_remove = self._to_remove_handlers[event] or {}
|
|
to_remove[#to_remove + 1] = func
|
|
self._to_remove_handlers[event] = to_remove
|
|
else
|
|
-- it's ok to remove given function just now
|
|
remove_item(handlers, func)
|
|
end
|
|
return true
|
|
end
|
|
|
|
--- Subscribe to specified topic. Returns the SUBSCRIBE packet id and calls optional callback when subscription will be created on broker
|
|
-- @tparam table args subscription arguments
|
|
-- @tparam string args.topic topic to subscribe
|
|
-- @tparam[opt=0] number args.qos QoS level for subscription
|
|
-- @tparam boolean args.no_local for MQTT v5.0 only: no_local flag for subscription
|
|
-- @tparam boolean args.retain_as_published for MQTT v5.0 only: retain_as_published flag for subscription
|
|
-- @tparam boolean args.retain_handling for MQTT v5.0 only: retain_handling flag for subscription
|
|
-- @tparam[opt] table args.properties for MQTT v5.0 only: properties for subscribe operation
|
|
-- @tparam[opt] table args.user_properties for MQTT v5.0 only: user properties for subscribe operation
|
|
-- @tparam[opt] function args.callback callback function to be called when subscription will be created
|
|
-- @return packet id on success or false and error message on failure
|
|
function client_mt:subscribe(args)
|
|
-- fetch and validate args
|
|
assert(type(args) == "table", "expecting args to be a table")
|
|
assert(type(args.topic) == "string", "expecting args.topic to be a string")
|
|
assert(args.qos == nil or (type(args.qos) == "number" and check_qos(args.qos)), "expecting valid args.qos value")
|
|
assert(args.no_local == nil or type(args.no_local) == "boolean", "expecting args.no_local to be a boolean")
|
|
assert(args.retain_as_published == nil or type(args.retain_as_published) == "boolean", "expecting args.retain_as_published to be a boolean")
|
|
assert(args.retain_handling == nil or type(args.retain_handling) == "boolean", "expecting args.retain_handling to be a boolean")
|
|
assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table")
|
|
assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table")
|
|
assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function")
|
|
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create SUBSCRIBE packet
|
|
local pargs = {
|
|
type = packet_type.SUBSCRIBE,
|
|
subscriptions = {
|
|
{
|
|
topic = args.topic,
|
|
qos = args.qos,
|
|
no_local = args.no_local,
|
|
retain_as_published = args.retain_as_published,
|
|
retain_handling = args.retain_handling
|
|
},
|
|
},
|
|
properties = args.properties,
|
|
user_properties = args.user_properties,
|
|
}
|
|
self:_assign_packet_id(pargs)
|
|
local packet_id = pargs.packet_id
|
|
local subscribe = self._make_packet(pargs)
|
|
|
|
-- send SUBSCRIBE packet
|
|
local ok, err = self:_send_packet(subscribe)
|
|
if not ok then
|
|
err = "failed to send SUBSCRIBE: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- add subscribe callback
|
|
local callback = args.callback
|
|
if callback then
|
|
local function handler(suback, ...)
|
|
if suback.packet_id == packet_id then
|
|
self:off("subscribe", handler)
|
|
callback(suback, ...)
|
|
end
|
|
end
|
|
self:on("subscribe", handler)
|
|
end
|
|
|
|
-- returns assigned packet id
|
|
return packet_id
|
|
end
|
|
|
|
--- Unsubscribe from specified topic, and calls optional callback when subscription will be removed on broker
|
|
-- @tparam table args subscription arguments
|
|
-- @tparam string args.topic topic to unsubscribe
|
|
-- @tparam[opt] table args.properties properties for unsubscribe operation
|
|
-- @tparam[opt] table args.user_properties user properties for unsubscribe operation
|
|
-- @tparam[opt] function args.callback callback function to be called when subscription will be removed on broker
|
|
-- @return packet id on success or false and error message on failure
|
|
function client_mt:unsubscribe(args)
|
|
-- fetch and validate args
|
|
assert(type(args) == "table", "expecting args to be a table")
|
|
assert(type(args.topic) == "string", "expecting args.topic to be a string")
|
|
assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table")
|
|
assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table")
|
|
assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function")
|
|
|
|
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create UNSUBSCRIBE packet
|
|
local pargs = {
|
|
type = packet_type.UNSUBSCRIBE,
|
|
subscriptions = {args.topic},
|
|
properties = args.properties,
|
|
user_properties = args.user_properties,
|
|
}
|
|
self:_assign_packet_id(pargs)
|
|
local packet_id = pargs.packet_id
|
|
local unsubscribe = self._make_packet(pargs)
|
|
|
|
-- send UNSUBSCRIBE packet
|
|
local ok, err = self:_send_packet(unsubscribe)
|
|
if not ok then
|
|
err = "failed to send UNSUBSCRIBE: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- add unsubscribe callback
|
|
local callback = args.callback
|
|
if callback then
|
|
local function handler(unsuback, ...)
|
|
if unsuback.packet_id == packet_id then
|
|
self:off("unsubscribe", handler)
|
|
callback(unsuback, ...)
|
|
end
|
|
end
|
|
self:on("unsubscribe", handler)
|
|
end
|
|
|
|
-- returns assigned packet id
|
|
return packet_id
|
|
end
|
|
|
|
--- Publish message to broker
|
|
-- @tparam table args publish operation arguments table
|
|
-- @tparam string args.topic topic to publish message
|
|
-- @tparam[opt] string args.payload publish message payload
|
|
-- @tparam[opt=0] number args.qos QoS level for message publication
|
|
-- @tparam[opt=false] boolean args.retain retain message publication flag
|
|
-- @tparam[opt=false] boolean args.dup dup message publication flag
|
|
-- @tparam[opt] table args.properties properties for publishing message
|
|
-- @tparam[opt] table args.user_properties user properties for publishing message
|
|
-- @tparam[opt] function args.callback callback to call when publihsed message will be acknowledged
|
|
-- @return true or packet id on success or false and error message on failure
|
|
function client_mt:publish(args)
|
|
-- fetch and validate args
|
|
assert(type(args) == "table", "expecting args to be a table")
|
|
assert(type(args.topic) == "string", "expecting args.topic to be a string")
|
|
assert(args.payload == nil or type(args.payload) == "string", "expecting args.payload to be a string")
|
|
assert(args.qos == nil or type(args.qos) == "number", "expecting args.qos to be a number")
|
|
if args.qos then
|
|
assert(check_qos(args.qos), "expecting qos to be a valid QoS value")
|
|
end
|
|
assert(args.retain == nil or type(args.retain) == "boolean", "expecting args.retain to be a boolean")
|
|
assert(args.dup == nil or type(args.dup) == "boolean", "expecting args.dup to be a boolean")
|
|
assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table")
|
|
assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table")
|
|
assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function")
|
|
|
|
-- check connection is alive
|
|
local conn = self.connection
|
|
if not conn then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create PUBLISH packet
|
|
args.type = packet_type.PUBLISH
|
|
self:_assign_packet_id(args)
|
|
local packet_id = args.packet_id
|
|
local publish = self._make_packet(args)
|
|
|
|
-- send PUBLISH packet
|
|
local ok, err = self:_send_packet(publish)
|
|
if not ok then
|
|
err = "failed to send PUBLISH: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- record packet id as waited for QoS 2 exchange
|
|
if args.qos == 2 then
|
|
conn.wait_for_pubrec[packet_id] = true
|
|
end
|
|
|
|
-- add acknowledge callback
|
|
local callback = args.callback
|
|
if callback then
|
|
if packet_id then
|
|
local function handler(ack, ...)
|
|
if ack.packet_id == packet_id then
|
|
self:off("acknowledge", handler)
|
|
callback(ack, ...)
|
|
end
|
|
end
|
|
self:on("acknowledge", handler)
|
|
else
|
|
callback("no ack for QoS 0 message", self)
|
|
end
|
|
end
|
|
|
|
-- returns assigned packet id
|
|
return packet_id or true
|
|
end
|
|
|
|
--- Acknowledge given received message
|
|
-- @tparam packet_mt msg PUBLISH message to acknowledge
|
|
-- @tparam[opt=0] number rc The reason code field of PUBACK packet in MQTT v5.0 protocol
|
|
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
|
|
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:acknowledge(msg, rc, properties, user_properties)
|
|
assert(type(msg) == "table" and msg.type == packet_type.PUBLISH, "expecting msg to be a publish packet")
|
|
assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
|
|
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
|
|
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
|
|
|
|
-- check connection is alive
|
|
local conn = self.connection
|
|
if not conn then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- check packet needs to be acknowledged
|
|
local packet_id = msg.packet_id
|
|
if not packet_id then
|
|
return true
|
|
end
|
|
|
|
if msg.qos == 1 then
|
|
-- PUBACK should be sent
|
|
|
|
-- create PUBACK packet
|
|
local puback = self._make_packet{
|
|
type = packet_type.PUBACK,
|
|
packet_id = packet_id,
|
|
rc = rc or 0,
|
|
properties = properties,
|
|
user_properties = user_properties,
|
|
}
|
|
|
|
-- send PUBACK packet
|
|
local ok, err = self:_send_packet(puback)
|
|
if not ok then
|
|
err = "failed to send PUBACK: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
elseif msg.qos == 2 then
|
|
-- PUBREC should be sent and packet_id should be remembered for PUBREL+PUBCOMP sequence
|
|
|
|
-- create PUBREC packet
|
|
local pubrec = self._make_packet{
|
|
type = packet_type.PUBREC,
|
|
packet_id = packet_id,
|
|
rc = rc or 0,
|
|
properties = properties,
|
|
user_properties = user_properties,
|
|
}
|
|
|
|
-- send PUBREC packet
|
|
local ok, err = self:_send_packet(pubrec)
|
|
if not ok then
|
|
err = "failed to send PUBREC: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- store packet id as waiting for PUBREL
|
|
conn.wait_for_pubrel[packet_id] = true
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
--- Send DISCONNECT packet to the broker and close the connection
|
|
-- @tparam[opt=0] number rc The Disconnect Reason Code value from MQTT v5.0 protocol
|
|
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
|
|
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:disconnect(rc, properties, user_properties)
|
|
-- validate args
|
|
assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
|
|
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
|
|
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
|
|
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create DISCONNECT packet
|
|
local disconnect = self._make_packet{
|
|
type = packet_type.DISCONNECT,
|
|
rc = rc or 0,
|
|
properties = properties,
|
|
user_properties = user_properties,
|
|
}
|
|
|
|
-- send DISCONNECT packet
|
|
local ok, err = self:_send_packet(disconnect)
|
|
if not ok then
|
|
err = "failed to send DISCONNECT: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- now close connection
|
|
self:close_connection("connection closed by client")
|
|
|
|
return true
|
|
end
|
|
|
|
--- Send AUTH packet to authenticate client on broker, in MQTT v5.0 protocol
|
|
-- @tparam[opt=0] number rc Authenticate Reason Code
|
|
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
|
|
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:auth(rc, properties, user_properties)
|
|
-- validate args
|
|
assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
|
|
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
|
|
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
|
|
assert(self.args.version == 5, "allowed only in MQTT v5.0 protocol")
|
|
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create AUTH packet
|
|
local auth = self._make_packet{
|
|
type = packet_type.AUTH,
|
|
rc = rc or 0,
|
|
properties = properties,
|
|
user_properties = user_properties,
|
|
}
|
|
|
|
-- send AUTH packet
|
|
local ok, err = self:_send_packet(auth)
|
|
if not ok then
|
|
err = "failed to send AUTH: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
--- Immediately close established network connection, without graceful session finishing with DISCONNECT packet
|
|
-- @tparam[opt] string reason the reasong string of connection close
|
|
function client_mt:close_connection(reason)
|
|
assert(not reason or type(reason) == "string", "expecting reason to be a string")
|
|
local conn = self.connection
|
|
if not conn then
|
|
return true
|
|
end
|
|
|
|
local args = self.args
|
|
args.connector.shutdown(conn)
|
|
self.connection = nil
|
|
conn.close_reason = reason or "unspecified"
|
|
|
|
self:handle("close", conn, self)
|
|
|
|
-- check connection is still closed (self.connection may be re-created in "close" handler)
|
|
if not self.connection then
|
|
-- remove from ioloop
|
|
if self.ioloop and not args.reconnect then
|
|
self.ioloop:remove(self)
|
|
end
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
--- Start connecting to broker
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:start_connecting()
|
|
-- print("start connecting") -- debug
|
|
-- open network connection
|
|
local ok, err = self:open_connection()
|
|
if not ok then
|
|
return false, err
|
|
end
|
|
|
|
-- send CONNECT packet
|
|
ok, err = self:send_connect()
|
|
if not ok then
|
|
return false, err
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
--- Low-level methods
|
|
-- @section low-level
|
|
|
|
--- Send PINGREQ packet
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:send_pingreq()
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create PINGREQ packet
|
|
local pingreq = self._make_packet{
|
|
type = packet_type.PINGREQ,
|
|
}
|
|
|
|
-- send PINGREQ packet
|
|
local ok, err = self:_send_packet(pingreq)
|
|
if not ok then
|
|
err = "failed to send PINGREQ: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
--- Open network connection to the broker
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:open_connection()
|
|
if self.connection then
|
|
return true
|
|
end
|
|
|
|
local args = self.args
|
|
local connector = assert(args.connector, "no connector configured in MQTT client")
|
|
|
|
-- create connection table
|
|
local conn = {
|
|
uri = args.uri,
|
|
wait_for_pubrec = {}, -- a table with packet_id of parially acknowledged sent packets in QoS 2 exchange process
|
|
wait_for_pubrel = {}, -- a table with packet_id of parially acknowledged received packets in QoS 2 exchange process
|
|
}
|
|
client_mt._parse_uri(args, conn)
|
|
client_mt._apply_secure(args, conn)
|
|
|
|
-- perform connect
|
|
local ok, err = connector.connect(conn)
|
|
if not ok then
|
|
err = "failed to open network connection: "..err
|
|
self:handle("error", err, self)
|
|
return false, err
|
|
end
|
|
|
|
-- assign connection
|
|
self.connection = conn
|
|
|
|
-- create receive function
|
|
local receive = connector.receive
|
|
self.connection.recv_func = function(size)
|
|
return receive(conn, size)
|
|
end
|
|
|
|
self:_apply_network_timeout()
|
|
|
|
return true
|
|
end
|
|
|
|
--- Send CONNECT packet into opened network connection
|
|
-- @return true on success or false and error message on failure
|
|
function client_mt:send_connect()
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
local args = self.args
|
|
|
|
-- create CONNECT packet
|
|
local connect = self._make_packet{
|
|
type = packet_type.CONNECT,
|
|
id = args.id,
|
|
clean = args.clean,
|
|
username = args.username,
|
|
password = args.password,
|
|
will = args.will,
|
|
keep_alive = args.keep_alive,
|
|
properties = args.properties,
|
|
user_properties = args.user_properties,
|
|
}
|
|
|
|
-- send CONNECT packet
|
|
local ok, err = self:_send_packet(connect)
|
|
if not ok then
|
|
err = "failed to send CONNECT: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- reset last packet id
|
|
self._last_packet_id = nil
|
|
|
|
return true
|
|
end
|
|
|
|
-- Internal methods
|
|
|
|
-- Set or rest ioloop for MQTT client
|
|
function client_mt:set_ioloop(loop)
|
|
self.ioloop = loop
|
|
self:_apply_network_timeout()
|
|
end
|
|
|
|
-- Send PUBREL acknowledge packet - second phase of QoS 2 exchange
|
|
-- Returns true on success or false and error message on failure
|
|
function client_mt:acknowledge_pubrel(packet_id)
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create PUBREL packet
|
|
local pubrel = self._make_packet{type=packet_type.PUBREL, packet_id=packet_id, rc=0}
|
|
|
|
-- send PUBREL packet
|
|
local ok, err = self:_send_packet(pubrel)
|
|
if not ok then
|
|
err = "failed to send PUBREL: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- Send PUBCOMP acknowledge packet - last phase of QoS 2 exchange
|
|
-- Returns true on success or false and error message on failure
|
|
function client_mt:acknowledge_pubcomp(packet_id)
|
|
-- check connection is alive
|
|
if not self.connection then
|
|
return false, "network connection is not opened"
|
|
end
|
|
|
|
-- create PUBCOMP packet
|
|
local pubcomp = self._make_packet{type=packet_type.PUBCOMP, packet_id=packet_id, rc=0}
|
|
|
|
-- send PUBCOMP packet
|
|
local ok, err = self:_send_packet(pubcomp)
|
|
if not ok then
|
|
err = "failed to send PUBCOMP: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- Call specified event handlers
|
|
function client_mt:handle(event, ...)
|
|
local handlers = self.handlers[event]
|
|
if not handlers then
|
|
error("invalid event '"..tostring(event).."' to handle")
|
|
end
|
|
self._handling[event] = true -- protecting self.handlers[event] table from modifications by client_mt:off() when iterating
|
|
for _, handler in ipairs(handlers) do
|
|
handler(...)
|
|
end
|
|
self._handling[event] = nil
|
|
|
|
-- process handlers removing, scheduled by client_mt:off()
|
|
local to_remove = self._to_remove_handlers[event]
|
|
if to_remove then
|
|
for _, func in ipairs(to_remove) do
|
|
remove_item(handlers, func)
|
|
end
|
|
self._to_remove_handlers[event] = nil
|
|
end
|
|
end
|
|
|
|
-- Internal methods
|
|
|
|
-- Assign next packet id for given packet creation args
|
|
function client_mt:_assign_packet_id(pargs)
|
|
if not pargs.packet_id then
|
|
if packet_id_required(pargs) then
|
|
self._last_packet_id = next_packet_id(self._last_packet_id)
|
|
pargs.packet_id = self._last_packet_id
|
|
end
|
|
end
|
|
end
|
|
|
|
-- Receive packet function in sync mode
|
|
local function sync_recv(self)
|
|
return true, self:_receive_packet()
|
|
end
|
|
|
|
-- Perform one input/output iteration, called by sync receiving loop
|
|
function client_mt:_sync_iteration()
|
|
return self:_io_iteration(sync_recv)
|
|
end
|
|
|
|
-- Receive packet function - from ioloop's coroutine
|
|
local function ioloop_recv(self)
|
|
return coroutine_resume(self.connection.coro)
|
|
end
|
|
|
|
-- Perform one input/output iteration, called by ioloop
|
|
function client_mt:_ioloop_iteration()
|
|
-- working according state
|
|
local loop = self.ioloop
|
|
local args = self.args
|
|
|
|
local conn = self.connection
|
|
if conn then
|
|
-- network connection opened
|
|
-- perform packet receiving using ioloop receive function
|
|
local ok, err
|
|
if loop then
|
|
ok, err = self:_io_iteration(ioloop_recv)
|
|
else
|
|
ok, err = self:_sync_iteration()
|
|
end
|
|
|
|
if ok then
|
|
-- send PINGREQ if keep_alive interval is reached
|
|
if os_time() - self.send_time >= args.keep_alive then
|
|
self:send_pingreq()
|
|
end
|
|
end
|
|
|
|
return ok, err
|
|
else
|
|
-- no connection - first connect, reconnect or remove from ioloop
|
|
if self.first_connect then
|
|
self.first_connect = false
|
|
self:start_connecting()
|
|
elseif args.reconnect then
|
|
if args.reconnect == true then
|
|
self:start_connecting()
|
|
else
|
|
-- reconnect in specified timeout
|
|
if self.reconnect_timer_start then
|
|
if os_time() - self.reconnect_timer_start >= args.reconnect then
|
|
self.reconnect_timer_start = nil
|
|
self:start_connecting()
|
|
else
|
|
if loop then
|
|
loop:can_sleep()
|
|
end
|
|
end
|
|
else
|
|
self.reconnect_timer_start = os_time()
|
|
end
|
|
end
|
|
else
|
|
-- finish working with client
|
|
if loop then
|
|
loop:remove(self)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
-- Performing one IO iteration - receive next packet
|
|
function client_mt:_io_iteration(recv)
|
|
local conn = self.connection
|
|
|
|
-- first - try to receive packet
|
|
local ok, packet, err = recv(self)
|
|
-- print("received packet", ok, packet, err)
|
|
|
|
-- check coroutine resume status
|
|
if not ok then
|
|
err = "failed to resume receive packet coroutine: "..tostring(packet)
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- check for communication error
|
|
if packet == false then
|
|
if err == "closed" then
|
|
self:close_connection("connection closed by broker")
|
|
return false, err
|
|
else
|
|
err = "failed to receive next packet: "..err
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
end
|
|
|
|
-- check some packet received
|
|
if packet ~= "timeout" and packet ~= "wantread" then
|
|
if not conn.connack then
|
|
-- expecting only CONNACK packet here
|
|
if packet.type ~= packet_type.CONNACK then
|
|
err = "expecting CONNACK but received "..packet.type
|
|
self:handle("error", err, self)
|
|
self:close_connection("error")
|
|
return false, err
|
|
end
|
|
|
|
-- store connack packet in connection
|
|
conn.connack = packet
|
|
|
|
-- check CONNACK rc
|
|
if packet.rc ~= 0 then
|
|
err = str_format("CONNECT failed with CONNACK [rc=%d]: %s", packet.rc, packet:reason_string())
|
|
self:handle("error", err, self, packet)
|
|
self:handle("connect", packet, self)
|
|
self:close_connection("connection failed")
|
|
return false, err
|
|
end
|
|
|
|
-- fire connect event
|
|
self:handle("connect", packet, self)
|
|
else
|
|
-- connection authorized, so process usual packets
|
|
|
|
-- handle packet according its type
|
|
local ptype = packet.type
|
|
if ptype == packet_type.PINGRESP then -- luacheck: ignore
|
|
-- PINGREQ answer, nothing to do
|
|
-- TODO: break the connectin in absence of this packet in some timeout
|
|
elseif ptype == packet_type.SUBACK then
|
|
self:handle("subscribe", packet, self)
|
|
elseif ptype == packet_type.UNSUBACK then
|
|
self:handle("unsubscribe", packet, self)
|
|
elseif ptype == packet_type.PUBLISH then
|
|
-- check such packet is not waiting for pubrel acknowledge
|
|
self:handle("message", packet, self)
|
|
elseif ptype == packet_type.PUBACK then
|
|
self:handle("acknowledge", packet, self)
|
|
elseif ptype == packet_type.PUBREC then
|
|
local packet_id = packet.packet_id
|
|
if conn.wait_for_pubrec[packet_id] then
|
|
conn.wait_for_pubrec[packet_id] = nil
|
|
-- send PUBREL acknowledge
|
|
if self:acknowledge_pubrel(packet_id) then
|
|
-- and fire acknowledge event
|
|
self:handle("acknowledge", packet, self)
|
|
end
|
|
end
|
|
elseif ptype == packet_type.PUBREL then
|
|
-- second phase of QoS 2 exchange - check we are already acknowledged such packet by PUBREL
|
|
local packet_id = packet.packet_id
|
|
if conn.wait_for_pubrel[packet_id] then
|
|
-- remove packet from waiting for PUBREL packets table
|
|
conn.wait_for_pubrel[packet_id] = nil
|
|
-- send PUBCOMP acknowledge
|
|
self:acknowledge_pubcomp(packet_id)
|
|
end
|
|
elseif ptype == packet_type.PUBCOMP then --luacheck: ignore
|
|
-- last phase of QoS 2 exchange
|
|
-- do nothing here
|
|
elseif ptype == packet_type.DISCONNECT then
|
|
self:close_connection("disconnect received from broker")
|
|
elseif ptype == packet_type.AUTH then
|
|
self:handle("auth", packet, self)
|
|
-- else
|
|
-- print("unhandled packet:", packet) -- debug
|
|
end
|
|
end
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- Apply ioloop network timeout to already established connection (if any)
|
|
function client_mt:_apply_network_timeout()
|
|
local conn = self.connection
|
|
if conn then
|
|
local loop = self.ioloop
|
|
if loop then
|
|
-- apply connection timeout
|
|
self.args.connector.settimeout(conn, loop.args.timeout)
|
|
|
|
-- connection packets receive loop coroutine
|
|
conn.coro = coroutine_create(function()
|
|
while true do
|
|
local packet, err = self:_receive_packet()
|
|
if not packet then
|
|
return false, err
|
|
else
|
|
coroutine_yield(packet)
|
|
end
|
|
end
|
|
end)
|
|
|
|
-- replace connection recv_func with coroutine-based version
|
|
local sync_recv_func = conn.recv_func
|
|
conn.recv_func = function(totalSize, ...)
|
|
while true do
|
|
local allData = ""
|
|
while true do
|
|
local size = math.min(totalSize, 16384)
|
|
local data, err = sync_recv_func(size, ...)
|
|
if not data and (err == "timeout" or err == "wantread") then
|
|
loop.timeouted = true
|
|
coroutine_yield(err)
|
|
elseif data then
|
|
allData = allData .. data
|
|
totalSize = totalSize - size
|
|
if totalSize == 0 then
|
|
return allData, nil
|
|
end
|
|
else
|
|
return nil, err
|
|
end
|
|
end
|
|
end
|
|
end
|
|
conn.sync_recv_func = sync_recv_func
|
|
else
|
|
-- disable connection timeout
|
|
self.args.connector.settimeout(conn, nil)
|
|
|
|
-- replace back usual (blocking) connection recv_func
|
|
if conn.sync_recv_func then
|
|
conn.recv_func = conn.sync_recv_func
|
|
conn.sync_recv_func = nil
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
-- Fill given connection table with host and port according given args
|
|
function client_mt._parse_uri(args, conn)
|
|
local host, port = str_match(args.uri, "^([^%s]+):(%d+)$")
|
|
if not host then
|
|
-- trying pattern without port
|
|
host = assert(str_match(conn.uri, "^([^%s]+)$"), "invalid uri format: expecting at least host/ip in .uri")
|
|
end
|
|
if not port then
|
|
if args.secure then
|
|
port = 8883 -- default MQTT secure connection port
|
|
else
|
|
port = 1883 -- default MQTT connection port
|
|
end
|
|
else
|
|
port = tonumber(port)
|
|
end
|
|
conn.host, conn.port = host, port
|
|
end
|
|
|
|
-- Creates the conn.secure_params table and its content according client creation args
|
|
function client_mt._apply_secure(args, conn)
|
|
local secure = args.secure
|
|
if secure then
|
|
conn.secure = true
|
|
if type(secure) == "table" then
|
|
conn.secure_params = secure
|
|
else
|
|
conn.secure_params = {
|
|
mode = "client",
|
|
protocol = "tlsv1_2",
|
|
verify = "none",
|
|
options = "all",
|
|
}
|
|
end
|
|
conn.ssl_module = args.ssl_module or "ssl"
|
|
end
|
|
end
|
|
|
|
-- Send given packet to opened network connection
|
|
function client_mt:_send_packet(packet)
|
|
local conn = self.connection
|
|
if not conn then
|
|
return false, "network connection is not opened"
|
|
end
|
|
local data = tostring(packet)
|
|
local len = data:len()
|
|
if len <= 0 then
|
|
return false, "sending empty packet"
|
|
end
|
|
-- and send binary packet to network connection
|
|
local i, err = 1
|
|
local send = self.args.connector.send
|
|
while i < len do
|
|
i, err = send(conn, data, i)
|
|
if not i then
|
|
return false, "connector.send failed: "..err
|
|
end
|
|
end
|
|
self.send_time = os_time()
|
|
return true
|
|
end
|
|
|
|
-- Receive one packet from established network connection
|
|
function client_mt:_receive_packet()
|
|
local conn = self.connection
|
|
if not conn then
|
|
return false, "network connection is not opened"
|
|
end
|
|
-- parse packet
|
|
local packet, err = self._parse_packet(conn.recv_func)
|
|
if not packet then
|
|
return false, err
|
|
end
|
|
return packet
|
|
end
|
|
|
|
-- Represent MQTT client as string
|
|
function client_mt:__tostring()
|
|
return str_format("mqtt.client{id=%q}", tostring(self.args.id))
|
|
end
|
|
|
|
-- Garbage collection handler
|
|
function client_mt:__gc()
|
|
-- close network connection if it's available, without sending DISCONNECT packet
|
|
if self.connection then
|
|
self:close_connection("garbage")
|
|
end
|
|
end
|
|
|
|
--- Exported functions
|
|
-- @section exported
|
|
|
|
--- Create, initialize and return new MQTT client instance
|
|
-- @param ... see arguments of client_mt:__init(args)
|
|
-- @see client_mt:__init
|
|
-- @treturn client_mt MQTT client instance
|
|
function client.create(...)
|
|
local cl = setmetatable({}, client_mt)
|
|
cl:__init(...)
|
|
return cl
|
|
end
|
|
|
|
-------
|
|
|
|
-- export module table
|
|
return client
|
|
|
|
-- vim: ts=4 sts=4 sw=4 noet ft=lua
|