122 lines
3.5 KiB
Lua
122 lines
3.5 KiB
Lua
--- Copas based connector.
|
|
--
|
|
-- Copas is an advanced coroutine scheduler in pure-Lua. It uses LuaSocket
|
|
-- under the hood, but in a non-blocking way. It also uses LuaSec for TLS
|
|
-- based connections (like the `mqtt.connector.luasocket` one). And hence uses
|
|
-- the same defaults for the `secure` option when creating the `client`.
|
|
--
|
|
-- Caveats:
|
|
--
|
|
-- * the `client` option `ssl_module` is not supported by the Copas connector,
|
|
-- It will always use the module named `ssl`.
|
|
--
|
|
-- * multiple threads can send simultaneously (sending is wrapped in a lock)
|
|
--
|
|
-- * since the client creates a long lived connection for reading, it returns
|
|
-- upon receiving a packet, to call an event handler. The handler must return
|
|
-- swiftly, since while the handler runs the socket will not be reading.
|
|
-- Any task that might take longer than a few milliseconds should be off
|
|
-- loaded to another thread (the Copas-loop will take care of this).
|
|
--
|
|
-- NOTE: you will need to install copas like this: `luarocks install copas`.
|
|
-- @module mqtt.connector.copas
|
|
|
|
local super = require "mqtt.connector.base.non_buffered_base"
|
|
local connector = setmetatable({}, super)
|
|
connector.__index = connector
|
|
connector.super = super
|
|
|
|
local socket = require("socket")
|
|
local copas = require("copas")
|
|
local new_lock = require("copas.lock").new
|
|
local validate_luasec = require("mqtt.connector.base.luasec")
|
|
|
|
|
|
-- validate connection options
|
|
function connector:validate()
|
|
if self.secure then
|
|
assert(self.ssl_module == "ssl" or self.ssl_module == nil, "Copas connector only supports 'ssl' as 'ssl_module'")
|
|
|
|
validate_luasec(self)
|
|
end
|
|
end
|
|
|
|
-- Open network connection to .host and .port in conn table
|
|
-- Store opened socket to conn table
|
|
-- Returns true on success, or false and error text on failure
|
|
function connector:connect()
|
|
self:validate()
|
|
local sock = copas.wrap(socket.tcp(), self.secure_params)
|
|
copas.setsocketname("mqtt@"..self.host..":"..self.port, sock)
|
|
|
|
sock:settimeouts(self.timeout, self.timeout, -1) -- no timout on reading
|
|
|
|
local ok, err = sock:connect(self.host, self.port)
|
|
if not ok then
|
|
return false, "copas.connect failed: "..err
|
|
end
|
|
self.sock = sock
|
|
self.send_lock = new_lock(30) -- 30 second timeout
|
|
return true
|
|
end
|
|
|
|
-- the packet was fully read, we can clear the bufer.
|
|
function connector:buffer_clear()
|
|
-- since the packet is complete, we wait now indefinitely for the next one
|
|
self.sock:settimeouts(nil, nil, -1) -- no timeout on reading
|
|
end
|
|
|
|
-- Shutdown network connection
|
|
function connector:shutdown()
|
|
self.sock:close()
|
|
self.send_lock:destroy()
|
|
end
|
|
|
|
-- Send data to network connection
|
|
function connector:send(data)
|
|
-- cache locally in case lock/sock gets replaced while we were sending
|
|
local sock = self.sock
|
|
local lock = self.send_lock
|
|
|
|
local ok, err = lock:get()
|
|
if not ok then
|
|
return nil, "failed acquiring send_lock: "..tostring(err)
|
|
end
|
|
|
|
local i = 1
|
|
while i < #data do
|
|
i, err = sock:send(data, i)
|
|
if not i then
|
|
lock:release()
|
|
return false, err
|
|
end
|
|
end
|
|
lock:release()
|
|
return true
|
|
end
|
|
|
|
-- Receive given amount of data from network connection
|
|
function connector:receive(size)
|
|
local sock = self.sock
|
|
local data, err = sock:receive(size)
|
|
if data then
|
|
-- bytes received, so change from idefinite timeout to regular until
|
|
-- packet is complete (see buffer_clear method)
|
|
self.sock:settimeouts(nil, nil, self.timeout)
|
|
return data
|
|
end
|
|
|
|
if err == "closed" then
|
|
return false, self.signal_closed
|
|
elseif err == "timout" then
|
|
return false, self.signal_idle
|
|
else
|
|
return false, err
|
|
end
|
|
end
|
|
|
|
-- export module table
|
|
return connector
|
|
|
|
-- vim: ts=4 sts=4 sw=4 noet ft=lua
|