-
Notifications
You must be signed in to change notification settings - Fork 696
Open
Description
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.IntStream;
/**
 * Demonstrates a bounded producer/consumer pipeline built on
 * {@link BlockingQueue} and an {@link ExecutorService}.
 *
 * <p>One producer emits {@code "Data-0" .. "Data-(n-1)"} followed by a
 * sentinel ("poison pill"). Each consumer drains the queue until it sees the
 * sentinel, re-enqueues it so that sibling consumers also terminate, and exits.
 *
 * <p>All pipeline state is local to a single {@link #process} call, so the
 * pipeline can be run repeatedly (and concurrently) without results leaking
 * from one run into the next.
 */
public class AdvancedConcurrentProcessor {

    /** Sentinel placed on the queue by the producer to signal "no more data". */
    private static final String POISON_PILL = "POISON_PILL";

    /** Capacity of the hand-off queue; the producer blocks when it is full. */
    private static final int QUEUE_CAPACITY = 10;

    /** Upper bound on how long a run may take before it is aborted. */
    private static final long TIMEOUT_MINUTES = 1;

    /**
     * Runs the demo pipeline (20 items, 2 consumers, 50 ms / 100 ms simulated
     * work) and prints every result whose text contains {@code "Data-1"},
     * upper-cased.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for the workers to finish
     * @throws IllegalStateException if the workers do not finish in time
     */
    public static void main(String[] args) throws InterruptedException {
        List<String> results = process(20, 2, 50, 100);

        System.out.println("\n--- Processing Complete. Total Results: " + results.size() + " ---");
        results.stream()
                .filter(s -> s.contains("Data-1")) // Filter data
                .map(String::toUpperCase)          // Transform
                .forEach(System.out::println);
    }

    /**
     * Runs one producer and {@code consumerCount} consumers to completion.
     *
     * @param itemCount           number of data items the producer emits (>= 0)
     * @param consumerCount       number of concurrent consumers (>= 1)
     * @param produceDelayMillis  pause after each produced item, in ms (>= 0)
     * @param consumeDelayMillis  pause after each consumed item, in ms (>= 0)
     * @return one {@code "<thread> processed Data-i"} entry per produced item,
     *         in completion order
     * @throws IllegalArgumentException if an argument is out of range
     * @throws IllegalStateException    if the workers have not terminated
     *                                  within the timeout; they are interrupted
     *                                  before this is thrown
     * @throws InterruptedException     if the calling thread is interrupted
     *                                  while waiting; the workers are
     *                                  interrupted before this propagates
     */
    static List<String> process(int itemCount, int consumerCount,
                                long produceDelayMillis, long consumeDelayMillis)
            throws InterruptedException {
        if (itemCount < 0) {
            throw new IllegalArgumentException("itemCount must be >= 0: " + itemCount);
        }
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be >= 1: " + consumerCount);
        }
        if (produceDelayMillis < 0 || consumeDelayMillis < 0) {
            throw new IllegalArgumentException("delays must be >= 0: "
                    + produceDelayMillis + ", " + consumeDelayMillis);
        }

        // Bounded queue gives back-pressure: put() blocks when consumers lag.
        final BlockingQueue<String> dataQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        // Thread-safe sink written to by every consumer.
        final List<String> results = new CopyOnWriteArrayList<>();

        // One thread per task: every task blocks on the queue, so a smaller
        // pool could leave the producer (or a consumer) waiting for a thread
        // that never frees up.
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount + 1);
        try {
            // execute() rather than submit(): nobody inspects the Future, and
            // submit() would silently swallow an unexpected task failure.
            executor.execute(() -> produce(dataQueue, itemCount, produceDelayMillis));
            for (int i = 0; i < consumerCount; i++) {
                executor.execute(() -> consume(dataQueue, results, consumeDelayMillis));
            }

            executor.shutdown();
            if (!executor.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                throw new IllegalStateException(
                        "Workers did not finish within " + TIMEOUT_MINUTES + " minute(s); "
                                + results.size() + " result(s) collected so far");
            }
        } finally {
            // No-op after a clean termination; on timeout or interruption it
            // interrupts the blocked workers so they cannot keep the JVM alive.
            executor.shutdownNow();
        }
        return results;
    }

    /** Emits {@code itemCount} items followed by the poison pill; stops early if interrupted. */
    private static void produce(BlockingQueue<String> queue, int itemCount, long delayMillis) {
        try {
            for (int i = 0; i < itemCount; i++) {
                String data = "Data-" + i;
                queue.put(data); // Blocks if queue is full
                System.out.println(Thread.currentThread().getName() + " produced: " + data);
                Thread.sleep(delayMillis); // Simulate production time
            }
            queue.put(POISON_PILL); // Signal completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Drains the queue into {@code results} until the poison pill arrives or the thread is interrupted. */
    private static void consume(BlockingQueue<String> queue, List<String> results, long delayMillis) {
        try {
            while (true) {
                String data = queue.take(); // Blocks if queue is empty
                if (POISON_PILL.equals(data)) {
                    // The pill is the last element, so the queue is empty here
                    // and this put() cannot block; it lets the next consumer stop too.
                    queue.put(POISON_PILL);
                    break;
                }
                String processed = Thread.currentThread().getName() + " processed " + data;
                results.add(processed);
                Thread.sleep(delayMillis); // Heavy processing simulation
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Metadata
Metadata
Assignees
Labels
No labels