Improved version of luamqtt with fewer bugs

This commit is contained in:
2024-07-20 17:51:48 +02:00
parent c7344711c2
commit 3eee21480b
31 changed files with 3487 additions and 1664 deletions

View File

@@ -0,0 +1,72 @@
--- Copas specific client handling module.
-- Typically this module is not used directly, but through `mqtt.loop` when
-- auto-detecting the environment.
-- @module mqtt.loop.copas
local copas = require "copas"
local log = require "mqtt.log"
local client_registry = {}
local _M = {}
--- Add MQTT client to the Copas scheduler.
-- Each received packet will be handled by a new thread, such that the thread
-- listening on the socket can return immediately.
-- The client will automatically be removed after it exits. It will set up a
-- thread to call `Client:check_keep_alive`.
-- @param cl mqtt-client to add to the Copas scheduler
-- @return `true` on success or `false` and error message on failure
function _M.add(cl)
if client_registry[cl] then
log:warn("MQTT client '%s' was already added to Copas", cl.opts.id)
return false, "MQTT client was already added to Copas"
end
client_registry[cl] = true
do -- make mqtt device async for incoming packets
local handle_received_packet = cl.handle_received_packet
local count = 0
-- replace packet handler; create a new thread for each packet received
cl.handle_received_packet = function(mqttdevice, packet)
count = count + 1
copas.addnamedthread(handle_received_packet, cl.opts.id..":receive_"..count, mqttdevice, packet)
return true
end
end
-- add keep-alive timer
local timer = copas.addnamedthread(function()
while client_registry[cl] do
local next_check = cl:check_keep_alive()
if next_check > 0 then
copas.pause(next_check)
end
end
end, cl.opts.id .. ":keep_alive")
-- add client to connect and listen
copas.addnamedthread(function()
while client_registry[cl] do
local timeout = cl:step()
if not timeout then
client_registry[cl] = nil -- exiting
log:debug("MQTT client '%s' exited, removed from Copas", cl.opts.id)
copas.wakeup(timer)
else
if timeout > 0 then
copas.pause(timeout)
end
end
end
end, cl.opts.id .. ":listener")
return true
end
return setmetatable(_M, {
__call = function(self, ...)
return self.add(...)
end,
})

View File

@@ -0,0 +1,30 @@
--- Module returns a single function to detect the io-loop in use.
-- Either 'copas', 'nginx', or 'ioloop', or nil+error
local log = require "mqtt.log"
local loop
return function()
if loop then return loop end
if type(ngx) == "table" then
-- there is a global 'ngx' table, so we're running OpenResty
log:info("LuaMQTT auto-detected Nginx as the runtime environment")
loop = "nginx"
return loop
elseif package.loaded.copas then
-- 'copas' was already loaded
log:info("LuaMQTT auto-detected Copas as the io-loop in use")
loop = "copas"
return loop
elseif pcall(require, "socket") and tostring(require("socket")._VERSION):find("LuaSocket") then
-- LuaSocket is available
log:info("LuaMQTT auto-detected LuaSocket as the socket library to use with mqtt-ioloop")
loop = "ioloop"
return loop
else
-- unknown
return nil, "LuaMQTT io-loop/connector auto-detection failed, please specify one explicitly"
end
end

View File

@@ -0,0 +1,37 @@
--- Auto detect the IO loop to use.
-- Interacting with the supported IO loops (ioloop, copas, and nginx) requires
-- specific implementations to get it right.
-- This module will auto-detect the environment and return the proper
-- module from;
--
-- * `mqtt.loop.ioloop`
--
-- * `mqtt.loop.copas`
--
-- * `mqtt.loop.nginx`
--
-- Since the selection is based on a.o. packages loaded, make sure that in case
-- of using the `copas` scheduler, you require it before the `mqtt` modules.
--
-- @usage
-- --local copas = require "copas" -- only if you use Copas
-- local mqtt = require "mqtt"
-- local add_client = require("mqtt.loop").add -- returns a loop-specific function
--
-- local client = mqtt.create { ... options ... }
-- add_client(client) -- works for ioloop, copas, and nginx
--
-- @module mqtt.loop
local loops = setmetatable({
copas = "mqtt.loop.copas",
nginx = "mqtt.loop.nginx",
ioloop = "mqtt.loop.ioloop"
}, {
__index = function()
error("failed to auto-detect connector to use, please set one explicitly", 2)
end
})
local loop = require("mqtt.loop.detect")()
return require(loops[loop])

View File

@@ -0,0 +1,24 @@
--- IOloop specific client handling module.
-- Typically this module is not used directly, but through `mqtt.loop` when
-- auto-detecting the environment.
-- @module mqtt.loop.ioloop
local _M = {}
local mqtt = require "mqtt"
--- Add MQTT client to the integrated ioloop.
-- The client will automatically be removed after it exits. It will set up a
-- function to call `Client:check_keep_alive` in the ioloop.
-- @param client mqtt-client to add to the ioloop
-- @return `true` on success or `false` and error message on failure
function _M.add(client)
local default_loop = mqtt.get_ioloop()
return default_loop:add(client)
end
return setmetatable(_M, {
__call = function(self, ...)
return self.add(...)
end,
})

View File

@@ -0,0 +1,76 @@
--- Nginx specific client handling module.
-- Typically this module is not used directly, but through `mqtt.loop` when
-- auto-detecting the environment.
-- @module mqtt.loop.nginx
local client_registry = {}
local _M = {}
--- Add MQTT client to the Nginx environment.
-- The client will automatically be removed after it exits. It will set up a
-- thread to call `Client:check_keep_alive`.
-- @param client mqtt-client to add to the Nginx environment
-- @return `true` on success or `false` and error message on failure
function _M.add(client)
if client_registry[client] then
ngx.log(ngx.WARN, "MQTT client '%s' was already added to Nginx", client.opts.id)
return false, "MQTT client was already added to Nginx"
end
do -- make mqtt device async for incoming packets
local handle_received_packet = client.handle_received_packet
-- replace packet handler; create a new thread for each packet received
client.handle_received_packet = function(mqttdevice, packet)
ngx.thread.spawn(handle_received_packet, mqttdevice, packet)
return true
end
end
local ok, err = ngx.timer.at(0, function()
-- spawn a thread to listen on the socket
local coro = ngx.thread.spawn(function()
while true do
local sleeptime = client:step()
if not sleeptime then
ngx.log(ngx.INFO, "MQTT client '", client.opts.id, "' exited, stopping client-thread")
client_registry[client] = nil
return
else
if sleeptime > 0 then
ngx.sleep(sleeptime * 1000)
end
end
end
end)
-- endless keep-alive loop
while not ngx.worker.exiting() do
ngx.sleep((client:check_keep_alive())) -- double (()) to trim to 1 argument
end
-- exiting
client_registry[client] = nil
ngx.log(ngx.DEBUG, "MQTT client '", client.opts.id, "' keep-alive loop exited")
client:disconnect()
ngx.thread.wait(coro)
ngx.log(ngx.DEBUG, "MQTT client '", client.opts.id, "' exit complete")
end)
if not ok then
ngx.log(ngx.CRIT, "Failed to start timer-context for device '", client.id,"': ", err)
return false, "timer failed: " .. err
end
return true
end
return setmetatable(_M, {
__call = function(self, ...)
return self.add(...)
end,
})