Showing posts with label Concurrency. Show all posts
Showing posts with label Concurrency. Show all posts

Monday, December 16, 2013

Interesting use-case for SynchronousQueue

While working on a very unusual system, where: 1) producers can be significantly faster than consumers at times (by more than a factor of two) and 2) producers have low latency processing overhead for real time data, I was contemplating on a data structure that is efficient, performant and can model this situation elegantly. After researching probable candidates, I came across Exchanger/SynchronousQueue from Java's util.concurrent class library.

If I was looking at SynchronousQueue without the above context, I would have wondered why would anyone need a queue that's not really a queue but more like a pointer swap between appropriate threads. But the use-case I'm dealing with ("event bursts") are probably the perfect use case for preventing the consumers from overwhelming rates by modeling the problem more as a "hand-off" than a typical case of buffered queuing. The central idea behind this data-structure is to adapt queue idioms without using a queue in a very efficient manner with an added feature that message production is rate limited by consumer's speed of processing them. Behind the scenes, it uses dual-stack/queue algorithm (depending on ordering fairness preference) to transfer a reference between threads.

SynchronousQueue is more of a thread queue than a data queue, it maintains a stack/queue of waiter threads (i.e. "consumers") and not the queue of data itself. You can probably achieve the same functionality by using BlockingQueue of size 1 or using an explicit object lock and explicit wait/notify on a datum reference like an example below:

//Example code, probably riddled with concurrency bugs 
//(I've only tested it on my laptop :)) 
public class MyNaiveSyncQueue {
    private final Object LOCK = new Object();
    private volatile Object data; //volatile is needed for non compressed OOPS
    public void put(Object o) throws InterruptedException{
        synchronized (LOCK) {
            if(data != null){
                LOCK.wait();
            }
            data = o;
            LOCK.notify();
        }
    }
    public Object take() throws InterruptedException{
        synchronized (LOCK) {
            if(data == null){
                LOCK.wait();
            }
            Object o = data;
            data = null;
            LOCK.notify();
            return o;
        }
    }
}

There are several problems with the solution above:
  • Violent locking and memory fence overhead: for individual queue operations, this will scale terribly with number of producers/consumers, especially on server class SMP hardware.
  • Constant context switching: each successful queue operation involves  syscall(s) for context switching which might involve kernel scheduler and everything that comes with it (cache flush/register reload et. al.).
  • Overhead for fair processing: JVM manages object monitor wait queues in a fixed FIFO order, there's certain overhead in seeking the first enqueued thread to schedule as a consumer. This may or may not be the behavior the programmer cares about.
SynchronousQueue takes care of all of these limitations by providing options for trade-off in terms of scheduler ordering fairness as well as eliminating expensive locking with hardware level CAS (whenever available). It also does a fair bit of spin-locking before a kernel level timed-wait kicks-in, this ensures that context-switches don't become the hot-spots in message processing.

So far this has been working great for the system I'm dealing with which processes about a couple of hundred messages per millisecond at peak bursts but I realize that it might not be appropriate (or even worth it) for non-realtime/non-bursty producers.


Monday, March 07, 2011

Why Concurrency is hard

Concurrency is hard because we haven't figure out how to make it easy. For most developers, specifically web developers, concurrency doesn't really matter. I envy that assuasive confident feeling of a sequential execution of http requests. The number of cores on my machine quadrupled in last three years and I don't know a single reliable, comforting (easy) way of harnessing it as much as possible, I feel a little sad about current state of concurrency support.

Utilizing all the processing power consistently is a lot easier for well defined and not so concurrent tasks such as map-reduce. I have done it a lot, processing gigabytes of data by reducing the problem to independent subsets is programmatic triviality. On the other hand, I have always found developing a relatively concurrent application the "right way" to be a nightmare. Concurrency applications come in two mutually exclusive flavours: slow or complex.

At this point enthusiasts will point out java.util.concurrent and move on. While j.u.concurrent is nice and a significant improvement over explicit synchronization, it still mandates that API users be concurrency wizards and its complexity exposure is nearly at par with explicit synchronization. Here's one example blog post explaining common gotcha with ConcurrentHashMap. The only benefit j.u.concurrency provides is finer grained control over where to do CAS. I am a huge fan of j.u.concurrent and have been using it pre-1.5 but I still don't think it makes concurrency so easy. For one more example,

synchronized(this){ aRef = newVal;  return aRef;}

v/s

 while (true) {
            V x = atomicRef.get();
            if (atomicRef.compareAndSet(x, newValue))
                return atomicRef.get();
 }


Which one do you think is easier to grasp?

Many people think that Actors are the next big thing to tackle concurrency monster and complexities introduced by these shared memory model primitives. I too initially thought so, but then I found that Actor model isn't really the sweet spot in practice as it is touted. The very notion that Actors can fail and code must handle the tricky bits to recover from it makes it even more complex than using locks/mutexes etc. I am in constant a awe to see people talking so lightly about  fault tolerant/fail safe systems without giving thought on the amount of complexity it adds. I am not necessarily protesting that philosophy but that behaviour is just not common in yer average regular applications (will your user be happy if one actor failed to process her payment and was asked to retry?). We still live in dark ages of transparent concurrency.

I remain as ignorant and unsatisfied about concurrency support as I was several years ago. For me, concurrency is hard so I am off to shopping!

Tuesday, May 05, 2009

Fork/Join Concurrency with Scala Actors

Have you ever wondered why there are no special frameworks to address concurrency in a Java based application? Considering Java's rich (NIH) ecosystem, I do wonder why I have to write same old state management code while introducing even a small amount of concurrency in Java application.

The reason why I think it is almost impossible to consider concurrency as an aspect in arbitrary application is because of JVM's native support for shared memory concurrency. As a result every developer is forced to think in terms of threaded shared state with guarded blocks. If you have read or written non-trivial piece of code using shared memory concurrency primitives (Mutex, Semaphore etc.) you probably know that the resultant code is hard to visualize and test.

I have been reading about Scala's Actor library and its share-nothing message passing abstraction built over existing concurrency model of JVM. While it doesn't try to solve the fundamental problem, it provides an alternative to address concurrency in your application from a different perspective which is testable and easier to understand.

In actor model, an Actor is a forkable task which runs independently, something like a serializable+immutable object with its private data and behavior. Each actor can send and receive (or react to) messages asynchronously, very similar to object oriented programming with objects responding to messages, but in a concurrent way. This abstraction can seamlessly be applied to a given application of divide and conquer nature and can be made concurrent with minimal efforts as compared to adapting to Java's concurrency primitives.

To explain my point further take a look at classically trivial Producer/Consumer example in Java.

public class Consumer extends Thread {
private final Buffer buffer;
public Consumer(Buffer buffer) {
super("Consumer");
this.buffer = buffer;
}
@Override
public void run() {
while (true){
System.out.println(buffer.next());
}
}
}
public class Producer extends Thread {
private final Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
Random random = new Random(System.nanoTime());
while (true) {
String num = Integer.toString(random.nextInt());
System.out.println(getName() + "=putting: " + num);
buffer.add(num + ": " + getName());
try {
sleep(400);
} catch (InterruptedException e) {
}
}
}
}
public class Buffer {
private String string;
private boolean ready = false;
public synchronized String next() {
if (ready != true)
try {
wait();
} catch (InterruptedException e) {
}
ready = false;
return string;
}

public synchronized void add(String string) {
while(ready == true)
try {
wait();
} catch (InterruptedException e) {
}
this.string = string;
notifyAll();
ready = true;
}

}
public class Test {
public static void main(String[] args) throws Throwable {
Buffer buffer = new Buffer();
new Consumer(buffer).start();
Producer producer = new Producer(buffer);
producer.start();
producer.join();
}
}


Take a look at Buffer class, we have used some concurrency primitives there since that's the place where state is being manipulated. We didn't declare variable ready as volatile since primitive assignments are guaranteed to be atomic (except long and double), Even a simple problem like this involves fair bit of understanding of the underlying threading model. There's no doubt this complexity will extrapolate in non-trivial applications e.g. multi-phase concurrent incremental compiler, SEDA based server etc.

Now take a look at the equivalent Producer/Consumer example in Scala.

import actors._
import actors.Actor._
import util.Random

case class SimpleMessage(num: Long)

class Producer(c: Consumer) extends Actor{
val random = new Random(System nanoTime)
def act = {
loop{
val num = produce
println("Sending: " + num )
c ! SimpleMessage(num) // asynchronous message passing
}
}
def produce(): Long = {
Thread sleep 400
return random.nextLong
}
}
class Consumer() extends Actor{
def act = {
loop{
receive{ //blocks here
case SimpleMessage(num) => println("Received: " + num);
}
}
}
}
object PCTest {
def main(args : Array[String]) : Unit = {
var c = new Consumer()
var p = new Producer(c)
c.start;p.start
}
}

Even if we don't compare the amount of code, the Scala code above is much more clear in terms of its functionality. In Scala, Actors can be mapped to a single native thread with 'receive' (similar to Thread#wait()) or we can replace 'receive' with 'react' which is event based invocation but doesn't cost a blocked thread. The code within 'react' is executed by any non-blocked thread from a pre-created thread-pool. Just a single change and your application is scalable!

The Java example code above can be equally trivialized with the util.concurrent BlockingQueue, but the important point to take away is, writing shared memory concurrency code is inherently difficult and error-prone. With JDK1.7 we will get similar fork/join abstraction in Java itself (JSR166y), which will add new alternative to how we design and write concurrent applications.

Scala borrowed Actors from Erlang and similar libraries exist for Java as well. If you are curious about interesting details on Actor based OO concurrency implementation in Java, take a look at some of the thoughts Sebastian is sharing with his ConcurrentObjects library.