⚙️ Concurrency and Multithreading

Master Java's concurrent programming through thread-safe design patterns, performance optimization strategies, and advanced parallel processing techniques


🎯 Learning Objectives

By the end of this lesson, you will:


🏗️ Concurrency Architecture Overview

<aside> ⚙️

Concurrency Philosophy

Concurrency is like orchestrating a symphony - multiple musicians (threads) play different parts simultaneously, requiring careful coordination and synchronization to create harmony instead of chaos. Java provides the conductor's tools for managing this complexity.

</aside>
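The "chaos" in the analogy above most often appears as lost updates on shared data. The following is a minimal sketch, not part of the lesson's own listing: `SharedCounterSketch` and `runIncrements` are hypothetical names chosen for illustration. It increments a plain `int` and an `AtomicInteger` from several threads so the two totals can be compared.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedCounterSketch {
    // Plain field: counter++ is a read-modify-write, so concurrent updates can be lost.
    static int unsafeCounter = 0;
    // AtomicInteger performs the increment as one atomic operation.
    static final AtomicInteger safeCounter = new AtomicInteger();

    /** Starts `threads` threads; each increments both counters `perThread` times. */
    static int[] runIncrements(int threads, int perThread) {
        unsafeCounter = 0;
        safeCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    unsafeCounter++;               // not thread-safe
                    safeCounter.incrementAndGet(); // thread-safe
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join(); // join() also makes the workers' writes visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { unsafeCounter, safeCounter.get() };
    }

    public static void main(String[] args) {
        int[] result = runIncrements(4, 100_000);
        System.out.println("plain int     = " + result[0] + " (may be below 400000)");
        System.out.println("AtomicInteger = " + result[1]);
    }
}
```

The atomic total always equals threads × increments. The plain total varies from run to run, which is why the shared counters in this lesson's production system use `AtomicInteger`.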

Thread vs Process vs Concurrency Concepts

Java Concurrency Architecture - From Process to Threads
═══════════════════════════════════════════════════════════════════════════════

    🖥️ PROCESS LEVEL               🧵 THREAD LEVEL              ⚙️ CONCURRENCY TOOLS
┌─────────────────────────────┐   ┌─────────────────────────┐   ┌─────────────────────────┐
│ • Separate memory space     │   │ • Shared memory space   │   │ • Synchronization       │
│ • Heavy resource overhead   │   │ • Lightweight           │   │ • Thread pools          │
│ • Inter-process comms       │   │ • Fast context switch   │   │ • Concurrent collections│
│ • Process isolation         │   │ • Shared data risks     │   │ • Atomic operations     │
└─────────────────────────────┘   └─────────────────────────┘   └─────────────────────────┘
            ▲                              ▲                              ▲
            │                              │                              │
    ┌───────────────────┐          ┌──────────────────┐          ┌──────────────────┐
    │ JVM Process       │          │ Application      │          │ Executor         │
    │ • Heap Memory     │          │ Threads          │          │ Framework        │
    │ • Method Area     │          │ • Main Thread    │          │ • ThreadPool     │
    │ • Program Counter │          │ • Worker Threads │          │ • Future/Task    │
    └───────────────────┘          └──────────────────┘          └──────────────────┘

    📊 THREAD STATE LIFECYCLE
    ┌─────────────────────────────────────────────────────────────────────────┐
    │                                                                         │
    │  NEW ──start()──▶ RUNNABLE ───────────────────────▶ TERMINATED          │
    │                      ▲                                                  │
    │                      │                                                  │
    │                   BLOCKED ◄─── synchronization                          │
    │                      ▲                                                  │
    │                      │                                                  │
    │                   WAITING ◄─── wait(), join()                           │
    │                      ▲                                                  │
    │                      │                                                  │
    │                TIMED_WAITING ◄─── sleep(), wait(timeout)                │
    │  RUNNABLE covers both ready and running; Thread.State has no RUNNING.   │
    └─────────────────────────────────────────────────────────────────────────┘
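The lifecycle in the diagram can be observed directly with `Thread.getState()`. The sketch below is an illustration under assumptions, not the lesson's own code: `ThreadStateSketch`, `awaitState`, and `observeStates` are hypothetical names. The main thread holds a lock to force `BLOCKED`, then lets the worker reach an untimed `wait()` (`WAITING`) and a `sleep()` (`TIMED_WAITING`), polling for each state instead of guessing at timing.

```java
import java.util.Arrays;
import java.util.List;

public class ThreadStateSketch {

    /** Hypothetical helper: polls until the thread reports the wanted state, or fails after 5 s. */
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (t.getState() != wanted) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("never saw " + wanted + ", last state " + t.getState());
            }
            Thread.yield();
        }
        return wanted;
    }

    /** Drives one thread through its states and returns them in the order observed. */
    static List<Thread.State> observeStates() {
        final Object lock = new Object();
        final boolean[] released = { false };                // guarded by lock
        final Thread.State[] selfView = new Thread.State[1];

        Thread t = new Thread(() -> {
            selfView[0] = Thread.currentThread().getState(); // a running thread reports RUNNABLE
            try {
                synchronized (lock) {                        // BLOCKED while main holds the monitor
                    while (!released[0]) {
                        lock.wait();                         // WAITING: untimed wait
                    }
                }
                Thread.sleep(300);                           // TIMED_WAITING
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "state-demo");

        Thread.State fresh = t.getState();                   // NEW: created but not started
        Thread.State blocked, waiting, timedWaiting;
        try {
            synchronized (lock) {
                t.start();
                blocked = awaitState(t, Thread.State.BLOCKED);
            }
            waiting = awaitState(t, Thread.State.WAITING);
            synchronized (lock) {
                released[0] = true;
                lock.notifyAll();
            }
            timedWaiting = awaitState(t, Thread.State.TIMED_WAITING);
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(fresh, selfView[0], blocked, waiting, timedWaiting, t.getState());
    }

    public static void main(String[] args) {
        System.out.println(observeStates());
    }
}
```

Note that a thread that is actually executing reports `RUNNABLE`: the `Thread.State` enum has six values and none of them is `RUNNING`.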

🧵 Thread Creation and Management Excellence

Thread Creation Patterns

<aside> 🚀

Thread Creation Strategy

Prefer implementing Runnable over extending Thread: it separates the task from the thread that runs it and leaves the class free to extend something else. Use lambda expressions for simple tasks, and prefer the Executor framework over manual thread creation in production code.

</aside>
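As a rough illustration of the strategy above, the sketch below runs a `Runnable` lambda on a manually created thread and then submits `Callable` tasks to a fixed thread pool. `ThreadCreationSketch`, `runOnManualThread`, and `runOnExecutor` are hypothetical names, and the pool size of 3 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadCreationSketch {

    /** Manual approach: a Runnable lambda handed to a named Thread. */
    static String runOnManualThread() {
        final String[] ranOn = new String[1];
        Runnable task = () -> ranOn[0] = Thread.currentThread().getName();
        Thread thread = new Thread(task, "manual-worker");
        thread.start();
        try {
            thread.join(); // wait for the task; join() makes its write visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn[0];
    }

    /** Executor approach: the pool owns the threads, callers only submit tasks. */
    static List<Integer> runOnExecutor(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int n = i;
                Callable<Integer> square = () -> n * n;
                futures.add(pool.submit(square));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until that task finishes
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // stop accepting tasks and let the pool threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnManualThread()); // manual-worker
        System.out.println(runOnExecutor(4));    // [1, 4, 9, 16]
    }
}
```

The executor version has no `start()` or `join()` calls at all: thread lifecycle is the pool's concern, and results come back through `Future`, which is the main reason it scales better than hand-managed threads.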

🏭 Manufacturing Production Line System

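import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ManufacturingProductionSystem {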
    
    // 📦 Product model
    public static class Product {
        private final int id;
        private final String type;
        private final long productionTime; // milliseconds
        private String status;
        private long startTime;
        private long endTime;
        
        public Product(int id, String type, long productionTime) {
            this.id = id;
            this.type = type;
            this.productionTime = productionTime;
            this.status = "QUEUED";
        }
        
        public void startProduction() {
            this.status = "IN_PRODUCTION";
            this.startTime = System.currentTimeMillis();
        }
        
        public void completeProduction() {
            this.status = "COMPLETED";
            this.endTime = System.currentTimeMillis();
        }
        
        // Getters
        public int getId() { return id; }
        public String getType() { return type; }
        public long getProductionTime() { return productionTime; }
        public String getStatus() { return status; }
        public long getActualProductionTime() { return endTime - startTime; }
        
        @Override
        public String toString() {
            return String.format("Product[ID:%d, Type:%s, Status:%s]", id, type, status);
        }
    }
    
    // 🏭 Production worker using Thread inheritance (Method 1)
    public static class ProductionWorkerThread extends Thread {
        private final String workerName;
        private final BlockingQueue<Product> productQueue;
        private final AtomicInteger completedProducts;
        private volatile boolean running = true;
        
        public ProductionWorkerThread(String workerName, BlockingQueue<Product> productQueue,
                                    AtomicInteger completedProducts) {
            this.workerName = workerName;
            this.productQueue = productQueue;
            this.completedProducts = completedProducts;
            this.setName(workerName); // Set thread name
        }
        
        @Override
        public void run() {
            System.out.println("🔧 " + workerName + " started working");
            
            while (running && !Thread.currentThread().isInterrupted()) {
                try {
                    // Wait up to 1 second for a product; poll() returns null on timeout
                    Product product = productQueue.poll(1, TimeUnit.SECONDS);
                    
                    if (product != null) {
                        processProduct(product);
                        completedProducts.incrementAndGet();
                    }
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore interrupt flag
                    System.out.println("🛑 " + workerName + " interrupted");
                    break;
                }
            }
            
            System.out.println("✅ " + workerName + " finished work");
        }
        
        private void processProduct(Product product) throws InterruptedException {
            System.out.println("🔨 " + workerName + " processing " + product);
            
            product.startProduction();
            
            // Simulate production time with interruption handling
            long remainingTime = product.getProductionTime();
            long sleepInterval = Math.min(100, remainingTime); // Check every 100ms
            
            while (remainingTime > 0 && !Thread.currentThread().isInterrupted()) {
                long sleepTime = Math.min(sleepInterval, remainingTime);
                Thread.sleep(sleepTime);
                remainingTime -= sleepTime;
            }
            
            if (!Thread.currentThread().isInterrupted()) {
                product.completeProduction();
                System.out.println("✅ " + workerName + " completed " + product + 
                                 " in " + product.getActualProductionTime() + "ms");
            }
        }
        
        public void shutdown() {
            running = false;
            this.interrupt(); // Wake up if waiting
        }
    }
    
    // 🏭 Quality Control Inspector using Runnable (Method 2)
    public static class QualityControlInspector implements Runnable {
        private final String inspectorName;
        private final BlockingQueue<Product> completedProducts;
        private final AtomicInteger inspectedCount;
        private volatile boolean active = true;
        
        public QualityControlInspector(String inspectorName, 
                                     BlockingQueue<Product> completedProducts,
                                     AtomicInteger inspectedCount) {
            this.inspectorName = inspectorName;
            this.completedProducts = completedProducts;
            this.inspectedCount = inspectedCount;
        }
        
        @Override
        public void run() {
            System.out.println("🔍 " + inspectorName + " started quality control");
            
            while (active && !Thread.currentThread().isInterrupted()) {
                try {
                    Product product = completedProducts.poll(500, TimeUnit.MILLISECONDS);
                    
                    if (product != null) {
                        inspectProduct(product);
                        inspectedCount.incrementAndGet();
                    }
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("🛑 " + inspectorName + " interrupted");
                    break;
                }
            }
            
            System.out.println("✅ " + inspectorName + " finished quality control");
        }
        
        private void inspectProduct(Product product) throws InterruptedException {
            System.out.println("🔍 " + inspectorName + " inspecting " + product);
            
            // Simulate inspection time
            Thread.sleep(200 + (int)(Math.random() * 300)); // 200-500ms
            
            // 95% pass rate
            boolean passed = Math.random() < 0.95;
            if (passed) {
                System.out.println("✅ " + inspectorName + " approved " + product);
            } else {
                System.out.println("❌ " + inspectorName + " rejected " + product);
            }
        }
        
        public void shutdown() {
            active = false;
        }
    }
    
    // 🏭 Main production system
    public static void demonstrateThreadCreation() throws InterruptedException {
        System.out.println("🏭 Manufacturing Production System Starting");
        System.out.println("==========================================");
        
        // Shared data structures (thread-safe)
        BlockingQueue<Product> productionQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Product> completedQueue = new LinkedBlockingQueue<>();
        AtomicInteger completedProducts = new AtomicInteger(0);
        AtomicInteger inspectedProducts = new AtomicInteger(0);
        
        // Method 1: Create workers using Thread inheritance
        List<ProductionWorkerThread> workers = Arrays.asList(
            new ProductionWorkerThread("Worker-A", productionQueue, completedProducts),
            new ProductionWorkerThread("Worker-B", productionQueue, completedProducts),
            new ProductionWorkerThread("Worker-C", productionQueue, completedProducts)
        );
        
        // Method 2: Create inspectors using Runnable
        List<Thread> inspectorThreads = Arrays.asList(
            new Thread(new QualityControlInspector("Inspector-1", completedQueue, inspectedProducts)),
            new Thread(new QualityControlInspector("Inspector-2", completedQueue, inspectedProducts))
        );
        
        // Method 3: Create supplier using a lambda
        Thread supplierThread = new Thread(() -> {
            System.out.println("📦 Supplier started");
            try {
                for (int i = 1; i <= 20; i++) {
                    Product product = new Product(i, 
                                                "Product-" + (char)('A' + (i % 3)), 
                                                500 + (long)(Math.random() * 1000));
                    
                    productionQueue.put(product);
                    System.out.println("📥 Supplier added " + product);
                    
                    Thread.sleep(100); // Supply rate control
                }
                System.out.println("📦 Supplier finished supplying products");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("📦 Supplier interrupted");
            }
        }, "Supplier-Thread");
        
        // Method 4: Product mover using a lambda stored in a Runnable variable
        Runnable productMover = () -> {
            System.out.println("🚚 Product Mover started");
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Placeholder: a full implementation would move finished products
                    // into the QC queue here. As written, nothing feeds that queue,
                    // so the inspectors stay idle.
                    Thread.sleep(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("🚚 Product Mover interrupted");
            }
        };
        Thread moverThread = new Thread(productMover, "Product-Mover");
        
        // Start all threads
        System.out.println("🚀 Starting all production threads...");
        
        // Start workers (Thread inheritance)
        workers.forEach(Thread::start);
        
        // Start inspectors (Runnable implementation)
        inspectorThreads.forEach(Thread::start);
        
        // Start supplier (Lambda)
        supplierThread.start();
        
        // Start mover (Runnable variable)
        moverThread.start();
        
        // Monitor production for 10 seconds
        monitorProduction(workers, inspectorThreads, completedProducts, inspectedProducts);
        
        // Shutdown sequence
        shutdownProduction(workers, inspectorThreads, supplierThread, moverThread);
        
        // Final statistics
        System.out.println("\n📊 Final Production Statistics:");
        System.out.println("  Products completed: " + completedProducts.get());
        System.out.println("  Products inspected: " + inspectedProducts.get());
        System.out.println("  Products in queue: " + productionQueue.size());
    }
    
    private static void monitorProduction(List<ProductionWorkerThread> workers,
                                        List<Thread> inspectors,
                                        AtomicInteger completed,
                                        AtomicInteger inspected) throws InterruptedException {
        
        System.out.println("\n📊 Monitoring production for 10 seconds...");
        
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            
            // Check thread states
            long activeWorkers = workers.stream()
                .filter(w -> w.getState() != Thread.State.TERMINATED)
                .count();
            
            long activeInspectors = inspectors.stream()
                .filter(t -> t.getState() != Thread.State.TERMINATED)
                .count();
            
            System.out.printf("⏰ Time: %ds | Workers: %d | Inspectors: %d | Completed: %d | Inspected: %d%n",
                            i + 1, activeWorkers, activeInspectors, completed.get(), inspected.get());
        }
    }
    
    private static void shutdownProduction(List<ProductionWorkerThread> workers,
                                         List<Thread> inspectors,
                                         Thread supplier,
                                         Thread mover) throws InterruptedException {
        
        System.out.println("\n🛑 Initiating production shutdown...");
        
        // Interrupt supplier first
        supplier.interrupt();
        supplier.join(2000);
        
        // Shutdown workers
        workers.forEach(ProductionWorkerThread::shutdown);
        
        // Wait for workers to finish current tasks
        for (ProductionWorkerThread worker : workers) {
            worker.join(2000);
            if (worker.isAlive()) {
                System.out.println("⚠️ " + worker.getName() + " did not shutdown gracefully");
                worker.interrupt();
            }
        }
        
        // Shutdown inspectors
        inspectors.forEach(Thread::interrupt);
        for (Thread inspector : inspectors) {
            inspector.join(2000);
        }
        
        // Shutdown mover
        mover.interrupt();
        mover.join(1000);
        
        System.out.println("✅ All threads have been shutdown");
    }
    
    // 🔍 Thread state demonstration
    public static void demonstrateThreadStates() throws InterruptedException {
        System.out.println("\n🔍 Thread State Demonstration");
        System.out.println("=============================");
        
        Thread demoThread = new Thread(() -> {
            try {
                System.out.println("Thread executing...");
                
                // Show TIMED_WAITING state
                Thread.sleep(2000);
                
                // wait(timeout) also reports TIMED_WAITING; only an untimed wait() reports WAITING
                synchronized (ManufacturingProductionSystem.class) {
                    ManufacturingProductionSystem.class.wait(1000);
                }
                
                System.out.println("Thread finishing...");
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Thread interrupted");
            }
        }, "Demo-Thread");
        
        // NEW state
        System.out.println("1. NEW: " + demoThread.getState());
        
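        // Start thread: it becomes RUNNABLE, then almost immediately enters sleep(),
        // so the state printed here is usually TIMED_WAITING
        demoThread.start();
        Thread.sleep(100); // Give it time to start
        System.out.println("2. After start(): " + demoThread.getState());
        
        // During sleep -> TIMED_WAITING
        Thread.sleep(500);
        System.out.println("3. TIMED_WAITING: " + demoThread.getState());
        
        // Notify any thread waiting on the class monitor. The demo thread is still
        // sleeping at this point, so its later wait(1000) ends by timeout instead.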
        synchronized (ManufacturingProductionSystem.class) {
            ManufacturingProductionSystem.class.notifyAll();
        }
        
        // Wait for completion -> TERMINATED
        demoThread.join();
        System.out.println("4. TERMINATED: " + demoThread.getState());
    }
}
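The listing leaves the product mover as a placeholder, so finished products never reach the inspection queue. One possible way to connect the two stages is for the worker itself to forward each finished item. The sketch below shows that hand-off with plain integers instead of `Product` objects. It swaps in a sentinel value (a "poison pill") for shutdown instead of the interrupt-based shutdown used above. `HandOffSketch`, `runPipeline`, and `STOP` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class HandOffSketch {

    /** Pushes `items` values through worker -> inspector and returns how many were inspected. */
    static int runPipeline(int items) {
        final BlockingQueue<Integer> productionQueue = new LinkedBlockingQueue<>();
        final BlockingQueue<Integer> completedQueue = new LinkedBlockingQueue<>();
        final AtomicInteger inspected = new AtomicInteger();
        final int STOP = -1; // sentinel meaning "no more work" (assumes real items are positive)

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    int item = productionQueue.take(); // blocks until an item is available
                    completedQueue.put(item);          // hand off to QC, sentinel included
                    if (item == STOP) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");

        Thread inspector = new Thread(() -> {
            try {
                while (completedQueue.take() != STOP) {
                    inspected.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "inspector");

        worker.start();
        inspector.start();
        try {
            for (int i = 1; i <= items; i++) {
                productionQueue.put(i);
            }
            productionQueue.put(STOP); // queued last, so every real item is processed first
            worker.join();
            inspector.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return inspected.get();
    }

    public static void main(String[] args) {
        System.out.println("Inspected: " + runPipeline(5)); // Inspected: 5
    }
}
```

With one worker and one inspector a single sentinel is enough. With several consumers per stage, as in the production system above, each consumer would need its own sentinel or a different shutdown signal.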