The Java Concurrency Framework
 
Managing concurrent threads and tuning the performance of a multithreaded program are tedious tasks for programmers. Java 5 introduced the java.util.concurrent package, which contains high-performance, scalable, thread-safe utilities for developing concurrent classes and applications. The package includes the following utilities:
- Executor Service Framework
- Concurrent Collections
- Synchronizers
- Atomic Classes
- Locks
- TimeUnit Utility
These utility classes relieve the programmer from re-inventing the common utilities of handling concurrent tasks. The java.util.concurrent package provides deeper-level support that takes advantage of the concurrency capabilities provided by the underlying processor, enabling programmers to implement high-performance concurrent algorithms.
In this article, we will not cover the TimeUnit utility. Feel free to bookmark this article and revisit it whenever you need a refresher on these concurrency concepts.
1. Executor Service Framework
Java 5 introduced the Executor Service Framework. The java.util.concurrent.Executor interface executes the submitted java.lang.Runnable tasks. This interface provides a way for programmers to implement concurrent programs without worrying about the thread use and scheduling mechanics. An Executor instance can be used instead of explicitly creating thread objects to run the programs.
For example, let us consider the following code:
import java.util.Random;
public class RunnableTask implements Runnable {
    private int id;
    private Random randomGenerator;
    public RunnableTask(int id) {
        this.id = id;
        this.randomGenerator = new Random(id);
    }
    @Override
    public void run() {
        System.out.println(id + ": " + "Started");
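        try {
            long millis = 1000L + randomGenerator.nextInt(4000);
            Thread.sleep(millis);  // Imitates a slow running method (roughly 1 to 5 seconds).
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that the owner of the thread can react to it.
            Thread.currentThread().interrupt();
        }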
        System.out.println(id + ": " + "Ended");
    }
}
This is an example of a RunnableTask that simulates a slow, resource-intensive computation with the Thread.sleep() call. To execute fifty such runnable tasks, we could create threads as follows:
for (int i = 0; i < 50; i++) {
	Thread aNewThread = new Thread(new RunnableTask(i));
	aNewThread.start();
}
The problem with the above code is that all fifty threads start running at once, which could exhaust the resources of the host machine. Before Java 5, developers had to write their own code to restrict the total number of threads running concurrently.
Java 5’s Executor Service Framework provides a more reliable and efficient way to do this without the hassle of writing your own thread pool. The Executor Service can limit the number of concurrent threads as follows:
// An Executor service that ensures only 10 threads are running at a time
ExecutorService fixedThreadPoolExecutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 50; i++) {
	fixedThreadPoolExecutor.execute(new RunnableTask(i));
}
// Signal the job is done.
fixedThreadPoolExecutor.shutdown();
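// Wait for all the pending runnable tasks to complete.
try {
	// Blocks until the pool terminates or the timeout elapses (the one-hour limit is an arbitrary choice).
	if (!fixedThreadPoolExecutor.awaitTermination(1, TimeUnit.HOURS)) {
		System.err.println("Tasks did not finish within the timeout.");
	}
} catch (InterruptedException e) {
	Thread.currentThread().interrupt();
}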
The Executors.newFixedThreadPool(10) method returns a thread pool that reuses a fixed number of threads to run the submitted tasks. At any point, at most 10 threads will be actively processing tasks; the remaining tasks wait in a queue. The java.util.concurrent.Executors class is a factory and utility class for different thread execution strategies. It can be used to create the following types of executors:
- Fixed Thread Pool – A thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
- Cached Thread Pool – A thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
- Scheduled Thread Pool – A thread pool that can schedule commands to run after a given delay, or to execute periodically.
- Single Thread Executor – An Executor that uses a single worker thread operating off an unbounded queue.
- Single Thread Scheduled Executor – A single-threaded executor that can schedule commands to run after a given delay or to execute periodically.
- Work Stealing Pool (Java 8) – A work-stealing thread pool using the number of available processors as its target parallelism level.
Fixed Thread Pool and Single Thread Executor
The fixed thread pool ensures that only a specific number of threads run at a time. The single thread executor works the same way, except that the concurrent thread count is one. The following code shows how these executor services are created.
// Ensures only 10 threads are running at a time
ExecutorService fixedThreadPoolExecutor = Executors.newFixedThreadPool(10);
fixedThreadPoolExecutor.submit(new RunnableTask(1));
// An Executor service that ensures only 1 thread is running at a time
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.submit(new RunnableTask(2));
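One consequence of the single thread executor is that tasks run one after another in the order they were submitted. The following is a minimal sketch of that behavior, not code from the original example; the class name SingleThreadOrderDemo and the method runInOrder are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class SingleThreadOrderDemo {

    // Submits `count` tasks to a single thread executor and returns the order in which they ran.
    static List<Integer> runInOrder(int count) {
        List<Integer> executionOrder = Collections.synchronizedList(new ArrayList<>());
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int taskId = i;
            // Each task records its id when it runs on the single worker thread.
            singleThreadExecutor.execute(() -> executionOrder.add(taskId));
        }
        singleThreadExecutor.shutdown();
        try {
            if (!singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executionOrder;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a fixed thread pool of more than one thread, the same code could record the ids in a different order on each run.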
Cached Thread Pool
The cached thread pool tries to reuse idle threads whenever possible. If no idle thread is available, it creates a new thread to execute the new runnable task and adds that thread to the pool. This thread pool has no upper bound on the number of threads, but it tries its best to reuse existing ones. To create a cached thread pool, use the following code snippet.
// Tries to reuse any idle threads to run.
// If no idle threads are available, a new thread is created.
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
cachedThreadPool.submit(new RunnableTask(1));
Scheduled Thread Pool and Single-Threaded Scheduled Executor
The Scheduled Thread Pool is useful when you need to schedule commands to run after a given delay, or to execute periodically. A scheduled thread pool is created with a core pool size. The core pool size is the number of threads maintained in the pool, even if they are idle.
A single-threaded scheduled executor is the same as the scheduled thread pool. The only difference is that it will have a core pool size of one. To create scheduled thread pools, use the following code snippet.
// A scheduled thread pool with a core pool size of 10.
// It will keep 10 threads in the pool, even if they are idle
ScheduledExecutorService scheduledThreadPool 
    = Executors.newScheduledThreadPool(10);
// A scheduled thread pool with a core pool size of one.
// It will keep one thread in the pool.
ScheduledExecutorService singleThreadScheduledExecutor 
    = Executors.newSingleThreadScheduledExecutor();
// Submits a task that runs immediately.
scheduledThreadPool.submit(new RunnableTask(1));
// Submits a task that runs after the given delay (2 seconds here).
scheduledThreadPool.schedule(new RunnableTask(2), 2, TimeUnit.SECONDS);
// Submits a task that first runs after the given delay (2 seconds here)
// and then repeats with a fixed period between start times (5 seconds here).
scheduledThreadPool.scheduleAtFixedRate(new RunnableTask(3), 2, 5, TimeUnit.SECONDS);
// Submits a task that first runs after the given delay (2 seconds here).
// Each subsequent run starts a fixed delay (5 seconds here)
// after the previous run has completed.
scheduledThreadPool.scheduleWithFixedDelay(new RunnableTask(4), 2, 5, TimeUnit.SECONDS);
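A periodic task keeps running until it is cancelled or the executor is shut down. The sketch below shows one way to stop a periodic task after a given number of runs by using the ScheduledFuture returned from scheduleAtFixedRate. The class PeriodicTaskDemo, the method runPeriodic, and the 50-millisecond period are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class PeriodicTaskDemo {

    // Lets a periodic task run at least `times` times, then cancels it.
    // Returns the number of runs that were counted.
    static int runPeriodic(int times) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch enoughRuns = new CountDownLatch(times);
        // First run immediately, then every 50 milliseconds.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            enoughRuns.countDown();
        }, 0, 50, TimeUnit.MILLISECONDS);
        try {
            enoughRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false);   // Stop future runs without interrupting a run in progress.
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Runs: " + runPeriodic(3));
    }
}
```

The count can be slightly higher than the requested number because one more run may start before the cancellation takes effect.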
Work Stealing Pool (Java 8)
The work-stealing pool is an implementation based on the work-stealing algorithm, which reduces contention among the threads of an application. In the work-stealing pool, if one thread has completed its tasks and has nothing more to do, it can “steal” a task from another thread’s queue.
The parallelism level of a work-stealing pool is the maximum number of threads that actively process tasks in parallel. By default, the parallelism level is the number of available processors. It is also possible to choose a different level of parallelism by passing a parameter. To create work-stealing thread pools, use the following code snippet.
// A work-stealing thread pool with parallelism based on the running system's available processors.
ExecutorService workStealingPool = Executors.newWorkStealingPool();
workStealingPool.submit(new RunnableTask(1));
// A work-stealing thread pool with a parallelism level of 4.
ExecutorService workStealingPoolWithParallelism = Executors.newWorkStealingPool(4);
workStealingPoolWithParallelism.submit(new RunnableTask(2));
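The worker threads of a work-stealing pool are daemon threads, so the JVM does not wait for them when the main thread exits. A program therefore usually needs to wait for the results explicitly. The following sketch does this with invokeAll(), which blocks until every task has completed. The class WorkStealingSumDemo and the sum-of-squares workload are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class WorkStealingSumDemo {

    // Computes 1^2 + 2^2 + ... + n^2, with each square calculated by a separate task.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long total = 0;
            // invokeAll() returns only after all the tasks have completed.
            for (Future<Long> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```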
2. Concurrent Collections
Java provides thread-safe collections that are designed to be used in multithreaded programs. Most of the general-purpose collections in the java.util package, such as ArrayList and HashMap, are not thread-safe. This means that if multiple concurrent threads manipulate these objects, there is no guarantee of consistency or atomicity.
The alternative collection classes in the java.util.concurrent package provide thread-safe implementations whose individual operations are atomic. The most important collection classes are:
- java.util.concurrent.ConcurrentHashMap – A thread-safe hash table. Even though it is thread-safe, retrieval operations do not entail locking.
- java.util.concurrent.CopyOnWriteArrayList – A thread-safe variant of java.util.ArrayList.
- java.util.concurrent.CopyOnWriteArraySet – A thread-safe Set implementation.
- java.util.concurrent.ArrayBlockingQueue – A bounded blocking queue backed by an array.
- java.util.concurrent.LinkedBlockingQueue – An optionally-bounded blocking queue based on a linked list.
- java.util.concurrent.PriorityBlockingQueue – An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
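As a minimal sketch of the first class in the list, the following code updates one ConcurrentHashMap entry from several threads. It uses the merge() method (available since Java 8) so that each read-modify-write of the counter happens atomically. The class WordCountDemo and the key "java" are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class WordCountDemo {

    // Increments the counter of one word from several threads and returns the final count.
    static int countConcurrently(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // merge() inserts 1 if the key is absent, otherwise adds 1 atomically.
                    counts.merge("java", 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts.getOrDefault("java", 0);
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1000)); // prints 4000
    }
}
```

Doing the same with a plain HashMap and a separate get() followed by put() could lose updates, because the two calls are not atomic as a pair.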
There are several other specialized concurrent collections that are not mentioned here. You can explore them, along with other classes, in Java’s “Collections Framework Overview” document.
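The blocking queues listed above are typically used to hand work from one thread to another. The sketch below passes numbers from a producer thread to the calling thread through a small ArrayBlockingQueue. The class BlockingQueueDemo and the POISON_PILL end marker are assumptions made for illustration, not part of the Java API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; the names are illustrative only.
class BlockingQueueDemo {
    // Hypothetical marker that tells the consumer there is no more data.
    private static final int POISON_PILL = -1;

    // Produces 1..n on a separate thread and returns their sum, computed by the calling thread.
    static int produceAndConsume(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // Holds at most 2 elements.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // Blocks while the queue is full.
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            while (true) {
                int value = queue.take(); // Blocks while the queue is empty.
                if (value == POISON_PILL) {
                    break;
                }
                sum += value;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // prints 55
    }
}
```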
3. Synchronizers
Java has inbuilt classes that can be used for common synchronization between concurrent threads. These inbuilt classes are:
- Semaphore (since Java 1.5) – A classic concurrency tool.
- CountDownLatch (since Java 1.5) – A simple yet very common utility for blocking until a given number of signals, events, or conditions hold.
- CyclicBarrier (since Java 1.5) – A resettable multiway synchronization point used in some styles of parallel programming.
- Exchanger (since Java 1.5) – Allows two threads to exchange objects at a rendezvous point, and used in several pipeline designs.
- Phaser (since Java 7) – Provides a flexible form of barrier that may be used to control phased computation among multiple threads.
Semaphore
A Semaphore is a classic concurrency tool that limits the number of concurrent threads that can access a shared resource. This maximum number is expressed as permits. A thread calls the acquire() method to obtain a permit if one is available. When all the permits are taken, the calling thread blocks until a permit becomes available. The release() method returns a permit. The semaphore keeps count of the available permits and behaves accordingly.
Let us see an example use of semaphores. Let us consider that we have a BuildManager class that spawns new BuildJob threads. If we can only process three jobs at a time, then we can use semaphores with three permits. The classes would be as follows:
import java.util.Scanner;
public class BuildManager {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("How many build jobs to run?");
        Scanner inputScanner = new Scanner(System.in);
        BuildManager buildManager = new BuildManager();
        int N = inputScanner.nextInt();
        for (int i = 0; i < N; i++) {
            buildManager.buildJob(i);
        }
    }
    private synchronized void buildJob(int num) throws InterruptedException {
        BuildJob server = new BuildJob("BuildJob-" + num);
        server.start();
    }
}
The BuildManager class takes user input and starts N number of BuildJob threads.
import java.util.Random;
import java.util.concurrent.Semaphore;
public class BuildJob extends Thread {
    private static final int MAX_SERVERS = 3;
    private static final Semaphore PERMITS = new Semaphore(MAX_SERVERS);
    public BuildJob(String name){
        super(name);
    }
    @Override
    public void run() {
        try {
            PERMITS.acquire();
            System.out.println(getName() + ":\t Permit acquired." );
            try {
                build();
            } finally {
                // Always return the permit, even if build() is interrupted.
                PERMITS.release();
                System.out.println(getName() + ":\t Permit released." );
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private void build() throws InterruptedException {
        System.out.println(getName() + ":\t Running Build job.");
        // To mimic a slow running process on a shared resource
        long duration = new Random().nextInt(3000);
        Thread.sleep(duration);
        System.out.println(getName() + ":\t Build Job Completed in " + duration + "ms");
    }
}
The BuildJob will try to acquire permits using PERMITS.acquire(); method call. When the build job is completed, it will call PERMITS.release(); method to release the permit. This would allow the next thread to acquire a permit. The line private static final Semaphore PERMITS = new Semaphore(MAX_SERVERS); ensures that there is only a maximum of three threads that are doing the job at any given time.
The overloaded constructor new Semaphore(MAX_SERVERS, true) sets the fairness parameter to true, which ensures that permits are granted on a first-come-first-served basis. By default, fairness is false, so new Semaphore(MAX_SERVERS) behaves like new Semaphore(MAX_SERVERS, false). A nonfair semaphore allows barging and does not guarantee the order in which waiting threads acquire permits. It is generally faster, but the trade-off is that some threads may starve because they never get a permit.
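The fairness setting can be checked with isFair(), and permits can also be requested without blocking through tryAcquire(). The following is a small sketch of both; the class SemaphoreFairnessDemo and the helper drainWithTryAcquire are illustrative assumptions.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; the names are illustrative only.
class SemaphoreFairnessDemo {

    // Takes permits without blocking until none are left and returns how many were taken.
    static int drainWithTryAcquire(Semaphore semaphore) {
        int taken = 0;
        while (semaphore.tryAcquire()) { // Returns false immediately when no permit is available.
            taken++;
        }
        semaphore.release(taken); // Hand every permit back.
        return taken;
    }

    public static void main(String[] args) {
        Semaphore nonfair = new Semaphore(3);     // Same as new Semaphore(3, false).
        Semaphore fair = new Semaphore(3, true);  // Permits are granted in FIFO order.
        System.out.println(nonfair.isFair());           // prints false
        System.out.println(fair.isFair());              // prints true
        System.out.println(drainWithTryAcquire(fair));  // prints 3
        System.out.println(fair.availablePermits());    // prints 3
    }
}
```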
CountDownLatch
The CountDownLatch class provides a way for threads to wait until a set of operations performed by other concurrent threads completes. The CountDownLatch is initialized with a count, N. The await() method can be called to wait for a set of N operations to complete. Threads signal the completion of an operation by calling the countDown() method, which decrements the count and releases all waiting threads when the count reaches zero.
Let us see an example where we have to convert an FLV video file to MP4 format. To do that, we need to split the operation to process 2-minute (120 seconds) chunks of the video concurrently. Let us create two classes as examples: Flv2Mp4Converter and Flv2Mp4ConversionOperation.
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
public class Flv2Mp4Converter {
    private static final int PROCESSING_CHUNK_DURATION = 120;
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Enter the duration of the video file in seconds (E.g: 1200)");
        Scanner inputScanner = new Scanner(System.in);
        double totalDuration = inputScanner.nextDouble();
        int totalOperations = (int) Math.ceil(totalDuration/PROCESSING_CHUNK_DURATION);
        CountDownLatch latch = new CountDownLatch(totalOperations);
        for (int i = 0; i < totalOperations; i++) {
            new Flv2Mp4ConversionOperation("Chunk-"+i, latch).start();
        }
        latch.await();
        performPostOperation();
    }
    private static void performPostOperation() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ":\t\tStarting post operation.");
        // Mimicking slow process
        long duration = new Random().nextInt(3000);
        Thread.sleep(duration);
        System.out.println(Thread.currentThread().getName() + ":\t\tCompleted post operation(" + duration + "ms).");
    }
}
The Flv2Mp4Converter class takes a user input totalDuration and calculates totalOperations by dividing totalDuration by PROCESSING_CHUNK_DURATION (120 seconds) and rounding up. It then spawns Flv2Mp4ConversionOperation threads, each of which processes a 120-second chunk of the video.
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Flv2Mp4ConversionOperation extends Thread {
    private final CountDownLatch latch;
    public Flv2Mp4ConversionOperation(String name, CountDownLatch latch){
        super(name);
        this.latch = latch;
    }
    @Override
    public void run() {
        convert();
        latch.countDown();
        cleanup();
    }
    private void convert() {
        try {
            System.out.println(getName() + ":\t\tStarting conversion operation.");
            // Mimicking slow process
            long duration = new Random().nextInt(4000);
            Thread.sleep(duration);
            System.out.println(getName() + ":\t\tCompleted conversion operation(" + duration + "ms).");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private void cleanup() {
        try {
            System.out.println(getName() + ":\t\tStarting cleanup operation.");
            // Mimicking slow process
            long duration = new Random().nextInt(4000);
            Thread.sleep(duration);
            System.out.println(getName() + ":\t\tCompleted cleanup operation(" + duration + "ms).");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The Flv2Mp4ConversionOperation threads perform the conversion operation and call the latch.countDown() method. When all the operations are completed, the main thread resumes with the performPostOperation() method. The important thing to observe is that the Flv2Mp4ConversionOperation threads don’t have to complete before the main thread can resume. As soon as the latch.countDown() method is called by the last thread, the main() method resumes while the operation threads proceed to the cleanup() method concurrently.
The count down cannot be reset and the CountDownLatch provides a one-time synchronization solution. For recurring cycles and reset capabilities, we can use the CyclicBarrier class.
CyclicBarrier
The CyclicBarrier class provides a way for a fixed size party of threads to wait on each other until a common barrier point is reached. The CyclicBarrier supports an optional Runnable command. The runnable task runs once per barrier point, after the last thread in the party arrives, but before any threads are released.
Let us consider that we want to run a process after every N chunks of the video file have been converted. The code would be as follows:
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Flv2Mp4Converter {
    private static final int PROCESSING_CHUNK_DURATION = 120;
    private static final int CHUNKS_PER_BATCH = 3;
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        System.out.println("Enter the duration of the video file in seconds (E.g: 1200)");
        Scanner inputScanner = new Scanner(System.in);
        double totalDuration = inputScanner.nextDouble();
        int totalOperations = (int) Math.ceil(totalDuration/PROCESSING_CHUNK_DURATION);
        CyclicBarrier barrier = new CyclicBarrier(CHUNKS_PER_BATCH, new ProcessBatch());
        for (int i = 0; i < totalOperations; i++) {
            new Flv2Mp4ConversionOperation("Chunk-"+i, barrier).start();
        }
        performPostOperation();
    }
    private static void performPostOperation() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + ":\t\tStarting post operation.");
        // Mimicking slow process
        long duration = new Random().nextInt(3000);
        Thread.sleep(duration);
        System.out.println(Thread.currentThread().getName() + ":\t\tCompleted post operation(" + duration + "ms).");
    }
}
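Here, the CyclicBarrier is created with CHUNKS_PER_BATCH parties. When that many threads have called await(), the barrier is tripped and the run() method of ProcessBatch is executed by the last thread to arrive, before the waiting threads are released. This repeats for every batch. Note that if the total number of chunks is not a multiple of CHUNKS_PER_BATCH, the threads in the last, incomplete batch never fill the barrier and give up once the 5-second timeout passed to await() expires. The Flv2Mp4ConversionOperation implementation is: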
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Flv2Mp4ConversionOperation extends Thread {
    private final CyclicBarrier barrier;
    public Flv2Mp4ConversionOperation(String name, CyclicBarrier barrier){
        super(name);
        this.barrier = barrier;
    }
    @Override
    public void run() {
        convert();
        try {
            barrier.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        cleanup();
    }
    private void convert() {
        try {
            System.out.println(getName() + ":\t\tStarting conversion operation.");
            // Mimicking slow process
            long duration = new Random().nextInt(4000);
            Thread.sleep(duration);
            System.out.println(getName() + ":\t\tCompleted conversion operation(" + duration + "ms).");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private void cleanup() {
        try {
            System.out.println(getName() + ":\t\tStarting cleanup operation.");
            // Mimicking slow process
            long duration = new Random().nextInt(4000);
            Thread.sleep(duration);
            System.out.println(getName() + ":\t\tCompleted cleanup operation(" + duration + "ms).");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The ProcessBatch is a simple class. Since the barrier only needs a Runnable, it does not have to extend Thread:
public class ProcessBatch implements Runnable {
    @Override
    public void run() {
        System.out.println("Running ProcessBatch");
    }
}
Unlike the CountDownLatch, the CyclicBarrier can be re-used after the waiting threads are released.
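The reuse can be seen in a small sketch where the same barrier is awaited in several rounds and the barrier action runs once per round. This is an illustration under assumed names (BarrierReuseDemo, runRounds), separate from the video conversion example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class BarrierReuseDemo {

    // Sends `parties` threads through `rounds` barrier points and
    // returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) {
        AtomicInteger barrierActionRuns = new AtomicInteger();
        // The barrier action runs once each time the barrier is tripped.
        CyclicBarrier barrier = new CyclicBarrier(parties, barrierActionRuns::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // The same barrier instance is reused in every round.
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed or was interrupted; stop waiting.
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return barrierActionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(3, 2)); // prints 2
    }
}
```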
Exchanger
The Exchanger provides a synchronization point for thread pairs that exchange data between themselves. Each thread provides some data on entry to the exchange() method, matches with a partner thread, and receives its partner’s object on return.
Let us consider an example where we are handling an int[] data buffer. The buffer is filled by a Publisher and used by a Consumer. The Publisher code would be:
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Exchanger;
public class Publisher extends Thread {
    private final Exchanger<int[]> exchanger;
    private volatile boolean signalled;  // volatile so that signal() from another thread is visible to run()
    private int[] data = new int[10];
    private static final Random randomGenerator = new Random();
    public Publisher(Exchanger<int[]> exchanger) {
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        while (!signalled) {
            buildData(data);
            try {
                data = exchanger.exchange(data);
                System.out.println("Publisher - Got Data: " + Arrays.toString(data));
                sleep(800);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    private void buildData(int[] data) {
        for (int i = 0; i < data.length; i++) {
            data[i] = randomGenerator.nextInt(1000);
        }
    }
    public void signal() {
        this.signalled = true;
    }
}
The Publisher fills the data and sends it to the Consumer through the exchanger by calling the exchange() method and accepts data sent by the Consumer.
The Consumer code is:
import java.util.Arrays;
import java.util.concurrent.Exchanger;
public class Consumer extends Thread {
    private final Exchanger<int[]> exchanger;
    private volatile boolean signalled;  // volatile so that signal() from another thread is visible to run()
    private int[] data;
    public Consumer(Exchanger<int[]> exchanger) {
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        while (!signalled) {
            try {
                data = exchanger.exchange(new int[10]);
                System.out.println("Consumer - Got Data: " + Arrays.toString(data));
                sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void signal() {
        this.signalled = true;
    }
}
The Consumer gets the data built by the Publisher and sends an empty int[] to the Publisher. The caller of the exchange() method is blocked until the other party has made the call.
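The blocking hand-off can be reduced to a very small sketch in which two threads swap one string each. The class ExchangerDemo and the method swap are illustrative assumptions and are not part of the Publisher and Consumer example.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class; the names are illustrative only.
class ExchangerDemo {

    // Swaps one string between the calling thread and a helper thread.
    // Returns the value that the calling thread received.
    static String swap(String fromCaller, String fromHelper) {
        Exchanger<String> exchanger = new Exchanger<>();
        Thread helper = new Thread(() -> {
            try {
                exchanger.exchange(fromHelper); // Blocks until the caller arrives.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        helper.start();
        try {
            return exchanger.exchange(fromCaller); // Blocks until the helper arrives.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(swap("ping", "pong")); // prints pong
    }
}
```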
Phaser (Java 7)
Phaser is a reusable synchronization barrier similar to the CountDownLatch and CyclicBarrier. It is more flexible than the other synchronizers because the number of registered parties can change over time.
Let us consider a simple example:
import java.util.concurrent.Phaser;
public class BuildJob extends Thread {
    private Phaser ph;
    public BuildJob(String name, Phaser ph){
        super(name);
        this.ph = ph;
        ph.register();
    }
    @Override
    public void run() {
        System.out.println(getName() + ":\t Phasers:" + ph.getPhase() );
        ph.arriveAndAwaitAdvance();
        System.out.println(getName() + ":\t arrived." );
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ph.arriveAndDeregister();
        System.out.println(getName() + ":\t de-registered." );
    }
}
The BuildJob instances are created by the class BuildManager:
import java.util.Scanner;
import java.util.concurrent.Phaser;
public class BuildManager {
    public static void main(String[] args) {
        System.out.println("How many build jobs to run?");
        Scanner inputScanner = new Scanner(System.in);
        int N = inputScanner.nextInt();
        Phaser ph = new Phaser();
        ph.register();
        BuildManager buildManager = new BuildManager();
        for (int i = 0; i < N; i++) {
            buildManager.buildJob(i, ph);
        }
        ph.arriveAndDeregister();
    }
    private synchronized void buildJob(int num, Phaser ph) {
        BuildJob server = new BuildJob("BuildJob-" + num, ph);
        server.start();
    }
}
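The example above relies on three Phaser methods: register() adds a party, arriveAndAwaitAdvance() waits until all registered parties have arrived and then moves to the next phase, and arriveAndDeregister() arrives and removes the party. The following self-contained sketch runs several phases and reports the phase number reached. The class PhaserDemo and the method runPhases are illustrative assumptions.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class; the names are illustrative only.
class PhaserDemo {

    // Runs `workers` threads plus the calling thread through `phases` phases.
    // Returns the phase number reached before the caller deregisters.
    static int runPhases(int workers, int phases) {
        Phaser phaser = new Phaser(1); // The calling thread is the first registered party.
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            phaser.register(); // Register each worker before its thread starts.
            threads[i] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    phaser.arriveAndAwaitAdvance(); // Wait for every registered party.
                }
                phaser.arriveAndDeregister();
            });
            threads[i].start();
        }
        for (int p = 0; p < phases; p++) {
            phaser.arriveAndAwaitAdvance(); // The caller takes part in every phase too.
        }
        // The phase number starts at 0 and increases by one at each advance.
        int reachedPhase = phaser.getPhase();
        phaser.arriveAndDeregister();
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return reachedPhase;
    }

    public static void main(String[] args) {
        System.out.println(runPhases(3, 2)); // prints 2
    }
}
```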
4. Atomic Classes
The java.util.concurrent.atomic package contains classes that support lock-free, thread-safe programming on single variables. The atomic classes provide thread-safe alternatives and utilities for:
- Atomic Primitives
- Atomic References
- Atomic Reflection Updater Utilities
- Adders and Accumulators Utilities
Atomic Primitives
The thread-safe alternatives of primitives are:
- AtomicBoolean – boolean value that may be updated atomically.
- AtomicInteger – int value that may be updated atomically.
- AtomicLong – long value that may be updated atomically.
- AtomicIntegerArray – int[] in which elements may be updated atomically.
- AtomicLongArray – long[] in which elements may be updated atomically.
Let us take a simple example where we have to keep track of concurrent threads and the number of concurrent threads that are running should not cross a maximum bound. Let us first consider a primitive int type variable for keeping track of the count.
import java.util.Random;
public class BoundedWorkerTask extends Thread {
    private static final int BOUND = 40;
    private static int currentRunningThreads = 0;
    public BoundedWorkerTask(String name) {
        super(name);
    }
    @Override
    public void run() {
        while (currentRunningThreads >= BOUND) {
            sleep(100);
        }
        System.out.println(getName() + ":\tStarted along with " + currentRunningThreads);
        currentRunningThreads++;
        // Mimicking a slow running process
        sleep(new Random().nextInt(100));
        currentRunningThreads--;
        System.out.println(getName() + ":\tCompleted, pending " + currentRunningThreads);
    }
    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static int getCurrentRunningThreads() {
        return currentRunningThreads;
    }
}
The int currentRunningThreads variable keeps track of the number of threads that are currently running. The problem with this code is that the ++ and -- operations on an int are not atomic: each one reads the value, modifies it, and writes it back, so concurrent updates can overwrite each other. As a result, the count may be inaccurate. For example, if you run 500 such threads, the value of currentRunningThreads after all of them have completed may not be zero as expected.
To ensure atomic operations on the integer, we can replace the variable with the java.util.concurrent.atomic.AtomicInteger type. The new code could be as follows:
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicBoundedWorkerTask extends Thread {
    private static final int BOUND = 40;
    private static AtomicInteger currentRunningThreads = new AtomicInteger(0);
    public AtomicBoundedWorkerTask(String name) {
        super(name);
    }
    @Override
    public void run() {
        while (currentRunningThreads.get() >= BOUND) {
            sleep(100);
        }
        System.out.println(getName() + ":\tStarted along with " + currentRunningThreads.getAndIncrement());
        sleep(new Random().nextInt(100));
        System.out.println(getName() + ":\tCompleted, pending " + currentRunningThreads.decrementAndGet());
    }
    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static int getCurrentRunningThreads() {
        return currentRunningThreads.get();
    }
}
Methods such as get(), getAndIncrement(), getAndDecrement(), incrementAndGet(), decrementAndGet(), and compareAndSet() perform atomic operations on the variable, no matter how many threads are accessing it. In the example shown above, getCurrentRunningThreads() will return zero after all the threads have finished. Note, however, that the bound check and the increment are still two separate steps, so the number of running threads can briefly exceed BOUND when several threads pass the check at the same time.
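One way to make a bound check and an increment a single atomic step is a compareAndSet() retry loop. The following is a minimal sketch of that technique, not a rewrite of the example above; the class BoundedCounter and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; the names are illustrative only.
class BoundedCounter {
    private final int bound;
    private final AtomicInteger current = new AtomicInteger(0);

    BoundedCounter(int bound) {
        this.bound = bound;
    }

    // Atomically claims a slot if fewer than `bound` slots are in use.
    boolean tryAcquire() {
        while (true) {
            int observed = current.get();
            if (observed >= bound) {
                return false; // The bound has been reached.
            }
            // Succeeds only if no other thread changed the value since it was read.
            if (current.compareAndSet(observed, observed + 1)) {
                return true;
            }
            // Another thread won the race; read the value again and retry.
        }
    }

    void release() {
        current.decrementAndGet();
    }

    int inUse() {
        return current.get();
    }

    public static void main(String[] args) {
        BoundedCounter counter = new BoundedCounter(2);
        System.out.println(counter.tryAcquire()); // prints true
        System.out.println(counter.tryAcquire()); // prints true
        System.out.println(counter.tryAcquire()); // prints false
        counter.release();
        System.out.println(counter.tryAcquire()); // prints true
    }
}
```

A thread that gets false from tryAcquire() could sleep and retry, in the same way the worker in the example above sleeps while the bound is reached.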
For more examples to play with, refer to the GitHub code repository.
Atomic References
The thread-safe reference utilities available are:
- AtomicReference<V> – Object reference that may be updated atomically.
- AtomicMarkableReference<V> – Object reference along with a mark bit, that can be updated atomically.
- AtomicReferenceArray<E> – Array of object references in which elements may be updated atomically.
- AtomicStampedReference<V> – Object reference along with an integer “stamp”, that can be updated atomically.
The atomic reference classes provide a way to atomically access object references in a concurrent environment.
Let us take an example where we have a linked list in which each node holds a string (of alphanumeric characters) as data, and we need to process each node with a slow-running task:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicAlphaNumericListProcessor extends Thread {
    public static Node top;
    private static AtomicReference<Node> currentNode;
    private List<String> dataProcessedByMe = new ArrayList<String>();
    public AtomicAlphaNumericListProcessor(String name) {
        super(name);
        init();
    }
    // static, so that the lock is on the class and guards the static currentNode field
    private static synchronized void init() {
        if(currentNode == null) {
            currentNode = new AtomicReference<Node>(top);
        }
    }
    @Override
    public void run() {
        while (currentNode.get() != null){
            Node myNode = currentNode.get();
            while (!currentNode.compareAndSet(myNode, myNode.getNext())) {
                myNode = currentNode.get();
            }
            System.out.println(getName() + ": Processing " + myNode.getData());
            dataProcessedByMe.add(myNode.getData());
            verySlowTask();
        }
    }
    private void verySlowTask() {
        try {
            sleep(2000 + new Random().nextInt(3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void printMyData() {
        System.out.println(getName() + ": My Data - " + dataProcessedByMe);
    }
}
The following lines try to get a unique node to process:
Node myNode = currentNode.get();
while (!currentNode.compareAndSet(myNode, myNode.getNext())) {
  myNode = currentNode.get();
}
The same can be achieved with a synchronized block, in a variant of the class where currentNode is a plain Node field rather than an AtomicReference:
Node myNode;
synchronized (SimpleAlphaNumericListProcessor.class) {
  myNode = currentNode;
  currentNode = myNode.getNext();
}
This is a very simple example, but in more complex situations synchronized blocks can hurt liveness, because threads block while they wait for the monitor. Atomic references may be a better alternative in such scenarios.
The AtomicMarkableReference<V> class is the same as AtomicReference<V>, except that it has an additional boolean property called ‘mark’. The mark can be updated atomically along with the object reference.
The AtomicReferenceArray<E> class holds an array of object references whose elements can be updated atomically. The AtomicStampedReference<V> class is similar to AtomicMarkableReference<V>, but has an integer property ‘stamp’ instead of a boolean.
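A common use of the stamp is to detect that a reference was changed and then changed back between a read and an update. The following is a minimal sketch of that idea; the class StampedReferenceDemo and the string values are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

// Hypothetical demo class, for illustration only.
class StampedReferenceDemo {

    // Returns { result of the stale update, result of the fresh update }.
    static boolean[] run() {
        String a = "A";
        String b = "B";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);

        // A reader observes the value "A" together with stamp 0.
        int observedStamp = ref.getStamp();

        // Meanwhile the value goes A -> B -> A, and each change bumps the stamp.
        ref.compareAndSet(a, b, 0, 1);
        ref.compareAndSet(b, a, 1, 2);

        // The reference is "A" again, but the stamp is now 2, so an update
        // based on the old stamp is rejected.
        boolean stale = ref.compareAndSet(a, b, observedStamp, observedStamp + 1);

        // An update that uses the current stamp succeeds.
        int currentStamp = ref.getStamp();
        boolean fresh = ref.compareAndSet(a, b, currentStamp, currentStamp + 1);

        return new boolean[] { stale, fresh };
    }

    public static void main(String[] args) {
        boolean[] results = run();
        System.out.println(results[0]); // prints false
        System.out.println(results[1]); // prints true
    }
}
```

A plain AtomicReference would have accepted the stale update, because it compares only the reference.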
Atomic Reflection Updater Utilities
The thread-safe reflection-based utilities are:
- AtomicIntegerFieldUpdater<T> – A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes.
- AtomicLongFieldUpdater<T> – A reflection-based utility that enables atomic updates to designated volatile long fields of designated classes.
- AtomicReferenceFieldUpdater<T,V> – A reflection-based utility that enables atomic updates to designated volatile reference fields of designated classes.
These are Java reflection-based utilities that can atomically update the volatile int, long, and reference fields of other classes.
Let us take one example, AtomicIntegerFieldUpdater<T>. Suppose that, instead of updating a field of its own, the thread relies on another class to maintain the count of running threads. The ExecutionDetails class holds that count in its currentRunningThreads field. The code of the ExecutionDetails class is:
public class ExecutionDetails {
    volatile int currentRunningThreads = 0;
    public int getCurrentRunningThreads() {
        return currentRunningThreads;
    }
    public void setCurrentRunningThreads(int currentRunningThreads) {
        this.currentRunningThreads = currentRunningThreads;
    }
}
We can use the AtomicIntegerFieldUpdater<ExecutionDetails> instance to atomically update the currentRunningThreads int property.
import java.util.Random;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class AtomicBoundedWorkerTask extends Thread {
    private static final int BOUND = 40;
    private static final AtomicIntegerFieldUpdater<ExecutionDetails>
            atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(ExecutionDetails.class, "currentRunningThreads" );
    private static final ExecutionDetails executionDetails = new ExecutionDetails();
    public AtomicBoundedWorkerTask(String name) {
        super(name);
    }
    @Override
    public void run() {
        while (atomicIntegerFieldUpdater.get(executionDetails) >= BOUND) {
            sleep(100);
        }
        System.out.println(getName() + ":\tStarted along with " + atomicIntegerFieldUpdater.getAndIncrement(executionDetails));
        sleep(new Random().nextInt(100));
        System.out.println(getName() + ":\tCompleted, pending " + atomicIntegerFieldUpdater.decrementAndGet(executionDetails));
    }
    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static int getCurrentRunningThreads() {
        return atomicIntegerFieldUpdater.get(executionDetails);
    }
}
Methods such as get(T), getAndIncrement(T), getAndDecrement(T), incrementAndGet(T), decrementAndGet(T), and compareAndSet(T, int, int) perform atomic operations on the volatile field currentRunningThreads of the ExecutionDetails object that is passed as the first argument.
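As a short sketch of these call shapes, the snippet below applies an updater to a made-up class. Details and FieldUpdaterDemo are hypothetical names; note that the target field has to be a volatile int that is accessible from the class creating the updater.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical holder class, for illustration only.
class Details {
    volatile int running; // must be volatile and visible to the updater's creator
}

class FieldUpdaterDemo {
    static final AtomicIntegerFieldUpdater<Details> RUNNING =
            AtomicIntegerFieldUpdater.newUpdater(Details.class, "running");

    public static void main(String[] args) {
        Details d = new Details();
        System.out.println(RUNNING.incrementAndGet(d));     // prints 1
        System.out.println(RUNNING.compareAndSet(d, 1, 5)); // prints true
        System.out.println(RUNNING.compareAndSet(d, 1, 7)); // prints false, value is 5
        System.out.println(d.running);                      // prints 5
    }
}
```

One updater instance can serve any number of Details objects, which avoids allocating an AtomicInteger per object.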
Adders and Accumulators Utilities
The thread-safe utilities for adders and accumulators were introduced in Java 8:
- DoubleAccumulator – One or more variables that together maintain a running double value updated using a supplied function.
- DoubleAdder – One or more variables that together maintain an initially zero double sum.
- LongAccumulator – One or more variables that together maintain a running long value updated using a supplied function.
- LongAdder – One or more variables that together maintain an initially zero long sum.
The Accumulators and Adders are special types of concurrent utilities that provide a thread-safe way to maintain running accumulated values.
An accumulator maintains a running value that is updated using a function supplied to its constructor. Calling accumulate(long) updates the value by applying the function that was supplied when the LongAccumulator instance was constructed. Let us take an example where we need to find the maximum value supplied by multiple threads:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.LongAccumulator;
public class AtomicLongAccumulatorThread extends Thread {
    public static final LongAccumulator MAX_ACCUMULATOR = new LongAccumulator(Math::max, 0L);
    private final Random randomNumberGenerator = new Random();
    private List<Long> myNumbers = new ArrayList<>();
    // volatile, so that a signalStop() call from another thread is visible to run()
    private volatile boolean stopSignalled;
    public AtomicLongAccumulatorThread(String name) {
        super(name);
    }
    @Override
    public void run() {
        while (!stopSignalled) {
            long number = 1000 + randomNumberGenerator.nextInt(1000000);
            myNumbers.add(number);
            MAX_ACCUMULATOR.accumulate(number);
            sleep();
        }
        long mySum = 0;
        for(long number:myNumbers){
            mySum += number;
        }
        System.out.println(getName() + ":\t" + mySum + " <= " + myNumbers);
    }
    private void sleep() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void signalStop() {
        this.stopSignalled = true;
    }
}
The constructor accepts two parameters. The first is a LongBinaryOperator, that is, a function of two long values, supplied here as the method reference Math::max. The second is the identity, which serves as the initial value. In the example above, we use Math.max(long, long) with 0L as the identity. When MAX_ACCUMULATOR.accumulate(number) is called, the supplied function is applied to the current running value and the new value; in our example it returns the larger of the two, which becomes the new running value. The result can be read at any time with get().
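To see the accumulator produce a result, here is a small self-contained sketch with fixed inputs instead of random numbers. The class MaxAccumulatorDemo and its maxOf helper are hypothetical, and Long.MIN_VALUE is used as the identity so that negative inputs would also work.

```java
import java.util.concurrent.atomic.LongAccumulator;

// Hypothetical demo class, for illustration only.
class MaxAccumulatorDemo {

    // Starts one thread per row and returns the largest value seen by any of them.
    static long maxOf(long[][] perThreadValues) {
        LongAccumulator max = new LongAccumulator(Math::max, Long.MIN_VALUE);
        Thread[] threads = new Thread[perThreadValues.length];
        for (int i = 0; i < threads.length; i++) {
            long[] values = perThreadValues[i];
            threads[i] = new Thread(() -> {
                for (long v : values) {
                    max.accumulate(v);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(new long[][] {{3, 9, 4}, {12, 1}, {7}})); // prints 12
    }
}
```

The result does not depend on thread scheduling because max is associative and commutative; an accumulator function should have no side effects, since it may be re-applied under contention.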
The Adder is similar to the Accumulator, but always performs addition on the running value. An example:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
public class AtomicRandomNumberAdderThread extends Thread {
    public static final LongAdder adder = new LongAdder();
    private final Random randomNumberGenerator = new Random();
    private List<Long> myNumbers = new ArrayList<>();
    // volatile, so that a signalStop() call from another thread is visible to run()
    private volatile boolean stopSignalled;
    public AtomicRandomNumberAdderThread(String name) {
        super(name);
    }
    @Override
    public void run() {
        while (!stopSignalled) {
            long number = randomNumberGenerator.nextInt(50);
            myNumbers.add(number);
            adder.add(number);
            sleep();
        }
        long mySum = 0;
        for(long number:myNumbers){
            mySum += number;
        }
        System.out.println(getName() + ":\t" + mySum + " <= " + myNumbers);
    }
    private void sleep() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void signalStop() {
        this.stopSignalled = true;
    }
}
The LongAdder class maintains a running sum of the numbers passed to its add(long) method. The current total can be read with sum().
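A minimal sketch of reading the total back is shown below; the class LongAdderDemo and its countInParallel helper are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class, for illustration only.
class LongAdderDemo {

    // Each thread increments the shared adder a fixed number of times.
    static long countInParallel(int threadCount, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    adder.increment(); // same as add(1)
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // sum() is read after all workers have finished, so it is exact here.
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(4, 1000)); // prints 4000
    }
}
```

Note that sum() called while updates are still in flight is not an atomic snapshot, which is why the sketch joins the threads first.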
5. Locks
The java.util.concurrent.locks package contains interfaces and classes for locking and for waiting on conditions. The framework provides much greater flexibility than synchronized blocks. The main interfaces are Lock, ReadWriteLock, and Condition. Lock implementations are a more general locking mechanism and can also be used in non-block-structured contexts, where a lock is acquired in one scope and released in another. They also support non-blocking acquisition: tryLock() either acquires the lock immediately or reports that it could not, so the thread does not wait when the lock is unavailable. A ReadWriteLock can be shared by many readers but is exclusive to one writer. The Condition interface describes condition variables that are associated with Locks.
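The non-blocking style can be sketched with tryLock(), which returns false instead of waiting when the lock is held by another thread. The class TryLockDemo and its methods are hypothetical and only illustrate the call pattern.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, for illustration only.
class TryLockDemo {
    private final Lock lock = new ReentrantLock();
    private int value;

    // Updates the value if the lock is free; otherwise gives up immediately.
    boolean tryUpdate(int newValue) {
        if (!lock.tryLock()) {
            return false; // lock is busy, the caller can do something else
        }
        try {
            value = newValue;
            return true;
        } finally {
            lock.unlock();
        }
    }

    private static void runInOtherThread(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns { result while the lock is held elsewhere, result once it is free }.
    static boolean[] demo() {
        TryLockDemo shared = new TryLockDemo();
        boolean[] results = new boolean[2];

        shared.lock.lock(); // the calling thread holds the lock
        try {
            runInOtherThread(() -> results[0] = shared.tryUpdate(1));
        } finally {
            shared.lock.unlock();
        }
        runInOtherThread(() -> results[1] = shared.tryUpdate(2));
        return results;
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println(results[0]); // prints false
        System.out.println(results[1]); // prints true
    }
}
```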
The most important built-in lock implementations that are available to programmers are:
- ReentrantLock – a simple lock that can be acquired by a thread more than once.
- ReentrantReadWriteLock – a lock that provides separate read and write locks.
Apart from the important ones mentioned above, the java.util.concurrent.locks package contains other interfaces and utilities. See the Java documentation for more information.
ReentrantLock
The ReentrantLock allows a thread to acquire the same lock more than once. When a thread first acquires the lock using the lock() method, the ‘hold count’ is set to one. The thread can re-acquire the lock multiple times before releasing it, and the hold count is incremented by one each time. Each call to unlock() decrements the hold count by one, and when the hold count reaches 0, the lock is released.
The ReentrantLock also offers a fairness parameter. The default value is false. When fairness is set to true and the lock becomes available, it goes to the thread that has been waiting the longest. When fairness is false, the lock allows barging and does not guarantee any particular order in which threads acquire it. This could result in thread starvation.
Let us look at an example: publisher-subscriber code that works on a shared resource of type ConcurrentList. The Publisher and Subscriber classes extend the SingledThread class, which is a Thread that repeatedly calls process() until the signal() method is called.
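The hold count and the fairness setting can both be inspected directly on a ReentrantLock. The following sketch does so from a single thread; the class HoldCountDemo is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, for illustration only.
class HoldCountDemo {

    // Records the hold count after each lock() and unlock() call.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true requests a fair lock
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // 1
        lock.lock();                     // re-acquired by the same thread
        counts[1] = lock.getHoldCount(); // 2
        lock.unlock();
        counts[2] = lock.getHoldCount(); // 1, still locked
        lock.unlock();
        counts[3] = lock.getHoldCount(); // 0, lock released
        return counts;
    }

    public static void main(String[] args) {
        for (int count : holdCounts()) {
            System.out.println(count); // prints 1, 2, 1, 0 on separate lines
        }
        System.out.println(new ReentrantLock().isFair());     // prints false
        System.out.println(new ReentrantLock(true).isFair()); // prints true
    }
}
```

Every lock() needs a matching unlock(); otherwise the hold count never returns to zero and other threads stay blocked.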
The Publisher implementation:
import java.util.Random;
public class Publisher extends SingledThread {
    private ConcurrentList<Integer> sharedList;
    private Random random = new Random();
    public Publisher(String name, ConcurrentList<Integer> sharedList) {
        super(name);
        this.sharedList = sharedList;
    }
    public void process() {
        int number = random.nextInt(500);
        sharedList.add(number);
        System.out.println(getName() + "\t: Added " + number + ", Size: " + sharedList.size());
        sleep();
    }
    private void sleep() {
        try {
            Thread.sleep(100 + random.nextInt(100));
        } catch (InterruptedException ie ) { ie.printStackTrace(); }
    }
}
The Subscriber implementation:
import java.util.Random;
public class Subscriber extends SingledThread {
    private ConcurrentList<Integer> sharedList;
    private Random random = new Random();
    public Subscriber(String name, ConcurrentList<Integer> sharedList) {
        super(name);
        this.sharedList = sharedList;
    }
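    public void process() {
        int size = sharedList.size();
        if (size > 0) {  // Random.nextInt(0) would throw IllegalArgumentException
            // size() and get() are separate calls, so with several subscribers the
            // list may still shrink in between and the index can go out of range.
            Integer number = sharedList.get(random.nextInt(size));
            System.out.println(getName() + "\t: Got " + number + ", Remaining: " + sharedList.size());
        }
        sleep();
    }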
    private void sleep() {
        try {
            Thread.sleep(300 + random.nextInt(200));
        } catch (InterruptedException ie ) { ie.printStackTrace(); }
    }
}
The ConcurrentList class contains a ReentrantLock to restrict access to the shared list among the publishers and subscribers.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentList<E> {
    private final List<E> list = new ArrayList<E>();
    private Lock lock = new ReentrantLock();
    public ConcurrentList(E... initialItems) {
        list.addAll(Arrays.asList(initialItems));
    }
    public void add(E item) {
        lock.lock();
        try {
            list.add(item);
        } finally {
            lock.unlock();
        }
    }
    public E get(int index) {
        lock.lock();
        try {
            return list.remove(index);
        } finally {
            lock.unlock();
        }
    }
    public int size() {
        lock.lock();
        try {
            return list.size();
        } finally {
            lock.unlock();
        }
    }
}
The ConcurrentList class acquires the lock by calling lock() and later releases it by calling unlock() in the finally blocks. This ensures that only one thread accesses the list at a time.
ReentrantReadWriteLock
The ReentrantLock grants exclusive access even when a thread only reads the shared resource. The ReentrantReadWriteLock is similar to the ReentrantLock, but provides separate locks for readers and writers. This allows multiple reader threads to access the shared resource simultaneously as long as no thread is writing, which can improve performance compared to the ReentrantLock when reads dominate.
In fair mode, a thread that tries to acquire the read lock will block if either the write lock is held or there is a waiting writer thread. A thread that tries to acquire the fair write lock will block unless both the read lock and the write lock are free, which implies that there are no waiting threads. The ReentrantReadWriteLock offers a fairness parameter similar to that of the ReentrantLock.
In the above example, we can modify the ConcurrentList class to use the ReentrantReadWriteLock instead of ReentrantLock:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ConcurrentList<E> {
    private final List<E> list = new ArrayList<E>();
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    public ConcurrentList(E... initialItems) {
        list.addAll(Arrays.asList(initialItems));
    }
    public void add(E item) {
        Lock writeLock = lock.writeLock();
        writeLock.lock();
        try {
            list.add(item);
        } finally {
            writeLock.unlock();
        }
    }
    public E get(int index) {
        // get() removes the item, so it modifies the list and needs the write lock.
        Lock writeLock = lock.writeLock();
        writeLock.lock();
        try {
            return list.remove(index);
        } finally {
            writeLock.unlock();
        }
    }
    public int size() {
        Lock readLock = lock.readLock();
        readLock.lock();
        try {
            return list.size();
        } finally {
            readLock.unlock();
        }
    }
}
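The sharing rules of the read and write locks can be checked with a small sketch that uses tryLock() so that nothing blocks. The class ReadWriteLockDemo is a hypothetical name and is not part of the ConcurrentList example.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, for illustration only.
class ReadWriteLockDemo {

    // Returns { second reader got in, writer got in while read-locked,
    //           writer got in after the reader left }.
    static boolean[] demo() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] results = new boolean[3];

        rw.readLock().lock(); // the calling thread is reading
        try {
            Thread other = new Thread(() -> {
                // A second reader may share the read lock.
                results[0] = rw.readLock().tryLock();
                if (results[0]) {
                    rw.readLock().unlock();
                }
                // A writer is refused while any other thread holds the read lock.
                results[1] = rw.writeLock().tryLock();
                if (results[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            rw.readLock().unlock();
        }

        // With no readers left, the write lock can be taken.
        results[2] = rw.writeLock().tryLock();
        if (results[2]) {
            rw.writeLock().unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0]); // prints true
        System.out.println(r[1]); // prints false
        System.out.println(r[2]); // prints true
    }
}
```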
You can find the complete source of all the examples in my GitHub code repository.