What is Rinda anyway?

Posted on August 24, 2006

What methods does TupleSpace provide?

The tuple space exposes a handful of methods for manipulating its contents:

  1. write(tuple, timeout) adds a tuple to TupleSpace. Note that TupleSpace can store several instances of the same tuple.

  2. take(template, timeout) {|t| ...} removes a tuple from TupleSpace and returns it. The first parameter is a template used to search for the tuples we are interested in. If no matching tuple is available, the call blocks until one appears or the timeout expires. If a block is given, it receives the pending request object (which can be used to cancel the request), not the found tuple.

  3. read(template, timeout) {|t| ...} works like take, but leaves the found tuple in TupleSpace.

  4. read_all(template) reads all tuples matching the template.

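  5. notify(event, template, timeout) registers for notifications of changes in TupleSpace. The event is "write", "take" or "delete" (nil matches all of them), and the returned object yields [event, tuple] pairs through its pop and each methods.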
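If you want to try these methods before setting up any servers, a TupleSpace also works inside a single Ruby process, with the same calls a remote client would make. The sketch below assumes nothing beyond the rinda library; the :job tuples are made up for illustration, and since I would not rely on the order in which matching tuples come back, it only checks counts and the specific tuple it takes.

```ruby
require "rinda/tuplespace"

# A TupleSpace living in this process; no DRb server is needed for a quick test.
ts = Rinda::TupleSpace.new

# Register for 'write' events on two-element :job tuples before writing,
# so the writes below are reported to this observer.
observer = ts.notify("write", [:job, nil])

ts.write([:job, 1])
ts.write([:job, 1])          # the same tuple may be stored more than once
ts.write([:job, 2])

event, tuple = observer.pop  # the first notification: ["write", [:job, 1]]

peek  = ts.read([:job, nil])          # some matching tuple; it stays in the space
all   = ts.read_all([:job, nil])      # every matching tuple (three of them)
taken = ts.take([:job, 2])            # removes [:job, 2] and returns it
left  = ts.read_all([:job, nil]).size # two [:job, 1] tuples remain

puts "#{event} #{tuple.inspect}; read #{peek.inspect}; #{all.size} total; took #{taken.inspect}; #{left} left"
```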

Several things should be noted.

First of all, every method except write takes a template object as its first parameter. The template is a search specification for the tuples we are trying to find. A tuple matches a template under very simple rules: the tuple must have the same size as the template, and each of its components must match the corresponding template component, unless that template component is nil, which acts as a wildcard. A component matches if it is equal to the template component according to == or ===, so a template component can also be a class such as Integer or a range such as 1..10. An example is in order:

  Let's say we have the tuple [:foo, 5] in the repository.
  Then it matches these templates:
  [:foo, nil], [nil, 5], [nil, nil].
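The matching rules are easy to try out. This sketch stores [:foo, 5] and asks read_all (an empty result means no match) about several templates, including two that rely on ===: a class and a range. The matches? helper is hypothetical, written just for this demonstration.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new
ts.write([:foo, 5])

# Hypothetical helper: does at least one stored tuple match the template?
def matches?(ts, template)
  !ts.read_all(template).empty?
end

templates = [
  [:foo, nil],        # nil is a wildcard
  [nil, 5],
  [nil, nil],
  [:foo],             # different size: no match
  [:bar, nil],        # :bar is neither == nor === :foo
  [Symbol, Integer],  # Class#=== tests the component's class
  [:foo, 1..10],      # Range#=== tests inclusion
]

results = templates.map { |t| [t, matches?(ts, t)] }.to_h
results.each { |t, ok| puts "#{t.inspect} -> #{ok}" }
```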

Secondly, every method except read_all takes a timeout as its last parameter. It can be either a number of seconds to wait for the operation to complete, or an object that implements a renew method (a renewer, which goes beyond the scope of this article). If read or take runs out of time, it raises Rinda::RequestExpiredError. For the write method, this argument specifies how long the tuple should be stored in the repository. If the parameter is nil, the operation or the tuple never expires.
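Here is a small sketch of both uses of the timeout, in a single process. One caveat worth checking against your Ruby version: as far as I can tell, TupleSpace.new takes an optional period argument (60 seconds by default) that controls how often a housekeeping thread wakes up waiters whose time has run out, so a blocked read or take with a short timeout may wait longer than asked. The example uses a timeout of 0, which fails immediately, to stay clear of that.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new

# write: the timeout is the tuple's lifetime, here one second.
ts.write([:temp, 42], 1)
before = ts.read_all([:temp, nil]).size  # still alive
sleep 1.5
after = ts.read_all([:temp, nil]).size   # expired tuples no longer match

# take/read: the timeout limits the wait. With 0 the call does not wait at all
# and raises Rinda::RequestExpiredError when nothing matches.
expired =
  begin
    ts.take([:missing, nil], 0)
    false
  rescue Rinda::RequestExpiredError
    true
  end

puts "before=#{before} after=#{after} expired=#{expired}"
```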

Bootstrap process

In order to start using TupleSpace, we first need to run it. This is actually quite easy to do with the following script:

    require "rinda/tuplespace"

    ts = Rinda::TupleSpace.new
    DRb.start_service(nil, ts)
    puts DRb.uri
    DRb.thread.join

When started, the script prints a URI that client scripts can use to find our TupleSpace. Below is the code a client script uses to connect to it:

    require "rinda/rinda"

    TS_URI = "..." # our TupleSpace URI; not named URI, which would clobber Ruby's URI module
    include Rinda
    DRb.start_service
    ts = DRbObject.new(nil, TS_URI)

Note that we have to pass the TupleSpace's URI to every client script. Wouldn't it be great if we could remove this tedious step? Sure, it would. And here is how: the Rinda framework provides the RingServer and RingFinger classes that help us with this task. This is how you start the TupleSpace server with the help of RingServer:

    require "rinda/tuplespace"
    require "rinda/ring"

    include Rinda
    DRb.start_service

    ts = TupleSpace.new
    ring = RingServer.new(ts)
    DRb.thread.join

And this is the client's bootstrap process:

    require 'rinda/rinda'
    require 'rinda/ring'

    include Rinda
    DRb.start_service

    ts = RingFinger.primary

That's it. What happens is that RingServer starts a thread that listens for UDP requests broadcast by RingFinger and answers them with a reference to the TupleSpace. This way RingFinger can locate the TupleSpace servers available on the local network, and RingFinger.primary returns the first one it finds. This greatly simplifies the bootstrap process!

Now that we are done with that, let's check that our TupleSpace actually works:

    ts.write [:message, 'Hello, World!']
    ts.read([:message, nil])[1] # => 'Hello, World!'

OK, we are good to go. In the following examples I assume that the TupleSpace is up and running and that each client script has a ts variable initialized with the code above.

Distributed Data Structures

Shared Variable

Probably the simplest distributed data structure is the shared variable. A shared variable allows multiple processes to access and change a single variable atomically. For example, a shared variable can be used as a distributed counter.

Client 1:

    ts.write [:variable, 1]
    sleep 3
    ts.read([:variable, nil])[1] # => 2

Client 2:

    var = ts.take([:variable, nil])
    var[1] += 1
    ts.write var

A crucial requirement of a shared variable is that its modifications must be atomic. This requirement is satisfied because, in order to modify the variable, each client must first remove it from TupleSpace, modify it, and only then put it back. While the tuple is out of the repository, no other client can find it, and therefore no other client can modify it.
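To see the take-modify-write discipline hold up under contention, the sketch below runs ten threads in one process (standing in for separate clients) that each increment a shared counter a hundred times. update_shared is a hypothetical helper of mine; its ensure clause puts the tuple back even if the update raises, since a variable that never returns to the space would block every other client forever.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new
ts.write([:counter, 0])

# Hypothetical helper: take the variable, change it, and put it back.
# While the tuple is out of the space, every other take on it blocks.
def update_shared(ts, name)
  tuple = ts.take([name, nil])
  tuple[1] = yield(tuple[1])
ensure
  ts.write(tuple) if tuple   # always return the variable, even on error
end

threads = Array.new(10) do
  Thread.new { 100.times { update_shared(ts, :counter) { |v| v + 1 } } }
end
threads.each(&:join)

final = ts.read([:counter, nil])[1]
puts final
```

No increment is lost, because at any moment at most one thread holds the :counter tuple.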

Unordered Data Structures

A typical unordered data structure is called a bag, because it resembles a bag of objects. Bags have numerous uses in distributed computing, since they provide loose coupling between processes: each process requests and provides services without knowing whom it is requesting a service from or providing a service to. A more specific example is a master (initiator) that creates a number of tasks in TupleSpace, while several worker processes each take a task from the bag, process it and put the result into a result bag. A bag is very easy to implement: it is simply a set of tuples that share the same tag in their first component. The master process puts tasks into the request bag and waits for the results:

    tasks.each { |data| ts.write [:request, data] }
    tasks.map { ts.take([:result, nil]).last } # array of results, in no particular order

The workers take requests from the request bag, process them and place a result into the result bag:

    loop do 
      data = ts.take([:request, nil]).last
      ts.write [:result, process(data)]
    end

One thing that makes this pattern so powerful is that the processes are loosely coupled. You can add a new worker to the system at any time, and it will find the next available request and start processing it.
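A runnable, single-process version of the master/worker bag, with threads standing in for worker processes. Two things are my own additions rather than part of the pattern above: each request carries an id (echoed in its result) so the master can put results back in request order, and a :stop request tells a worker to quit. Squaring a number stands in for process(data).

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new

# Workers: take any request, do the work, drop the result into the result bag.
workers = Array.new(3) do
  Thread.new do
    loop do
      _, id, n = ts.take([:request, nil, nil])
      break if id == :stop
      ts.write([:result, id, n * n])   # squaring stands in for process(data)
    end
  end
end

# Master: put every task into the request bag, tagged with its position...
inputs = [3, 1, 4, 1, 5, 9, 2, 6]
inputs.each_with_index { |n, id| ts.write([:request, id, n]) }

# ...then take exactly one result per task. Results arrive in whatever order
# the workers produce them; the id puts each one back in its slot.
results = Array.new(inputs.size)
inputs.size.times do
  _, id, value = ts.take([:result, nil, nil])
  results[id] = value
end

# Shut the workers down: one :stop request per worker.
workers.size.times { ts.write([:request, :stop, nil]) }
workers.each(&:join)

p results
```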

Ordered Data Structures

Let's see how we can implement a distributed array. The easiest solution that comes to mind is this:

Client 1:

    ts.write [:array, [1,2,3]]
    sleep 3
    a = ts.take([:array, nil]).last
    a[1] += 1
    ts.write [:array, a]

Client 2:

    a = ts.take([:array, nil]).last
    a[0] += 1
    ts.write [:array, a]

We represent our distributed array with a single tuple whose second component is the actual array object. The problem with this approach is that it doesn't scale: even though the two clients modify different elements of the array, each has to wait until the other has put the array back before it can start its own modification. To improve on this, we write a separate tuple for each element of the array. Below is the complete source code of our distributed Rinda array:

    class RArray
      include Enumerable

      def initialize(ts, name)
        @ts, @name = ts, name
      end

      def self.create(ts, name, size, &block)
        result = new(ts, name)
        result.init(size, &block)
        result
      end

      class << self
        alias_method :find, :new
      end

      def init(size)
        @ts.write [:rarray, @name, size]
        0.upto(size-1) do |idx|
          @ts.write [@name, idx, block_given? ? yield(idx) : nil]
        end
      end

      def [](idx)
        @ts.read([@name, idx, nil])[2]
      end

      def []=(idx, value)
        t = @ts.take [@name, idx, nil]
        t[2] = value
        @ts.write t
        value
      end

      def size
        @ts.read([:rarray, @name, nil])[2] 
      end

      def each
        0.upto(size-1) do |idx|
          yield self[idx]
        end
      end
    end

This is how you can use it:

Client 1:

    ary = RArray.create(ts, :array, 3) {|idx| idx}
    sleep 1
    ary[2] += 1
    ary.each do |v|
      puts v
    end

Client 2:

    ary = RArray.find(ts, :array)
    ary[1] += 2
    ary[0] = "client2"

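Note that in this example Client 1 doesn't have to wait for Client 2 in order to modify the array, because they touch different elements. Keep in mind, though, that an expression such as ary[2] += 1 is a read followed by a separate take-and-write, so it is not atomic: if two clients incremented the same element at the same time, one of the updates could be lost. You can improve the RArray class by providing an append method that adds a new element to the array, thus changing its size; implement it as an exercise. Other data structures, such as hashes, lists, queues and trees, can be implemented the same way.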
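For reference, here is one possible sketch of such an append, written as a standalone function (rarray_append is a hypothetical name) that works on the same tuples RArray uses: [:rarray, name, size] for the size and [name, idx, value] for the elements. Taking the size tuple first serializes concurrent appenders, so each one gets its own index; writing the element before the new size means a reader that sees the new size will find the element.

```ruby
require "rinda/tuplespace"

# Hypothetical append for the RArray tuple layout.
# Returns the index at which the value was stored.
def rarray_append(ts, name, value)
  _, _, size = ts.take([:rarray, name, nil])  # other appenders now block
  ts.write([name, size, value])               # new element at index `size`
  ts.write([:rarray, name, size + 1])         # publish the new size
  size
end

ts = Rinda::TupleSpace.new
ts.write([:rarray, :list, 0])   # an empty RArray-style array named :list

# Four threads append five values each, concurrently.
threads = Array.new(4) do |t|
  Thread.new { 5.times { |i| rarray_append(ts, :list, "t#{t}-#{i}") } }
end
threads.each(&:join)

size   = ts.read([:rarray, :list, nil])[2]
values = (0...size).map { |idx| ts.read([:list, idx, nil])[2] }
puts "size=#{size}"
```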

Synchronization

Synchronization can be quite a challenge in distributed computing. As I mentioned above, TupleSpace operations have an atomic effect: multiple processes can read an entry at the same time, but in order to update an entry, a process must remove it from TupleSpace, thereby gaining exclusive access to it. We can use this property to build interesting synchronization constructs.

Semaphores

Let's say we need to coordinate access to a number of shared resources. None of these resources can be used by two or more processes at the same time. A semaphore can help us with that. A semaphore provides two methods:

  • down method checks that the counter is greater than zero, and if so, decrements it and returns control to the process. If the counter value is already zero, the process should wait until the value becomes greater than zero.

  • up method increments the counter. If one or more processes are waiting in the down method, then one of them should proceed.

It might seem that a semaphore is just a special case of a shared variable (a counter), but implementing it that way would not scale well, because every down and up call would contend for the same tuple. Instead, I implement the semaphore as a bag of resources: for each resource I write a single tuple into TupleSpace. The down method then simply removes one of these tuples from TupleSpace, and the up method puts it back:

    class RSemaphore
      def initialize(ts, name)
        @ts, @name = ts, name
      end

      class << self
        def create(ts, name, num)
          result = new(ts, name)
          result.init(num)
          result
        end

        alias_method :find, :new

        def exists?(ts, name)
          not ts.read_all([:rsemaphore, name]).empty?
        end
      end

      def init(num)
        num.times do
          @ts.write [:rsemaphore, @name]
        end
      end

      def down
        @ts.take [:rsemaphore, @name]
      end

      def up
        @ts.write [:rsemaphore, @name]
      end
    end

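Note how I used the ts.read_all method to check whether our semaphore has already been initialized. Be aware that this check has a flaw: it looks for the token tuples themselves, so once all tokens have been taken, exists? returns false and a newly started client creates a second set of tokens. The terminal output further below shows exactly this: client 10047 gets a resource while the other three still hold theirs. Here is a simple client application: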

    require_relative "rsemaphore"   # rsemaphore.rb contains the RSemaphore class
    name = 'printer'

    sem = RSemaphore.exists?(ts, name) ? 
            RSemaphore.find(ts, name) : 
            RSemaphore.create(ts, name, 3)

    puts "#{$$}: requests resource"
    sem.down
    puts "#{$$}: resource taken"
    sleep rand(5)
    sem.up
    puts "#{$$}: resource released"

When I run four clients simultaneously, I get something like this in my terminal:

    10041: requests resource
    10045: requests resource
    10043: requests resource
    10041: resource taken
    10045: resource taken
    10043: resource taken
    10047: requests resource
    10047: resource taken
    10047: resource released
    10041: resource released
    10045: resource released
    10043: resource released
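Below is a self-contained variant of the bag-of-tokens semaphore that runs in one process with threads as clients. Two changes are my assumptions, not part of RSemaphore: a separate [:rsemaphore_marker, name] tuple, written once at creation and never taken, is what exists? looks for, so the check still works while every token is out; and synchronize releases the token in an ensure block. MarkedSemaphore is a hypothetical name. The check-then-create step is still not atomic, so two clients starting at the same instant could both create the semaphore.

```ruby
require "rinda/tuplespace"

class MarkedSemaphore
  def self.exists?(ts, name)
    !ts.read_all([:rsemaphore_marker, name]).empty?
  end

  def self.create(ts, name, num)
    ts.write([:rsemaphore_marker, name])         # never taken, so exists? stays true
    num.times { ts.write([:rsemaphore, name]) }  # one token per resource
    new(ts, name)
  end

  def initialize(ts, name)
    @ts, @name = ts, name
  end

  def down
    @ts.take([:rsemaphore, @name])   # blocks while no token is left
  end

  def up
    @ts.write([:rsemaphore, @name])
  end

  # down/up around a block, returning the token even if the block raises.
  def synchronize
    down
    begin
      yield
    ensure
      up
    end
  end
end

ts = Rinda::TupleSpace.new
name = "printer"
sem = if MarkedSemaphore.exists?(ts, name)
        MarkedSemaphore.new(ts, name)
      else
        MarkedSemaphore.create(ts, name, 3)
      end

stats = Mutex.new   # protects the bookkeeping below, not the resource
in_use = 0
peak = 0

clients = Array.new(8) do
  Thread.new do
    sem.synchronize do
      stats.synchronize { in_use += 1; peak = [peak, in_use].max }
      sleep 0.05                      # "use" the resource
      stats.synchronize { in_use -= 1 }
    end
  end
end
clients.each(&:join)

puts "peak concurrent holders: #{peak}"
```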

Barrier

A barrier is a point in the execution of a system that several processes must reach before any of them can proceed further. If you look at this problem more closely, you will notice that it is a special case of the shared variable pattern: a shared counter of arrived processes, plus a read that blocks until the counter reaches the expected number:

    class RBarrier
      def initialize(ts, name)
        @ts, @name = ts, name
      end

      class << self
        def create(ts, name, num)
          result = new(ts, name)
          result.init(num)
          result
        end

        def exists?(ts, name)
          not ts.read_all([:rbarrier, name, nil, nil]).empty?
        end

        alias_method :find, :new
      end

      def init(num)
        @ts.write [:rbarrier, @name, num, 0]
      end

      def wait
        t = @ts.take [:rbarrier, @name, nil, nil]
        t[3] += 1
        @ts.write t
        @ts.read [:rbarrier, @name, t[2], t[2]]
      end
    end

And a simple client application:

    require_relative "rbarrier"   # rbarrier.rb contains the RBarrier class
    name = 'barrier'

    barrier = RBarrier.exists?(ts, name) ?
                RBarrier.find(ts, name) :
                RBarrier.create(ts, name, 3)

    puts "#{$$}: waiting..."
    barrier.wait
    puts "#{$$}: done."

Run three client scripts at the same time to see the effect of the barrier.
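The same barrier logic can also be watched in a single process: three threads arrive at different times, and none of them passes until the last one has arrived. barrier_wait is a hypothetical standalone copy of RBarrier#wait, using the same [:rbarrier, name, expected, arrived] tuple.

```ruby
require "rinda/tuplespace"

# Standalone copy of RBarrier#wait.
def barrier_wait(ts, name)
  t = ts.take([:rbarrier, name, nil, nil])
  t[3] += 1                                 # one more process has arrived
  ts.write(t)
  ts.read([:rbarrier, name, t[2], t[2]])    # blocks until arrived == expected
end

ts = Rinda::TupleSpace.new
ts.write([:rbarrier, "barrier", 3, 0])      # expect three processes

log = Thread::Queue.new                     # thread-safe event log
workers = Array.new(3) do |i|
  Thread.new do
    sleep 0.1 * i                           # arrive at different times
    log << [:arrived, i]
    barrier_wait(ts, "barrier")
    log << [:passed, i]
  end
end
workers.each(&:join)

events = Array.new(log.size) { log.pop }
events.each { |kind, i| puts "#{i}: #{kind}" }
```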

Read-Write Lock

A read/write lock is used when you have two types of processes: readers, which just read a resource without modifying it, and writers, which modify it. Two rules must be enforced:

  1. The resource may be accessed simultaneously by several readers or by a single writer, but never by readers and a writer at the same time.

  2. Both readers and writers must eventually get a chance to access the resource.

The implementation is a little more complex than the previous synchronization mechanisms, but it is still quite straightforward. I'm going to use three counters:

  1. The dispenser counter is used to generate unique ticket numbers. Any process that wishes to use the resource must take a ticket number from the dispenser.

  2. The turn counter is used to form a queue of processes. Each process waits in the queue for its turn, which comes when the turn counter reaches the ticket number the process received from the dispenser.

  3. The reader counter keeps track of the number of active reader processes. A writer waits until this counter reaches zero before proceeding.

When a reader gets its turn, it immediately increments the turn counter, so the next process gets a chance to proceed. When a writer gets its turn, it doesn't increment the turn counter, so it effectively locks the turn queue; then it waits until the reader counter reaches zero. Only when the writer is done with the resource does it increment the turn counter.

Below is the implementation of the read-write lock:

    class RWLock
      class Counter
        def initialize(ts, rw_name, name)
          @ts, @rw_name, @name = ts, rw_name, name
        end

        def init(n = 0)
          update(n)
        end

        def modify
          t = @ts.take [@rw_name, @name, nil]
          result = yield t
          @ts.write t
          result
        end
        private :modify

        def inc
          modify{|t| t[2] += 1}
        end

        def dec
          modify{|t| t[2] -= 1}
        end

        def wait(n, oper = :take)
          @ts.send oper, [@rw_name, @name, n]
        end

        def update(n)
          @ts.write [@rw_name, @name, n]
        end
      end

      def initialize(ts, name)
        @ts, @name = ts, name
        @dispenser = Counter.new(@ts, @name, 'dispenser')
        @reader = Counter.new(@ts, @name, 'reader')
        @turn = Counter.new(@ts, @name, 'turn')
      end

      def init
        @dispenser.init(-1)
        @reader.init
        @turn.init
      end

      class << self
        def create(*args)
          result = new(*args)
          result.init
          result
        end

        alias_method :find, :new

        def exists?(ts, name)
          not ts.read_all([name, 'dispenser', nil]).empty?
        end
      end

      def read_lock
        ticket = @dispenser.inc
        @turn.wait(ticket)
        @reader.inc
        @turn.update(ticket + 1)
      end

      def read_unlock
        @reader.dec
      end

      def write_lock
        @ticket = @dispenser.inc
        @turn.wait(@ticket)
        @reader.wait(0, :read)
      end

      def write_unlock
        @turn.update(@ticket + 1)
      end
    end

I won't present a test program here; try it for yourself.

Communications

Another aspect of TupleSpace programming is process communication. With just a handful of methods, TupleSpace provides a loosely coupled data exchange mechanism that is independent of location and time. It takes little effort for one process to send a message to another process, or for many processes to send messages to many others. A sender doesn't care who the receiver is or when the receiver will pick up the message. The simplest form of process communication is when a producer publishes a tuple and a consumer takes it from TupleSpace:

Producer:

    ts.write [:message, :consumer, data]

Consumer:

    ts.take [:message, :consumer, nil]

where the first component of the tuple identifies the data type (:message, in this case), the second identifies the receiver (:consumer, in our example), and the third contains the message data.
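A small in-process sketch of this addressing scheme: two consumer threads (the names :alice and :bob are made up) each take only the messages whose second component is their own name, while the producer simply writes tuples without knowing whether either consumer is already running.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new
received = Thread::Queue.new

# Each consumer takes only tuples addressed to it, two messages apiece.
consumers = [:alice, :bob].map do |me|
  Thread.new do
    2.times do
      _, _, data = ts.take([:message, me, nil])
      received << [me, data]
    end
  end
end

# The producer just publishes; it never talks to a consumer directly.
ts.write([:message, :bob, "hi Bob"])
ts.write([:message, :alice, "hi Alice"])
ts.write([:message, :alice, "bye Alice"])
ts.write([:message, :bob, "bye Bob"])
consumers.each(&:join)

by_receiver = Array.new(received.size) { received.pop }.group_by(&:first)
by_receiver.each { |who, msgs| puts "#{who}: #{msgs.map(&:last).sort.join(', ')}" }
```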

Data Queue

What if our producer needs to generate several messages? In this case we can use a data queue: a distributed data structure where producers place messages and consumers read them in first-in, first-out order. To implement a queue, we need a marker tuple that tracks the tail of the queue:

    class SimpleQueue
      def initialize(ts, name)
        @ts, @name = ts, name
      end

      def init
        @ts.write [:squeue, :tail, @name, 0]
      end

      class << self
        def create(*args)
          result = new(*args)
          result.init
          result
        end

        alias_method :find, :new
      end

      def send(data)
        tail = @ts.take [:squeue, :tail, @name, nil]
        tail[3] += 1
        @ts.write [:squeue, @name, tail[3], data]
        @ts.write tail
      end

      def each
        pos = 1
        loop do
          t = @ts.read [:squeue, @name, pos, nil]
          yield t[3]
          pos += 1
        end
      end
    end

Note that the tail marker keeps the position of the last message in the queue. Using the SimpleQueue class, several producers can publish messages and several consumers can receive them:

Producer:

    loop do
      msg = create_message()
      squeue.send msg
    end

Consumer:

    squeue.each do |msg|
      process(msg)
    end

This communication style is also called a topic/subscriber mechanism: every consumer receives every message, because messages are only read, never taken.

Dedicated Queues

We can modify this example so that each message is received by exactly one consumer. To do that, we need to keep track of the head of the queue as well:

    class RQueue # not "class Queue", which would reopen Ruby's built-in Queue
      def initialize(ts, name)
        @ts, @name = ts, name
      end

      def init
        @ts.write [:queue, :tail, @name, 0]
        @ts.write [:queue, :head, @name, 1]
      end

      class << self
        def create(*args)
          result = new(*args)
          result.init
          result
        end

        alias_method :find, :new
      end

      def send(data)
        tail = @ts.take [:queue, :tail, @name, nil]
        tail[3] += 1
        @ts.write [:queue, @name, tail[3], data]
        @ts.write tail
      end

      def read
        head = @ts.take [:queue, :head, @name, nil]
        t = @ts.take [:queue, @name, head[3], nil]
        head[3] += 1
        @ts.write head
        t[3]
      end

      def each
        loop do
          yield read
        end
      end
    end
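To check that each message really reaches exactly one consumer, here is a condensed, self-contained version of the class above exercised by two consumer threads. I renamed send/read to push/pop (TupleQueue is likewise a hypothetical name), because defining send shadows Ruby's Object#send; the tuple layout is unchanged. Each consumer stops at a :done marker, one per consumer, pushed after the real messages.

```ruby
require "rinda/tuplespace"

# Same tuples as the dedicated queue: [:queue, :tail, name, n],
# [:queue, :head, name, n] and [:queue, name, position, data].
class TupleQueue
  def self.create(ts, name)
    ts.write([:queue, :tail, name, 0])
    ts.write([:queue, :head, name, 1])
    new(ts, name)
  end

  def initialize(ts, name)
    @ts, @name = ts, name
  end

  def push(data)
    tail = @ts.take([:queue, :tail, @name, nil])   # one producer at a time
    tail[3] += 1
    @ts.write([:queue, @name, tail[3], data])
    @ts.write(tail)
  end

  def pop
    head = @ts.take([:queue, :head, @name, nil])   # one consumer at a time
    msg = @ts.take([:queue, @name, head[3], nil])  # blocks until produced
    head[3] += 1
    @ts.write(head)
    msg[3]
  end
end

ts = Rinda::TupleSpace.new
queue = TupleQueue.create(ts, "jobs")

got = Thread::Queue.new
consumers = Array.new(2) do |c|
  Thread.new do
    while (msg = queue.pop) != :done
      got << [c, msg]
    end
  end
end

10.times { |i| queue.push(i) }
2.times { queue.push(:done) }   # one end marker per consumer
consumers.each(&:join)

received = Array.new(got.size) { got.pop }
puts received.map { |c, msg| "consumer #{c}: #{msg}" }
```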

Bounded Queue

Both of these queue implementations suffer from one problem: they can fill TupleSpace with an unlimited amount of data, which may eventually exhaust memory. My final example demonstrates a bounded queue implementation.

This queue can hold only a limited number of messages. If a producer tries to send a message while the queue is full, the producer blocks until a consumer reads at least one message. It also works the other way around: when a consumer tries to read a message from an empty queue, it blocks until a producer places a new message into the queue.

For the bounded queue implementation we need to keep a status tuple:

    class RBoundedQueue # not "class Queue", which would reopen Ruby's built-in Queue
      def initialize(ts, name)
        @ts, @name = ts, name
      end

      # positions in [:bqueue, :status, name, size, length, full, empty]
      SIZE, LENGTH, FULL, EMPTY = (3..6).to_a

      def init(size)
        @ts.write [:bqueue, :tail, @name, 0]
        @ts.write [:bqueue, :head, @name, 1]
        length = 0
        full = false
        empty = true
        # the queue name keeps the status tuples of different queues apart
        @ts.write [:bqueue, :status, @name, size, length, full, empty]
      end

      class << self
        def create(ts, name, size)
          result = new(ts, name)
          result.init(size)
          result
        end

        alias_method :find, :new
      end

      def send(data)
        # wait until the queue is not full
        status = @ts.take [:bqueue, :status, @name, nil, nil, false, nil]

        # update status
        status[LENGTH] += 1
        status[FULL] = status[SIZE] == status[LENGTH]
        status[EMPTY] = status[LENGTH] == 0

        tail = @ts.take [:bqueue, :tail, @name, nil]
        tail[3] += 1

        @ts.write status
        @ts.write [:bqueue, @name, tail[3], data]
        @ts.write tail
      end

      def read
        # wait until the queue is not empty
        status = @ts.take [:bqueue, :status, @name, nil, nil, nil, false]

        # update status
        status[LENGTH] -= 1
        status[FULL] = status[SIZE] == status[LENGTH]
        status[EMPTY] = status[LENGTH] == 0

        head = @ts.take [:bqueue, :head, @name, nil]
        t = @ts.take [:bqueue, @name, head[3], nil]
        head[3] += 1

        @ts.write status
        @ts.write head

        t[3]
      end

      def each
        loop do
          yield read
        end
      end
    end
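For comparison, here is a different way to bound a collection that I find simpler, though it gives up FIFO order: represent free capacity as tokens, as in the semaphore above. A producer takes a [:slot, name] token before writing an item, and a consumer writes a slot back after taking one, which is the classic two-semaphore bounded buffer. BoundedBag and its tuple layout are hypothetical and not the status-tuple design above; passing a timeout of 0 makes put fail with Rinda::RequestExpiredError instead of blocking when the bag is full.

```ruby
require "rinda/tuplespace"

# Bounded bag sketch: capacity is [:slot, name] tokens, items are
# [:item, name, data]. Create it once: every new instance adds capacity tokens.
class BoundedBag
  def initialize(ts, name, capacity)
    @ts, @name = ts, name
    capacity.times { @ts.write([:slot, @name]) }
  end

  # Blocks while the bag is full (or raises once `timeout` runs out).
  def put(data, timeout = nil)
    @ts.take([:slot, @name], timeout)     # claim a free place
    @ts.write([:item, @name, data])
  end

  # Blocks while the bag is empty (or raises once `timeout` runs out).
  def get(timeout = nil)
    item = @ts.take([:item, @name, nil], timeout)
    @ts.write([:slot, @name])             # give the place back
    item[2]
  end
end

ts = Rinda::TupleSpace.new
bag = BoundedBag.new(ts, "buf", 2)

bag.put(:a)
bag.put(:b)
full =
  begin
    bag.put(:c, 0)      # no free slot: raises at once instead of blocking
    false
  rescue Rinda::RequestExpiredError
    true
  end

first = bag.get         # frees one slot
bag.put(:c, 0)          # now there is room
rest = [bag.get, bag.get]
puts "full=#{full} items=#{([first] + rest).inspect}"
```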

Conclusion

Tuple space is a very interesting technology. With these examples I have tried to demonstrate several uses of this powerful tool, and they are just the tip of the iceberg. You can find useful material on the web that discusses this topic further.

I hope you will find this article useful.

Comments
  1. Evan, February 12, 2007 @ 01:41 PM

    very handy article. the color scheme (grey on black) is hard to read though.

  2. TheZed, June 03, 2007 @ 09:58 AM

    So few resources on Rinda ! Thanks for sharing your experiments ! I regret however that you do not mention the case of the failing client. I am strongly attracted to using Rinda for many tasks in a server farm I administer at work but I wonder about the best strategy to handle a failing client that has taken a resource off the ts.

    I see two approaches:

    1) To leave the resource in the ts during the update/work but mark it as read-only via a secondary tuple itself with an expiration value.

    If the consumer carries out its processing on/of the resource and relinquishes it properly, all is fine.

    If the consumer crashes while it has de facto ownership of the resource, the secondary tuple will eventually expire hence releasing the resource for another consumer to 'take'.

    2) To have producers monitor the lifecycle of the resources they put in the ts and reincarnate them, should a consumer blow up or hold the tuple too long.

    I personally have a preference for the first approach, but these are just my thoughts and I do not have any real hands-on experience on this matter. What do you think?

  3. Rich, June 29, 2007 @ 10:00 PM

    Thanks for writing the article, it was very useful!

    I found a bug in the RWLock class with the way the exists? method works. It would occasionally return false when it shouldn't have, causing a new RWLock to be created in the tuplespace.

    I fixed this by changing 'initialize' to:

    def initialize(ts, name)
     @ts, @name = ts, name
     @dispenser = Counter.new(@ts, @name, 'dispenser')
     @reader = Counter.new(@ts, @name, 'reader')
     @turn = Counter.new(@ts, @name, 'turn')
    
     @ts.write [@name, 'marker'] 
    end
    

    and changing 'exists?' to:

    def exists?(ts, name)
      not ts.read_all([name, 'marker']).empty?
    end
    

    Please post some more Rinda articles!

  4. m_seki, July 28, 2007 @ 03:47 AM

    There is a draft of the second dRuby book, although it is written in Japanese.

    http://www.druby.org/ilikeruby/d208.html

  5. jh, January 29, 2008 @ 05:17 PM

    Thanks for the really helpful article!
