Thursday, 15 May 2014

Lua fun

Today the following Lua code floated past my desk with a request for improvements.

The background is that we need to detect auditable events on an external source and then assign those events to a pool of workers for processing. These workers are not 'threads'; they are fully-fledged processes that may be on remote machines. It is expected that more than one process may be free and try to pick up the same event. The queued events will be held in a database: this is a lightweight conversion from the RabbitMQ-based approach in use at the moment to a comparable implementation that uses the existing database of one of our products, for customers that do not want to maintain RabbitMQ in production.

    
    -- get next row
    id = db:get_next_event()
    
    -- attempt lock, may fail with unique constraint.              
    ok, err = insert_event_lock(id,owner)         

    if ok then 
        return id
    else

        -- analyze error message for constraint violation
        if process_error(err) == "duplicate" then 

            -- retry immediately
            return nil          
                  
        else

            -- mark as error and retry after timeout
            error("row is bad") 
                  
        end

    end

That's OK, but I didn't like the way process_error() relies on detecting the name of the unique index we expect to be violated when load increases, or the general approach of using a database exception for normal flow control.
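
To make the brittleness concrete, here is a rough sketch of what such a process_error() might look like. The original function is not shown in this post, so the index name, the return values and the error message format below are all assumptions made up for illustration.

```lua
-- Hypothetical index name; the real one is not given in this post.
local LOCK_INDEX_NAME = "ux_event_lock_event_id"

-- Classify a database error by searching its message for the index name.
-- The message format is an assumption and differs between database drivers.
local function process_error(err)
    -- Plain-text search (4th argument true), so no pattern characters apply.
    if type(err) == "string" and err:find(LOCK_INDEX_NAME, 1, true) then
        return "duplicate"
    end
    return "other"
end

print(process_error("unique constraint 'ux_event_lock_event_id' violated")) -- duplicate
print(process_error("connection reset"))                                    -- other
-- Rename the index and the same kind of failure is no longer recognised:
print(process_error("unique constraint 'ux_event_lock_v2' violated"))       -- other
```

The last call shows the problem: the classification silently changes as soon as somebody renames the index.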

Here is my improved version:


    work_id = next_lock_for_owner(self)

    if not work_id then 

        -- get next action with no lock
        next_id = get_next_pending_id()           

        -- can be duplicate
        insert_empty_lock(next_id)                

        -- ‘where clause’ updates all duplicates with same id to self
        update_lock_owner(next_id,self)           

    end

    -- may be nil (get new lock on *next* select to avoid races)
    return work_id                                
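
To see how the pieces fit together, here is a minimal in-memory sketch of that flow. The two Lua tables stand in for the source and lock tables, and their layout, the poll() wrapper and the nil guard on next_id are my assumptions for illustration; the real functions run against the database.

```lua
-- In-memory stand-ins for the database tables (hypothetical layout).
local events = { 101, 102 }   -- ids of events in the source
local locks = {}              -- rows of the form { event_id = ..., owner = ... }

-- Return the id of the first lock assigned to `owner`, or nil.
local function next_lock_for_owner(owner)
    for _, lock in ipairs(locks) do
        if lock.owner == owner then return lock.event_id end
    end
    return nil
end

-- Return the first event id that has no lock row at all, or nil.
local function get_next_pending_id()
    for _, id in ipairs(events) do
        local locked = false
        for _, lock in ipairs(locks) do
            if lock.event_id == id then
                locked = true
                break
            end
        end
        if not locked then return id end
    end
    return nil
end

-- Insert an unowned lock; nothing stops a duplicate for the same id.
local function insert_empty_lock(id)
    locks[#locks + 1] = { event_id = id, owner = nil }
end

-- The 'where clause': every lock row for `id` is assigned to `owner`.
local function update_lock_owner(id, owner)
    for _, lock in ipairs(locks) do
        if lock.event_id == id then lock.owner = owner end
    end
end

-- One polling pass, following the snippet above (nil guard added).
local function poll(self)
    local work_id = next_lock_for_owner(self)
    if not work_id then
        local next_id = get_next_pending_id()
        if next_id then
            insert_empty_lock(next_id)
            update_lock_owner(next_id, self)
        end
    end
    return work_id
end

print(poll("worker-a"))  -- nil: the lock is claimed, but not returned yet
print(poll("worker-a"))  -- 101: the lock is picked up on the next select
```

Note that the first pass returns nil even though it claimed a lock. The work is only handed out on the following pass, which is what the comment above the return is getting at.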

We can do even better than that, though. The criterion implemented by get_next_pending_id(), which basically returns an id in the source that does not have a lock, can be implicit in the where clause of the update. We also need to add an update step before the next_lock_for_owner() step to set a 'processing' flag, so that next_lock_for_owner() merely returns the top 1 record with the processing flag set. This allows us to run all three steps in separate processes/threads, like so:


Thread 1 (T1):

    -- sets a processing flag
    mark_next_lock_for_owner(self)                

    -- may be nil, signifies no work to do
    return next_lock_for_owner(self)              

Thread 2 (T2):

    -- get next action with no lock
    next_id = get_next_pending_id()
               
    if (next_id ~= nil) then

         -- can be duplicate
         insert_empty_lock(next_id)               

    end

Thread 3 (T3):

    -- ‘where clause’ updates all duplicates with same id to self
    update_lock_owner(next_id,self)               
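
As a rough illustration of the T1 pair, the mark-then-select step could behave like the in-memory sketch below. The table layout and the exact rule for which lock gets flagged are assumptions on my part; in the real thing both functions would be single statements against the lock table.

```lua
-- In-memory stand-in for the lock table (hypothetical layout).
local locks = {
    { event_id = 101, owner = "worker-a", processing = false },
    { event_id = 102, owner = "worker-b", processing = false },
    { event_id = 103, owner = "worker-a", processing = false },
}

-- Update step: set the processing flag on the first lock owned by `owner`.
local function mark_next_lock_for_owner(owner)
    for _, lock in ipairs(locks) do
        if lock.owner == owner then
            lock.processing = true
            return
        end
    end
end

-- Select step: top 1 record for `owner` with the processing flag set, or nil.
local function next_lock_for_owner(owner)
    for _, lock in ipairs(locks) do
        if lock.owner == owner and lock.processing then
            return lock.event_id
        end
    end
    return nil
end

mark_next_lock_for_owner("worker-a")
print(next_lock_for_owner("worker-a"))  -- 101
print(next_lock_for_owner("worker-b"))  -- nil: nothing marked for worker-b yet
```

Because the select only ever sees flagged rows, it stays trivial, and all the decision making sits in the update.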

Key to understanding this version is that it uses the transaction handling of the database for the heavy lifting. We are not trying to stop duplicate locks being made, just ensuring that they are harmless when they happen.

The notation is as follows. For now, assume that there are two processes, P1 and P2, with all three of these threads running in each one (in reality, the threads will be distributed across multiple machines for scalability and robustness). This gives us a notation like P2T1 -> P1T2/3, which should be read as process 2's thread 1 firing between the completion of process 1's thread 2 and thread 3. These are the race conditions to deal with:

  • Another process's T2 triggers in a different process's T2 -> T3 gap. This will result in a duplicate empty lock, which is harmless as the update in T3 updates all locks with the same id.
  • Another process's T3 triggers in a different process's T3 -> T1 gap. This will result in a previously assigned worker having its work 'stolen', something we expect and which is harmless.
  • Another process's T1 triggers between a T2 and a T3. The where clause of T1 returns only fully assigned locks, i.e. those that T3 has already updated. But its completion update (not shown) updates all locks for the id, whether or not they were assigned to it, hence closing any locks that slipped through the T2 -> T3 gap.
  • If a race condition occurs between the insert of an empty lock and the selection of the next lock, we will end up with duplicate locks, which is harmless as long as they are all assigned to the same worker.
  • If we get a race condition between the

Additionally, the existence of duplicate locks is a clear measure that the system has too many workers for its load.
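
The duplicate-lock races can be replayed by hand with a small in-memory sketch. The table layout, the complete() function (standing in for the completion update that is not shown) and duplicate_count() are assumptions for illustration only. Real interleavings are decided by the database, not by the order of lines in a script.

```lua
-- In-memory stand-in for the lock table (hypothetical layout).
local locks = {}   -- rows: { event_id = ..., owner = ..., closed = ... }

-- T2: insert an unowned lock; duplicates for the same id are allowed.
local function insert_empty_lock(id)
    locks[#locks + 1] = { event_id = id, owner = nil, closed = false }
end

-- T3: assign every lock row for `id` to `owner`.
local function update_lock_owner(id, owner)
    for _, lock in ipairs(locks) do
        if lock.event_id == id then lock.owner = owner end
    end
end

-- Completion update: close all locks for `id`, whoever owns them.
local function complete(id)
    for _, lock in ipairs(locks) do
        if lock.event_id == id then lock.closed = true end
    end
end

-- Count lock rows beyond the first for each id.
local function duplicate_count()
    local seen, dups = {}, 0
    for _, lock in ipairs(locks) do
        if seen[lock.event_id] then dups = dups + 1 end
        seen[lock.event_id] = true
    end
    return dups
end

insert_empty_lock(7)          -- P1 T2
insert_empty_lock(7)          -- P2 T2 fires in P1's T2 -> T3 gap
update_lock_owner(7, "P1")    -- P1 T3: both rows now belong to P1
update_lock_owner(7, "P2")    -- P2 T3: the work is 'stolen' by P2

print(locks[1].owner, locks[2].owner)  -- P2	P2
print(duplicate_count())               -- 1

complete(7)                            -- closes both rows in one go
print(locks[1].closed, locks[2].closed)  -- true	true
```

However the inserts interleave, all rows for an id end up with one owner and are closed together, and the duplicate count gives a cheap number to watch for an oversized worker pool.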