Wednesday, August 31, 2011

The Exchanger and GC-less Java

Overview

The Exchanger class is very efficient at passing work between threads and recycling the objects used. AFAIK, it is also one of the least used concurrency classes.
If you don't need GC-less logging, using an ArrayBlockingQueue is much simpler.
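
For comparison, here is a minimal sketch of the simpler queue-based approach. The class name QueueLogger, the queue capacity of 1024, the stop sentinel and the in-memory `written` list (standing in for a log file) are all assumptions made for illustration. Every log call passes a new String through the queue, which is exactly the garbage the Exchanger approach avoids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a queue-based background logger; all names are illustrative.
class QueueLogger implements Runnable {
  // Unique sentinel object, compared by identity, that tells the consumer to stop.
  private static final String POISON = new String("STOP");

  private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
  final List<String> written = new ArrayList<String>(); // stands in for a log file
  private final Thread thread = new Thread(this, "queue-logger");

  QueueLogger() {
    thread.start();
  }

  // Blocks only if the consumer has fallen a full queue behind.
  public void log(String message) throws InterruptedException {
    queue.put(message);
  }

  // Queues the sentinel and waits for the consumer to drain everything before it.
  public void stop() throws InterruptedException {
    queue.put(POISON);
    thread.join();
  }

  @Override
  public void run() {
    try {
      for (String msg = queue.take(); msg != POISON; msg = queue.take())
        written.add(msg);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    QueueLogger logger = new QueueLogger();
    for (int i = 0; i < 5; i++)
      logger.log("message " + i);
    logger.stop();
    System.out.println(logger.written.size() + " messages written");
  }
}
```

The queue handles all the hand-over and blocking for you, at the cost of allocating one message object per log call.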

Exchanger class

The Exchanger class is useful for passing data back and forth between two threads, e.g. a producer and a consumer. It naturally recycles the data structures used to pass the work, and so supports GC-less sharing of work in an efficient manner.
Here is an example, passing logs to a background logger.
Work (a log entry) is batched into LogEntries and passed to a background thread, which later passes the batch back to the producing thread so it can add more work. Provided the background thread has always finished before the next batch is full, the hand-over is almost transparent. Increasing the size of the batch reduces how often the batch is full but increases the number of unprocessed entries waiting at any one time. Calling flush() pushes out the data without waiting for the batch to fill.
The key line is the following, which exchanges the batch in the current thread with the batch in the other thread. The producer fills up one batch while the consumer is emptying the other.
The exchange, when it occurs, typically takes 1-4 microseconds. In this case, it occurs once every 64 lines.

entries = logEntriesExchanger.exchange(entries);
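
To see the swap in isolation, here is a minimal sketch in which two threads pass the same two StringBuilders back and forth. The class name ExchangerDemo, the batch counts and the use of StringBuilder as the buffer are assumptions chosen to keep the example short; it is not the logger itself.

```java
import java.util.concurrent.Exchanger;

// Illustrative sketch only: two threads swap two pre-allocated buffers.
class ExchangerDemo {
  static final int BATCHES = 3;    // hypothetical number of hand-overs
  static final int BATCH_SIZE = 4; // hypothetical characters per batch

  // The calling thread produces, a second thread consumes; returns the total consumed.
  static long runDemo() throws InterruptedException {
    final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();
    final long[] consumedChars = {0};

    Thread consumer = new Thread(new Runnable() {
      @Override
      public void run() {
        StringBuilder buffer = new StringBuilder(); // the consumer starts with an empty buffer
        try {
          for (int i = 0; i < BATCHES; i++) {
            buffer = exchanger.exchange(buffer); // give an empty buffer, receive a full one
            consumedChars[0] += buffer.length();
            buffer.setLength(0);                 // empty it so it can be handed back
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    consumer.start();

    StringBuilder buffer = new StringBuilder();
    for (int batch = 0; batch < BATCHES; batch++) {
      for (int i = 0; i < BATCH_SIZE; i++)
        buffer.append('x');
      buffer = exchanger.exchange(buffer); // give a full buffer, receive an empty one
    }
    consumer.join(); // join() makes the consumer's writes visible to this thread
    return consumedChars[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("consumed " + runDemo() + " chars"); // prints "consumed 12 chars"
  }
}
```

Only two buffers are ever allocated, however many batches are exchanged.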

How does this compare to the LMAX Disruptor pattern?

This approach has similar principles to the Disruptor: no GC, thanks to recycled, pre-allocated buffers, and lock-free operations. (The Exchanger is not completely lock free and doesn't busy wait, but it could.)
Two key differences are:
  • there is only one producer and one consumer in this case, whereas the Disruptor supports multiple consumers.
  • this approach re-uses a much smaller buffer efficiently. If you are using ByteBuffer (as I have in the past) an optimal size might be 32 KB. The Disruptor library was designed to exploit large amounts of memory, on the assumption that it is relatively cheap, and can use many GBs, e.g. it was designed for servers with 144 GB. I am sure it works well on much smaller servers. ;)
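
As a rough sketch of the ByteBuffer variant mentioned in the second point, the following recycles two 32 KB buffers between a producer and a consumer. The class name ByteBufferExchange, the transfer() helper and the one-exchange-per-message batching are assumptions for illustration; a real logger would exchange only when the buffer is nearly full and would write the bytes to a file.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Exchanger;

// Illustrative sketch: recycling two fixed-size ByteBuffers through an Exchanger.
class ByteBufferExchange {
  static final int BUFFER_SIZE = 32 * 1024; // the 32 KB size suggested in the text

  // Sends each message through the exchanger; returns the number of bytes the consumer saw.
  static int transfer(final String[] messages) throws InterruptedException {
    final Exchanger<ByteBuffer> exchanger = new Exchanger<ByteBuffer>();
    final int[] bytesSeen = {0};

    Thread consumer = new Thread(new Runnable() {
      @Override
      public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
          for (int i = 0; i < messages.length; i++) {
            buffer = exchanger.exchange(buffer);
            buffer.flip();                      // switch from writing to reading
            bytesSeen[0] += buffer.remaining(); // a real logger would write these bytes out
            buffer.clear();                     // reset so the producer can refill it
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    consumer.start();

    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
    for (String message : messages) {
      // getBytes() allocates; acceptable in a demo, but a GC-less logger would encode in place.
      buffer.put(message.getBytes(StandardCharsets.US_ASCII));
      buffer = exchanger.exchange(buffer);
    }
    consumer.join();
    return bytesSeen[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(transfer(new String[] {"hello", "world!"}) + " bytes"); // prints "11 bytes"
  }
}
```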
Thank you @Doug, for reminding me to mention the Disruptor pattern.

If you have a dozen log files (for different purposes), you want to minimise the memory footprint, and you prefer the consuming thread to block rather than busy wait (busy waiting consumes 100% of a thread, while blocking adds a small latency of up to 10 us), then the Exchanger is better suited.

Exchanger example

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackgroundLogger implements Runnable {
  static final int ENTRIES = 64;

  static class LogEntry {
    long time;
    int level;
    final StringBuilder text = new StringBuilder();
  }

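  static class LogEntries {
    final LogEntry[] lines = new LogEntry[ENTRIES];
    int used = 0;

    LogEntries() {
      // Pre-allocate every entry once; without this, log() hits a null element.
      for (int i = 0; i < lines.length; i++)
        lines[i] = new LogEntry();
    }
  }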

  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  final Exchanger<LogEntries> logEntriesExchanger = new Exchanger<LogEntries>();
  LogEntries entries = new LogEntries();

  BackgroundLogger() {
    executor.submit(this);
  }

  public StringBuilder log(int level) {
    try {
      if (entries.used == ENTRIES)
        entries = logEntriesExchanger.exchange(entries);
      LogEntry le = entries.lines[entries.used++];
      le.time = System.currentTimeMillis();
      le.level = level;
      return le.text;

    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      throw new RuntimeException(e);
    }
  }

  public void flush() throws InterruptedException {
    if (entries.used > 0)
      entries = logEntriesExchanger.exchange(entries);
  }

  public void stop() {
    try {
      flush();
    } catch (InterruptedException e) {
      e.printStackTrace(); // use standard logging.
    }
    executor.shutdownNow();
  }

  @Override
  public void run() {
    LogEntries entries = new LogEntries();
    try {
      while (!Thread.interrupted()) {
        entries = logEntriesExchanger.exchange(entries);
        for (int i = 0; i < entries.used; i++) {
          bgLog(entries.lines[i]);
          entries.lines[i].text.setLength(0); // clear the text so the entry can be reused
        }
        entries.used = 0;
      }
    } catch (InterruptedException ignored) {
      // interrupted by stop(); fall through and exit.
    } finally {
      System.out.println("Warn: logger stopping."); // use standard logging.
    }
  }

  private void bgLog(LogEntry line) {
    // log the entry to a file.
  }
}
