Working on custom mqtt library
This commit is contained in:
parent
8a711fd661
commit
f639cbf4db
266
controller-host/mymqtt.lua
Normal file
266
controller-host/mymqtt.lua
Normal file
@ -0,0 +1,266 @@
|
||||
local mqtt = {}
|
||||
local socket = require("socket")
|
||||
|
||||
local clientMt = {}
|
||||
|
||||
local function bytesUsedByVarIntForValue(value)
|
||||
if value <= 128-1 then
|
||||
return 1, nil
|
||||
elseif value <= 128*128-1 then
|
||||
return 2, nil
|
||||
elseif value <= 128*128*128-1 then
|
||||
return 3, nil
|
||||
elseif value <= 128*128*128*128-1 then
|
||||
return 4, nil
|
||||
else
|
||||
return nil, "invalid byte length"
|
||||
end
|
||||
end
|
||||
|
||||
local function bytesUsedByString(string)
|
||||
return 2 + #string
|
||||
end
|
||||
|
||||
function clientMt:receiveByte()
|
||||
|
||||
end
|
||||
|
||||
function clientMt:flush()
|
||||
local start = 1
|
||||
while start <= #self.buffer do
|
||||
print("flushing data")
|
||||
local _, err, sent = self.connection:send(self.buffer, start)
|
||||
if err then
|
||||
print("Error: " .. err)
|
||||
return nil, err
|
||||
else
|
||||
self.buffer = ""
|
||||
return true, err
|
||||
end
|
||||
end
|
||||
return true, nil
|
||||
end
|
||||
|
||||
function clientMt:sendByte(byte)
|
||||
self.buffer = self.buffer .. string.char(byte)
|
||||
if #self.buffer > 1024 then
|
||||
return self:flush()
|
||||
end
|
||||
end
|
||||
|
||||
function clientMt:sendData(data)
|
||||
local result, err = self:flush()
|
||||
if err then return result, err end
|
||||
|
||||
self.buffer = data
|
||||
|
||||
self:flush()
|
||||
local result, err = self:flush()
|
||||
if err then return result, err end
|
||||
end
|
||||
|
||||
function clientMt:sendVarInt(value)
|
||||
repeat
|
||||
local encoded = value & 0x7F
|
||||
value = value >> 7
|
||||
if value > 0 then
|
||||
encoded = encoded | 128
|
||||
end
|
||||
local _, err = self:sendByte(encoded)
|
||||
if err then
|
||||
return nil, err
|
||||
end
|
||||
until value <= 0
|
||||
return true, nil
|
||||
end
|
||||
|
||||
function clientMt:sendShort(value)
|
||||
local _, err = self:sendByte((value >> 8) & 0xFF)
|
||||
if err then return nil, err end
|
||||
local _, err = self:sendByte(value & 0xFF)
|
||||
if err then return nil, err end
|
||||
end
|
||||
|
||||
function clientMt:sendString(text)
|
||||
local _, err = self:sendShort(#text)
|
||||
if err then return nil, err end
|
||||
local _, err = self:sendData(text)
|
||||
if err then return nil, err end
|
||||
end
|
||||
|
||||
function clientMt:handleError(result, err)
|
||||
if err then
|
||||
print("Got error")
|
||||
if self.connection then
|
||||
self.connection:close()
|
||||
end
|
||||
self.connection = nil
|
||||
else
|
||||
assert(result, "Missing result")
|
||||
end
|
||||
return result, err
|
||||
end
|
||||
|
||||
function clientMt:sendPacket()
|
||||
return self:handleError(self:flush())
|
||||
end
|
||||
|
||||
function clientMt:connect()
|
||||
if self.connection then
|
||||
return true, nil
|
||||
end
|
||||
|
||||
local conn, err = socket.connect(self.uri, 1883)
|
||||
if not conn then
|
||||
return nil, "failed to connect: " .. err
|
||||
end
|
||||
conn:setoption("tcp-nodelay", true)
|
||||
conn:setoption("linger", {on = true, timeout = 100})
|
||||
conn:settimeout(nil)
|
||||
self.connection = conn
|
||||
|
||||
local _, err = self:sendByte(0x10)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local length = 0
|
||||
|
||||
local protocolName = "MQTT"
|
||||
local protocolNameLength = bytesUsedByString(protocolName)
|
||||
length = length + protocolNameLength
|
||||
|
||||
local protocolVersion = 4
|
||||
local connectFlag = 0x02 -- 1 byte
|
||||
length = length + 2
|
||||
|
||||
local keepAlive = 0 -- 2 bytes
|
||||
length = length + 2
|
||||
|
||||
local clientIdLength = bytesUsedByString(self.id)
|
||||
length = length + clientIdLength
|
||||
|
||||
_, err = self:sendVarInt(length)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendString(protocolName)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendByte(protocolVersion)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendByte(connectFlag)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendShort(keepAlive)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendString(self.id)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
return self:sendPacket()
|
||||
end
|
||||
|
||||
function clientMt:disconnect(args)
|
||||
if not self.connection then
|
||||
return true, nil
|
||||
end
|
||||
|
||||
local _, err = self:sendByte(0xE0)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local _, err = self:sendByte(0x00)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local result, err = self:sendPacket()
|
||||
self.connection:shutdown("both")
|
||||
local peer
|
||||
repeat
|
||||
peer = self.connection:getpeername()
|
||||
if peer then socket.sleep(0.02) end
|
||||
until peer
|
||||
self.connection:close()
|
||||
self.connection = nil
|
||||
return result, err
|
||||
end
|
||||
|
||||
function clientMt:publish(args)
|
||||
local topic = args.topic
|
||||
local payload = args.payload
|
||||
|
||||
local _, err = self:connect()
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local retain = args.retain and 0x01 or 0x00
|
||||
local _, err = self:sendByte(0x30 | retain)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local topicLength = bytesUsedByString(topic)
|
||||
local payloadLength = #payload
|
||||
|
||||
_, err = self:sendVarInt(topicLength + payloadLength)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendString(topic)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendData(payload)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
return self:sendPacket()
|
||||
end
|
||||
|
||||
function clientMt:subscribe(args)
|
||||
local topic = args.topic
|
||||
|
||||
local _, err = self:connect()
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local _, err = self:sendByte(0x82)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
local packetIdentifier = self.packetIdentifier
|
||||
self.packetIdentifier = self.packetIdentifier + 1
|
||||
if self.packetIdentifier > 0xFF00 then
|
||||
self.packetIdentifier = 1
|
||||
end
|
||||
|
||||
local topicFilter = 0
|
||||
local topicLength = bytesUsedByString(topic)
|
||||
|
||||
local length = 2 + topicLength + 1
|
||||
_, err = self:sendVarInt(length)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendShort(packetIdentifier)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendString(topic)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
_, err = self:sendByte(topicFilter)
|
||||
if err then return nil, self:handleError(err) end
|
||||
|
||||
return self:sendPacket()
|
||||
end
|
||||
|
||||
function clientMt:receivePacket()
|
||||
local firstByte = self:receiveByte()
|
||||
end
|
||||
|
||||
function clientMt:on(eventHandlers)
|
||||
self.eventHandlers = eventHandlers
|
||||
end
|
||||
|
||||
function mqtt.client(args)
|
||||
local client = {
|
||||
uri = args.uri,
|
||||
id = args.id,
|
||||
reconnect = 5,
|
||||
connection = nil,
|
||||
packetIdentifier = 1,
|
||||
buffer = "",
|
||||
}
|
||||
setmetatable(client, {__index = clientMt})
|
||||
return client
|
||||
end
|
||||
|
||||
return mqtt
|
@ -1,5 +1,5 @@
|
||||
package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
|
||||
local mqtt = require("mqtt")
|
||||
local mqtt = require("mymqtt")
|
||||
local client
|
||||
|
||||
function printTable(table, indentation)
|
||||
@ -35,8 +35,7 @@ end
|
||||
|
||||
client = mqtt.client {
|
||||
uri = "mqtt.seeseepuff.be",
|
||||
username = "tool-get-image",
|
||||
id = true,
|
||||
id = "tool-get-image",
|
||||
reconnect = 5,
|
||||
version = mqtt.v311,
|
||||
}
|
||||
@ -49,4 +48,10 @@ client:on {
|
||||
end,
|
||||
}
|
||||
|
||||
mqtt.run_ioloop(client)
|
||||
client:subscribe {
|
||||
topic = 'spider/controller/#'
|
||||
}
|
||||
|
||||
client:disconnect()
|
||||
|
||||
--mqtt.run_ioloop(client)
|
||||
|
@ -1,7 +1,7 @@
|
||||
package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
|
||||
local mqtt = require("mqtt")
|
||||
local socket = require("socket")
|
||||
local client
|
||||
local mqtt = require("mymqtt")
|
||||
--local socket = require("socket")
|
||||
--local client
|
||||
|
||||
local fh = io.open("spider-image.bin", "rb")
|
||||
local contents = fh:read("a")
|
||||
@ -12,15 +12,6 @@ local function onConnect(connack)
|
||||
print("Connection to broker failed:", connack:reason_string())
|
||||
os.exit(1)
|
||||
end
|
||||
|
||||
while true do
|
||||
assert(client:publish {
|
||||
topic = "spider/telemetry/camfeed",
|
||||
payload = contents,
|
||||
qos = 0
|
||||
})
|
||||
socket.sleep(1)
|
||||
end
|
||||
|
||||
print("Connected and subscribed")
|
||||
end
|
||||
@ -41,4 +32,18 @@ client:on {
|
||||
end,
|
||||
}
|
||||
|
||||
mqtt.run_ioloop(client)
|
||||
--mqtt.run_ioloop(client)
|
||||
|
||||
|
||||
--while true do
|
||||
assert(client:publish {
|
||||
topic = "spider/telemetry/camfeed",
|
||||
payload = "a" .. contents,
|
||||
qos = 0
|
||||
})
|
||||
--end
|
||||
|
||||
print("Calling disconnect")
|
||||
require("socket").sleep(0.002)
|
||||
client:disconnect()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user