Avoiding stampede

We had a problem in our server cluster architecture. I call it the stampede problem. I thought stampede was an actual technical term, but a quick Google search yielded nothing. I guess I made it up. The problem is that too many simultaneous requests can kill your server or prevent it from starting up properly.

Imagine the following architecture. A cluster of Java web servers is deployed in front of an internal cluster of ‘service’ nodes. The service nodes are connected to some kind of data storage. The web servers are there to handle external requests and do a lot of caching. The service nodes are used company-wide by different applications. The data store is kind of a bottleneck in this architecture. I don’t recommend this approach, but it’s one I have seen often.

It looks something like this. Imagine each box is actually a cluster. The real architecture was a bit more complicated but this serves as an example.
[Image: Cluster architecture]
Let’s make things a bit less boring. I’d rather talk about space ships than databases and servers. Imagine the Death Star is under attack by the rebel fleet. Yes, I like Star Wars, shocker. Now, the Empire has been looking for the rebels but never actually found them. Also, the Death Star’s information systems are not fully operational yet. The ships that appear on the scanners are unknown. The Death Star needs to relay the scanning information to the Imperial database hosted on Coruscant. This quasi-space link relay is slow and expensive. Furthermore, processing the scanning data takes some time. Once a ship is recognised, the Imperial database sends back detailed blueprints for the spacecraft so the Death Star can prepare its defences. See my wonderful image below and note that it’s similar to the boring one above.
[Image: Imperial architecture]

Caching to the rescue

There are only so many types of rebel ships, so the Death Star will cache each design it downloads from the Coruscant Imperial database. The Death Star cannot just load the whole space ship catalogue into its memory; it needs to do it lazily, i.e., on demand.

Some prerequisites

We define the following interface which the Death Star needs to implement:

public interface ShipRegistry {
   String lookupBluePrints(final Integer shipTypeId);
}

In reality (ahem!) the Death Star sends complex scans and imagery but in our example it will just be an int which needs to be identified. The blueprints are represented by a String. When the Death Star is under attack it will call this code:

private void deathStarIsUnderAttack(final Integer shipTypeId, 
                                    final ShipRegistry registry) {
   final String shipDescription = registry.lookupBluePrints(shipTypeId);
   System.out.println("The rebels are deploying a " + shipDescription + "!");
}

Classic is the traditional implementation of a cache. It uses a simple map.

public class Classic implements ShipRegistry {

   private Map<Integer, String> shipCache = Maps.newHashMap(); 

   @Override
   public String lookupBluePrints(final Integer shipTypeId) {
      if (!shipCache.containsKey(shipTypeId)) {
         final String ship = ImperialDatabase.remoteLookup(shipTypeId);
         shipCache.put(shipTypeId, ship);
      }
      return shipCache.get(shipTypeId);
   }

}

Note that, for brevity, there is no proper constructor, nor is ImperialDatabase properly injected (see the aside after the next snippet for what that could look like). You might recognise Maps.newHashMap() from the excellent Guava library. In Java 7 you would use the diamond operator, but the Death Star is still on Java 6. Tsk tsk. For now, imagine ImperialDatabase is just implemented as follows:

public static String remoteLookup(final Integer shipTypeId) {
   switch (shipTypeId) {
      case 1: return "X-wing starfighter";
      case 2: return "Y-wing starfighter";
      case 3: return "A-wing interceptor";
      case 4: return "Corellian Corvette";
      case 5: return "Z-95 Headhunter";
      case 6: return "TIE fighter?";
      default: return "unidentified rebel ship o_O";
   }
}
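
As an aside, if you did want the database wired in properly, a minimal sketch could look like the following. The ShipDatabase interface and the ClassicInjected name are my own inventions for illustration; they are not part of the original code.

// Hypothetical interface so the remote lookup can be injected (and mocked in tests);
// not part of the original example.
interface ShipDatabase {
   String remoteLookup(Integer shipTypeId);
}

public class ClassicInjected implements ShipRegistry {

   private final Map<Integer, String> shipCache = Maps.newHashMap();
   private final ShipDatabase database;

   public ClassicInjected(final ShipDatabase database) {
      this.database = database;
   }

   @Override
   public String lookupBluePrints(final Integer shipTypeId) {
      if (!shipCache.containsKey(shipTypeId)) {
         shipCache.put(shipTypeId, database.remoteLookup(shipTypeId));
      }
      return shipCache.get(shipTypeId);
   }

}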

The first wave is attacking!

Let’s say we run all of this as follows:

final ShipRegistry registry = new Classic();
deathStarIsUnderAttack(1, registry);
deathStarIsUnderAttack(3, registry);

The result is as expected:

The rebels are deploying a X-wing starfighter!
The rebels are deploying a A-wing interceptor!

Hang on. The rebels are not stupid. They won’t attack serially, one after the other! These were just scouts. Admiral Ackbar is smarter than this. In a real battle we will get multiple attacks at the same time! Our implementation using a plain Map is not thread-safe. If the rebels attack now, we will get all kinds of nasty problems with race conditions.
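
To make the race concrete, here is a small sketch of my own (not part of the original code) that fires two simultaneous lookups at the Classic registry. Both threads can pass the containsKey check before either has done its put, so both call the Imperial database, and the two unsynchronized put() calls on a plain HashMap can even leave the map in a corrupted state.

final ShipRegistry unsafeRegistry = new Classic();
final ExecutorService scouts = Executors.newFixedThreadPool(2);

for (int i = 0; i < 2; i++) {
   scouts.submit(new Runnable() {
      @Override
      public void run() {
         // Both threads may see containsKey(1) == false and both hit the
         // Imperial database; the subsequent put() calls then race on the HashMap.
         deathStarIsUnderAttack(1, unsafeRegistry);
      }
   });
}
scouts.shutdown();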

Concurrency

Thankfully the Empire employs a shrewd programmer, a real up-and-comer. He replaces the Map with a ConcurrentMap. That should solve any issue.

public class Concurrent implements ShipRegistry {

   private ConcurrentMap<Integer, String> shipCache = Maps.newConcurrentMap();

   @Override
   public String lookupBluePrints(final Integer shipTypeId) {
      if (!shipCache.containsKey(shipTypeId)) {
         final String ship = ImperialDatabase.remoteLookupHeavy(shipTypeId);
         shipCache.putIfAbsent(shipTypeId, ship);
      }
      return shipCache.get(shipTypeId);
   }

}

This code looks practically the same as before. Good job. A drop-in replacement, right? Note that, to simulate a real setup a bit better, we are now calling remoteLookupHeavy. We’ve added this to ImperialDatabase.

public static String remoteLookupHeavy(final Integer shipTypeId) {
   System.out.println("Looking up " + shipTypeId);
   simulateWork();

   return remoteLookup(shipTypeId); // calls the switch from before
}

private static AtomicInteger concurrentRequests = new AtomicInteger(0);

private static void simulateWork() {
   final int load = concurrentRequests.incrementAndGet();

   if (load > 4) {
      System.out.println("Imperial database has crashed and burned! " +
                         "How will our precious death star defend itself now?");
      System.exit(-1);
      throw new RuntimeException("Database error of death"); // never reached: the exit above kills the JVM
   }

   try {
      Thread.sleep(2000L);
   } catch (InterruptedException e) {
      e.printStackTrace();
   } finally {
      concurrentRequests.decrementAndGet();
   }
}

The only thing it does is spend two seconds idling to simulate actual processing. Using an AtomicInteger we also simulate what happens when the database gets overloaded. There is only so much that Empire tech can do. In this case four is the maximum number of concurrent requests this database server can handle. Which Empire needs more than that? To test it we create this helper function and some threads to launch (pun intended) our ships:

private Callable<Void> createTask(final Integer shipTypeId,
                                  final ShipRegistry registry) {
   return new Callable<Void>() {
      @Override
      public Void call() {
         deathStarIsUnderAttack(shipTypeId, registry);
         return null;
      }
   };
}

// new shiny registry
final ShipRegistry registry = new Concurrent();
final ExecutorService exe = Executors.newCachedThreadPool();

// A wave of rebel ships attack simultaneously!!
exe.invokeAll(ImmutableList.of(
      createTask(1, registry), // red leader
      createTask(1, registry), 
      createTask(5, registry), 
      createTask(1, registry),
      createTask(1, registry), 
      createTask(5, registry), 
      createTask(5, registry),
      createTask(1, registry), // some punk from Tatooine
      createTask(1, registry), 
      createTask(1, registry)));

exe.shutdown();
exe.awaitTermination(6L, TimeUnit.SECONDS);

The results are:

Looking up 1
Looking up 1
Looking up 1
Looking up 5
Looking up 5
Looking up 5
Imperial database has crashed and burned! How will our precious death star defend itself now?
Looking up 1
Imperial database has crashed and burned! How will our precious death star defend itself now?
Looking up 1
Imperial database has crashed and burned! How will our precious death star defend itself now?
Imperial database has crashed and burned! How will our precious death star defend itself now?
Looking up 1
Imperial database has crashed and burned! How will our precious death star defend itself now?
Looking up 1
Imperial database has crashed and burned! How will our precious death star defend itself now?

This result was printed instantly. We did not see any work being simulated. What happened? We did not even recognise a single ship! Now the Imperial database has crashed and our Death Star is lost. The problem is that a single lookup takes too long. Two seconds is an eternity for our GHz CPUs. Even though we have a cache, it takes too long to retrieve the data that should fill it. New requests come in much faster than we can handle them and proceed to swamp the system.

Stampede!

[Image: the Lion King stampede]
This is what I call the stampede effect. Your front-end servers get hammered by attacks, eh, I mean requests, but instead of protecting your internal servers, all requests just get forwarded instantly. This is of course because the cache hasn’t warmed up yet. Some developers think that programs and servers normally have time to start up calmly. In server world you should think differently. You should imagine thousands of enemy ships just waiting for your server to boot so they can start destroying it. Thought about that way, your startup code becomes much more important. Not just for starting up the first time, but also when a server needs to fail over. Unfortunately for the Emperor, he cannot just reboot the database and the Death Star. The same thing would just happen over and over again. The requests coming in at the front stampede all the way through to the back-end system.
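
As a sketch of what ‘your startup code matters’ can mean in practice (my own illustration, not something the original setup did), the Death Star could at least pre-load the ship types it already knows about before accepting scanner traffic. The list of known ship type ids is invented for the example. Of course, this alone does not protect against ship types you have never seen before; for that we still need the fix in the next section.

// Hypothetical warm-up step run at boot, before the registry serves any requests.
final ShipRegistry registry = new Concurrent();
final List<Integer> knownShipTypes = ImmutableList.of(1, 2, 3, 4, 5, 6);

for (final Integer shipTypeId : knownShipTypes) {
   registry.lookupBluePrints(shipTypeId); // sequential, so the database is never swamped
}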

The Empire strikes back

Palpatine has ordered the construction of a new Death Star. He fired the old architects (by force-grip) and has promoted our up-and-comer. The dark side is strong; we know the Death Star can work, if only the rebels would wait until the cache has warmed up.

Here is the idea: even though there are many attacks, the number of different ships is relatively low. We only want to make one request to the database for each type of ship. The other requests for the same ship type should just wait on the first one that requested it. In order to do this we will use the great class Future. (I personally believe we should teach Future first to beginning programmers instead of explaining threads and mutexes and all that.) The code is as follows:

public class FutureTech implements ShipRegistry {

   private ConcurrentMap<Integer, Future<String>> shipCache = 
         Maps.newConcurrentMap();

   @Override
   public String lookupBluePrints(final Integer shipTypeId) {
      Future<String> ship = shipCache.get(shipTypeId);

      if (ship == null) {
         FutureTask<String> newLookup = new FutureTask<String>(
               new Callable<String>() {
                  @Override
                  public String call() throws Exception {
                     return ImperialDatabase.remoteLookupHeavy(shipTypeId);
                  }
               });

         ship = shipCache.putIfAbsent(shipTypeId, newLookup);
         if (ship == null) {
            newLookup.run();
            ship = newLookup;
         }
      }

      try {
         return ship.get();
      } catch (Exception e) {
         throw Throwables.propagate(e);
      }
   }

}

This code needs some explanation. On line 3 we see a ConcurrentMap from Integer to a Future of String. A Future stands for a value that will be available in the future. Here it kind of serves as an ‘outstanding request’. When you call get() on a Future you will block until the value is available. If the value is already there, then get() will return it instantly.
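
If Future is unfamiliar, here is a tiny standalone illustration of that blocking behaviour. This snippet is my own and not part of the registry code; it just reuses the remoteLookupHeavy method from earlier.

final ExecutorService pool = Executors.newSingleThreadExecutor();

// submit() returns immediately with a Future; the lookup runs in the background.
final Future<String> blueprint = pool.submit(new Callable<String>() {
   @Override
   public String call() {
      return ImperialDatabase.remoteLookupHeavy(1); // takes about two seconds
   }
});

// The first get() blocks until the lookup finishes; the second returns instantly.
System.out.println(blueprint.get());
System.out.println(blueprint.get());

pool.shutdown();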

On line 8 we check whether there is already a pending request for this ship type. If ship is null, it means the map did not contain a request for that type yet. In that case we need to create a task to retrieve it. Lines 11 to 17 show that we create a simple task that just calls the Imperial database. Note that we don’t execute this piece of work yet!

Line 19 is very important. We submit our new task to the map. The pivotal putIfAbsent handles this in a thread-safe manner. We need to be careful though: even though no one had made a request yet when we were at line 10, another thread may have sneaked in before us since. Who knows how the Death Star interleaves all its worker drones? Fortunately, putIfAbsent returns null if you are the first to do a put; otherwise it returns the value that was put there before by someone else. This is why we need to repeat the check from line 10 on line 20. If we were actually the one who submitted the task, then we run it. We actually run it in the same thread as the request itself, i.e., in the calling thread. If we were not the one who submitted it, then at least ship is now a reference to the actual work being done by someone else. In the end we just call get() on the correct Future to get the blueprints!
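
The putIfAbsent contract in isolation, as a quick illustrative snippet of my own:

final ConcurrentMap<Integer, String> map = Maps.newConcurrentMap();

System.out.println(map.putIfAbsent(1, "first"));  // null: we won the race, "first" is stored
System.out.println(map.putIfAbsent(1, "second")); // "first": someone beat us, "second" is discarded
System.out.println(map.get(1));                   // "first"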

This prevents multiple calls to the Imperial database. Only one is made per ship type. Subsequent requests will wait (block) on the future for that ship type. You might realise that multiple redundant FutureTask objects will probably be created for one ship type. This is not a problem since only one of them will actually run and the others will just be garbage collected.

The new Death Star is operational

Victory! The output we get using our new FutureTech is:

Looking up 5
Looking up 1
-- there is a two second pause here --
The rebels are deploying a Z-95 Headhunter!
The rebels are deploying a X-wing starfighter!
The rebels are deploying a Z-95 Headhunter!
The rebels are deploying a X-wing starfighter!
The rebels are deploying a Z-95 Headhunter!
The rebels are deploying a X-wing starfighter!
The rebels are deploying a X-wing starfighter!
The rebels are deploying a X-wing starfighter!
The rebels are deploying a X-wing starfighter!
The rebels are deploying a X-wing starfighter!

This is exactly what we wanted. Only two requests were made to the database. The Empire is saving money on bandwidth payments to its ISP. All the other requests patiently wait for those first two to come back, and then all attacks are rebuffed successfully.

Using the force

The code for FutureTech is tricky though. It’s complicated and easy to get wrong. It turns out I had to implement this stampede protection more than once. Like a good and lazy programmer, I set out to abstract this pattern into a reusable module. But, like so many things in life, it had already been done. The smart people behind Guava have already created something similar. I will show an implementation using MapMaker which does exactly the same thing as our FutureTech.

public class Guava implements ShipRegistry {

   private ConcurrentMap<Integer, String> shipCache = new MapMaker()
         .makeComputingMap(new Function<Integer, String>() {
            @Override
            public String apply(final Integer shipTypeId) {
               return ImperialDatabase.remoteLookupHeavy(shipTypeId);
            }
         });

   @Override
   public String lookupBluePrints(final Integer shipTypeId) {
      return shipCache.get(shipTypeId);
   }

}

Look at how concise this code is. No Future, nor any other concurrency primitives. Over the last few years I’ve become more and more of a fan of functional programming. Like many others, it seems. Java is surprisingly non-functional and it irks me frequently to have to write these verbose inner classes. Still, this code is a lot better than our previous example and works the same. I use this and other Guava classes frequently in my day-to-day code. Guava is the first thing I add to every pom I write.

(Note that makeComputingMap was actually deprecated a while ago. You should use Cache and CacheBuilder now.)
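
For completeness, here is roughly what the same registry could look like on that newer API. This is my own sketch based on CacheBuilder and CacheLoader, not code from the original post; the GuavaCacheRegistry name is invented for illustration.

public class GuavaCacheRegistry implements ShipRegistry {

   private final LoadingCache<Integer, String> shipCache = CacheBuilder.newBuilder()
         .build(new CacheLoader<Integer, String>() {
            @Override
            public String load(final Integer shipTypeId) {
               return ImperialDatabase.remoteLookupHeavy(shipTypeId);
            }
         });

   @Override
   public String lookupBluePrints(final Integer shipTypeId) {
      // Concurrent callers for the same key block on a single load, just like FutureTech.
      return shipCache.getUnchecked(shipTypeId);
   }

}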

In the end, what really matters is that the dark side has triumphed! 🙂

[Image: Star Wars: Empire at War]
