lk.Scheduler
:pcall (f, errorHandler)
require 'zmq'

-- Class table of the scheduler. Instances created by the constructor use it
-- as their metatable, so methods are looked up through '__index'.
local lib = {}
lib.type = 'lk.Scheduler'
lib.__index = lib
lk.Scheduler = lib

-- Holder for the module private functions.
local private = {}

-- Keep the global 'elapsed' function in a local.
local elapsed = elapsed

-- Calling the class table (lk.Scheduler(...)) forwards the arguments to the
-- 'new' function of the called table.
setmetatable(lib, {
  __call = function(class, ...)
    return class.new(...)
  end,
})
local self = { now = 0, -- This is used by widgets to detect volontary close vs -- quit close. should_run = true, -- Ordered linked list of events by event time. at_next = nil, -- List of threads that have added their filedescriptors to -- select. fd_count = 0, idx_to_thread = {}, -- These are plain lua functions that will be called when -- quitting. finalizers = {}, -- Using zmq for polling poller = zmq.Poller(), } return setmetatable(self, lib) end
--- Sleeps a given number of milliseconds starting from the -- current time (not the logical time). This should not be used as a -- timer because it will drift. -- FIXME: remove warning if delay > 10 then -- We changed interval from milliseconds to seconds. printf("Using interval of %f seconds.\n%s\n", delay, debug.traceback()) end if delay == 0 then coroutine.yield('wait', 0) else coroutine.yield('wait', elapsed() + delay) end end
--- Global shortcut: forwards to the 'sleep' method of the global scheduler
-- 'sched'. Returns nothing.
-- @param delay value passed unchanged to sched:sleep.
function sleep(delay)
  local scheduler = sched
  scheduler:sleep(delay)
end
self.should_run = false return true end
--- Make it easy to rewrite logger. print(typ, msg) end
--- Waits a given number of milliseconds starting from the -- logical time. This should be used as a timer because it will not -- drift. -- FIXME: remove warning if delay > 10 then -- We changed interval from milliseconds to seconds. printf("Using interval of %f seconds.\n%s\n", interval, debug.traceback()) end if delay == 0 then coroutine.yield('wait', 0) else coroutine.yield('wait', self.now + delay) end end
coroutine.yield('read', fd) end
coroutine.yield('write', fd) end
coroutine.yield('abort') end
if func then self.main = lk.Thread(func) end
if not rawget(_G, 'mimas') then -- without mimas self:loop() end
if self.should_run or self.restart_with_mimas then self.restart_with_mimas = nil -- restarting self.should_run = true -- now we are with mimas self.mimas = true private.usePoller(self, mimas.Poller()) app:exec() end private.finalize(self) end
local poller = self.poller local idx_to_thread = self.idx_to_thread
--=============================================== MAIN EVENT LOOP while self.should_run do local now = elapsed() local now_list = {} local timeout = -1 local next_thread = self.at_next while next_thread and next_thread.at < now do -- collect table.insert(now_list, next_thread) -- get next next_thread = next_thread.at_next end -- remove from at_list self.at_next = next_thread
for i, thread in ipairs(now_list) do -- We run them all now so that we give time for poll based events in -- case they keep on adding new events in at_next list. -- TODO: We could discard events if this loop takes more then some max seconds to run. private.runThread(self, thread) end -- the running thread might have added new elements next_thread = self.at_next now = elapsed() if next_thread then if next_thread.at < now then timeout = 0 else timeout = next_thread.at - now end end if self.fd_count == 0 and timeout == -1 and not self.mimas then -- done self.should_run = false elseif self.should_run then if not poller:poll(timeout) then -- interrupted self.should_run = false break end -- First collect events so that running the threads (and -- possibly adding new file descriptors) does not alter -- the list. local events = poller:events() if events then -- Execute poll events. for _, ev_idx in ipairs(events) do local thread = idx_to_thread[ev_idx] if thread then private.runThread(self, thread) else error(string.format("Unknown thread idx '%i' in poller", ev_idx)) end end end end if self.garbage then private.cleanup(self) end
end end
-- We use the wrapper internally so that we do not prevent -- garbage collection. thread.at = at -- sorted insert (next event first) local prev = self local ne = prev.at_next local previous_at if ne then previous_at = ne.at end while true do if not ne then prev.at_next = thread thread.at_next = nil break elseif at < ne.at then prev.at_next = thread thread.at_next = ne break else prev = ne end ne = prev.at_next end if self.mimas and self.at_next.at ~= previous_at then -- We need to reschedule because this method can be called from a GUI -- callback and we won't get back to the loop to do this. self.poller:resumeAt(self.at_next.at) end end
--- Tries to run 'func' count times (retry every 2^n seconds) and calls -- 'failed' function on failure. if type(count) == 'function' then func, failed, wait = count, func, failed -- default count count = 4 end local wait = wait or 1 -- FIXME: if wait > 10 then printf("Using interval of %f seconds.\n%s\n", wait, debug.traceback()) end return lk.Thread(function() local res, err for i=1,count do res, err = self:pcall(func) if res then return else sleep(wait * 2^i) end end failed(err) end) end
-- This is called by lk.Thread when the thread is garbage collected. -- We will cleanup any filedescriptor in the main thread (not now or -- we crash because we call C from the garbage collector). if thread.idx then self.idx_to_thread[thread.idx] = nil end if thread.fd then if not self.garbage then self.garbage = {thread} else table.insert(self.garbage, thread) end end self:removeFd(thread) thread.t.t = nil end
local fd = thread.fd if thread.idx then self.idx_to_thread[thread.idx] = nil end if fd then self.poller:remove(thread.idx) thread.fd = nil self.fd_count = self.fd_count - 1 end thread.idx = nil end
errorHandler = errorHandler or private.errorHandler local co = coroutine.create(f) while true do local status, a, b = coroutine.resume(co) if coroutine.status(co) == 'suspended' then coroutine.yield(a, b) -- suspend across `mypcall' elseif not status then -- error return false, errorHandler(a, co) else -- ended normally return status, a, b -- error or normal return end end end
--=============================================== PRIVATE
-- Poll event flags taken from the zmq binding.
local zmq_POLLIN = zmq.POLLIN
local zmq_POLLOUT = zmq.POLLOUT

-- Maps the first value yielded by a thread ('read' / 'write') to the poll
-- event to register.
local zmq_const = {
  read  = zmq_POLLIN,
  write = zmq_POLLOUT,
}
--- Replace the poller used by the scheduler and register every known
-- thread with the new poller.
-- @param new_poller poller object; must provide add(fd, event) returning
--                   the index under which the descriptor was registered.
function private:usePoller(new_poller)
  -- Threads registered with the previous poller.
  local previous = self.idx_to_thread

  self.poller = new_poller
  -- Indices are poller specific: start with a fresh lookup table.
  self.idx_to_thread = {}

  -- add all filedescriptors
  for _, thread in pairs(previous) do
    local idx = new_poller:add(thread.fd, thread.event)
    thread.idx = idx
    self.idx_to_thread[idx] = thread
  end
end
-- FIXME: If fd == zmq.Socket, we never gc because 'self' is in -- the zmq socket thread (even if we do not use it). -- -- What objects really need their own thread (+userdata env) ? -- Mimas widgets -- Nothing else
--- Resume the coroutine of one scheduled thread and act on what it yields.
--
-- 'thread' is the wrapper kept by the scheduler; the real lk.Thread is found
-- in thread.t.t. The first two values produced by the resume (a, b) are
-- handled as follows:
--   'read' / 'write', fd : register fd in the poller (or modify the existing
--                          registration) for the matching event.
--   'join', other        : drop any registered fd and call
--                          other:addJoin(thread).
--   'wait', at           : drop any registered fd and call
--                          self:scheduleAt(at, thread).
--   'abort' or dead coroutine : drop any registered fd and finalize.
--   'mimas'              : stop the loop, flag 'restart_with_mimas' and
--                          reschedule the thread at 0.
--   anything else        : drop any registered fd and print 'BAD YIELD:'.
-- If the resume fails, the error goes to t.error when set (printed
-- otherwise); the coroutine is recreated from t.func when t.restart is set,
-- else the thread is finalized.
--
-- @param thread scheduler side wrapper of a thread.
function private:runThread(thread)
  -- get lk.Thread from wrap
  local t = thread.t.t
  if not t or not t.co then
    -- has been killed or gc
    if thread.fd then
      self:removeFd(thread)
      thread.fd = nil
    end
    -- let it repose in peace
    if t then
      t:finalize(self)
    end
    return
  end

  -- NOTE(review): the time is written to the global 'sched', not to 'self';
  -- presumably both are the same scheduler - confirm.
  if thread.at and thread.at > 0 then
    -- logical time
    sched.now = thread.at
    -- This does not work. Think better and make something that works.
    -- if elapsed() > thread.at + 2 then
    --   -- 2 == jitter between realtime and logical time.
    --   print("OUT OF REAL TIME", elapsed() - thread.at)
    -- end
  else
    sched.now = elapsed()
  end
  thread.at = nil

  local ok, a, b = coroutine.resume(t.co)
  -- Poll event when the thread yielded 'read' or 'write', nil otherwise.
  local event = zmq_const[a]

  if not ok then
    -- a = error in thread
    if t.error then
      -- Thread has an error handler, call it
      t.error(a, debug.traceback(t.co))
    else
      print('UNPROTECTED ERROR', a, t.co, debug.traceback(t.co))
    end

    if t.restart then
      -- Restart on error
      t.co = coroutine.create(t.func)
    else
      if thread.fd then
        self:removeFd(thread)
      end
      -- let it repose in peace
      thread.fd = nil
      t.co = nil
      t:finalize(self)
    end
  elseif event then
    if thread.fd then
      if thread.fd == b then
        -- same
        self.poller:modify(thread.idx, event)
      elseif b then
        -- changed fd
        self.poller:modify(thread.idx, event, b)
        -- Remember the descriptor that is now registered: private.usePoller
        -- re-adds thread.fd when the poller changes, and holding the
        -- reference keeps a socket object from being collected (same reason
        -- as in the 'add fd' branch below).
        thread.fd = b
      else
        assert(false, 'This is a bug. Check sched:waitRead calls.')
      end
    else
      -- add fd
      thread.fd = b
      self.fd_count = self.fd_count + 1
      -- keep zmq.Socket object to avoid gc
      thread.idx = self.poller:add(b, event)
      -- if process then
      --   print(process.name, 'ADD', thread.idx, b, event)
      -- elseif morph then
      --   print('morph', 'ADD', thread.idx, b, event)
      -- else
      --   print('???', 'ADD', thread.idx, b, event)
      -- end
      self.idx_to_thread[thread.idx] = thread
    end
    -- We need this information in case we change poller.
    thread.event = event
  elseif a == 'join' then
    if thread.fd then
      self:removeFd(thread)
    end
    b:addJoin(thread)
  elseif a == 'wait' then
    if thread.fd then
      self:removeFd(thread)
    end
    -- b = at
    -- a value of 0 == execute again as soon as possible
    self:scheduleAt(b, thread)
  elseif not t.co or coroutine.status(t.co) == 'dead' or a == 'abort' then
    if thread.fd then
      self:removeFd(thread)
    end
    -- let it repose in peace
    thread.fd = nil
    t.co = nil
    t:finalize(self)
  elseif a == 'mimas' then
    self.should_run = false
    self.restart_with_mimas = true
    -- restart as soon as we have restarted mimas scheduler
    self:scheduleAt(0, thread)
  else
    if thread.fd then
      self:removeFd(thread)
    end
    print('BAD YIELD:', a, b)
  end
end
--- Run every function stored in self.finalizers, in list order.
-- Each call is protected so that a failing finalizer does not prevent the
-- following ones from running; errors are discarded.
function private:finalize()
  local finalizers = self.finalizers
  for _, finalizer in ipairs(finalizers) do
    -- if one fails, should not crash all finalizers...
    pcall(finalizer)
  end
end
-- FIXME: traceback not working good enough
--- Build an error message made of the error value followed by the traceback
-- of the coroutine in which the error occurred.
-- (A variant that trimmed the traceback around 'lubyk/lib/test.lua' sat
-- behind an unconditional return and never ran; it has been removed.)
-- @param err error value raised in the coroutine.
-- @param co  coroutine in which the error occurred.
-- @return string: err, a newline, then debug.traceback(co).
function private.errorHandler(err, co)
  -- Error values are not always strings (error({...}), error(nil)):
  -- concatenating such a value would raise inside the handler itself.
  return tostring(err) .. '\n' .. debug.traceback(co)
end