What methods does TupleSpace provide?
TupleSpace exposes a handful of methods for manipulating its contents:
write(tuple, timeout): adds a tuple to TupleSpace. Note that TupleSpace can store several instances of the same tuple.
take(template, timeout) {|t| ...}: removes a tuple from TupleSpace and returns it. The first parameter is a template used to search for the tuples we are interested in. If a block is given, it is called with the pending request (a template entry that can be used to cancel the wait) before the search starts, not with the found tuple.
read(template, timeout) {|t| ...}: same as take, but leaves the found tuple in TupleSpace.
read_all(template): returns all tuples matching the template.
notify(event, template, timeout): registers for notifications of changes in TupleSpace (for example, 'write' or 'take' events).
Several things should be noted.
First of all, every method except write takes a template as its first argument (notify takes it second, after the event name). The template is a search specification for the tuples we are trying to find. A tuple matches a template under very simple rules: the tuple must have the same size as the template, and each of its components must equal the corresponding component of the template, unless that template component is nil, which acts as a wildcard. Components are compared with == and, if that fails, with ===, so a class, a range or a regular expression in a template matches any instance, element or matching string. An example is in order:
Let's say we have the tuple [:foo, 5] in the repository.
It then matches these templates:
[:foo, nil], [nil, 5], [nil, nil].
Secondly, every method except read_all takes a timeout as its last argument. It can be either the number of seconds to wait for the operation to complete, or an object that implements the renew interface (this goes beyond the scope of this article). For write, this argument specifies how long the tuple should be kept in the repository. If the parameter is nil, the operation or the tuple never expires. When a take or read times out, Rinda raises Rinda::RequestExpiredError.
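You do not need a server to try the matching rules: a plain in-process Rinda::TupleSpace is enough. The sketch below uses made-up tuples to show wildcard, class, range and size-mismatch matching. It also writes a tuple with a 0.1-second timeout, which read_all no longer returns once that timeout has passed.

```ruby
require "rinda/tuplespace"

# Local TupleSpace; no DRb service is started for this experiment.
ts = Rinda::TupleSpace.new
ts.write([:foo, 5])

# nil is a wildcard; other components are compared with == and then ===.
p ts.read([:foo, nil])            # => [:foo, 5]
p ts.read([nil, 5])               # => [:foo, 5]
p ts.read([Symbol, Integer])      # => [:foo, 5]  (Symbol === :foo, Integer === 5)
p ts.read_all([:foo, 1..10])      # => [[:foo, 5]] (Range#=== matches 5)
p ts.read_all([:foo, nil, nil])   # => []  (different size, so no match)

# A tuple written with a timeout disappears once it has expired.
ts.write([:temp, 42], 0.1)
sleep 0.3
p ts.read_all([:temp, nil])       # => []
```

read_all is used for the non-matching cases because read and take would block until a matching tuple appears.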
Bootstrap process
Before we can use TupleSpace, we need to know how to run it. This is actually quite easy. Use the following script:
require "rinda/tuplespace"
ts = Rinda::TupleSpace.new
DRb.start_service(nil, ts)
puts DRb.uri
DRb.thread.join
When started, the script prints the URI that client scripts can use to find our TupleSpace. Below is the code a client script uses to connect to it:
require "rinda/rinda"
ts_uri = "..." # our TupleSpace URI (a local variable, so Ruby's URI module is not clobbered)
include Rinda
DRb.start_service
ts = DRbObject.new(nil, ts_uri)
Note that we have to pass the TupleSpace's URI to every client script. Wouldn't it be great to get rid of this tedious step? Sure, it would. The Rinda framework provides the RingServer and RingFinger classes for exactly this task. This is how you start the TupleSpace server with RingServer:
require "rinda/tuplespace"
require "rinda/ring"
include Rinda
DRb.start_service
ts = TupleSpace.new
ring = RingServer.new(ts)
DRb.thread.join
And this is the client's bootstrap process:
require 'rinda/rinda'
require 'rinda/ring'
include Rinda
DRb.start_service
ts = RingFinger.primary
That's it. Behind the scenes, RingServer starts a thread that listens for UDP broadcast requests from RingFinger and replies with a reference to the TupleSpace. This way RingFinger can locate the TupleSpace servers available on the local network without knowing their URIs. This greatly simplifies the bootstrap process!
Now that we are done with that, let's test that our TupleSpace actually works:
ts.write [:message, 'Hello, World!']
ts.read([:message, nil])[1] # => 'Hello, World!'
OK, we are good to go. In the following examples I assume that TupleSpace is up and running and that each client script has a ts variable initialized with the code above.
Distributed Data Structures
Shared Variable
Probably the simplest distributed data structure is Shared Variable. A shared variable allows multiple processes to access and change a single variable in an atomic manner. For example, a shared variable can be used as a distributed counter.
Client 1:
ts.write [:variable, 1]
sleep 3
ts.read([:variable, nil]) # => 2
Client 2:
var = ts.take([:variable, nil])
var[1] += 1
ts.write var
A crucial requirement of a shared variable is that its modifications must be atomic. This requirement is satisfied because, to modify a shared variable, each client must first remove the variable from TupleSpace, modify it, and only then put it back. While the tuple is out of the repository, no other client can find it, and therefore no other client can modify it.
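To watch the take/modify/write cycle keep updates atomic, here is a sketch that runs against a local TupleSpace. Ten threads stand in for ten client processes (the thread and iteration counts are arbitrary), and each thread increments the shared counter 100 times. If the cycle were not atomic, some increments would be lost.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new
ts.write([:variable, 0])

# Each thread plays the role of a client process.
workers = 10.times.map do
  Thread.new do
    100.times do
      var = ts.take([:variable, nil])  # the tuple is now invisible to everyone else
      var[1] += 1
      ts.write(var)                    # put the updated tuple back
    end
  end
end
workers.each(&:join)

p ts.read([:variable, nil])  # => [:variable, 1000]
```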
Unordered Data Structures
A typical unordered data structure is called a bag, because it resembles a bag of objects. Bags have numerous uses in distributed computing because they couple processes loosely: each process requests and provides services without knowing whom it is requesting a service from or providing a service to. A more specific example is a master that creates a number of tasks in TupleSpace, while several worker processes take tasks from the bag, process them and put the results into a result bag. A bag is very easy to implement with array tuples. The master process (initiator) puts tasks into the bag and waits for the results:
tasks.each { |data| ts.write [:request, data] }
results = tasks.map { ts.take([:result, nil]).last } # one result per task, in no particular order
The workers take requests from the request bag, process them and place a result into the result bag:
loop do
  data = ts.take([:request, nil]).last
  ts.write [:result, process(data)]
end
One thing that makes this pattern so powerful is that processes are loosely coupled with each other. You can add a new worker to the system at any time, and it will pick up the next available request and start processing it.
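The master/worker pattern can be tried in a single process with a sketch like the one below. Threads play the workers, process is a hypothetical stand-in that squares its input, and a hypothetical :stop "poison pill" request lets the workers exit once all results have been collected. Because a bag is unordered, the results are sorted before they are inspected.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new

# Hypothetical unit of work.
def process(data)
  data * data
end

workers = 3.times.map do
  Thread.new do
    loop do
      data = ts.take([:request, nil]).last
      break if data == :stop               # hypothetical shutdown message
      ts.write([:result, process(data)])
    end
  end
end

tasks = (1..10).to_a
tasks.each { |data| ts.write([:request, data]) }
results = tasks.map { ts.take([:result, nil]).last }  # arrival order is arbitrary

workers.size.times { ts.write([:request, :stop]) }
workers.each(&:join)

p results.sort  # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```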
Ordered Data Structures
Let's see how we can implement a distributed array. The easiest solution that comes to mind is this:
Client 1:
ts.write [:array, [1,2,3]]
sleep 3
a = ts.take([:array, nil]).last
a[1] += 1
ts.write [:array, a]
Client 2:
a = ts.take([:array, nil]).last
a[0] += 1
ts.write [:array, a]
We represent our distributed array with a single tuple whose second component is the actual array object. The problem with this approach is that it doesn't scale: even though the two clients modify different elements of the same array, they have to wait for each other before they can start their modifications. To improve on this, we store each element of the array in its own tuple. Below is the complete source code of our distributed Rinda array:
class RArray
  include Enumerable
  def initialize(ts, name)
    @ts, @name = ts, name
  end
  def self.create(ts, name, size, &block)
    result = new(ts, name)
    result.init(size, &block)
    result
  end
  class << self
    alias_method :find, :new
  end
  def init(size)
    @ts.write [:rarray, @name, size]
    0.upto(size-1) do |idx|
      @ts.write [@name, idx, block_given? ? yield(idx) : nil]
    end
  end
  def [](idx)
    @ts.read([@name, idx, nil])[2]
  end
  def []=(idx, value)
    t = @ts.take [@name, idx, nil]
    t[2] = value
    @ts.write t
    value
  end
  def size
    @ts.read([:rarray, @name, nil])[2]
  end
  def each
    0.upto(size-1) do |idx|
      yield self[idx]
    end
  end
end
This is how you can use it:
Client 1:
ary = RArray.create(ts, :array, 3) {|idx| idx}
sleep 1
ary[2] += 1
ary.each do |v|
  puts v
end
Client 2:
ary = RArray.find(ts, :array)
ary[1] += 2
ary[0] = "client2"
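Note that in this example Client 1 doesn't have to wait for Client 2 to modify the array. Keep in mind, though, that ary[2] += 1 expands to a read followed by a separate take and write, so two clients incrementing the same element at the same time can lose an update: only a single [] or []= call is atomic. You can improve the RArray class by providing an append method that adds a new element to the array, thus changing its size. Implement it as an exercise. Other data structures, such as hashes, lists, queues and trees, can be implemented the same way.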
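If you want to check your answer to the exercise, here is one possible approach, offered only as a sketch. append is a hypothetical method, and the class below is a trimmed-down RArray with just what append needs. The idea is to treat the size tuple [:rarray, name, size] as a lock: taking it stops concurrent appenders from choosing the same index. The new element is written before the size is increased, so a reader that sees the new size can always find the element.

```ruby
require "rinda/tuplespace"

# Trimmed-down RArray plus a hypothetical append method.
class RArray
  def initialize(ts, name)
    @ts, @name = ts, name
  end

  def init(size)
    @ts.write([:rarray, @name, size])
    size.times { |idx| @ts.write([@name, idx, block_given? ? yield(idx) : nil]) }
  end

  def [](idx)
    @ts.read([@name, idx, nil])[2]
  end

  def size
    @ts.read([:rarray, @name, nil])[2]
  end

  def append(value)
    meta = @ts.take([:rarray, @name, nil])  # lock the size tuple
    @ts.write([@name, meta[2], value])      # the element becomes visible first...
    meta[2] += 1
    @ts.write(meta)                         # ...then the new size is published
    value
  end
end

ts = Rinda::TupleSpace.new
ary = RArray.new(ts, :array)
ary.init(3) { |idx| idx }

# Four concurrent appenders; each one gets its own index.
4.times.map { |i| Thread.new { ary.append("t#{i}") } }.each(&:join)
p ary.size  # => 7
```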
Synchronization
Synchronization can be quite a challenge in distributed computing. I've mentioned above that TupleSpace operations have an atomic effect: multiple processes can read from TupleSpace, but to update an entry, a process must remove it from TupleSpace, thereby gaining exclusive access to it. We can use this property to build interesting synchronization constructs.
Semaphores
Let's say we need to coordinate access to a number of shared resources. None of these resources can be used by two or more processes at the same time. A semaphore can help us with that. A semaphore provides two methods:
down checks that the counter is greater than zero and, if so, decrements it and returns control to the process. If the counter is already zero, the process waits until it becomes greater than zero.
up increments the counter. If one or more processes are waiting in down, one of them proceeds.
It may seem that a semaphore is just a special case of a shared variable (a counter), but implementing it that way would not scale well. Instead, I implement a semaphore as a bag of resources: for each resource, I write a single tuple to TupleSpace. The down method simply removes one of these tuples from TupleSpace, and the up method puts it back:
class RSemaphore
  def initialize(ts, name)
    @ts, @name = ts, name
  end
  class << self
    def create(ts, name, num)
      result = new(ts, name)
      result.init(num)
      result
    end
    alias_method :find, :new
    def exists?(ts, name)
      not ts.read_all([:rsemaphore, name]).empty?
    end
  end
  def init(num)
    num.times do
      @ts.write [:rsemaphore, @name]
    end
  end
  def down
    @ts.take [:rsemaphore, @name]
  end
  def up
    @ts.write [:rsemaphore, @name]
  end
end
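Note how I use the ts.read_all method to check whether our semaphore has already been initialized. Unlike read, read_all never blocks; it simply returns an empty array when no matching tuple exists. Here is a simple client application: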
require "rsemaphore"
name = 'printer'
sem = RSemaphore.exists?(ts, name) ?
RSemaphore.find(ts, name) :
RSemaphore.create(ts, name, 3)
puts "#{$$}: requests resource"
sem.down
puts "#{$$}: resource taken"
sleep rand(5)
sem.up
puts "#{$$}: resource released"
When I run four clients simultaneously, I get something like this in my terminal:
10041: requests resource
10045: requests resource
10043: requests resource
10041: resource taken
10045: resource taken
10043: resource taken
10047: requests resource
10047: resource taken
10047: resource released
10041: resource released
10045: resource released
10043: resource released
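In this output, process 10047 got a resource even though three were already in use. The RSemaphore above most likely explains it. Once all three tokens have been taken, read_all([:rsemaphore, name]) returns an empty array, so exists? reports false and the fourth client creates a fresh set of three tokens. One way to avoid this, shown here only as a sketch, is a separate marker tuple that is written once and never taken. The [:rsemaphore_meta, name, num] layout is a hypothetical choice. This still leaves a race when two clients call exists? before either has called create.

```ruby
require "rinda/tuplespace"

class RSemaphore
  def initialize(ts, name)
    @ts, @name = ts, name
  end

  class << self
    def create(ts, name, num)
      result = new(ts, name)
      result.init(num)
      result
    end
    alias_method :find, :new

    # Looks for the marker tuple, which no client ever takes.
    def exists?(ts, name)
      !ts.read_all([:rsemaphore_meta, name, nil]).empty?
    end
  end

  def init(num)
    num.times { @ts.write([:rsemaphore, @name]) }
    @ts.write([:rsemaphore_meta, @name, num])  # hypothetical marker tuple
  end

  def down
    @ts.take([:rsemaphore, @name])
  end

  def up
    @ts.write([:rsemaphore, @name])
  end
end

ts = Rinda::TupleSpace.new
sem = RSemaphore.create(ts, "printer", 3)
3.times { sem.down }                    # all three resources are in use
p RSemaphore.exists?(ts, "printer")     # => true (the token-based check would say false)

fourth = Thread.new { sem.down }        # has to wait for a release
sleep 0.2
p fourth.alive?                         # => true
sem.up
fourth.join
```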
Barrier
A barrier is a point in the execution of a system that several processes must reach before any of them can proceed further. If you look at this problem more closely, you'll notice that it is a special case of the shared variable pattern:
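class RBarrier
  def initialize(ts, name)
    @ts, @name = ts, name
  end
  class << self
    def create(ts, name, num)
      result = new(ts, name)
      result.init(num)
      result
    end
    # checks a marker tuple rather than the barrier tuple itself,
    # which is briefly taken out of TupleSpace whenever a process arrives
    def exists?(ts, name)
      not ts.read_all([:rbarrier_created, name]).empty?
    end
    alias_method :find, :new
  end
  def init(num)
    @ts.write [:rbarrier, @name, num, 0]
    @ts.write [:rbarrier_created, @name]   # marker tuple, never taken
  end
  def wait
    t = @ts.take [:rbarrier, @name, nil, nil]
    t[3] += 1                                # one more process has arrived
    @ts.write t
    @ts.read [:rbarrier, @name, t[2], t[2]]  # block until all processes have arrived
  end
end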
And a simple client application:
require "rbarrier"
name = 'barrier'
barrier = RBarrier.exists?(ts, name) ?
RBarrier.find(ts, name) :
RBarrier.create(ts, name, 3)
puts "#{$$}: waiting..."
barrier.wait
puts "#{$$}: done."
Run three client scripts at the same time to see the effect of the barrier.
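If starting three scripts is inconvenient, the barrier can also be observed in one process. In this sketch, threads stand in for client scripts, a compact copy of RBarrier (create, init and wait only) runs against a local TupleSpace, and a Thread::Queue records events. Every :arrived event should come before the first :passed event. As written, the barrier works only once. After all num processes have arrived, the tuple stays at [:rbarrier, name, num, num], so a later wait pushes the count past num and blocks forever. Each round therefore needs a new barrier name.

```ruby
require "rinda/tuplespace"

# Compact copy of RBarrier (create, init and wait only).
class RBarrier
  def initialize(ts, name)
    @ts, @name = ts, name
  end

  def self.create(ts, name, num)
    result = new(ts, name)
    result.init(num)
    result
  end

  def init(num)
    @ts.write([:rbarrier, @name, num, 0])
  end

  def wait
    t = @ts.take([:rbarrier, @name, nil, nil])
    t[3] += 1                                 # one more process has arrived
    @ts.write(t)
    @ts.read([:rbarrier, @name, t[2], t[2]])  # block until arrived == num
  end
end

ts = Rinda::TupleSpace.new
barrier = RBarrier.create(ts, "barrier", 3)
log = Thread::Queue.new  # thread-safe event log

threads = 3.times.map do |i|
  Thread.new do
    sleep rand * 0.2     # arrive at different times
    log << [:arrived, i]
    barrier.wait
    log << [:passed, i]
  end
end
threads.each(&:join)

events = Array.new(log.size) { log.pop }
p events.map(&:first)  # => [:arrived, :arrived, :arrived, :passed, :passed, :passed]
```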
Read-Write Lock
A read/write lock is used when you have two types of processes: readers, which only read a resource's value, and writers, which modify it. Two rules must be enforced:
The resource may be accessed either by several readers at the same time or by a single writer, but never by readers and a writer at once.
Both readers and writers must get a chance to access the resource.
The implementation is a little more complex than the previous synchronization mechanisms, but it is still quite straightforward. I'm going to use three counters:
The dispenser counter generates unique ticket numbers. Any process that wishes to use the resource must first take a ticket number from the dispenser.
The turn counter creates a process queue. Each process waits in the queue for its turn, which comes when the turn counter reaches the ticket number the process received from the dispenser.
The reader counter keeps track of the number of active reader processes. A writer waits until this counter reaches zero before proceeding.
When a reader gets its turn, it immediately increments the turn counter, so the next process gets a chance to proceed. When a writer gets its turn, it doesn't increment the turn counter, so it effectively locks the turn queue; it then waits until the reader counter reaches zero. Only when the writer is done with the resource does it increment the turn counter.
Below is the implementation of the read-write lock:
class RWLock
  class Counter
    def initialize(ts, rw_name, name)
      @ts, @rw_name, @name = ts, rw_name, name
    end
    def init(n = 0)
      update(n)
    end
    def modify
      t = @ts.take [@rw_name, @name, nil]
      result = yield t
      @ts.write t
      result
    end
    private :modify
    def inc
      modify{|t| t[2] += 1}
    end
    def dec
      modify{|t| t[2] -= 1}
    end
    def wait(n, oper = :take)
      @ts.send oper, [@rw_name, @name, n]
    end
    def update(n)
      @ts.write [@rw_name, @name, n]
    end
  end
  def initialize(ts, name)
    @ts, @name = ts, name
    @dispenser = Counter.new(@ts, @name, 'dispenser')
    @reader = Counter.new(@ts, @name, 'reader')
    @turn = Counter.new(@ts, @name, 'turn')
  end
  def init
    @dispenser.init(-1)
    @reader.init
    @turn.init
    # marker tuple that is never taken, so exists? cannot miss the lock
    # while another process has the dispenser tuple checked out
    @ts.write [@name, 'created']
  end
  class << self
    def create(*args)
      result = new(*args)
      result.init
      result
    end
    alias_method :find, :new
    def exists?(ts, name)
      not ts.read_all([name, 'created']).empty?
    end
  end
  def read_lock
    ticket = @dispenser.inc
    @turn.wait(ticket)
    @reader.inc
    @turn.update(ticket + 1)
  end
  def read_unlock
    @reader.dec
  end
  def write_lock
    @ticket = @dispenser.inc
    @turn.wait(@ticket)
    @reader.wait(0, :read)
  end
  def write_unlock
    @turn.update(@ticket + 1)
  end
end
I won't present test programs here; try it for yourself.
Communications
Another aspect of TupleSpace programming is process communication. With just a handful of methods, TupleSpace provides a loosely coupled data exchange mechanism that is independent of location and time. It takes little effort for one process to send a message to another process, or for many processes to send messages to many other processes. A sender doesn't care who the receiver is or when the receiver will pick up the message. The simplest form of process communication is when a producer publishes a tuple and a consumer takes this tuple from TupleSpace:
Producer:
ts.write [:message, :consumer, data]
Consumer:
ts.take [:message, :consumer, nil]
Here the first component of the tuple identifies the data type (:message, in this case), the second identifies the receiver (:consumer, in our example), and the third contains the message data.
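The time independence is easy to see in a small local sketch. A consumer thread starts before anything has been published and simply blocks in take. A message addressed to a different, hypothetical receiver (:other) does not match its template, so it stays in TupleSpace.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new

# The consumer starts first and waits; it never learns who the producer is.
consumer = Thread.new do
  ts.take([:message, :consumer, nil]).last
end

sleep 0.1                                        # the producer shows up later
ts.write([:message, :other, "not for you"])      # different receiver, not matched
ts.write([:message, :consumer, "Hello!"])

p consumer.value                                 # => "Hello!"
p ts.read_all([:message, nil, nil])              # => [[:message, :other, "not for you"]]
```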
Data Queue
What if our producer needs to generate several messages? In this case we use a data queue: a distributed data structure where a producer places messages and a consumer reads them in first-in, first-out order. To implement a queue, we need to keep a marker at the tail of the queue:
class SimpleQueue
  def initialize(ts, name)
    @ts, @name = ts, name
  end
  def init
    @ts.write [:squeue, :tail, @name, 0]
  end
  class << self
    def create(*args)
      result = new(*args)
      result.init
      result
    end
    alias_method :find, :new
  end
  def send(data)
    tail = @ts.take [:squeue, :tail, @name, nil]
    tail[3] += 1
    @ts.write [:squeue, @name, tail[3], data]
    @ts.write tail
  end
  def each
    pos = 1
    loop do
      t = @ts.read [:squeue, @name, pos, nil]
      yield t[3]
      pos += 1
    end
  end
end
Note that the tail marker keeps the position of the last message in the queue. With the SimpleQueue class, several producers can publish messages and several consumers can receive them:
Producer:
loop do
  msg = create_message()
  squeue.send msg
end
Consumer:
squeue.each do |msg|
  process(msg)
end
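Because each consumer reads (rather than takes) every message, all consumers see all messages. This communication style is also called the topic/subscriber (publish/subscribe) mechanism.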
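The topic/subscriber behavior can be checked locally with a sketch like the following. It uses a copy of SimpleQueue (keeping the article's send method name) and two subscriber threads. SimpleQueue#each never returns on its own, so each subscriber uses break to leave it after three messages; the message count and the :news queue name are arbitrary choices for the demo.

```ruby
require "rinda/tuplespace"

# Copy of SimpleQueue (send kept as in the article).
class SimpleQueue
  def initialize(ts, name)
    @ts, @name = ts, name
  end

  def init
    @ts.write([:squeue, :tail, @name, 0])
  end

  def self.create(*args)
    result = new(*args)
    result.init
    result
  end

  def send(data)
    tail = @ts.take([:squeue, :tail, @name, nil])
    tail[3] += 1
    @ts.write([:squeue, @name, tail[3], data])
    @ts.write(tail)
  end

  def each
    pos = 1
    loop do
      t = @ts.read([:squeue, @name, pos, nil])  # read, not take: messages stay
      yield t[3]
      pos += 1
    end
  end
end

ts = Rinda::TupleSpace.new
squeue = SimpleQueue.create(ts, :news)

subscribers = 2.times.map do
  Thread.new do
    received = []
    squeue.each do |msg|
      received << msg
      break if received.size == 3   # leave the endless each
    end
    received
  end
end

%w[a b c].each { |msg| squeue.send msg }
p subscribers.map(&:value)  # => [["a", "b", "c"], ["a", "b", "c"]]
```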
Dedicated Queues
We can modify this example so that each message is received by only one consumer. To do that, we need to keep track of the head of our queue as well:
# named RQueue so that it does not reopen Ruby's built-in Queue (Thread::Queue)
class RQueue
  def initialize(ts, name)
    @ts, @name = ts, name
  end
  def init
    @ts.write [:queue, :tail, @name, 0]
    @ts.write [:queue, :head, @name, 1]
  end
  class << self
    def create(*args)
      result = new(*args)
      result.init
      result
    end
    alias_method :find, :new
  end
  def send(data)
    tail = @ts.take [:queue, :tail, @name, nil]
    tail[3] += 1
    @ts.write [:queue, @name, tail[3], data]
    @ts.write tail
  end
  def read
    head = @ts.take [:queue, :head, @name, nil]
    t = @ts.take [:queue, @name, head[3], nil]
    head[3] += 1
    @ts.write head
    t[3]
  end
  def each
    loop do
      yield read
    end
  end
end
Bounded Queue
Both of these queue implementations suffer from the same problem: nothing limits the number of messages in TupleSpace (SimpleQueue never even removes consumed messages), so a fast producer can eventually exhaust the server's memory. My final example demonstrates a bounded queue.
This queue can store only a limited number of messages. If a producer tries to send a message while the queue is full, it blocks until a consumer reads at least one message. It also works the other way around: when a consumer tries to read from an empty queue, it blocks until a producer places a new message in the queue.
To implement a bounded queue, we need to keep a status tuple that records the queue's capacity, its current length, and whether it is full or empty:
# named RBoundedQueue so that it does not reopen Ruby's built-in Queue
class RBoundedQueue
  def initialize(ts, name)
    @ts, @name = ts, name
  end
  # positions of the fields in the status tuple
  SIZE, LENGTH, FULL, EMPTY = (3..6).to_a
  def init(size)
    @ts.write [:bqueue, :tail, @name, 0]
    @ts.write [:bqueue, :head, @name, 1]
    length = 0
    full = false
    empty = true
    # the queue name is part of the status tuple, so several queues can coexist
    @ts.write [:bqueue, :status, @name, size, length, full, empty]
  end
  class << self
    def create(ts, name, size)
      result = new(ts, name)
      result.init(size)
      result
    end
    alias_method :find, :new
  end
  def send(data)
    # take the status tuple; blocks while the queue is full
    status = @ts.take [:bqueue, :status, @name, nil, nil, false, nil]
    # update status
    status[LENGTH] += 1
    status[FULL] = status[SIZE] == status[LENGTH]
    status[EMPTY] = status[LENGTH] == 0
    tail = @ts.take [:bqueue, :tail, @name, nil]
    tail[3] += 1
    @ts.write status
    @ts.write [:bqueue, @name, tail[3], data]
    @ts.write tail
  end
  def read
    # take the status tuple; blocks while the queue is empty
    status = @ts.take [:bqueue, :status, @name, nil, nil, nil, false]
    # update status
    status[LENGTH] -= 1
    status[FULL] = status[SIZE] == status[LENGTH]
    status[EMPTY] = status[LENGTH] == 0
    head = @ts.take [:bqueue, :head, @name, nil]
    t = @ts.take [:bqueue, @name, head[3], nil]
    head[3] += 1
    @ts.write status
    @ts.write head
    t[3]
  end
  def each
    loop do
      yield read
    end
  end
end
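To see the blocking behavior, here is a local sketch. It uses a copy of the bounded queue in which the queue name is stored in the status tuple; each and find are omitted for brevity. A queue of capacity 2 is filled, and a third send in a separate thread then has to wait until one read frees a slot. The :jobs name and the 0.2-second pause are arbitrary choices for the demo.

```ruby
require "rinda/tuplespace"

class RBoundedQueue
  SIZE, LENGTH, FULL, EMPTY = (3..6).to_a  # positions in the status tuple

  def initialize(ts, name)
    @ts, @name = ts, name
  end

  def self.create(ts, name, size)
    result = new(ts, name)
    result.init(size)
    result
  end

  def init(size)
    @ts.write([:bqueue, :tail, @name, 0])
    @ts.write([:bqueue, :head, @name, 1])
    @ts.write([:bqueue, :status, @name, size, 0, false, true])
  end

  def send(data)
    status = @ts.take([:bqueue, :status, @name, nil, nil, false, nil])  # waits while full
    status[LENGTH] += 1
    status[FULL] = status[SIZE] == status[LENGTH]
    status[EMPTY] = status[LENGTH] == 0
    tail = @ts.take([:bqueue, :tail, @name, nil])
    tail[3] += 1
    @ts.write(status)
    @ts.write([:bqueue, @name, tail[3], data])
    @ts.write(tail)
  end

  def read
    status = @ts.take([:bqueue, :status, @name, nil, nil, nil, false])  # waits while empty
    status[LENGTH] -= 1
    status[FULL] = status[SIZE] == status[LENGTH]
    status[EMPTY] = status[LENGTH] == 0
    head = @ts.take([:bqueue, :head, @name, nil])
    t = @ts.take([:bqueue, @name, head[3], nil])
    head[3] += 1
    @ts.write(status)
    @ts.write(head)
    t[3]
  end
end

ts = Rinda::TupleSpace.new
queue = RBoundedQueue.create(ts, :jobs, 2)

queue.send 1
queue.send 2
producer = Thread.new { queue.send 3 }  # the queue is full, so this thread waits
sleep 0.2
blocked = producer.alive?
first = queue.read                      # frees a slot for the producer
producer.join
rest = [queue.read, queue.read]
p [blocked, first, rest]                # => [true, 1, [2, 3]]
```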
Conclusion
Tuple space is a very interesting technology. With these examples I have tried to demonstrate several uses of this powerful tool, and they are just the tip of the iceberg. You can find more material on the web that discusses this topic further.
I hope you will find this article useful.
very handy article. the color scheme (grey on black) is hard to read though.
So few resources on Rinda! Thanks for sharing your experiments! I regret, however, that you do not mention the case of the failing client. I am strongly attracted to using Rinda for many tasks in a server farm I administer at work, but I wonder about the best strategy to handle a failing client that has taken a resource off the ts.
I see two approaches:
1) To leave the resource in the ts during the update/work but mark it as read-only via a secondary tuple itself with an expiration value.
If the consumer carries out its processing on/of the resource and relinquishes it properly, all is fine.
If the consumer crashes while it has de facto ownership of the resource, the secondary tuple will eventually expire hence releasing the resource for another consumer to 'take'.
2) To have producers monitor the lifecycle of the resources they put in the ts and reincarnate them, should a consumer blow up or hold the tuple too long.
I personally have a preference for the first approach, but these are just my thoughts and I do not have any real hands-on experience in this matter. What do you think?
Thanks for writing the article, it was very useful!
I found a bug in the RWLock class with the way the exists? method works. It would occasionally return false when it shouldn't have, causing a new RWLock to be created in the tuplespace.
I fixed this by changing 'initialize' to:
and changing 'exists?' to:
Please post some more Rinda articles!
There is a draft of the second dRuby book. However, it is in Japanese.
http://www.druby.org/ilikeruby/d208.html
Thanks for the really helpful article!