diff --git a/controller-host/mymqtt.lua b/controller-host/mymqtt.lua new file mode 100644 index 0000000..32420b3 --- /dev/null +++ b/controller-host/mymqtt.lua @@ -0,0 +1,266 @@ +local mqtt = {} +local socket = require("socket") + +local clientMt = {} + +local function bytesUsedByVarIntForValue(value) + if value <= 128-1 then + return 1, nil + elseif value <= 128*128-1 then + return 2, nil + elseif value <= 128*128*128-1 then + return 3, nil + elseif value <= 128*128*128*128-1 then + return 4, nil + else + return nil, "invalid byte length" + end +end + +local function bytesUsedByString(string) + return 2 + #string +end + +function clientMt:receiveByte() + +end + +function clientMt:flush() + local start = 1 + while start <= #self.buffer do + print("flushing data") + local _, err, sent = self.connection:send(self.buffer, start) + if err then + print("Error: " .. err) + return nil, err + else + self.buffer = "" + return true, err + end + end + return true, nil +end + +function clientMt:sendByte(byte) + self.buffer = self.buffer .. string.char(byte) + if #self.buffer > 1024 then + return self:flush() + end +end + +function clientMt:sendData(data) + local result, err = self:flush() + if err then return result, err end + + self.buffer = data + + self:flush() + local result, err = self:flush() + if err then return result, err end +end + +function clientMt:sendVarInt(value) + repeat + local encoded = value & 0x7F + value = value >> 7 + if value > 0 then + encoded = encoded | 128 + end + local _, err = self:sendByte(encoded) + if err then + return nil, err + end + until value <= 0 + return true, nil +end + +function clientMt:sendShort(value) + local _, err = self:sendByte((value >> 8) & 0xFF) + if err then return nil, err end + local _, err = self:sendByte(value & 0xFF) + if err then return nil, err end +end + +function clientMt:sendString(text) + local _, err = self:sendShort(#text) + if err then return nil, err end + local _, err = self:sendData(text) + if err then return nil, err end +end + +function clientMt:handleError(result, err) + if err then + print("Got error") + if self.connection then + self.connection:close() + end + self.connection = nil + else + assert(result, "Missing result") + end + return result, err +end + +function clientMt:sendPacket() + return self:handleError(self:flush()) +end + +function clientMt:connect() + if self.connection then + return true, nil + end + + local conn, err = socket.connect(self.uri, 1883) + if not conn then + return nil, "failed to connect: " .. err + end + conn:setoption("tcp-nodelay", true) + conn:setoption("linger", {on = true, timeout = 100}) + conn:settimeout(nil) + self.connection = conn + + local _, err = self:sendByte(0x10) + if err then return nil, self:handleError(err) end + + local length = 0 + + local protocolName = "MQTT" + local protocolNameLength = bytesUsedByString(protocolName) + length = length + protocolNameLength + + local protocolVersion = 4 + local connectFlag = 0x02 -- 1 byte + length = length + 2 + + local keepAlive = 0 -- 2 bytes + length = length + 2 + + local clientIdLength = bytesUsedByString(self.id) + length = length + clientIdLength + + _, err = self:sendVarInt(length) + if err then return nil, self:handleError(err) end + + _, err = self:sendString(protocolName) + if err then return nil, self:handleError(err) end + + _, err = self:sendByte(protocolVersion) + if err then return nil, self:handleError(err) end + + _, err = self:sendByte(connectFlag) + if err then return nil, self:handleError(err) end + + _, err = self:sendShort(keepAlive) + if err then return nil, self:handleError(err) end + + _, err = self:sendString(self.id) + if err then return nil, self:handleError(err) end + + return self:sendPacket() +end + +function clientMt:disconnect(args) + if not self.connection then + return true, nil + end + + local _, err = self:sendByte(0xE0) + if err then return nil, self:handleError(err) end + + local _, err = self:sendByte(0x00) + if err then return nil, self:handleError(err) end + + local result, err = self:sendPacket() + self.connection:shutdown("both") + local peer + repeat + peer = self.connection:getpeername() + if peer then socket.sleep(0.02) end + until peer + self.connection:close() + self.connection = nil + return result, err +end + +function clientMt:publish(args) + local topic = args.topic + local payload = args.payload + + local _, err = self:connect() + if err then return nil, self:handleError(err) end + + local retain = args.retain and 0x01 or 0x00 + local _, err = self:sendByte(0x30 | retain) + if err then return nil, self:handleError(err) end + + local topicLength = bytesUsedByString(topic) + local payloadLength = #payload + + _, err = self:sendVarInt(topicLength + payloadLength) + if err then return nil, self:handleError(err) end + + _, err = self:sendString(topic) + if err then return nil, self:handleError(err) end + + _, err = self:sendData(payload) + if err then return nil, self:handleError(err) end + + return self:sendPacket() +end + +function clientMt:subscribe(args) + local topic = args.topic + + local _, err = self:connect() + if err then return nil, self:handleError(err) end + + local _, err = self:sendByte(0x82) + if err then return nil, self:handleError(err) end + + local packetIdentifier = self.packetIdentifier + self.packetIdentifier = self.packetIdentifier + 1 + if self.packetIdentifier > 0xFF00 then + self.packetIdentifier = 1 + end + + local topicFilter = 0 + local topicLength = bytesUsedByString(topic) + + local length = 2 + topicLength + 1 + _, err = self:sendVarInt(length) + if err then return nil, self:handleError(err) end + + _, err = self:sendShort(packetIdentifier) + if err then return nil, self:handleError(err) end + + _, err = self:sendString(topic) + if err then return nil, self:handleError(err) end + + _, err = self:sendByte(topicFilter) + if err then return nil, self:handleError(err) end + + return self:sendPacket() +end + +function clientMt:receivePacket() + local firstByte = self:receiveByte() +end + +function clientMt:on(eventHandlers) + self.eventHandlers = eventHandlers +end + +function mqtt.client(args) + local client = { + uri = args.uri, + id = args.id, + reconnect = 5, + connection = nil, + packetIdentifier = 1, + buffer = "", + } + setmetatable(client, {__index = clientMt}) + return client +end + +return mqtt diff --git a/get-image.lua b/get-image.lua index 9cce0fc..2a7244e 100644 --- a/get-image.lua +++ b/get-image.lua @@ -1,5 +1,5 @@ package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua' -local mqtt = require("mqtt") +local mqtt = require("mymqtt") local client function printTable(table, indentation) @@ -35,8 +35,7 @@ end client = mqtt.client { uri = "mqtt.seeseepuff.be", - username = "tool-get-image", - id = true, + id = "tool-get-image", reconnect = 5, version = mqtt.v311, } @@ -49,4 +48,10 @@ client:on { end, } -mqtt.run_ioloop(client) +client:subscribe { + topic = 'spider/controller/#' +} + +client:disconnect() + +--mqtt.run_ioloop(client) diff --git a/test-image.lua b/test-image.lua index 81c262a..374cd5a 100644 --- a/test-image.lua +++ b/test-image.lua @@ -1,7 +1,7 @@ package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua' -local mqtt = require("mqtt") -local socket = require("socket") -local client +local mqtt = require("mymqtt") +--local socket = require("socket") +--local client local fh = io.open("spider-image.bin", "rb") local contents = fh:read("a") @@ -12,15 +12,6 @@ local function onConnect(connack) print("Connection to broker failed:", connack:reason_string()) os.exit(1) end - - while true do - assert(client:publish { - topic = "spider/telemetry/camfeed", - payload = contents, - qos = 0 - }) - socket.sleep(1) - end print("Connected and subscribed") end @@ -41,4 +32,18 @@ client:on { end, } -mqtt.run_ioloop(client) +--mqtt.run_ioloop(client) + + +--while true do +assert(client:publish { + topic = "spider/telemetry/camfeed", + payload = "a" .. contents, + qos = 0 + }) +--end + +print("Calling disconnect") +require("socket").sleep(0.002) +client:disconnect() +