2012-12-16

Java Concurrency, Threads and Synchronization

I've created a class which synchronizes many readers and writers (or rather, modifiers). It's a hybrid solution, similar to both a thread barrier and the producers-consumers pattern.

  • Unlike producers-consumers, readers don't process a queue, they only access the same data storage.
  • Unlike a barrier, the modifier does not wait for readers to reach the barrier.

There's an implementation of ReadWriteLock, ReentrantReadWriteLock, but that seemed too cumbersome and slow:

Whether or not a read-write lock will improve performance over the use of a mutual exclusion lock depends on the frequency that the data is read compared to being modified, the duration of the read and write operations, and the contention for the data – that is, the number of threads that will try to read or write the data at the same time. For example, a collection that is initially populated with data and thereafter infrequently modified, while being frequently searched (such as a directory of some kind) is an ideal candidate for the use of a read-write lock. However, if updates become frequent then the data spends most of its time being exclusively locked and there is little, if any increase in concurrency. Further, if the read operations are too short the overhead of the read-write lock implementation (which is inherently more complex than a mutual exclusion lock) can dominate the execution cost, particularly as many read-write lock implementations still serialize all threads through a small section of code. Ultimately, only profiling and measurement will establish whether the use of a read-write lock is suitable for your application.

First, there's a final(?) re-entrant livelock-proof implementation.
Then, there's a simple non-reentrant implementation, for one writer only.
At the bottom of the page, there's my original solution, which leads to starvation of the modifier. (That will come in handy later as a bad example :)

Multiple modifiers, multiple readers, re-entrant, livelock-proof:

Uses a ReentrantLock with two Conditions.
Also uses an adapted HashMap to count thread re-entries.

package cz.dynawest.webttd;

import java.util.*;
import java.util.concurrent.locks.*;
import java.util.logging.Logger;




/**
 * Synchronizer for many readers and many modifiers (writers), re-entrant per thread.
 *
 * <p>All state is guarded by one fair {@link ReentrantLock}. Two conditions are used:
 * <ul>
 *   <li>{@code notWriting} - readers wait here while any writer holds or is queued for
 *       the write lock ({@code wantToWrite > 0});</li>
 *   <li>{@code notReading} - writers wait here, either for the current owning writer to
 *       release, or (once they own the lock) for the active readers to drain.</li>
 * </ul>
 *
 * <p>Limitations visible in this implementation: a thread that already holds read access
 * and calls {@link #lockForWrite()} waits for its own read to finish, and a thread that
 * owns the write lock and calls {@link #waitUntilReadable()} for the first time waits for
 * its own write to finish. Neither upgrade nor downgrade is supported.
 *
 * @author Ondrej Zizka
 */
public class ReadersAndModifierSynchronizerReentrant implements ReadersAndModifiersSynchronizer {

  /** Number of distinct threads currently holding read access. */
  private int readersActive = 0;

  /** Number of writers that hold the write lock or are queued for it (the owner is included). */
  private int wantToWrite = 0;

  /** Reader threads re-entries. */
  private final CounterMap<Thread> depths = new CounterMap<Thread>();

  /** Eventual owning writer thread and re-entries count. */
  private Thread owningWriter = null;
  private int owningWriterReentries = 0;


  // Lock and conditions
  private final Lock lock = new ReentrantLock(true);
  private final Condition notReading = lock.newCondition();
  private final Condition notWriting = lock.newCondition();


  /**
   * Acquires read access, blocking while any writer holds or waits for the write lock.
   * A thread that already holds read access only increments its re-entry depth.
   *
   * @throws InterruptedException if interrupted while waiting; no read access is held then.
   */
  public void waitUntilReadable() throws InterruptedException {
    lock.lock();
    try{
      Thread th = Thread.currentThread();
      int depth = this.depths.get( th );
      if( 0 == depth ){
        // First entry of this thread: writers have priority.
        while( this.wantToWrite > 0 ){
          notWriting.await();
        }
        this.readersActive++;
      }
      this.depths.set( th, depth + 1 );
    }
    finally{ lock.unlock(); }
  }


  /**
   * Lock for writing. Blocks until no other writer owns the lock and all active readers
   * have finished. A thread that already owns the write lock only increments its
   * re-entry count.
   *
   * @throws InterruptedException if interrupted while waiting; the request is withdrawn
   *         and the write lock is not held by this thread.
   */
  public void lockForWrite() throws InterruptedException{
    lock.lock();
    try{
      Thread th = Thread.currentThread();

      // Already owning this lock -> only increment re-entries count.
      if( owningWriter == th ){
        this.owningWriterReentries++;
        return;
      }

      // From now on new readers are held back in waitUntilReadable().
      this.wantToWrite++;
      boolean acquired = false;
      try{
        // Other writer -> wait until it releases (see doneWriting()).
        while( owningWriter != null ){
          notReading.await();
        }

        // No writer -> grab this lock.
        owningWriter = th;
        this.owningWriterReentries = 1;

        // Let the readers which are already inside finish.
        while( this.readersActive > 0 ){
          notReading.await();
        }
        acquired = true;
      }
      finally{
        if( ! acquired ){
          // Interrupted: withdraw the request so nobody waits for a writer that is gone.
          if( owningWriter == th ){
            owningWriter = null;
            this.owningWriterReentries = 0;
          }
          this.wantToWrite--;
          signalSuccessors();
        }
      }
    }
    finally{ lock.unlock(); }
  }


  /**
   * Releases one level of read access held by the calling thread. When the last level of
   * the last reader is released, waiting writers are woken up.
   * The underlying {@code CounterMap.decAndGet} throws {@link IllegalStateException} if
   * the calling thread holds no read access.
   */
  public void doneReading(){
    lock.lock();
    try{
      Thread th = Thread.currentThread();
      int depth = this.depths.decAndGet( th );
      if( 0 == depth ){
        this.readersActive--;
        if( 0 == this.readersActive ){
          // signalAll(): queued writers and the owning writer share this condition,
          // a single signal() could wake a queued writer and the owner would sleep on.
          notReading.signalAll();
        }
      }
    }
    finally{ lock.unlock(); }
  }


  /**
   * Releases one level of the write lock. When the outermost level is released, the next
   * queued writer is admitted, or - if there is none - all waiting readers.
   *
   * @throws IllegalThreadStateException if the calling thread does not own the write lock.
   */
  public void doneWriting(){
    lock.lock();
    try{
      Thread th = Thread.currentThread();
      if( this.owningWriter != th )
        throw new IllegalThreadStateException("This thread does not own the write lock.");

      // Pre-decrement: the outermost release (1 -> 0) must really release.
      if( --this.owningWriterReentries > 0 )
        return;

      this.owningWriter = null;
      // This writer no longer counts as wanting to write.
      this.wantToWrite--;
      signalSuccessors();
    }
    finally{ lock.unlock(); }
  }


  /** Wakes queued writers if there are any, otherwise all waiting readers. Caller holds {@code lock}. */
  private void signalSuccessors(){
    if( this.wantToWrite > 0 ){
      notReading.signalAll();   // Let queued writers compete for ownership.
    } else {
      notWriting.signalAll();   // Let all the waiting readers know.
    }
  }

}// class ReadersAndModifierSynchronizerReentrant



/**
 * Map of keys to non-negative integer counters. A key without an entry counts as zero.
 * Not thread-safe; callers must provide their own synchronization.
 */
final class CounterMap<K> {

  private final Map<K, Integer> counter = new HashMap<K, Integer>();


  /**
   * @return the counter stored for {@code key}, or 0 if there is none.
   */
  public final int get( K key ){
    Integer cnt = this.counter.get( key );
    return ( null == cnt ) ? 0 : cnt.intValue();
  }

  /**
   * Stores {@code val} as the counter for {@code key}.
   * A zero value removes the entry, which reads back as 0 anyway, so that
   * keys which are no longer counted are not retained.
   */
  public final void set( K key, int val ){
    if( 0 == val )
      this.counter.remove( key );
    else
      this.counter.put( key, val );
  }

  /**
   * Decrements the counter for {@code key} and returns the new value.
   * The entry is removed when the counter reaches zero.
   *
   * @throws IllegalStateException if the counter is absent or already zero.
   */
  public final int decAndGet( K key ){
    Integer cnt = this.counter.get( key );
    if( null == cnt || 0 == cnt )
      throw new IllegalStateException("Counter went under zero.");
    int decremented = cnt - 1;
    if( 0 == decremented )
      this.counter.remove( key );
    else
      this.counter.put( key, decremented );
    return decremented;
  }

}

One modifier and many concurrent Readers, non-reentrant:

import java.util.concurrent.locks.*;
import java.util.logging.Logger;


/**
 * Synchronizer for one modifier (writer) and many concurrent readers. Non-reentrant.
 *
 * <p>Once the modifier announces itself in {@link #lockForWrite()}, new readers are held
 * back in {@link #waitUntilReadable()} until {@link #doneWriting()}; the modifier in turn
 * waits until the readers that are already active call {@link #doneReading()}.
 *
 * <p>Both acquiring methods wait uninterruptibly: when they return, access really has
 * been granted. If the thread was interrupted meanwhile, its interrupt status is still
 * set on return, so the caller can react to it.
 *
 * @author Ondrej Zizka
 */
public class ReadersAndModifierSynchronizer2 {

  /** Number of readers currently between waitUntilReadable() and doneReading(). */
  private int readersActive = 0;

  /** True from lockForWrite() until doneWriting(). */
  private boolean wantToWrite = false;

  // Lock and conditions
  final Lock lock = new ReentrantLock(true);
  final Condition notReading  = lock.newCondition();
  final Condition notWriting = lock.newCondition();


  /**
   * Acquires read access; blocks while the modifier holds or waits for the write lock.
   */
  public void waitUntilReadable(){
    lock.lock();
    try{
      while( this.wantToWrite ){
        // Returning early on interrupt would let the caller read during a write and
        // make the later doneReading() drive readersActive below zero.
        notWriting.awaitUninterruptibly();
      }
      this.readersActive++;
    }
    finally{ lock.unlock(); }
  }


  /**
   * Acquires the write lock; blocks new readers immediately and waits for the
   * active ones to finish.
   */
  public void lockForWrite(){
    lock.lock();
    try{
      this.wantToWrite = true;
      while( this.readersActive > 0 ){
        // Returning early on interrupt would let the caller write while readers are active.
        notReading.awaitUninterruptibly();
      }
    }
    finally{ lock.unlock(); }
  }

  /**
   * Releases read access and wakes the modifier once no reader is active.
   */
  public void doneReading(){
    lock.lock();
    try{
      this.readersActive--;
      if( this.readersActive <= 0 ){
        // Only the single modifier ever waits on this condition.
        notReading.signal();
      }
    }
    finally{ lock.unlock(); }
  }

  /**
   * Releases the write lock and wakes all waiting readers.
   */
  public void doneWriting(){
    lock.lock();
    try{
      this.wantToWrite = false;
      notWriting.signalAll();
    }
    finally{ lock.unlock(); }
  }

}// class ReadersAndModifierSynchronizer2
import cz.dynawest.logging.LoggingUtils;
import java.util.*;
import java.util.logging.Logger;
import org.testng.annotations.Test;

/**
 * Stress test: one modifier thread and {@code READERS_NUM} reader threads share a
 * {@link ReadersAndModifierSynchronizer2} for {@code TEST_DURATION_SEC} seconds.
 * The test makes no assertions; the behaviour is observed in the log output.
 * All worker threads are asked to stop when the test ends.
 *
 * @author Ondrej Zizka
 */
public class ReadersAndModifierTest {

  private static final Logger log = Logger.getLogger( ReadersAndModifierTest.class.getName() );

  private static final int INHIBITOR = 30; // more -> slow down
  private static final int TEST_DURATION_SEC = 20;
  private static final int READERS_NUM = 300;

  /** How long the modifier holds the write lock, and how long it pauses afterwards. */
  private static final int WRITE_MS = 150 * INHIBITOR;
  private static final int WRITER_PAUSE_MS = 55 * INHIBITOR;

  /** Upper bound for waiting on all worker threads at shutdown. */
  private static final long JOIN_TIMEOUT_MS = 5000;

  /** Cleared at the end of the test; worker loops exit when they see it. */
  private volatile boolean running = true;

  final ReadersAndModifierTest self = this;

  @Test
  public void testReadersAndModifier() throws InterruptedException
  {

    LoggingUtils.initLogging();

    final ReadersAndModifierSynchronizer2 pcs = new ReadersAndModifierSynchronizer2();

    List<Thread> workers = new ArrayList<Thread>( READERS_NUM + 1 );
    try {
      Thread proc = createWriterThread( pcs );
      workers.add( proc );
      proc.start();

      for( int i = 0; i < READERS_NUM; i++ ) {
        Thread t = createReaderThread( i+1, pcs );
        workers.add( t );
        t.start();
      }

      Thread.sleep( TEST_DURATION_SEC * 1000L );
    }
    finally {
      stopWorkers( workers );
    }

  }


  /**
   * Asks all workers to stop and waits a bounded time for them to finish.
   * The flag is needed in addition to the interrupt, because an interrupt may be
   * consumed somewhere inside the synchronizer.
   */
  private void stopWorkers( List<Thread> workers ) throws InterruptedException {
    this.running = false;
    for( Thread t : workers )
      t.interrupt();

    long deadline = System.currentTimeMillis() + JOIN_TIMEOUT_MS;
    for( Thread t : workers ) {
      long remaining = deadline - System.currentTimeMillis();
      if( remaining <= 0 )  break;   // join(0) would wait forever.
      t.join( remaining );
    }
  }


  /** Creates the modifier thread. */
  private Thread createWriterThread( final ReadersAndModifierSynchronizer2 pcs ){
      Runnable modifier = new Runnable() {
        public void run() {
          try {
            while( running ){
              log.info("Modifier locking. ----------------------------------------------------------------");
              pcs.lockForWrite();
              try {
                log.info("Modifier locked, modifying for " + WRITE_MS + " ms...  =======================================");
                Thread.sleep( WRITE_MS );
              }
              finally {
                // Release even when interrupted, so readers are not blocked forever.
                pcs.doneWriting();
              }
              log.info("Modifier released, now sleeping " + WRITER_PAUSE_MS + " ms... +++++++++++++++++++++++++++++++++++++++");
              Thread.sleep( WRITER_PAUSE_MS );
            }
          }
          catch( InterruptedException ex ){
            Thread.currentThread().interrupt();
          }
        }
      };
      Thread thread = new Thread( modifier );
      thread.setDaemon( true );  // A straggler must not keep the JVM alive.
      return thread;
  }


  /** Creates reader thread. */
  private Thread createReaderThread( final int num, final ReadersAndModifierSynchronizer2 pcs ){

      Runnable cons = new Runnable() {
        public void run() {
          final Random random = new Random();
          int ms;
          try {
            while( running ){
              log.info("Reader "+num+" waiting until data readable.");
              pcs.waitUntilReadable();
              try {
                // Log the real duration, i.e. including the INHIBITOR factor.
                ms = (10 + random.nextInt( 35 )) * INHIBITOR;
                log.info("Reader "+num+" satisfied, reading ("+ms+" ms)...");
                Thread.sleep( ms );
              }
              finally {
                pcs.doneReading();
              }
              ms = (5 + random.nextInt( 50 )) * INHIBITOR;
              log.info("Reader "+num+" has done reading, now sleeping "+ms+" ms...");
              Thread.sleep( ms );
            }
          }
          catch( InterruptedException ex ){
            Thread.currentThread().interrupt();
          }
        }
      };

      Thread thread = new Thread( cons );
      thread.setDaemon( true );  // A straggler must not keep the JVM alive.
      return thread;
  }

}

Modifier and concurrent Readers, leads to modifier starvation:

/**
 * Original monitor-based synchronizer for one modifier and many concurrent readers.
 *
 * <p>{@code readersActive} holds the number of active readers, or -1 while the modifier
 * holds the lock. The modifier can only get in when the count happens to be zero and new
 * readers are never held back, so a steady stream of readers starves the modifier. That
 * policy is kept on purpose - this class serves as the bad example.
 *
 * <p>Both acquiring methods wait uninterruptibly: when they return, access really has
 * been granted. An interrupt received while waiting is re-asserted on the thread before
 * returning.
 *
 * Todo: try to use Condition.
 *
 * @author Ondrej Zizka
 */
public class ReadersAndModifierSynchronizer {

  private static final Logger log = Logger.getLogger( ReadersAndModifierSynchronizer.class.getName() );

  /** Number of active readers; -1 while the modifier holds the lock. */
  private int readersActive = 0;

  /** Acquires read access; blocks while the modifier holds the lock. */
  public synchronized void waitUntilReadable(){
      boolean interrupted = false;
      while( this.readersActive < 0 ){
        // Returning on interrupt would pretend that access was granted.
        try{ this.wait(); }
        catch( InterruptedException ex ) { interrupted = true; }
      }
      this.readersActive++;
      log.info("readersActive: "+this.readersActive);
      if( interrupted )  Thread.currentThread().interrupt();
  }


  /** Acquires the write lock; blocks while any reader is active. */
  public synchronized void lockForWrite(){
      boolean interrupted = false;
      while( this.readersActive > 0 ){
        // Returning on interrupt would pretend that the lock was acquired.
        try{ this.wait(); }
        catch( InterruptedException ex ) { interrupted = true; }
      }
      this.readersActive = -1;
      log.info("readersActive: "+this.readersActive);
      if( interrupted )  Thread.currentThread().interrupt();
  }

  /** Releases read access and wakes up all waiting threads. */
  public synchronized void doneReading(){
      this.readersActive--;
      log.info("readersActive: "+this.readersActive);
      this.notifyAll();
  }

  /** Releases the write lock and wakes up all waiting threads. */
  public synchronized void doneWriting(){
      this.readersActive = 0;
      log.info("readersActive: "+this.readersActive);
      this.notifyAll();
  }

}// class ReadersAndModifierSynchronizer
import cz.dynawest.logging.LoggingUtils;
import java.util.*;
import java.util.logging.Logger;
import org.testng.annotations.Test;

/**
 * Demonstration test: 100 reader threads and one modifier thread share a
 * {@link ReadersAndModifierSynchronizer} for one minute.
 * The test makes no assertions; the behaviour is observed in the log output.
 * All worker threads are asked to stop when the test ends.
 *
 * @author Ondrej Zizka
 */
public class ReadersAndModifierTest {

  private static final Logger log = Logger.getLogger( ReadersAndModifierTest.class.getName() );

  private static final int READERS_NUM = 100;
  private static final long TEST_DURATION_MS = 60000;

  /** Upper bound for waiting on all worker threads at shutdown. */
  private static final long JOIN_TIMEOUT_MS = 5000;

  /** Cleared at the end of the test; worker loops exit when they see it. */
  private volatile boolean running = true;

  final ReadersAndModifierTest self = this;

  @Test
  public void testReadersAndModifier() throws InterruptedException
  {

    LoggingUtils.initLogging();

    final ReadersAndModifierSynchronizer pcs = new ReadersAndModifierSynchronizer();

    List<Thread> workers = new ArrayList<Thread>( READERS_NUM + 1 );
    try {
      for( int i = 0; i < READERS_NUM; i++ ) {
        Thread t = createReaderThread( i+1, pcs );
        workers.add( t );
        t.start();
      }

      Thread proc = createWriterThread( pcs );
      workers.add( proc );
      proc.start();

      Thread.sleep( TEST_DURATION_MS );
    }
    finally {
      stopWorkers( workers );
    }

  }


  /**
   * Asks all workers to stop and waits a bounded time for them to finish.
   * The flag is needed in addition to the interrupt, because an interrupt may be
   * consumed somewhere inside the synchronizer.
   */
  private void stopWorkers( List<Thread> workers ) throws InterruptedException {
    this.running = false;
    for( Thread t : workers )
      t.interrupt();

    long deadline = System.currentTimeMillis() + JOIN_TIMEOUT_MS;
    for( Thread t : workers ) {
      long remaining = deadline - System.currentTimeMillis();
      if( remaining <= 0 )  break;   // join(0) would wait forever.
      t.join( remaining );
    }
  }


  /** Creates the modifier thread. */
  private Thread createWriterThread( final ReadersAndModifierSynchronizer pcs ){
      Runnable modifier = new Runnable() {
        public void run() {
          try {
            while( running ){
              log.info("Modifier locking.");
              pcs.lockForWrite();
              try {
                log.info("Modifier locked, modifying for 4500 ms...");
                Thread.sleep(4500);
              }
              finally {
                // Release even when interrupted, so readers are not blocked forever.
                pcs.doneWriting();
              }
              log.info("Modifier released, now sleeping 9500 ms...");
              Thread.sleep(9500);
            }
          }
          catch( InterruptedException ex ){
            Thread.currentThread().interrupt();
          }
        }
      };
      Thread thread = new Thread( modifier );
      thread.setDaemon( true );  // A straggler must not keep the JVM alive.
      return thread;
  }


  /** Creates reader thread. */
  private Thread createReaderThread( final int num, final ReadersAndModifierSynchronizer pcs ){

      Runnable cons = new Runnable() {
        public void run() {
          final Random random = new Random();
          int ms;
          try {
            while( running ){
              log.info("Reader "+num+" waiting until data readable.");
              pcs.waitUntilReadable();
              try {
                ms = 5000 + random.nextInt( 2500 );
                log.info("Reader "+num+" satisfied, reading ("+ms+" ms)...");
                Thread.sleep( ms );
              }
              finally {
                pcs.doneReading();
              }
              // Compute the pause first, so that the message reports the real sleep time.
              ms = 10000 + random.nextInt( 5000 );
              log.info("Reader "+num+" has done reading, now sleeping "+ms+" ms...");
              Thread.sleep( ms );
            }
          }
          catch( InterruptedException ex ){
            Thread.currentThread().interrupt();
          }
        }
      };

      Thread thread = new Thread( cons );
      thread.setDaemon( true );  // A straggler must not keep the JVM alive.
      return thread;
  }

}// class ReadersAndModifierTest

0