1394 lines
		
	
	
		
			47 KiB
		
	
	
	
		
			Lua
		
	
	
	
	
	
			
		
		
	
	
			1394 lines
		
	
	
		
			47 KiB
		
	
	
	
		
			Lua
		
	
	
	
	
	
--- This class contains the MQTT client implementation.
 | 
						|
-- @classmod Client
 | 
						|
local _M = {}
 | 
						|
 | 
						|
-------
 | 
						|
 | 
						|
-- load required stuff
 | 
						|
local type = type
 | 
						|
local error = error
 | 
						|
local select = select
 | 
						|
local require = require
 | 
						|
local tostring = tostring
 | 
						|
 | 
						|
local os = require("os")
 | 
						|
local os_time = os.time
 | 
						|
 | 
						|
local string = require("string")
 | 
						|
local str_format = string.format
 | 
						|
local str_gsub = string.gsub
 | 
						|
 | 
						|
local table = require("table")
 | 
						|
local table_remove = table.remove
 | 
						|
 | 
						|
local math = require("math")
 | 
						|
local math_random = math.random
 | 
						|
 | 
						|
local luamqtt_VERSION
 | 
						|
 | 
						|
local protocol = require("mqtt.protocol")
 | 
						|
local packet_type = protocol.packet_type
 | 
						|
local check_qos = protocol.check_qos
 | 
						|
local next_packet_id = protocol.next_packet_id
 | 
						|
local packet_id_required = protocol.packet_id_required
 | 
						|
 | 
						|
local protocol4 = require("mqtt.protocol4")
 | 
						|
local make_packet4 = protocol4.make_packet
 | 
						|
local parse_packet4 = protocol4.parse_packet
 | 
						|
 | 
						|
local protocol5 = require("mqtt.protocol5")
 | 
						|
local make_packet5 = protocol5.make_packet
 | 
						|
local parse_packet5 = protocol5.parse_packet
 | 
						|
 | 
						|
local log = require "mqtt.log"
 | 
						|
 | 
						|
-------
 | 
						|
 | 
						|
--- MQTT client instance metatable
 | 
						|
local Client = {}
 | 
						|
Client.__index = Client
 | 
						|
 | 
						|
--- Create and initialize MQTT client instance. Typically this is not called directly,
 | 
						|
-- but through `Client.create`.
 | 
						|
-- @tparam table opts							MQTT client creation options table
 | 
						|
-- @tparam string opts.uri						MQTT broker uri to connect. Expected format:
 | 
						|
--			<br/>	`[mqtt[s]://][username[:password]@]hostname[:port]`
 | 
						|
--			<br/>Any option specifically added to the options
 | 
						|
--			table will take precedence over the option specified in this uri.
 | 
						|
-- @tparam boolean|string opts.clean			clean session start flag, use "first" to start clean only on first connect
 | 
						|
-- @tparam[opt] string opts.protocol			either `"mqtt"` or `"mqtts"`
 | 
						|
-- @tparam[opt] string opts.username			username for authorization on MQTT broker
 | 
						|
-- @tparam[opt] string opts.password			password for authorization on MQTT broker; not acceptable in absence of username
 | 
						|
-- @tparam[opt] string opts.host				hostname of the MQTT broker to connect to
 | 
						|
-- @tparam[opt] int opts.port					port number to connect to on the MQTT broker, defaults to `1883` port for plain or `8883` for secure network connections
 | 
						|
-- @tparam[opt=4] number opts.version			MQTT protocol version to use, either `4` (for MQTT v3.1.1) or `5` (for MQTT v5.0).
 | 
						|
--												Also you may use special values `mqtt.v311` or `mqtt.v50` for this field.
 | 
						|
-- @tparam[opt] string opts.id					MQTT client ID, will be generated by luamqtt library if absent
 | 
						|
-- @tparam[opt=false] boolean|table opts.secure	use secure network connection, provided by the lua module set in `opts.ssl_module`.
 | 
						|
--			Set to true to select default parameters, check individual `mqtt.connectors` for supported options.
 | 
						|
-- @tparam[opt] table opts.will					will message table with required fields `{ topic="...", payload="..." }`
 | 
						|
--			and optional fields `{ qos=0...2, retain=true/false }`
 | 
						|
-- @tparam[opt=60] number opts.keep_alive		time interval (in seconds) for client to send PINGREQ packets to the server when network connection is inactive
 | 
						|
-- @tparam[opt=false] boolean opts.reconnect	force created MQTT client to reconnect on connection close.
 | 
						|
--			Set to number value to provide reconnect timeout in seconds.
 | 
						|
--			It's not recommended to use values `< 3`. See also `Client:shutdown`.
 | 
						|
-- @tparam[opt] table opts.connector			connector table to open and send/receive packets over network connection.
 | 
						|
--			default is `require("mqtt.connector")` which tries to auto-detect. See `mqtt.connector`.
 | 
						|
-- @tparam[opt="ssl"] string opts.ssl_module	module name for the luasec-compatible ssl module, default is `"ssl"`
 | 
						|
--			may be used in some non-standard lua environments with own luasec-compatible ssl module
 | 
						|
-- @tparam[opt] table opts.on 					List of event-handlers. See `Client:on` for the format.
 | 
						|
-- @treturn Client MQTT client instance table
 | 
						|
-- @usage
 | 
						|
-- local Client = require "mqtt.client"
 | 
						|
--
 | 
						|
-- local my_client = Client.create {
 | 
						|
-- 	uri = "mqtts://broker.host.com",
 | 
						|
-- 	clean = "first",
 | 
						|
-- 	version = mqtt.v50,
 | 
						|
-- }
 | 
						|
function Client:__init(opts)
 | 
						|
	if not luamqtt_VERSION then
 | 
						|
		luamqtt_VERSION = require("mqtt")._VERSION
 | 
						|
	end
 | 
						|
 | 
						|
	-- fetch and validate client opts
 | 
						|
	local a = {} -- own client copy of opts
 | 
						|
 | 
						|
	for key, value in pairs(opts) do
 | 
						|
		if type(key) ~= "string" then
 | 
						|
			error("expecting string key in opts, got: "..type(key))
 | 
						|
		end
 | 
						|
 | 
						|
		local value_type = type(value)
 | 
						|
		if key == "uri" then
 | 
						|
			assert(value_type == "string", "expecting uri to be a string")
 | 
						|
			a.uri = value
 | 
						|
		elseif key == "clean" then
 | 
						|
			assert(value_type == "boolean" or value == "first", "expecting clean to be a boolean, or 'first'")
 | 
						|
			a.clean = value
 | 
						|
		elseif key == "version" then
 | 
						|
			assert(value_type == "number", "expecting version to be a number")
 | 
						|
			assert(value == 4 or value == 5, "expecting version to be a value either 4 or 5")
 | 
						|
			a.version = value
 | 
						|
		elseif key == "id" then
 | 
						|
			assert(value_type == "string", "expecting id to be a string")
 | 
						|
			a.id = value
 | 
						|
		elseif key == "username" then
 | 
						|
			assert(value_type == "string", "expecting username to be a string")
 | 
						|
			a.username = value
 | 
						|
		elseif key == "password" then
 | 
						|
			assert(value_type == "string", "expecting password to be a string")
 | 
						|
			a.password = value
 | 
						|
		elseif key == "secure" then
 | 
						|
			assert(value_type == "boolean" or value_type == "table", "expecting secure to be a boolean or table")
 | 
						|
			a.secure = value
 | 
						|
		elseif key == "will" then
 | 
						|
			assert(value_type == "table", "expecting will to be a table")
 | 
						|
			a.will = value
 | 
						|
		elseif key == "keep_alive" then
 | 
						|
			assert(value_type == "number", "expecting keep_alive to be a number")
 | 
						|
			a.keep_alive = value
 | 
						|
		elseif key == "properties" then
 | 
						|
			assert(value_type == "table", "expecting properties to be a table")
 | 
						|
			a.properties = value
 | 
						|
		elseif key == "user_properties" then
 | 
						|
			assert(value_type == "table", "expecting user_properties to be a table")
 | 
						|
			a.user_properties = value
 | 
						|
		elseif key == "reconnect" then
 | 
						|
			assert(value_type == "boolean" or value_type == "number", "expecting reconnect to be a boolean or number")
 | 
						|
			a.reconnect = value
 | 
						|
		elseif key == "connector" then
 | 
						|
			a.connector = value
 | 
						|
		elseif key == "ssl_module" then
 | 
						|
			assert(value_type == "string", "expecting ssl_module to be a string")
 | 
						|
			a.ssl_module = value
 | 
						|
		elseif key == "on" then
 | 
						|
			assert(value_type == "table", "expecting 'on' to be a table with events and callbacks")
 | 
						|
			a.on = value
 | 
						|
		else
 | 
						|
			error("unexpected key in client opts: "..key.." = "..tostring(value))
 | 
						|
		end
 | 
						|
	end
 | 
						|
 | 
						|
	-- check required arguments
 | 
						|
	assert(a.uri, 'expecting uri="..." to create MQTT client')
 | 
						|
	assert(a.clean ~= nil, "expecting clean=true, clean=false, or clean='first' to create MQTT client")
 | 
						|
 | 
						|
	if not a.id then
 | 
						|
		-- generate random client id
 | 
						|
		a.id = str_format("luamqtt-v%s-%07x", str_gsub(luamqtt_VERSION, "[^%d]", "-"), math_random(1, 0xFFFFFFF))
 | 
						|
	end
 | 
						|
 | 
						|
	-- default connector
 | 
						|
	a.connector = a.connector or require("mqtt.connector")
 | 
						|
 | 
						|
	-- default reconnect interval
 | 
						|
	if a.reconnect == true then
 | 
						|
		a.reconnect = 30
 | 
						|
	end
 | 
						|
 | 
						|
	-- validate connector content
 | 
						|
	assert(type(a.connector) == "table", "expecting connector to be a table")
 | 
						|
	assert(type(a.connector.validate) == "function", "expecting connector.validate to be a function")
 | 
						|
	assert(type(a.connector.connect) == "function", "expecting connector.connect to be a function")
 | 
						|
	assert(type(a.connector.shutdown) == "function", "expecting connector.shutdown to be a function")
 | 
						|
	assert(type(a.connector.send) == "function", "expecting connector.send to be a function")
 | 
						|
	assert(type(a.connector.receive) == "function", "expecting connector.receive to be a function")
 | 
						|
	assert(a.connector.signal_closed, "missing connector.signal_closed signal value")
 | 
						|
	assert(a.connector.signal_idle, "missing connector.signal_idle signal value")
 | 
						|
 | 
						|
	-- validate connection properties
 | 
						|
	local test_conn = setmetatable({ uri = opts.uri }, a.connector)
 | 
						|
	Client._parse_connection_opts(a, test_conn)
 | 
						|
	test_conn:validate()
 | 
						|
 | 
						|
	-- will table content check
 | 
						|
	if a.will then
 | 
						|
		assert(type(a.will.topic) == "string", "expecting will.topic to be a string")
 | 
						|
		assert(type(a.will.payload) == "string", "expecting will.payload to be a string")
 | 
						|
		if a.will.qos ~= nil then
 | 
						|
			assert(type(a.will.qos) == "number", "expecting will.qos to be a number")
 | 
						|
			assert(check_qos(a.will.qos), "expecting will.qos to be a valid QoS value")
 | 
						|
		end
 | 
						|
		if a.will.retain ~= nil then
 | 
						|
			assert(type(a.will.retain) == "boolean", "expecting will.retain to be a boolean")
 | 
						|
		end
 | 
						|
	end
 | 
						|
 | 
						|
	-- default keep_alive
 | 
						|
	if not a.keep_alive then
 | 
						|
		a.keep_alive = 60
 | 
						|
	end
 | 
						|
 | 
						|
	-- client opts
 | 
						|
	self.opts = a
 | 
						|
 | 
						|
	-- event handlers
 | 
						|
	self.handlers = {
 | 
						|
		connect = {},
 | 
						|
		subscribe = {},
 | 
						|
		unsubscribe = {},
 | 
						|
		message = {},
 | 
						|
		acknowledge = {},
 | 
						|
		error = {},
 | 
						|
		close = {},
 | 
						|
		auth = {},
 | 
						|
		shutdown = {},
 | 
						|
	}
 | 
						|
	self._handling = {}
 | 
						|
	self._to_remove_handlers = {}
 | 
						|
 | 
						|
	-- state
 | 
						|
	self.send_time = 0				-- time of the last network send from client side
 | 
						|
	self.first_connect = true		-- contains true to perform one network connection attempt after client creation
 | 
						|
									-- Note: remains true, during the connect process. False after succes or failure.
 | 
						|
 | 
						|
	-- packet creation/parse functions according version
 | 
						|
	if not a.version then
 | 
						|
		a.version = 4
 | 
						|
	end
 | 
						|
	if a.version == 4 then
 | 
						|
		self._make_packet = make_packet4
 | 
						|
		self._parse_packet = parse_packet4
 | 
						|
	elseif a.version == 5 then
 | 
						|
		self._make_packet = make_packet5
 | 
						|
		self._parse_packet = parse_packet5
 | 
						|
	end
 | 
						|
 | 
						|
	-- register event handlers
 | 
						|
	if a.on then
 | 
						|
		self:on(self.opts.on)
 | 
						|
	end
 | 
						|
 | 
						|
	log:info("MQTT client '%s' created", a.id)
 | 
						|
end
 | 
						|
 | 
						|
--- Add functions as handlers of given events.
 | 
						|
-- @tparam table events							MQTT client creation options table
 | 
						|
-- @tparam function events.connect				`function(connack_packet, client_obj)`<br/>
 | 
						|
-- 				After a connect attempt, after receiving the CONNACK packet from the broker.
 | 
						|
-- 				check `connack_packet.rc == 0` for a succesful connect.
 | 
						|
-- @tparam function events.error					`function(errmsg, client_obj [, packet])`<br/>
 | 
						|
-- 												on errors, optional `packet` is only provided if the
 | 
						|
--												received `CONNACK.rc ~= 0` when connecting.
 | 
						|
-- @tparam function events.close					`function(connection_obj, client_obj)`<br/>
 | 
						|
-- 												upon closing the connection. `connection_obj.close_reason`
 | 
						|
--												(string) will hold the close reason.
 | 
						|
-- @tparam function events.shutdown				`function(client_obj)`<br/>
 | 
						|
-- 												upon shutting down the client (diconnecting an no more reconnects).
 | 
						|
-- @tparam function events.subscribe				`function(suback_packet, client_obj)`<br/>
 | 
						|
-- 												upon a succesful subscription, after receiving the SUBACK packet from the broker
 | 
						|
-- @tparam function events.unsubscribe			`function(unsuback_packet, client_obj)`<br/>
 | 
						|
-- 												upon a succesful unsubscription, after receiving the UNSUBACK packet from the broker
 | 
						|
-- @tparam function events.message				`function(publish_packet, client_obj)`<br/>
 | 
						|
-- 												upon receiving a PUBLISH packet from the broker
 | 
						|
-- @tparam function events.acknowledge			`function(ack_packet, client_obj)`<br/>
 | 
						|
-- 												upon receiving a PUBACK or PUBREC packet from the broker
 | 
						|
-- @tparam function events.auth					`function(auth_packet, client_obj)`<br/>
 | 
						|
-- 												upon receiving an AUTH packet
 | 
						|
-- @usage
 | 
						|
-- client:on {
 | 
						|
-- 	connect = function(pck, self)
 | 
						|
-- 		if pck.rc ~= 0 then
 | 
						|
-- 			return -- connection failed
 | 
						|
-- 		end
 | 
						|
-- 		-- succesfully connected
 | 
						|
-- 	end,
 | 
						|
-- 	message = function(pck, self)
 | 
						|
-- 		-- handle received message
 | 
						|
-- 	end,
 | 
						|
-- }
 | 
						|
--
 | 
						|
-- -- an alternative way to add individual handlers;
 | 
						|
-- client:on("message", function(pck, self)
 | 
						|
-- 	-- handle received message
 | 
						|
-- end)
 | 
						|
function Client:on(...)
 | 
						|
	local nargs = select("#", ...)
 | 
						|
	local events
 | 
						|
	if nargs == 2 then
 | 
						|
		events = { [select(1, ...)] = select(2, ...) }
 | 
						|
	elseif nargs == 1 then
 | 
						|
		events = select(1, ...)
 | 
						|
	else
 | 
						|
		error("invalid arguments: expected only one or two arguments")
 | 
						|
	end
 | 
						|
	for event, func in pairs(events) do
 | 
						|
		assert(type(event) == "string", "expecting event to be a string")
 | 
						|
		assert(type(func) == "function", "expecting func to be a function")
 | 
						|
		local handlers = self.handlers[event]
 | 
						|
		if not handlers then
 | 
						|
			error("invalid event '"..tostring(event).."' to handle")
 | 
						|
		end
 | 
						|
		handlers[#handlers + 1] = func
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
-- Remove one item from the list-table with full-iteration
 | 
						|
local function remove_item(list, item)
 | 
						|
	for i, test in ipairs(list) do
 | 
						|
		if test == item then
 | 
						|
			table_remove(list, i)
 | 
						|
			return
 | 
						|
		end
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
--- Remove given function handler for specified event.
 | 
						|
-- @tparam string event		event name to remove handler
 | 
						|
-- @tparam function func	handler function to remove
 | 
						|
-- @usage
 | 
						|
-- local handler = function(pck, self)
 | 
						|
-- 	-- handle received message
 | 
						|
-- end
 | 
						|
--
 | 
						|
-- -- add event handler
 | 
						|
-- client:on {
 | 
						|
-- 	message = handler
 | 
						|
-- }
 | 
						|
--
 | 
						|
-- -- remove it again
 | 
						|
-- client:off("message", handler)
 | 
						|
function Client:off(event, func)
 | 
						|
	local handlers = self.handlers[event]
 | 
						|
	if not handlers then
 | 
						|
		error("invalid event '"..tostring(event).."' to handle")
 | 
						|
	end
 | 
						|
	if self._handling[event] then
 | 
						|
		-- this event is handling now, schedule the function removing to the moment after all handlers will be called for the event
 | 
						|
		local to_remove = self._to_remove_handlers[event] or {}
 | 
						|
		to_remove[#to_remove + 1] = func
 | 
						|
		self._to_remove_handlers[event] = to_remove
 | 
						|
	else
 | 
						|
		-- it's ok to remove given function just now
 | 
						|
		remove_item(handlers, func)
 | 
						|
	end
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Subscribe to specified topic. Returns the SUBSCRIBE packet id and calls optional callback when subscription will be created on broker
 | 
						|
-- @tparam table opts							subscription options
 | 
						|
-- @tparam string opts.topic					topic to subscribe
 | 
						|
-- @tparam[opt=0] number opts.qos				QoS level for subscription
 | 
						|
-- @tparam boolean opts.no_local				for MQTT v5.0 only: no_local flag for subscription
 | 
						|
-- @tparam boolean opts.retain_as_published		for MQTT v5.0 only: retain_as_published flag for subscription
 | 
						|
-- @tparam boolean opts.retain_handling			for MQTT v5.0 only: retain_handling flag for subscription
 | 
						|
-- @tparam[opt] table opts.properties			for MQTT v5.0 only: properties for subscribe operation
 | 
						|
-- @tparam[opt] table opts.user_properties		for MQTT v5.0 only: user properties for subscribe operation
 | 
						|
-- @tparam[opt] function opts.callback			callback function to be called when subscription is acknowledged by broker
 | 
						|
-- @return packet id on success or false and error message on failure
 | 
						|
function Client:subscribe(opts)
 | 
						|
	-- fetch and validate opts
 | 
						|
	assert(type(opts) == "table", "expecting opts to be a table")
 | 
						|
	assert(type(opts.topic) == "string", "expecting opts.topic to be a string")
 | 
						|
	assert(opts.qos == nil or (type(opts.qos) == "number" and check_qos(opts.qos)), "expecting valid opts.qos value")
 | 
						|
	assert(opts.no_local == nil or type(opts.no_local) == "boolean", "expecting opts.no_local to be a boolean")
 | 
						|
	assert(opts.retain_as_published == nil or type(opts.retain_as_published) == "boolean", "expecting opts.retain_as_published to be a boolean")
 | 
						|
	assert(opts.retain_handling == nil or type(opts.retain_handling) == "boolean", "expecting opts.retain_handling to be a boolean")
 | 
						|
	assert(opts.properties == nil or type(opts.properties) == "table", "expecting opts.properties to be a table")
 | 
						|
	assert(opts.user_properties == nil or type(opts.user_properties) == "table", "expecting opts.user_properties to be a table")
 | 
						|
	assert(opts.callback == nil or type(opts.callback) == "function", "expecting opts.callback to be a function")
 | 
						|
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create SUBSCRIBE packet
 | 
						|
	local pargs = {
 | 
						|
		type = packet_type.SUBSCRIBE,
 | 
						|
		subscriptions = {
 | 
						|
			{
 | 
						|
				topic = opts.topic,
 | 
						|
				qos = opts.qos,
 | 
						|
				no_local = opts.no_local,
 | 
						|
				retain_as_published = opts.retain_as_published,
 | 
						|
				retain_handling = opts.retain_handling
 | 
						|
			},
 | 
						|
		},
 | 
						|
		properties = opts.properties,
 | 
						|
		user_properties = opts.user_properties,
 | 
						|
	}
 | 
						|
	self:_assign_packet_id(pargs)
 | 
						|
	local packet_id = pargs.packet_id
 | 
						|
	local subscribe = self._make_packet(pargs)
 | 
						|
 | 
						|
	log:info("subscribing client '%s' to topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")
 | 
						|
 | 
						|
	-- send SUBSCRIBE packet
 | 
						|
	local ok, err = self:_send_packet(subscribe)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send SUBSCRIBE: "..err
 | 
						|
		log:error("client '%s': %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- add subscribe callback
 | 
						|
	local callback = opts.callback
 | 
						|
	if callback then
 | 
						|
		local function handler(suback, ...)
 | 
						|
			if suback.packet_id == packet_id then
 | 
						|
				self:off("subscribe", handler)
 | 
						|
				callback(suback, ...)
 | 
						|
			end
 | 
						|
		end
 | 
						|
		self:on("subscribe", handler)
 | 
						|
	end
 | 
						|
 | 
						|
	-- returns assigned packet id
 | 
						|
	return packet_id
 | 
						|
end
 | 
						|
 | 
						|
--- Unsubscribe from specified topic, and calls optional callback when subscription will be removed on broker
 | 
						|
-- @tparam table opts						subscription options
 | 
						|
-- @tparam string opts.topic				topic to unsubscribe
 | 
						|
-- @tparam[opt] table opts.properties		properties for unsubscribe operation
 | 
						|
-- @tparam[opt] table opts.user_properties	user properties for unsubscribe operation
 | 
						|
-- @tparam[opt] function opts.callback		callback function to be called when the unsubscribe is acknowledged by the broker
 | 
						|
-- @return packet id on success or false and error message on failure
 | 
						|
function Client:unsubscribe(opts)
 | 
						|
	-- fetch and validate opts
 | 
						|
	assert(type(opts) == "table", "expecting opts to be a table")
 | 
						|
	assert(type(opts.topic) == "string", "expecting opts.topic to be a string")
 | 
						|
	assert(opts.properties == nil or type(opts.properties) == "table", "expecting opts.properties to be a table")
 | 
						|
	assert(opts.user_properties == nil or type(opts.user_properties) == "table", "expecting opts.user_properties to be a table")
 | 
						|
	assert(opts.callback == nil or type(opts.callback) == "function", "expecting opts.callback to be a function")
 | 
						|
 | 
						|
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create UNSUBSCRIBE packet
 | 
						|
	local  pargs = {
 | 
						|
		type = packet_type.UNSUBSCRIBE,
 | 
						|
		subscriptions = {opts.topic},
 | 
						|
		properties = opts.properties,
 | 
						|
		user_properties = opts.user_properties,
 | 
						|
	}
 | 
						|
	self:_assign_packet_id(pargs)
 | 
						|
	local packet_id = pargs.packet_id
 | 
						|
	local unsubscribe = self._make_packet(pargs)
 | 
						|
 | 
						|
	log:info("unsubscribing client '%s' from topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")
 | 
						|
 | 
						|
	-- send UNSUBSCRIBE packet
 | 
						|
	local ok, err = self:_send_packet(unsubscribe)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send UNSUBSCRIBE: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- add unsubscribe callback
 | 
						|
	local callback = opts.callback
 | 
						|
	if callback then
 | 
						|
		local function handler(unsuback, ...)
 | 
						|
			if unsuback.packet_id == packet_id then
 | 
						|
				self:off("unsubscribe", handler)
 | 
						|
				callback(unsuback, ...)
 | 
						|
			end
 | 
						|
		end
 | 
						|
		self:on("unsubscribe", handler)
 | 
						|
	end
 | 
						|
 | 
						|
	-- returns assigned packet id
 | 
						|
	return packet_id
 | 
						|
end
 | 
						|
 | 
						|
--- Publish message to broker
 | 
						|
-- @tparam table opts						publish operation options table
 | 
						|
-- @tparam string opts.topic				topic to publish message
 | 
						|
-- @tparam[opt] string opts.payload			publish message payload
 | 
						|
-- @tparam[opt=0] number opts.qos			QoS level for message publication
 | 
						|
-- @tparam[opt=false] boolean opts.retain	retain message publication flag
 | 
						|
-- @tparam[opt=false] boolean opts.dup		dup message publication flag
 | 
						|
-- @tparam[opt] table opts.properties		properties for publishing message
 | 
						|
-- @tparam[opt] table opts.user_properties	user properties for publishing message
 | 
						|
-- @tparam[opt] function opts.callback		callback to call when published message has been acknowledged by the broker
 | 
						|
-- @return true or packet id on success or false and error message on failure
 | 
						|
function Client:publish(opts)
 | 
						|
	-- fetch and validate opts
 | 
						|
	assert(type(opts) == "table", "expecting opts to be a table")
 | 
						|
	assert(type(opts.topic) == "string", "expecting opts.topic to be a string")
 | 
						|
	assert(opts.payload == nil or type(opts.payload) == "string", "expecting opts.payload to be a string")
 | 
						|
	assert(opts.qos == nil or type(opts.qos) == "number", "expecting opts.qos to be a number")
 | 
						|
	if opts.qos then
 | 
						|
		assert(check_qos(opts.qos), "expecting qos to be a valid QoS value")
 | 
						|
	end
 | 
						|
	assert(opts.retain == nil or type(opts.retain) == "boolean", "expecting opts.retain to be a boolean")
 | 
						|
	assert(opts.dup == nil or type(opts.dup) == "boolean", "expecting opts.dup to be a boolean")
 | 
						|
	assert(opts.properties == nil or type(opts.properties) == "table", "expecting opts.properties to be a table")
 | 
						|
	assert(opts.user_properties == nil or type(opts.user_properties) == "table", "expecting opts.user_properties to be a table")
 | 
						|
	assert(opts.callback == nil or type(opts.callback) == "function", "expecting opts.callback to be a function")
 | 
						|
 | 
						|
	-- check connection is alive
 | 
						|
	local conn = self.connection
 | 
						|
	if not conn then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create PUBLISH packet
 | 
						|
	opts.type = packet_type.PUBLISH
 | 
						|
	self:_assign_packet_id(opts)
 | 
						|
	local packet_id = opts.packet_id
 | 
						|
	local publish = self._make_packet(opts)
 | 
						|
 | 
						|
	log:debug("client '%s' publishing to topic '%s' (packet: %s)", self.opts.id, opts.topic, packet_id or "n.a.")
 | 
						|
 | 
						|
	-- send PUBLISH packet
 | 
						|
	local ok, err = self:_send_packet(publish)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send PUBLISH: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- record packet id as waited for QoS 2 exchange
 | 
						|
	if opts.qos == 2 then
 | 
						|
		conn.wait_for_pubrec[packet_id] = true
 | 
						|
	end
 | 
						|
 | 
						|
	-- add acknowledge callback
 | 
						|
	local callback = opts.callback
 | 
						|
	if callback then
 | 
						|
		if packet_id then
 | 
						|
			local function handler(ack, ...)
 | 
						|
				if ack.packet_id == packet_id then
 | 
						|
					self:off("acknowledge", handler)
 | 
						|
					callback(ack, ...)
 | 
						|
				end
 | 
						|
			end
 | 
						|
			self:on("acknowledge", handler)
 | 
						|
		else
 | 
						|
			callback("no ack for QoS 0 message", self)
 | 
						|
		end
 | 
						|
	end
 | 
						|
 | 
						|
	-- returns assigned packet id
 | 
						|
	return packet_id or true
 | 
						|
end
 | 
						|
 | 
						|
--- Acknowledge given received message
 | 
						|
-- @tparam packet_mt msg				PUBLISH message to acknowledge
 | 
						|
-- @tparam[opt=0] number rc				The reason code field of PUBACK packet in MQTT v5.0 protocol
 | 
						|
-- @tparam[opt] table properties		properties for PUBACK/PUBREC packets
 | 
						|
-- @tparam[opt] table user_properties	user properties for PUBACK/PUBREC packets
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:acknowledge(msg, rc, properties, user_properties)
 | 
						|
	assert(type(msg) == "table" and msg.type == packet_type.PUBLISH, "expecting msg to be a publish packet")
 | 
						|
	assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
 | 
						|
	assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
 | 
						|
	assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
 | 
						|
 | 
						|
	-- check connection is alive
 | 
						|
	local conn = self.connection
 | 
						|
	if not conn then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- check packet needs to be acknowledged
 | 
						|
	local packet_id = msg.packet_id
 | 
						|
	if not packet_id then
 | 
						|
		return true
 | 
						|
	end
 | 
						|
 | 
						|
	log:debug("client '%s' acknowledging packet %s", self.opts.id, packet_id or "n.a.")
 | 
						|
 | 
						|
	if msg.qos == 1 then
 | 
						|
		-- PUBACK should be sent
 | 
						|
 | 
						|
		-- create PUBACK packet
 | 
						|
		local puback = self._make_packet{
 | 
						|
			type = packet_type.PUBACK,
 | 
						|
			packet_id = packet_id,
 | 
						|
			rc = rc or 0,
 | 
						|
			properties = properties,
 | 
						|
			user_properties = user_properties,
 | 
						|
		}
 | 
						|
 | 
						|
		-- send PUBACK packet
 | 
						|
		local ok, err = self:_send_packet(puback)
 | 
						|
		if not ok then
 | 
						|
			err = "failed to send PUBACK: "..err
 | 
						|
			log:error("client '%s' %s", self.opts.id, err)
 | 
						|
			self:handle("error", err, self)
 | 
						|
			self:close_connection("error")
 | 
						|
			return false, err
 | 
						|
		end
 | 
						|
	elseif msg.qos == 2 then
 | 
						|
		-- PUBREC should be sent and packet_id should be remembered for PUBREL+PUBCOMP sequence
 | 
						|
 | 
						|
		-- create PUBREC packet
 | 
						|
		local pubrec = self._make_packet{
 | 
						|
			type = packet_type.PUBREC,
 | 
						|
			packet_id = packet_id,
 | 
						|
			rc = rc or 0,
 | 
						|
			properties = properties,
 | 
						|
			user_properties = user_properties,
 | 
						|
		}
 | 
						|
 | 
						|
		-- send PUBREC packet
 | 
						|
		local ok, err = self:_send_packet(pubrec)
 | 
						|
		if not ok then
 | 
						|
			err = "failed to send PUBREC: "..err
 | 
						|
			log:error("client '%s' %s", self.opts.id, err)
 | 
						|
			self:handle("error", err, self)
 | 
						|
			self:close_connection("error")
 | 
						|
			return false, err
 | 
						|
		end
 | 
						|
 | 
						|
		-- store packet id as waiting for PUBREL
 | 
						|
		conn.wait_for_pubrel[packet_id] = true
 | 
						|
	end
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Send DISCONNECT packet to the broker and close the connection.
 | 
						|
-- Note: if the client is set to automatically reconnect, it will do so. If you
 | 
						|
-- want to disconnect and NOT reconnect, use `Client:shutdown`.
 | 
						|
-- @tparam[opt=0] number rc				The Disconnect Reason Code value from MQTT v5.0 protocol
 | 
						|
-- @tparam[opt] table properties		properties for PUBACK/PUBREC packets
 | 
						|
-- @tparam[opt] table user_properties	user properties for PUBACK/PUBREC packets
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:disconnect(rc, properties, user_properties)
 | 
						|
	-- validate opts
 | 
						|
	assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
 | 
						|
	assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
 | 
						|
	assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
 | 
						|
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create DISCONNECT packet
 | 
						|
	local disconnect = self._make_packet{
 | 
						|
		type = packet_type.DISCONNECT,
 | 
						|
		rc = rc or 0,
 | 
						|
		properties = properties,
 | 
						|
		user_properties = user_properties,
 | 
						|
	}
 | 
						|
 | 
						|
	log:info("client '%s' disconnecting (rc = %d)", self.opts.id, rc or 0)
 | 
						|
 | 
						|
	-- send DISCONNECT packet
 | 
						|
	local ok, err = self:_send_packet(disconnect)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send DISCONNECT: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- now close connection
 | 
						|
	self:close_connection("connection closed by client")
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Shuts the client down.
 | 
						|
-- Disconnects if still connected, and disables reconnecting. If the client is
 | 
						|
-- added to an ioloop, this will prevent an automatic reconnect.
 | 
						|
-- Raises the "shutdown" event.
 | 
						|
-- @param ... see `Client:disconnect`
 | 
						|
-- @return `true`
 | 
						|
function Client:shutdown(...)
 | 
						|
	log:debug("client '%s' shutting down", self.opts.id)
 | 
						|
	self.first_connect = false
 | 
						|
	self.opts.reconnect = false
 | 
						|
	self:disconnect(...)
 | 
						|
	self:handle("shutdown", self)
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Send AUTH packet to authenticate client on broker, in MQTT v5.0 protocol
 | 
						|
-- @tparam[opt=0] number rc				Authenticate Reason Code
 | 
						|
-- @tparam[opt] table properties		properties for PUBACK/PUBREC packets
 | 
						|
-- @tparam[opt] table user_properties	user properties for PUBACK/PUBREC packets
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:auth(rc, properties, user_properties)
 | 
						|
	-- validate opts
 | 
						|
	assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
 | 
						|
	assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
 | 
						|
	assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
 | 
						|
	assert(self.opts.version == 5, "allowed only in MQTT v5.0 protocol")
 | 
						|
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create AUTH packet
 | 
						|
	local auth = self._make_packet{
 | 
						|
		type = packet_type.AUTH,
 | 
						|
		rc = rc or 0,
 | 
						|
		properties = properties,
 | 
						|
		user_properties = user_properties,
 | 
						|
	}
 | 
						|
 | 
						|
	log:info("client '%s' authenticating")
 | 
						|
 | 
						|
	-- send AUTH packet
 | 
						|
	local ok, err = self:_send_packet(auth)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send AUTH: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Immediately close established network connection, without graceful session finishing with DISCONNECT packet
 | 
						|
-- @tparam[opt] string reason the reasong string of connection close
 | 
						|
function Client:close_connection(reason)
 | 
						|
	assert(not reason or type(reason) == "string", "expecting reason to be a string")
 | 
						|
	local conn = self.connection
 | 
						|
	if not conn then
 | 
						|
		return true
 | 
						|
	end
 | 
						|
 | 
						|
	reason = reason or "unspecified"
 | 
						|
 | 
						|
	log:info("client '%s' closing connection (reason: %s)", self.opts.id, reason)
 | 
						|
 | 
						|
	conn:shutdown()
 | 
						|
	self.connection = nil
 | 
						|
	conn.close_reason = reason
 | 
						|
 | 
						|
	self:handle("close", conn, self)
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Start connecting to broker
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:start_connecting()
 | 
						|
	-- open network connection
 | 
						|
	local ok, err = self:open_connection()
 | 
						|
	if not ok then
 | 
						|
		self.first_connect = false
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- send CONNECT packet
 | 
						|
	ok, err = self:send_connect()
 | 
						|
	if not ok then
 | 
						|
		self.first_connect = false
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Low-level methods
 | 
						|
-- @section low-level
 | 
						|
 | 
						|
--- Send PINGREQ packet
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:send_pingreq()
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create PINGREQ packet
 | 
						|
	local pingreq = self._make_packet{
 | 
						|
		type = packet_type.PINGREQ,
 | 
						|
	}
 | 
						|
 | 
						|
	log:debug("client '%s' sending PINGREQ", self.opts.id)
 | 
						|
 | 
						|
	-- send PINGREQ packet
 | 
						|
	local ok, err = self:_send_packet(pingreq)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send PINGREQ: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- set ping timeout; for now 1 ping-request interval
 | 
						|
	self.ping_expire_time = os_time() + self.opts.keep_alive
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Open network connection to the broker
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:open_connection()
 | 
						|
	if self.connection then
 | 
						|
		return true
 | 
						|
	end
 | 
						|
 | 
						|
	local opts = self.opts
 | 
						|
	local connector = assert(opts.connector, "no connector configured in MQTT client")
 | 
						|
 | 
						|
	-- create connection table
 | 
						|
	local conn = setmetatable({
 | 
						|
		uri = opts.uri,
 | 
						|
		wait_for_pubrec = {},	-- a table with packet_id of partially acknowledged sent packets in QoS 2 exchange process
 | 
						|
		wait_for_pubrel = {},	-- a table with packet_id of partially acknowledged received packets in QoS 2 exchange process
 | 
						|
	}, connector)
 | 
						|
	Client._parse_connection_opts(opts, conn)
 | 
						|
 | 
						|
	log:info("client '%s' connecting to broker '%s' (using: %s)", self.opts.id, opts.uri, conn.type or "unknown")
 | 
						|
 | 
						|
	-- perform connect
 | 
						|
	local ok, err = conn:connect()
 | 
						|
	if not ok then
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		err = "failed to open network connection: "..err
 | 
						|
		self:handle("error", err, self)
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- assign connection
 | 
						|
	self.connection = conn
 | 
						|
 | 
						|
	-- reset ping timeout
 | 
						|
	self.ping_expire_time = nil
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Send CONNECT packet into opened network connection
 | 
						|
-- @return true on success or false and error message on failure
 | 
						|
function Client:send_connect()
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	local opts = self.opts
 | 
						|
 | 
						|
	-- create CONNECT packet
 | 
						|
	local connect = self._make_packet{
 | 
						|
		type = packet_type.CONNECT,
 | 
						|
		id = opts.id,
 | 
						|
		clean = not not opts.clean, -- force to boolean, in case "first"
 | 
						|
		username = opts.username,
 | 
						|
		password = opts.password,
 | 
						|
		will = opts.will,
 | 
						|
		keep_alive = opts.keep_alive,
 | 
						|
		properties = opts.properties,
 | 
						|
		user_properties = opts.user_properties,
 | 
						|
	}
 | 
						|
 | 
						|
	log:info("client '%s' sending CONNECT (user '%s')", self.opts.id, opts.username or "not specified")
 | 
						|
 | 
						|
	-- send CONNECT packet
 | 
						|
	local ok, err = self:_send_packet(connect)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send CONNECT: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- reset last packet id
 | 
						|
	self._last_packet_id = nil
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
--- Checks last message send, and sends a PINGREQ if necessary.
 | 
						|
-- Use this function to check and send keep-alives when using an external event loop. When using the
 | 
						|
-- included modules to add clients (see `mqtt.loop`), this will be taken care of automatically.
 | 
						|
-- @treturn[1] number time till next keep_alive (in seconds)
 | 
						|
-- @treturn[2] number time till next keep_alive (in seconds)
 | 
						|
-- @treturn[2] string in case of errors (eg. not connected) the second return value is an error string
 | 
						|
-- @usage
 | 
						|
-- -- example using a Copas event loop to send and check keep-alives
 | 
						|
-- copas.addthread(function()
 | 
						|
--     while true do
 | 
						|
--         if not my_client then
 | 
						|
--             return -- exiting, client was destroyed
 | 
						|
--         end
 | 
						|
--         copas.pause(my_client:check_keep_alive())
 | 
						|
--     end
 | 
						|
-- end)
 | 
						|
function Client:check_keep_alive()
 | 
						|
	local interval = self.opts.keep_alive
 | 
						|
	if not self.connection then
 | 
						|
		return interval, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	local t_now = os_time()
 | 
						|
	local t_next = self.send_time + interval
 | 
						|
	local t_timeout = self.ping_expire_time
 | 
						|
 | 
						|
	-- check last ping request
 | 
						|
	if t_timeout and t_timeout <= t_now then
 | 
						|
		-- we timed-out, close and exit
 | 
						|
		local err = str_format("failed to receive PINGRESP within %d seconds", interval)
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return interval, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- send PINGREQ if keep_alive interval is reached
 | 
						|
	if t_now >= t_next then
 | 
						|
		local _, err = self:send_pingreq()
 | 
						|
		return interval, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- return which ever is earlier, timeout or next ping request
 | 
						|
	if t_timeout and t_timeout < t_next then
 | 
						|
		return t_timeout - t_now
 | 
						|
	end
 | 
						|
	return t_next - t_now
 | 
						|
end
 | 
						|
 | 
						|
 | 
						|
-- Internal methods
 | 
						|
 | 
						|
-- Send PUBREL acknowledge packet - second phase of QoS 2 exchange
 | 
						|
-- Returns true on success or false and error message on failure
 | 
						|
function Client:acknowledge_pubrel(packet_id)
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create PUBREL packet
 | 
						|
	local pubrel = self._make_packet{type=packet_type.PUBREL, packet_id=packet_id, rc=0}
 | 
						|
 | 
						|
	log:debug("client '%s' sending PUBREL (packet: %s)", self.opts.id, packet_id or "n.a.")
 | 
						|
 | 
						|
	-- send PUBREL packet
 | 
						|
	local ok, err = self:_send_packet(pubrel)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send PUBREL: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
-- Send PUBCOMP acknowledge packet - last phase of QoS 2 exchange
 | 
						|
-- Returns true on success or false and error message on failure
 | 
						|
function Client:acknowledge_pubcomp(packet_id)
 | 
						|
	-- check connection is alive
 | 
						|
	if not self.connection then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
 | 
						|
	-- create PUBCOMP packet
 | 
						|
	local pubcomp = self._make_packet{type=packet_type.PUBCOMP, packet_id=packet_id, rc=0}
 | 
						|
 | 
						|
	log:debug("client '%s' sending PUBCOMP (packet: %s)", self.opts.id,  packet_id or "n.a.")
 | 
						|
 | 
						|
	-- send PUBCOMP packet
 | 
						|
	local ok, err = self:_send_packet(pubcomp)
 | 
						|
	if not ok then
 | 
						|
		err = "failed to send PUBCOMP: "..err
 | 
						|
		log:error("client '%s' %s", self.opts.id, err)
 | 
						|
		self:handle("error", err, self)
 | 
						|
		self:close_connection("error")
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
-- Call specified event handlers
 | 
						|
function Client:handle(event, ...)
 | 
						|
	local handlers = self.handlers[event]
 | 
						|
	if not handlers then
 | 
						|
		error("invalid event '"..tostring(event).."' to handle")
 | 
						|
	end
 | 
						|
	self._handling[event] = true -- protecting self.handlers[event] table from modifications by Client:off() when iterating
 | 
						|
	for _, handler in ipairs(handlers) do
 | 
						|
		handler(...)
 | 
						|
	end
 | 
						|
	self._handling[event] = nil
 | 
						|
 | 
						|
	-- process handlers removing, scheduled by Client:off()
 | 
						|
	local to_remove = self._to_remove_handlers[event]
 | 
						|
	if to_remove then
 | 
						|
		for _, func in ipairs(to_remove) do
 | 
						|
			remove_item(handlers, func)
 | 
						|
		end
 | 
						|
		self._to_remove_handlers[event] = nil
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
-- Internal methods
 | 
						|
 | 
						|
-- Assign next packet id for given packet creation opts
 | 
						|
function Client:_assign_packet_id(pargs)
 | 
						|
	if not pargs.packet_id then
 | 
						|
		if packet_id_required(pargs) then
 | 
						|
			self._last_packet_id = next_packet_id(self._last_packet_id)
 | 
						|
			pargs.packet_id = self._last_packet_id
 | 
						|
		end
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
-- Handle a single received packet
 | 
						|
function Client:handle_received_packet(packet)
 | 
						|
	local conn = self.connection
 | 
						|
	local err
 | 
						|
 | 
						|
	log:debug("client '%s' received '%s' (packet: %s)", self.opts.id, packet_type[packet.type], packet.packet_id or "n.a.")
 | 
						|
 | 
						|
	if not conn.connack then
 | 
						|
		-- expecting only CONNACK packet here
 | 
						|
		if packet.type ~= packet_type.CONNACK then
 | 
						|
			err = "expecting CONNACK but received "..packet.type
 | 
						|
			log:error("client '%s' %s", self.opts.id, err)
 | 
						|
			self:handle("error", err, self)
 | 
						|
			self:close_connection("error")
 | 
						|
			self.first_connect = false
 | 
						|
			return false, err
 | 
						|
		end
 | 
						|
 | 
						|
		-- store connack packet in connection
 | 
						|
		conn.connack = packet
 | 
						|
 | 
						|
		-- check CONNACK rc
 | 
						|
		if packet.rc ~= 0 then
 | 
						|
			err = str_format("CONNECT failed with CONNACK [rc=%d]: %s", packet.rc, packet:reason_string())
 | 
						|
			log:error("client '%s' %s", self.opts.id, err)
 | 
						|
			self:handle("error", err, self, packet)
 | 
						|
			self:handle("connect", packet, self)
 | 
						|
			self:close_connection("connection failed")
 | 
						|
			self.first_connect = false
 | 
						|
			return false, err
 | 
						|
		end
 | 
						|
 | 
						|
		log:info("client '%s' connected successfully to '%s:%s'", self.opts.id, conn.host, conn.port)
 | 
						|
 | 
						|
		-- fire connect event
 | 
						|
		if self.opts.clean == "first" then
 | 
						|
			self.opts.clean = false  -- reset clean flag to false, so next connection resumes previous session
 | 
						|
			log:debug("client '%s'; switching clean flag to false (was 'first')", self.opts.id)
 | 
						|
		end
 | 
						|
		self:handle("connect", packet, self)
 | 
						|
		self.first_connect = false
 | 
						|
	else
 | 
						|
		-- connection authorized, so process usual packets
 | 
						|
 | 
						|
		-- handle packet according its type
 | 
						|
		local ptype = packet.type
 | 
						|
		if ptype == packet_type.PINGRESP then -- luacheck: ignore
 | 
						|
			-- PINGREQ answer, clear timeout
 | 
						|
			self.ping_expire_time = nil
 | 
						|
		elseif ptype == packet_type.SUBACK then
 | 
						|
			self:handle("subscribe", packet, self)
 | 
						|
		elseif ptype == packet_type.UNSUBACK then
 | 
						|
			self:handle("unsubscribe", packet, self)
 | 
						|
		elseif ptype == packet_type.PUBLISH then
 | 
						|
			-- check such packet is not waiting for pubrel acknowledge
 | 
						|
			self:handle("message", packet, self)
 | 
						|
		elseif ptype == packet_type.PUBACK then
 | 
						|
			self:handle("acknowledge", packet, self)
 | 
						|
		elseif ptype == packet_type.PUBREC then
 | 
						|
			local packet_id = packet.packet_id
 | 
						|
			if conn.wait_for_pubrec[packet_id] then
 | 
						|
				conn.wait_for_pubrec[packet_id] = nil
 | 
						|
				-- send PUBREL acknowledge
 | 
						|
				if self:acknowledge_pubrel(packet_id) then
 | 
						|
					-- and fire acknowledge event
 | 
						|
					self:handle("acknowledge", packet, self)
 | 
						|
				end
 | 
						|
			end
 | 
						|
		elseif ptype == packet_type.PUBREL then
 | 
						|
			-- second phase of QoS 2 exchange - check we are already acknowledged such packet by PUBREL
 | 
						|
			local packet_id = packet.packet_id
 | 
						|
			if conn.wait_for_pubrel[packet_id] then
 | 
						|
				-- remove packet from waiting for PUBREL packets table
 | 
						|
				conn.wait_for_pubrel[packet_id] = nil
 | 
						|
				-- send PUBCOMP acknowledge
 | 
						|
				return self:acknowledge_pubcomp(packet_id)
 | 
						|
			end
 | 
						|
		elseif ptype == packet_type.PUBCOMP then --luacheck: ignore
 | 
						|
			-- last phase of QoS 2 exchange
 | 
						|
			-- do nothing here
 | 
						|
		elseif ptype == packet_type.DISCONNECT then
 | 
						|
			self:close_connection("disconnect received from broker")
 | 
						|
		elseif ptype == packet_type.AUTH then
 | 
						|
			self:handle("auth", packet, self)
 | 
						|
		else
 | 
						|
			log:warn("client '%s' don't know how to handle %s", self.opts.id, ptype)
 | 
						|
		end
 | 
						|
	end
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
do
 | 
						|
	-- implict (re)connecting when reading
 | 
						|
	local function implicit_connect(self)
 | 
						|
		local reconnect = self.opts.reconnect
 | 
						|
 | 
						|
		if not self.first_connect and not reconnect then
 | 
						|
			-- this would be a re-connect, but we're not supposed to auto-reconnect
 | 
						|
			log:debug("client '%s' was disconnected and not set to auto-reconnect", self.opts.id)
 | 
						|
			return false, "network connection is not opened"
 | 
						|
		end
 | 
						|
 | 
						|
		-- should we wait for a timeout between retries?
 | 
						|
		local t_reconnect = (self.last_connect_time or 0) + (reconnect or 0)
 | 
						|
		local t_now = os_time()
 | 
						|
		if t_reconnect > t_now then
 | 
						|
			-- were delaying before retrying, return remaining delay
 | 
						|
			return t_reconnect - t_now
 | 
						|
		end
 | 
						|
 | 
						|
		self.last_connect_time = t_now
 | 
						|
 | 
						|
		local ok, err = self:start_connecting()
 | 
						|
		if not ok then
 | 
						|
			-- we failed to connect
 | 
						|
			return reconnect, err
 | 
						|
		end
 | 
						|
 | 
						|
		-- connected succesfully, but don't know how long it took, so return now
 | 
						|
		-- to be nice to other clients. Return 0 to indicate ready-for-reading.
 | 
						|
		return 0
 | 
						|
	end
 | 
						|
 | 
						|
	--- Performs a single IO loop step.
 | 
						|
	-- It will connect if not connected, will re-connect if set to.
 | 
						|
	-- This should be called repeatedly in a loop.  When using the included modules to
 | 
						|
	-- add clients (see `mqtt.loop`), this will be taken care of automatically.
 | 
						|
	--
 | 
						|
	-- The return value is the time after which this method must be called again.
 | 
						|
	-- It can be called sooner, but shouldn't be called later.
 | 
						|
	-- @return[1] `-1`: the socket read timed out, so it is idle. This return code is only
 | 
						|
	-- returned with buffered connectors (luasocket), never for yielding sockets
 | 
						|
	-- (Copas or OpenResty)
 | 
						|
	-- @return[2] `0`: a packet was succesfully handled, so retry immediately, no delays,
 | 
						|
	-- in case additional data is waiting to be read on the socket.
 | 
						|
	-- @return[3] `>0`: The reconnect timer needs a delay before it can retry (calling
 | 
						|
	-- sooner is not a problem, it will only reconnect when the delay
 | 
						|
    -- has actually passed)
 | 
						|
	-- @return[4] nil
 | 
						|
	-- @return[4] error message
 | 
						|
	function Client:step()
 | 
						|
		local conn = self.connection
 | 
						|
		local reconnect = self.opts.reconnect
 | 
						|
 | 
						|
		-- try and connect if not connected yet
 | 
						|
		if not conn then
 | 
						|
			return implicit_connect(self)
 | 
						|
		end
 | 
						|
 | 
						|
		local packet, err = self:_receive_packet()
 | 
						|
		if not packet then
 | 
						|
			if err == conn.signal_idle then
 | 
						|
				-- connection was idle, nothing happened
 | 
						|
				return -1
 | 
						|
			elseif err == conn.signal_closed then
 | 
						|
				self:close_connection("connection closed by broker")
 | 
						|
				return reconnect and 0, err
 | 
						|
			else
 | 
						|
				err = "failed to receive next packet: "..tostring(err)
 | 
						|
				log:error("client '%s' %s", self.opts.id, err)
 | 
						|
				self:handle("error", err, self)
 | 
						|
				self:close_connection("error")
 | 
						|
				return reconnect and 0, err
 | 
						|
			end
 | 
						|
		end
 | 
						|
 | 
						|
		local ok
 | 
						|
		ok, err = self:handle_received_packet(packet)
 | 
						|
		if not ok then
 | 
						|
			return reconnect and 0, err
 | 
						|
		end
 | 
						|
 | 
						|
		-- succesfully handled packed, maybe there is more, so retry asap
 | 
						|
		return 0
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
-- Fill given connection table with host and port according given opts
 | 
						|
-- uri:  mqtt[s]://[username][:password]@host.domain[:port]
 | 
						|
function Client._parse_connection_opts(opts, conn)
 | 
						|
	local uri = assert(conn.uri)
 | 
						|
 | 
						|
	-- protocol
 | 
						|
	local uriprotocol = uri:match("^([%a%d%-]-)://")
 | 
						|
	if uriprotocol then
 | 
						|
		uriprotocol = uriprotocol:lower()
 | 
						|
		uri = uri:gsub("^[%a%d%-]-://","")
 | 
						|
 | 
						|
		if uriprotocol == "mqtts" and opts.secure == nil then
 | 
						|
			opts.secure = true -- NOTE: goes into client 'opts' table, not in 'conn'
 | 
						|
 | 
						|
		elseif uriprotocol == "mqtt" and opts.secure == nil then
 | 
						|
			opts.secure = false -- NOTE: goes into client 'opts' table, not in 'conn'
 | 
						|
 | 
						|
		elseif uriprotocol == "mqtts" and opts.secure == false then
 | 
						|
			error("cannot use protocol 'mqtts' with 'secure=false'")
 | 
						|
 | 
						|
		elseif uriprotocol == "mqtt" and opts.secure then
 | 
						|
			error("cannot use protocol 'mqtt' with 'secure=true|table'")
 | 
						|
		end
 | 
						|
	else
 | 
						|
		-- no protocol info found in uri
 | 
						|
		if opts.secure then
 | 
						|
			uriprotocol = "mqtts"
 | 
						|
		else
 | 
						|
			uriprotocol = "mqtt"
 | 
						|
		end
 | 
						|
	end
 | 
						|
 | 
						|
	conn.protocol = opts.protocol or uriprotocol
 | 
						|
	assert(type(conn.protocol) == "string", "expected protocol to be a string")
 | 
						|
	assert(conn.protocol:match("mqtts?"), "only 'mqtt(s)' protocol is supported in the uri, got '"..tostring(conn.protocol).."'")
 | 
						|
	-- print("protocol: ", uriprotocol)
 | 
						|
 | 
						|
	-- creds, host/port
 | 
						|
	local creds, host_port
 | 
						|
	if uri:find("@") then
 | 
						|
		host_port = uri:match("@(.-)$"):lower()
 | 
						|
		creds = uri:gsub("@.-$", "")
 | 
						|
	else
 | 
						|
		host_port = uri
 | 
						|
	end
 | 
						|
	-- print("creds: ", creds)
 | 
						|
	-- print("host_port:", host_port)
 | 
						|
 | 
						|
	-- host-port
 | 
						|
	local host, port = host_port:match("^([^:]+):?([^:]*)$")
 | 
						|
	if port and #port > 0 then
 | 
						|
		port = assert(tonumber(port), "port in uri must be numeric, got: '"..port.."'")
 | 
						|
	else
 | 
						|
		port = nil
 | 
						|
	end
 | 
						|
	-- print("port: ", port)
 | 
						|
	-- print("host: ", host)
 | 
						|
	conn.host = opts.host or host
 | 
						|
	assert(type(conn.host) == "string", "expected host to be a string")
 | 
						|
	-- default port
 | 
						|
	conn.port = opts.port or port
 | 
						|
	if not conn.port then
 | 
						|
		if opts.secure then
 | 
						|
			conn.port = 8883 -- default MQTT secure connection port
 | 
						|
		else
 | 
						|
			conn.port = 1883 -- default MQTT connection port
 | 
						|
		end
 | 
						|
	end
 | 
						|
	assert(type(conn.port) == "number", "expected port to be a number")
 | 
						|
 | 
						|
 | 
						|
	-- username-password
 | 
						|
	local username, password
 | 
						|
	if creds then
 | 
						|
		username, password = creds:match("^([^:]+):?([^:]*)$")
 | 
						|
		if password and #password == 0 then
 | 
						|
			password = nil
 | 
						|
		end
 | 
						|
	end
 | 
						|
	-- NOTE: these go into client 'opts' table, not in 'conn'
 | 
						|
	opts.username = opts.username or username
 | 
						|
	assert(opts.username == nil or type(opts.username) == "string", "expected username to be a string")
 | 
						|
	opts.password = opts.password or password
 | 
						|
	assert(opts.password == nil or type(opts.password) == "string", "expected password to be a string")
 | 
						|
	assert(not conn.password or conn.username, "password is not accepted in absence of username")
 | 
						|
	-- print("username: ", username)
 | 
						|
	-- print("password: ", password)
 | 
						|
 | 
						|
 | 
						|
	local secure = opts.secure
 | 
						|
	if secure then
 | 
						|
		conn.secure = true
 | 
						|
		conn.secure_params = secure ~= true and secure or nil
 | 
						|
		conn.ssl_module = opts.ssl_module or "ssl"
 | 
						|
		assert(conn.ssl_module == nil or type(conn.ssl_module) == "string", "expected ssl_module to be a string")
 | 
						|
	else
 | 
						|
		-- sanity
 | 
						|
		conn.secure = false
 | 
						|
		conn.secure_params = nil
 | 
						|
		conn.ssl_module = nil
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
-- Send given packet to opened network connection
 | 
						|
function Client:_send_packet(packet)
 | 
						|
	local conn = self.connection
 | 
						|
	if not conn then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
	local data = tostring(packet)
 | 
						|
	local len = data:len()
 | 
						|
	if len <= 0 then
 | 
						|
		return false, "sending empty packet"
 | 
						|
	end
 | 
						|
	-- and send binary packet to network connection
 | 
						|
	local ok, err = conn:send(data)
 | 
						|
	if not ok then
 | 
						|
		return false, "connector.send failed: "..err
 | 
						|
	end
 | 
						|
	self.send_time = os_time()
 | 
						|
	return true
 | 
						|
end
 | 
						|
 | 
						|
-- Receive one packet from established network connection
 | 
						|
function Client:_receive_packet()
 | 
						|
	local conn = self.connection
 | 
						|
	if not conn then
 | 
						|
		return false, "network connection is not opened"
 | 
						|
	end
 | 
						|
	-- read & parse packet
 | 
						|
	local packet, err = self._parse_packet(
 | 
						|
		function(size)
 | 
						|
			return conn:receive(size)
 | 
						|
		end
 | 
						|
	)
 | 
						|
	if packet then
 | 
						|
		-- succesful packet, clear handled data and return it
 | 
						|
		conn:buffer_clear()
 | 
						|
		return packet
 | 
						|
	end
 | 
						|
 | 
						|
	-- check if we need more data, if not, clear the buffer because were done with
 | 
						|
	-- the data in that case
 | 
						|
	if err == conn.signal_idle then
 | 
						|
		-- we need more data, so do not clear buffer, just return the error
 | 
						|
		return false, err
 | 
						|
	end
 | 
						|
 | 
						|
	-- some other error, can't use buffered data, dispose of it
 | 
						|
	conn:buffer_clear()
 | 
						|
	return false, err
 | 
						|
end
 | 
						|
 | 
						|
-- Represent MQTT client as string
 | 
						|
function Client:__tostring()
 | 
						|
	return str_format("mqtt.client{id=%q}", tostring(self.opts.id))
 | 
						|
end
 | 
						|
 | 
						|
-- Garbage collection handler
 | 
						|
function Client:__gc()
 | 
						|
	-- close network connection if it's available, without sending DISCONNECT packet
 | 
						|
	if self.connection then
 | 
						|
		self:close_connection("garbage")
 | 
						|
	end
 | 
						|
end
 | 
						|
 | 
						|
--- Exported functions
 | 
						|
-- @section exported
 | 
						|
 | 
						|
--- Create, initialize and return new MQTT client instance
 | 
						|
-- @name client.create
 | 
						|
-- @param ... see arguments of `Client:__init`
 | 
						|
-- @see Client:__init
 | 
						|
-- @treturn Client MQTT client instance
 | 
						|
function _M.create(opts)
 | 
						|
	local cl = setmetatable({}, Client)
 | 
						|
	cl:__init(opts)
 | 
						|
	return cl
 | 
						|
end
 | 
						|
 | 
						|
-------
 | 
						|
 | 
						|
if _G._TEST then
 | 
						|
	-- export functions for test purposes (different name!)
 | 
						|
	_M.__parse_connection_opts = Client._parse_connection_opts
 | 
						|
end
 | 
						|
 | 
						|
-- export module table
 | 
						|
return _M
 | 
						|
 | 
						|
-- vim: ts=4 sts=4 sw=4 noet ft=lua
 |