Got lua mqtt issue fixed it seems
This commit is contained in:
		@@ -1073,14 +1073,25 @@ function client_mt:_apply_network_timeout()
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
			-- replace connection recv_func with coroutine-based version
 | 
								-- replace connection recv_func with coroutine-based version
 | 
				
			||||||
			local sync_recv_func = conn.recv_func
 | 
								local sync_recv_func = conn.recv_func
 | 
				
			||||||
			conn.recv_func = function(...)
 | 
								conn.recv_func = function(totalSize, ...)
 | 
				
			||||||
 | 
									print("Args: ("..(1+#{...})..")", totalSize, ...)
 | 
				
			||||||
				while true do
 | 
									while true do
 | 
				
			||||||
					local data, err = sync_recv_func(...)
 | 
										local allData = ""
 | 
				
			||||||
 | 
										while true do
 | 
				
			||||||
 | 
											local size = math.min(totalSize, 16384)
 | 
				
			||||||
 | 
											local data, err = sync_recv_func(size)
 | 
				
			||||||
						if not data and (err == "timeout" or err == "wantread") then
 | 
											if not data and (err == "timeout" or err == "wantread") then
 | 
				
			||||||
							loop.timeouted = true
 | 
												loop.timeouted = true
 | 
				
			||||||
							coroutine_yield(err)
 | 
												coroutine_yield(err)
 | 
				
			||||||
 | 
											elseif data then
 | 
				
			||||||
 | 
												allData = allData .. data
 | 
				
			||||||
 | 
												totalSize = totalSize - size
 | 
				
			||||||
 | 
												if totalSize == 0 then
 | 
				
			||||||
 | 
													return allData, nil
 | 
				
			||||||
 | 
												end
 | 
				
			||||||
						else
 | 
											else
 | 
				
			||||||
						return data, err
 | 
												return nil, err
 | 
				
			||||||
 | 
											end
 | 
				
			||||||
					end
 | 
										end
 | 
				
			||||||
				end
 | 
									end
 | 
				
			||||||
			end
 | 
								end
 | 
				
			||||||
@@ -1167,6 +1178,7 @@ function client_mt:_receive_packet()
 | 
				
			|||||||
		return false, "network connection is not opened"
 | 
							return false, "network connection is not opened"
 | 
				
			||||||
	end
 | 
						end
 | 
				
			||||||
	-- parse packet
 | 
						-- parse packet
 | 
				
			||||||
 | 
						print("Calling ", conn.recv_func)
 | 
				
			||||||
	local packet, err = self._parse_packet(conn.recv_func)
 | 
						local packet, err = self._parse_packet(conn.recv_func)
 | 
				
			||||||
	if not packet then
 | 
						if not packet then
 | 
				
			||||||
		return false, err
 | 
							return false, err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -24,7 +24,14 @@ end
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
-- Send data to network connection
 | 
					-- Send data to network connection
 | 
				
			||||||
function luasocket.send(conn, data, i, j)
 | 
					function luasocket.send(conn, data, i, j)
 | 
				
			||||||
 | 
						conn.sock:settimeout(nil, "t")
 | 
				
			||||||
 | 
						print("Sending bytes ", #data, i, j)
 | 
				
			||||||
	local ok, err = conn.sock:send(data, i, j)
 | 
						local ok, err = conn.sock:send(data, i, j)
 | 
				
			||||||
 | 
						if err == "timeout" then
 | 
				
			||||||
 | 
							print("Oops, got timeout")
 | 
				
			||||||
 | 
							print(debug.traceback())
 | 
				
			||||||
 | 
						end
 | 
				
			||||||
 | 
						conn.sock:settimeout(conn.timeout, "t")
 | 
				
			||||||
	-- print("    luasocket.send:", ok, err, require("mqtt.tools").hex(data))
 | 
						-- print("    luasocket.send:", ok, err, require("mqtt.tools").hex(data))
 | 
				
			||||||
	return ok, err
 | 
						return ok, err
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
@@ -32,16 +39,25 @@ end
 | 
				
			|||||||
-- Receive given amount of data from network connection
 | 
					-- Receive given amount of data from network connection
 | 
				
			||||||
function luasocket.receive(conn, size)
 | 
					function luasocket.receive(conn, size)
 | 
				
			||||||
	local ok, err = conn.sock:receive(size)
 | 
						local ok, err = conn.sock:receive(size)
 | 
				
			||||||
	-- if ok then
 | 
						 if ok then
 | 
				
			||||||
	-- 	print("    luasocket.receive:", size, require("mqtt.tools").hex(ok))
 | 
							if #ok ~= size then
 | 
				
			||||||
	-- elseif err ~= "timeout" then
 | 
								assert(false, "bad size")
 | 
				
			||||||
	-- 	print("    luasocket.receive:", ok, err)
 | 
							end
 | 
				
			||||||
	-- end
 | 
							if #ok > 100 then
 | 
				
			||||||
 | 
						 		print("    luasocket.receive good:", size, #ok, "(long)")
 | 
				
			||||||
 | 
							else
 | 
				
			||||||
 | 
								--print(debug.traceback())
 | 
				
			||||||
 | 
								print("    luasocket.receive good:", size, #ok, require("mqtt.tools").hex(ok))
 | 
				
			||||||
 | 
							end
 | 
				
			||||||
 | 
						 elseif err ~= "timeout" then
 | 
				
			||||||
 | 
						 	print("    luasocket.receive fail:", ok, err)
 | 
				
			||||||
 | 
						 end
 | 
				
			||||||
	return ok, err
 | 
						return ok, err
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- Set connection's socket to non-blocking mode and set a timeout for it
 | 
					-- Set connection's socket to non-blocking mode and set a timeout for it
 | 
				
			||||||
function luasocket.settimeout(conn, timeout)
 | 
					function luasocket.settimeout(conn, timeout)
 | 
				
			||||||
 | 
						print("Setting timeout to " .. timeout)
 | 
				
			||||||
	conn.timeout = timeout
 | 
						conn.timeout = timeout
 | 
				
			||||||
	conn.sock:settimeout(timeout, "t")
 | 
						conn.sock:settimeout(timeout, "t")
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -490,12 +490,16 @@ function protocol.start_parse_packet(read_func)
 | 
				
			|||||||
	len, err = parse_var_length(read_func)
 | 
						len, err = parse_var_length(read_func)
 | 
				
			||||||
	if not len then
 | 
						if not len then
 | 
				
			||||||
		return false, "failed to parse remaining length: "..err
 | 
							return false, "failed to parse remaining length: "..err
 | 
				
			||||||
 | 
						else
 | 
				
			||||||
 | 
							print("Variable length is: " .. len)
 | 
				
			||||||
	end
 | 
						end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	-- create packet parser instance (aka input)
 | 
						-- create packet parser instance (aka input)
 | 
				
			||||||
	local input = {1, available = 0} -- input data offset and available size
 | 
						local input = {1, available = 0} -- input data offset and available size
 | 
				
			||||||
	if len > 0 then
 | 
						if len > 0 then
 | 
				
			||||||
 | 
							print("Reading payload body")
 | 
				
			||||||
		data, err = read_func(len)
 | 
							data, err = read_func(len)
 | 
				
			||||||
 | 
							print("Reading payload body done")
 | 
				
			||||||
	else
 | 
						else
 | 
				
			||||||
		data = ""
 | 
							data = ""
 | 
				
			||||||
	end
 | 
						end
 | 
				
			||||||
@@ -516,6 +520,7 @@ function protocol.start_parse_packet(read_func)
 | 
				
			|||||||
		return res
 | 
							return res
 | 
				
			||||||
	end
 | 
						end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						print("Available data is: " .. input.available)
 | 
				
			||||||
	return ptype, flags, input
 | 
						return ptype, flags, input
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										51
									
								
								get-image.lua
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										51
									
								
								get-image.lua
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,51 @@
 | 
				
			|||||||
 | 
					package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
 | 
				
			||||||
 | 
					local mqtt = require("mqtt")
 | 
				
			||||||
 | 
					local client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					function printTable(table, indentation)
 | 
				
			||||||
 | 
						indentation = indentation or ""
 | 
				
			||||||
 | 
						for name, value in pairs(table) do
 | 
				
			||||||
 | 
							print(indentation .. tostring(name) .. ": " .. tostring(value))
 | 
				
			||||||
 | 
						end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					local count = 0
 | 
				
			||||||
 | 
					local function onMessage(data)
 | 
				
			||||||
 | 
						--print("Got payload on topic " .. data.topic)
 | 
				
			||||||
 | 
						local fh = io.open("spider-image.bin", "wb")
 | 
				
			||||||
 | 
						fh:write(data.payload)
 | 
				
			||||||
 | 
						fh:close()
 | 
				
			||||||
 | 
						count = count + 1
 | 
				
			||||||
 | 
						print("Wrote image " .. count .. " of length " .. #data.payload)
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					local function onConnect(connack)
 | 
				
			||||||
 | 
						if connack.rc ~= 0 then
 | 
				
			||||||
 | 
							print("Connection to broker failed:", connack:reason_string())
 | 
				
			||||||
 | 
							os.exit(1)
 | 
				
			||||||
 | 
						end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						assert(client:subscribe{
 | 
				
			||||||
 | 
							topic = "spider/telemetry/#"
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						print("Connected and subscribed")
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					client = mqtt.client {
 | 
				
			||||||
 | 
						uri = "mqtt.seeseepuff.be",
 | 
				
			||||||
 | 
						username = "tools",
 | 
				
			||||||
 | 
						clean = true,
 | 
				
			||||||
 | 
						reconnect = 5,
 | 
				
			||||||
 | 
						version = mqtt.v311,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					client:on {
 | 
				
			||||||
 | 
						connect = onConnect,
 | 
				
			||||||
 | 
						message = onMessage,
 | 
				
			||||||
 | 
						error = function(err)
 | 
				
			||||||
 | 
							print("MQTT client error:", err)
 | 
				
			||||||
 | 
						end,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					mqtt.run_ioloop(client)
 | 
				
			||||||
@@ -37,7 +37,7 @@ def process_request(request):
 | 
				
			|||||||
				'/'.join([str(p.bytes_used) for p in metadata.planes]))
 | 
									'/'.join([str(p.bytes_used) for p in metadata.planes]))
 | 
				
			||||||
			with mfb.MappedFrameBuffer(buffer) as mappedBuffer:
 | 
								with mfb.MappedFrameBuffer(buffer) as mappedBuffer:
 | 
				
			||||||
				for plane in mappedBuffer.planes:
 | 
									for plane in mappedBuffer.planes:
 | 
				
			||||||
					mqttc.publish("spider/telemetry/camfeed", plane.tobytes())
 | 
										mqttc.publish("spider/telemetry/camfeed", plane[0:256000].tobytes())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	request.reuse()
 | 
						request.reuse()
 | 
				
			||||||
	camera.queue_request(request)
 | 
						camera.queue_request(request)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								spider-image.bin
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								spider-image.bin
									
									
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							
							
								
								
									
										45
									
								
								test-image.lua
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								test-image.lua
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,45 @@
 | 
				
			|||||||
 | 
					package.path = package.path .. ';./controller-host/?/init.lua;./controller-host/?.lua'
 | 
				
			||||||
 | 
					local mqtt = require("mqtt")
 | 
				
			||||||
 | 
					local client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					local function onConnect(connack)
 | 
				
			||||||
 | 
						if connack.rc ~= 0 then
 | 
				
			||||||
 | 
							print("Connection to broker failed:", connack:reason_string())
 | 
				
			||||||
 | 
							os.exit(1)
 | 
				
			||||||
 | 
						end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						--local length = 1000000
 | 
				
			||||||
 | 
						-- 500000
 | 
				
			||||||
 | 
						local length = 1000000
 | 
				
			||||||
 | 
						-- 100000
 | 
				
			||||||
 | 
						--while true do
 | 
				
			||||||
 | 
						local payload = string.rep("b", length)
 | 
				
			||||||
 | 
						--length = length + 100000
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						assert(client:publish {
 | 
				
			||||||
 | 
							topic = "spider/telemetry/camfeed",
 | 
				
			||||||
 | 
							payload = payload,
 | 
				
			||||||
 | 
							qos = 0
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						--end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						print("Connected and subscribed")
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					client = mqtt.client {
 | 
				
			||||||
 | 
						uri = "mqtt.seeseepuff.be",
 | 
				
			||||||
 | 
						username = "tools",
 | 
				
			||||||
 | 
						clean = true,
 | 
				
			||||||
 | 
						reconnect = 5,
 | 
				
			||||||
 | 
						version = mqtt.v311,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					client:on {
 | 
				
			||||||
 | 
						connect = onConnect,
 | 
				
			||||||
 | 
						message = onMessage,
 | 
				
			||||||
 | 
						error = function(err)
 | 
				
			||||||
 | 
							print("MQTT client error:", err)
 | 
				
			||||||
 | 
						end,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					mqtt.run_ioloop(client)
 | 
				
			||||||
		Reference in New Issue
	
	Block a user