Master Java's concurrent programming through thread-safe design patterns, performance optimization strategies, and advanced parallel processing techniques
By the end of this lesson, you will:
<aside> ⚙️
Concurrency Philosophy
Concurrency is like orchestrating a symphony - multiple musicians (threads) play different parts simultaneously, requiring careful coordination and synchronization to create harmony instead of chaos. Java provides the conductor's tools for managing this complexity.
</aside>
Java Concurrency Architecture - From Process to Threads
═══════════════════════════════════════════════════════════════════════════════
🖥️ PROCESS LEVEL 🧵 THREAD LEVEL ⚙️ CONCURRENCY TOOLS
┌─────────────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────────┐
│ • Separate memory space │ │ • Shared memory space │ │ • Synchronization │
│ • Heavy resource overhead │ │ • Lightweight │ │ • Thread pools │
│ • Inter-process comms │ │ • Fast context switch │ │ • Concurrent collections│
│ • Process isolation │ │ • Shared data risks │ │ • Atomic operations │
└─────────────────────────────┘ └─────────────────────────┘ └─────────────────────────┘
▲ ▲ ▲
│ │ │
┌───────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ JVM Process │ │ Application │ │ Executor │
│ • Heap Memory │ │ Threads │ │ Framework │
│ • Method Area │ │ • Main Thread │ │ • ThreadPool │
│ • Program Counter │ │ • Worker Threads │ │ • Future/Task │
└───────────────────┘ └──────────────────┘ └──────────────────┘
📊 THREAD STATE LIFECYCLE
┌─────────────────────────────────────────────────────────────────────────┐
│ │
│ NEW ──start()──▶ RUNNABLE ◄──────▶ RUNNING ──────▶ TERMINATED │
│ ▲ │ │
│ │ ▼ │
│ BLOCKED ◄─── synchronization │
│ ▲ │
│ │ │
│ WAITING ◄─── wait(), join() │
│ ▲ │
│ │ │
│ TIMED_WAITING ◄─── sleep(), wait(timeout) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
<aside> 🚀
Thread Creation Strategy
Choose Runnable over Thread extension for better flexibility, use lambda expressions for simple tasks, and prefer Executor framework over manual thread creation for production applications.
</aside>
public class ManufacturingProductionSystem {
// 📦 Product model
public static class Product {
private final int id;
private final String type;
private final long productionTime; // milliseconds
private String status;
private long startTime;
private long endTime;
public Product(int id, String type, long productionTime) {
[this.id](<http://this.id>) = id;
this.type = type;
this.productionTime = productionTime;
this.status = "QUEUED";
}
public void startProduction() {
this.status = "IN_PRODUCTION";
this.startTime = System.currentTimeMillis();
}
public void completeProduction() {
this.status = "COMPLETED";
this.endTime = System.currentTimeMillis();
}
// Getters
public int getId() { return id; }
public String getType() { return type; }
public long getProductionTime() { return productionTime; }
public String getStatus() { return status; }
public long getActualProductionTime() { return endTime - startTime; }
@Override
public String toString() {
return String.format("Product[ID:%d, Type:%s, Status:%s]", id, type, status);
}
}
// 🏭 Production worker using Thread inheritance (Method 1)
public static class ProductionWorkerThread extends Thread {
private final String workerName;
private final BlockingQueue<Product> productQueue;
private final AtomicInteger completedProducts;
private volatile boolean running = true;
public ProductionWorkerThread(String workerName, BlockingQueue<Product> productQueue,
AtomicInteger completedProducts) {
this.workerName = workerName;
this.productQueue = productQueue;
this.completedProducts = completedProducts;
this.setName(workerName); // Set thread name
}
@Override
public void run() {
System.out.println("🔧 " + workerName + " started working");
while (running && !Thread.currentThread().isInterrupted()) {
try {
// Take product from queue (blocks if empty)
Product product = productQueue.poll(1, TimeUnit.SECONDS);
if (product != null) {
processProduct(product);
completedProducts.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt flag
System.out.println("🛑 " + workerName + " interrupted");
break;
}
}
System.out.println("✅ " + workerName + " finished work");
}
private void processProduct(Product product) throws InterruptedException {
System.out.println("🔨 " + workerName + " processing " + product);
product.startProduction();
// Simulate production time with interruption handling
long remainingTime = product.getProductionTime();
long sleepInterval = Math.min(100, remainingTime); // Check every 100ms
while (remainingTime > 0 && !Thread.currentThread().isInterrupted()) {
long sleepTime = Math.min(sleepInterval, remainingTime);
Thread.sleep(sleepTime);
remainingTime -= sleepTime;
}
if (!Thread.currentThread().isInterrupted()) {
product.completeProduction();
System.out.println("✅ " + workerName + " completed " + product +
" in " + product.getActualProductionTime() + "ms");
}
}
public void shutdown() {
running = false;
this.interrupt(); // Wake up if waiting
}
}
// 🏭 Quality Control Inspector using Runnable (Method 2)
public static class QualityControlInspector implements Runnable {
private final String inspectorName;
private final BlockingQueue<Product> completedProducts;
private final AtomicInteger inspectedCount;
private volatile boolean active = true;
public QualityControlInspector(String inspectorName,
BlockingQueue<Product> completedProducts,
AtomicInteger inspectedCount) {
this.inspectorName = inspectorName;
this.completedProducts = completedProducts;
this.inspectedCount = inspectedCount;
}
@Override
public void run() {
System.out.println("🔍 " + inspectorName + " started quality control");
while (active && !Thread.currentThread().isInterrupted()) {
try {
Product product = completedProducts.poll(500, TimeUnit.MILLISECONDS);
if (product != null) {
inspectProduct(product);
inspectedCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("🛑 " + inspectorName + " interrupted");
break;
}
}
System.out.println("✅ " + inspectorName + " finished quality control");
}
private void inspectProduct(Product product) throws InterruptedException {
System.out.println("🔍 " + inspectorName + " inspecting " + product);
// Simulate inspection time
Thread.sleep(200 + (int)(Math.random() * 300)); // 200-500ms
// 95% pass rate
boolean passed = Math.random() < 0.95;
if (passed) {
System.out.println("✅ " + inspectorName + " approved " + product);
} else {
System.out.println("❌ " + inspectorName + " rejected " + product);
}
}
public void shutdown() {
active = false;
}
}
// 🏭 Main production system
public static void demonstrateThreadCreation() throws InterruptedException {
System.out.println("🏭 Manufacturing Production System Starting");
System.out.println("==========================================");
// Shared data structures (thread-safe)
BlockingQueue<Product> productionQueue = new LinkedBlockingQueue<>();
BlockingQueue<Product> completedQueue = new LinkedBlockingQueue<>();
AtomicInteger completedProducts = new AtomicInteger(0);
AtomicInteger inspectedProducts = new AtomicInteger(0);
// Method 1: Create workers using Thread inheritance
List<ProductionWorkerThread> workers = Arrays.asList(
new ProductionWorkerThread("Worker-A", productionQueue, completedProducts),
new ProductionWorkerThread("Worker-B", productionQueue, completedProducts),
new ProductionWorkerThread("Worker-C", productionQueue, completedProducts)
);
// Method 2: Create inspectors using Runnable
List<Thread> inspectorThreads = Arrays.asList(
new Thread(new QualityControlInspector("Inspector-1", completedQueue, inspectedProducts)),
new Thread(new QualityControlInspector("Inspector-2", completedQueue, inspectedProducts))
);
// Method 3: Create supplier using Lambda (Method 3)
Thread supplierThread = new Thread(() -> {
System.out.println("📦 Supplier started");
try {
for (int i = 1; i <= 20; i++) {
Product product = new Product(i,
"Product-" + (char)('A' + (i % 3)),
500 + (long)(Math.random() * 1000));
productionQueue.put(product);
System.out.println("📥 Supplier added " + product);
Thread.sleep(100); // Supply rate control
}
System.out.println("📦 Supplier finished supplying products");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("📦 Supplier interrupted");
}
}, "Supplier-Thread");
// Method 4: Product mover using method reference
Runnable productMover = () -> {
System.out.println("🚚 Product Mover started");
try {
while (!Thread.currentThread().isInterrupted()) {
// Check for completed products and move them to QC
// This would be more complex in real scenario
Thread.sleep(50);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("🚚 Product Mover interrupted");
}
};
Thread moverThread = new Thread(productMover, "Product-Mover");
// Start all threads
System.out.println("🚀 Starting all production threads...");
// Start workers (Thread inheritance)
workers.forEach(Thread::start);
// Start inspectors (Runnable implementation)
inspectorThreads.forEach(Thread::start);
// Start supplier (Lambda)
supplierThread.start();
// Start mover (Method reference)
moverThread.start();
// Monitor production for 10 seconds
monitorProduction(workers, inspectorThreads, completedProducts, inspectedProducts);
// Shutdown sequence
shutdownProduction(workers, inspectorThreads, supplierThread, moverThread);
// Final statistics
System.out.println("\\n📊 Final Production Statistics:");
System.out.println(" Products completed: " + completedProducts.get());
System.out.println(" Products inspected: " + inspectedProducts.get());
System.out.println(" Products in queue: " + productionQueue.size());
}
private static void monitorProduction(List<ProductionWorkerThread> workers,
List<Thread> inspectors,
AtomicInteger completed,
AtomicInteger inspected) throws InterruptedException {
System.out.println("\\n📊 Monitoring production for 10 seconds...");
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
// Check thread states
long activeWorkers = [workers.stream](<http://workers.stream>)()
.filter(w -> w.getState() != Thread.State.TERMINATED)
.count();
long activeInspectors = [inspectors.stream](<http://inspectors.stream>)()
.filter(t -> t.getState() != Thread.State.TERMINATED)
.count();
System.out.printf("⏰ Time: %ds | Workers: %d | Inspectors: %d | Completed: %d | Inspected: %d%n",
i + 1, activeWorkers, activeInspectors, completed.get(), inspected.get());
}
}
private static void shutdownProduction(List<ProductionWorkerThread> workers,
List<Thread> inspectors,
Thread supplier,
Thread mover) throws InterruptedException {
System.out.println("\\n🛑 Initiating production shutdown...");
// Interrupt supplier first
supplier.interrupt();
supplier.join(2000);
// Shutdown workers
workers.forEach(ProductionWorkerThread::shutdown);
// Wait for workers to finish current tasks
for (ProductionWorkerThread worker : workers) {
worker.join(2000);
if (worker.isAlive()) {
System.out.println("⚠️ " + worker.getName() + " did not shutdown gracefully");
worker.interrupt();
}
}
// Shutdown inspectors
inspectors.forEach(Thread::interrupt);
for (Thread inspector : inspectors) {
inspector.join(2000);
}
// Shutdown mover
mover.interrupt();
mover.join(1000);
System.out.println("✅ All threads have been shutdown");
}
// 🔍 Thread state demonstration
public static void demonstrateThreadStates() throws InterruptedException {
System.out.println("\\n🔍 Thread State Demonstration");
System.out.println("=============================");
Thread demoThread = new Thread(() -> {
try {
System.out.println("Thread executing...");
// Show TIMED_WAITING state
Thread.sleep(2000);
// Show WAITING state
synchronized (ManufacturingProductionSystem.class) {
ManufacturingProductionSystem.class.wait(1000);
}
System.out.println("Thread finishing...");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread interrupted");
}
}, "Demo-Thread");
// NEW state
System.out.println("1. NEW: " + demoThread.getState());
// Start thread -> RUNNABLE
demoThread.start();
Thread.sleep(100); // Give it time to start
System.out.println("2. RUNNABLE: " + demoThread.getState());
// During sleep -> TIMED_WAITING
Thread.sleep(500);
System.out.println("3. TIMED_WAITING: " + demoThread.getState());
// Wake up the waiting thread
synchronized (ManufacturingProductionSystem.class) {
ManufacturingProductionSystem.class.notifyAll();
}
// Wait for completion -> TERMINATED
demoThread.join();
System.out.println("4. TERMINATED: " + demoThread.getState());
}
}