103 lines
3.4 KiB
Lua

--- Nginx OpenResty co-sockets based connector.
--
-- This connector works with the non-blocking openresty sockets. Note that the
-- secure setting haven't been implemented yet. It will simply use defaults
-- when doing a TLS handshake.
--
-- Caveats:
--
-- * sockets cannot cross phase/context boundaries. So all client interaction
-- must be done from the timer context in which the client threads run.
--
-- * multiple threads cannot send simultaneously (simple scenarios will just
-- work)
--
-- * since the client creates a long lived connection for reading, it returns
-- upon receiving a packet, to call an event handler. The handler must return
-- swiftly, since while the handler runs the socket will not be reading.
-- Any task that might take longer than a few milliseconds should be off
-- loaded to another thread.
--
-- * Nginx timers should be short lived because memory is only released after
-- the context is destroyed. In this case we're using the fro prolonged periods
-- of time, so be aware of this and implement client restarts if required.
--
-- thanks to @irimiab: https://github.com/xHasKx/luamqtt/issues/13
-- @module mqtt.connector.nginx
local super = require "mqtt.connector.base.non_buffered_base"
local ngxsocket = setmetatable({}, super)
ngxsocket.__index = ngxsocket
ngxsocket.super = super
-- load required stuff
local ngx_socket_tcp = ngx.socket.tcp
local long_timeout = 7*24*60*60*1000 -- one week
-- validate connection options
function ngxsocket:validate()
if self.secure then
assert(self.ssl_module == "ssl", "specifying custom ssl module when using Nginx connector is not supported")
assert(self.secure_params == nil or type(self.secure_params) == "table", "expecting .secure_params to be a table if given")
-- TODO: validate nginx stuff
end
end
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function ngxsocket:connect()
-- TODO: add a lock for sending to prevent multiple threads from writing to
-- the same socket simultaneously (see the Copas connector)
local sock = ngx_socket_tcp()
-- set read-timeout to 'nil' to not timeout at all
sock:settimeouts(self.timeout * 1000, self.timeout * 1000, long_timeout) -- no timeout on reading
local ok, err = sock:connect(self.host, self.port)
if not ok then
return false, "socket:connect failed: "..err
end
if self.secure then
sock:sslhandshake()
end
self.sock = sock
return true
end
-- Shutdown network connection
function ngxsocket:shutdown()
self.sock:close()
end
-- Send data to network connection
function ngxsocket:send(data)
return self.sock:send(data)
end
function ngxsocket:buffer_clear()
-- since the packet is complete, we wait now indefinitely for the next one
self.sock:settimeouts(self.timeout * 1000, self.timeout * 1000, long_timeout) -- no timeout on reading
end
-- Receive given amount of data from network connection
function ngxsocket:receive(size)
local sock = self.sock
local data, err = sock:receive(size)
if data then
-- bytes received, so change from idefinite timeout to regular until
-- packet is complete (see buffer_clear method)
self.sock:settimeouts(self.timeout * 1000, self.timeout * 1000, self.timeout * 1000)
return data
end
if err == "closed" then
return false, self.signal_closed
elseif err == "timout" then
return false, self.signal_idle
else
return false, err
end
end
-- export module table
return ngxsocket