Concurrent Collections
Concurrent collections are thread-safe versions of the library's core data structures, intended for multi-threaded applications. Harneet's concurrent collections use optimized synchronization mechanisms to preserve data integrity while maintaining performance.
Overview
Concurrent collections solve the problem of data races that occur when multiple threads access a shared data structure simultaneously (a minimal sketch follows this list). They provide:
- Thread Safety: each individual operation is atomic and cannot corrupt the structure
- Deadlock Prevention: locks are acquired in a consistent hierarchy, preventing circular waits
- Performance Optimization: read-write locks, and lock-free operations where possible
- Consistent API: the same interface as the non-concurrent collections
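As a minimal sketch of the problem, the example below (using the constructors introduced later in this chapter) has two tasks writing to one shared map: with a regular map the interleaved writes would be a data race, while the concurrent version synchronizes internally.

```
import collections

var shared = collections.new_concurrent_map() // A plain new_map() here would race

function writer(prefix) {
    for i in range(100) {
        shared.set(prefix + "_" + i.to_string(), i)
    }
}

// Two writers run concurrently; the map serializes each set() internally
do writer("a")
do writer("b")
await_all()
fmt.Println("Entries written:", shared.size()) // 200, with no corruption
```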
Available Concurrent Collections
| Collection | Thread-Safe Version | Synchronization Method |
|---|---|---|
| Stack | ConcurrentStack | RWMutex with lock-free optimizations |
| Queue | ConcurrentQueue | RWMutex with circular buffer |
| Set | ConcurrentSet | RWMutex with hash-based storage |
| Map | ConcurrentMap | RWMutex with optimized bucket locking |
| LinkedList | ConcurrentLinkedList | RWMutex with node-level locking |
| BinarySearchTree | ConcurrentBST | RWMutex with tree-level locking |
Creating Concurrent Collections
```
import collections

// Create concurrent versions of each collection type
var c_stack = collections.new_concurrent_stack()
var c_queue = collections.new_concurrent_queue()
var c_set = collections.new_concurrent_set()
var c_map = collections.new_concurrent_map()
var c_list = collections.new_concurrent_linked_list()
var c_tree = collections.new_concurrent_bst()
```
Basic Thread-Safe Operations
Concurrent Stack
```
var shared_stack = collections.new_concurrent_stack()

// Thread-safe operations
function producer() {
    for i in range(100) {
        shared_stack.push(i)
        fmt.Println("Pushed:", i)
    }
}

function consumer() {
    for i in range(50) {
        if !shared_stack.is_empty() {
            var item = shared_stack.pop()
            fmt.Println("Popped:", item)
        }
    }
}

// Start concurrent operations
do producer()
do consumer()
await_all() // Wait for both to complete
```
Concurrent Queue
```
var work_queue = collections.new_concurrent_queue()

function task_producer() {
    for i in range(20) {
        var task = "task_" + i.to_string()
        work_queue.enqueue(task)
        fmt.Println("Produced:", task)
        sleep(50) // Simulate work
    }
}

function task_consumer(worker_id) {
    for {
        // Note: with several consumers, is_empty() followed by dequeue()
        // is a check-then-act race: another worker may drain the queue
        // between the two calls. See the sketch after this block.
        if !work_queue.is_empty() {
            var task = work_queue.dequeue()
            fmt.Println("Worker", worker_id, "processing:", task)
            sleep(100) // Simulate processing
        } else {
            sleep(10) // Wait for more work
        }
    }
}

// Start producer and multiple consumers
do task_producer()
do task_consumer(1)
do task_consumer(2)
do task_consumer(3)
```
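The check-then-act gap noted in task_consumer can only be closed if the emptiness check and the dequeue happen as one atomic step. The sketch below assumes a hypothetical try_dequeue() that atomically removes an item or returns null when the queue is empty; this method is an assumption, not documented ConcurrentQueue API, so verify what dequeue() does on an empty queue before relying on this pattern.

```
// Sketch only: try_dequeue() is a hypothetical atomic operation that
// returns null instead of racing between is_empty() and dequeue().
function safe_consumer(worker_id) {
    for {
        var task = work_queue.try_dequeue()
        if task != null {
            fmt.Println("Worker", worker_id, "processing:", task)
        } else {
            sleep(10) // Queue empty: back off briefly
        }
    }
}
```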
Concurrent Set
```
var shared_set = collections.new_concurrent_set()

function add_numbers(start, end) {
    for i in range(start, end) {
        shared_set.add(i)
        fmt.Println("Added:", i)
    }
}

function check_numbers(numbers) {
    for num in numbers {
        if shared_set.contains(num) {
            fmt.Println("Found:", num)
        }
    }
}

// Concurrent additions and lookups. 125 is never added, so at most
// 25 and 75 are reported, and only if the adders have run by then.
do add_numbers(1, 50)
do add_numbers(50, 100)
do check_numbers([25, 75, 125])
```
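A common concurrent-set idiom is claiming items so each is processed exactly once across threads. A contains() check followed by add() would let two threads claim the same item; the sketch below instead assumes add() returns true only when the element was newly inserted, which is an assumption about ConcurrentSet's return value rather than documented behavior.

```
var claimed = collections.new_concurrent_set()

// Sketch only: assumes add() reports whether the element was newly
// inserted (hypothetical). A separate contains()+add() would race.
function process_once(item_id) {
    if claimed.add(item_id) {
        fmt.Println("Claimed and processing item:", item_id)
    } else {
        fmt.Println("Item already claimed:", item_id)
    }
}
```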
Concurrent Map
```
var cache = collections.new_concurrent_map()

function cache_writer(prefix, count) {
    for i in range(count) {
        var key = prefix + "_" + i.to_string()
        var value = "value_" + i.to_string()
        cache.set(key, value)
        fmt.Println("Cached:", key)
    }
}

function cache_reader(keys) {
    for key in keys {
        var value = cache.get(key)
        if value != null {
            fmt.Println("Read:", key, "->", value)
        }
    }
}

// Concurrent reads and writes
do cache_writer("user", 10)
do cache_writer("session", 10)
do cache_reader(["user_5", "session_3", "nonexistent"])
```
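A frequent follow-on pattern is get-or-compute: look a key up and compute the value on a miss. Because get() and set() are only individually atomic, two threads that miss at the same time may both compute the value; the result stays correct, but work is duplicated. A minimal sketch over the cache defined above (expensive_computation is a placeholder):

```
// Get-or-compute: correct values, but possible duplicate work, because
// get() and set() are atomic separately, not jointly.
function get_or_compute(key) {
    var value = cache.get(key)
    if value == null {
        value = expensive_computation(key) // Placeholder; may run twice under contention
        cache.set(key, value)
    }
    return value
}
```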
Advanced Concurrent Patterns
Producer-Consumer with Multiple Workers
```
struct Job {
    id: int
    data: string
    priority: int
}

var job_queue = collections.new_concurrent_queue()
var results = collections.new_concurrent_map()
var active_workers = 0
var max_workers = 3

function job_producer() {
    for i in range(50) {
        var job = Job{
            id: i,
            data: "process_" + i.to_string(),
            priority: i % 3
        }
        job_queue.enqueue(job)
        fmt.Println("Queued job:", job.id)
        sleep(20)
    }
}

function job_worker(worker_id) {
    // Note: this plain increment of a shared int is itself a data race;
    // it is shown for illustration only.
    active_workers = active_workers + 1
    fmt.Println("Worker", worker_id, "started")
    for {
        // Same check-then-act caveat as above: with several workers, the
        // queue may be drained between is_empty() and dequeue().
        if !job_queue.is_empty() {
            var job = job_queue.dequeue()
            fmt.Println("Worker", worker_id, "processing job:", job.id)
            // Simulate processing time based on priority
            sleep(job.priority * 50 + 100)
            // Store result
            var result = "completed_" + job.data
            results.set(job.id, result)
            fmt.Println("Worker", worker_id, "completed job:", job.id)
        } else {
            sleep(50) // Wait for more work
        }
    }
}

function result_monitor() {
    var processed = 0
    for {
        var current_size = results.size()
        if current_size > processed {
            fmt.Println("Total processed jobs:", current_size)
            processed = current_size
        }
        sleep(1000)
    }
}

// Start the system
do job_producer()
for i in range(max_workers) {
    do job_worker(i + 1)
}
do result_monitor()
```
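Because the workers loop forever, this example never terminates on its own. A simple way to detect completion is to poll the results map until every job has been recorded, using the same results.size() call that result_monitor() relies on; a minimal sketch:

```
// Block until all expected jobs have stored a result
function wait_for_completion(expected_jobs) {
    for {
        if results.size() >= expected_jobs {
            fmt.Println("All", expected_jobs, "jobs completed")
            return
        }
        sleep(100)
    }
}

do wait_for_completion(50) // The producer queues 50 jobs
```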
Thread-Safe Cache with Expiration
```
struct CacheEntry {
    value: any
    expiry_time: int
}

struct ExpiringCache {
    data: ConcurrentMap
    cleanup_interval: int
}

function new_expiring_cache(cleanup_interval) {
    var cache = ExpiringCache{
        data: collections.new_concurrent_map(),
        cleanup_interval: cleanup_interval
    }
    // Start cleanup routine
    do cache.cleanup_routine()
    return cache
}

function (ec *ExpiringCache) set(key, value, ttl_seconds) {
    var expiry = current_time() + ttl_seconds
    var entry = CacheEntry{value: value, expiry_time: expiry}
    ec.data.set(key, entry)
}

function (ec *ExpiringCache) get(key) {
    var entry = ec.data.get(key)
    if entry == null {
        return null
    }
    if current_time() > entry.expiry_time {
        ec.data.remove(key) // Expired
        return null
    }
    return entry.value
}

function (ec *ExpiringCache) cleanup_routine() {
    for {
        sleep(ec.cleanup_interval * 1000)
        var current = current_time()
        var expired_keys = []
        // Find expired entries
        var all_keys = ec.data.keys()
        for key in all_keys {
            var entry = ec.data.get(key)
            if entry != null && current > entry.expiry_time {
                expired_keys.append(key)
            }
        }
        // Remove expired entries
        for key in expired_keys {
            ec.data.remove(key)
        }
        if expired_keys.size() > 0 {
            fmt.Println("Cleaned up", expired_keys.size(), "expired entries")
        }
    }
}

// Usage
var cache = new_expiring_cache(30) // Cleanup every 30 seconds

function cache_user(user_id) {
    var key = "user_" + user_id.to_string()
    cache.set(key, {"name": "User " + user_id.to_string()}, 60) // 60 second TTL
}

function get_user(user_id) {
    var key = "user_" + user_id.to_string()
    return cache.get(key)
}

// Concurrent cache operations
do {
    for i in range(100) {
        cache_user(i)
        sleep(10)
    }
}

do {
    for i in range(50) {
        var user = get_user(i)
        if user != null {
            fmt.Println("Found user:", user.name)
        }
        sleep(100)
    }
}
```
Concurrent Data Processing Pipeline
```
struct DataItem {
    id: int
    raw_data: string
    processed: bool
}

struct ProcessingPipeline {
    input_queue: ConcurrentQueue
    processing_queue: ConcurrentQueue
    output_queue: ConcurrentQueue
    error_queue: ConcurrentQueue
}

function new_processing_pipeline() {
    return ProcessingPipeline{
        input_queue: collections.new_concurrent_queue(),
        processing_queue: collections.new_concurrent_queue(),
        output_queue: collections.new_concurrent_queue(),
        error_queue: collections.new_concurrent_queue()
    }
}

function (pp *ProcessingPipeline) start_pipeline() {
    // Stage 1: Input validation
    do pp.input_validator()
    // Stage 2: Data processing (multiple workers)
    for i in range(3) {
        do pp.data_processor(i + 1)
    }
    // Stage 3: Output handler
    do pp.output_handler()
    // Error handler
    do pp.error_handler()
}

function (pp *ProcessingPipeline) input_validator() {
    for {
        if !pp.input_queue.is_empty() {
            var item = pp.input_queue.dequeue()
            // Validate input
            if item.raw_data != null && item.raw_data.length() > 0 {
                pp.processing_queue.enqueue(item)
                fmt.Println("Validated item:", item.id)
            } else {
                pp.error_queue.enqueue(item)
                fmt.Println("Invalid item:", item.id)
            }
        } else {
            sleep(10)
        }
    }
}

function (pp *ProcessingPipeline) data_processor(worker_id) {
    for {
        // Several processors share this queue, so is_empty()/dequeue()
        // carries the same check-then-act caveat noted earlier.
        if !pp.processing_queue.is_empty() {
            var item = pp.processing_queue.dequeue()
            fmt.Println("Worker", worker_id, "processing:", item.id)
            // Simulate processing
            sleep(100 + random_int(0, 100))
            // Process data
            item.raw_data = item.raw_data.to_upper()
            item.processed = true
            pp.output_queue.enqueue(item)
            fmt.Println("Worker", worker_id, "completed:", item.id)
        } else {
            sleep(20)
        }
    }
}

function (pp *ProcessingPipeline) output_handler() {
    for {
        if !pp.output_queue.is_empty() {
            var item = pp.output_queue.dequeue()
            fmt.Println("Output:", item.id, "->", item.raw_data)
            // Could save to a database, send to an API, etc.
        } else {
            sleep(30)
        }
    }
}

function (pp *ProcessingPipeline) error_handler() {
    for {
        if !pp.error_queue.is_empty() {
            var item = pp.error_queue.dequeue()
            fmt.Println("Error processing item:", item.id)
            // Could retry, log, alert, etc.
        } else {
            sleep(100)
        }
    }
}

function (pp *ProcessingPipeline) add_item(item) {
    pp.input_queue.enqueue(item)
}

// Usage
var pipeline = new_processing_pipeline()
pipeline.start_pipeline()

// Add items to process
for i in range(20) {
    var item = DataItem{
        id: i,
        raw_data: "data_" + i.to_string(),
        processed: false
    }
    pipeline.add_item(item)
    sleep(50)
}
```
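The pipeline's queues are unbounded, so a fast producer can outrun slow workers and grow memory without limit. A minimal form of backpressure is to cap the number of pending items before enqueueing. The sketch below assumes ConcurrentQueue exposes a size() method, which is an assumption (only ConcurrentMap's size() appears in this chapter):

```
// Sketch only: naive backpressure, assuming queue.size() exists
var max_pending = 100

function (pp *ProcessingPipeline) add_item_with_backpressure(item) {
    for pp.input_queue.size() >= max_pending {
        sleep(10) // Input stage saturated: wait before enqueueing
    }
    pp.input_queue.enqueue(item)
}
```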
Read-Write Lock Optimization
```
// Concurrent collections use an RWMutex for better read performance
var shared_map = collections.new_concurrent_map()

// Multiple readers can access the map simultaneously
function reader(reader_id) {
    for i in range(100) {
        var value = shared_map.get("key_" + (i % 10).to_string())
        // Many readers can execute this concurrently
        sleep(1)
    }
}

// Writers are exclusive
function writer(writer_id) {
    for i in range(10) {
        shared_map.set("key_" + i.to_string(), "value_" + i.to_string())
        // Only one writer at a time
        sleep(10)
    }
}

// Start multiple readers and one writer
for i in range(5) {
    do reader(i + 1)
}
do writer(1)
```
Lock-Free Operations Where Possible
```
// Concurrent collections may use lock-free operations internally for
// performance, but composing thread-safe calls does NOT make the
// composition atomic.
var shared_counter = collections.new_concurrent_map()
shared_counter.set("count", 0)

function increment_counter() {
    for i in range(1000) {
        // get() and set() are each atomic, but the pair is not:
        // two threads can read the same value, and one increment is lost.
        var current = shared_counter.get("count")
        shared_counter.set("count", current + 1)
    }
}

// Multiple threads incrementing concurrently
for i in range(10) {
    do increment_counter()
}
await_all()
// Expect less than 10000 in general, because updates are lost
fmt.Println("Final count:", shared_counter.get("count"))
```
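A counter that never loses updates needs an atomic read-modify-write. The standard technique is a compare-and-swap retry loop; the sketch below assumes a hypothetical compare_and_set(key, expected, new) that stores the new value only if the current value still equals expected. ConcurrentMap is not documented to provide this method, so treat this as an illustration of the technique rather than working API.

```
// Sketch only: compare-and-swap retry loop with a hypothetical
// compare_and_set(key, expected, new) on the map.
function atomic_increment(map, key) {
    for {
        var current = map.get(key)
        if map.compare_and_set(key, current, current + 1) {
            return // Our update won; nothing was lost
        }
        // Another thread changed the value first: retry
    }
}
```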
Batch Operations
```
var batch_set = collections.new_concurrent_set()

// Instead of individual operations
function slow_batch_add(items) {
    for item in items {
        batch_set.add(item) // Separate lock/unlock for each element
    }
}

// Use batch operations when available
function fast_batch_add(items) {
    batch_set.add_all(items) // Single lock for the entire batch
}

var large_batch = range(1000)
do fast_batch_add(large_batch)
```
Error Handling and Debugging
Deadlock Prevention
```
// Always acquire locks in a consistent order
var map1 = collections.new_concurrent_map()
var map2 = collections.new_concurrent_map()

function transfer_data(from_map, to_map, key) {
    // Acquire locks in a consistent order (by memory address or ID)
    var first_map = if from_map.id() < to_map.id() { from_map } else { to_map }
    var second_map = if from_map.id() < to_map.id() { to_map } else { from_map }
    // This prevents circular wait conditions
    first_map.lock()
    second_map.lock()
    var value = from_map.get(key)
    if value != null {
        to_map.set(key, value)
        from_map.remove(key)
    }
    second_map.unlock()
    first_map.unlock()
}
```
Timeout Operations
```
// Operations with timeouts prevent indefinite blocking
var timed_queue = collections.new_concurrent_queue()

function try_dequeue_with_timeout(timeout_ms) {
    var start_time = current_time_millis()
    for {
        // Check-then-act caveat: with several consumers, the queue may
        // be drained between is_empty() and dequeue().
        if !timed_queue.is_empty() {
            return timed_queue.dequeue()
        }
        if current_time_millis() - start_time > timeout_ms {
            return null // Timeout
        }
        sleep(1)
    }
}

// Usage
var item = try_dequeue_with_timeout(5000) // 5 second timeout
if item == null {
    fmt.Println("Timeout waiting for item")
}
```
Monitoring and Diagnostics
```
struct ConcurrentCollectionStats {
    read_count: int
    write_count: int
    contention_count: int
    avg_wait_time: float
}

// Monitor collection performance
function monitor_collection_stats(collection) {
    for {
        var stats = collection.get_stats()
        fmt.Println("Reads:", stats.read_count)
        fmt.Println("Writes:", stats.write_count)
        fmt.Println("Contention:", stats.contention_count)
        fmt.Println("Avg wait:", stats.avg_wait_time, "ms")
        fmt.Println("---")
        sleep(5000)
    }
}

var monitored_map = collections.new_concurrent_map()
do monitor_collection_stats(monitored_map)
```
Best Practices
1. Use Concurrent Collections Only When Needed
```
// Don't use concurrent collections for single-threaded code
function single_threaded_function() {
    var local_map = collections.new_map() // A regular map is fine here
    // ... single-threaded operations
}

// Use concurrent collections for shared data
var shared_cache = collections.new_concurrent_map() // Accessed by multiple threads
```
2. Minimize Lock Scope
```
// Bad: holding the lock for the entire computation
function bad_example(concurrent_map) {
    concurrent_map.lock()
    var value = concurrent_map.get("key")
    // Long computation while holding the lock
    var result = expensive_computation(value)
    concurrent_map.set("result", result)
    concurrent_map.unlock()
}

// Good: keep each locked section short
function good_example(concurrent_map) {
    var value = concurrent_map.get("key") // Short internal lock
    // Computation runs outside any lock
    var result = expensive_computation(value)
    // Note: "key" may be updated during the computation; acceptable when
    // a slightly stale input is tolerable, otherwise coordinate explicitly
    concurrent_map.set("result", result) // Short internal lock
}
```
3. Use Batch Operations
```
// Instead of many individual operations...
for item in large_list {
    concurrent_set.add(item) // Many lock/unlock cycles
}

// ...use batch operations
concurrent_set.add_all(large_list) // Single lock/unlock
```
4. Handle Contention Gracefully
```
function handle_high_contention(concurrent_collection) {
    var max_retries = 3
    var retry_count = 0
    for retry_count < max_retries {
        // try_operation() is a placeholder for any operation that can
        // fail under contention
        if concurrent_collection.try_operation() {
            return // Success
        }
        retry_count = retry_count + 1
        sleep(retry_count * 10) // Linear backoff between retries
    }
    fmt.Println("Operation failed after retries")
}
```
5. Monitor Performance
```
// Track performance metrics
var start_time = current_time_millis()
concurrent_map.set("key", "value")
var end_time = current_time_millis()

if end_time - start_time > 100 {
    fmt.Println("High latency detected:", end_time - start_time, "ms")
}
```
Migration from Non-Concurrent Collections
Simple Replacement
```
// Old code
var shared_data = collections.new_map()

// New code: just change the constructor
var shared_data = collections.new_concurrent_map()

// All other operations remain the same
```
Adding Proper Synchronization
```
// Old code with manual locking
var data = collections.new_map()
var mutex = sync.new_mutex()

function update_data(key, value) {
    mutex.lock()
    data.set(key, value)
    mutex.unlock()
}

// New code: built-in synchronization
var data = collections.new_concurrent_map()

function update_data(key, value) {
    // Automatically thread-safe; note that multi-step sequences still
    // need coordination (see Lock-Free Operations Where Possible above)
    data.set(key, value)
}
```
Concurrent collections provide robust, thread-safe data structures that maintain performance while ensuring data integrity in multi-threaded applications. They handle the complexity of synchronization internally, letting you focus on application logic rather than low-level threading concerns. Keep in mind that thread safety applies per operation: compound sequences such as check-then-act or read-modify-write still need explicit coordination.