Compare commits

...

2 Commits

32 changed files with 3486 additions and 1649 deletions

View File

@ -0,0 +1,19 @@
local socket = require("socket")
local conn = socket.connect("localhost", 1234)
print("connected")
conn:settimeout(3)
local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)
local data, err, part2 = conn:receive(3, part)
print("Data:", data)
print("Err:", err)
print("Part:", part2)
conn:close()

View File

@ -1,5 +1,6 @@
-- wrapper around BitOp module
-- luacheck: globals jit
if _VERSION == "Lua 5.1" or type(jit) == "table" then -- Lua 5.1 or LuaJIT (based on Lua 5.1)
return require("bit") -- custom module https://luarocks.org/modules/luarocks/luabitop
elseif _VERSION == "Lua 5.2" then

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,89 @@
-- base connector class for buffered reading.
--
-- Use this base class if the sockets do NOT yield.
-- So LuaSocket for example, when using Copas or OpenResty
-- use the non-buffered base class.
--
-- This base class derives from `non_buffered_base` it implements the
-- `receive` and `buffer_clear` methods. But adds the `plain_receive` method
-- that must be implemented.
--
-- NOTE: the `plain_receive` method is supposed to be non-blocking (see its
-- description), but the `send` method has no such facilities, so is `blocking`
-- in this class. Make sure to set the proper timeouts in either method before
-- starting the send/receive. So for example for LuaSocket call `settimeout(0)`
-- before receiving, and `settimeout(30)` before sending.
--
-- @class mqtt.connector.base.buffered_base
local super = require "mqtt.connector.base.non_buffered_base"
local buffered = setmetatable({}, super)
buffered.__index = buffered
buffered.super = super
buffered.type = "buffered, blocking i/o"
-- debug helper function
-- function buffered:buffer_state(msg)
-- print(string.format("buffer: size = %03d last-byte-done = %03d -- %s",
-- #(self.buffer_string or ""), self.buffer_pointer or 0, msg))
-- end
-- bytes read were handled, clear those
function buffered:buffer_clear()
-- self:buffer_state("before clearing buffer")
self.buffer_string = nil
self.buffer_pointer = nil
end
-- read bytes, first from buffer, remaining from function
-- if function returns "idle" then reset read pointer
function buffered:receive(size)
-- self:buffer_state("receive start "..size.." bytes")
local buf = self.buffer_string or ""
local idx = self.buffer_pointer or 0
while size > (#buf - idx) do
-- buffer is lacking bytes, read more...
local data, err = self:plain_receive(size - (#buf - idx))
if not data then
if err == self.signal_idle then
-- read timedout, retry entire packet later, reset buffer
self.buffer_pointer = 0
end
return data, err
end
-- append received data, and try again
buf = buf .. data
self.buffer_string = buf
-- self:buffer_state("receive added "..#data.." bytes")
end
self.buffer_pointer = idx + size
local data = buf:sub(idx + 1, idx + size)
-- print("data: ", require("mqtt.tools").hex(data))
-- self:buffer_state("receive done "..size.." bytes\n")
return data
end
--- Retrieves the requested number of bytes from the socket, in a non-blocking
-- manner.
-- The implementation MUST read with a timeout that immediately returns if there
-- is no data to read. If there is no data, then it MUST return
-- `nil, self.signal_idle` to indicate it no data was there and we need to retry later.
--
-- If there is partial data, it should return that data (less than the requested
-- number of bytes), with no error/signal.
--
-- If the receive errors, because of a closed connection it should return
-- `nil, self.signal_closed` to indicate this. Any other errors can be returned
-- as a regular `nil, err`.
-- @tparam size int number of bytes to receive.
-- @return data, or `false, err`, where `err` can be a signal.
function buffered:plain_receive(size) -- luacheck: ignore
error("method 'plain_receive' on buffered connector wasn't implemented")
end
return buffered

View File

@ -0,0 +1,29 @@
-- validates the LuaSec options, and applies defaults
return function(conn)
if conn.secure then
local params = conn.secure_params
if not params then
-- set default LuaSec options
conn.secure_params = {
mode = "client",
protocol = "any",
verify = "none",
options = {"all", "no_sslv2", "no_sslv3", "no_tlsv1"},
}
return
end
local ok, ssl = pcall(require, conn.ssl_module)
assert(ok, "ssl_module '"..tostring(conn.ssl_module).."' not found, secure connections unavailable")
assert(type(params) == "table", "expecting .secure_params to be a table, got: "..type(params))
params.mode = params.mode or "client"
assert(params.mode == "client", "secure parameter 'mode' must be set to 'client' if given, got: "..tostring(params.mode))
local ctx, err = ssl.newcontext(params)
if not ctx then
error("Couldn't create secure context: "..tostring(err))
end
end
end

View File

@ -0,0 +1,67 @@
-- base connector class for non-buffered reading.
--
-- Use this base class if the sockets DO yield.
-- So Copas or OpenResty for example, when using LuaSocket
-- use the buffered base class.
--
-- NOTE: when the send operation can also yield (as is the case with Copas and
-- OpenResty) you should wrap the `send` handler in a lock to prevent a half-send
-- message from being interleaved by another message send from another thread.
--
-- @class mqtt.connector.base.non_buffered_base
local non_buffered = {
type = "non-buffered, yielding i/o",
timeout = 30 -- default timeout
}
non_buffered.__index = non_buffered
-- we need to specify signals for these conditions such that the client
-- doesn't have to rely on magic strings like "timeout", "wantread", etc.
-- the connector is responsible for translating those connector specific
-- messages to a generic signal
non_buffered.signal_idle = {} -- read timeout occured, so we're idle need to come back later and try again
non_buffered.signal_closed = {} -- remote closed the connection
--- Validate connection options.
function non_buffered:shutdown() -- luacheck: ignore
error("method 'shutdown' on connector wasn't implemented")
end
--- Clears consumed bytes.
-- Called by the mqtt client when the consumed bytes from the buffer are handled
-- and can be cleared from the buffer.
-- A no-op for the non-buffered classes, since the sockets yield when incomplete.
function non_buffered.buffer_clear()
end
--- Retrieves the requested number of bytes from the socket.
-- If the receive errors, because of a closed connection it should return
-- `nil, self.signal_closed` to indicate this. Any other errors can be returned
-- as a regular `nil, err`.
-- @tparam size int number of retrieve to return.
-- @return data, or `false, err`, where `err` can be a signal.
function non_buffered:receive(size) -- luacheck: ignore
error("method 'receive' on non-buffered connector wasn't implemented")
end
--- Open network connection to `self.host` and `self.port`.
-- @return `true` on success, or `false, err` on failure
function non_buffered:connect() -- luacheck: ignore
error("method 'connect' on connector wasn't implemented")
end
--- Shutdown the network connection.
function non_buffered:shutdown() -- luacheck: ignore
error("method 'shutdown' on connector wasn't implemented")
end
--- Shutdown the network connection.
-- @tparam data string data to send
-- @return `true` on success, or `false, err` on failure
function non_buffered:send(data) -- luacheck: ignore
error("method 'send' on connector wasn't implemented")
end
return non_buffered

View File

@ -0,0 +1,121 @@
--- Copas based connector.
--
-- Copas is an advanced coroutine scheduler in pure-Lua. It uses LuaSocket
-- under the hood, but in a non-blocking way. It also uses LuaSec for TLS
-- based connections (like the `mqtt.connector.luasocket` one). And hence uses
-- the same defaults for the `secure` option when creating the `client`.
--
-- Caveats:
--
-- * the `client` option `ssl_module` is not supported by the Copas connector,
-- It will always use the module named `ssl`.
--
-- * multiple threads can send simultaneously (sending is wrapped in a lock)
--
-- * since the client creates a long lived connection for reading, it returns
-- upon receiving a packet, to call an event handler. The handler must return
-- swiftly, since while the handler runs the socket will not be reading.
-- Any task that might take longer than a few milliseconds should be off
-- loaded to another thread (the Copas-loop will take care of this).
--
-- NOTE: you will need to install copas like this: `luarocks install copas`.
-- @module mqtt.connector.copas
local super = require "mqtt.connector.base.non_buffered_base"
local connector = setmetatable({}, super)
connector.__index = connector
connector.super = super
local socket = require("socket")
local copas = require("copas")
local new_lock = require("copas.lock").new
local validate_luasec = require("mqtt.connector.base.luasec")
-- validate connection options
function connector:validate()
if self.secure then
assert(self.ssl_module == "ssl" or self.ssl_module == nil, "Copas connector only supports 'ssl' as 'ssl_module'")
validate_luasec(self)
end
end
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function connector:connect()
self:validate()
local sock = copas.wrap(socket.tcp(), self.secure_params)
copas.setsocketname("mqtt@"..self.host..":"..self.port, sock)
sock:settimeouts(self.timeout, self.timeout, -1) -- no timout on reading
local ok, err = sock:connect(self.host, self.port)
if not ok then
return false, "copas.connect failed: "..err
end
self.sock = sock
self.send_lock = new_lock(30) -- 30 second timeout
return true
end
-- the packet was fully read, we can clear the bufer.
function connector:buffer_clear()
-- since the packet is complete, we wait now indefinitely for the next one
self.sock:settimeouts(nil, nil, -1) -- no timeout on reading
end
-- Shutdown network connection
function connector:shutdown()
self.sock:close()
self.send_lock:destroy()
end
-- Send data to network connection
function connector:send(data)
-- cache locally in case lock/sock gets replaced while we were sending
local sock = self.sock
local lock = self.send_lock
local ok, err = lock:get()
if not ok then
return nil, "failed acquiring send_lock: "..tostring(err)
end
local i = 1
while i < #data do
i, err = sock:send(data, i)
if not i then
lock:release()
return false, err
end
end
lock:release()
return true
end
-- Receive given amount of data from network connection
function connector:receive(size)
local sock = self.sock
local data, err = sock:receive(size)
if data then
-- bytes received, so change from idefinite timeout to regular until
-- packet is complete (see buffer_clear method)
self.sock:settimeouts(nil, nil, self.timeout)
return data
end
if err == "closed" then
return false, self.signal_closed
elseif err == "timout" then
return false, self.signal_idle
else
return false, err
end
end
-- export module table
return connector
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -0,0 +1,34 @@
--- Auto detect the connector to use.
-- The different environments require different socket implementations to work
-- properly. The 'connectors' are an abstraction to facilitate that without
-- having to modify the client itself.
--
-- This module is will auto-detect the environment and return the proper
-- module from;
--
-- * `mqtt.connector.nginx` for using the non-blocking OpenResty co-socket apis
--
-- * `mqtt.connector.copas` for the non-blocking Copas wrapped sockets
--
-- * `mqtt.connector.luasocket` for LuaSocket based sockets (blocking)
--
-- Since the selection is based on a.o. packages loaded, make sure that in case
-- of using the `copas` scheduler, you require it before the `mqtt` modules.
--
-- Since the `client` defaults to this module (`mqtt.connector`) there typically
-- is no need to use this directly. When implementing your own connectors,
-- the included connectors provide good examples of what to look out for.
-- @module mqtt.connector
local loops = setmetatable({
copas = "mqtt.connector.copas",
nginx = "mqtt.connector.nginx",
ioloop = "mqtt.connector.luasocket"
}, {
__index = function()
error("failed to auto-detect connector to use, please set one explicitly", 2)
end
})
local loop = require("mqtt.loop.detect")()
return require(loops[loop])

View File

@ -0,0 +1,142 @@
--- LuaSocket (and LuaSec) based connector.
--
-- This connector works with the blocking LuaSocket sockets. This connector uses
-- `LuaSec` for TLS connections. This is the connector used for the included
-- `mqtt.ioloop` scheduler.
--
-- When using TLS / MQTTS connections, the `secure` option passed to the `client`
-- when creating it, can be the standard table of options as used by LuaSec
-- for creating a context. When omitted the defaults will be;
-- `{ mode="client", protocol="any", verify="none",
-- options={ "all", "no_sslv2", "no_sslv3", "no_tlsv1" } }`
--
-- Caveats:
--
-- * since the client creates a long lived connection for reading, it returns
-- upon receiving a packet, to call an event handler. The handler must return
-- swiftly, since while the handler runs the socket will not be reading.
-- Any task that might take longer than a few milliseconds should be off
-- loaded to another task.
--
-- @module mqtt.connector.luasocket
local super = require "mqtt.connector.base.buffered_base"
local luasocket = setmetatable({}, super)
luasocket.__index = luasocket
luasocket.super = super
local socket = require("socket")
local validate_luasec = require("mqtt.connector.base.luasec")
-- table with error messages that indicate a read timeout
luasocket.timeout_errors = {
timeout = true, -- luasocket
wantread = true, -- luasec
wantwrite = true, -- luasec
}
-- validate connection options
function luasocket:validate()
if self.secure then
validate_luasec(self)
end
end
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function luasocket:connect()
self:validate()
local ssl
if self.secure then
ssl = require(self.ssl_module)
end
self:buffer_clear() -- sanity
local sock = socket.tcp()
sock:settimeout(self.timeout)
local ok, err = sock:connect(self.host, self.port)
if not ok then
return false, "socket.connect failed to connect to '"..tostring(self.host)..":"..tostring(self.port).."': "..err
end
if self.secure_params then
-- Wrap socket in TLS one
do
local wrapped
wrapped, err = ssl.wrap(sock, self.secure_params)
if not wrapped then
sock:close(self) -- close TCP level
return false, "ssl.wrap() failed: "..tostring(err)
end
-- replace sock with wrapped secure socket
sock = wrapped
end
-- do TLS/SSL initialization/handshake
sock:settimeout(self.timeout) -- sanity; again since its now a luasec socket
ok, err = sock:dohandshake()
if not ok then
sock:close()
return false, "ssl dohandshake failed: "..tostring(err)
end
end
self.sock = sock
return true
end
-- Shutdown network connection
function luasocket:shutdown()
self.sock:close()
end
-- Send data to network connection
function luasocket:send(data)
local sock = self.sock
local i = 0
local err
sock:settimeout(self.timeout)
while i < #data do
i, err = sock:send(data, i + 1)
if not i then
return false, err
end
end
return true
end
-- Receive given amount of data from network connection
function luasocket:plain_receive(size)
local sock = self.sock
sock:settimeout(0)
local data, err, partial = sock:receive(size)
data = data or partial or ""
if #data > 0 then
return data
end
-- convert error to signal if required
if self.timeout_errors[err or -1] then
return false, self.signal_idle
elseif err == "closed" then
return false, self.signal_closed
else
return false, err
end
end
-- export module table
return luasocket
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -0,0 +1,102 @@
--- Nginx OpenResty co-sockets based connector.
--
-- This connector works with the non-blocking openresty sockets. Note that the
-- secure setting haven't been implemented yet. It will simply use defaults
-- when doing a TLS handshake.
--
-- Caveats:
--
-- * sockets cannot cross phase/context boundaries. So all client interaction
-- must be done from the timer context in which the client threads run.
--
-- * multiple threads cannot send simultaneously (simple scenarios will just
-- work)
--
-- * since the client creates a long lived connection for reading, it returns
-- upon receiving a packet, to call an event handler. The handler must return
-- swiftly, since while the handler runs the socket will not be reading.
-- Any task that might take longer than a few milliseconds should be off
-- loaded to another thread.
--
-- * Nginx timers should be short lived because memory is only released after
-- the context is destroyed. In this case we're using the fro prolonged periods
-- of time, so be aware of this and implement client restarts if required.
--
-- thanks to @irimiab: https://github.com/xHasKx/luamqtt/issues/13
-- @module mqtt.connector.nginx
local super = require "mqtt.connector.base.non_buffered_base"
local ngxsocket = setmetatable({}, super)
ngxsocket.__index = ngxsocket
ngxsocket.super = super
-- load required stuff
local ngx_socket_tcp = ngx.socket.tcp
local long_timeout = 7*24*60*60*1000 -- one week
-- validate connection options
function ngxsocket:validate()
if self.secure then
assert(self.ssl_module == "ssl", "specifying custom ssl module when using Nginx connector is not supported")
assert(self.secure_params == nil or type(self.secure_params) == "table", "expecting .secure_params to be a table if given")
-- TODO: validate nginx stuff
end
end
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function ngxsocket:connect()
-- TODO: add a lock for sending to prevent multiple threads from writing to
-- the same socket simultaneously (see the Copas connector)
local sock = ngx_socket_tcp()
-- set read-timeout to 'nil' to not timeout at all
sock:settimeouts(self.timeout * 1000, self.timeout * 1000, long_timeout) -- no timeout on reading
local ok, err = sock:connect(self.host, self.port)
if not ok then
return false, "socket:connect failed: "..err
end
if self.secure then
sock:sslhandshake()
end
self.sock = sock
return true
end
-- Shutdown network connection
function ngxsocket:shutdown()
self.sock:close()
end
-- Send data to network connection
function ngxsocket:send(data)
return self.sock:send(data)
end
function ngxsocket:buffer_clear()
-- since the packet is complete, we wait now indefinitely for the next one
self.sock:settimeouts(self.timeout * 1000, self.timeout * 1000, long_timeout) -- no timeout on reading
end
-- Receive given amount of data from network connection
function ngxsocket:receive(size)
local sock = self.sock
local data, err = sock:receive(size)
if data then
-- bytes received, so change from idefinite timeout to regular until
-- packet is complete (see buffer_clear method)
self.sock:settimeouts(self.timeout * 1000, self.timeout * 1000, self.timeout * 1000)
return data
end
if err == "closed" then
return false, self.signal_closed
elseif err == "timout" then
return false, self.signal_idle
else
return false, err
end
end
-- export module table
return ngxsocket

View File

@ -0,0 +1,19 @@
--- MQTT const module
--- Module table
-- @tfield number v311 MQTT v3.1.1 protocol version constant
-- @tfield number v50 MQTT v5.0 protocol version constant
-- @tfield string _VERSION luamqtt library version string
-- @table const
local const = {
-- supported MQTT protocol versions
v311 = 4, -- supported protocol version, MQTT v3.1.1
v50 = 5, -- supported protocol version, MQTT v5.0
-- luamqtt library version string
_VERSION = "1.0.1",
}
return const
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -14,73 +14,253 @@ CONVENTIONS:
]]
--- Module table
-- @field v311 MQTT v3.1.1 protocol version constant
-- @field v50 MQTT v5.0 protocol version constant
-- @field _VERSION luamqtt version string
-- @tfield number v311 MQTT v3.1.1 protocol version constant
-- @tfield number v50 MQTT v5.0 protocol version constant
-- @tfield string _VERSION luamqtt library version string
-- @table mqtt
local mqtt = {
-- supported MQTT protocol versions
v311 = 4, -- supported protocol version, MQTT v3.1.1
v50 = 5, -- supported protocol version, MQTT v5.0
-- @see mqtt.const
-- @usage
-- local client = mqtt.client {
-- uri = "mqtts://aladdin:soopersecret@mqttbroker.com",
-- clean = true,
-- version = mqtt.v50, -- specify constant for MQTT version
-- }
local mqtt = {}
-- mqtt library version
_VERSION = "3.4.3",
}
-- copy all values from const module
local const = require("mqtt.const")
for key, value in pairs(const) do
mqtt[key] = value
end
-- load required stuff
local type = type
local log = require "mqtt.log"
local select = select
local require = require
local client = require("mqtt.client")
local client_create = client.create
local ioloop_get = require("mqtt.ioloop").get
local ioloop = require("mqtt.ioloop")
local ioloop_get = ioloop.get
--- Create new MQTT client instance
-- @param ... Same as for mqtt.client.create(...)
-- @see mqtt.client.client_mt:__init
-- @param ... Same as for `Client.create`(...)
-- @see Client:__init
function mqtt.client(...)
return client_create(...)
end
--- Returns default ioloop instance
--- Returns default `ioloop` instance. Shortcut to `Ioloop.get`.
-- @function mqtt.get_ioloop
-- @see Ioloop.get
mqtt.get_ioloop = ioloop_get
--- Run default ioloop for given MQTT clients or functions
-- @param ... MQTT clients or lopp functions to add to ioloop
-- @see mqtt.ioloop.get
-- @see mqtt.ioloop.run_until_clients
--- Run default `ioloop` for given MQTT clients or functions.
-- Will not return until all clients/functions have exited.
-- @param ... MQTT clients or loop functions to add to ioloop, see `Ioloop:add` for details on functions.
-- @see Ioloop.get
-- @see Ioloop.run_until_clients
-- @usage
-- mqtt.run_ioloop(client1, client2, func1)
function mqtt.run_ioloop(...)
log:info("starting default ioloop instance")
local loop = ioloop_get()
for i = 1, select("#", ...) do
local cl = select(i, ...)
loop:add(cl)
if type(cl) ~= "function" then
cl:start_connecting()
end
end
return loop:run_until_clients()
end
--- Run synchronous input/output loop for only one given MQTT client.
-- Provided client's connection will be opened.
-- Client reconnect feature will not work, and keep_alive too.
-- @param cl MQTT client instance to run
function mqtt.run_sync(cl)
local ok, err = cl:start_connecting()
if not ok then
return false, err
--- Validates a topic with wildcards.
-- @param t (string) wildcard topic to validate
-- @return topic, or false+error
-- @usage local t = assert(mqtt.validate_subscribe_topic("base/+/thermostat/#"))
function mqtt.validate_subscribe_topic(t)
if type(t) ~= "string" then
return false, "not a string"
end
while cl.connection do
ok, err = cl:_sync_iteration()
if not ok then
return false, err
if #t < 1 then
return false, "minimum topic length is 1"
end
do
local _, count = t:gsub("#", "")
if count > 1 then
return false, "wildcard '#' may only appear once"
end
if count == 1 then
if t ~= "#" and not t:find("/#$") then
return false, "wildcard '#' must be the last character, and be prefixed with '/' (unless the topic is '#')"
end
end
end
do
local t1 = "/"..t.."/"
local i = 1
while i do
i = t1:find("+", i)
if i then
if t1:sub(i-1, i+1) ~= "/+/" then
return false, "wildcard '+' must be enclosed between '/' (except at start/end)"
end
i = i + 1
end
end
end
return t
end
--- Validates a topic without wildcards.
-- @param t (string) topic to validate
-- @return topic, or false+error
-- @usage local t = assert(mqtt.validate_publish_topic("base/living/thermostat/setpoint"))
function mqtt.validate_publish_topic(t)
if type(t) ~= "string" then
return false, "not a string"
end
if #t < 1 then
return false, "minimum topic length is 1"
end
if t:find("+", nil, true) or t:find("#", nil, true) then
return false, "wildcards '#', and '+' are not allowed when publishing"
end
return t
end
--- Returns a Lua pattern from topic.
-- Takes a wildcarded-topic and returns a Lua pattern that can be used
-- to validate if a received topic matches the wildcard-topic
-- @param t (string) the wildcard topic
-- @return Lua-pattern (string) or false+err
-- @usage
-- local patt = compile_topic_pattern("homes/+/+/#")
--
-- local topic = "homes/myhome/living/mainlights/brightness"
-- local homeid, roomid, varargs = topic:match(patt)
function mqtt.compile_topic_pattern(t)
local ok, err = mqtt.validate_subscribe_topic(t)
if not ok then
return ok, err
end
if t == "#" then
t = "(.+)" -- matches anything at least 1 character long
else
-- first replace valid mqtt '+' and '#' with placeholders
local hash = string.char(1)
t = t:gsub("/#$", "/" .. hash)
local plus = string.char(2)
t = t:gsub("^%+$", plus)
t = t:gsub("^%+/", plus .. "/")
local c = 1
while c ~= 0 do -- must loop, since adjacent patterns can overlap
t, c = t:gsub("/%+/", "/" .. plus .. "/")
end
t = t:gsub("/%+$", "/" .. plus)
-- now escape any special Lua pattern characters
t = t:gsub("[%\\%(%)%.%%%+%-%*%?%[%^%$]", function(cap) return "%"..cap end)
-- finally replace placeholders with captures
t = t:gsub(hash,"(.-)") -- match anything, can be empty
t = t:gsub(plus,"([^/]-)") -- match anything between '/', can be empty
end
return "^"..t.."$"
end
--- Parses wildcards in a topic into a table.
-- @tparam topic string incoming topic string
-- @tparam table opts parsing options table
-- @tparam string opts.topic the wild-carded topic to match against (optional if `opts.pattern` is given)
-- @tparam string opts.pattern the compiled pattern for the wild-carded topic (optional if `opts.topic`
-- is given). If not given then topic will be compiled and the result will be
-- stored in this field for future use (cache).
-- @tparam array opts.keys array of field names. The order must be the same as the
-- order of the wildcards in `topic`
-- @treturn[1] table `fields`: the array part will have the values of the wildcards, in
-- the order they appeared. The hash part, will have the field names provided
-- in `opts.keys`, with the values of the corresponding wildcard. If a `#`
-- wildcard was used, that one will be the last in the table.
-- @treturn[1] `varargs`: The returned table is an array, with all segments that were
-- matched by the `#` wildcard (empty if there was no `#` wildcard).
-- @treturn[2] boolean `false` if there was no match
-- @return[3] `false`+err on error, eg. pattern was invalid.
-- @usage
-- local opts = {
-- topic = "homes/+/+/#",
-- keys = { "homeid", "roomid", "varargs"},
-- }
-- local fields, varargst = topic_match("homes/myhome/living/mainlights/brightness", opts)
--
-- print(fields[1], fields.homeid) -- "myhome myhome"
-- print(fields[2], fields.roomid) -- "living living"
-- print(fields[3], fields.varargs) -- "mainlights/brightness mainlights/brightness"
--
-- print(varargst[1]) -- "mainlights"
-- print(varargst[2]) -- "brightness"
function mqtt.topic_match(topic, opts)
if type(topic) ~= "string" then
return false, "expected topic to be a string"
end
if type(opts) ~= "table" then
return false, "expected options to be a table"
end
local pattern = opts.pattern
if not pattern then
local ptopic = opts.topic
if not ptopic then
return false, "either 'opts.topic' or 'opts.pattern' must set"
end
local err
pattern, err = mqtt.compile_topic_pattern(ptopic)
if not pattern then
return false, "failed to compile 'opts.topic' into pattern: "..tostring(err)
end
-- store/cache compiled pattern for next time
opts.pattern = pattern
end
local values = { topic:match(pattern) }
if values[1] == nil then
return false
end
local keys = opts.keys
if keys ~= nil then
if type(keys) ~= "table" then
return false, "expected 'opts.keys' to be a table (array)"
end
-- we have a table with keys, copy values to fields
for i, value in ipairs(values) do
local key = keys[i]
if key ~= nil then
values[key] = value
end
end
end
if not pattern:find("%(%.[%-%+]%)%$$") then -- pattern for "#" as last char
-- we're done
return values, {}
end
-- we have a '#' wildcard
local vararg = values[#values]
local varargs = {}
local i = 0
local ni = 0
while ni do
ni = vararg:find("/", i, true)
if ni then
varargs[#varargs + 1] = vararg:sub(i, ni-1)
i = ni + 1
else
varargs[#varargs + 1] = vararg:sub(i, -1)
end
end
return values, varargs
end
-- export module table
return mqtt

View File

@ -1,35 +1,28 @@
--- ioloop module
-- @module mqtt.ioloop
-- @alias ioloop
--- This class contains the ioloop implementation.
--
-- In short: allowing you to work with several MQTT clients in one script, and allowing them to maintain
-- a long-term connection to broker, using PINGs. This is the bundled alternative to Copas and Nginx.
--
-- NOTE: this module will work only with MQTT clients using the `connector.luasocket` connector.
--
-- Providing an IO loop instance dealing with efficient (as much as possible in limited lua IO) network communication
-- for several MQTT clients in the same OS thread.
-- The main idea is that you are creating an ioloop instance, then adding MQTT clients to it.
-- Then ioloop is starting an endless loop trying to receive/send data for all added MQTT clients.
-- You may add more or remove some MQTT clients to/from the ioloop after it has been created and started.
--
-- Using an ioloop is allowing you to run a MQTT client for long time, through sending PINGREQ packets to broker
-- in keepAlive interval to maintain long-term connection.
--
-- Also, any function can be added to the ioloop instance, and it will be called in the same endless loop over and over
-- alongside with added MQTT clients to provide you a piece of processor time to run your own logic (like running your own
-- network communications or any other thing good working in an io-loop)
-- @classmod Ioloop
--[[
ioloop module
In short: allowing you to work with several MQTT clients in one script, and allowing them to maintain
a long-term connection to broker, using PINGs.
NOTE: this module will work only with MQTT clients using standard luasocket/luasocket_ssl connectors.
In long:
Providing an IO loop instance dealing with efficient (as much as possible in limited lua IO) network communication
for several MQTT clients in the same OS thread.
The main idea is that you are creating an ioloop instance, then adding created and connected MQTT clients to it.
The ioloop instance is setting a non-blocking mode for sockets in MQTT clients and setting a small timeout
for their receive/send operations. Then ioloop is starting an endless loop trying to receive/send data for all added MQTT clients.
You may add more or remove some MQTT clients from the ioloop after it's created and started.
Using that ioloop is allowing you to run a MQTT client for long time, through sending PINGREQ packets to broker
in keepAlive interval to maintain long-term connection.
Also, any function can be added to the ioloop instance, and it will be called in the same endless loop over and over
alongside with added MQTT clients to provide you a piece of processor time to run your own logic (like running your own
network communications or any other thing good working in an io-loop)
]]
-- module table
local ioloop = {}
local _M = {}
-- load required stuff
local log = require "mqtt.log"
local next = next
local type = type
local ipairs = ipairs
@ -39,135 +32,214 @@ local setmetatable = setmetatable
local table = require("table")
local tbl_remove = table.remove
--- ioloop instances metatable
-- @type ioloop_mt
local ioloop_mt = {}
ioloop_mt.__index = ioloop_mt
local math = require("math")
local math_min = math.min
--- Initialize ioloop instance
-- @tparam table args ioloop creation arguments table
-- @tparam[opt=0.005] number args.timeout network operations timeout in seconds
-- @tparam[opt=0] number args.sleep sleep interval after each iteration
-- @tparam[opt] function args.sleep_function custom sleep function to call after each iteration
-- @treturn ioloop_mt ioloop instance
function ioloop_mt:__init(args)
args = args or {}
args.timeout = args.timeout or 0.005
args.sleep = args.sleep or 0
args.sleep_function = args.sleep_function or require("socket").sleep
self.args = args
--- ioloop instances metatable
local Ioloop = {}
Ioloop.__index = Ioloop
--- Initialize ioloop instance.
-- @tparam table opts ioloop creation options table
-- @tparam[opt=0] number opts.sleep_min min sleep interval after each iteration
-- @tparam[opt=0.002] number opts.sleep_step increase in sleep after every idle iteration
-- @tparam[opt=0.030] number opts.sleep_max max sleep interval after each iteration
-- @tparam[opt=luasocket.sleep] function opts.sleep_function custom sleep function to call after each iteration
-- @treturn Ioloop ioloop instance
function Ioloop:__init(opts)
log:debug("initializing ioloop instance '%s'", tostring(self))
opts = opts or {}
opts.sleep_min = opts.sleep_min or 0
opts.sleep_step = opts.sleep_step or 0.002
opts.sleep_max = opts.sleep_max or 0.030
opts.sleep_function = opts.sleep_function or require("socket").sleep
self.opts = opts
self.clients = {}
self.timeouts = setmetatable({}, { __mode = "v" })
self.running = false --ioloop running flag, used by MQTT clients which are adding after this ioloop started to run
end
--- Add MQTT client or a loop function to the ioloop instance
-- @tparam client_mt|function client MQTT client or a loop function to add to ioloop
--- Add MQTT client or a loop function to the ioloop instance.
-- When adding a function, the function should on each call return the time (in seconds) it wishes to sleep. The ioloop
-- will sleep after each iteration based on what clients/functions returned. So the function may be called sooner than
-- the requested time, but will not be called later.
-- @tparam client_mt|function client MQTT client or a loop function to add to ioloop
-- @return true on success or false and error message on failure
function ioloop_mt:add(client)
-- @usage
-- -- create a timer on a 1 second interval
-- local timer do
-- local interval = 1
-- local next_call = socket.gettime() + interval
-- timer = function()
-- if next_call >= socket.gettime() then
--
-- -- do stuff here
--
-- next_call = socket.gettime() + interval
-- return interval
-- else
-- return next_call - socket.gettime()
-- end
-- end
-- end
--
-- loop:add(timer)
function Ioloop:add(client)
local clients = self.clients
if clients[client] then
return false, "such MQTT client or loop function is already added to this ioloop"
if type(client) == "table" then
log:warn("MQTT client '%s' was already added to ioloop '%s'", client.opts.id, tostring(self))
return false, "MQTT client was already added to this ioloop"
else
log:warn("MQTT loop function '%s' was already added to this ioloop '%s'", tostring(client), tostring(self))
return false, "MQTT loop function was already added to this ioloop"
end
end
clients[#clients + 1] = client
clients[client] = true
self.timeouts[client] = self.opts.sleep_min
-- associate ioloop with adding MQTT client
if type(client) ~= "function" then
client:set_ioloop(self)
if type(client) == "table" then
log:info("adding client '%s' to ioloop '%s'", client.opts.id, tostring(self))
-- create and add function for PINGREQ
local function f()
if not clients[client] then
-- the client were supposed to do keepalive for is gone, remove ourselves
self:remove(f)
end
return client:check_keep_alive()
end
-- add it to start doing keepalive checks
self:add(f)
else
log:info("adding function '%s' to ioloop '%s'", tostring(client), tostring(self))
end
return true
end
--- Remove MQTT client or a loop function from the ioloop instance
-- @tparam client_mt|function client MQTT client or a loop function to remove from ioloop
-- @tparam client_mt|function client MQTT client or a loop function to remove from ioloop
-- @return true on success or false and error message on failure
function ioloop_mt:remove(client)
function Ioloop:remove(client)
local clients = self.clients
if not clients[client] then
return false, "no such MQTT client or loop function was added to ioloop"
if type(client) == "table" then
log:warn("MQTT client not found '%s' in ioloop '%s'", client.opts.id, tostring(self))
return false, "MQTT client not found"
else
log:warn("MQTT loop function not found '%s' in ioloop '%s'", tostring(client), tostring(self))
return false, "MQTT loop function not found"
end
end
clients[client] = nil
-- search an index of client to remove
for i, item in ipairs(clients) do
if item == client then
-- found it, remove
tbl_remove(clients, i)
clients[client] = nil
break
end
end
-- unlink ioloop from MQTT client
if type(client) ~= "function" then
client:set_ioloop(nil)
if type(client) == "table" then
log:info("removed client '%s' from ioloop '%s'", client.opts.id, tostring(self))
else
log:info("removed loop function '%s' from ioloop '%s'", tostring(client), tostring(self))
end
return true
end
--- Perform one ioloop iteration
function ioloop_mt:iteration()
self.timeouted = false
--- Perform one ioloop iteration.
-- TODO: make this smarter do not wake-up functions or clients returning a longer
-- sleep delay. Currently they will be tried earlier if another returns a smaller delay.
function Ioloop:iteration()
local opts = self.opts
local sleep = opts.sleep_max
for _, client in ipairs(self.clients) do
local t, err
-- read data and handle events
if type(client) ~= "function" then
client:_ioloop_iteration()
t, err = client:step()
else
client()
t = client() or opts.sleep_max
end
if t == -1 then
-- no data read, client is idle, step up timeout
t = math_min(self.timeouts[client] + opts.sleep_step, opts.sleep_max)
self.timeouts[client] = t
elseif not t then
-- an error from a client was returned
if not client.opts.reconnect then
-- error and not reconnecting, remove the client
log:info("client '%s' returned '%s', no re-connect set, removing client", client.opts.id, err)
self:remove(client)
t = opts.sleep_max
else
-- error, but will reconnect
log:error("client '%s' failed with '%s', will try re-connecting", client.opts.id, err)
t = opts.sleep_min -- try asap
end
else
-- a number of seconds was returned
t = math_min(t, opts.sleep_max)
self.timeouts[client] = opts.sleep_min
end
sleep = math_min(sleep, t)
end
-- sleep a bit
local args = self.args
local sleep = args.sleep
if sleep > 0 then
args.sleep_function(sleep)
opts.sleep_function(sleep)
end
end
--- Perform sleep if no one of the network operation in current iteration was not timeouted
function ioloop_mt:can_sleep()
if not self.timeouted then
local args = self.args
args.sleep_function(args.timeout)
self.timeouted = true
end
end
--- Run the ioloop.
-- While there is at least one client/function in the ioloop it will continue
-- iterating. After all clients/functions are gone, it will return.
function Ioloop:run_until_clients()
log:info("ioloop started with %d clients/functions", #self.clients)
--- Run ioloop until at least one client are in ioloop
function ioloop_mt:run_until_clients()
self.running = true
while next(self.clients) do
self:iteration()
end
self.running = false
log:info("ioloop finished with %d clients/functions", #self.clients)
end
-------
--- Exported functions
-- @section exported
--- Create IO loop instance with given options
-- @see ioloop_mt:__init
-- @treturn ioloop_mt ioloop instance
local function ioloop_create(args)
local inst = setmetatable({}, ioloop_mt)
inst:__init(args)
-- @name ioloop.create
-- @see Ioloop:__init
-- @treturn Ioloop ioloop instance
function _M.create(opts)
local inst = setmetatable({}, Ioloop)
inst:__init(opts)
return inst
end
ioloop.create = ioloop_create
-- Default ioloop instance
local ioloop_instance
--- Returns default ioloop instance
-- @name ioloop.get
-- @tparam[opt=true] boolean autocreate Automatically create ioloop instance
-- @tparam[opt] table args Arguments for creating ioloop instance
-- @treturn ioloop_mt ioloop instance
function ioloop.get(autocreate, args)
-- @tparam[opt] table opts Arguments for creating ioloop instance
-- @treturn Ioloop ioloop instance
function _M.get(autocreate, opts)
if autocreate == nil then
autocreate = true
end
if autocreate then
if not ioloop_instance then
ioloop_instance = ioloop_create(args)
end
if autocreate and not ioloop_instance then
log:info("auto-creating default ioloop instance")
ioloop_instance = _M.create(opts)
end
return ioloop_instance
end
@ -175,6 +247,6 @@ end
-------
-- export module table
return ioloop
return _M
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -0,0 +1,17 @@
-- logging
-- returns a LuaLogging compatible logger object if LuaLogging was already loaded
-- otherwise returns a stub
local ll = package.loaded.logging
if ll and type(ll) == "table" and ll.defaultLogger and
tostring(ll._VERSION):find("LuaLogging") then
-- default LuaLogging logger is available
return ll.defaultLogger()
else
-- just use a stub logger with only no-op functions
local nop = function() end
return setmetatable({}, {
__index = function(self, key) self[key] = nop return nop end
})
end

View File

@ -0,0 +1,72 @@
--- Copas specific client handling module.
-- Typically this module is not used directly, but through `mqtt.loop` when
-- auto-detecting the environment.
-- @module mqtt.loop.copas
local copas = require "copas"
local log = require "mqtt.log"
local client_registry = {}
local _M = {}
--- Add MQTT client to the Copas scheduler.
-- Each received packet will be handled by a new thread, such that the thread
-- listening on the socket can return immediately.
-- The client will automatically be removed after it exits. It will set up a
-- thread to call `Client:check_keep_alive`.
-- @param cl mqtt-client to add to the Copas scheduler
-- @return `true` on success or `false` and error message on failure
function _M.add(cl)
if client_registry[cl] then
log:warn("MQTT client '%s' was already added to Copas", cl.opts.id)
return false, "MQTT client was already added to Copas"
end
client_registry[cl] = true
do -- make mqtt device async for incoming packets
local handle_received_packet = cl.handle_received_packet
local count = 0
-- replace packet handler; create a new thread for each packet received
cl.handle_received_packet = function(mqttdevice, packet)
count = count + 1
copas.addnamedthread(handle_received_packet, cl.opts.id..":receive_"..count, mqttdevice, packet)
return true
end
end
-- add keep-alive timer
local timer = copas.addnamedthread(function()
while client_registry[cl] do
local next_check = cl:check_keep_alive()
if next_check > 0 then
copas.pause(next_check)
end
end
end, cl.opts.id .. ":keep_alive")
-- add client to connect and listen
copas.addnamedthread(function()
while client_registry[cl] do
local timeout = cl:step()
if not timeout then
client_registry[cl] = nil -- exiting
log:debug("MQTT client '%s' exited, removed from Copas", cl.opts.id)
copas.wakeup(timer)
else
if timeout > 0 then
copas.pause(timeout)
end
end
end
end, cl.opts.id .. ":listener")
return true
end
return setmetatable(_M, {
__call = function(self, ...)
return self.add(...)
end,
})

View File

@ -0,0 +1,30 @@
--- Module returns a single function to detect the io-loop in use.
-- Either 'copas', 'nginx', or 'ioloop', or nil+error
local log = require "mqtt.log"
local loop
return function()
if loop then return loop end
if type(ngx) == "table" then
-- there is a global 'ngx' table, so we're running OpenResty
log:info("LuaMQTT auto-detected Nginx as the runtime environment")
loop = "nginx"
return loop
elseif package.loaded.copas then
-- 'copas' was already loaded
log:info("LuaMQTT auto-detected Copas as the io-loop in use")
loop = "copas"
return loop
elseif pcall(require, "socket") and tostring(require("socket")._VERSION):find("LuaSocket") then
-- LuaSocket is available
log:info("LuaMQTT auto-detected LuaSocket as the socket library to use with mqtt-ioloop")
loop = "ioloop"
return loop
else
-- unknown
return nil, "LuaMQTT io-loop/connector auto-detection failed, please specify one explicitly"
end
end

View File

@ -0,0 +1,37 @@
--- Auto detect the IO loop to use.
-- Interacting with the supported IO loops (ioloop, copas, and nginx) requires
-- specific implementations to get it right.
-- This module will auto-detect the environment and return the proper
-- module from;
--
-- * `mqtt.loop.ioloop`
--
-- * `mqtt.loop.copas`
--
-- * `mqtt.loop.nginx`
--
-- Since the selection is based on a.o. packages loaded, make sure that in case
-- of using the `copas` scheduler, you require it before the `mqtt` modules.
--
-- @usage
-- --local copas = require "copas" -- only if you use Copas
-- local mqtt = require "mqtt"
-- local add_client = require("mqtt.loop").add -- returns a loop-specific function
--
-- local client = mqtt.create { ... options ... }
-- add_client(client) -- works for ioloop, copas, and nginx
--
-- @module mqtt.loop
local loops = setmetatable({
copas = "mqtt.loop.copas",
nginx = "mqtt.loop.nginx",
ioloop = "mqtt.loop.ioloop"
}, {
__index = function()
error("failed to auto-detect connector to use, please set one explicitly", 2)
end
})
local loop = require("mqtt.loop.detect")()
return require(loops[loop])

View File

@ -0,0 +1,24 @@
--- IOloop specific client handling module.
-- Typically this module is not used directly, but through `mqtt.loop` when
-- auto-detecting the environment.
-- @module mqtt.loop.ioloop
local _M = {}
local mqtt = require "mqtt"
--- Add MQTT client to the integrated ioloop.
-- The client will automatically be removed after it exits. It will set up a
-- function to call `Client:check_keep_alive` in the ioloop.
-- @param client mqtt-client to add to the ioloop
-- @return `true` on success or `false` and error message on failure
function _M.add(client)
local default_loop = mqtt.get_ioloop()
return default_loop:add(client)
end
return setmetatable(_M, {
__call = function(self, ...)
return self.add(...)
end,
})

View File

@ -0,0 +1,76 @@
--- Nginx specific client handling module.
-- Typically this module is not used directly, but through `mqtt.loop` when
-- auto-detecting the environment.
-- @module mqtt.loop.nginx
local client_registry = {}
local _M = {}
--- Add MQTT client to the Nginx environment.
-- The client will automatically be removed after it exits. It will set up a
-- thread to call `Client:check_keep_alive`.
-- @param client mqtt-client to add to the Nginx environment
-- @return `true` on success or `false` and error message on failure
function _M.add(client)
if client_registry[client] then
ngx.log(ngx.WARN, "MQTT client '%s' was already added to Nginx", client.opts.id)
return false, "MQTT client was already added to Nginx"
end
do -- make mqtt device async for incoming packets
local handle_received_packet = client.handle_received_packet
-- replace packet handler; create a new thread for each packet received
client.handle_received_packet = function(mqttdevice, packet)
ngx.thread.spawn(handle_received_packet, mqttdevice, packet)
return true
end
end
local ok, err = ngx.timer.at(0, function()
-- spawn a thread to listen on the socket
local coro = ngx.thread.spawn(function()
while true do
local sleeptime = client:step()
if not sleeptime then
ngx.log(ngx.INFO, "MQTT client '", client.opts.id, "' exited, stopping client-thread")
client_registry[client] = nil
return
else
if sleeptime > 0 then
ngx.sleep(sleeptime * 1000)
end
end
end
end)
-- endless keep-alive loop
while not ngx.worker.exiting() do
ngx.sleep((client:check_keep_alive())) -- double (()) to trim to 1 argument
end
-- exiting
client_registry[client] = nil
ngx.log(ngx.DEBUG, "MQTT client '", client.opts.id, "' keep-alive loop exited")
client:disconnect()
ngx.thread.wait(coro)
ngx.log(ngx.DEBUG, "MQTT client '", client.opts.id, "' exit complete")
end)
if not ok then
ngx.log(ngx.CRIT, "Failed to start timer-context for device '", client.id,"': ", err)
return false, "timer failed: " .. err
end
return true
end
return setmetatable(_M, {
__call = function(self, ...)
return self.add(...)
end,
})

View File

@ -1,48 +0,0 @@
-- DOC: https://keplerproject.github.io/copas/
-- NOTE: you will need to install copas like this: luarocks install copas
-- module table
local connector = {}
local socket = require("socket")
local copas = require("copas")
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function connector.connect(conn)
local sock, err = socket.connect(conn.host, conn.port)
if not sock then
return false, "socket.connect failed: "..err
end
conn.sock = sock
return true
end
-- Shutdown network connection
function connector.shutdown(conn)
conn.sock:shutdown()
end
-- Send data to network connection
function connector.send(conn, data, i, j)
local ok, err = copas.send(conn.sock, data, i, j)
return ok, err
end
-- Receive given amount of data from network connection
function connector.receive(conn, size)
local ok, err = copas.receive(conn.sock, size)
return ok, err
end
-- Set connection's socket to non-blocking mode and set a timeout for it
function connector.settimeout(conn, timeout)
conn.timeout = timeout
conn.sock:settimeout(0)
end
-- export module table
return connector
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -1,54 +0,0 @@
-- DOC: http://w3.impa.br/~diego/software/luasocket/tcp.html
-- module table
local luasocket = {}
local socket = require("socket")
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function luasocket.connect(conn)
local sock, err = socket.connect(conn.host, conn.port)
if not sock then
return false, "socket.connect failed: "..err
end
conn.sock = sock
return true
end
-- Shutdown network connection
function luasocket.shutdown(conn)
conn.sock:shutdown()
end
-- Send data to network connection
function luasocket.send(conn, data, i, j)
conn.sock:settimeout(nil, "t")
local ok, err = conn.sock:send(data, i, j)
conn.sock:settimeout(conn.timeout, "t")
-- print(" luasocket.send:", ok, err, require("mqtt.tools").hex(data))
return ok, err
end
-- Receive given amount of data from network connection
function luasocket.receive(conn, size)
local ok, err = conn.sock:receive(size)
--if ok then
-- print(" luasocket.receive good:", size, #ok, require("mqtt.tools").hex(ok))
--elseif err ~= "timeout" then
-- print(" luasocket.receive fail:", ok, err)
--end
return ok, err
end
-- Set connection's socket to non-blocking mode and set a timeout for it
function luasocket.settimeout(conn, timeout)
conn.timeout = timeout
conn.sock:settimeout(timeout, "b")
end
-- export module table
return luasocket
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -1,56 +0,0 @@
-- DOC: http://w3.impa.br/~diego/software/luasocket/tcp.html
-- module table
local luasocket_ssl = {}
local type = type
local assert = assert
local luasocket = require("mqtt.luasocket")
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function luasocket_ssl.connect(conn)
assert(type(conn.secure_params) == "table", "expecting .secure_params to be a table")
-- open usual TCP connection
local ok, err = luasocket.connect(conn)
if not ok then
return false, "luasocket connect failed: "..err
end
local wrapped
-- load right ssl module
local ssl = require(conn.ssl_module or "ssl")
-- TLS/SSL initialization
wrapped, err = ssl.wrap(conn.sock, conn.secure_params)
if not wrapped then
conn.sock:shutdown()
return false, "ssl.wrap() failed: "..err
end
ok = wrapped:dohandshake()
if not ok then
conn.sock:shutdown()
return false, "ssl dohandshake failed"
end
-- replace sock in connection table with wrapped secure socket
conn.sock = wrapped
return true
end
-- Shutdown network connection
function luasocket_ssl.shutdown(conn)
conn.sock:close()
end
-- Copy original methods from mqtt.luasocket module
luasocket_ssl.send = luasocket.send
luasocket_ssl.receive = luasocket.receive
luasocket_ssl.settimeout = luasocket.settimeout
-- export module table
return luasocket_ssl
-- vim: ts=4 sts=4 sw=4 noet ft=lua

View File

@ -1,55 +0,0 @@
-- module table
-- thanks to @irimiab: https://github.com/xHasKx/luamqtt/issues/13
local ngxsocket = {}
-- load required stuff
local string_sub = string.sub
local ngx_socket_tcp = ngx.socket.tcp -- luacheck: ignore
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function ngxsocket.connect(conn)
local socket = ngx_socket_tcp()
socket:settimeout(0x7FFFFFFF)
local sock, err = socket:connect(conn.host, conn.port)
if not sock then
return false, "socket:connect failed: "..err
end
if conn.secure then
socket:sslhandshake()
end
conn.sock = socket
return true
end
-- Shutdown network connection
function ngxsocket.shutdown(conn)
conn.sock:close()
end
-- Send data to network connection
function ngxsocket.send(conn, data, i, j)
if i then
return conn.sock:send(string_sub(data, i, j))
else
return conn.sock:send(data)
end
end
-- Receive given amount of data from network connection
function ngxsocket.receive(conn, size)
return conn.sock:receive(size)
end
-- Set connection's socket to non-blocking mode and set a timeout for it
function ngxsocket.settimeout(conn, timeout)
if not timeout then
conn.sock:settimeout(0x7FFFFFFF)
else
conn.sock:settimeout(timeout * 1000)
end
end
-- export module table
return ngxsocket

View File

@ -6,10 +6,10 @@
Here is a generic implementation of MQTT protocols of all supported versions.
MQTT v3.1.1 documentation (DOCv3.1.1):
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html
DOC[1]: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html
MQTT v5.0 documentation (DOCv5.0):
http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
DOC[2]: http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
CONVENTIONS:
@ -44,6 +44,10 @@ local str_char = string.char
local str_byte = string.byte
local str_format = string.format
local const = require("mqtt.const")
local const_v311 = const.v311
local const_v50 = const.v50
local bit = require("mqtt.bitwrap")
local bor = bit.bor
local band = bit.band
@ -52,26 +56,33 @@ local rshift = bit.rshift
local tools = require("mqtt.tools")
local div = tools.div
local sortedpairs = tools.sortedpairs
-- Create uint8 value data
local function make_uint8(val)
--- Create bytes of the uint8 value
-- @tparam number val - integer value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_uint8(val)
if val < 0 or val > 0xFF then
error("value is out of range to encode as uint8: "..tostring(val))
end
return str_char(val)
end
protocol.make_uint8 = make_uint8
local make_uint8 = protocol.make_uint8
-- Create uint16 value data
local function make_uint16(val)
--- Create bytes of the uint16 value
-- @tparam number val - integer value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_uint16(val)
if val < 0 or val > 0xFFFF then
error("value is out of range to encode as uint16: "..tostring(val))
end
return str_char(rshift(val, 8), band(val, 0xFF))
end
protocol.make_uint16 = make_uint16
local make_uint16 = protocol.make_uint16
-- Create uint32 value data
--- Create bytes of the uint32 value
-- @tparam number val - integer value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_uint32(val)
if val < 0 or val > 0xFFFFFFFF then
error("value is out of range to encode as uint32: "..tostring(val))
@ -79,18 +90,27 @@ function protocol.make_uint32(val)
return str_char(rshift(val, 24), band(rshift(val, 16), 0xFF), band(rshift(val, 8), 0xFF), band(val, 0xFF))
end
-- Create UTF-8 string data
-- DOCv3.1.1: 1.5.3 UTF-8 encoded strings
-- DOCv5.0: 1.5.4 UTF-8 Encoded String
--- Create bytes of the UTF-8 string value according to the MQTT spec.
-- Basically it's the same string with its length prefixed as uint16 value.
-- For MQTT v3.1.1: <b>1.5.3 UTF-8 encoded strings</b>,
-- For MQTT v5.0: <b>1.5.4 UTF-8 Encoded String</b>.
-- @tparam string str - string value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_string(str)
return make_uint16(str:len())..str
end
-- Returns bytes of given integer value encoded as variable length field
-- DOCv3.1.1: 2.2.3 Remaining Length
-- DOCv5.0: 2.1.4 Remaining Length
local function make_var_length(len)
if len < 0 or len > 268435455 then
--- Maximum integer value (268435455) that can be encoded using variable-length encoding
protocol.max_variable_length = 268435455
local max_variable_length = protocol.max_variable_length
--- Create bytes of the integer value encoded as variable length field
-- For MQTT v3.1.1: <b>2.2.3 Remaining Length</b>,
-- For MQTT v5.0: <b>2.1.4 Remaining Length</b>.
-- @tparam number len - integer value to be encoded
-- @treturn string bytes of the value
function protocol.make_var_length(len)
if len < 0 or len > max_variable_length then
error("value is invalid for encoding as variable length field: "..tostring(len))
end
local bytes = {}
@ -106,9 +126,11 @@ local function make_var_length(len)
until len <= 0
return unpack(bytes)
end
protocol.make_var_length = make_var_length
local make_var_length = protocol.make_var_length
-- Make data for 1-byte property with only 0 or 1 value
--- Make bytes for 1-byte value with only 0 or 1 value allowed
-- @tparam number value - integer value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_uint8_0_or_1(value)
if value ~= 0 and value ~= 1 then
error("expecting 0 or 1 as value")
@ -116,7 +138,9 @@ function protocol.make_uint8_0_or_1(value)
return make_uint8(value)
end
-- Make data for 2-byte property with nonzero value check
--- Make bytes for 2-byte value with nonzero check
-- @tparam number value - integer value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_uint16_nonzero(value)
if value == 0 then
error("expecting nonzero value")
@ -124,7 +148,9 @@ function protocol.make_uint16_nonzero(value)
return make_uint16(value)
end
-- Make data for variable length property with nonzero value check
--- Make bytes for variable length value with nonzero value check
-- @tparam number value - integer value to convert to bytes
-- @treturn string bytes of the value
function protocol.make_var_length_nonzero(value)
if value == 0 then
error("expecting nonzero value")
@ -132,24 +158,29 @@ function protocol.make_var_length_nonzero(value)
return make_var_length(value)
end
-- Read string using given read_func function
-- Returns false plus error message on failure
-- Returns parsed string on success
--- Read string (or bytes) using given read_func function
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn string parsed string (or bytes) on success
-- @return OR false and error message on failure
function protocol.parse_string(read_func)
assert(type(read_func) == "function", "expecting read_func to be a function")
local len, err = read_func(2)
if not len then
return false, "failed to read string length: "..err
end
-- convert len string from 2-byte integer
-- convert string length from 2 bytes
local byte1, byte2 = str_byte(len, 1, 2)
len = bor(lshift(byte1, 8), byte2)
-- and return string if parsed length
-- and return string/bytes of the parsed length
return read_func(len)
end
local parse_string = protocol.parse_string
-- Parse uint8 value using given read_func
local function parse_uint8(read_func)
--- Parse uint8 value using given read_func
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_uint8(read_func)
assert(type(read_func) == "function", "expecting read_func to be a function")
local value, err = read_func(1)
if not value then
@ -157,9 +188,12 @@ local function parse_uint8(read_func)
end
return str_byte(value, 1, 1)
end
protocol.parse_uint8 = parse_uint8
local parse_uint8 = protocol.parse_uint8
-- Parse uint8 value with only 0 or 1 value
--- Parse uint8 value using given read_func with only 0 or 1 value allowed
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_uint8_0_or_1(read_func)
local value, err = parse_uint8(read_func)
if not value then
@ -171,8 +205,11 @@ function protocol.parse_uint8_0_or_1(read_func)
return value
end
-- Parse uint16 value using given read_func
local function parse_uint16(read_func)
--- Parse uint16 value using given read_func
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_uint16(read_func)
assert(type(read_func) == "function", "expecting read_func to be a function")
local value, err = read_func(2)
if not value then
@ -181,9 +218,12 @@ local function parse_uint16(read_func)
local byte1, byte2 = str_byte(value, 1, 2)
return lshift(byte1, 8) + byte2
end
protocol.parse_uint16 = parse_uint16
local parse_uint16 = protocol.parse_uint16
-- Parse uint16 non-zero value using given read_func
--- Parse uint16 non-zero value using given read_func
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_uint16_nonzero(read_func)
local value, err = parse_uint16(read_func)
if not value then
@ -195,7 +235,10 @@ function protocol.parse_uint16_nonzero(read_func)
return value
end
-- Parse uint32 value using given read_func
--- Parse uint32 value using given read_func
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_uint32(read_func)
assert(type(read_func) == "function", "expecting read_func to be a function")
local value, err = read_func(4)
@ -210,11 +253,18 @@ function protocol.parse_uint32(read_func)
end
end
-- Max variable length integer value
-- Max multiplier of the variable length integer value
local max_mult = 128 * 128 * 128
-- Returns variable length field value calling read_func function read data, DOC: 2.2.3 Remaining Length
local function parse_var_length(read_func)
--- Parse variable length field value using given read_func.
-- For MQTT v3.1.1: <b>2.2.3 Remaining Length</b>,
-- For MQTT v5.0: <b>2.1.4 Remaining Length</b>.
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_var_length(read_func)
-- DOC[1]: 2.2.3 Remaining Length
-- DOC[2]: 1.5.5 Variable Byte Integer
assert(type(read_func) == "function", "expecting read_func to be a function")
local mult = 1
local val = 0
@ -232,9 +282,14 @@ local function parse_var_length(read_func)
until band(byte, 128) == 0
return val
end
protocol.parse_var_length = parse_var_length
local parse_var_length = protocol.parse_var_length
-- Parse Variable Byte Integer with non-zero constraint
--- Parse variable length field value using given read_func with non-zero constraint.
-- For MQTT v3.1.1: <b>2.2.3 Remaining Length</b>,
-- For MQTT v5.0: <b>2.1.4 Remaining Length</b>.
-- @tparam function read_func - function to read some bytes from the network layer
-- @treturn number parser value
-- @return OR false and error message on failure
function protocol.parse_var_length_nonzero(read_func)
local value, err = parse_var_length(read_func)
if not value then
@ -246,29 +301,40 @@ function protocol.parse_var_length_nonzero(read_func)
return value
end
-- Create fixed packet header data
-- DOCv3.1.1: 2.2 Fixed header
-- DOCv5.0: 2.1.1 Fixed Header
--- Create bytes of the MQTT fixed packet header
-- For MQTT v3.1.1: <b>2.2 Fixed header</b>,
-- For MQTT v5.0: <b>2.1.1 Fixed Header</b>.
-- @tparam number ptype - MQTT packet type
-- @tparam number flags - MQTT packet flags
-- @tparam number len - MQTT packet length
-- @treturn string bytes of the fixed packet header
function protocol.make_header(ptype, flags, len)
local byte1 = bor(lshift(ptype, 4), band(flags, 0x0F))
return str_char(byte1, make_var_length(len))
end
-- Returns true if given value is a valid QoS
--- Check if given value is a valid PUBLISH message QoS value
-- @tparam number val - QoS value
-- @treturn boolean true for valid QoS value, otherwise false
function protocol.check_qos(val)
return (val == 0) or (val == 1) or (val == 2)
end
-- Returns true if given value is a valid Packet Identifier
-- DOCv3.1.1: 2.3.1 Packet Identifier
-- DOCv5.0: 2.2.1 Packet Identifier
--- Check if given value is a valid Packet Identifier
-- For MQTT v3.1.1: <b>2.3.1 Packet Identifier</b>,
-- For MQTT v5.0: <b>2.2.1 Packet Identifier</b>.
-- @tparam number val - Packet ID value
-- @treturn boolean true for valid Packet ID value, otherwise false
function protocol.check_packet_id(val)
return val >= 1 and val <= 0xFFFF
end
-- Returns the next Packet Identifier value relative to given current value
-- DOCv3.1.1: 2.3.1 Packet Identifier
-- DOCv5.0: 2.2.1 Packet Identifier
--- Returns the next Packet Identifier value relative to given current value.
-- If current is nil - returns 1 as the first possible Packet ID.
-- For MQTT v3.1.1: <b>2.3.1 Packet Identifier</b>,
-- For MQTT v5.0: <b>2.2.1 Packet Identifier</b>.
-- @tparam[opt] number curr - current Packet ID value
-- @treturn number next Packet ID value
function protocol.next_packet_id(curr)
if not curr then
return 1
@ -282,42 +348,42 @@ function protocol.next_packet_id(curr)
return curr
end
-- MQTT protocol fixed header packet types
-- DOCv3.1.1: 2.2.1 MQTT Control Packet type
-- DOCv5.0: 2.1.2 MQTT Control Packet type
local packet_type = {
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
AUTH = 15, -- NOTE: new in MQTTv5.0
[1] = "CONNECT",
[2] = "CONNACK",
[3] = "PUBLISH",
[4] = "PUBACK",
[5] = "PUBREC",
[6] = "PUBREL",
[7] = "PUBCOMP",
[8] = "SUBSCRIBE",
[9] = "SUBACK",
[10] = "UNSUBSCRIBE",
[11] = "UNSUBACK",
[12] = "PINGREQ",
[13] = "PINGRESP",
[14] = "DISCONNECT",
[15] = "AUTH", -- NOTE: new in MQTTv5.0
--- MQTT protocol fixed header packet types.
-- For MQTT v3.1.1: <b>2.2.1 MQTT Control Packet type</b>,
-- For MQTT v5.0: <b>2.1.2 MQTT Control Packet type</b>.
protocol.packet_type = {
CONNECT = 1, -- 1
CONNACK = 2, -- 2
PUBLISH = 3, -- 3
PUBACK = 4, -- 4
PUBREC = 5, -- 5
PUBREL = 6, -- 6
PUBCOMP = 7, -- 7
SUBSCRIBE = 8, -- 8
SUBACK = 9, -- 9
UNSUBSCRIBE = 10, -- 10
UNSUBACK = 11, -- 11
PINGREQ = 12, -- 12
PINGRESP = 13, -- 13
DISCONNECT = 14, -- 14
AUTH = 15, -- 15
[1] = "CONNECT", -- "CONNECT"
[2] = "CONNACK", -- "CONNACK"
[3] = "PUBLISH", -- "PUBLISH"
[4] = "PUBACK", -- "PUBACK"
[5] = "PUBREC", -- "PUBREC"
[6] = "PUBREL", -- "PUBREL"
[7] = "PUBCOMP", -- "PUBCOMP"
[8] = "SUBSCRIBE", -- "SUBSCRIBE"
[9] = "SUBACK", -- "SUBACK"
[10] = "UNSUBSCRIBE", -- "UNSUBSCRIBE"
[11] = "UNSUBACK", -- "UNSUBACK"
[12] = "PINGREQ", -- "PINGREQ"
[13] = "PINGRESP", -- "PINGRESP"
[14] = "DISCONNECT", -- "DISCONNECT"
[15] = "AUTH", -- "AUTH"
}
protocol.packet_type = packet_type
local packet_type = protocol.packet_type
-- Packet types requiring packet identifier field
-- DOCv3.1.1: 2.3.1 Packet Identifier
@ -334,7 +400,7 @@ local packets_requiring_packet_id = {
}
-- CONNACK return code/reason code strings
local connack_rc = {
protocol.connack_rc = {
-- MQTT v3.1.1 Connect return codes, DOCv3.1.1: 3.2.2.3 Connect Return code
[0] = "Connection Accepted",
[1] = "Connection Refused, unacceptable protocol version",
@ -366,9 +432,11 @@ local connack_rc = {
[0x9D] = "Server moved",
[0x9F] = "Connection rate exceeded",
}
protocol.connack_rc = connack_rc
local connack_rc = protocol.connack_rc
-- Returns true if Packet Identifier field are required for given packet
--- Check if Packet Identifier field are required for given packet
-- @tparam table args - args for creating packet
-- @treturn boolean true if Packet Identifier are required for the packet
function protocol.packet_id_required(args)
assert(type(args) == "table", "expecting args to be a table")
assert(type(args.type) == "number", "expecting .type to be a number")
@ -410,7 +478,9 @@ combined_packet_mt.__index = function(_, key)
return combined_packet_mt[key]
end
-- Combine several data parts into one
--- Combine several data parts into one
-- @tparam combined_packet_mt/string ... any amount of strings of combined_packet_mt tables to combine into one packet
-- @treturn combined_packet_mt table suitable to append packet parts or to stringify it into raw packet bytes
function protocol.combine(...)
return setmetatable({...}, combined_packet_mt)
end
@ -422,7 +492,7 @@ local function value_tostring(value)
return str_format("%q", value)
elseif t == "table" then
local res = {}
for k, v in pairs(value) do
for k, v in sortedpairs(value) do
if type(k) == "number" then
res[#res + 1] = value_tostring(v)
else
@ -439,32 +509,36 @@ local function value_tostring(value)
end
end
-- Convert packet to string representation
local function packet_tostring(packet)
--- Render packet to string representation
-- @tparam packet_mt packet table to convert to string
-- @treturn string human-readable string representation of the packet
function protocol.packet_tostring(packet)
local res = {}
for k, v in pairs(packet) do
for k, v in sortedpairs(packet) do
res[#res + 1] = str_format("%s=%s", k, value_tostring(v))
end
return str_format("%s{%s}", tostring(packet_type[packet.type]), tbl_concat(res, ", "))
end
protocol.packet_tostring = packet_tostring
local packet_tostring = protocol.packet_tostring
-- Parsed packet metatable
--- Parsed packet metatable
protocol.packet_mt = {
__tostring = packet_tostring,
__tostring = packet_tostring, -- packet-to-human-readable-string conversion metamethod using protocol.packet_tostring()
}
-- Parsed CONNACK packet metatable
--- Parsed CONNACK packet metatable
protocol.connack_packet_mt = {
__tostring = packet_tostring,
__tostring = packet_tostring, -- packet-to-human-readable-string conversion metamethod using protocol.packet_tostring()
reason_string = function(self) -- Returns reason string for the CONNACK packet according to its rc field
local reason_string = connack_rc[self.rc]
if not reason_string then
reason_string = "Unknown: "..self.rc
end
return reason_string
end,
}
protocol.connack_packet_mt.__index = protocol.connack_packet_mt
--- Returns reason string for CONNACK packet
-- @treturn string Reason string according packet's rc field
function protocol.connack_packet_mt:reason_string()
return connack_rc[self.rc]
end
--- Start parsing a new packet
-- @tparam function read_func - function to read data from the network connection
@ -472,7 +546,7 @@ end
-- @treturn number flags
-- @treturn table input - a table with fields "read_func" and "available" representing a stream-like object
-- to read already received packet data in chunks
-- @return false and error_message on failure
-- @return OR false and error_message on failure
function protocol.start_parse_packet(read_func)
assert(type(read_func) == "function", "expecting read_func to be a function")
local byte1, err, len, data
@ -482,14 +556,14 @@ function protocol.start_parse_packet(read_func)
-- DOC[v5.0]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901020
byte1, err = read_func(1)
if not byte1 then
return false, "failed to read first byte: "..err
return false, err
end
byte1 = str_byte(byte1, 1, 1)
local ptype = rshift(byte1, 4)
local flags = band(byte1, 0xF)
len, err = parse_var_length(read_func)
if not len then
return false, "failed to parse remaining length: "..err
return false, err
end
-- create packet parser instance (aka input)
@ -500,14 +574,14 @@ function protocol.start_parse_packet(read_func)
data = ""
end
if not data then
return false, "failed to read packet data: "..err
return false, err
end
input.available = data:len()
-- read data function for the input instance
input.read_func = function(size)
if size > input.available then
return false, "not enough data to read size: "..size
return false, size
end
local off = input[1]
local res = str_sub(data, off, off + size - 1)
@ -519,6 +593,101 @@ function protocol.start_parse_packet(read_func)
return ptype, flags, input
end
--- Parse CONNECT packet with read_func
-- @tparam function read_func - function to read data from the network connection
-- @tparam[opt] number version - expected protocol version constant or nil to accept both versions
-- @return packet on success or false and error message on failure
function protocol.parse_packet_connect(read_func, version)
-- DOC[v3.1.1]: 3.1 CONNECT Client requests a connection to a Server
-- DOC[v5.0]: 3.1 CONNECT Connection Request
local ptype, flags, input = protocol.start_parse_packet(read_func)
if ptype ~= packet_type.CONNECT then
return false, "expecting CONNECT (1) packet type but got "..ptype
end
if flags ~= 0 then
return false, "expecting CONNECT flags to be 0 but got "..flags
end
return protocol.parse_packet_connect_input(input, version)
end
--- Parse CONNECT packet from already received stream-like packet input table
-- @tparam table input - a table with fields "read_func" and "available" representing a stream-like object
-- @tparam[opt] number version - expected protocol version constant or nil to accept both versions
-- @return packet on success or false and error message on failure
function protocol.parse_packet_connect_input(input, version)
-- DOC[v3.1.1]: 3.1 CONNECT Client requests a connection to a Server
-- DOC[v5.0]: 3.1 CONNECT Connection Request
local read_func = input.read_func
local err, protocol_name, protocol_ver, connect_flags, keep_alive
-- DOC: 3.1.2.1 Protocol Name
protocol_name, err = parse_string(read_func)
if not protocol_name then
return false, "failed to parse protocol name: "..err
end
if protocol_name ~= "MQTT" then
return false, "expecting 'MQTT' as protocol name but received '"..protocol_name.."'"
end
-- DOC[v3.1.1]: 3.1.2.2 Protocol Level
-- DOC[v5.0]: 3.1.2.2 Protocol Version
protocol_ver, err = parse_uint8(read_func)
if not protocol_ver then
return false, "failed to parse protocol level/version: "..err
end
if version ~= nil and version ~= protocol_ver then
return false, "expecting protocol version "..version.." but received "..protocol_ver
end
-- DOC: 3.1.2.3 Connect Flags
connect_flags, err = parse_uint8(read_func)
if not connect_flags then
return false, "failed to parse connect flags: "..err
end
if band(connect_flags, 0x1) ~= 0 then
return false, "reserved 1st bit in connect flags are set"
end
local clean = (band(connect_flags, 0x2) ~= 0)
local will = (band(connect_flags, 0x4) ~= 0)
local will_qos = band(rshift(connect_flags, 3), 0x3)
local will_retain = (band(connect_flags, 0x20) ~= 0)
local password_flag = (band(connect_flags, 0x40) ~= 0)
local username_flag = (band(connect_flags, 0x80) ~= 0)
-- DOC: 3.1.2.10 Keep Alive
keep_alive, err = parse_uint16(read_func)
if not keep_alive then
return false, "failed to parse keep alive field: "..err
end
-- continue parsing based on the protocol_ver
-- preparing common connect packet fields
local packet = {
type = packet_type.CONNECT,
version = protocol_ver,
clean = clean,
password = password_flag, -- NOTE: will be replaced
username = username_flag, -- NOTE: will be replaced
keep_alive = keep_alive,
}
if will then
packet.will = {
qos = will_qos,
retain = will_retain,
topic = "", -- NOTE: will be replaced
payload = "", -- NOTE: will be replaced
}
end
if protocol_ver == const_v311 then
return require("mqtt.protocol4")._parse_packet_connect_continue(input, packet)
elseif protocol_ver == const_v50 then
return require("mqtt.protocol5")._parse_packet_connect_continue(input, packet)
else
return false, "unexpected protocol version to continue parsing: "..protocol_ver
end
end
-- export module table
return protocol

View File

@ -18,6 +18,9 @@ local require = require
local tostring = tostring
local setmetatable = setmetatable
local const = require("mqtt.const")
local const_v311 = const.v311
local bit = require("mqtt.bitwrap")
local bor = bit.bor
local band = bit.band
@ -36,6 +39,8 @@ local packet_type = protocol.packet_type
local packet_mt = protocol.packet_mt
local connack_packet_mt = protocol.connack_packet_mt
local start_parse_packet = protocol.start_parse_packet
local parse_packet_connect_input = protocol.parse_packet_connect_input
local parse_string = protocol.parse_string
local parse_uint8 = protocol.parse_uint8
local parse_uint16 = protocol.parse_uint16
@ -128,6 +133,31 @@ local function make_packet_connect(args)
return combine(header, variable_header, payload)
end
-- Create CONNACK packet, DOC: 3.2 CONNACK Acknowledge connection request
local function make_packet_connack(args)
-- check args
assert(type(args.sp) == "boolean", "expecting .sp to be a boolean")
assert(type(args.rc) == "number", "expecting .rc to be a boolean")
-- DOC: 3.2.2.1 Connect Acknowledge Flags
-- DOC: 3.2.2.2 Session Present
local byte1
if args.sp then
byte1 = 1 -- bit 0 of the Connect Acknowledge Flags.
else
byte1 = 0
end
-- DOC: 3.2.2.3 Connect Return code
local byte2 = args.rc
-- DOC: 3.2.2 Variable header
local variable_header = combine(
make_uint8(byte1),
make_uint8(byte2)
)
-- DOC: 3.2.1 Fixed header
local header = make_header(packet_type.CONNACK, 0, variable_header:len()) -- NOTE: fixed flags value 0x0
return combine(header, variable_header)
end
-- Create PUBLISH packet, DOC: 3.3 PUBLISH Publish message
local function make_packet_publish(args)
-- check args
@ -254,6 +284,29 @@ local function make_packet_subscribe(args)
return combine(header, variable_header, payload)
end
-- Create SUBACK packet, DOC: 3.9 SUBACK Subscribe acknowledgement
local function make_packet_suback(args)
-- check args
assert(type(args.packet_id) == "number", "expecting .packet_id to be a number")
assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier")
assert(type(args.rc) == "table", "expecting .rc to be a table")
assert(#args.rc > 0, "expecting .rc to be a non-empty array")
-- DOC: 3.9.2 Variable header
local variable_header = combine(
make_uint16(args.packet_id)
)
-- DOC: 3.9.3 Payload
local payload = combine()
for i, rc in ipairs(args.rc) do
assert(type(rc) == "number", "expecting .rc["..i.."] to be a number")
assert(rc >= 0 and rc <= 255, "expecting .rc["..i.."] to be in range [0, 255]")
payload:append(make_uint8(rc))
end
-- DOC: 3.9.1 Fixed header
local header = make_header(packet_type.SUBACK, 0, variable_header:len() + payload:len()) -- NOTE: fixed flags value 0x0
return combine(header, variable_header, payload)
end
-- Create UNSUBSCRIBE packet, DOC: 3.10 UNSUBSCRIBE Unsubscribe from topics
local function make_packet_unsubscribe(args)
-- check args
@ -276,151 +329,427 @@ local function make_packet_unsubscribe(args)
return combine(header, variable_header, payload)
end
-- Create UNSUBACK packet, DOC: 3.11 UNSUBACK Unsubscribe acknowledgement
local function make_packet_unsuback(args)
-- check args
assert(type(args.packet_id) == "number", "expecting .packet_id to be a number")
assert(check_packet_id(args.packet_id), "expecting .packet_id to be a valid Packet Identifier")
-- DOC: 3.11.2 Variable header
local variable_header = combine(
make_uint16(args.packet_id)
)
-- DOC: 3.11.3 Payload
-- The UNSUBACK Packet has no payload.
-- DOC: 3.11.1 Fixed header
local header = make_header(packet_type.UNSUBACK, 0, variable_header:len()) -- NOTE: fixed flags value 0x0
return combine(header, variable_header)
end
-- Create packet of given {type: number} in args
function protocol4.make_packet(args)
assert(type(args) == "table", "expecting args to be a table")
assert(type(args.type) == "number", "expecting .type number in args")
local ptype = args.type
if ptype == packet_type.CONNECT then
if ptype == packet_type.CONNECT then -- 1
return make_packet_connect(args)
elseif ptype == packet_type.PUBLISH then
elseif ptype == packet_type.CONNACK then -- 2
return make_packet_connack(args)
elseif ptype == packet_type.PUBLISH then -- 3
return make_packet_publish(args)
elseif ptype == packet_type.PUBACK then
elseif ptype == packet_type.PUBACK then -- 4
return make_packet_puback(args)
elseif ptype == packet_type.PUBREC then
elseif ptype == packet_type.PUBREC then -- 5
return make_packet_pubrec(args)
elseif ptype == packet_type.PUBREL then
elseif ptype == packet_type.PUBREL then -- 6
return make_packet_pubrel(args)
elseif ptype == packet_type.PUBCOMP then
elseif ptype == packet_type.PUBCOMP then -- 7
return make_packet_pubcomp(args)
elseif ptype == packet_type.SUBSCRIBE then
elseif ptype == packet_type.SUBSCRIBE then -- 8
return make_packet_subscribe(args)
elseif ptype == packet_type.UNSUBSCRIBE then
elseif ptype == packet_type.SUBACK then -- 9
return make_packet_suback(args)
elseif ptype == packet_type.UNSUBSCRIBE then -- 10
return make_packet_unsubscribe(args)
elseif ptype == packet_type.PINGREQ then
elseif ptype == packet_type.UNSUBACK then -- 11
return make_packet_unsuback(args)
elseif ptype == packet_type.PINGREQ then -- 12
-- DOC: 3.12 PINGREQ PING request
return combine("\192\000") -- 192 == 0xC0, type == 12, flags == 0
elseif ptype == packet_type.DISCONNECT then
elseif ptype == packet_type.PINGRESP then -- 13
-- DOC: 3.13 PINGRESP PING response
return combine("\208\000") -- 208 == 0xD0, type == 13, flags == 0
elseif ptype == packet_type.DISCONNECT then -- 14
-- DOC: 3.14 DISCONNECT Disconnect notification
return combine("\224\000") -- 224 == 0xD0, type == 14, flags == 0
else
error("unexpected packet type to make: "..ptype)
error("unexpected protocol4 packet type to make: "..ptype)
end
end
-- Parse CONNACK packet, DOC: 3.2 CONNACK Acknowledge connection request
local function parse_packet_connack(ptype, flags, input)
-- DOC: 3.2.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 2 then
return false, packet_type[ptype]..": expecting data of length 2 bytes"
end
local byte1, byte2 = parse_uint8(input.read_func), parse_uint8(input.read_func)
local sp = (band(byte1, 0x1) ~= 0)
return setmetatable({type=ptype, sp=sp, rc=byte2}, connack_packet_mt)
end
-- Parse PUBLISH packet, DOC: 3.3 PUBLISH Publish message
local function parse_packet_publish(ptype, flags, input)
-- DOC: 3.3.1.1 DUP
local dup = (band(flags, 0x8) ~= 0)
-- DOC: 3.3.1.2 QoS
local qos = band(rshift(flags, 1), 0x3)
-- DOC: 3.3.1.3 RETAIN
local retain = (band(flags, 0x1) ~= 0)
-- DOC: 3.3.2.1 Topic Name
if input.available < 2 then
return false, packet_type[ptype]..": expecting data of length at least 2 bytes"
end
local topic_len = parse_uint16(input.read_func)
if input.available < topic_len then
return false, packet_type[ptype]..": malformed packet: not enough data to parse topic"
end
local topic = input.read_func(topic_len)
-- DOC: 3.3.2.2 Packet Identifier
local packet_id
if qos > 0 then
-- DOC: 3.3.2.2 Packet Identifier
if input.available < 2 then
return false, packet_type[ptype]..": malformed packet: not enough data to parse packet_id"
end
packet_id = parse_uint16(input.read_func)
end
-- DOC: 3.3.3 Payload
local payload
if input.available > 0 then
payload = input.read_func(input.available)
end
return setmetatable({type=ptype, dup=dup, qos=qos, retain=retain, packet_id=packet_id, topic=topic, payload=payload}, packet_mt)
end
-- Parse PUBACK packet, DOC: 3.4 PUBACK Publish acknowledgement
local function parse_packet_puback(ptype, flags, input)
-- DOC: 3.4.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 2 then
return false, packet_type[ptype]..": expecting data of length 2 bytes"
end
-- DOC: 3.4.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
end
-- Parse PUBREC packet, DOC: 3.5 PUBREC Publish received (QoS 2 publish received, part 1)
local function parse_packet_pubrec(ptype, flags, input)
-- DOC: 3.4.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 2 then
return false, packet_type[ptype]..": expecting data of length 2 bytes"
end
-- DOC: 3.5.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
end
-- Parse PUBREL packet, DOC: 3.6 PUBREL Publish release (QoS 2 publish received, part 2)
local function parse_packet_pubrel(ptype, flags, input)
if flags ~= 2 then
-- DOC: The Server MUST treat any other value as malformed and close the Network Connection [MQTT-3.6.1-1].
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 2 then
return false, packet_type[ptype]..": expecting data of length 2 bytes"
end
-- DOC: 3.6.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
end
-- Parse PUBCOMP packet, DOC: 3.7 PUBCOMP Publish complete (QoS 2 publish received, part 3)
local function parse_packet_pubcomp(ptype, flags, input)
-- DOC: 3.7.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 2 then
return false, packet_type[ptype]..": expecting data of length 2 bytes"
end
-- DOC: 3.7.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
end
-- Parse SUBSCRIBE packet, DOC: 3.8 SUBSCRIBE - Subscribe to topics
local function parse_packet_subscribe(ptype, flags, input)
if flags ~= 2 then
-- DOC: The Server MUST treat any other value as malformed and close the Network Connection [MQTT-3.8.1-1].
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available < 5 then -- variable header (2) + payload: topic length (2) + qos (1)
-- DOC: The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair. A SUBSCRIBE packet with no payload is a protocol violation [MQTT-3.8.3-3]
return false, packet_type[ptype]..": expecting data of length 5 bytes at least"
end
-- DOC: 3.8.2 Variable header
local packet_id = parse_uint16(input.read_func)
-- DOC: 3.8.3 Payload
local subscriptions = {}
while input.available > 0 do
local topic_filter, qos, err
topic_filter, err = parse_string(input.read_func)
if not topic_filter then
return false, packet_type[ptype]..": failed to parse topic filter: "..err
end
qos, err = parse_uint8(input.read_func)
if not qos then
return false, packet_type[ptype]..": failed to parse qos: "..err
end
subscriptions[#subscriptions + 1] = {
topic = topic_filter,
qos = qos,
}
end
return setmetatable({type=ptype, packet_id=packet_id, subscriptions=subscriptions}, packet_mt)
end
-- SUBACK return codes
-- DOC: 3.9.3 Payload
local suback_rc = {
[0x00] = "Success - Maximum QoS 0",
[0x01] = "Success - Maximum QoS 1",
[0x02] = "Success - Maximum QoS 2",
[0x80] = "Failure",
}
protocol4.suback_rc = suback_rc
--- Parsed SUBACK packet metatable
local suback_packet_mt = {
__tostring = protocol.packet_tostring, -- packet-to-human-readable-string conversion metamethod using protocol.packet_tostring()
reason_strings = function(self) -- Returns return codes descriptions for the SUBACK packet according to its rc field
local human_readable = {}
for i, rc in ipairs(self.rc) do
local return_code = suback_rc[rc]
if return_code then
human_readable[i] = return_code
else
human_readable[i] = "Unknown: "..tostring(rc)
end
end
return human_readable
end,
}
suback_packet_mt.__index = suback_packet_mt
protocol4.suback_packet_mt = suback_packet_mt
-- Parse SUBACK packet, DOC: 3.9 SUBACK Subscribe acknowledgement
local function parse_packet_suback(ptype, flags, input)
-- DOC: 3.9.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available < 3 then
return false, packet_type[ptype]..": expecting data of length at least 3 bytes"
end
-- DOC: 3.9.2 Variable header
-- DOC: 3.9.3 Payload
local packet_id = parse_uint16(input.read_func)
local rc = {} -- DOC: The payload contains a list of return codes.
while input.available > 0 do
rc[#rc + 1] = parse_uint8(input.read_func)
end
return setmetatable({type=ptype, packet_id=packet_id, rc=rc}, suback_packet_mt)
end
-- Parse UNSUBSCRIBE packet, DOC: 3.10 UNSUBSCRIBE Unsubscribe from topics
local function parse_packet_unsubscribe(ptype, flags, input)
-- DOC: 3.10.1 Fixed header
if flags ~= 2 then
-- DOC: The Server MUST treat any other value as malformed and close the Network Connection [MQTT-3.10.1-1].
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available < 4 then -- variable header (2) + payload: topic length (2)
-- DOC: The Payload of an UNSUBSCRIBE packet MUST contain at least one Topic Filter. An UNSUBSCRIBE packet with no payload is a protocol violation [MQTT-3.10.3-2].
return false, packet_type[ptype]..": expecting data of length at least 4 bytes"
end
-- DOC: 3.10.2 Variable header
local packet_id = parse_uint16(input.read_func)
-- DOC: 3.10.3 Payload
local subscriptions = {}
while input.available > 0 do
local topic_filter, err = parse_string(input.read_func)
if not topic_filter then
return false, packet_type[ptype]..": failed to parse topic filter: "..err
end
subscriptions[#subscriptions + 1] = topic_filter
end
return setmetatable({type=ptype, packet_id=packet_id, subscriptions=subscriptions}, packet_mt)
end
-- Parse UNSUBACK packet, DOC: 3.11 UNSUBACK Unsubscribe acknowledgement
local function parse_packet_unsuback(ptype, flags, input)
-- DOC: 3.11.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 2 then
return false, packet_type[ptype]..": expecting data of length 2 bytes"
end
-- DOC: 3.11.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
end
-- Parse PINGREQ packet, DOC: 3.12 PINGREQ PING request
local function parse_packet_pingreq(ptype, flags, input)
-- DOC: 3.12.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 0 then
return false, packet_type[ptype]..": expecting data of length 0 bytes"
end
return setmetatable({type=ptype}, packet_mt)
end
-- Parse PINGRESP packet, DOC: 3.13 PINGRESP PING response
local function parse_packet_pingresp(ptype, flags, input)
-- DOC: 3.13.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 0 then
return false, packet_type[ptype]..": expecting data of length 0 bytes"
end
return setmetatable({type=ptype}, packet_mt)
end
-- Parse DISCONNECT packet, DOC: 3.14 DISCONNECT Disconnect notification
local function parse_packet_disconnect(ptype, flags, input)
-- DOC: 3.14.1 Fixed header
if flags ~= 0 then -- Reserved
return false, packet_type[ptype]..": unexpected flags value: "..flags
end
if input.available ~= 0 then
return false, packet_type[ptype]..": expecting data of length 0 bytes"
end
return setmetatable({type=ptype}, packet_mt)
end
-- Parse packet using given read_func
-- Returns packet on success or false and error message on failure
function protocol4.parse_packet(read_func)
local ptype, flags, input = start_parse_packet(read_func)
if not ptype then
return false, flags
return false, flags -- flags is error message in this case
end
-- parse readed data according type in fixed header
if ptype == packet_type.CONNACK then
-- DOC: 3.2 CONNACK Acknowledge connection request
if input.available ~= 2 then
return false, "expecting data of length 2 bytes"
end
local byte1, byte2 = parse_uint8(input.read_func), parse_uint8(input.read_func)
local sp = (band(byte1, 0x1) ~= 0)
return setmetatable({type=ptype, sp=sp, rc=byte2}, connack_packet_mt)
elseif ptype == packet_type.PUBLISH then
-- DOC: 3.3 PUBLISH Publish message
-- DOC: 3.3.1.1 DUP
local dup = (band(flags, 0x8) ~= 0)
-- DOC: 3.3.1.2 QoS
local qos = band(rshift(flags, 1), 0x3)
-- DOC: 3.3.1.3 RETAIN
local retain = (band(flags, 0x1) ~= 0)
-- DOC: 3.3.2.1 Topic Name
if input.available < 2 then
return false, "expecting data of length at least 2 bytes"
end
local topic_len = parse_uint16(input.read_func)
if input.available < topic_len then
return false, "malformed PUBLISH packet: not enough data to parse topic"
end
local topic = input.read_func(topic_len)
-- DOC: 3.3.2.2 Packet Identifier
local packet_id
if qos > 0 then
-- DOC: 3.3.2.2 Packet Identifier
if input.available < 2 then
return false, "malformed PUBLISH packet: not enough data to parse packet_id"
end
packet_id = parse_uint16(input.read_func)
end
-- DOC: 3.3.3 Payload
local payload
if input.available > 0 then
payload = input.read_func(input.available)
end
return setmetatable({type=ptype, dup=dup, qos=qos, retain=retain, packet_id=packet_id, topic=topic, payload=payload}, packet_mt)
elseif ptype == packet_type.PUBACK then
-- DOC: 3.4 PUBACK Publish acknowledgement
if input.available ~= 2 then
return false, "expecting data of length 2 bytes"
end
-- DOC: 3.4.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
elseif ptype == packet_type.PUBREC then
-- DOC: 3.5 PUBREC Publish received (QoS 2 publish received, part 1)
if input.available ~= 2 then
return false, "expecting data of length 2 bytes"
end
-- DOC: 3.5.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
elseif ptype == packet_type.PUBREL then
-- DOC: 3.6 PUBREL Publish release (QoS 2 publish received, part 2)
if input.available ~= 2 then
return false, "expecting data of length 2 bytes"
end
-- also flags should be checked to equals 2 by the server
-- DOC: 3.6.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
elseif ptype == packet_type.PUBCOMP then
-- 3.7 PUBCOMP Publish complete (QoS 2 publish received, part 3)
if input.available ~= 2 then
return false, "expecting data of length 2 bytes"
end
-- DOC: 3.7.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
elseif ptype == packet_type.SUBACK then
-- DOC: 3.9 SUBACK Subscribe acknowledgement
if input.available < 3 then
return false, "expecting data of length at least 3 bytes"
end
-- DOC: 3.9.2 Variable header
-- DOC: 3.9.3 Payload
local packet_id = parse_uint16(input.read_func)
local rc = {} -- DOC: The payload contains a list of return codes.
while input.available > 0 do
rc[#rc + 1] = parse_uint8(input.read_func)
end
return setmetatable({type=ptype, packet_id=packet_id, rc=rc}, packet_mt)
elseif ptype == packet_type.UNSUBACK then
-- DOC: 3.11 UNSUBACK Unsubscribe acknowledgement
if input.available ~= 2 then
return false, "expecting data of length 2 bytes"
end
-- DOC: 3.11.2 Variable header
local packet_id = parse_uint16(input.read_func)
return setmetatable({type=ptype, packet_id=packet_id}, packet_mt)
elseif ptype == packet_type.PINGRESP then
-- DOC: 3.13 PINGRESP PING response
if input.available ~= 0 then
return false, "expecting data of length 0 bytes"
end
return setmetatable({type=ptype}, packet_mt)
-- parse read data according type in fixed header
if ptype == packet_type.CONNECT then -- 1
return parse_packet_connect_input(input, const_v311)
elseif ptype == packet_type.CONNACK then -- 2
return parse_packet_connack(ptype, flags, input)
elseif ptype == packet_type.PUBLISH then -- 3
return parse_packet_publish(ptype, flags, input)
elseif ptype == packet_type.PUBACK then -- 4
return parse_packet_puback(ptype, flags, input)
elseif ptype == packet_type.PUBREC then -- 5
return parse_packet_pubrec(ptype, flags, input)
elseif ptype == packet_type.PUBREL then -- 6
return parse_packet_pubrel(ptype, flags, input)
elseif ptype == packet_type.PUBCOMP then -- 7
return parse_packet_pubcomp(ptype, flags, input)
elseif ptype == packet_type.SUBSCRIBE then -- 8
return parse_packet_subscribe(ptype, flags, input)
elseif ptype == packet_type.SUBACK then -- 9
return parse_packet_suback(ptype, flags, input)
elseif ptype == packet_type.UNSUBSCRIBE then -- 10
return parse_packet_unsubscribe(ptype, flags, input)
elseif ptype == packet_type.UNSUBACK then -- 11
return parse_packet_unsuback(ptype, flags, input)
elseif ptype == packet_type.PINGREQ then -- 12
return parse_packet_pingreq(ptype, flags, input)
elseif ptype == packet_type.PINGRESP then -- 13
return parse_packet_pingresp(ptype, flags, input)
elseif ptype == packet_type.DISCONNECT then -- 14
return parse_packet_disconnect(ptype, flags, input)
else
return false, "unexpected packet type received: "..tostring(ptype)
end
end
-- Continue parsing of the MQTT v3.1.1 CONNECT packet
-- Internally called from the protocol.parse_packet_connect_input() function
-- Returns packet on success or false and error message on failure
function protocol4._parse_packet_connect_continue(input, packet)
-- DOC: 3.1.3 Payload
-- These fields, if present, MUST appear in the order Client Identifier, Will Topic, Will Message, User Name, Password
local read_func = input.read_func
local client_id, err
-- DOC: 3.1.3.1 Client Identifier
client_id, err = parse_string(read_func)
if not client_id then
return false, "CONNECT: failed to parse client_id: "..err
end
packet.id = client_id
local will = packet.will
if will then
-- 3.1.3.2 Will Topic
local will_topic, will_payload
will_topic, err = parse_string(read_func)
if not will_topic then
return false, "CONNECT: failed to parse will_topic: "..err
end
will.topic = will_topic
-- DOC: 3.1.3.3 Will Message
will_payload, err = parse_string(read_func)
if not will_payload then
return false, "CONNECT: failed to parse will_payload: "..err
end
will.payload = will_payload
end
if packet.username then
-- DOC: 3.1.3.4 User Name
local username
username, err = parse_string(read_func)
if not username then
return false, "CONNECT: failed to parse username: "..err
end
packet.username = username
else
packet.username = nil
end
if packet.password then
-- DOC: 3.1.3.5 Password
if not packet.username then
return false, "CONNECT: MQTT v3.1.1 does not allow providing password without username"
end
local password
password, err = parse_string(read_func)
if not password then
return false, "CONNECT: failed to parse password: "..err
end
packet.password = password
else
packet.password = nil
end
return setmetatable(packet, packet_mt)
end
-- export module table
return protocol4

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,11 @@ local str_byte = string.byte
local table = require("table")
local tbl_concat = table.concat
local tbl_sort = table.sort
local type = type
local error = error
local pairs = pairs
local math = require("math")
local math_floor = math.floor
@ -29,6 +34,73 @@ function tools.div(x, y)
return math_floor(x / y)
end
-- table.sort callback for tools.sortedpairs()
local function sortedpairs_compare(a, b)
local a_type = type(a)
local b_type = type(b)
if (a_type == "string" and b_type == "string") or (a_type == "number" and b_type == "number") then
return a < b
elseif a_type == "number" then
return true
elseif b_type == "number" then
return false
else
error("sortedpairs failed to make a stable keys comparison of types "..a_type.." and "..b_type)
end
end
-- Iterate through table keys and values in stable (sorted) order
function tools.sortedpairs(tbl)
local keys = {}
for k in pairs(tbl) do
local k_type = type(k)
if k_type ~= "string" and k_type ~= "number" then
error("sortedpairs failed to make a stable iteration order for key of type "..type(k))
end
keys[#keys + 1] = k
end
tbl_sort(keys, sortedpairs_compare)
local i = 0
return function()
i = i + 1
local key = keys[i]
if key then
return key, tbl[key]
end
end
end
-- Converts multi-line string to a HEX string, removing all whitespace and line-comments started with "--"
function tools.extract_hex(str)
local res = {}
-- iterate through lines
local n = 0
for line in str:gmatch("[^\n]+") do
n = n + 1
-- find a comment start
local comment_begin = line:find("--", 1, true)
if comment_begin then
-- remove comment from the line
line = line:sub(1, comment_begin - 1)
end
-- remove all whitespace from the line
line = line:gsub("%s", "")
-- check for the non-hex chars
local non_hex = line:find("[^0-9A-Fa-f]+")
if non_hex then
error(string.format("non-hex char '%s' at %s:%s", line:sub(non_hex, non_hex), n, non_hex))
end
-- append line to concat list
res[#res + 1] = line
end
-- finally concat all lines onto one HEX-string
local hexstr = tbl_concat(res)
if (#hexstr % 2) ~= 0 then
error("odd number of chars in the resulting HEX string")
end
return hexstr
end
-- export module table
return tools

View File

@ -38,7 +38,6 @@ local function onCommand(command)
assert(client:subscribe {
topic = topic
})
})
end
end

View File

@ -1,346 +0,0 @@
local mqtt = {}
local socket = require("socket")
local clientMt = {}
local function bytesUsedByVarIntForValue(value)
if value <= 128-1 then
return 1, nil
elseif value <= 128*128-1 then
return 2, nil
elseif value <= 128*128*128-1 then
return 3, nil
elseif value <= 128*128*128*128-1 then
return 4, nil
else
return nil, "invalid byte length"
end
end
local function bytesUsedByString(string)
return 2 + #string
end
function clientMt:receiveByte()
end
function clientMt:flush()
local start = 1
while start <= #self.buffer do
print("flushing data")
local _, err, sent = self.connection:send(self.buffer, start)
if err then
print("Error: " .. err)
return nil, err
else
self.buffer = ""
return true, err
end
end
return true, nil
end
function clientMt:sendByte(byte)
self.buffer = self.buffer .. string.char(byte)
if #self.buffer > 1024 then
return self:flush()
end
end
function clientMt:sendData(data)
local result, err = self:flush()
if err then return result, err end
self.buffer = data
self:flush()
local result, err = self:flush()
if err then return result, err end
end
function clientMt:sendVarInt(value)
repeat
local encoded = value & 0x7F
value = value >> 7
if value > 0 then
encoded = encoded | 128
end
local _, err = self:sendByte(encoded)
if err then
return nil, err
end
until value <= 0
return true, nil
end
function clientMt:sendShort(value)
local _, err = self:sendByte((value >> 8) & 0xFF)
if err then return nil, err end
local _, err = self:sendByte(value & 0xFF)
if err then return nil, err end
end
function clientMt:sendString(text)
local _, err = self:sendShort(#text)
if err then return nil, err end
local _, err = self:sendData(text)
if err then return nil, err end
end
function clientMt:handleError(err, result)
if err then
print("Got error")
if self.connection then
self.connection:close()
end
self.connection = nil
end
return result, err
end
function clientMt:sendPacket()
local result, err = self:flush()
return self:handleError(err, result)
end
function clientMt:connect()
if self.connection then
return true, nil
end
local conn, err = socket.connect(self.uri, 1883)
if not conn then
return nil, "failed to connect: " .. err
end
conn:setoption("tcp-nodelay", true)
conn:setoption("linger", {on = true, timeout = 100})
conn:settimeout(nil)
self.connection = conn
local _, err = self:sendByte(0x10)
if err then return nil, self:handleError(err) end
local length = 0
local protocolName = "MQTT"
local protocolNameLength = bytesUsedByString(protocolName)
length = length + protocolNameLength
local protocolVersion = 4
local connectFlag = 0x02 -- 1 byte
length = length + 2
local keepAlive = 0 -- 2 bytes
length = length + 2
local clientIdLength = bytesUsedByString(self.id)
length = length + clientIdLength
_, err = self:sendVarInt(length)
if err then return nil, self:handleError(err) end
_, err = self:sendString(protocolName)
if err then return nil, self:handleError(err) end
_, err = self:sendByte(protocolVersion)
if err then return nil, self:handleError(err) end
_, err = self:sendByte(connectFlag)
if err then return nil, self:handleError(err) end
_, err = self:sendShort(keepAlive)
if err then return nil, self:handleError(err) end
_, err = self:sendString(self.id)
if err then return nil, self:handleError(err) end
return self:sendPacket()
end
function clientMt:disconnect(args)
if not self.connection then
return true, nil
end
local _, err = self:sendByte(0xE0)
if err then return nil, self:handleError(err) end
local _, err = self:sendByte(0x00)
if err then return nil, self:handleError(err) end
local result, err = self:sendPacket()
self.connection:shutdown("both")
local peer
repeat
peer = self.connection:getpeername()
if peer then socket.sleep(0.02) end
until peer
self.connection:close()
self.connection = nil
return result, err
end
function clientMt:publish(args)
local topic = args.topic
local payload = args.payload
local _, err = self:connect()
if err then return nil, self:handleError(err) end
local retain = args.retain and 0x01 or 0x00
local _, err = self:sendByte(0x30 | retain)
if err then return nil, self:handleError(err) end
local topicLength = bytesUsedByString(topic)
local payloadLength = #payload
_, err = self:sendVarInt(topicLength + payloadLength)
if err then return nil, self:handleError(err) end
_, err = self:sendString(topic)
if err then return nil, self:handleError(err) end
_, err = self:sendData(payload)
if err then return nil, self:handleError(err) end
return self:sendPacket()
end
function clientMt:subscribe(args)
local topic = args.topic
local _, err = self:connect()
if err then return nil, self:handleError(err) end
local _, err = self:sendByte(0x82)
if err then return nil, self:handleError(err) end
local packetIdentifier = self.packetIdentifier
self.packetIdentifier = self.packetIdentifier + 1
if self.packetIdentifier > 0xFF00 then
self.packetIdentifier = 1
end
local topicFilter = 0
local topicLength = bytesUsedByString(topic)
local length = 2 + topicLength + 1
_, err = self:sendVarInt(length)
if err then return nil, self:handleError(err) end
_, err = self:sendShort(packetIdentifier)
if err then return nil, self:handleError(err) end
_, err = self:sendString(topic)
if err then return nil, self:handleError(err) end
_, err = self:sendByte(topicFilter)
if err then return nil, self:handleError(err) end
return self:sendPacket()
end
function clientMt:fireEvent(event, ...)
if not self.eventHandlers then
return
end
if not self.eventHandlers[event] then
return
end
self.eventHandlers[event](...)
end
function clientMt:receiveBytes(count)
local result, err, partial = nil, nil, ""
while true do
result, err, partial = client:receive(1 - #partial, partial)
if err == "timeout" then
coroutine.yield()
else if result then
return result
else
return nil, err
end
end
end
function clientMt:receiveByte()
return string.byte(self:receiveBytes(1)))
end
function clientMt:receiveVarInt()
local multiplier = 1
local value = 0
local encodedByte
repeat
encodedByte, err = receiveByte()
if err then return nil, self:handleError(err) end
value = value + (encodedByte & 127) * multiplier
multiplier = multiplier * 128
if multiplier > 128*128*128 then
return nil, "malformed remaining length"
end
until (encodedByte & 128) ~= 0
return value
end
function clientMt:receivePacket()
local firstByte, err = self:receiveByte()
if err then return self:handleError(err) end
local remainingLength, err = self:receiveVarInt()
if err then return self:handleError(err) end
local packetType = (firstByte >> 4) & 0xF
if packetType == 2 then
-- CONNACK
assert(remainingLength == 2, "Invalid CONNACK length")
local flags, err = self:receiveByte()
if err then return self:handleError(err) end
local returnCode, err = self:receiveByte()
if err then return self:handleError(err) end
print("Connected")
local sessionPresent = flags & 1
if not sessionPresent then
self:fireEvent("connect")
end
else
-- Unsupported or error
self:handleError("Invalid packet type " .. packetType)
end
end
function clientMt:threadReceive()
while true do
if self.connection then
local err = self:receiveAndHandlePacket()
if err then
self:handleError(err)
end
else
coroutine.yield()
end
end
end
function clientMt:on(eventHandlers)
self.eventHandlers = eventHandlers
end
function mqtt.client(args)
local client = {
uri = args.uri,
id = args.id,
reconnect = 5,
connection = nil,
packetIdentifier = 1,
buffer = ""
}
setmetatable(client, {__index = clientMt})
client.create(function() client:threadReceive() end)
return client
end
return mqtt

View File

@ -1,5 +1,5 @@
package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
local mqtt = require("mymqtt")
local mqtt = require("mqtt")
local client
function printTable(table, indentation)
@ -38,6 +38,7 @@ client = mqtt.client {
id = "tool-get-image",
reconnect = 5,
version = mqtt.v311,
clean = "first"
}
client:on {
@ -52,6 +53,4 @@ client:subscribe {
topic = 'spider/controller/#'
}
client:disconnect()
--mqtt.run_ioloop(client)
mqtt.run_ioloop(client)

View File

@ -1,4 +1,5 @@
package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
--local mqtt = require("mymqtt")
local mqtt = require("mqtt")
local client
@ -39,4 +40,6 @@ client:on {
message = onMessage,
}
--client:runForever()
mqtt.run_ioloop(client)

View File

@ -1,11 +1,6 @@
package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
local mqtt = require("mymqtt")
--local socket = require("socket")
--local client
local fh = io.open("spider-image.bin", "rb")
local contents = fh:read("a")
fh:close()
local mqtt = require("mqtt")
local socket = require("socket")
local function onConnect(connack)
if connack.rc ~= 0 then
@ -13,6 +8,12 @@ local function onConnect(connack)
os.exit(1)
end
assert(client:publish {
topic = "spider/telemetry/camfeed",
payload = string.rep("a", 53772600),
qos = 0
})
print("Connected and subscribed")
end
@ -32,18 +33,5 @@ client:on {
end,
}
--mqtt.run_ioloop(client)
--while true do
assert(client:publish {
topic = "spider/telemetry/camfeed",
payload = "a" .. contents,
qos = 0
})
--end
print("Calling disconnect")
require("socket").sleep(0.002)
client:disconnect()
mqtt.run_ioloop(client)