103 lines
3.4 KiB
Lua
103 lines
3.4 KiB
Lua
--- Nginx OpenResty co-sockets based connector.
|
|
--
|
|
-- This connector works with the non-blocking openresty sockets. Note that the
|
|
-- secure settings haven't been implemented yet. It will simply use defaults
|
|
-- when doing a TLS handshake.
|
|
--
|
|
-- Caveats:
|
|
--
|
|
-- * sockets cannot cross phase/context boundaries. So all client interaction
|
|
-- must be done from the timer context in which the client threads run.
|
|
--
|
|
-- * multiple threads cannot send simultaneously (simple scenarios will just
|
|
-- work)
|
|
--
|
|
-- * since the client creates a long lived connection for reading, it returns
|
|
-- upon receiving a packet, to call an event handler. The handler must return
|
|
-- swiftly, since while the handler runs the socket will not be reading.
|
|
-- Any task that might take longer than a few milliseconds should be off
|
|
-- loaded to another thread.
|
|
--
|
|
-- * Nginx timers should be short lived because memory is only released after
|
|
-- the context is destroyed. In this case we're using them for prolonged periods
|
|
-- of time, so be aware of this and implement client restarts if required.
|
|
--
|
|
-- thanks to @irimiab: https://github.com/xHasKx/luamqtt/issues/13
|
|
-- @module mqtt.connector.nginx
|
|
|
|
-- base connector this one derives from
local super = require("mqtt.connector.base.non_buffered_base")

-- Connector class table. `super` is installed as its metatable, and the table
-- is its own `__index` (presumably so connector instances can use it as their
-- metatable -- confirm against the base module).
local ngxsocket = {}
setmetatable(ngxsocket, super)
ngxsocket.__index = ngxsocket
ngxsocket.super = super
|
|
|
|
-- load required stuff
-- cosocket constructor, cached in a local
local ngx_socket_tcp = ngx.socket.tcp

-- read timeout used while waiting for the next packet: one week, expressed in
-- milliseconds (days * hours * minutes * seconds * ms)
local long_timeout = 7 * 24 * 60 * 60 * 1000
|
|
|
|
-- validate connection options
-- Nothing is checked unless `self.secure` is set. Unsupported options raise
-- an error through `assert`.
function ngxsocket:validate()
  if not self.secure then
    return
  end

  -- only the default "ssl" module name is accepted by this connector
  local module_supported = (self.ssl_module == "ssl")
  assert(module_supported, "specifying custom ssl module when using Nginx connector is not supported")

  -- secure_params is optional, but must be a table when present
  local params = self.secure_params
  local params_ok = (params == nil) or (type(params) == "table")
  assert(params_ok, "expecting .secure_params to be a table if given")
  -- TODO: validate nginx stuff
end
|
|
|
|
-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table (as .sock)
-- When .secure is set, a TLS handshake with default settings is performed
-- after the TCP connection has been established.
-- Returns true on success, or false and error text on failure
function ngxsocket:connect()
  -- TODO: add a lock for sending to prevent multiple threads from writing to
  -- the same socket simultaneously (see the Copas connector)
  local sock = ngx_socket_tcp()

  -- connect and send use the regular timeout; reading uses the very long
  -- `long_timeout`, so waiting for the next packet effectively never times out
  local timeout_ms = self.timeout * 1000
  sock:settimeouts(timeout_ms, timeout_ms, long_timeout)

  local ok, err = sock:connect(self.host, self.port)
  if not ok then
    return false, "socket:connect failed: "..tostring(err)
  end

  if self.secure then
    -- the handshake result must be checked: carrying on after a failed
    -- handshake would report success for a connection that is not usable
    local session, ssl_err = sock:sslhandshake()
    if not session then
      sock:close()
      return false, "socket:sslhandshake failed: "..tostring(ssl_err)
    end
  end

  self.sock = sock
  return true
end
|
|
|
|
-- Shutdown network connection
-- Closes the socket stored in .sock; nothing is returned.
function ngxsocket:shutdown()
  local sock = self.sock
  sock:close()
end
|
|
|
|
-- Send data to network connection
-- Passes `data` to the socket in .sock and hands back whatever the socket's
-- own `send` returns, unchanged.
function ngxsocket:send(data)
  local sock = self.sock
  return sock:send(data)
end
|
|
|
|
-- Restore the idle timeouts on the socket.
-- since the packet is complete, we wait now indefinitely for the next one:
-- connect/send keep the regular timeout, reading gets `long_timeout`
function ngxsocket:buffer_clear()
  local regular_ms = self.timeout * 1000
  local sock = self.sock
  sock:settimeouts(regular_ms, regular_ms, long_timeout)
end
|
|
|
|
-- Receive given amount of data from network connection
-- Returns the received data on success. On failure returns false plus one of:
--   * self.signal_closed  when the socket reports "closed"
--   * self.signal_idle    when the socket reports "timeout"
--   * the socket's own error value in every other case
function ngxsocket:receive(size)
  local sock = self.sock
  local data, err = sock:receive(size)
  if data then
    -- bytes received, so change from the indefinite read timeout to the
    -- regular one until the packet is complete (see buffer_clear method)
    local timeout_ms = self.timeout * 1000
    sock:settimeouts(timeout_ms, timeout_ms, timeout_ms)
    return data
  end

  if err == "closed" then
    return false, self.signal_closed
  end
  -- cosockets report a read timeout with the string "timeout"
  if err == "timeout" then
    return false, self.signal_idle
  end
  return false, err
end
|
|
|
|
-- export module table
|
|
return ngxsocket
|