dimanche, 15 novembre 2009

Call different services, use the first to reply

The needs


My needs were the following : I have an arbitrary amount of webservices to call, but I don't know, for a particular query, which one will return the answer. More than that, some are slower and others are faster, but again, I don't know which is the fastest for my particular query.

So what I will do is to parallelize the queries, and use the value of the first which return my a good value.

Use an Executor

The first component to implement such a requirement is an Executor.
The Executor will run all my queries, in as many threads as I want, permitting to fine tune the behavior of the system.
As many queries will input the system and will be translated in many more threads, it is important to use a limited amount of threads to not crash the system.

Synchronization on result

A specific object will be used to synchronize the result : the first result will be returned to the caller, letting the running threads die, and no more threads will be run for this query.

Some basic primitive for concurrent programming (java.util.concurrent) will be used, such as CountDownLatch : the caller will block on the latch, and once a thread will find the answer, it will set the value into the result and count down the latch. The caller will be freed and the value will be available.

Watchdog if all the queries fail

One point to not forget is if every queries fail, the caller should be informed of this fail.
A watchdog will be used : it will block till all the threads executing the query will finish, and will then be run. Its only task is to free the caller. This one will continue the processing, but the result will not be available. This will be the responsibility of the caller to run user code to handle this special case.

Strong requirements

The requirements are the following :
  • Use as little synchronization as possible
  • Use as much concurrent primitives (java.util.concurrent) as possible
  • Do it as generic as possible
Java implementation

The interesting part of the implementation is given here below.
The Executor is simply an java.util.concurrent.Executor. I'm not good enough to implement a better solution than they do.
The result object is composed of the following attributes :
public class FutureResult {
private T result;
private boolean done;
private AtomicInteger threadCount = new AtomicInteger(0);
private CountDownLatch latch = new CountDownLatch(1);
}
A generic type for the value, a AtomicInteger to count how many other threads are working on this result, and a latch to make the caller wait.

The watchdog class will simply wait till all the threads are finish and free the caller if it is not already the case :
class WatchDogRunnable implements Runnable {
public void run() {
while (result.hasThreadsWorking()) {
synchronized (result) {
result.wait(2000);
}
}
latch.countDown();
}
}
And finally an abstract callable worker which can be easily extended to run the wanted tasks. The call function will increment the AtomicInteger, do the work if it is not already done, if a result is found, free the caller, decrement the AtomicInteger and return the value if it is set. Not that the worker implementation will implement the callInternal function.
public abstract class AbstractCallableWorker
implements Callable {
private FutureResult result;
public final T call() {
result.setOneMoreThread();
if (!result.isDone()) {
T callResult = callInternal();
if (callResult != null) {
setResult(callResult);
latch.countDown();
}
}
result.setOneLessWorkingThead();
return result.getResult();
}
protected abstract T callInternal();
}
Everything put together, you can implement as much worker as you want and call as many webservice as you want, with the only guaranty to get the caller continue with the fastest service to reply !

Aucun commentaire: