From 0395f166e5e8b87f8e346a20ac72760ac7529c2d Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Thu, 28 Nov 2013 09:21:21 +0100
Subject: [PATCH 1/8] Merge branch 'resumable-connection' / not all tests pass

Conflicts:
	src/jet/daemon.lua
	src/jet/peer.lua
	src/jet/socket.lua
---
 examples/persistant_ticker.lua   |  24 +++
 rockspecs/lua-jet-scm-1.rockspec |   3 +-
 spec/daemon_spec.lua             | 233 +++++++++++++++++++++++++++-
 spec/peer_spec.lua               | 113 ++++++++++++--
 src/jet.lua                      |   3 +-
 src/jet/daemon.lua               | 214 +++++++++++++++++++-------
 src/jet/daemon/fetcher.lua       |   5 +-
 src/jet/peer.lua                 | 253 ++++++++++++++++++++++++++++---
 src/jet/socket.lua               |   1 +
 9 files changed, 753 insertions(+), 96 deletions(-)
 create mode 100755 examples/persistant_ticker.lua

diff --git a/examples/persistant_ticker.lua b/examples/persistant_ticker.lua
new file mode 100755
index 0000000..3ad2f33
--- /dev/null
+++ b/examples/persistant_ticker.lua
@@ -0,0 +1,24 @@
+#!/usr/bin/env lua
+-- example program for manually testing persistent peers
+local jet = require'jet'
+local ev = require'ev'
+
+assert(arg[1],'ip expected')
+
+local peer = jet.peer.new({
+    ip = arg[1],
+    persist = 10,
+})
+
+local tick_tack = peer:state({
+    path = 'tick_tack',
+    value = 1
+})
+
+ev.Timer.new(function()
+    local new = tick_tack:value() + 1
+    print(new)
+    tick_tack:value(new)
+  end,1,1):start(ev.Loop.default)
+
+peer:loop()
diff --git a/rockspecs/lua-jet-scm-1.rockspec b/rockspecs/lua-jet-scm-1.rockspec
index a0c386d..c825ed8 100644
--- a/rockspecs/lua-jet-scm-1.rockspec
+++ b/rockspecs/lua-jet-scm-1.rockspec
@@ -17,7 +17,8 @@ dependencies = {
   'lua-websockets',
   'luasocket',
   'lua-ev',
-  'lpack'
+  'lpack',
+  'lua-step'
 }
 
 build = {
diff --git a/spec/daemon_spec.lua b/spec/daemon_spec.lua
index 5859ed0..e7cc5b9 100644
--- a/spec/daemon_spec.lua
+++ b/spec/daemon_spec.lua
@@ -83,12 +83,16 @@ for _,info in ipairs(addresses_to_test) do
 
       local sock
 
+      local new_sock = function()
+        if info.family == 'inet6' then
+          return socket.tcp6()
+        else
+          return socket.tcp()
+        end
+      end
+
       before_each(function()
-          if info.family == 'inet6' then
-            sock = socket.tcp6()
-          else
-            sock = socket.tcp()
-          end
+          sock = new_sock()
      end)
 
      after_each(function()
@@ -204,6 +208,225 @@ for _,info in ipairs(addresses_to_test) do
          message_socket:send('this is no json')
      end)
 
+      it('a peer can configure persist and resume',function(done)
+          sock:connect(info.addr,port)
+          local message_socket = jetsocket.wrap(sock)
+          message_socket:on_message(
+            async(
+              function(_,response)
+                response = cjson.decode(response)
+                assert.is_same(response.id,1)
+                assert.is_string(response.result)
+                local resume_id = response.result
+                sock:close()
+                sock = new_sock()
+                sock:connect(info.addr,port)
+                local message_socket = jetsocket.wrap(sock)
+                message_socket:on_message(
+                  async(
+                    function(_,response)
+                      response = cjson.decode(response)
+                      assert.is_same(response.id,2)
+                      -- the server has received two messages associated with this persist session
+                      -- before processing config.resume
+                      -- (config.persist)
+                      assert.is_same(response.result,1)
+                      done()
+                  end))
+                message_socket:send(cjson.encode({
+                      method = 'config',
+                      params = {
+                        resume = {
+                          id = resume_id,
+                          receivedCount = 1
+                        }
+                      },
+                      id = 2,
+                }))
+
+            end))
+          message_socket:send(cjson.encode({
+                method = 'config',
+                params = {
+                  persist = 0.001
+                },
+                id = 1,
+          }))
+      end)
+
+      it('a peer can configure persist and resume (twice)',function(done)
+          sock:connect(info.addr,port)
+          local message_socket = jetsocket.wrap(sock)
+          message_socket:on_message(
+            async(
+              function(_,response)
+                response = cjson.decode(response)
+                assert.is_same(response.id,1)
+                assert.is_string(response.result)
+                local resume_id = response.result
+                sock:close()
+                sock = new_sock()
+                sock:connect(info.addr,port)
+                local message_socket = jetsocket.wrap(sock)
+                message_socket:on_message(
+                  async(
+                    function(_,response)
+                      response = cjson.decode(response)
+                      assert.is_same(response.id,2)
+                      -- the server has received two messages associated with this persist session
+                      -- before processing config.resume
+                      -- (config.persist)
+                      assert.is_same(response.result,1)
+                      sock:close()
+                      sock = new_sock()
+                      sock:connect(info.addr,port)
+                      local message_socket = jetsocket.wrap(sock)
+                      message_socket:on_message(
+                        async(
+                          function(_,response)
+                            response = cjson.decode(response)
+                            assert.is_same(response.id,3)
+                            -- the server has received two messages associated with this persist session
+                            -- before processing config.resume
+                            -- (config.persist)
+                            assert.is_same(response.result,1)
+                            done()
+                        end))
+                      message_socket:send(cjson.encode({
+                            method = 'config',
+                            params = {
+                              resume = {
+                                id = resume_id,
+                                receivedCount = 2
+                              }
+                            },
+                            id = 3,
+                      }))
+
+                  end))
+                message_socket:send(cjson.encode({
+                      method = 'config',
+                      params = {
+                        resume = {
+                          id = resume_id,
+                          receivedCount = 1
+                        }
+                      },
+                      id = 2,
+                }))
+
+            end))
+          message_socket:send(cjson.encode({
+                method = 'config',
+                params = {
+                  persist = 0.001
+                },
+                id = 1,
+          }))
+      end)
+
+
+      it('a peer can configure persist and resume and missed messages are resent by daemon',function(done)
+          sock:connect(info.addr,port)
+          local message_socket = jetsocket.wrap(sock)
+          message_socket:on_message(
+            async(
+              function(_,response)
+                response = cjson.decode(response)
+                if #response > 0 then
+                  response = response[1]
+                end
+                assert.is_string(response.result)
+                local resume_id = response.result
+                sock:close()
+                sock = new_sock()
+                sock:connect(info.addr,port)
+                local message_socket = jetsocket.wrap(sock)
+                message_socket:on_message(
+                  async(
+                    function(_,response)
+                      response = cjson.decode(response)
+                      assert.is_same(response,{
+                          {
+                            result = 4, -- five messages received by daemon so far (assoc. to the persist id)
+                            id = 5, -- config.resume id
+                          },
+                          {
+                            result = true, -- fetch set up successfully
+                            id = 3,
+                          },
+                          {
+                            method = 'fetchy', -- resumed fetched data
+                            params = {
+                              path = 'dummy',
+                              value = 123,
+                              event = 'add',
+                            }
+                          },
+                          {
+                            method = 'fetchy', -- resumed fetched data
+                            params = {
+                              path = 'dummy',
+                              value = 234,
+                              event = 'change',
+                            }
+                          },
+                          {
+                            result = true, -- change notification was successful
+                            id = 4
+                          }
+                      })
+                      done()
+                  end))
+                message_socket:send(cjson.encode({
+                      method = 'config',
+                      params = {
+                        resume = {
+                          id = resume_id,
+                          -- pretend only config and add responses have been received
+                          receivedCount = 2
+                        }
+                      },
+                      id = 5,
+                }))
+
+            end))
+          message_socket:send(cjson.encode({
+                {
+                  method = 'config',
+                  params = {
+                    persist = 0.001
+                  },
+                  id = 1,
+                },
+                {
+                  method = 'add',
+                  params = {
+                    path = 'dummy',
+                    value = 123
+                  },
+                  id = 2,
+                },
+                {
+                  method = 'fetch',
+                  params = {
+                    id = 'fetchy',
+                    matches = {'.*'}
+                  },
+                  id = 3,
+                },
+                {
+                  method = 'change',
+                  params = {
+                    path = 'dummy',
+                    value = 234
+                  },
+                  id = 4,
+                },
+          }))
+      end)
+
+
      local req_resp_test = function(desc)
        local requests = desc.requests
        local responses = desc.responses
diff --git a/spec/peer_spec.lua b/spec/peer_spec.lua
index 014d189..c7e96b2 100644
--- a/spec/peer_spec.lua
+++ b/spec/peer_spec.lua
@@ -9,15 +9,16 @@ local dt = 0.05
 
 setloop('ev')
 
+
+
 describe(
-  'A peer basic tests',
+  'When a daemon is running',
   function()
     local daemon
     local peer
     setup(function()
         daemon = jetdaemon.new{
           port = port,
-          print = function() end
         }
         daemon:start()
     end)
@@ -26,7 +27,7 @@ describe(
         daemon:stop()
     end)
 
-    it('provides the correct interface',function()
+    it('provides the correct interface',function(done)
        local peer = jetpeer.new{port = port}
        assert.is_true(type(peer) == 'table')
        assert.is_true(type(peer.state) == 'function')
@@ -36,6 +37,10 @@ describe(
        assert.is_true(type(peer.fetch) == 'function')
        assert.is_true(type(peer.batch) == 'function')
        assert.is_true(type(peer.loop) == 'function')
+        peer:on_error(async(function(err)
+              assert.is_truthy(err:match('closed'))
+              done()
+        end))
        peer:close()
    end)
 
@@ -48,9 +53,95 @@ describe(
            done()
        end)
      }
-      -- finally(function() peer:close() end)
    end)
 
+    describe(
+      'when using persist option',
+      function()
+        it('on_connect gets called when using persist option and close callback comes after > 2secs',function(done)
+            settimeout(3) -- closing takes > 2 secs
+            local ppeer
+            ppeer = jetpeer.new
+            {
+              port = port,
+              persist = 0.3,
+              on_connect = async(function(p)
+                  assert.is_equal(ppeer,p)
+                  ppeer:close(async(function()
+                        done()
+                  end))
+              end)
+            }
+        end)
+
+        it('daemon can start after peer',function(done)
+            settimeout(30) -- closing takes > 2 secs
+            local d2 = jetdaemon.new{
+              port = port + 10
+            }
+            local ppeer
+            ppeer = jetpeer.new
+            {
+              port = port + 10,
+              persist = 0.3,
+              on_connect = async(function(p)
+                  assert.is_equal(ppeer,p)
+                  ppeer:close(async(function()
+                        done()
+                  end))
+              end)
+            }
+            ev.Timer.new(function()
+                d2:start()
+            end,0.2):start(loop)
+            finally(function()
+                d2:stop()
+            end)
+        end)
+
+
+        it('states can be added and removed after resume when only the underlying socket was closed',function(done)
+            settimeout(4) -- closing takes > 2 secs
+            local ppeer
+            ppeer = jetpeer.new
+            {
+              port = port,
+              persist = 1,
+              on_connect = async(function(p)
+                  assert.is_equal(ppeer,p)
+                  ppeer:close(nil,true) -- close just the underlying socket, but don't quit the resume loop
+                  ev.Timer.new(async(function()
+                        local some_state
+                        some_state = ppeer:state({
+                            path = 'popopo',
+                            value = 873
+                          },{
+                            error = async(function(err)
+                                assert.is_nil(err or 'should not happen')
+                            end),
+                            success = async(function()
+                                some_state:remove({
+                                    error = async(function(err)
+                                        assert.is_nil(err or 'should not happen')
+                                    end),
+                                    success = async(function()
+                                        done()
+                                        ppeer:close()
+
+                                    end)
+                                })
+                            end)
+                        })
+
+                  end),1):start(loop)
+              end)
+            }
+        end)
+
+
+      end)
+
+
    it('can add a state',function(done)
       peer:state(
         {
@@ -149,7 +240,7 @@ describe(
    end)
 
    after_each(function(done)
-        peer:close()
+        peer:close(async(done))
    end)
 
    it(
@@ -247,7 +338,9 @@ describe(
          {
            event = 'remove',
            action = function()
-              states.test:add()
+              print(pcall(function()
+                    states.test:add()
+              end))
            end
          },
          {
@@ -261,6 +354,7 @@ describe(
        local fetcher = peer:fetch(
          states.test:path(),
          async(function(fpath,fevent,fvalue)
+              print(fpath,fevent)
              count = count + 1
              assert.is_equal(expected[count].event,fevent)
              assert.is_equal(fpath,states.test:path())
@@ -287,7 +381,7 @@ describe(
          async(function(fpath,fevent,fdata,fetcher)
              timer:stop(loop)
              fetcher:unfetch()
-              assert.is_falsy('should not happen'..fpath)
+              assert.is_falsy('should not happen '..fpath)
              done()
          end))
        timer = ev.Timer.new(async(function()
@@ -507,8 +601,8 @@ describe(
        }
    end)
 
-    after_each(function()
-      peer:close()
+    after_each(function(done)
+      peer:close(async(function() done() end))
    end)
 
    it('set gets timeout error',function(done)
@@ -1129,7 +1223,6 @@ if ipv6_localhost_addr then
        daemon = jetdaemon.new{
          port = port,
          interface = ipv6_localhost_addr,
-          print = function() end
        }
        daemon:start()
    end)
diff --git a/src/jet.lua b/src/jet.lua
index 930bb21..4121c89 100644
--- a/src/jet.lua
+++ b/src/jet.lua
@@ -4,7 +4,8 @@ local daemon = require'jet.daemon'
 local jet = {
   peer = peer,
   daemon = daemon,
-  new = peer.new
+  new = peer.new,
+  _VERSION = '0.10'
 }
 
 return jet
diff --git a/src/jet/daemon.lua b/src/jet/daemon.lua
index 0d60eca..16bbc16 100644
--- a/src/jet/daemon.lua
+++ b/src/jet/daemon.lua
@@ -28,7 +28,6 @@ local response_timeout = jutils.response_timeout
 local internal_error = jutils.internal_error
 local parse_error = jutils.parse_error
 local method_not_found = jutils.method_not_found
-
 local is_empty_table = jutils.is_empty_table
 
 --- creates and returns a new daemon instance.
@@ -57,6 +56,7 @@ local create_daemon = function(options)
   -- with original id and receiver (peer) and request
   -- timeout timer.
   local routes = {}
+  local resumables = {}
 
   -- global for tracking the necessity of lower casing
   -- paths on publish
@@ -87,7 +87,7 @@ local create_daemon = function(options)
   local publish = function(path,event,value,element)
     local lpath = has_case_insensitives and path:lower()
     for fetcher in pairs(element.fetchers) do
-      local ok,err = pcall(fetcher,path,lpath,event,value)
+      local ok,err = pcall(fetcher.op,path,lpath,event,value)
       if not ok then
         crit('publish failed',err,path,event)
       end
@@ -164,11 +164,9 @@ local create_daemon = function(options)
   local fetch = function(peer,message)
     local params = message.params
     local fetch_id = checked(params,'id','string')
-    local queue_notification = function(nparams)
-      assert(false,'fetcher misbehaves: must not be called yet')
-    end
+    local params_ok,fetcher
     local notify = function(nparams)
-      queue_notification(nparams)
+      fetcher.queue(nparams)
     end
     local sorter_ok,sorter,flush = pcall(jsorter.new,params,notify)
     local initializing = true
@@ -179,14 +177,14 @@ local create_daemon = function(options)
         sorter(nparams,initializing)
       end
     end
-    local params_ok,fetcher,is_case_insensitive = pcall(jfetcher.new,params,notify)
+    params_ok,fetcher = pcall(jfetcher.new,params,notify)
     if not params_ok then
       error(invalid_params({fetchParams = params, reason = fetcher}))
     end
 
     peer.fetchers[fetch_id] = fetcher
 
-    if is_case_insensitive then
+    if fetcher.is_case_insensitive then
       case_insensitives[fetcher] = true
       has_case_insensitives = true
     end
@@ -200,16 +198,16 @@ local create_daemon = function(options)
       end
     end
 
-    local cq = peer.queue
-    queue_notification = function(nparams)
-      cq(peer,{
+    fetcher.queue = function(nparams)
+      peer:queue({
           method = fetch_id,
          params = nparams,
      })
    end
 
+    local fetchop = fetcher.op
    for path,element in pairs(elements) do
-      local may_have_interest = fetcher(path,has_case_insensitives and path:lower(),'add',element.value)
+      local may_have_interest = fetchop(path,has_case_insensitives and path:lower(),'add',element.value)
      if may_have_interest then
        element.fetchers[fetcher] = true
      end
@@ -360,21 +358,88 @@ local create_daemon = function(options)
   local config = function(peer,message)
     local params = message.params
 
-    if params.peer then
-      peer = nil
-      for peer_ in pairs(peers) do
-        if peer_.name == params.peer then
-          peer = peer_
-          break
+
+    if params.debug ~= nil then
+      if params.peer then
+        peer = nil
+        for peer_ in pairs(peers) do
+          if peer_.name == params.peer then
+            peer = peer_
+            break
+          end
+        end
+        if not peer then
+          error('unknown peer')
         end
       end
-      if not peer then
-        error('unknown peer')
-      end
+      peer.debug = params.debug
+      return
    end
 
    if params.name then
      peer.name = params.name
+      return
+    end
+
+    -- enables message history and makes this peer
+    -- resumable in case of close/error event
+    -- returns the unique persist id, which
+    -- must be used to resume the peer.
+    if params.persist ~= nil then
+      peer.message_history = {}
+      local persist_id = tostring(peer)
+      peer.persist_id = persist_id
+      peer.persist_time = tonumber(params.persist) or 120
+      resumables[persist_id] = peer
+      return persist_id
    end
+
+    -- if valid resume parameters are passed in,
+    -- returns the last received message number (not id)
+    -- and resends all missed messages from history.
+    -- the peer must have been configured as persistent before.
+    if params.resume then
+      local persist_id = checked(params.resume,'id','string')
+      local received_count = checked(params.resume,'receivedCount','number')
+      local resumer = resumables[persist_id]
+      if not resumer then
+        error(invalid_params({invalidPersistId=persist_id}))
+      end
+      resumer.mediated = true
+      -- check if the daemon has already noticed that the resumer died
+      if resumer.release_timer then
+        resumer.release_timer:stop(loop)
+        resumer.release_timer:clear_pending(loop)
+        resumer.release_timer = nil
+      else
+        resumer:close()
+      end
+      local missed_messages_count = resumer.message_count - received_count
+      local history = resumer.message_history
+      local start = #history-missed_messages_count + 1
+      if start < 0 then
+        error(internal_error({historyNotAvailable=missed_messages_count}))
+      end
+      resumer:transfer_fetchers(peer)
+      resumer:transfer_elements(peer)
+      peer.receive_count = resumer.receive_count
+      if message.id then
+        peer:queue({
+            id = message.id,
+            result = peer.receive_count,
+        })
+      end
+      for i=start,#history do
+        peer:queue(history[i])
+      end
+      peer.message_history = {}
+      resumables[persist_id] = peer
+      peer.persist_id = persist_id
+      peer.persist_time = resumer.persist_time
+      peer.flush()
+      return nil,true -- set dont_auto_reply true
+    end
+
    if params.encoding then
      if params.encoding == 'msgpack' then
        local ok,cmsgpack = pcall(require,'cmsgpack')
@@ -394,9 +459,11 @@ local create_daemon = function(options)
        peer.encode = cmsgpack.pack
        peer.decode = cmsgpack.unpack
        return nil,true -- set dont_auto_reply true
+      else
+        error(invalid_params({encodingNotSupported=params.encoding}))
      end
    end
-    peer.debug = params.debug
  end
 
  local sync = function(f)
@@ -533,10 +600,12 @@ local create_daemon = function(options)
            error = invalid_request(message)
        })
      elseif #message > 0 then
+        peer.receive_count = peer.receive_count + #message
        for i,message in ipairs(message) do
          dispatch_single_message(peer,message)
        end
      else
+        peer.receive_count = peer.receive_count + 1
        dispatch_single_message(peer,message)
      end
    else
@@ -552,56 +621,95 @@ local create_daemon = function(options)
   local create_peer = function(ops)
     local peer = {}
+    peer.receive_count = 0
+    local release = function()
+      for _,fetcher in pairs(peer.fetchers) do
+        case_insensitives[fetcher] = nil
+        for _,element in pairs(elements) do
+          element.fetchers[fetcher] = nil
+        end
+      end
+      has_case_insensitives = not is_empty_table(case_insensitives)
+      peer.fetchers = {}
+      peers[peer] = nil
+      for path,element in pairs(elements) do
+        if element.peer == peer then
+          publish(path,'remove',element.value,element)
+          elements[path] = nil
+        end
+      end
+      flush_peers()
+      ops.close()
+      peer = nil
+    end
+    peer.transfer_fetchers = function(_,new_peer)
+      for fetch_id,fetcher in pairs(peer.fetchers) do
+        fetcher.queue = function(nparams)
+          new_peer:queue({
+              method = fetch_id,
+              params = nparams
+          })
+        end
+      end
+    end
+    peer.transfer_elements = function(_,new_peer)
+      for _,element in pairs(elements) do
+        element.peer = new_peer
+      end
+    end
     peer.release = function(_)
       if peer then
-        for _,fetcher in pairs(peer.fetchers) do
-          case_insensitives[fetcher] = nil
-          for _,element in pairs(elements) do
-            element.fetchers[fetcher] = nil
-          end
-        end
-        has_case_insensitives = not is_empty_table(case_insensitives)
-        peer.fetchers = {}
-        peers[peer] = nil
-        for path,element in pairs(elements) do
-          if element.peer == peer then
-            publish(path,'remove',element.value,element)
-            elements[path] = nil
-          end
+        if peer.message_history then
+          resumables[peer.persist_id] = peer
+          peer.release_timer = ev.Timer.new(function()
+              peer.release_timer = nil
+              if not peer.mediated then
+                resumables[peer.persist_id] = nil
+                release()
+              end
+          end,peer.persist_time or 1)
+          peer.release_timer:start(loop)
+        else
+          release()
         end
-        flush_peers()
-        ops.close()
-        peer = nil
       end
     end
     peer.close = function(_)
       peer:flush()
       ops.close()
     end
+    peer.messages = {}
     peer.queue = function(_,message)
-      if not peer.messages then
-        peer.messages = {}
-      end
       tinsert(peer.messages,message)
     end
     local send = ops.send
+    peer.message_count = 0
     peer.flush = function(_)
-      if peer.messages then
-        local num = #peer.messages
-        local message
+      local messages = peer.messages
+      local num = #messages
+      peer.message_count = peer.message_count + num
+      local history = peer.message_history
+      if history then
+        for _,message in ipairs(messages) do
+          tinsert(history,message)
+        end
+        local history_num = #history
+        -- limit history num to 100
+        for i=1,(history_num-100) do
+          tremove(history,1)
+        end
+        assert(#history <= 100)
+      end
+      if num > 0 and not peer.release_timer then
         if num == 1 then
-          message = peer.messages[1]
-        elseif num > 1 then
-          message = peer.messages
-        else
-          assert(false,'messages must contain at least one element if not nil')
+          messages = messages[1]
         end
         if peer.debug then
-          debug(peer.name or 'unnamed peer','<-',jencode(message))
+          debug(peer.name or 'unnamed peer','<-',jencode(messages))
         end
-        send(peer.encode(message))
-        peer.messages = nil
+        send(peer.encode(messages))
       end
+      peer.messages = {}
     end
     peer.fetchers = {}
     peer.encode = cjson.encode
diff --git a/src/jet/daemon/fetcher.lua b/src/jet/daemon/fetcher.lua
index 8a62ce7..0e41837 100644
--- a/src/jet/daemon/fetcher.lua
+++ b/src/jet/daemon/fetcher.lua
@@ -105,7 +105,10 @@ local create_fetcher = function(options,notify)
     options.caseInsensitive = false
   end
 
-  return fetchop,options.caseInsensitive
+  return {
+    op = fetchop,
+    is_case_insensitive = options.caseInsensitive,
+  }
 end
 
 return {
diff --git a/src/jet/peer.lua b/src/jet/peer.lua
index 4103328..85c40a7 100644
--- a/src/jet/peer.lua
+++ b/src/jet/peer.lua
@@ -3,6 +3,7 @@ local socket = require'socket'
 local ev = require'ev'
 local cjson = require'cjson'
 local jutils = require'jet.utils'
+local step = require'step'
 
 local tinsert = table.insert
 local tremove = table.remove
@@ -23,6 +24,24 @@ local error_object = function(err)
   return error
 end
 
+local eps = 2^-40
+
+local detach = function(f,loop)
+  if ev.Idle then
+    ev.Idle.new(function(loop,io)
+        io:stop(loop)
+        f()
+    end):start(loop)
+  else
+    ev.Timer.new(function(loop,io)
+        io:stop(loop)
+        f()
+    end,eps):start(loop)
+  end
+end
+
+local noop = function() end
+
 new = function(config)
   config = config or {}
   local log = config.log or noop
@@ -30,6 +49,7 @@ new = function(config)
   local port = config.port or 11122
   local encode = cjson.encode
   local decode = cjson.decode
+  local log = config.log or noop
   if config.sync then
     local sock = socket.connect(ip,port)
     if not sock then
@@ -87,22 +107,61 @@ new = function(config)
    local queue = function(message)
      tinsert(messages,message)
    end
+    local message_count = 0
+    local message_history = {}
+    local pending
    local will_flush = true
-    local flush = function(reason)
-      local n = #messages
-      if n == 1 then
-        wsock:send(encode(messages[1]))
-      elseif n > 1 then
-        wsock:send(encode(messages))
+    local flush
+    local is_persistant
+
+    if not config.persist then
+      flush = function(reason)
+        local n = #messages
+        if n == 1 then
+          wsock:send(encode(messages[1]))
+        elseif n > 1 then
+          wsock:send(encode(messages))
+        end
+        messages = {}
+        will_flush = false
       end
-      messages = {}
-      will_flush = false
-    end
+    else
+
+      flush = function(reason)
+        local num = #messages
+        if not is_persistant then
+          message_count = message_count + num
+        end
+        local history = message_history
+        if history then
+          for _,message in ipairs(messages) do
+            tinsert(history,message)
+          end
+          local history_num = #history
+          -- limit history num to 100
+          for i=1,(history_num-100) do
+            tremove(history,1)
+          end
+          assert(#history <= 100)
+        end
+        if not pending then
+          if num == 1 then
+            wsock:send(encode(messages[1]))
+          elseif num > 1 then
+            wsock:send(encode(messages))
+          end
+        end
+        messages = {}
+        will_flush = false
+      end
+    end
 
    local request_dispatchers = {}
    local response_dispatchers = {}
    local dispatch_response = function(self,message)
      local mid = message.id
      local callbacks = response_dispatchers[mid]
+      assert(mid,cjson.encode(message))
      response_dispatchers[mid] = nil
      if callbacks then
        if message.result then
@@ -148,6 +207,7 @@ new = function(config)
        }
      end
    end
+    local received_count = 0
    local dispatch_single_message = function(self,message)
      if message.method and message.params then
        dispatch_request(self,message)
@@ -165,11 +225,16 @@ new = function(config)
      end
      will_flush = true
      if message then
-      if #message > 0 then
+        local num = #message
+        if num > 0 then
+          -- The received count MUST be incremented here for arrays!
+          -- This is relevant for resuming...
+          received_count = received_count + num
          for i,message in ipairs(message) do
            dispatch_single_message(self,message)
          end
        else
+          received_count = received_count + 1
          dispatch_single_message(self,message)
        end
      else
@@ -184,9 +249,58 @@ new = function(config)
      flush('dispatch_message')
    end
    wsock:on_message(dispatch_message)
-    wsock:on_error(log)
-    wsock:on_close(config.on_close or function() end)
+    wsock:on_error(config.on_error or noop)
+    local persist_id
+    local closing
+    local connect_sequence
+    local on_close
+    local try = {}
    local j = {}
+    on_close = function()
+      if not closing and config.persist and not pending then
+        messages = {}
+        pending = true
+        encode = cjson.encode
+        decode = cjson.decode
+        wsock = jsocket.new({ip = ip, port = port, loop = loop})
+        wsock:on_message(dispatch_message)
+        wsock:on_error(config.on_error or noop)
+        wsock:on_close(on_close)
+        wsock:on_connect(function()
+            is_persistant = false
+            connect_sequence = step.new({
+                try = try,
+                catch = function(err)
+                  j:close()
+                end,
+                finally = function()
+                  if config.on_connect then
+                    config.on_connect(j)
+                    config.on_connect = nil
+                  end
+                  flush('on_connect')
+                end
+            })
+
+            connect_sequence()
+            flush('resume')
+        end)
+
+        ev.Timer.new(function(loop,io)
+            if pending and not closing then
+              wsock:connect()
+            else
+              io:stop(loop)
+            end
+        end,0.5,0.5):start(loop)
+      end
+
+      if config.on_close then
+        config.on_close()
+      end
+    end
+
+    wsock:on_close(on_close)
 
    j.loop = function()
      loop:loop()
@@ -196,9 +310,30 @@ new = function(config)
      on_no_dispatcher = f
    end
 
-    j.close = function(self,options)
+    j.on_error = function(_,f)
+      wsock:on_error(f)
+    end
+
+    j.close = function(self,done,debug_resume)
      flush('close')
      wsock:close()
+      if debug_resume then
+        return
+      end
+      closing = true
+      if done then
+        if config.persist then
+          -- the daemon keeps states for config.persist seconds.
+          -- during this time, the states / paths are still blocked
+          -- by this peer. wait some seconds more and assume
+          -- all peer related resources are freed by the daemon.
+          ev.Timer.new(function()
+              done()
+          end,config.persist + 2):start(loop)
+        else
+          detach(done,loop)
+        end
+      end
    end
 
    local id = 0
@@ -582,30 +717,98 @@ new = function(config)
      cmsgpack = require'cmsgpack'
    end
 
-    wsock:on_connect(function()
-        if config.name or config.encoding then
-          j:config({
-              name = config.name,
-              encoding = config.encoding
-            },{
+    if config.name then
+      table.insert(try,function(step)
+          j:config({name=config.name},step)
+      end)
+    end
+
+    if config.encoding then
+      table.insert(try,function(step)
+          j:config({encoding=config.encoding},{
              success = function()
                flush('config')
                if config.encoding then
                  encode = cmsgpack.pack
                  decode = cmsgpack.unpack
                end
-                if config.on_connect then
-                  config.on_connect(j)
-                end
+                step.success()
              end,
              error = function(err)
-                j:close()
+                step.error(err)
              end
          })
-        elseif config.on_connect then
-          config.on_connect(j)
+      end)
+    end
+
+    if config.persist then
+      table.insert(try,function(step)
+          if not persist_id then
+            j:config({persist=config.persist},{
+                success = function(pid)
+                  persist_id = pid
+                  is_persistant = true
+                  step.success()
+                end,
+                error = function(err)
+                  step.error(err)
+                end
+            })
+          else
+            j:config({resume={
+                  id = persist_id,
+                  receivedCount = received_count
+            }},{
+                success = function(received_by_daemon_count)
+                  flush('resume')
+                  pending = false
+                  is_persistant = true
+                  local missed_messages_count = message_count - received_by_daemon_count
+                  local history = message_history
+                  local start = #history-missed_messages_count
+                  if start < 0 then
+                    step.error(internal_error(historyNotAvailable))
+                  end
+                  local missed = {}
+                  for i=start,#history do
+                    tinsert(missed,history[i])
+                  end
+
+                  if #missed > 0 then
+                    wsock:send(encode(missed))
+                  end
+                  step.success()
+                end,
+                error = function(err)
+                  step.error(err)
+                end
+            })
+          end
+      end)
+
+    end
+
+
+    connect_sequence = step.new({
+        try = try,
+        catch = function(err)
+          if not config.persist then
+            j:close()
+          end
+        end,
+        finally = function()
+          if config.on_connect then
+            config.on_connect(j)
+            config.on_connect = nil
+          end
+          flush('on_connect')
        end
-        flush('on_connect')
+    })
+
+
+    wsock:on_connect(function()
+        connect_sequence()
+        flush('config')
    end)
 
    wsock:connect()
diff --git a/src/jet/socket.lua b/src/jet/socket.lua
index be09931..fa8f04e 100644
--- a/src/jet/socket.lua
+++ b/src/jet/socket.lua
@@ -27,6 +27,7 @@ end
 local wrap = function(sock,args)
   assert(sock)
   args = args or {}
+  -- set non blocking
   sock:settimeout(0)
 
   -- send message asap

From 6a8b91801513c75be3878f90d3f17b211388fc99 Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Thu, 28 Nov 2013 15:49:06 +0100
Subject: [PATCH 2/8] remove prints

---
 spec/peer_spec.lua | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/spec/peer_spec.lua b/spec/peer_spec.lua
index c7e96b2..461fdf6 100644
--- a/spec/peer_spec.lua
+++ b/spec/peer_spec.lua
@@ -338,9 +338,7 @@ describe(
          {
            event = 'remove',
            action = function()
-              print(pcall(function()
-                    states.test:add()
-              end))
+              states.test:add()
            end
          },
          {
@@ -354,7 +352,6 @@ describe(
        local fetcher = peer:fetch(
          states.test:path(),
          async(function(fpath,fevent,fvalue)
-              print(fpath,fevent)
              count = count + 1
              assert.is_equal(expected[count].event,fevent)
              assert.is_equal(fpath,states.test:path())

From 382c375798af0091ec70e5214bd32db19e0ca99b Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Thu, 28 Nov 2013 15:49:21 +0100
Subject: [PATCH 3/8] fix fetch on add

---
 src/jet/daemon.lua | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/jet/daemon.lua b/src/jet/daemon.lua
index 16bbc16..af8f70f 100644
--- a/src/jet/daemon.lua
+++ b/src/jet/daemon.lua
@@ -328,7 +328,7 @@ local create_daemon = function(options)
     -- don't depend on the value of the element).
     for peer in pairs(peers) do
       for _,fetcher in pairs(peer.fetchers) do
-        local ok,may_have_interest = pcall(fetcher,path,lpath,'add',value)
+        local ok,may_have_interest = pcall(fetcher.op,path,lpath,'add',value)
         if ok then
           if may_have_interest then
             element.fetchers[fetcher] = true

From 0919dbc9f82a8dae443f7d164ce92a6586036c58 Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Thu, 28 Nov 2013 21:39:01 +0100
Subject: [PATCH 4/8] add peer.cancel_request / TEST REQUIRED!

---
 spec/peer_spec.lua  |  2 +-
 spec/utils_spec.lua | 13 +++++++++++++
 src/jet/daemon.lua  | 33 +++++++++++++++++++++++----------
 src/jet/utils.lua   | 14 ++++++++++++++
 4 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/spec/peer_spec.lua b/spec/peer_spec.lua
index 461fdf6..9e808e1 100644
--- a/spec/peer_spec.lua
+++ b/spec/peer_spec.lua
@@ -338,7 +338,7 @@ describe(
          {
            event = 'remove',
            action = function()
-              states.test:add()
+              states.test:add()
            end
          },
          {
diff --git a/spec/utils_spec.lua b/spec/utils_spec.lua
index 31d3a28..c31364a 100644
--- a/spec/utils_spec.lua
+++ b/spec/utils_spec.lua
@@ -24,5 +24,18 @@ describe(
      assert.is_false(utils.is_valid_path('asdppp*'))
    end)
 
+    it('remove works',function()
+        local t = {2,5,6}
+        local found = utils.remove(t,5)
+        assert.is_true(found)
+        assert.is_same({2,6},t)
+
+        local t = {2,5,6}
+        local found = utils.remove(t,4)
+        assert.is_false(found)
+        assert.is_same({2,5,6},t)
+
+    end)
+
 end)
diff --git a/src/jet/daemon.lua b/src/jet/daemon.lua
index af8f70f..87bf53b 100644
--- a/src/jet/daemon.lua
+++ b/src/jet/daemon.lua
@@ -258,11 +258,15 @@ local create_daemon = function(options)
     local timeout = optional(params,'timeout','number') or 5
     local element = elements[path]
     if element then
-      local id
       local mid = message.id
+      local req = {}
       if mid then
         local timer = new_timer(function()
-            routes[id] = nil
+            local element = elements[path]
+            if element then
+              element.peer:cancel_request(req)
+            end
+            routes[req.id] = nil
             peer:queue({
                 id = mid,
                 error = response_timeout(params),
@@ -270,20 +274,16 @@ local create_daemon = function(options)
             peer:flush()
         end,timeout)
         timer:start(loop)
-        id = mid..tostring(peer)
-        assert(not routes[id])
+        req.id = mid..tostring(peer)
+        assert(not routes[req.id])
         -- save route to forward reply
-        routes[id] = {
+        routes[req.id] = {
           receiver = peer,
          id = mid,
          timer = timer,
        }
      end
-      local req = {
-        id = id,-- maybe nil
-        method = path,
-      }
-
+      req.method = path
      local value = params.value
      if value ~= nil then
        req.params = {value = value}
@@ -684,6 +684,19 @@ local create_daemon = function(options)
    end
    local send = ops.send
    peer.message_count = 0
+    peer.cancel_request = function(_,req)
+      -- maybe messages have not been flushed
+      jutils.remove(peer.messages,req)
+      -- messages are flushed, but maybe the peer
+      -- is persistent, so remove from history
+      local history = peer.message_history
+      if history then
+        local found = jutils.remove(history,req)
+        if found then
+          peer.message_count = peer.message_count - 1
+        end
+      end
+    end
    peer.flush = function(_)
      local messages = peer.messages
      local num = #messages
diff --git a/src/jet/utils.lua b/src/jet/utils.lua
index 6cc9dfd..de78c7d 100644
--- a/src/jet/utils.lua
+++ b/src/jet/utils.lua
@@ -74,6 +74,19 @@ local is_valid_path = function(path)
   return not path:match('[$^*]')
 end
 
+-- searches an element in an array and removes
+-- it if found.
+-- returns true if the element has been removed.
+local remove = function(array,value)
+  for index,val in ipairs(array) do
+    if val == value then
+      table.remove(array,index)
+      return true
+    end
+  end
+  return false
+end
+
 return {
   noop = noop,
   is_empty_table = is_empty_table,
@@ -84,5 +97,6 @@ return {
   parse_error = parse_error,
   response_timeout = response_timeout,
   is_valid_path = is_valid_path,
+  remove = remove,
 }

From cffe2bd59d186c9cab7f05f354e965b7c0dddf65 Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Tue, 7 Jan 2014 13:49:41 +0100
Subject: [PATCH 5/8] handle persist/resume as first config steps. don't resend config.resume.

---
 src/jet/peer.lua | 57 ++++++++++++++++++++++++------------------------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/src/jet/peer.lua b/src/jet/peer.lua
index 24710eb..afbd6cf 100644
--- a/src/jet/peer.lua
+++ b/src/jet/peer.lua
@@ -122,7 +122,6 @@ new = function(config)
         will_flush = false
       end
     else
-
       flush = function(reason)
         local num = #messages
         if not is_persistant then
@@ -714,30 +713,6 @@ new = function(config)
      cmsgpack = require'cmsgpack'
    end
 
-    if config.name then
-      table.insert(try,function(step)
-          j:config({name=config.name},step)
-      end)
-    end
-
-    if config.encoding then
-      table.insert(try,function(step)
-          j:config({encoding=config.encoding},{
-              success = function()
-                flush('config')
-                if config.encoding then
-                  encode = cmsgpack.pack
-                  decode = cmsgpack.unpack
-                end
-                step.success()
-              end,
-              error = function(err)
-                step.error(err)
-              end
-          })
-      end)
-    end
-
    if config.persist then
      table.insert(try,function(step)
          if not persist_id then
@@ -762,15 +737,17 @@ new = function(config)
                  is_persistant = true
                  local missed_messages_count = message_count - received_by_daemon_count
                  local history = message_history
-                  local start = #history-missed_messages_count
+                  -- the last message in history is "config.resume"
+                  -- skip that!
+                  local start = #history-missed_messages_count-1
+                  local stop = #history-1
                  if start < 0 then
                    step.error(internal_error(historyNotAvailable))
                  end
                  local missed = {}
-                  for i=start,#history do
+                  for i=start,stop do
                    tinsert(missed,history[i])
                  end
-
                  if #missed > 0 then
                    wsock:send(encode(missed))
                  end
                  step.success()
@@ -785,6 +762,30 @@ new = function(config)
    end
 
+    if config.name then
+      table.insert(try,function(step)
+          j:config({name=config.name},step)
+          flush('name')
+      end)
+    end
+
+    if config.encoding then
+      table.insert(try,function(step)
+          j:config({encoding=config.encoding},{
+              success = function()
+                flush('encoding')
+                if config.encoding then
+                  encode = cmsgpack.pack
+                  decode = cmsgpack.unpack
+                end
+                step.success()
+              end,
+              error = function(err)
+                step.error(err)
+              end
+          })
+      end)
+    end
 
    connect_sequence = step.new({
        try = try,

From cb6f385e761130e0c7779b00b7c5f0b3779d9c3f Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Tue, 7 Jan 2014 13:50:18 +0100
Subject: [PATCH 6/8] fix persist/resume message counting

---
 spec/daemon_spec.lua | 176 ++++++++++++++++++++++++++++++++++++++++---
 src/jet/daemon.lua   |  10 ++-
 2 files changed, 174 insertions(+), 12 deletions(-)

diff --git a/spec/daemon_spec.lua b/spec/daemon_spec.lua
index c0961e0..ea6200e 100644
--- a/spec/daemon_spec.lua
+++ b/spec/daemon_spec.lua
@@ -228,9 +228,8 @@ for _,info in ipairs(addresses_to_test) do
                      response = cjson.decode(response)
                      assert.is_same(response.id,2)
                      -- the server has received two messages associated with this persist session
-                      -- before processing config.resume
-                      -- (config.persist)
-                      assert.is_same(response.result,1)
+                      -- (config.persist and config.resume)
+                      assert.is_same(response.result,2)
                      done()
                  end))
                message_socket:send(cjson.encode({
@@ -274,9 +273,8 @@ for _,info in ipairs(addresses_to_test) do
                      response = cjson.decode(response)
                      assert.is_same(response.id,2)
                      -- the server has received two messages associated with this persist session
-                      -- before processing config.resume
-                      -- (config.persist)
-                      assert.is_same(response.result,1)
+                      -- (config.persist and config.resume)
+                      assert.is_same(response.result,2)
                      sock:close()
                      sock = new_sock()
                      sock:connect(info.addr,port)
@@ -286,10 +284,9 @@ for _,info in ipairs(addresses_to_test) do
                          function(_,response)
                            response = cjson.decode(response)
                            assert.is_same(response.id,3)
-                            -- the server has received two messages associated with this persist session
-                            -- before processing config.resume
-                            -- (config.persist)
-                            assert.is_same(response.result,1)
+                            -- the server has received three messages associated with this persist session
+                            -- (config.persist, config.resume and config.resume)
+                            assert.is_same(response.result,3)
                            done()
                        end))
                      message_socket:send(cjson.encode({
@@ -326,6 +323,163 @@ for _,info in ipairs(addresses_to_test) do
      end)
 
+      it('a peer can configure persist and resume and missed messages are resent by daemon twice',function(done)
+          sock:connect(info.addr,port)
+          local message_socket = jetsocket.wrap(sock)
+          message_socket:on_message(
+            async(
+              function(_,response)
+                response = cjson.decode(response)
+                if #response > 0 then
+                  response = response[1]
+                end
+                assert.is_string(response.result)
+                local resume_id = response.result
+                sock:close()
+                sock = new_sock()
+                sock:connect(info.addr,port)
+                local message_socket = jetsocket.wrap(sock)
+                message_socket:on_message(
+                  async(
+                    function(_,response)
+                      response = cjson.decode(response)
+                      assert.is_same(response,{
+                          {
+                            result = 5, -- five messages received by daemon so far (assoc. to the persist id)
+                            id = 5, -- config.resume id
+                          },
+                          {
+                            result = true, -- fetch set up successfully
+                            id = 3,
+                          },
+                          {
+                            method = 'fetchy', -- resumed fetched data
+                            params = {
+                              path = 'dummy',
+                              value = 123,
+                              event = 'add',
+                            }
+                          },
+                          {
+                            method = 'fetchy', -- resumed fetched data
+                            params = {
+                              path = 'dummy',
+                              value = 234,
+                              event = 'change',
+                            }
+                          },
+                          {
+                            result = true, -- change notification was successful
+                            id = 4
+                          }
+                      })
+                      sock:close()
+                      sock = new_sock()
+                      sock:connect(info.addr,port)
+                      local message_socket = jetsocket.wrap(sock)
+                      message_socket:on_message(
+                        async(
+                          function(_,response)
+                            response = cjson.decode(response)
+                            assert.is_same(response,{
+                                {
+                                  result = 6, -- six messages received by daemon so far (assoc. to the persist id)
+                                  id = 6, -- config.resume id
+                                },
+                                {
+                                  method = 'fetchy', -- resumed fetched data
+                                  params = {
+                                    path = 'dummy',
+                                    value = 123,
+                                    event = 'add',
+                                  }
+                                },
+                                {
+                                  method = 'fetchy', -- resumed fetched data
+                                  params = {
+                                    path = 'dummy',
+                                    value = 234,
+                                    event = 'change',
+                                  }
+                                },
+                                {
+                                  result = true, -- change notification was successful
+                                  id = 4
+                                }
+                            })
+                            -- close the socket and wait some time
+                            -- to let the daemon cleanup
+                            -- the dummy states assoc with this
+                            -- sock. should be improved by using
+                            -- *some* callback
+                            sock:close()
+                            ev.Timer.new(function()
+                                done()
+                            end,0.01):start(ev.Loop.default)
+                        end))
+                      message_socket:send(cjson.encode({
+                            method = 'config',
+                            params = {
+                              resume = {
+                                id = resume_id,
+                                -- pretend only 2 more messages are
+                                -- received (config and fetch setup)
+                                receivedCount = 2+2
+                              }
+                            },
+                            id = 6,
+                      }))
+
+                  end))
+
+                message_socket:send(cjson.encode({
+                      method = 'config',
+                      params = {
+                        resume = {
+                          id = resume_id,
+                          -- pretend only config and add responses have been received
+                          receivedCount = 2
+                        }
+                      },
+                      id = 5,
+                }))
+
+            end))
+          message_socket:send(cjson.encode({
+                {
+                  method = 'config',
+                  params = {
+                    persist = 0.001
+                  },
+                  id = 1,
+                },
+                {
+                  method = 'add',
+                  params = {
+                    path = 'dummy',
+                    value = 123
+                  },
+                  id = 2,
+                },
+                {
+                  method = 'fetch',
+                  params = {
+                    id = 'fetchy',
+                    matches = {'.*'}
+                  },
+                  id = 3,
+                },
+                {
+                  method = 'change',
+                  params = {
+                    path = 'dummy',
+                    value = 234
+                  },
+                  id = 4,
+                },
+          }))
+      end)
+
      it('a peer can configure persist and resume and missed messages are resent by daemon',function(done)
          sock:connect(info.addr,port)
          local message_socket = jetsocket.wrap(sock)
@@ -348,7 +502,7 @@ for _,info in ipairs(addresses_to_test) do
                      response = cjson.decode(response)
                      assert.is_same(response,{
                          {
-                            result = 4, -- five messages received by daemon so far (assoc. to the persist id)
+                            result = 5, -- five messages received by daemon so far (assoc. to the persist id)
                            id = 5, -- config.resume id
                          },
                          {
diff --git a/src/jet/daemon.lua b/src/jet/daemon.lua
index 5bcf2f3..3a42935 100644
--- a/src/jet/daemon.lua
+++ b/src/jet/daemon.lua
@@ -419,13 +419,15 @@ local create_daemon = function(options)
       end
       resumer:transfer_fetchers(peer)
       resumer:transfer_elements(peer)
-      peer.receive_count = resumer.receive_count
+      peer.receive_count = resumer.receive_count + peer.receive_count
       if message.id then
         peer:queue({
             id = message.id,
             result = peer.receive_count,
         })
       end
+      -- this will add messages to peer.message_history
+      -- during flush.
       for i=start,#history do
         peer:queue(history[i])
       end
       peer.message_history = {}
       resumables[persist_id] = peer
       peer.persist_id = persist_id
       peer.persist_time = resumer.persist_time
       peer.flush()
+      -- the peer message_count must be set here
+      -- to mimic a continuously growing message_count with respect to receivedCount
+      peer.message_count = resumer.message_count
+      if message.id then
+        peer.message_count = peer.message_count + 1
+      end
       return nil,true -- set dont_auto_reply true
     end

From bd97f23faba01f30da9304ab87c443416a594b29 Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Tue, 7 Jan 2014 14:04:00 +0100
Subject: [PATCH 7/8] make fetch example use a persistent peer

---
 examples/fetch.lua | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/fetch.lua b/examples/fetch.lua
index 27dede8..a284e3f 100755
--- a/examples/fetch.lua
+++ b/examples/fetch.lua
@@ -7,7 +7,7 @@ local ip = arg[2]
 local port = arg[3]
 
 local cjson = require'cjson'
-local peer = require'jet.peer'.new{ip=ip,port=port}
+local peer = require'jet.peer'.new{ip=ip,port=port,persist=100}
 
 local is_json,exp_json = pcall(cjson.decode,exp)
 if is_json then

From d6ad4d6b4aa5b758f037716b5e2ea30bb7c42484 Mon Sep 17 00:00:00 2001
From: Gerhard Lipp
Date: Tue, 7 Jan 2014 14:10:04 +0100
Subject: [PATCH 8/8] add test_persist.md

---
 test_persist.md | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
 create mode 100644 test_persist.md

diff --git a/test_persist.md b/test_persist.md
new file mode 100644
index 0000000..4218642
--- /dev/null
+++ b/test_persist.md
@@ -0,0 +1,17 @@
+# Testing the persistence feature
+
+For manual "testing", play with these commands (preferably in different terminals):
+
+```shell
+$ jetd.lua
+$ lua examples/fetch.lua '{}' 172.19.1.41
+$ lua examples/persistant_ticker.lua localhost
+$ sudo ifconfig lo down
+$ sudo ifconfig lo up
+```
+
+The idea is to force connections to be closed by shutting down network interfaces.
+Both fetch.lua and persistant_ticker.lua run with the persist option and should
+automatically reconnect to jetd.
+
+Better, automated tests are welcome!
\ No newline at end of file
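
Taken together, the series implements this handshake: a peer sends a `config` request with a `persist` timeout and receives a unique persist id; after a reconnect it sends `config` with `resume = {id = <persist id>, receivedCount = <n>}`, the daemon answers with the number of messages it has received under that session, and both sides then resend the tail of their bounded message histories (capped at 100 entries on each side). A minimal sketch of the peer-side usage built on the public API from these patches — the address, the persist timeout and the state path are arbitrary example values, not defaults:

```lua
local jet = require'jet'

-- A peer created with the persist option survives broken connections:
-- on close/error it keeps counting and recording outgoing messages,
-- reconnects in the background and resumes the session via its persist id.
local peer = jet.peer.new({
  ip = 'localhost', -- example address
  persist = 30,     -- seconds the daemon keeps the session resumable
  on_connect = function(p)
    -- states added here are transferred to the resumed session by the
    -- daemon (transfer_elements / transfer_fetchers), so fetchers keep
    -- firing across reconnects.
    p:state({
      path = 'example/heartbeat', -- arbitrary example path
      value = 0
    })
  end,
})

peer:loop()
```

Note the design choice: instead of acknowledging individual message ids, daemon and peer only count messages per session and compare `receivedCount` against their own send counters on resume. Replaying the missing tail is then a simple array slice of the history, which is why the history can safely be capped at 100 entries.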