From 43a8ebf800178411a740b414675e74abde8ba2ae Mon Sep 17 00:00:00 2001 From: Sebastiaan de Schaetzen Date: Wed, 10 Jul 2024 21:51:06 +0200 Subject: [PATCH] Initial commit --- .gitignore | 2 + controller-client/main.lua | 3 + controller-host/conf.lua | 5 + controller-host/main.lua | 130 +++ controller-host/mqtt/bit53.lua | 18 + controller-host/mqtt/bitwrap.lua | 11 + controller-host/mqtt/client.lua | 1208 ++++++++++++++++++++++ controller-host/mqtt/init.lua | 87 ++ controller-host/mqtt/ioloop.lua | 180 ++++ controller-host/mqtt/luasocket-copas.lua | 48 + controller-host/mqtt/luasocket.lua | 52 + controller-host/mqtt/luasocket_ssl.lua | 56 + controller-host/mqtt/ngxsocket.lua | 55 + controller-host/mqtt/protocol.lua | 525 ++++++++++ controller-host/mqtt/protocol4.lua | 427 ++++++++ controller-host/mqtt/protocol5.lua | 1039 +++++++++++++++++++ controller-host/mqtt/tools.lua | 35 + controller-host/mqttthread.lua | 70 ++ upload.lua | 54 + upload.sh | 7 + 20 files changed, 4012 insertions(+) create mode 100644 .gitignore create mode 100644 controller-client/main.lua create mode 100644 controller-host/conf.lua create mode 100644 controller-host/main.lua create mode 100644 controller-host/mqtt/bit53.lua create mode 100644 controller-host/mqtt/bitwrap.lua create mode 100644 controller-host/mqtt/client.lua create mode 100644 controller-host/mqtt/init.lua create mode 100644 controller-host/mqtt/ioloop.lua create mode 100644 controller-host/mqtt/luasocket-copas.lua create mode 100644 controller-host/mqtt/luasocket.lua create mode 100644 controller-host/mqtt/luasocket_ssl.lua create mode 100644 controller-host/mqtt/ngxsocket.lua create mode 100644 controller-host/mqtt/protocol.lua create mode 100644 controller-host/mqtt/protocol4.lua create mode 100644 controller-host/mqtt/protocol5.lua create mode 100644 controller-host/mqtt/tools.lua create mode 100644 controller-host/mqttthread.lua create mode 100644 upload.lua create mode 100755 upload.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..01261b5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.zip +.vscode diff --git a/controller-client/main.lua b/controller-client/main.lua new file mode 100644 index 0000000..e3d4f72 --- /dev/null +++ b/controller-client/main.lua @@ -0,0 +1,3 @@ +function love.draw2() + love.graphics.setBackgroundColor(0, 1, 0) +end diff --git a/controller-host/conf.lua b/controller-host/conf.lua new file mode 100644 index 0000000..0db333d --- /dev/null +++ b/controller-host/conf.lua @@ -0,0 +1,5 @@ +function love.conf(t) + t.version = "11.4" + t.window.title = "Spider Controller" + t.window.resizable = true +end diff --git a/controller-host/main.lua b/controller-host/main.lua new file mode 100644 index 0000000..470dc5c --- /dev/null +++ b/controller-host/main.lua @@ -0,0 +1,130 @@ +love.mqtt = {} + +local mqttEventChannel +local mqttCommandChannel +local errorMessage = nil + +local oldPrint = print + +function print(...) + local string = string.format(...) + love.mqtt.send("controller/stdout", string) + oldPrint(...) +end + +function printTable(table, indentation) + indentation = indentation or "" + for name, value in pairs(table) do + print(indentation .. tostring(name) .. ": " .. tostring(value)) + end +end + +function love.draw(...) + if love.draw2 then + love.draw2(...) + else + local text = "Awaiting payload..." + local font = love.graphics.getFont() + + if errorMessage then + text = errorMessage + end + + -- Calculate the center of the screen + local centerX = love.graphics.getWidth() / 2 + local centerY = love.graphics.getHeight() / 2 + + -- Calculate textX and textY + local textX = math.floor(centerX - (font:getWidth(text) / 2)) + local textY = math.floor(centerY - (font:getHeight(text) / 2)) + + local realText + if errorMessage then + realText = errorMessage + else + realText = "Awaiting payload" .. ("."):rep(math.floor(love.timer.getTime() * 4 % 4)) + end + + -- Render text + if errorMessage then + love.graphics.setBackgroundColor(0.2, 0, 0, 0) + else + love.graphics.setBackgroundColor(0, 0, 0.2, 0) + end + love.graphics.print(realText, textX, textY) + end +end + +function love.update(...) + local message = mqttEventChannel:pop() + if message then + love.mqtt[message.target](unpack(message.args)) + end + + if love.update2 then + love.update2(...) + end +end + +function love.mqtt.connect(connack) + if connack.rc ~= 0 then + print("Connection to broker failed:", connack:reason_string()) + end + + love.mqtt.subscribe("controller/payload") + print("Connected to MQTT") + printTable(connack) +end + +function love.mqtt.message2(topic, payload) + if topic == "controller/payload" then + local success = love.filesystem.unmount("client.zip") + if not success then + print("Could not unmount client.zip") + end + local archive = love.filesystem.newFileData(payload, "client.zip") + local success = love.filesystem.mount(archive, "client", true) + if not success then + print("Failed to mount archive") + return + end + print("Archive mounted") + local chunk, errormsg = love.filesystem.load("client/main.lua", "bt") + if errormsg then + print(errormsg) + errorMessage = errormsg + return + end + love.load = nil + chunk() + if love.load then + love.load() + end + elseif love.mqtt.message then + love.mqtt.message(topic, payload) + end +end + +function love.mqtt.send(topic, arg) + mqttCommandChannel:push { + command = "send", + topic = topic, + arg = arg + } +end + +function love.mqtt.subscribe(topic) + mqttCommandChannel:push { + command = "subscribe", + topic = topic, + } +end + +function love.load() + local requirePaths = love.filesystem.getRequirePath() + love.filesystem.setRequirePath(requirePaths .. ";client/?.lua;client/?/init.lua") + local mqttThread = love.thread.newThread("mqttthread.lua") + mqttThread:start() + mqttEventChannel = love.thread.getChannel("mqtt_event") + mqttCommandChannel = love.thread.getChannel("mqtt_command") +end diff --git a/controller-host/mqtt/bit53.lua b/controller-host/mqtt/bit53.lua new file mode 100644 index 0000000..1367f38 --- /dev/null +++ b/controller-host/mqtt/bit53.lua @@ -0,0 +1,18 @@ +-- implementing some functions from BitOp (http://bitop.luajit.org/) on Lua 5.3 + +return { + lshift = function(x, n) + return x << n + end, + rshift = function(x, n) + return x >> n + end, + bor = function(x1, x2) + return x1 | x2 + end, + band = function(x1, x2) + return x1 & x2 + end, +} + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/bitwrap.lua b/controller-host/mqtt/bitwrap.lua new file mode 100644 index 0000000..e52d5eb --- /dev/null +++ b/controller-host/mqtt/bitwrap.lua @@ -0,0 +1,11 @@ +-- wrapper around BitOp module + +if _VERSION == "Lua 5.1" or type(jit) == "table" then -- Lua 5.1 or LuaJIT (based on Lua 5.1) + return require("bit") -- custom module https://luarocks.org/modules/luarocks/luabitop +elseif _VERSION == "Lua 5.2" then + return require("bit32") -- standard Lua 5.2 module +else + return require("mqtt.bit53") +end + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/client.lua b/controller-host/mqtt/client.lua new file mode 100644 index 0000000..6f73a1a --- /dev/null +++ b/controller-host/mqtt/client.lua @@ -0,0 +1,1208 @@ +--- MQTT client module +-- @module mqtt.client +-- @alias client +local client = {} + +-- TODO: list event names + +------- + +-- load required stuff +local type = type +local error = error +local select = select +local require = require +local tostring = tostring + +local os = require("os") +local os_time = os.time + +local string = require("string") +local str_format = string.format +local str_gsub = string.gsub +local str_match = string.match + +local table = require("table") +local table_remove = table.remove + +local coroutine = require("coroutine") +local coroutine_create = coroutine.create +local coroutine_resume = coroutine.resume +local coroutine_yield = coroutine.yield + +local math = require("math") +local math_random = math.random + +local luamqtt_VERSION + +local protocol = require("mqtt.protocol") +local packet_type = protocol.packet_type +local check_qos = protocol.check_qos +local next_packet_id = protocol.next_packet_id +local packet_id_required = protocol.packet_id_required + +local protocol4 = require("mqtt.protocol4") +local make_packet4 = protocol4.make_packet +local parse_packet4 = protocol4.parse_packet + +local protocol5 = require("mqtt.protocol5") +local make_packet5 = protocol5.make_packet +local parse_packet5 = protocol5.parse_packet + +local ioloop = require("mqtt.ioloop") +local ioloop_get = ioloop.get + +------- + +--- MQTT client instance metatable +-- @type client_mt +local client_mt = {} +client_mt.__index = client_mt + +--- Create and initialize MQTT client instance +-- @tparam table args MQTT client creation arguments table +-- @tparam string args.uri MQTT broker uri to connect. +-- Expecting "host:port" or "host" format, in second case the port will be selected automatically: +-- 1883 port for plain or 8883 for secure network connections +-- @tparam string args.clean clean session start flag +-- @tparam[opt=4] number args.version MQTT protocol version to use, either 4 (for MQTT v3.1.1) or 5 (for MQTT v5.0). +-- Also you may use special values mqtt.v311 or mqtt.v50 for this field. +-- @tparam[opt] string args.id MQTT client ID, will be generated by luamqtt library if absent +-- @tparam[opt] string args.username username for authorization on MQTT broker +-- @tparam[opt] string args.password password for authorization on MQTT broker; not acceptable in absence of username +-- @tparam[opt=false] boolean,table args.secure use secure network connection, provided by luasec lua module; +-- set to true to select default params: { mode="client", protocol="tlsv1_2", verify="none", options="all" } +-- or set to luasec-compatible table, for example with cafile="...", certificate="...", key="..." +-- @tparam[opt] table args.will will message table with required fields { topic="...", payload="..." } +-- and optional fields { qos=1...3, retain=true/false } +-- @tparam[opt=60] number args.keep_alive time interval for client to send PINGREQ packets to the server when network connection is inactive +-- @tparam[opt=false] boolean args.reconnect force created MQTT client to reconnect on connection close. +-- Set to number value to provide reconnect timeout in seconds +-- It's not recommended to use values < 3 +-- @tparam[opt] table args.connector connector table to open and send/receive packets over network connection. +-- default is require("mqtt.luasocket"), or require("mqtt.luasocket_ssl") if secure argument is set +-- @tparam[opt="ssl"] string args.ssl_module module name for the luasec-compatible ssl module, default is "ssl" +-- may be used in some non-standard lua environments with own luasec-compatible ssl module +-- @treturn client_mt MQTT client instance table +function client_mt:__init(args) + if not luamqtt_VERSION then + luamqtt_VERSION = require("mqtt")._VERSION + end + + -- fetch and validate client args + local a = {} -- own client copy of args + + for key, value in pairs(args) do + if type(key) ~= "string" then + error("expecting string key in args, got: "..type(key)) + end + + local value_type = type(value) + if key == "uri" then + assert(value_type == "string", "expecting uri to be a string") + a.uri = value + elseif key == "clean" then + assert(value_type == "boolean", "expecting clean to be a boolean") + a.clean = value + elseif key == "version" then + assert(value_type == "number", "expecting version to be a number") + assert(value == 4 or value == 5, "expecting version to be a value either 4 or 5") + a.version = value + elseif key == "id" then + assert(value_type == "string", "expecting id to be a string") + a.id = value + elseif key == "username" then + assert(value_type == "string", "expecting username to be a string") + a.username = value + elseif key == "password" then + assert(value_type == "string", "expecting password to be a string") + a.password = value + elseif key == "secure" then + assert(value_type == "boolean" or value_type == "table", "expecting secure to be a boolean or table") + a.secure = value + elseif key == "will" then + assert(value_type == "table", "expecting will to be a table") + a.will = value + elseif key == "keep_alive" then + assert(value_type == "number", "expecting keep_alive to be a number") + a.keep_alive = value + elseif key == "properties" then + assert(value_type == "table", "expecting properties to be a table") + a.properties = value + elseif key == "user_properties" then + assert(value_type == "table", "expecting user_properties to be a table") + a.user_properties = value + elseif key == "reconnect" then + assert(value_type == "boolean" or value_type == "number", "expecting reconnect to be a boolean or number") + a.reconnect = value + elseif key == "connector" then + a.connector = value + elseif key == "ssl_module" then + assert(value_type == "string", "expecting ssl_module to be a string") + a.ssl_module = value + else + error("unexpected key in client args: "..key.." = "..tostring(value)) + end + end + + -- check required arguments + assert(a.uri, 'expecting uri="..." to create MQTT client') + assert(a.clean ~= nil, "expecting clean=true or clean=false to create MQTT client") + assert(not a.password or a.username, "password is not accepted in absence of username") + + if not a.id then + -- generate random client id + a.id = str_format("luamqtt-v%s-%07x", str_gsub(luamqtt_VERSION, "[^%d]", "-"), math_random(1, 0xFFFFFFF)) + end + + -- default connector + if a.connector == nil then + if a.secure then + a.connector = require("mqtt.luasocket_ssl") + else + a.connector = require("mqtt.luasocket") + end + end + -- validate connector content + assert(type(a.connector) == "table", "expecting connector to be a table") + assert(type(a.connector.connect) == "function", "expecting connector.connect to be a function") + assert(type(a.connector.shutdown) == "function", "expecting connector.shutdown to be a function") + assert(type(a.connector.send) == "function", "expecting connector.send to be a function") + assert(type(a.connector.receive) == "function", "expecting connector.receive to be a function") + + -- will table content check + if a.will then + assert(type(a.will.topic) == "string", "expecting will.topic to be a string") + assert(type(a.will.payload) == "string", "expecting will.payload to be a string") + if a.will.qos ~= nil then + assert(type(a.will.qos) == "number", "expecting will.qos to be a number") + assert(check_qos(a.will.qos), "expecting will.qos to be a valid QoS value") + end + if a.will.retain ~= nil then + assert(type(a.will.retain) == "boolean", "expecting will.retain to be a boolean") + end + end + + -- default keep_alive + if not a.keep_alive then + a.keep_alive = 60 + end + + -- client args + self.args = a + + -- event handlers + self.handlers = { + connect = {}, + subscribe = {}, + unsubscribe = {}, + message = {}, + acknowledge = {}, + error = {}, + close = {}, + auth = {}, + } + self._handling = {} + self._to_remove_handlers = {} + + -- state + self.first_connect = true -- contains true to perform one network connection attemt after client creation + self.send_time = 0 -- time of the last network send from client side + + -- packet creation/parse functions according version + if not a.version then + a.version = 4 + end + if a.version == 4 then + self._make_packet = make_packet4 + self._parse_packet = parse_packet4 + elseif a.version == 5 then + self._make_packet = make_packet5 + self._parse_packet = parse_packet5 + end + + -- automatically add client to default ioloop, if it's available and running, then start connecting + local loop = ioloop_get(false) + if loop and loop.running then + loop:add(self) + self:start_connecting() + end +end + +--- Add functions as handlers of given events +-- @param ... (event_name, function) or { event1 = func1, event2 = func2 } table +function client_mt:on(...) + local nargs = select("#", ...) + local events + if nargs == 2 then + events = { [select(1, ...)] = select(2, ...) } + elseif nargs == 1 then + events = select(1, ...) + else + error("invalid args: expected only one or two arguments") + end + for event, func in pairs(events) do + assert(type(event) == "string", "expecting event to be a string") + assert(type(func) == "function", "expecting func to be a function") + local handlers = self.handlers[event] + if not handlers then + error("invalid event '"..tostring(event).."' to handle") + end + handlers[#handlers + 1] = func + end +end + +-- Remove one item from the list-table with full-iteration +local function remove_item(list, item) + for i, test in ipairs(list) do + if test == item then + table_remove(list, i) + return + end + end +end + +--- Remove given function handler for specified event +-- @tparam string event event name to remove handler +-- @tparam function func handler function to remove +function client_mt:off(event, func) + local handlers = self.handlers[event] + if not handlers then + error("invalid event '"..tostring(event).."' to handle") + end + if self._handling[event] then + -- this event is handling now, schedule the function removing to the moment after all handlers will be called for the event + local to_remove = self._to_remove_handlers[event] or {} + to_remove[#to_remove + 1] = func + self._to_remove_handlers[event] = to_remove + else + -- it's ok to remove given function just now + remove_item(handlers, func) + end + return true +end + +--- Subscribe to specified topic. Returns the SUBSCRIBE packet id and calls optional callback when subscription will be created on broker +-- @tparam table args subscription arguments +-- @tparam string args.topic topic to subscribe +-- @tparam[opt=0] number args.qos QoS level for subscription +-- @tparam boolean args.no_local for MQTT v5.0 only: no_local flag for subscription +-- @tparam boolean args.retain_as_published for MQTT v5.0 only: retain_as_published flag for subscription +-- @tparam boolean args.retain_handling for MQTT v5.0 only: retain_handling flag for subscription +-- @tparam[opt] table args.properties for MQTT v5.0 only: properties for subscribe operation +-- @tparam[opt] table args.user_properties for MQTT v5.0 only: user properties for subscribe operation +-- @tparam[opt] function args.callback callback function to be called when subscription will be created +-- @return packet id on success or false and error message on failure +function client_mt:subscribe(args) + -- fetch and validate args + assert(type(args) == "table", "expecting args to be a table") + assert(type(args.topic) == "string", "expecting args.topic to be a string") + assert(args.qos == nil or (type(args.qos) == "number" and check_qos(args.qos)), "expecting valid args.qos value") + assert(args.no_local == nil or type(args.no_local) == "boolean", "expecting args.no_local to be a boolean") + assert(args.retain_as_published == nil or type(args.retain_as_published) == "boolean", "expecting args.retain_as_published to be a boolean") + assert(args.retain_handling == nil or type(args.retain_handling) == "boolean", "expecting args.retain_handling to be a boolean") + assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table") + assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table") + assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function") + + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create SUBSCRIBE packet + local pargs = { + type = packet_type.SUBSCRIBE, + subscriptions = { + { + topic = args.topic, + qos = args.qos, + no_local = args.no_local, + retain_as_published = args.retain_as_published, + retain_handling = args.retain_handling + }, + }, + properties = args.properties, + user_properties = args.user_properties, + } + self:_assign_packet_id(pargs) + local packet_id = pargs.packet_id + local subscribe = self._make_packet(pargs) + + -- send SUBSCRIBE packet + local ok, err = self:_send_packet(subscribe) + if not ok then + err = "failed to send SUBSCRIBE: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- add subscribe callback + local callback = args.callback + if callback then + local function handler(suback, ...) + if suback.packet_id == packet_id then + self:off("subscribe", handler) + callback(suback, ...) + end + end + self:on("subscribe", handler) + end + + -- returns assigned packet id + return packet_id +end + +--- Unsubscribe from specified topic, and calls optional callback when subscription will be removed on broker +-- @tparam table args subscription arguments +-- @tparam string args.topic topic to unsubscribe +-- @tparam[opt] table args.properties properties for unsubscribe operation +-- @tparam[opt] table args.user_properties user properties for unsubscribe operation +-- @tparam[opt] function args.callback callback function to be called when subscription will be removed on broker +-- @return packet id on success or false and error message on failure +function client_mt:unsubscribe(args) + -- fetch and validate args + assert(type(args) == "table", "expecting args to be a table") + assert(type(args.topic) == "string", "expecting args.topic to be a string") + assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table") + assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table") + assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function") + + + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create UNSUBSCRIBE packet + local pargs = { + type = packet_type.UNSUBSCRIBE, + subscriptions = {args.topic}, + properties = args.properties, + user_properties = args.user_properties, + } + self:_assign_packet_id(pargs) + local packet_id = pargs.packet_id + local unsubscribe = self._make_packet(pargs) + + -- send UNSUBSCRIBE packet + local ok, err = self:_send_packet(unsubscribe) + if not ok then + err = "failed to send UNSUBSCRIBE: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- add unsubscribe callback + local callback = args.callback + if callback then + local function handler(unsuback, ...) + if unsuback.packet_id == packet_id then + self:off("unsubscribe", handler) + callback(unsuback, ...) + end + end + self:on("unsubscribe", handler) + end + + -- returns assigned packet id + return packet_id +end + +--- Publish message to broker +-- @tparam table args publish operation arguments table +-- @tparam string args.topic topic to publish message +-- @tparam[opt] string args.payload publish message payload +-- @tparam[opt=0] number args.qos QoS level for message publication +-- @tparam[opt=false] boolean args.retain retain message publication flag +-- @tparam[opt=false] boolean args.dup dup message publication flag +-- @tparam[opt] table args.properties properties for publishing message +-- @tparam[opt] table args.user_properties user properties for publishing message +-- @tparam[opt] function args.callback callback to call when publihsed message will be acknowledged +-- @return true or packet id on success or false and error message on failure +function client_mt:publish(args) + -- fetch and validate args + assert(type(args) == "table", "expecting args to be a table") + assert(type(args.topic) == "string", "expecting args.topic to be a string") + assert(args.payload == nil or type(args.payload) == "string", "expecting args.payload to be a string") + assert(args.qos == nil or type(args.qos) == "number", "expecting args.qos to be a number") + if args.qos then + assert(check_qos(args.qos), "expecting qos to be a valid QoS value") + end + assert(args.retain == nil or type(args.retain) == "boolean", "expecting args.retain to be a boolean") + assert(args.dup == nil or type(args.dup) == "boolean", "expecting args.dup to be a boolean") + assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table") + assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table") + assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function") + + -- check connection is alive + local conn = self.connection + if not conn then + return false, "network connection is not opened" + end + + -- create PUBLISH packet + args.type = packet_type.PUBLISH + self:_assign_packet_id(args) + local packet_id = args.packet_id + local publish = self._make_packet(args) + + -- send PUBLISH packet + local ok, err = self:_send_packet(publish) + if not ok then + err = "failed to send PUBLISH: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- record packet id as waited for QoS 2 exchange + if args.qos == 2 then + conn.wait_for_pubrec[packet_id] = true + end + + -- add acknowledge callback + local callback = args.callback + if callback then + if packet_id then + local function handler(ack, ...) + if ack.packet_id == packet_id then + self:off("acknowledge", handler) + callback(ack, ...) + end + end + self:on("acknowledge", handler) + else + callback("no ack for QoS 0 message", self) + end + end + + -- returns assigned packet id + return packet_id or true +end + +--- Acknowledge given received message +-- @tparam packet_mt msg PUBLISH message to acknowledge +-- @tparam[opt=0] number rc The reason code field of PUBACK packet in MQTT v5.0 protocol +-- @tparam[opt] table properties properties for PUBACK/PUBREC packets +-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets +-- @return true on success or false and error message on failure +function client_mt:acknowledge(msg, rc, properties, user_properties) + assert(type(msg) == "table" and msg.type == packet_type.PUBLISH, "expecting msg to be a publish packet") + assert(rc == nil or type(rc) == "number", "expecting rc to be a number") + assert(properties == nil or type(properties) == "table", "expecting properties to be a table") + assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table") + + -- check connection is alive + local conn = self.connection + if not conn then + return false, "network connection is not opened" + end + + -- check packet needs to be acknowledged + local packet_id = msg.packet_id + if not packet_id then + return true + end + + if msg.qos == 1 then + -- PUBACK should be sent + + -- create PUBACK packet + local puback = self._make_packet{ + type = packet_type.PUBACK, + packet_id = packet_id, + rc = rc or 0, + properties = properties, + user_properties = user_properties, + } + + -- send PUBACK packet + local ok, err = self:_send_packet(puback) + if not ok then + err = "failed to send PUBACK: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + elseif msg.qos == 2 then + -- PUBREC should be sent and packet_id should be remembered for PUBREL+PUBCOMP sequence + + -- create PUBREC packet + local pubrec = self._make_packet{ + type = packet_type.PUBREC, + packet_id = packet_id, + rc = rc or 0, + properties = properties, + user_properties = user_properties, + } + + -- send PUBREC packet + local ok, err = self:_send_packet(pubrec) + if not ok then + err = "failed to send PUBREC: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- store packet id as waiting for PUBREL + conn.wait_for_pubrel[packet_id] = true + end + + return true +end + +--- Send DISCONNECT packet to the broker and close the connection +-- @tparam[opt=0] number rc The Disconnect Reason Code value from MQTT v5.0 protocol +-- @tparam[opt] table properties properties for PUBACK/PUBREC packets +-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets +-- @return true on success or false and error message on failure +function client_mt:disconnect(rc, properties, user_properties) + -- validate args + assert(rc == nil or type(rc) == "number", "expecting rc to be a number") + assert(properties == nil or type(properties) == "table", "expecting properties to be a table") + assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table") + + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create DISCONNECT packet + local disconnect = self._make_packet{ + type = packet_type.DISCONNECT, + rc = rc or 0, + properties = properties, + user_properties = user_properties, + } + + -- send DISCONNECT packet + local ok, err = self:_send_packet(disconnect) + if not ok then + err = "failed to send DISCONNECT: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- now close connection + self:close_connection("connection closed by client") + + return true +end + +--- Send AUTH packet to authenticate client on broker, in MQTT v5.0 protocol +-- @tparam[opt=0] number rc Authenticate Reason Code +-- @tparam[opt] table properties properties for PUBACK/PUBREC packets +-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets +-- @return true on success or false and error message on failure +function client_mt:auth(rc, properties, user_properties) + -- validate args + assert(rc == nil or type(rc) == "number", "expecting rc to be a number") + assert(properties == nil or type(properties) == "table", "expecting properties to be a table") + assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table") + assert(self.args.version == 5, "allowed only in MQTT v5.0 protocol") + + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create AUTH packet + local auth = self._make_packet{ + type = packet_type.AUTH, + rc = rc or 0, + properties = properties, + user_properties = user_properties, + } + + -- send AUTH packet + local ok, err = self:_send_packet(auth) + if not ok then + err = "failed to send AUTH: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + return true +end + +--- Immediately close established network connection, without graceful session finishing with DISCONNECT packet +-- @tparam[opt] string reason the reasong string of connection close +function client_mt:close_connection(reason) + assert(not reason or type(reason) == "string", "expecting reason to be a string") + local conn = self.connection + if not conn then + return true + end + + local args = self.args + args.connector.shutdown(conn) + self.connection = nil + conn.close_reason = reason or "unspecified" + + self:handle("close", conn, self) + + -- check connection is still closed (self.connection may be re-created in "close" handler) + if not self.connection then + -- remove from ioloop + if self.ioloop and not args.reconnect then + self.ioloop:remove(self) + end + end + + return true +end + +--- Start connecting to broker +-- @return true on success or false and error message on failure +function client_mt:start_connecting() + -- print("start connecting") -- debug + -- open network connection + local ok, err = self:open_connection() + if not ok then + return false, err + end + + -- send CONNECT packet + ok, err = self:send_connect() + if not ok then + return false, err + end + + return true +end + +--- Low-level methods +-- @section low-level + +--- Send PINGREQ packet +-- @return true on success or false and error message on failure +function client_mt:send_pingreq() + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create PINGREQ packet + local pingreq = self._make_packet{ + type = packet_type.PINGREQ, + } + + -- send PINGREQ packet + local ok, err = self:_send_packet(pingreq) + if not ok then + err = "failed to send PINGREQ: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + return true +end + +--- Open network connection to the broker +-- @return true on success or false and error message on failure +function client_mt:open_connection() + if self.connection then + return true + end + + local args = self.args + local connector = assert(args.connector, "no connector configured in MQTT client") + + -- create connection table + local conn = { + uri = args.uri, + wait_for_pubrec = {}, -- a table with packet_id of parially acknowledged sent packets in QoS 2 exchange process + wait_for_pubrel = {}, -- a table with packet_id of parially acknowledged received packets in QoS 2 exchange process + } + client_mt._parse_uri(args, conn) + client_mt._apply_secure(args, conn) + + -- perform connect + local ok, err = connector.connect(conn) + if not ok then + err = "failed to open network connection: "..err + self:handle("error", err, self) + return false, err + end + + -- assign connection + self.connection = conn + + -- create receive function + local receive = connector.receive + self.connection.recv_func = function(size) + return receive(conn, size) + end + + self:_apply_network_timeout() + + return true +end + +--- Send CONNECT packet into opened network connection +-- @return true on success or false and error message on failure +function client_mt:send_connect() + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + local args = self.args + + -- create CONNECT packet + local connect = self._make_packet{ + type = packet_type.CONNECT, + id = args.id, + clean = args.clean, + username = args.username, + password = args.password, + will = args.will, + keep_alive = args.keep_alive, + properties = args.properties, + user_properties = args.user_properties, + } + + -- send CONNECT packet + local ok, err = self:_send_packet(connect) + if not ok then + err = "failed to send CONNECT: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- reset last packet id + self._last_packet_id = nil + + return true +end + +-- Internal methods + +-- Set or rest ioloop for MQTT client +function client_mt:set_ioloop(loop) + self.ioloop = loop + self:_apply_network_timeout() +end + +-- Send PUBREL acknowledge packet - second phase of QoS 2 exchange +-- Returns true on success or false and error message on failure +function client_mt:acknowledge_pubrel(packet_id) + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create PUBREL packet + local pubrel = self._make_packet{type=packet_type.PUBREL, packet_id=packet_id, rc=0} + + -- send PUBREL packet + local ok, err = self:_send_packet(pubrel) + if not ok then + err = "failed to send PUBREL: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + return true +end + +-- Send PUBCOMP acknowledge packet - last phase of QoS 2 exchange +-- Returns true on success or false and error message on failure +function client_mt:acknowledge_pubcomp(packet_id) + -- check connection is alive + if not self.connection then + return false, "network connection is not opened" + end + + -- create PUBCOMP packet + local pubcomp = self._make_packet{type=packet_type.PUBCOMP, packet_id=packet_id, rc=0} + + -- send PUBCOMP packet + local ok, err = self:_send_packet(pubcomp) + if not ok then + err = "failed to send PUBCOMP: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + return true +end + +-- Call specified event handlers +function client_mt:handle(event, ...) + local handlers = self.handlers[event] + if not handlers then + error("invalid event '"..tostring(event).."' to handle") + end + self._handling[event] = true -- protecting self.handlers[event] table from modifications by client_mt:off() when iterating + for _, handler in ipairs(handlers) do + handler(...) + end + self._handling[event] = nil + + -- process handlers removing, scheduled by client_mt:off() + local to_remove = self._to_remove_handlers[event] + if to_remove then + for _, func in ipairs(to_remove) do + remove_item(handlers, func) + end + self._to_remove_handlers[event] = nil + end +end + +-- Internal methods + +-- Assign next packet id for given packet creation args +function client_mt:_assign_packet_id(pargs) + if not pargs.packet_id then + if packet_id_required(pargs) then + self._last_packet_id = next_packet_id(self._last_packet_id) + pargs.packet_id = self._last_packet_id + end + end +end + +-- Receive packet function in sync mode +local function sync_recv(self) + return true, self:_receive_packet() +end + +-- Perform one input/output iteration, called by sync receiving loop +function client_mt:_sync_iteration() + return self:_io_iteration(sync_recv) +end + +-- Receive packet function - from ioloop's coroutine +local function ioloop_recv(self) + return coroutine_resume(self.connection.coro) +end + +-- Perform one input/output iteration, called by ioloop +function client_mt:_ioloop_iteration() + -- working according state + local loop = self.ioloop + local args = self.args + + local conn = self.connection + if conn then + -- network connection opened + -- perform packet receiving using ioloop receive function + local ok, err + if loop then + ok, err = self:_io_iteration(ioloop_recv) + else + ok, err = self:_sync_iteration() + end + + if ok then + -- send PINGREQ if keep_alive interval is reached + if os_time() - self.send_time >= args.keep_alive then + self:send_pingreq() + end + end + + return ok, err + else + -- no connection - first connect, reconnect or remove from ioloop + if self.first_connect then + self.first_connect = false + self:start_connecting() + elseif args.reconnect then + if args.reconnect == true then + self:start_connecting() + else + -- reconnect in specified timeout + if self.reconnect_timer_start then + if os_time() - self.reconnect_timer_start >= args.reconnect then + self.reconnect_timer_start = nil + self:start_connecting() + else + if loop then + loop:can_sleep() + end + end + else + self.reconnect_timer_start = os_time() + end + end + else + -- finish working with client + if loop then + loop:remove(self) + end + end + end +end + +-- Performing one IO iteration - receive next packet +function client_mt:_io_iteration(recv) + local conn = self.connection + + -- first - try to receive packet + local ok, packet, err = recv(self) + -- print("received packet", ok, packet, err) + + -- check coroutine resume status + if not ok then + err = "failed to resume receive packet coroutine: "..tostring(packet) + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- check for communication error + if packet == false then + if err == "closed" then + self:close_connection("connection closed by broker") + return false, err + else + err = "failed to receive next packet: "..err + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + end + + -- check some packet received + if packet ~= "timeout" and packet ~= "wantread" then + if not conn.connack then + -- expecting only CONNACK packet here + if packet.type ~= packet_type.CONNACK then + err = "expecting CONNACK but received "..packet.type + self:handle("error", err, self) + self:close_connection("error") + return false, err + end + + -- store connack packet in connection + conn.connack = packet + + -- check CONNACK rc + if packet.rc ~= 0 then + err = str_format("CONNECT failed with CONNACK [rc=%d]: %s", packet.rc, packet:reason_string()) + self:handle("error", err, self, packet) + self:handle("connect", packet, self) + self:close_connection("connection failed") + return false, err + end + + -- fire connect event + self:handle("connect", packet, self) + else + -- connection authorized, so process usual packets + + -- handle packet according its type + local ptype = packet.type + if ptype == packet_type.PINGRESP then -- luacheck: ignore + -- PINGREQ answer, nothing to do + -- TODO: break the connectin in absence of this packet in some timeout + elseif ptype == packet_type.SUBACK then + self:handle("subscribe", packet, self) + elseif ptype == packet_type.UNSUBACK then + self:handle("unsubscribe", packet, self) + elseif ptype == packet_type.PUBLISH then + -- check such packet is not waiting for pubrel acknowledge + self:handle("message", packet, self) + elseif ptype == packet_type.PUBACK then + self:handle("acknowledge", packet, self) + elseif ptype == packet_type.PUBREC then + local packet_id = packet.packet_id + if conn.wait_for_pubrec[packet_id] then + conn.wait_for_pubrec[packet_id] = nil + -- send PUBREL acknowledge + if self:acknowledge_pubrel(packet_id) then + -- and fire acknowledge event + self:handle("acknowledge", packet, self) + end + end + elseif ptype == packet_type.PUBREL then + -- second phase of QoS 2 exchange - check we are already acknowledged such packet by PUBREL + local packet_id = packet.packet_id + if conn.wait_for_pubrel[packet_id] then + -- remove packet from waiting for PUBREL packets table + conn.wait_for_pubrel[packet_id] = nil + -- send PUBCOMP acknowledge + self:acknowledge_pubcomp(packet_id) + end + elseif ptype == packet_type.PUBCOMP then --luacheck: ignore + -- last phase of QoS 2 exchange + -- do nothing here + elseif ptype == packet_type.DISCONNECT then + self:close_connection("disconnect received from broker") + elseif ptype == packet_type.AUTH then + self:handle("auth", packet, self) + -- else + -- print("unhandled packet:", packet) -- debug + end + end + end + + return true +end + +-- Apply ioloop network timeout to already established connection (if any) +function client_mt:_apply_network_timeout() + local conn = self.connection + if conn then + local loop = self.ioloop + if loop then + -- apply connection timeout + self.args.connector.settimeout(conn, loop.args.timeout) + + -- connection packets receive loop coroutine + conn.coro = coroutine_create(function() + while true do + local packet, err = self:_receive_packet() + if not packet then + return false, err + else + coroutine_yield(packet) + end + end + end) + + -- replace connection recv_func with coroutine-based version + local sync_recv_func = conn.recv_func + conn.recv_func = function(...) + while true do + local data, err = sync_recv_func(...) + if not data and (err == "timeout" or err == "wantread") then + loop.timeouted = true + coroutine_yield(err) + else + return data, err + end + end + end + conn.sync_recv_func = sync_recv_func + else + -- disable connection timeout + self.args.connector.settimeout(conn, nil) + + -- replace back usual (blocking) connection recv_func + if conn.sync_recv_func then + conn.recv_func = conn.sync_recv_func + conn.sync_recv_func = nil + end + end + end +end + +-- Fill given connection table with host and port according given args +function client_mt._parse_uri(args, conn) + local host, port = str_match(args.uri, "^([^%s]+):(%d+)$") + if not host then + -- trying pattern without port + host = assert(str_match(conn.uri, "^([^%s]+)$"), "invalid uri format: expecting at least host/ip in .uri") + end + if not port then + if args.secure then + port = 8883 -- default MQTT secure connection port + else + port = 1883 -- default MQTT connection port + end + else + port = tonumber(port) + end + conn.host, conn.port = host, port +end + +-- Creates the conn.secure_params table and its content according client creation args +function client_mt._apply_secure(args, conn) + local secure = args.secure + if secure then + conn.secure = true + if type(secure) == "table" then + conn.secure_params = secure + else + conn.secure_params = { + mode = "client", + protocol = "tlsv1_2", + verify = "none", + options = "all", + } + end + conn.ssl_module = args.ssl_module or "ssl" + end +end + +-- Send given packet to opened network connection +function client_mt:_send_packet(packet) + local conn = self.connection + if not conn then + return false, "network connection is not opened" + end + local data = tostring(packet) + local len = data:len() + if len <= 0 then + return false, "sending empty packet" + end + -- and send binary packet to network connection + local i, err = 1 + local send = self.args.connector.send + while i < len do + i, err = send(conn, data, i) + if not i then + return false, "connector.send failed: "..err + end + end + self.send_time = os_time() + return true +end + +-- Receive one packet from established network connection +function client_mt:_receive_packet() + local conn = self.connection + if not conn then + return false, "network connection is not opened" + end + -- parse packet + local packet, err = self._parse_packet(conn.recv_func) + if not packet then + return false, err + end + return packet +end + +-- Represent MQTT client as string +function client_mt:__tostring() + return str_format("mqtt.client{id=%q}", tostring(self.args.id)) +end + +-- Garbage collection handler +function client_mt:__gc() + -- close network connection if it's available, without sending DISCONNECT packet + if self.connection then + self:close_connection("garbage") + end +end + +--- Exported functions +-- @section exported + +--- Create, initialize and return new MQTT client instance +-- @param ... see arguments of client_mt:__init(args) +-- @see client_mt:__init +-- @treturn client_mt MQTT client instance +function client.create(...) + local cl = setmetatable({}, client_mt) + cl:__init(...) + return cl +end + +------- + +-- export module table +return client + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/init.lua b/controller-host/mqtt/init.lua new file mode 100644 index 0000000..e68bdd3 --- /dev/null +++ b/controller-host/mqtt/init.lua @@ -0,0 +1,87 @@ +--- MQTT module +-- @module mqtt + +--[[ +MQTT protocol DOC: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html + +CONVENTIONS: + + * errors: + * passing invalid arguments (like number instead of string) to function in this library will raise exception + * all other errors will be returned in format: false, "error-text" + * you can wrap function call into standard lua assert() to raise exception + +]] + +--- Module table +-- @field v311 MQTT v3.1.1 protocol version constant +-- @field v50 MQTT v5.0 protocol version constant +-- @field _VERSION luamqtt version string +-- @table mqtt +local mqtt = { + -- supported MQTT protocol versions + v311 = 4, -- supported protocol version, MQTT v3.1.1 + v50 = 5, -- supported protocol version, MQTT v5.0 + + -- mqtt library version + _VERSION = "3.4.3", +} + +-- load required stuff +local type = type +local select = select +local require = require + +local client = require("mqtt.client") +local client_create = client.create + +local ioloop_get = require("mqtt.ioloop").get + +--- Create new MQTT client instance +-- @param ... Same as for mqtt.client.create(...) +-- @see mqtt.client.client_mt:__init +function mqtt.client(...) + return client_create(...) +end + +--- Returns default ioloop instance +-- @function mqtt.get_ioloop +mqtt.get_ioloop = ioloop_get + +--- Run default ioloop for given MQTT clients or functions +-- @param ... MQTT clients or lopp functions to add to ioloop +-- @see mqtt.ioloop.get +-- @see mqtt.ioloop.run_until_clients +function mqtt.run_ioloop(...) + local loop = ioloop_get() + for i = 1, select("#", ...) do + local cl = select(i, ...) + loop:add(cl) + if type(cl) ~= "function" then + cl:start_connecting() + end + end + return loop:run_until_clients() +end + +--- Run synchronous input/output loop for only one given MQTT client. +-- Provided client's connection will be opened. +-- Client reconnect feature will not work, and keep_alive too. +-- @param cl MQTT client instance to run +function mqtt.run_sync(cl) + local ok, err = cl:start_connecting() + if not ok then + return false, err + end + while cl.connection do + ok, err = cl:_sync_iteration() + if not ok then + return false, err + end + end +end + +-- export module table +return mqtt + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/ioloop.lua b/controller-host/mqtt/ioloop.lua new file mode 100644 index 0000000..b903c9a --- /dev/null +++ b/controller-host/mqtt/ioloop.lua @@ -0,0 +1,180 @@ +--- ioloop module +-- @module mqtt.ioloop +-- @alias ioloop + +--[[ + ioloop module + + In short: allowing you to work with several MQTT clients in one script, and allowing them to maintain + a long-term connection to broker, using PINGs. + + NOTE: this module will work only with MQTT clients using standard luasocket/luasocket_ssl connectors. + + In long: + Providing an IO loop instance dealing with efficient (as much as possible in limited lua IO) network communication + for several MQTT clients in the same OS thread. + The main idea is that you are creating an ioloop instance, then adding created and connected MQTT clients to it. + The ioloop instance is setting a non-blocking mode for sockets in MQTT clients and setting a small timeout + for their receive/send operations. Then ioloop is starting an endless loop trying to receive/send data for all added MQTT clients. + You may add more or remove some MQTT clients from the ioloop after it's created and started. + + Using that ioloop is allowing you to run a MQTT client for long time, through sending PINGREQ packets to broker + in keepAlive interval to maintain long-term connection. + + Also, any function can be added to the ioloop instance, and it will be called in the same endless loop over and over + alongside with added MQTT clients to provide you a piece of processor time to run your own logic (like running your own + network communications or any other thing good working in an io-loop) +]] + +-- module table +local ioloop = {} + +-- load required stuff +local next = next +local type = type +local ipairs = ipairs +local require = require +local setmetatable = setmetatable + +local table = require("table") +local tbl_remove = table.remove + +--- ioloop instances metatable +-- @type ioloop_mt +local ioloop_mt = {} +ioloop_mt.__index = ioloop_mt + +--- Initialize ioloop instance +-- @tparam table args ioloop creation arguments table +-- @tparam[opt=0.005] number args.timeout network operations timeout in seconds +-- @tparam[opt=0] number args.sleep sleep interval after each iteration +-- @tparam[opt] function args.sleep_function custom sleep function to call after each iteration +-- @treturn ioloop_mt ioloop instance +function ioloop_mt:__init(args) + args = args or {} + args.timeout = args.timeout or 0.005 + args.sleep = args.sleep or 0 + args.sleep_function = args.sleep_function or require("socket").sleep + self.args = args + self.clients = {} + self.running = false --ioloop running flag, used by MQTT clients which are adding after this ioloop started to run +end + +--- Add MQTT client or a loop function to the ioloop instance +-- @tparam client_mt|function client MQTT client or a loop function to add to ioloop +-- @return true on success or false and error message on failure +function ioloop_mt:add(client) + local clients = self.clients + if clients[client] then + return false, "such MQTT client or loop function is already added to this ioloop" + end + clients[#clients + 1] = client + clients[client] = true + + -- associate ioloop with adding MQTT client + if type(client) ~= "function" then + client:set_ioloop(self) + end + + return true +end + +--- Remove MQTT client or a loop function from the ioloop instance +-- @tparam client_mt|function client MQTT client or a loop function to remove from ioloop +-- @return true on success or false and error message on failure +function ioloop_mt:remove(client) + local clients = self.clients + if not clients[client] then + return false, "no such MQTT client or loop function was added to ioloop" + end + clients[client] = nil + + -- search an index of client to remove + for i, item in ipairs(clients) do + if item == client then + tbl_remove(clients, i) + break + end + end + + -- unlink ioloop from MQTT client + if type(client) ~= "function" then + client:set_ioloop(nil) + end + + return true +end + +--- Perform one ioloop iteration +function ioloop_mt:iteration() + self.timeouted = false + for _, client in ipairs(self.clients) do + if type(client) ~= "function" then + client:_ioloop_iteration() + else + client() + end + end + -- sleep a bit + local args = self.args + local sleep = args.sleep + if sleep > 0 then + args.sleep_function(sleep) + end +end + +--- Perform sleep if no one of the network operation in current iteration was not timeouted +function ioloop_mt:can_sleep() + if not self.timeouted then + local args = self.args + args.sleep_function(args.timeout) + self.timeouted = true + end +end + +--- Run ioloop until at least one client are in ioloop +function ioloop_mt:run_until_clients() + self.running = true + while next(self.clients) do + self:iteration() + end + self.running = false +end + +------- + +--- Create IO loop instance with given options +-- @see ioloop_mt:__init +-- @treturn ioloop_mt ioloop instance +local function ioloop_create(args) + local inst = setmetatable({}, ioloop_mt) + inst:__init(args) + return inst +end +ioloop.create = ioloop_create + +-- Default ioloop instance +local ioloop_instance + +--- Returns default ioloop instance +-- @tparam[opt=true] boolean autocreate Automatically create ioloop instance +-- @tparam[opt] table args Arguments for creating ioloop instance +-- @treturn ioloop_mt ioloop instance +function ioloop.get(autocreate, args) + if autocreate == nil then + autocreate = true + end + if autocreate then + if not ioloop_instance then + ioloop_instance = ioloop_create(args) + end + end + return ioloop_instance +end + +------- + +-- export module table +return ioloop + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/luasocket-copas.lua b/controller-host/mqtt/luasocket-copas.lua new file mode 100644 index 0000000..069229e --- /dev/null +++ b/controller-host/mqtt/luasocket-copas.lua @@ -0,0 +1,48 @@ +-- DOC: https://keplerproject.github.io/copas/ +-- NOTE: you will need to install copas like this: luarocks install copas + +-- module table +local connector = {} + +local socket = require("socket") +local copas = require("copas") + +-- Open network connection to .host and .port in conn table +-- Store opened socket to conn table +-- Returns true on success, or false and error text on failure +function connector.connect(conn) + local sock, err = socket.connect(conn.host, conn.port) + if not sock then + return false, "socket.connect failed: "..err + end + conn.sock = sock + return true +end + +-- Shutdown network connection +function connector.shutdown(conn) + conn.sock:shutdown() +end + +-- Send data to network connection +function connector.send(conn, data, i, j) + local ok, err = copas.send(conn.sock, data, i, j) + return ok, err +end + +-- Receive given amount of data from network connection +function connector.receive(conn, size) + local ok, err = copas.receive(conn.sock, size) + return ok, err +end + +-- Set connection's socket to non-blocking mode and set a timeout for it +function connector.settimeout(conn, timeout) + conn.timeout = timeout + conn.sock:settimeout(0) +end + +-- export module table +return connector + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/luasocket.lua b/controller-host/mqtt/luasocket.lua new file mode 100644 index 0000000..5b8de2b --- /dev/null +++ b/controller-host/mqtt/luasocket.lua @@ -0,0 +1,52 @@ +-- DOC: http://w3.impa.br/~diego/software/luasocket/tcp.html + +-- module table +local luasocket = {} + +local socket = require("socket") + +-- Open network connection to .host and .port in conn table +-- Store opened socket to conn table +-- Returns true on success, or false and error text on failure +function luasocket.connect(conn) + local sock, err = socket.connect(conn.host, conn.port) + if not sock then + return false, "socket.connect failed: "..err + end + conn.sock = sock + return true +end + +-- Shutdown network connection +function luasocket.shutdown(conn) + conn.sock:shutdown() +end + +-- Send data to network connection +function luasocket.send(conn, data, i, j) + local ok, err = conn.sock:send(data, i, j) + -- print(" luasocket.send:", ok, err, require("mqtt.tools").hex(data)) + return ok, err +end + +-- Receive given amount of data from network connection +function luasocket.receive(conn, size) + local ok, err = conn.sock:receive(size) + -- if ok then + -- print(" luasocket.receive:", size, require("mqtt.tools").hex(ok)) + -- elseif err ~= "timeout" then + -- print(" luasocket.receive:", ok, err) + -- end + return ok, err +end + +-- Set connection's socket to non-blocking mode and set a timeout for it +function luasocket.settimeout(conn, timeout) + conn.timeout = timeout + conn.sock:settimeout(timeout, "t") +end + +-- export module table +return luasocket + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/luasocket_ssl.lua b/controller-host/mqtt/luasocket_ssl.lua new file mode 100644 index 0000000..15b31cb --- /dev/null +++ b/controller-host/mqtt/luasocket_ssl.lua @@ -0,0 +1,56 @@ +-- DOC: http://w3.impa.br/~diego/software/luasocket/tcp.html + +-- module table +local luasocket_ssl = {} + +local type = type +local assert = assert +local luasocket = require("mqtt.luasocket") + +-- Open network connection to .host and .port in conn table +-- Store opened socket to conn table +-- Returns true on success, or false and error text on failure +function luasocket_ssl.connect(conn) + assert(type(conn.secure_params) == "table", "expecting .secure_params to be a table") + + -- open usual TCP connection + local ok, err = luasocket.connect(conn) + if not ok then + return false, "luasocket connect failed: "..err + end + local wrapped + + -- load right ssl module + local ssl = require(conn.ssl_module or "ssl") + + -- TLS/SSL initialization + wrapped, err = ssl.wrap(conn.sock, conn.secure_params) + if not wrapped then + conn.sock:shutdown() + return false, "ssl.wrap() failed: "..err + end + ok = wrapped:dohandshake() + if not ok then + conn.sock:shutdown() + return false, "ssl dohandshake failed" + end + + -- replace sock in connection table with wrapped secure socket + conn.sock = wrapped + return true +end + +-- Shutdown network connection +function luasocket_ssl.shutdown(conn) + conn.sock:close() +end + +-- Copy original methods from mqtt.luasocket module +luasocket_ssl.send = luasocket.send +luasocket_ssl.receive = luasocket.receive +luasocket_ssl.settimeout = luasocket.settimeout + +-- export module table +return luasocket_ssl + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/ngxsocket.lua b/controller-host/mqtt/ngxsocket.lua new file mode 100644 index 0000000..349780c --- /dev/null +++ b/controller-host/mqtt/ngxsocket.lua @@ -0,0 +1,55 @@ +-- module table +-- thanks to @irimiab: https://github.com/xHasKx/luamqtt/issues/13 +local ngxsocket = {} + +-- load required stuff +local string_sub = string.sub +local ngx_socket_tcp = ngx.socket.tcp -- luacheck: ignore + +-- Open network connection to .host and .port in conn table +-- Store opened socket to conn table +-- Returns true on success, or false and error text on failure +function ngxsocket.connect(conn) + local socket = ngx_socket_tcp() + socket:settimeout(0x7FFFFFFF) + local sock, err = socket:connect(conn.host, conn.port) + if not sock then + return false, "socket:connect failed: "..err + end + if conn.secure then + socket:sslhandshake() + end + conn.sock = socket + return true +end + +-- Shutdown network connection +function ngxsocket.shutdown(conn) + conn.sock:close() +end + +-- Send data to network connection +function ngxsocket.send(conn, data, i, j) + if i then + return conn.sock:send(string_sub(data, i, j)) + else + return conn.sock:send(data) + end +end + +-- Receive given amount of data from network connection +function ngxsocket.receive(conn, size) + return conn.sock:receive(size) +end + +-- Set connection's socket to non-blocking mode and set a timeout for it +function ngxsocket.settimeout(conn, timeout) + if not timeout then + conn.sock:settimeout(0x7FFFFFFF) + else + conn.sock:settimeout(timeout * 1000) + end +end + +-- export module table +return ngxsocket diff --git a/controller-host/mqtt/protocol.lua b/controller-host/mqtt/protocol.lua new file mode 100644 index 0000000..b7e467f --- /dev/null +++ b/controller-host/mqtt/protocol.lua @@ -0,0 +1,525 @@ +--- MQTT generic protocol components module +-- @module mqtt.protocol + +--[[ + +Here is a generic implementation of MQTT protocols of all supported versions. + +MQTT v3.1.1 documentation (DOCv3.1.1): + http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html + +MQTT v5.0 documentation (DOCv5.0): + http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html + +CONVENTIONS: + + * read_func - function to read data from some stream-like object (like network connection). + We are calling it with one argument: number of bytes to read. + Use currying/closures to pass other arguments to this function. + This function should return string of given size on success. + On failure it should return false/nil and an error message. + +]] + +-- module table +local protocol = {} + +-- load required stuff +local type = type +local error = error +local assert = assert +local require = require +local _VERSION = _VERSION -- lua interpreter version, not a mqtt._VERSION +local tostring = tostring +local setmetatable = setmetatable + + +local table = require("table") +local tbl_concat = table.concat +local unpack = unpack or table.unpack + +local string = require("string") +local str_sub = string.sub +local str_char = string.char +local str_byte = string.byte +local str_format = string.format + +local bit = require("mqtt.bitwrap") +local bor = bit.bor +local band = bit.band +local lshift = bit.lshift +local rshift = bit.rshift + +local tools = require("mqtt.tools") +local div = tools.div + +-- Create uint8 value data +local function make_uint8(val) + if val < 0 or val > 0xFF then + error("value is out of range to encode as uint8: "..tostring(val)) + end + return str_char(val) +end +protocol.make_uint8 = make_uint8 + +-- Create uint16 value data +local function make_uint16(val) + if val < 0 or val > 0xFFFF then + error("value is out of range to encode as uint16: "..tostring(val)) + end + return str_char(rshift(val, 8), band(val, 0xFF)) +end +protocol.make_uint16 = make_uint16 + +-- Create uint32 value data +function protocol.make_uint32(val) + if val < 0 or val > 0xFFFFFFFF then + error("value is out of range to encode as uint32: "..tostring(val)) + end + return str_char(rshift(val, 24), band(rshift(val, 16), 0xFF), band(rshift(val, 8), 0xFF), band(val, 0xFF)) +end + +-- Create UTF-8 string data +-- DOCv3.1.1: 1.5.3 UTF-8 encoded strings +-- DOCv5.0: 1.5.4 UTF-8 Encoded String +function protocol.make_string(str) + return make_uint16(str:len())..str +end + +-- Returns bytes of given integer value encoded as variable length field +-- DOCv3.1.1: 2.2.3 Remaining Length +-- DOCv5.0: 2.1.4 Remaining Length +local function make_var_length(len) + if len < 0 or len > 268435455 then + error("value is invalid for encoding as variable length field: "..tostring(len)) + end + local bytes = {} + local i = 1 + repeat + local byte = len % 128 + len = div(len, 128) + if len > 0 then + byte = bor(byte, 128) + end + bytes[i] = byte + i = i + 1 + until len <= 0 + return unpack(bytes) +end +protocol.make_var_length = make_var_length + +-- Make data for 1-byte property with only 0 or 1 value +function protocol.make_uint8_0_or_1(value) + if value ~= 0 and value ~= 1 then + error("expecting 0 or 1 as value") + end + return make_uint8(value) +end + +-- Make data for 2-byte property with nonzero value check +function protocol.make_uint16_nonzero(value) + if value == 0 then + error("expecting nonzero value") + end + return make_uint16(value) +end + +-- Make data for variable length property with nonzero value check +function protocol.make_var_length_nonzero(value) + if value == 0 then + error("expecting nonzero value") + end + return make_var_length(value) +end + +-- Read string using given read_func function +-- Returns false plus error message on failure +-- Returns parsed string on success +function protocol.parse_string(read_func) + assert(type(read_func) == "function", "expecting read_func to be a function") + local len, err = read_func(2) + if not len then + return false, "failed to read string length: "..err + end + -- convert len string from 2-byte integer + local byte1, byte2 = str_byte(len, 1, 2) + len = bor(lshift(byte1, 8), byte2) + -- and return string if parsed length + return read_func(len) +end + +-- Parse uint8 value using given read_func +local function parse_uint8(read_func) + assert(type(read_func) == "function", "expecting read_func to be a function") + local value, err = read_func(1) + if not value then + return false, "failed to read 1 byte for uint8: "..err + end + return str_byte(value, 1, 1) +end +protocol.parse_uint8 = parse_uint8 + +-- Parse uint8 value with only 0 or 1 value +function protocol.parse_uint8_0_or_1(read_func) + local value, err = parse_uint8(read_func) + if not value then + return false, err + end + if value ~= 0 and value ~= 1 then + return false, "expecting only 0 or 1 but got: "..value + end + return value +end + +-- Parse uint16 value using given read_func +local function parse_uint16(read_func) + assert(type(read_func) == "function", "expecting read_func to be a function") + local value, err = read_func(2) + if not value then + return false, "failed to read 2 byte for uint16: "..err + end + local byte1, byte2 = str_byte(value, 1, 2) + return lshift(byte1, 8) + byte2 +end +protocol.parse_uint16 = parse_uint16 + +-- Parse uint16 non-zero value using given read_func +function protocol.parse_uint16_nonzero(read_func) + local value, err = parse_uint16(read_func) + if not value then + return false, err + end + if value == 0 then + return false, "expecting non-zero value" + end + return value +end + +-- Parse uint32 value using given read_func +function protocol.parse_uint32(read_func) + assert(type(read_func) == "function", "expecting read_func to be a function") + local value, err = read_func(4) + if not value then + return false, "failed to read 4 byte for uint32: "..err + end + local byte1, byte2, byte3, byte4 = str_byte(value, 1, 4) + if _VERSION < "Lua 5.3" then + return byte1 * (2 ^ 24) + lshift(byte2, 16) + lshift(byte3, 8) + byte4 + else + return lshift(byte1, 24) + lshift(byte2, 16) + lshift(byte3, 8) + byte4 + end +end + +-- Max variable length integer value +local max_mult = 128 * 128 * 128 + +-- Returns variable length field value calling read_func function read data, DOC: 2.2.3 Remaining Length +local function parse_var_length(read_func) + assert(type(read_func) == "function", "expecting read_func to be a function") + local mult = 1 + local val = 0 + repeat + local byte, err = read_func(1) + if not byte then + return false, err + end + byte = str_byte(byte, 1, 1) + val = val + band(byte, 127) * mult + if mult > max_mult then + return false, "malformed variable length field data" + end + mult = mult * 128 + until band(byte, 128) == 0 + return val +end +protocol.parse_var_length = parse_var_length + +-- Parse Variable Byte Integer with non-zero constraint +function protocol.parse_var_length_nonzero(read_func) + local value, err = parse_var_length(read_func) + if not value then + return false, err + end + if value == 0 then + return false, "expecting non-zero value" + end + return value +end + +-- Create fixed packet header data +-- DOCv3.1.1: 2.2 Fixed header +-- DOCv5.0: 2.1.1 Fixed Header +function protocol.make_header(ptype, flags, len) + local byte1 = bor(lshift(ptype, 4), band(flags, 0x0F)) + return str_char(byte1, make_var_length(len)) +end + +-- Returns true if given value is a valid QoS +function protocol.check_qos(val) + return (val == 0) or (val == 1) or (val == 2) +end + +-- Returns true if given value is a valid Packet Identifier +-- DOCv3.1.1: 2.3.1 Packet Identifier +-- DOCv5.0: 2.2.1 Packet Identifier +function protocol.check_packet_id(val) + return val >= 1 and val <= 0xFFFF +end + +-- Returns the next Packet Identifier value relative to given current value +-- DOCv3.1.1: 2.3.1 Packet Identifier +-- DOCv5.0: 2.2.1 Packet Identifier +function protocol.next_packet_id(curr) + if not curr then + return 1 + end + assert(type(curr) == "number", "expecting curr to be a number") + assert(curr >= 1, "expecting curr to be >= 1") + curr = curr + 1 + if curr > 0xFFFF then + curr = 1 + end + return curr +end + +-- MQTT protocol fixed header packet types +-- DOCv3.1.1: 2.2.1 MQTT Control Packet type +-- DOCv5.0: 2.1.2 MQTT Control Packet type +local packet_type = { + CONNECT = 1, + CONNACK = 2, + PUBLISH = 3, + PUBACK = 4, + PUBREC = 5, + PUBREL = 6, + PUBCOMP = 7, + SUBSCRIBE = 8, + SUBACK = 9, + UNSUBSCRIBE = 10, + UNSUBACK = 11, + PINGREQ = 12, + PINGRESP = 13, + DISCONNECT = 14, + AUTH = 15, -- NOTE: new in MQTTv5.0 + [1] = "CONNECT", + [2] = "CONNACK", + [3] = "PUBLISH", + [4] = "PUBACK", + [5] = "PUBREC", + [6] = "PUBREL", + [7] = "PUBCOMP", + [8] = "SUBSCRIBE", + [9] = "SUBACK", + [10] = "UNSUBSCRIBE", + [11] = "UNSUBACK", + [12] = "PINGREQ", + [13] = "PINGRESP", + [14] = "DISCONNECT", + [15] = "AUTH", -- NOTE: new in MQTTv5.0 +} +protocol.packet_type = packet_type + +-- Packet types requiring packet identifier field +-- DOCv3.1.1: 2.3.1 Packet Identifier +-- DOCv5.0: 2.2.1 Packet Identifier +local packets_requiring_packet_id = { + [packet_type.PUBACK] = true, + [packet_type.PUBREC] = true, + [packet_type.PUBREL] = true, + [packet_type.PUBCOMP] = true, + [packet_type.SUBSCRIBE] = true, + [packet_type.SUBACK] = true, + [packet_type.UNSUBSCRIBE] = true, + [packet_type.UNSUBACK] = true, +} + +-- CONNACK return code/reason code strings +local connack_rc = { + -- MQTT v3.1.1 Connect return codes, DOCv3.1.1: 3.2.2.3 Connect Return code + [0] = "Connection Accepted", + [1] = "Connection Refused, unacceptable protocol version", + [2] = "Connection Refused, identifier rejected", + [3] = "Connection Refused, Server unavailable", + [4] = "Connection Refused, bad user name or password", + [5] = "Connection Refused, not authorized", + + -- MQTT v5.0 Connect reason codes, DOCv5.0: 3.2.2.2 Connect Reason Code + [0x80] = "Unspecified error", + [0x81] = "Malformed Packet", + [0x82] = "Protocol Error", + [0x83] = "Implementation specific error", + [0x84] = "Unsupported Protocol Version", + [0x85] = "Client Identifier not valid", + [0x86] = "Bad User Name or Password", + [0x87] = "Not authorized", + [0x88] = "Server unavailable", + [0x89] = "Server busy", + [0x8A] = "Banned", + [0x8C] = "Bad authentication method", + [0x90] = "Topic Name invalid", + [0x95] = "Packet too large", + [0x97] = "Quota exceeded", + [0x99] = "Payload format invalid", + [0x9A] = "Retain not supported", + [0x9B] = "QoS not supported", + [0x9C] = "Use another server", + [0x9D] = "Server moved", + [0x9F] = "Connection rate exceeded", +} +protocol.connack_rc = connack_rc + +-- Returns true if Packet Identifier field are required for given packet +function protocol.packet_id_required(args) + assert(type(args) == "table", "expecting args to be a table") + assert(type(args.type) == "number", "expecting .type to be a number") + local ptype = args.type + if ptype == packet_type.PUBLISH and args.qos and args.qos > 0 then + return true + end + return packets_requiring_packet_id[ptype] +end + +-- Metatable for combined data packet, should looks like a string +local combined_packet_mt = { + -- Convert combined data packet to string + __tostring = function(self) + local strings = {} + for i, part in ipairs(self) do + strings[i] = tostring(part) + end + return tbl_concat(strings) + end, + + -- Get length of combined data packet + len = function(self) + local len = 0 + for _, part in ipairs(self) do + len = len + part:len() + end + return len + end, + + -- Append part to the end of combined data packet + append = function(self, part) + self[#self + 1] = part + end +} + +-- Make combined_packet_mt table works like a class +combined_packet_mt.__index = function(_, key) + return combined_packet_mt[key] +end + +-- Combine several data parts into one +function protocol.combine(...) + return setmetatable({...}, combined_packet_mt) +end + +-- Convert any value to string, respecting strings and tables +local function value_tostring(value) + local t = type(value) + if t == "string" then + return str_format("%q", value) + elseif t == "table" then + local res = {} + for k, v in pairs(value) do + if type(k) == "number" then + res[#res + 1] = value_tostring(v) + else + if k:match("^[a-zA-Z_][_%w]*$") then + res[#res + 1] = str_format("%s=%s", k, value_tostring(v)) + else + res[#res + 1] = str_format("[%q]=%s", k, value_tostring(v)) + end + end + end + return str_format("{%s}", tbl_concat(res, ", ")) + else + return tostring(value) + end +end + +-- Convert packet to string representation +local function packet_tostring(packet) + local res = {} + for k, v in pairs(packet) do + res[#res + 1] = str_format("%s=%s", k, value_tostring(v)) + end + return str_format("%s{%s}", tostring(packet_type[packet.type]), tbl_concat(res, ", ")) +end +protocol.packet_tostring = packet_tostring + +-- Parsed packet metatable +protocol.packet_mt = { + __tostring = packet_tostring, +} + +-- Parsed CONNACK packet metatable +protocol.connack_packet_mt = { + __tostring = packet_tostring, +} +protocol.connack_packet_mt.__index = protocol.connack_packet_mt + +--- Returns reason string for CONNACK packet +-- @treturn string Reason string according packet's rc field +function protocol.connack_packet_mt:reason_string() + return connack_rc[self.rc] +end + +--- Start parsing a new packet +-- @tparam function read_func - function to read data from the network connection +-- @treturn number packet_type +-- @treturn number flags +-- @treturn table input - a table with fields "read_func" and "available" representing a stream-like object +-- to read already received packet data in chunks +-- @return false and error_message on failure +function protocol.start_parse_packet(read_func) + assert(type(read_func) == "function", "expecting read_func to be a function") + local byte1, err, len, data + + -- parse fixed header + -- DOC[v3.1.1]: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180832 + -- DOC[v5.0]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901020 + byte1, err = read_func(1) + if not byte1 then + return false, "failed to read first byte: "..err + end + byte1 = str_byte(byte1, 1, 1) + local ptype = rshift(byte1, 4) + local flags = band(byte1, 0xF) + len, err = parse_var_length(read_func) + if not len then + return false, "failed to parse remaining length: "..err + end + + -- create packet parser instance (aka input) + local input = {1, available = 0} -- input data offset and available size + if len > 0 then + data, err = read_func(len) + else + data = "" + end + if not data then + return false, "failed to read packet data: "..err + end + input.available = data:len() + + -- read data function for the input instance + input.read_func = function(size) + if size > input.available then + return false, "not enough data to read size: "..size + end + local off = input[1] + local res = str_sub(data, off, off + size - 1) + input[1] = off + size + input.available = input.available - size + return res + end + + return ptype, flags, input +end + +-- export module table +return protocol + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/protocol4.lua b/controller-host/mqtt/protocol4.lua new file mode 100644 index 0000000..6e8bb8b --- /dev/null +++ b/controller-host/mqtt/protocol4.lua @@ -0,0 +1,427 @@ +--[[ + +Here is a MQTT v3.1.1 protocol implementation + +MQTT v3.1.1 documentation (DOC): + http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html + +]] + +-- module table +local protocol4 = {} + +-- load required stuff +local type = type +local error = error +local assert = assert +local require = require +local tostring = tostring +local setmetatable = setmetatable + +local bit = require("mqtt.bitwrap") +local bor = bit.bor +local band = bit.band +local lshift = bit.lshift +local rshift = bit.rshift + +local protocol = require("mqtt.protocol") +local make_uint8 = protocol.make_uint8 +local make_uint16 = protocol.make_uint16 +local make_string = protocol.make_string +local make_header = protocol.make_header +local check_qos = protocol.check_qos +local check_packet_id = protocol.check_packet_id +local combine = protocol.combine +local packet_type = protocol.packet_type +local packet_mt = protocol.packet_mt +local connack_packet_mt = protocol.connack_packet_mt +local start_parse_packet = protocol.start_parse_packet +local parse_uint8 = protocol.parse_uint8 +local parse_uint16 = protocol.parse_uint16 + +-- Create Connect Flags data, DOC: 3.1.2.3 Connect Flags +local function make_connect_flags(args) + local byte = 0 -- bit 0 should be zero + -- DOC: 3.1.2.4 Clean Session + if args.clean ~= nil then + assert(type(args.clean) == "boolean", "expecting .clean to be a boolean") + if args.clean then + byte = bor(byte, lshift(1, 1)) + end + end + -- DOC: 3.1.2.5 Will Flag + if args.will ~= nil then + -- check required args are presented + assert(type(args.will) == "table", "expecting .will to be a table") + assert(type(args.will.payload) == "string", "expecting .will.payload to be a string") + assert(type(args.will.topic) == "string", "expecting .will.topic to be a string") + if args.will.qos ~= nil then + assert(type(args.will.qos) == "number", "expecting .will.qos to be a number") + assert(check_qos(args.will.qos), "expecting .will.qos to be a valid QoS value") + end + if args.will.retain ~= nil then + assert(type(args.will.retain) == "boolean", "expecting .will.retain to be a boolean") + end + -- will flag should be set to 1 + byte = bor(byte, lshift(1, 2)) + -- DOC: 3.1.2.6 Will QoS + byte = bor(byte, lshift(args.will.qos or 0, 3)) + -- DOC: 3.1.2.7 Will Retain + if args.will.retain then + byte = bor(byte, lshift(1, 5)) + end + end + -- DOC: 3.1.2.8 User Name Flag + if args.username ~= nil then + assert(type(args.username) == "string", "expecting .username to be a string") + byte = bor(byte, lshift(1, 7)) + end + -- DOC: 3.1.2.9 Password Flag + if args.password ~= nil then + assert(type(args.password) == "string", "expecting .password to be a string") + assert(args.username, "the .username is required to set .password") + byte = bor(byte, lshift(1, 6)) + end + return make_uint8(byte) +end + +-- Create CONNECT packet, DOC: 3.1 CONNECT – Client requests a connection to a Server +local function make_packet_connect(args) + -- check args + assert(type(args.id) == "string", "expecting .id to be a string with MQTT client id") + -- DOC: 3.1.2.10 Keep Alive + local keep_alive_ival = 0 + if args.keep_alive then + assert(type(args.keep_alive) == "number") + keep_alive_ival = args.keep_alive + end + -- DOC: 3.1.2 Variable header + local variable_header = combine( + make_string("MQTT"), -- DOC: 3.1.2.1 Protocol Name + make_uint8(4), -- DOC: 3.1.2.2 Protocol Level (4 is for MQTT v3.1.1) + make_connect_flags(args), -- DOC: 3.1.2.3 Connect Flags + make_uint16(keep_alive_ival) -- DOC: 3.1.2.10 Keep Alive + ) + -- DOC: 3.1.3 Payload + -- DOC: 3.1.3.1 Client Identifier + local payload = combine( + make_string(args.id) + ) + if args.will then + -- DOC: 3.1.3.2 Will Topic + assert(type(args.will.topic) == "string", "expecting will.topic to be a string") + payload:append(make_string(args.will.topic)) + -- DOC: 3.1.3.3 Will Message + assert(args.will.payload == nil or type(args.will.payload) == "string", "expecting will.payload to be a string or nil") + payload:append(make_string(args.will.payload or "")) + end + if args.username then + -- DOC: 3.1.3.4 User Name + payload:append(make_string(args.username)) + if args.password then + -- DOC: 3.1.3.5 Password + payload:append(make_string(args.password)) + end + end + -- DOC: 3.1.1 Fixed header + local header = make_header(packet_type.CONNECT, 0, variable_header:len() + payload:len()) + return combine(header, variable_header, payload) +end + +-- Create PUBLISH packet, DOC: 3.3 PUBLISH – Publish message +local function make_packet_publish(args) + -- check args + assert(type(args.topic) == "string", "expecting .topic to be a string") + if args.payload ~= nil then + assert(type(args.payload) == "string", "expecting .payload to be a string") + end + if args.qos ~= nil then + assert(type(args.qos) == "number", "expecting .qos to be a number") + assert(check_qos(args.qos), "expecting .qos to be a valid QoS value") + end + if args.retain ~= nil then + assert(type(args.retain) == "boolean", "expecting .retain to be a boolean") + end + if args.dup ~= nil then + assert(type(args.dup) == "boolean", "expecting .dup to be a boolean") + end + -- DOC: 3.3.1 Fixed header + local flags = 0 + -- 3.3.1.3 RETAIN + if args.retain then + flags = bor(flags, 0x1) + end + -- DOC: 3.3.1.2 QoS + flags = bor(flags, lshift(args.qos or 0, 1)) + -- DOC: 3.3.1.1 DUP + if args.dup then + flags = bor(flags, lshift(1, 3)) + end + -- DOC: 3.3.2 Variable header + local variable_header = combine( + make_string(args.topic) + ) + -- DOC: 3.3.2.2 Packet Identifier + if args.qos and args.qos > 0 then + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + variable_header:append(make_uint16(args.packet_id)) + end + local payload + if args.payload then + payload = args.payload + else + payload = "" + end + -- DOC: 3.3.1 Fixed header + local header = make_header(packet_type.PUBLISH, flags, variable_header:len() + payload:len()) + return combine(header, variable_header, payload) +end + +-- Create PUBACK packet, DOC: 3.4 PUBACK – Publish acknowledgement +local function make_packet_puback(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + -- DOC: 3.4.1 Fixed header + local header = make_header(packet_type.PUBACK, 0, 2) + -- DOC: 3.4.2 Variable header + local variable_header = make_uint16(args.packet_id) + return combine(header, variable_header) +end + +-- Create PUBREC packet, DOC: 3.5 PUBREC – Publish received (QoS 2 publish received, part 1) +local function make_packet_pubrec(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + -- DOC: 3.5.1 Fixed header + local header = make_header(packet_type.PUBREC, 0, 2) + -- DOC: 3.5.2 Variable header + local variable_header = make_uint16(args.packet_id) + return combine(header, variable_header) +end + +-- Create PUBREL packet, DOC: 3.6 PUBREL – Publish release (QoS 2 publish received, part 2) +local function make_packet_pubrel(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + -- DOC: 3.6.1 Fixed header + local header = make_header(packet_type.PUBREL, 0x2, 2) -- flags are 0x2 == 0010 bits (fixed value) + -- DOC: 3.6.2 Variable header + local variable_header = make_uint16(args.packet_id) + return combine(header, variable_header) +end + +-- Create PUBCOMP packet, DOC: 3.7 PUBCOMP – Publish complete (QoS 2 publish received, part 3) +local function make_packet_pubcomp(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + -- DOC: 3.7.1 Fixed header + local header = make_header(packet_type.PUBCOMP, 0, 2) + -- DOC: 3.7.2 Variable header + local variable_header = make_uint16(args.packet_id) + return combine(header, variable_header) +end + +-- Create SUBSCRIBE packet, DOC: 3.8 SUBSCRIBE - Subscribe to topics +local function make_packet_subscribe(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.subscriptions) == "table", "expecting .subscriptions to be a table") + assert(#args.subscriptions > 0, "expecting .subscriptions to be a non-empty array") + -- DOC: 3.8.2 Variable header + local variable_header = combine( + make_uint16(args.packet_id) + ) + -- DOC: 3.8.3 Payload + local payload = combine() + for i, subscription in ipairs(args.subscriptions) do + assert(type(subscription) == "table", "expecting .subscriptions["..i.."] to be a table") + assert(type(subscription.topic) == "string", "expecting .subscriptions["..i.."].topic to be a string") + if subscription.qos ~= nil then + assert(type(subscription.qos) == "number", "expecting .subscriptions["..i.."].qos to be a number") + assert(check_qos(subscription.qos), "expecting .subscriptions["..i.."].qos to be a valid QoS value") + end + payload:append(make_string(subscription.topic)) + payload:append(make_uint8(subscription.qos or 0)) + end + -- DOC: 3.8.1 Fixed header + local header = make_header(packet_type.SUBSCRIBE, 2, variable_header:len() + payload:len()) -- NOTE: fixed flags value 0x2 + return combine(header, variable_header, payload) +end + +-- Create UNSUBSCRIBE packet, DOC: 3.10 UNSUBSCRIBE – Unsubscribe from topics +local function make_packet_unsubscribe(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.subscriptions) == "table", "expecting .subscriptions to be a table") + assert(#args.subscriptions > 0, "expecting .subscriptions to be a non-empty array") + -- DOC: 3.10.2 Variable header + local variable_header = combine( + make_uint16(args.packet_id) + ) + -- DOC: 3.10.3 Payload + local payload = combine() + for i, subscription in ipairs(args.subscriptions) do + assert(type(subscription) == "string", "expecting .subscriptions["..i.."] to be a string") + payload:append(make_string(subscription)) + end + -- DOC: 3.10.1 Fixed header + local header = make_header(packet_type.UNSUBSCRIBE, 2, variable_header:len() + payload:len()) -- NOTE: fixed flags value 0x2 + return combine(header, variable_header, payload) +end + +-- Create packet of given {type: number} in args +function protocol4.make_packet(args) + assert(type(args) == "table", "expecting args to be a table") + assert(type(args.type) == "number", "expecting .type number in args") + local ptype = args.type + if ptype == packet_type.CONNECT then + return make_packet_connect(args) + elseif ptype == packet_type.PUBLISH then + return make_packet_publish(args) + elseif ptype == packet_type.PUBACK then + return make_packet_puback(args) + elseif ptype == packet_type.PUBREC then + return make_packet_pubrec(args) + elseif ptype == packet_type.PUBREL then + return make_packet_pubrel(args) + elseif ptype == packet_type.PUBCOMP then + return make_packet_pubcomp(args) + elseif ptype == packet_type.SUBSCRIBE then + return make_packet_subscribe(args) + elseif ptype == packet_type.UNSUBSCRIBE then + return make_packet_unsubscribe(args) + elseif ptype == packet_type.PINGREQ then + -- DOC: 3.12 PINGREQ – PING request + return combine("\192\000") -- 192 == 0xC0, type == 12, flags == 0 + elseif ptype == packet_type.DISCONNECT then + -- DOC: 3.14 DISCONNECT – Disconnect notification + return combine("\224\000") -- 224 == 0xD0, type == 14, flags == 0 + else + error("unexpected packet type to make: "..ptype) + end +end + +-- Parse packet using given read_func +-- Returns packet on success or false and error message on failure +function protocol4.parse_packet(read_func) + local ptype, flags, input = start_parse_packet(read_func) + if not ptype then + return false, flags + end + -- parse readed data according type in fixed header + if ptype == packet_type.CONNACK then + -- DOC: 3.2 CONNACK – Acknowledge connection request + if input.available ~= 2 then + return false, "expecting data of length 2 bytes" + end + local byte1, byte2 = parse_uint8(input.read_func), parse_uint8(input.read_func) + local sp = (band(byte1, 0x1) ~= 0) + return setmetatable({type=ptype, sp=sp, rc=byte2}, connack_packet_mt) + elseif ptype == packet_type.PUBLISH then + -- DOC: 3.3 PUBLISH – Publish message + -- DOC: 3.3.1.1 DUP + local dup = (band(flags, 0x8) ~= 0) + -- DOC: 3.3.1.2 QoS + local qos = band(rshift(flags, 1), 0x3) + -- DOC: 3.3.1.3 RETAIN + local retain = (band(flags, 0x1) ~= 0) + -- DOC: 3.3.2.1 Topic Name + if input.available < 2 then + return false, "expecting data of length at least 2 bytes" + end + local topic_len = parse_uint16(input.read_func) + if input.available < topic_len then + return false, "malformed PUBLISH packet: not enough data to parse topic" + end + local topic = input.read_func(topic_len) + -- DOC: 3.3.2.2 Packet Identifier + local packet_id + if qos > 0 then + -- DOC: 3.3.2.2 Packet Identifier + if input.available < 2 then + return false, "malformed PUBLISH packet: not enough data to parse packet_id" + end + packet_id = parse_uint16(input.read_func) + end + -- DOC: 3.3.3 Payload + local payload + if input.available > 0 then + payload = input.read_func(input.available) + end + return setmetatable({type=ptype, dup=dup, qos=qos, retain=retain, packet_id=packet_id, topic=topic, payload=payload}, packet_mt) + elseif ptype == packet_type.PUBACK then + -- DOC: 3.4 PUBACK – Publish acknowledgement + if input.available ~= 2 then + return false, "expecting data of length 2 bytes" + end + -- DOC: 3.4.2 Variable header + local packet_id = parse_uint16(input.read_func) + return setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + elseif ptype == packet_type.PUBREC then + -- DOC: 3.5 PUBREC – Publish received (QoS 2 publish received, part 1) + if input.available ~= 2 then + return false, "expecting data of length 2 bytes" + end + -- DOC: 3.5.2 Variable header + local packet_id = parse_uint16(input.read_func) + return setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + elseif ptype == packet_type.PUBREL then + -- DOC: 3.6 PUBREL – Publish release (QoS 2 publish received, part 2) + if input.available ~= 2 then + return false, "expecting data of length 2 bytes" + end + -- also flags should be checked to equals 2 by the server + -- DOC: 3.6.2 Variable header + local packet_id = parse_uint16(input.read_func) + return setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + elseif ptype == packet_type.PUBCOMP then + -- 3.7 PUBCOMP – Publish complete (QoS 2 publish received, part 3) + if input.available ~= 2 then + return false, "expecting data of length 2 bytes" + end + -- DOC: 3.7.2 Variable header + local packet_id = parse_uint16(input.read_func) + return setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + elseif ptype == packet_type.SUBACK then + -- DOC: 3.9 SUBACK – Subscribe acknowledgement + if input.available < 3 then + return false, "expecting data of length at least 3 bytes" + end + -- DOC: 3.9.2 Variable header + -- DOC: 3.9.3 Payload + local packet_id = parse_uint16(input.read_func) + local rc = {} -- DOC: The payload contains a list of return codes. + while input.available > 0 do + rc[#rc + 1] = parse_uint8(input.read_func) + end + return setmetatable({type=ptype, packet_id=packet_id, rc=rc}, packet_mt) + elseif ptype == packet_type.UNSUBACK then + -- DOC: 3.11 UNSUBACK – Unsubscribe acknowledgement + if input.available ~= 2 then + return false, "expecting data of length 2 bytes" + end + -- DOC: 3.11.2 Variable header + local packet_id = parse_uint16(input.read_func) + return setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + elseif ptype == packet_type.PINGRESP then + -- DOC: 3.13 PINGRESP – PING response + if input.available ~= 0 then + return false, "expecting data of length 0 bytes" + end + return setmetatable({type=ptype}, packet_mt) + else + return false, "unexpected packet type received: "..tostring(ptype) + end +end + +-- export module table +return protocol4 + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/protocol5.lua b/controller-host/mqtt/protocol5.lua new file mode 100644 index 0000000..de9c696 --- /dev/null +++ b/controller-host/mqtt/protocol5.lua @@ -0,0 +1,1039 @@ +--[[ + +Here is a MQTT v5.0 protocol implementation + +MQTT v5.0 documentation (DOC): + http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html + +]] + +-- module table +local protocol5 = {} + +-- load required stuff +local type = type +local error = error +local assert = assert +local require = require +local tostring = tostring +local setmetatable = setmetatable + +local table = require("table") +local unpack = table.unpack or unpack +local tbl_sort = table.sort + +local string = require("string") +local str_char = string.char +local fmt = string.format + +local bit = require("mqtt.bitwrap") +local bor = bit.bor +local band = bit.band +local lshift = bit.lshift +local rshift = bit.rshift + +local protocol = require("mqtt.protocol") +local make_uint8 = protocol.make_uint8 +local make_uint16 = protocol.make_uint16 +local make_uint32 = protocol.make_uint32 +local make_string = protocol.make_string +local make_var_length = protocol.make_var_length +local parse_var_length = protocol.parse_var_length +local make_uint8_0_or_1 = protocol.make_uint8_0_or_1 +local make_uint16_nonzero = protocol.make_uint16_nonzero +local make_var_length_nonzero = protocol.make_var_length_nonzero +local parse_string = protocol.parse_string +local parse_uint8 = protocol.parse_uint8 +local parse_uint8_0_or_1 = protocol.parse_uint8_0_or_1 +local parse_uint16 = protocol.parse_uint16 +local parse_uint16_nonzero = protocol.parse_uint16_nonzero +local parse_uint32 = protocol.parse_uint32 +local parse_var_length_nonzero = protocol.parse_var_length_nonzero +local make_header = protocol.make_header +local check_qos = protocol.check_qos +local check_packet_id = protocol.check_packet_id +local combine = protocol.combine +local packet_type = protocol.packet_type +local packet_mt = protocol.packet_mt +local connack_packet_mt = protocol.connack_packet_mt +local start_parse_packet = protocol.start_parse_packet + +-- Returns true if given value is a valid Retain Handling option, DOC: 3.8.3.1 Subscription Options +local function check_retain_handling(val) + return (val == 0) or (val == 1) or (val == 2) +end + +-- Create Connect Flags data, DOC: 3.1.2.3 Connect Flags +local function make_connect_flags(args) + local byte = 0 -- bit 0 should be zero + -- DOC: 3.1.2.4 Clean Start + if args.clean ~= nil then + assert(type(args.clean) == "boolean", "expecting .clean to be a boolean") + if args.clean then + byte = bor(byte, lshift(1, 1)) + end + end + -- DOC: 3.1.2.5 Will Flag + if args.will ~= nil then + -- check required args are presented + assert(type(args.will) == "table", "expecting .will to be a table") + assert(type(args.will.payload) == "string", "expecting .will.payload to be a string") + assert(type(args.will.topic) == "string", "expecting .will.topic to be a string") + assert(type(args.will.qos) == "number", "expecting .will.qos to be a number") + assert(check_qos(args.will.qos), "expecting .will.qos to be a valid QoS value") + assert(type(args.will.retain) == "boolean", "expecting .will.retain to be a boolean") + if args.will.properties ~= nil then + assert(type(args.will.properties) == "table", "expecting .will.properties to be a table") + end + if args.will.user_properties ~= nil then + assert(type(args.will.user_properties) == "table", "expecting .will.user_properties to be a table") + end + -- will flag should be set to 1 + byte = bor(byte, lshift(1, 2)) + -- DOC: 3.1.2.6 Will QoS + byte = bor(byte, lshift(args.will.qos, 3)) + -- DOC: 3.1.2.7 Will Retain + if args.will.retain then + byte = bor(byte, lshift(1, 5)) + end + end + -- DOC: 3.1.2.8 User Name Flag + if args.username ~= nil then + assert(type(args.username) == "string", "expecting .username to be a string") + byte = bor(byte, lshift(1, 7)) + end + -- DOC: 3.1.2.9 Password Flag + if args.password ~= nil then + assert(type(args.password) == "string", "expecting .password to be a string") + assert(args.username, "the .username is required to set .password") + byte = bor(byte, lshift(1, 6)) + end + return make_uint8(byte) +end + +-- Known property names and its identifiers, DOC: 2.2.2.2 Property +local property_pairs = { + { 0x01, "payload_format_indicator", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x02, "message_expiry_interval", + make = make_uint32, + parse = parse_uint32, }, + { 0x03, "content_type", + make = make_string, + parse = parse_string, }, + { 0x08, "response_topic", + make = make_string, + parse = parse_string, }, + { 0x09, "correlation_data", + make = make_string, + parse = parse_string, }, + { 0x0B, "subscription_identifiers", + make = function(value) return str_char(make_var_length_nonzero(value)) end, + parse = parse_var_length_nonzero, + multiple = true, }, + { 0x11, "session_expiry_interval", + make = make_uint32, + parse = parse_uint32, }, + { 0x12, "assigned_client_identifier", + make = make_string, + parse = parse_string, }, + { 0x13, "server_keep_alive", + make = make_uint16, + parse = parse_uint16, }, + { 0x15, "authentication_method", + make = make_string, + parse = parse_string, }, + { 0x16, "authentication_data", + make = make_string, + parse = parse_string, }, + { 0x17, "request_problem_information", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x18, "will_delay_interval", + make = make_uint32, + parse = parse_uint32, }, + { 0x19, "request_response_information", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x1A, "response_information", + make = make_string, + parse = parse_string, }, + { 0x1C, "server_reference", + make = make_string, + parse = parse_string, }, + { 0x1F, "reason_string", + make = make_string, + parse = parse_string, }, + { 0x21, "receive_maximum", + make = make_uint16, + parse = parse_uint16, }, + { 0x22, "topic_alias_maximum", + make = make_uint16, + parse = parse_uint16, }, + { 0x23, "topic_alias", + make = make_uint16_nonzero, + parse = parse_uint16_nonzero, }, + { 0x24, "maximum_qos", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x25, "retain_available", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x26, "user_property", -- NOTE: not implemented intentionally + make = function(value_) error("not implemented") end, + parse = function(read_func_) error("not implemented") end, }, + { 0x27, "maximum_packet_size", + make = make_uint32, + parse = parse_uint32, }, + { 0x28, "wildcard_subscription_available", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x29, "subscription_identifiers_available", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, + { 0x2A, "shared_subscription_available", + make = make_uint8_0_or_1, + parse = parse_uint8_0_or_1, }, +} + +-- properties table with keys in two directions: from name to identifier and back +local properties = {} +-- table with property value make functions +local property_make = {} +-- table with property value parse function +local property_parse = {} +-- table with property multiple flag +local property_multiple = {} +-- fill the properties and property_make tables +for _, prop in ipairs(property_pairs) do + properties[prop[2]] = prop[1] -- name ==> identifier + properties[prop[1]] = prop[2] -- identifier ==> name + property_make[prop[1]] = prop.make -- identifier ==> make function + property_parse[prop[1]] = prop.parse -- identifier ==> make function + property_multiple[prop[1]] = prop.multiple -- identifier ==> multiple flag +end + +-- Allowed properties per packet type +local allowed_properties = { + [packet_type.CONNECT] = { + [0x11] = true, -- DOC: 3.1.2.11.2 Session Expiry Interval + [0x21] = true, -- DOC: 3.1.2.11.3 Receive Maximum + [0x27] = true, -- DOC: 3.1.2.11.4 Maximum Packet Size + [0x22] = true, -- DOC: 3.1.2.11.5 Topic Alias Maximum + [0x19] = true, -- DOC: 3.1.2.11.6 Request Response Information + [0x17] = true, -- DOC: 3.1.2.11.7 Request Problem Information + [0x26] = true, -- DOC: 3.1.2.11.8 User Property + [0x15] = true, -- DOC: 3.1.2.11.9 Authentication Method + [0x16] = true, -- DOC: 3.1.2.11.10 Authentication Data + }, + [packet_type.CONNACK] = { + [0x11] = true, -- DOC: 3.2.2.3.2 Session Expiry Interval + [0x21] = true, -- DOC: 3.2.2.3.3 Receive Maximum + [0x24] = true, -- DOC: 3.2.2.3.4 Maximum QoS + [0x25] = true, -- DOC: 3.2.2.3.5 Retain Available + [0x27] = true, -- DOC: 3.2.2.3.6 Maximum Packet Size + [0x12] = true, -- DOC: 3.2.2.3.7 Assigned Client Identifier + [0x22] = true, -- DOC: 3.2.2.3.8 Topic Alias Maximum + [0x1F] = true, -- DOC: 3.2.2.3.9 Reason String + [0x26] = true, -- DOC: 3.2.2.3.10 User Property + [0x28] = true, -- DOC: 3.2.2.3.11 Wildcard Subscription Available + [0x29] = true, -- DOC: 3.2.2.3.12 Subscription Identifiers Available + [0x2A] = true, -- DOC: 3.2.2.3.13 Shared Subscription Available + [0x13] = true, -- DOC: 3.2.2.3.14 Server Keep Alive + [0x1A] = true, -- DOC: 3.2.2.3.15 Response Information + [0x1C] = true, -- DOC: 3.2.2.3.16 Server Reference + [0x15] = true, -- DOC: 3.2.2.3.17 Authentication Method + [0x16] = true, -- DOC: 3.2.2.3.18 Authentication Data + }, + [packet_type.PUBLISH] = { + [0x01] = true, -- DOC: 3.3.2.3.2 Payload Format Indicator + [0x02] = true, -- DOC: 3.3.2.3.3 Message Expiry Interval + [0x23] = true, -- DOC: 3.3.2.3.4 Topic Alias + [0x08] = true, -- DOC: 3.3.2.3.5 Response Topic + [0x09] = true, -- DOC: 3.3.2.3.6 Correlation Data + [0x26] = true, -- DOC: 3.3.2.3.7 User Property + [0x0B] = true, -- DOC: 3.3.2.3.8 Subscription Identifier + [0x03] = true, -- DOC: 3.3.2.3.9 Content Type + }, + will = { + [0x18] = true, -- DOC: 3.1.3.2.2 Will Delay Interval + [0x01] = true, -- DOC: 3.1.3.2.3 Payload Format Indicator + [0x02] = true, -- DOC: 3.1.3.2.4 Message Expiry Interval + [0x03] = true, -- DOC: 3.1.3.2.5 Content Type + [0x08] = true, -- DOC: 3.1.3.2.6 Response Topic + [0x09] = true, -- DOC: 3.1.3.2.7 Correlation Data + [0x26] = true, -- DOC: 3.1.3.2.8 User Property + }, + [packet_type.PUBACK] = { + [0x1F] = true, -- DOC: 3.4.2.2.2 Reason String + [0x26] = true, -- DOC: 3.4.2.2.3 User Property + }, + [packet_type.PUBREC] = { + [0x1F] = true, -- DOC: 3.5.2.2.2 Reason String + [0x26] = true, -- DOC: 3.5.2.2.3 User Property + }, + [packet_type.PUBREL] = { + [0x1F] = true, -- DOC: 3.6.2.2.2 Reason String + [0x26] = true, -- DOC: 3.6.2.2.3 User Property + }, + [packet_type.PUBCOMP] = { + [0x1F] = true, -- DOC: 3.7.2.2.2 Reason String + [0x26] = true, -- DOC: 3.7.2.2.3 User Property + }, + [packet_type.SUBSCRIBE] = { + [0x0B] = true, -- DOC: 3.8.2.1.2 Subscription Identifier + [0x26] = true, -- DOC: 3.8.2.1.3 User Property + }, + [packet_type.SUBACK] = { + [0x1F] = true, -- DOC: 3.9.2.1.2 Reason String + [0x26] = true, -- DOC: 3.9.2.1.3 User Property + }, + [packet_type.UNSUBSCRIBE] = { + [0x26] = true, -- DOC: 3.10.2.1.2 User Property + }, + [packet_type.UNSUBACK] = { + [0x1F] = true, -- DOC: 3.11.2.1.2 Reason String + [0x26] = true, -- DOC: 3.11.2.1.3 User Property + }, + -- NOTE: PINGREQ (3.12), PINGRESP (3.13) has no properties + [packet_type.DISCONNECT] = { + [0x11] = true, -- DOC: 3.14.2.2.2 Session Expiry Interval + [0x1F] = true, -- DOC: 3.14.2.2.3 Reason String + [0x26] = true, -- DOC: 3.14.2.2.4 User Property + [0x1C] = true, -- DOC: 3.14.2.2.5 Server Reference + }, + [packet_type.AUTH] = { + [0x15] = true, -- DOC: 3.15.2.2.2 Authentication Method + [0x16] = true, -- DOC: 3.15.2.2.3 Authentication Data + [0x1F] = true, -- DOC: 3.15.2.2.4 Reason String + [0x26] = true, -- DOC: 3.15.2.2.5 User Property + }, +} + +-- Create properties field for various control packets, DOC: 2.2.2 Properties +local function make_properties(ptype, args) + local allowed = assert(allowed_properties[ptype], "invalid packet type to detect allowed properties") + local props = "" + local uprop_id = properties.user_property + -- writing known properties + if args.properties ~= nil then + assert(type(args.properties) == "table", "expecting .properties to be a table") + -- validate all properties and append them to order list + local order = {} + for name, value in pairs(args.properties) do + assert(type(name) == "string", "expecting property name to be a string: "..tostring(name)) + -- detect property identifier and check it's allowed for that packet type + local prop_id = assert(properties[name], "unknown property: "..tostring(name)) + assert(prop_id ~= uprop_id, "user properties should be passed in .user_properties table") + assert(allowed[prop_id], "property "..name.." is not allowed for packet type "..ptype) + order[#order + 1] = { prop_id, name, value } + end + -- sort props in the identifier ascending order + tbl_sort(order, function(a, b) return a[1] < b[1] end) + for _, item in ipairs(order) do + local prop_id, name, value = unpack(item) + if property_multiple[prop_id] then + assert(type(value) == "table", "expecting list-table for property with multiple value") + assert(#value == 1, "only one value for multiple-property supported") + value = value[1] + end + -- make property data + local ok, val = pcall(property_make[prop_id], value) + if not ok then + error("invalid property value: "..name.." = "..tostring(value)..": "..tostring(val)) + end + local prop = combine( + str_char(make_var_length(prop_id)), + val + ) + -- and append it to props + if type(props) == "string" then + props = combine(prop) + else + props:append(prop) + end + end + end + -- writing userproperties + if args.user_properties ~= nil then + assert(type(args.user_properties) == "table", "expecting .user_properties to be a table") + assert(allowed[uprop_id], "user_property is not allowed for packet type "..ptype) + local order = {} + for name, val in pairs(args.user_properties) do + local ntype = type(name) + if ntype == "string" then + if type(val) ~= "string" then + error(fmt("user property '%s' value should be a string", name)) + end + order[#order + 1] = {name, val, 0} + elseif ntype == "number" then + if type(val) ~= "table" or type(val[1]) ~= "string" or type(val[2]) ~= "string" then + error(fmt("user property at index %d should be a table with two strings", name)) + end + order[#order + 1] = {val[1], val[2], name} + else + error(fmt("unknown user property name type passed: %s", ntype)) + end + end + tbl_sort(order, function(a, b) if a[1] == b[1] then return a[3] < b[3] else return a[1] < b[1] end end) + for _, pair in ipairs(order) do + local name = pair[1] + local value = pair[2] + -- make user property data + local prop = combine( + str_char(make_var_length(uprop_id)), + make_string(name), + make_string(value) + ) + -- and append it to props + if type(props) == "string" then + props = combine(prop) + else + props:append(prop) + end + end + end + -- and combine properties with its length field + return combine( + str_char(make_var_length(props:len())), -- DOC: 2.2.2.1 Property Length + props -- DOC: 2.2.2.2 Property + ) +end + +-- Create CONNECT packet, DOC: 3.1 CONNECT – Connection Request +local function make_packet_connect(args) + -- check args + assert(type(args.id) == "string", "expecting .id to be a string with MQTT client id") + -- DOC: 3.1.2.10 Keep Alive + local keep_alive_ival = 0 + if args.keep_alive then + assert(type(args.keep_alive) == "number") + keep_alive_ival = args.keep_alive + end + -- DOC: 3.1.2.11 CONNECT Properties + local props = make_properties(packet_type.CONNECT, args) + -- DOC: 3.1.2 CONNECT Variable Header + local variable_header = combine( + make_string("MQTT"), -- DOC: 3.1.2.1 Protocol Name + make_uint8(5), -- DOC: 3.1.2.2 Protocol Version (5 is for MQTT v5.0) + make_connect_flags(args), -- DOC: 3.1.2.3 Connect Flags + make_uint16(keep_alive_ival), -- DOC: 3.1.2.10 Keep Alive + props -- DOC: 3.1.2.11 CONNECT Properties + ) + -- DOC: 3.1.3 CONNECT Payload + -- DOC: 3.1.3.1 Client Identifier (ClientID) + local payload = combine( + make_string(args.id) + ) + if args.will then + -- DOC: 3.1.3.2 Will Properties + payload:append(make_properties("will", args.will)) + -- DOC: 3.1.3.3 Will Topic + assert(type(args.will.topic) == "string", "expecting will.topic to be a string") + payload:append(make_string(args.will.topic)) + -- DOC: 3.1.3.4 Will Payload + assert(args.will.payload == nil or type(args.will.payload) == "string", "expecting will.payload to be a string or nil") + payload:append(make_string(args.will.payload)) + end + if args.username then + -- DOC: 3.1.3.5 User Name + payload:append(make_string(args.username)) + if args.password then + -- DOC: 3.1.3.6 Password + payload:append(make_string(args.password)) + end + end + -- DOC: 3.1.1 Fixed header + local header = make_header(packet_type.CONNECT, 0, variable_header:len() + payload:len()) + return combine(header, variable_header, payload) +end + +-- Create PUBLISH packet, DOC: 3.3 PUBLISH – Publish message +local function make_packet_publish(args) + -- check args + assert(type(args.topic) == "string", "expecting .topic to be a string") + if args.payload ~= nil then + assert(type(args.payload) == "string", "expecting .payload to be a string") + end + if args.qos ~= nil then + assert(type(args.qos) == "number", "expecting .qos to be a number") + assert(check_qos(args.qos), "expecting .qos to be a valid QoS value") + end + if args.retain ~= nil then + assert(type(args.retain) == "boolean", "expecting .retain to be a boolean") + end + if args.dup ~= nil then + assert(type(args.dup) == "boolean", "expecting .dup to be a boolean") + end + + -- DOC: 3.3.1 PUBLISH Fixed Header + local flags = 0 + -- 3.3.1.3 RETAIN + if args.retain then + flags = bor(flags, 0x1) + end + -- DOC: 3.3.1.2 QoS + flags = bor(flags, lshift(args.qos or 0, 1)) + -- DOC: 3.3.1.1 DUP + if args.dup then + flags = bor(flags, lshift(1, 3)) + end + -- DOC: 3.3.2 PUBLISH Variable Header + local variable_header = combine( + make_string(args.topic) + ) + -- DOC: 3.3.2.2 Packet Identifier + if args.qos and args.qos > 0 then + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + variable_header:append(make_uint16(args.packet_id)) + end + -- DOC: 3.3.2.3 PUBLISH Properties + variable_header:append(make_properties(packet_type.PUBLISH, args)) + -- DOC: 3.3.3 PUBLISH Payload + local payload + if args.payload then + payload = args.payload + else + payload = "" + end + -- DOC: 3.3.1 Fixed header + local header = make_header(packet_type.PUBLISH, flags, variable_header:len() + payload:len()) + return combine(header, variable_header, payload) +end + +-- Create PUBACK packet, DOC: 3.4 PUBACK – Publish acknowledgement +local function make_packet_puback(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.rc) == "number", "expecting .rc to be a number") + -- DOC: 3.4.2 PUBACK Variable Header + local variable_header = combine(make_uint16(args.packet_id)) + local props = make_properties(packet_type.PUBACK, args) -- DOC: 3.4.2.2 PUBACK Properties + -- DOC: The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success) and there are no Properties. In this case the PUBACK has a Remaining Length of 2. + if props:len() > 1 or args.rc ~= 0 then + variable_header:append(make_uint8(args.rc)) -- DOC: 3.4.2.1 PUBACK Reason Code + variable_header:append(props) -- DOC: 3.4.2.2 PUBACK Properties + end + -- DOC: 3.4.1 PUBACK Fixed Header + local header = make_header(packet_type.PUBACK, 0, variable_header:len()) + return combine(header, variable_header) +end + +-- Create PUBREC packet, DOC: 3.5 PUBREC – Publish received (QoS 2 delivery part 1) +local function make_packet_pubrec(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.rc) == "number", "expecting .rc to be a number") + -- DOC: 3.5.2 PUBREC Variable Header + local variable_header = combine(make_uint16(args.packet_id)) + local props = make_properties(packet_type.PUBREC, args) -- DOC: 3.5.2.2 PUBREC Properties + -- DOC: The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success) and there are no Properties. In this case the PUBREC has a Remaining Length of 2. + if props:len() > 1 or args.rc ~= 0 then + variable_header:append(make_uint8(args.rc)) -- DOC: 3.5.2.1 PUBREC Reason Code + variable_header:append(props) -- DOC: 3.5.2.2 PUBREC Properties + end + -- DOC: 3.5.1 PUBREC Fixed Header + local header = make_header(packet_type.PUBREC, 0, variable_header:len()) + return combine(header, variable_header) +end + +-- Create PUBREL packet, DOC: 3.6 PUBREL – Publish release (QoS 2 delivery part 2) +local function make_packet_pubrel(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.rc) == "number", "expecting .rc to be a number") + -- DOC: 3.6.2 PUBREL Variable Header + local variable_header = combine(make_uint16(args.packet_id)) + local props = make_properties(packet_type.PUBREL, args) -- DOC: 3.6.2.2 PUBREL Properties + -- DOC: The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success) and there are no Properties. In this case the PUBREL has a Remaining Length of 2. + if props:len() > 1 or args.rc ~= 0 then + variable_header:append(make_uint8(args.rc)) -- DOC: 3.6.2.1 PUBREL Reason Code + variable_header:append(props) -- DOC: 3.6.2.2 PUBREL Properties + end + -- DOC: 3.6.1 PUBREL Fixed Header + local header = make_header(packet_type.PUBREL, 2, variable_header:len()) -- flags: fixed 0010 bits, DOC: Figure 3‑14 – PUBREL packet Fixed Header + return combine(header, variable_header) +end + +-- Create PUBCOMP packet, DOC: 3.7 PUBCOMP – Publish complete (QoS 2 delivery part 3) +local function make_packet_pubcomp(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.rc) == "number", "expecting .rc to be a number") + -- DOC: 3.7.2 PUBCOMP Variable Header + local variable_header = combine(make_uint16(args.packet_id)) + local props = make_properties(packet_type.PUBCOMP, args) -- DOC: 3.7.2.2 PUBCOMP Properties + -- DOC: The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success) and there are no Properties. In this case the PUBCOMP has a Remaining Length of 2. + if props:len() > 1 or args.rc ~= 0 then + variable_header:append(make_uint8(args.rc)) -- DOC: 3.7.2.1 PUBCOMP Reason Code + variable_header:append(props) -- DOC: 3.7.2.2 PUBCOMP Properties + end + -- DOC: 3.7.1 PUBCOMP Fixed Header + local header = make_header(packet_type.PUBCOMP, 0, variable_header:len()) + return combine(header, variable_header) +end + +-- Create SUBSCRIBE packet, DOC: 3.8 SUBSCRIBE - Subscribe request +local function make_packet_subscribe(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.subscriptions) == "table", "expecting .subscriptions to be a table") + assert(#args.subscriptions > 0, "expecting .subscriptions to be a non-empty array") + -- DOC: 3.8.2 SUBSCRIBE Variable Header + local variable_header = combine( + make_uint16(args.packet_id), + make_properties(packet_type.SUBSCRIBE, args) -- DOC: 3.8.2.1 SUBSCRIBE Properties + ) + -- DOC: 3.8.3 SUBSCRIBE Payload + local payload = combine() + for i, subscription in ipairs(args.subscriptions) do + assert(type(subscription) == "table", "expecting .subscriptions["..i.."] to be a table") + assert(type(subscription.topic) == "string", "expecting .subscriptions["..i.."].topic to be a string") + if subscription.qos ~= nil then -- TODO: maybe remove that check and make .qos mandatory? + assert(type(subscription.qos) == "number", "expecting .subscriptions["..i.."].qos to be a number") + assert(check_qos(subscription.qos), "expecting .subscriptions["..i.."].qos to be a valid QoS value") + end + if subscription.retain_as_published ~= nil then + assert(type(subscription.retain_as_published) == "boolean", "expecting .subscriptions["..i.."].retain_as_published to be a boolean") + end + if subscription.retain_handling ~= nil then + assert(type(subscription.retain_handling) == "number", "expecting .subscriptions["..i.."].retain_handling to be a number") + assert(check_retain_handling(subscription.retain_handling), "expecting .subscriptions["..i.."].retain_handling to be a valid Retain Handling option") + end + -- DOC: 3.8.3.1 Subscription Options + local so = subscription.qos or 0 + if subscription.no_local then + so = bor(so, 4) -- set Bit 2 + end + if subscription.retain_as_published then + so = bor(so, 8) -- set Bit 3 + end + if subscription.retain_handling then + so = bor(so, lshift(subscription.retain_handling, 4)) -- set Bit 4 and 5 + end + payload:append(make_string(subscription.topic)) + payload:append(make_uint8(so)) + end + -- DOC: 3.8.1 SUBSCRIBE Fixed Header + local header = make_header(packet_type.SUBSCRIBE, 2, variable_header:len() + payload:len()) -- flags: fixed 0010 bits, DOC: Figure 3‑18 SUBSCRIBE packet Fixed Header + return combine(header, variable_header, payload) +end + +-- Create UNSUBSCRIBE packet, DOC: 3.10 UNSUBSCRIBE – Unsubscribe request +local function make_packet_unsubscribe(args) + -- check args + assert(type(args.packet_id) == "number", "expecting .packet_id to be a number") + assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier") + assert(type(args.subscriptions) == "table", "expecting .subscriptions to be a table") + assert(#args.subscriptions > 0, "expecting .subscriptions to be a non-empty array") + -- DOC: 3.10.2 UNSUBSCRIBE Variable Header + local variable_header = combine( + make_uint16(args.packet_id), + make_properties(packet_type.UNSUBSCRIBE, args) -- DOC: 3.10.2.1 UNSUBSCRIBE Properties + ) + -- DOC: 3.10.3 UNSUBSCRIBE Payload + local payload = combine() + for i, subscription in ipairs(args.subscriptions) do + assert(type(subscription) == "string", "expecting .subscriptions["..i.."] to be a string") + payload:append(make_string(subscription)) + end + -- DOC: 3.10.1 UNSUBSCRIBE Fixed Header + local header = make_header(packet_type.UNSUBSCRIBE, 2, variable_header:len() + payload:len()) -- flags: fixed 0010 bits, DOC: Figure 3.28 – UNSUBSCRIBE packet Fixed Header + return combine(header, variable_header, payload) +end + +-- Create DISCONNECT packet, DOC: 3.14 DISCONNECT – Disconnect notification +local function make_packet_disconnect(args) + -- check args + assert(type(args.rc) == "number", "expecting .rc to be a number") + -- DOC: 3.14.2 DISCONNECT Variable Header + local variable_header = combine() + local props = make_properties(packet_type.DISCONNECT, args) -- DOC: 3.14.2.2 DISCONNECT Properties + -- DOC: The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Normal disconnecton) and there are no Properties. In this case the DISCONNECT has a Remaining Length of 0. + if props:len() > 1 or args.rc ~= 0 then + variable_header:append(make_uint8(args.rc)) -- DOC: 3.14.2.1 Disconnect Reason Code + variable_header:append(props) -- DOC: 3.14.2.2 DISCONNECT Properties + end + -- DOC: 3.14.1 DISCONNECT Fixed Header + local header = make_header(packet_type.DISCONNECT, 0, variable_header:len()) -- flags: 0 + return combine(header, variable_header) +end + +-- Create AUTH packet, DOC: 3.15 AUTH – Authentication exchange +local function make_packet_auth(args) + -- check args + assert(type(args.rc) == "number", "expecting .rc to be a number") + -- DOC: 3.15.2 AUTH Variable Header + local variable_header = combine() + -- DOC: The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success) and there are no Properties. In this case the AUTH has a Remaining Length of 0. + local props = make_properties(packet_type.AUTH, args) -- DOC: 3.15.2.2 AUTH Properties + if props:len() > 1 or args.rc ~= 0 then + variable_header:append(make_uint8(args.rc)) -- DOC: 3.15.2.1 Authenticate Reason Code + variable_header:append(props) -- DOC: 3.15.2.2 AUTH Properties + end + -- DOC: 3.15.1 AUTH Fixed Header + local header = make_header(packet_type.AUTH, 0, variable_header:len()) + return combine(header, variable_header) +end + +-- Create packet of given {type: number} in args +function protocol5.make_packet(args) + assert(type(args) == "table", "expecting args to be a table") + assert(type(args.type) == "number", "expecting .type number in args") + local ptype = args.type + if ptype == packet_type.CONNECT then + return make_packet_connect(args) + elseif ptype == packet_type.PUBLISH then + return make_packet_publish(args) + elseif ptype == packet_type.PUBACK then + return make_packet_puback(args) + elseif ptype == packet_type.PUBREC then + return make_packet_pubrec(args) + elseif ptype == packet_type.PUBREL then + return make_packet_pubrel(args) + elseif ptype == packet_type.PUBCOMP then + return make_packet_pubcomp(args) + elseif ptype == packet_type.SUBSCRIBE then + return make_packet_subscribe(args) + elseif ptype == packet_type.UNSUBSCRIBE then + return make_packet_unsubscribe(args) + elseif ptype == packet_type.PINGREQ then + -- DOC: 3.12 PINGREQ – PING request + return combine("\192\000") -- 192 == 0xC0, type == 12, flags == 0 + elseif ptype == packet_type.DISCONNECT then + return make_packet_disconnect(args) + elseif ptype == packet_type.AUTH then + return make_packet_auth(args) + else + error("unexpected packet type to make: "..ptype) + end +end + +-- Parse properties using given read_data function for specified packet type +-- Result will be stored in packet.properties and packet.user_properties +-- Returns false plus string with error message on failure +-- Returns true on success +local function parse_properties(ptype, read_data, input, packet) + assert(type(read_data) == "function", "expecting read_data to be a function") + -- DOC: 2.2.2 Properties + -- parse Property Length + -- create read_func for parse_var_length and other parse functions, reading from data string instead of network connector + local len, err = parse_var_length(read_data) + if not len then + return false, "failed to parse properties length: "..err + end + -- check data contains enough bytes for reading properties + if input.available < len then + return false, "not enough data to parse properties of length "..len + end + -- ensure properties and user_properties are presented in packet + if not packet.properties then + packet.properties = {} + end + if not packet.user_properties then + packet.user_properties = {} + end + local uprops = packet.user_properties + -- parse allowed properties + local uprop_id = properties.user_property + local allowed = assert(allowed_properties[ptype], "no allowed properties for specified packet type: "..tostring(ptype)) + local props_end = input[1] + len + while input[1] < props_end do + -- property id, DOC: 2.2.2.2 Property + local prop_id + prop_id, err = parse_var_length(read_data) + if not prop_id then + return false, "failed to parse property length: "..err + end + if not allowed[prop_id] then + return false, "property "..prop_id.." is not allowed for packet type "..ptype + end + if prop_id == uprop_id then + -- parse name=value string pair + local name, value + name, err = parse_string(read_data) + if not name then + return false, "failed to parse user property name: "..err + end + value, err = parse_string(read_data) + if not value then + return false, "failed to parse user property value: "..err + end + local old_val = uprops[name] + if old_val ~= nil then + -- ensure uprops contains pairs with name = + local found = false + for _, pair in ipairs(uprops) do + if pair[1] == name and pair[2] == old_val then + found = true + break + end + end + if not found then + uprops[#uprops + 1] = {name, old_val} + end + uprops[#uprops + 1] = {name, value} + end + uprops[name] = value + else + -- parse property value according its identifier + local value + value, err = property_parse[prop_id](read_data) + if err then + return false, "failed ro parse property "..prop_id.." value: "..err + end + if property_multiple[prop_id] then + local curr = packet.properties[properties[prop_id]] or {} + curr[#curr + 1] = value + packet.properties[properties[prop_id]] = curr + else + packet.properties[properties[prop_id]] = value + end + end + end + return true +end + +-- Parse packet using given read_func +-- Returns packet on success or false and error message on failure +function protocol5.parse_packet(read_func) + local ptype, flags, input = start_parse_packet(read_func) + if not ptype then + return false, flags + end + local byte1, byte2, err, rc, ok, packet, topic, packet_id + local read_data = input.read_func + + -- parse readed data according type in fixed header + if ptype == packet_type.CONNACK then + -- DOC: 3.2 CONNACK – Connect acknowledgement + if input.available < 3 then + return false, "expecting data of length 3 bytes or more" + end + -- DOC: 3.2.2.1.1 Session Present + -- DOC: 3.2.2.2 Connect Reason Code + byte1, byte2 = parse_uint8(read_data), parse_uint8(read_data) + local sp = (band(byte1, 0x1) ~= 0) + packet = setmetatable({type=ptype, sp=sp, rc=byte2}, connack_packet_mt) + -- DOC: 3.2.2.3 CONNACK Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + elseif ptype == packet_type.PUBLISH then + -- DOC: 3.3 PUBLISH – Publish message + -- DOC: 3.3.1.1 DUP + local dup = (band(flags, 0x8) ~= 0) + -- DOC: 3.3.1.2 QoS + local qos = band(rshift(flags, 1), 0x3) + -- DOC: 3.3.1.3 RETAIN + local retain = (band(flags, 0x1) ~= 0) + -- DOC: 3.3.2.1 Topic Name + topic, err = parse_string(read_data) + if not topic then + return false, "failed to parse topic: "..err + end + -- DOC: 3.3.2.2 Packet Identifier + if qos > 0 then + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + end + -- DOC: 3.3.2.3 PUBLISH Properties + packet = setmetatable({type=ptype, dup=dup, qos=qos, retain=retain, packet_id=packet_id, topic=topic}, packet_mt) + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + if input.available > 0 then + -- DOC: 3.3.3 PUBLISH Payload + packet.payload = read_data(input.available) + end + elseif ptype == packet_type.PUBACK then + -- DOC: 3.4 PUBACK – Publish acknowledgement + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + packet = setmetatable({type=ptype, packet_id=packet_id, rc=0, properties={}, user_properties={}}, packet_mt) + if input.available > 0 then + -- DOC: 3.4.2.1 PUBACK Reason Code + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse rc: "..err + end + packet.rc = rc + -- DOC: 3.4.2.2 PUBACK Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + end + elseif ptype == packet_type.PUBREC then + -- DOC: 3.5 PUBREC – Publish received (QoS 2 delivery part 1) + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + packet = setmetatable({type=ptype, packet_id=packet_id, rc=0, properties={}, user_properties={}}, packet_mt) + if input.available > 0 then + -- DOC: 3.5.2.1 PUBREC Reason Code + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse rc: "..err + end + packet.rc = rc + -- DOC: 3.5.2.2 PUBREC Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + end + elseif ptype == packet_type.PUBREL then + -- DOC: 3.6 PUBREL – Publish release (QoS 2 delivery part 2) + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + packet = setmetatable({type=ptype, packet_id=packet_id, rc=0, properties={}, user_properties={}}, packet_mt) + if input.available > 0 then + -- DOC: 3.6.2.1 PUBREL Reason Code + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse rc: "..err + end + packet.rc = rc + -- DOC: 3.6.2.2 PUBREL Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + end + elseif ptype == packet_type.PUBCOMP then + -- DOC: 3.7 PUBCOMP – Publish complete (QoS 2 delivery part 3) + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + packet = setmetatable({type=ptype, packet_id=packet_id, rc=0, properties={}, user_properties={}}, packet_mt) + if input.available > 0 then + -- DOC: 3.7.2.1 PUBCOMP Reason Code + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse rc: "..err + end + packet.rc = rc + -- DOC: 3.7.2.2 PUBCOMP Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + end + elseif ptype == packet_type.SUBACK then + -- DOC: 3.9 SUBACK – Subscribe acknowledgement + -- DOC: 3.9.2 SUBACK Variable Header + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + -- DOC: 3.9.2.1 SUBACK Properties + packet = setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + -- DOC: 3.9.3 SUBACK Payload + local rcs = {} + while input.available > 0 do + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse reason code: "..err + end + rcs[#rcs + 1] = rc + end + if not next(rcs) then + return false, "expecting at least one reason code in SUBACK" + end + packet.rc = rcs -- TODO: reason codes table somewhere should be placed + elseif ptype == packet_type.UNSUBACK then + -- DOC: 3.11 UNSUBACK – Unsubscribe acknowledgement + -- DOC: 3.11.2 UNSUBACK Variable Header + packet_id, err = parse_uint16(read_data) + if not packet_id then + return false, "failed to parse packet_id: "..err + end + -- 3.11.2.1 UNSUBACK Properties + packet = setmetatable({type=ptype, packet_id=packet_id}, packet_mt) + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + -- 3.11.3 UNSUBACK Payload + local rcs = {} + while input.available > 0 do + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse reason code: "..err + end + rcs[#rcs + 1] = rc + end + if not next(rcs) then + return false, "expecting at least one reason code in UNSUBACK" + end + packet.rc = rcs + elseif ptype == packet_type.PINGRESP then + -- DOC: 3.13 PINGRESP – PING response + packet = setmetatable({type=ptype, properties={}, user_properties={}}, packet_mt) + elseif ptype == packet_type.DISCONNECT then + -- DOC: 3.14 DISCONNECT – Disconnect notification + packet = setmetatable({type=ptype, rc=0, properties={}, user_properties={}}, packet_mt) + if input.available > 0 then + -- DOC: 3.14.2.1 Disconnect Reason Code + rc, err = parse_uint8(read_data) -- TODO: reason codes table + if not rc then + return false, "failed to parse rc: "..err + end + packet.rc = rc + -- DOC: 3.14.2.2 DISCONNECT Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + end + elseif ptype == packet_type.AUTH then + -- DOC: 3.15 AUTH – Authentication exchange + -- DOC: 3.15.2.1 Authenticate Reason Code + packet = setmetatable({type=ptype, rc=0, properties={}, user_properties={}}, packet_mt) + if input.available > 1 then + rc, err = parse_uint8(read_data) + if not rc then + return false, "failed to parse Authenticate Reason Code: "..err + end + packet.rc = rc + -- DOC: 3.15.2.2 AUTH Properties + ok, err = parse_properties(ptype, read_data, input, packet) + if not ok then + return false, "failed to parse packet properties: "..err + end + end + else + return false, "unexpected packet type received: "..tostring(ptype) + end + if input.available > 0 then + return false, "extra data in remaining length left after packet parsing" + end + return packet +end + +-- export module table +return protocol5 + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqtt/tools.lua b/controller-host/mqtt/tools.lua new file mode 100644 index 0000000..48b8886 --- /dev/null +++ b/controller-host/mqtt/tools.lua @@ -0,0 +1,35 @@ +-- module table +local tools = {} + +-- load required stuff +local require = require + +local string = require("string") +local str_format = string.format +local str_byte = string.byte + +local table = require("table") +local tbl_concat = table.concat + +local math = require("math") +local math_floor = math.floor + + +-- Returns string encoded as HEX +function tools.hex(str) + local res = {} + for i = 1, #str do + res[i] = str_format("%02X", str_byte(str, i)) + end + return tbl_concat(res) +end + +-- Integer division function +function tools.div(x, y) + return math_floor(x / y) +end + +-- export module table +return tools + +-- vim: ts=4 sts=4 sw=4 noet ft=lua diff --git a/controller-host/mqttthread.lua b/controller-host/mqttthread.lua new file mode 100644 index 0000000..f0fd80a --- /dev/null +++ b/controller-host/mqttthread.lua @@ -0,0 +1,70 @@ +-- Load MQTT library +local mqtt = require("mqtt") +local client + +local eventChannel = love.thread.getChannel("mqtt_event") +local commandChannel = love.thread.getChannel("mqtt_command") + +local function call(target, ...) + local args = {...} + eventChannel:push { + target = target, + args = args + } +end + +local function onConnect(connack) + print("On connect") + call("connect", connack) +end + +local function onMessage(message) + if message.topic:sub(0, 7) == "spider/" then + message.topic = message.topic:sub(8) + end + print(message.topic) + call("message2", message.topic, message.payload) +end + +local function onCommand(command) + if command.command == "send" then + client:publish { + topic = "spider/" .. command.topic, + payload = command.arg, + qos = 0 + } + elseif command.command == "subscribe" then + local topic = "spider/" .. command.topic + assert(client:subscribe { + topic = topic + }) + print("Subribed to " .. topic) + end +end + +local function main() + client = mqtt.client { + uri = "mqtt.seeseepuff.be", + username = "mqtt_controller", + clean = true, + reconnect = 5, + } + + client:on { + connect = onConnect, + message = onMessage + } + + print("Connecting") + local ioloop = mqtt.get_ioloop() + local i = 0 + ioloop:add(function() + local command = commandChannel:pop() + if command then + onCommand(command) + end + end) + mqtt.run_ioloop(client) +end + +main() diff --git a/upload.lua b/upload.lua new file mode 100644 index 0000000..25dd694 --- /dev/null +++ b/upload.lua @@ -0,0 +1,54 @@ +package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua' +local mqtt = require("mqtt") +local client + +local file = ... +local fh = io.open(file, "rb") +local contents = fh:read("a") +fh:close() + +function printTable(table, indentation) + indentation = indentation or "" + for name, value in pairs(table) do + print(indentation .. tostring(name) .. ": " .. tostring(value)) + end +end + +local function onMessage(data) + print(data.payload) +end + +local function onConnect(connack) + if connack.rc ~= 0 then + print("Connection to broker failed:", connack:reason_string()) + os.exit(1) + end + print("Connected to MQTT") + + assert(client:subscribe{ + topic = "spider/controller/stdout" + }) + + io.write("Sending payload...") + assert(client:publish { + topic = "spider/controller/payload", + payload = contents, + qos = 0 + }) + print(" DONE!") +end + +client = mqtt.client { + uri = "mqtt.seeseepuff.be", + username = "mqtt_controller", + clean = true, + reconnect = 5, +} + +client:on { + connect = onConnect, + message = onMessage, +} + +print("Connecting") +mqtt.run_ioloop(client) diff --git a/upload.sh b/upload.sh new file mode 100755 index 0000000..d2626ff --- /dev/null +++ b/upload.sh @@ -0,0 +1,7 @@ +#!/bin/sh +set -e +cd controller-client +rm -vf ../controller-client.zip +zip -r ../controller-client.zip * +cd .. +lua upload.lua controller-client.zip