Concurrent Collections

Concurrent collections provide thread-safe versions of all data structures for use in multi-threaded applications. Harneet's concurrent collections use optimized synchronization mechanisms to ensure data integrity while maintaining performance.

Overview

Concurrent collections solve the problem of data races when multiple threads access shared data structures simultaneously. They provide:

  • Thread Safety: All operations are atomic and prevent data corruption
  • Deadlock Prevention: Proper locking hierarchies prevent circular dependencies
  • Performance Optimization: Read-write locks and lock-free operations where possible
  • Consistent API: Same interface as non-concurrent collections
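
For a sense of why this matters, the sketch below races two tasks against one shared stack. With a plain stack the interleaved pushes could corrupt internal state; the concurrent version serializes access internally. (`collections.new_stack()` and `size()` are assumed here to mirror the constructors and methods shown elsewhere on this page.)

// Minimal sketch: two tasks writing to one shared collection.
// With a plain collections.new_stack() this would be a data race;
// the concurrent stack makes each push atomic.
var safe_stack = collections.new_concurrent_stack()

function pusher(start) {
    for i in range(start, start + 100) {
        safe_stack.push(i)
    }
}

do pusher(0)
do pusher(100)
await_all()
fmt.Println("Size:", safe_stack.size())  // reliably 200 (size() assumed)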

Available Concurrent Collections

Collection         Thread-Safe Version    Synchronization Method
Stack              ConcurrentStack        RWMutex with lock-free optimizations
Queue              ConcurrentQueue        RWMutex with circular buffer
Set                ConcurrentSet          RWMutex with hash-based storage
Map                ConcurrentMap          RWMutex with optimized bucket locking
LinkedList         ConcurrentLinkedList   RWMutex with node-level locking
BinarySearchTree   ConcurrentBST          RWMutex with tree-level locking

Creating Concurrent Collections

import collections

// Create concurrent versions
var c_stack = collections.new_concurrent_stack()
var c_queue = collections.new_concurrent_queue()
var c_set = collections.new_concurrent_set()
var c_map = collections.new_concurrent_map()
var c_list = collections.new_concurrent_linked_list()
var c_tree = collections.new_concurrent_bst()
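
Because the concurrent versions keep the same interface as their plain counterparts, existing call sites need no changes. A hedged sketch (`add`, `insert`, and `contains` below are assumed method names matching the non-concurrent API):

// Hedged sketch: same interface as the non-concurrent collections.
c_list.add("first")           // assumed LinkedList method
c_tree.insert(42)             // assumed BST method

do c_list.add("second")       // safe to call from another task
do c_tree.insert(17)
await_all()

fmt.Println("Tree has 42:", c_tree.contains(42))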

Basic Thread-Safe Operations

Concurrent Stack

var shared_stack = collections.new_concurrent_stack()

// Thread-safe operations
function producer() {
    for i in range(100) {
        shared_stack.push(i)
        fmt.Println("Pushed:", i)
    }
}

function consumer() {
    for i in range(50) {
        if !shared_stack.is_empty() {
            var item = shared_stack.pop()
            fmt.Println("Popped:", item)
        }
    }
}

// Start concurrent operations
do producer()
do consumer()
await_all()  // Wait for both to complete

Concurrent Queue

var work_queue = collections.new_concurrent_queue()

function task_producer() {
    for i in range(20) {
        var task = "task_" + i.to_string()
        work_queue.enqueue(task)
        fmt.Println("Produced:", task)
        sleep(50)  // Simulate work
    }
}

function task_consumer(worker_id) {
    for {
        if !work_queue.is_empty() {
            // is_empty + dequeue is not atomic: another consumer may
            // take the last item between the two calls, so real code
            // should also handle a null result here.
            var task = work_queue.dequeue()
            fmt.Println("Worker", worker_id, "processing:", task)
            sleep(100)  // Simulate processing
        } else {
            sleep(10)  // Wait for more work
        }
    }
}

// Start producer and multiple consumers
do task_producer()
do task_consumer(1)
do task_consumer(2)
do task_consumer(3)
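
The consumers above poll forever. One common way to stop them cleanly is to enqueue a sentinel ("poison pill") per worker once the producer is done; a minimal sketch, reusing work_queue and assuming string tasks as above:

// Sketch: graceful shutdown with sentinel values.
var STOP = "__stop__"

function stoppable_consumer(worker_id) {
    for {
        if !work_queue.is_empty() {
            var task = work_queue.dequeue()
            if task == STOP {
                fmt.Println("Worker", worker_id, "shutting down")
                return  // exit the loop instead of polling forever
            }
            fmt.Println("Worker", worker_id, "processing:", task)
        } else {
            sleep(10)
        }
    }
}

// Once the producer is done (e.g., after awaiting it), enqueue one
// sentinel per worker so each consumer exits its loop
for i in range(3) {
    work_queue.enqueue(STOP)
}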

Concurrent Set

var shared_set = collections.new_concurrent_set()

function add_numbers(start, end) {
    for i in range(start, end) {
        shared_set.add(i)
        fmt.Println("Added:", i)
    }
}

function check_numbers(numbers) {
    for num in numbers {
        if shared_set.contains(num) {
            fmt.Println("Found:", num)
        }
    }
}

// Concurrent additions and lookups (results depend on timing: a lookup
// may run before the matching add, and 125 is never added at all)
do add_numbers(1, 50)
do add_numbers(50, 100)
do check_numbers([25, 75, 125])

Concurrent Map

var cache = collections.new_concurrent_map()

function cache_writer(prefix, count) {
    for i in range(count) {
        var key = prefix + "_" + i.to_string()
        var value = "value_" + i.to_string()
        cache.set(key, value)
        fmt.Println("Cached:", key)
    }
}

function cache_reader(keys) {
    for key in keys {
        var value = cache.get(key)
        if value != null {
            fmt.Println("Read:", key, "->", value)
        }
    }
}

// Concurrent reads and writes
do cache_writer("user", 10)
do cache_writer("session", 10)
do cache_reader(["user_5", "session_3", "nonexistent"])
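
One caveat: each get and set call is thread-safe on its own, but a sequence of calls is not atomic. Two tasks can both see a missing key and both compute a value for it; the last writer wins. For an idempotent cache this duplicate work is usually harmless, but it is worth knowing about:

// Pitfall sketch: check-then-act across two calls is not atomic.
function get_or_compute(key) {
    var value = cache.get(key)
    if value == null {
        // Another task may compute and set the same key between the
        // get above and the set below; last writer wins.
        value = "computed_" + key
        cache.set(key, value)
    }
    return value
}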

Advanced Concurrent Patterns

Producer-Consumer with Multiple Workers

struct Job {
    id: int
    data: string
    priority: int
}

var job_queue = collections.new_concurrent_queue()
var results = collections.new_concurrent_map()
var active_workers = 0
var max_workers = 3

function job_producer() {
    for i in range(50) {
        var job = Job{
            id: i,
            data: "process_" + i.to_string(),
            priority: i % 3
        }
        job_queue.enqueue(job)
        fmt.Println("Queued job:", job.id)
        sleep(20)
    }
}

function job_worker(worker_id) {
    // Note: this plain integer increment is itself unsynchronized and
    // racy; it is illustrative only. Use an atomic counter in real code.
    active_workers = active_workers + 1
    fmt.Println("Worker", worker_id, "started")

    for {
        if !job_queue.is_empty() {
            var job = job_queue.dequeue()
            fmt.Println("Worker", worker_id, "processing job:", job.id)

            // Simulate processing time based on priority
            sleep(job.priority * 50 + 100)

            // Store result
            var result = "completed_" + job.data
            results.set(job.id, result)

            fmt.Println("Worker", worker_id, "completed job:", job.id)
        } else {
            sleep(50)  // Wait for more work
        }
    }
}

function result_monitor() {
    var processed = 0
    for {
        var current_size = results.size()
        if current_size > processed {
            fmt.Println("Total processed jobs:", current_size)
            processed = current_size
        }
        sleep(1000)
    }
}

// Start the system
do job_producer()
for i in range(max_workers) {
    do job_worker(i + 1)
}
do result_monitor()

Thread-Safe Cache with Expiration

struct CacheEntry {
    value: any
    expiry_time: int
}

struct ExpiringCache {
    data: ConcurrentMap
    cleanup_interval: int
}

function new_expiring_cache(cleanup_interval) {
    var cache = ExpiringCache{
        data: collections.new_concurrent_map(),
        cleanup_interval: cleanup_interval
    }

    // Start cleanup routine
    do cache.cleanup_routine()

    return cache
}

function (ec *ExpiringCache) set(key, value, ttl_seconds) {
    var expiry = current_time() + ttl_seconds
    var entry = CacheEntry{value: value, expiry_time: expiry}
    ec.data.set(key, entry)
}

function (ec *ExpiringCache) get(key) {
    var entry = ec.data.get(key)
    if entry == null {
        return null
    }

    if current_time() > entry.expiry_time {
        ec.data.remove(key)  // Expired
        return null
    }

    return entry.value
}

function (ec *ExpiringCache) cleanup_routine() {
    for {
        sleep(ec.cleanup_interval * 1000)

        var current = current_time()
        var expired_keys = []

        // Find expired entries (assuming keys() returns a point-in-time
        // snapshot; entries added mid-scan are picked up next cycle)
        var all_keys = ec.data.keys()
        for key in all_keys {
            var entry = ec.data.get(key)
            if entry != null && current > entry.expiry_time {
                expired_keys.append(key)
            }
        }

        // Remove expired entries
        for key in expired_keys {
            ec.data.remove(key)
        }

        if expired_keys.size() > 0 {
            fmt.Println("Cleaned up", expired_keys.size(), "expired entries")
        }
    }
}

// Usage
var cache = new_expiring_cache(30)  // Cleanup every 30 seconds

function cache_user(user_id) {
    var key = "user_" + user_id.to_string()
    cache.set(key, {"name": "User " + user_id.to_string()}, 60)  // 60 second TTL
}

function get_user(user_id) {
    var key = "user_" + user_id.to_string()
    return cache.get(key)
}

// Concurrent cache operations
do {
    for i in range(100) {
        cache_user(i)
        sleep(10)
    }
}

do {
    for i in range(50) {
        var user = get_user(i)
        if user != null {
            fmt.Println("Found user:", user.name)
        }
        sleep(100)
    }
}

Concurrent Data Processing Pipeline

struct DataItem {
    id: int
    raw_data: string
    processed: bool
}

struct ProcessingPipeline {
    input_queue: ConcurrentQueue
    processing_queue: ConcurrentQueue
    output_queue: ConcurrentQueue
    error_queue: ConcurrentQueue
}

function new_processing_pipeline() {
    return ProcessingPipeline{
        input_queue: collections.new_concurrent_queue(),
        processing_queue: collections.new_concurrent_queue(),
        output_queue: collections.new_concurrent_queue(),
        error_queue: collections.new_concurrent_queue()
    }
}

function (pp *ProcessingPipeline) start_pipeline() {
    // Stage 1: Input validation
    do pp.input_validator()

    // Stage 2: Data processing (multiple workers)
    for i in range(3) {
        do pp.data_processor(i + 1)
    }

    // Stage 3: Output handler
    do pp.output_handler()

    // Error handler
    do pp.error_handler()
}

function (pp *ProcessingPipeline) input_validator() {
    for {
        if !pp.input_queue.is_empty() {
            var item = pp.input_queue.dequeue()

            // Validate input
            if item.raw_data != null && item.raw_data.length() > 0 {
                pp.processing_queue.enqueue(item)
                fmt.Println("Validated item:", item.id)
            } else {
                pp.error_queue.enqueue(item)
                fmt.Println("Invalid item:", item.id)
            }
        } else {
            sleep(10)
        }
    }
}

function (pp *ProcessingPipeline) data_processor(worker_id) {
    for {
        if !pp.processing_queue.is_empty() {
            var item = pp.processing_queue.dequeue()

            fmt.Println("Worker", worker_id, "processing:", item.id)

            // Simulate processing
            sleep(100 + random_int(0, 100))

            // Process data
            item.raw_data = item.raw_data.to_upper()
            item.processed = true

            pp.output_queue.enqueue(item)
            fmt.Println("Worker", worker_id, "completed:", item.id)
        } else {
            sleep(20)
        }
    }
}

function (pp *ProcessingPipeline) output_handler() {
    for {
        if !pp.output_queue.is_empty() {
            var item = pp.output_queue.dequeue()
            fmt.Println("Output:", item.id, "->", item.raw_data)

            // Could save to database, send to API, etc.
        } else {
            sleep(30)
        }
    }
}

function (pp *ProcessingPipeline) error_handler() {
    for {
        if !pp.error_queue.is_empty() {
            var item = pp.error_queue.dequeue()
            fmt.Println("Error processing item:", item.id)

            // Could retry, log, alert, etc.
        } else {
            sleep(100)
        }
    }
}

function (pp *ProcessingPipeline) add_item(item) {
    pp.input_queue.enqueue(item)
}

// Usage
var pipeline = new_processing_pipeline()
pipeline.start_pipeline()

// Add items to process
for i in range(20) {
    var item = DataItem{
        id: i,
        raw_data: "data_" + i.to_string(),
        processed: false
    }
    pipeline.add_item(item)
    sleep(50)
}

Performance Considerations

Read-Write Lock Optimization

// Concurrent collections use RWMutex for better read performance
var shared_map = collections.new_concurrent_map()

// Multiple readers can access simultaneously
function reader(reader_id) {
    for i in range(100) {
        var value = shared_map.get("key_" + (i % 10).to_string())
        // Multiple readers can run this concurrently
        sleep(1)
    }
}

// Writers are exclusive
function writer(writer_id) {
    for i in range(10) {
        shared_map.set("key_" + i.to_string(), "value_" + i.to_string())
        // Only one writer at a time
        sleep(10)
    }
}

// Start multiple readers and one writer
for i in range(5) {
    do reader(i + 1)
}
do writer(1)

Lock-Free Operations Where Possible

// Some operations can be lock-free for better performance.
// Note: a separate get followed by set is NOT atomic; two tasks can
// read the same value and one update is then lost. A compound
// read-modify-write is needed (`increment` below is a hypothetical
// method that performs get-add-set under a single lock).
var atomic_counter = collections.new_concurrent_map()
atomic_counter.set("count", 0)

function increment_counter() {
    for i in range(1000) {
        atomic_counter.increment("count", 1)  // hypothetical atomic op
    }
}

// Multiple threads incrementing
for i in range(10) {
    do increment_counter()
}

await_all()
fmt.Println("Final count:", atomic_counter.get("count"))  // 10000

Batch Operations for Better Performance

var batch_set = collections.new_concurrent_set()

// Instead of individual operations
function slow_batch_add(items) {
    for item in items {
        batch_set.add(item)  // Individual lock/unlock for each
    }
}

// Use batch operations when available
function fast_batch_add(items) {
    batch_set.add_all(items)  // Single lock for entire batch
}

var large_batch = range(1000)
do fast_batch_add(large_batch)

Error Handling and Debugging

Deadlock Prevention

// Always acquire locks in consistent order
var map1 = collections.new_concurrent_map()
var map2 = collections.new_concurrent_map()

function transfer_data(from_map, to_map, key) {
    // Acquire locks in consistent order (by memory address or ID)
    var first_map = if from_map.id() < to_map.id() { from_map } else { to_map }
    var second_map = if from_map.id() < to_map.id() { to_map } else { from_map }

    // This prevents circular wait conditions
    first_map.lock()
    second_map.lock()

    var value = from_map.get(key)
    if value != null {
        to_map.set(key, value)
        from_map.remove(key)
    }

    second_map.unlock()
    first_map.unlock()
}
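
With consistent ordering, two tasks can transfer in opposite directions without deadlocking ("balance" is just an illustrative key):

// Opposite transfer directions no longer deadlock, because both
// tasks acquire the two locks in the same ID-based order.
map1.set("balance", 100)

do transfer_data(map1, map2, "balance")
do transfer_data(map2, map1, "balance")
await_all()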

Timeout Operations

// Operations with timeouts to prevent indefinite blocking
var timed_queue = collections.new_concurrent_queue()

function try_dequeue_with_timeout(timeout_ms) {
    var start_time = current_time_millis()

    for {
        if !timed_queue.is_empty() {
            return timed_queue.dequeue()
        }

        if current_time_millis() - start_time > timeout_ms {
            return null  // Timeout
        }

        sleep(1)
    }
}

// Usage
var item = try_dequeue_with_timeout(5000)  // 5 second timeout
if item == null {
    fmt.Println("Timeout waiting for item")
}
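
To see the timeout succeed, a producer can fill the queue after a delay (a small usage sketch):

// Demo: a producer that enqueues after roughly 2 seconds
do {
    sleep(2000)
    timed_queue.enqueue("late_item")
}

var late_item = try_dequeue_with_timeout(5000)
fmt.Println("Got:", late_item)  // "late_item", after about 2 seconds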

Monitoring and Diagnostics

struct ConcurrentCollectionStats {
    read_count: int
    write_count: int
    contention_count: int
    avg_wait_time: float
}

// Monitor collection performance
function monitor_collection_stats(collection) {
    for {
        var stats = collection.get_stats()
        fmt.Println("Reads:", stats.read_count)
        fmt.Println("Writes:", stats.write_count)
        fmt.Println("Contention:", stats.contention_count)
        fmt.Println("Avg wait:", stats.avg_wait_time, "ms")
        fmt.Println("---")
        sleep(5000)
    }
}

var monitored_map = collections.new_concurrent_map()
do monitor_collection_stats(monitored_map)

Best Practices

1. Use Concurrent Collections Only When Needed

// Don't use concurrent collections for single-threaded code
function single_threaded_function() {
    var local_map = collections.new_map()  // Regular map is fine
    // ... single-threaded operations
}

// Use concurrent collections for shared data
var shared_cache = collections.new_concurrent_map()  // Multiple threads access this

2. Minimize Lock Scope

// Bad: holding lock too long
function bad_example(concurrent_map) {
    concurrent_map.lock()
    var value = concurrent_map.get("key")

    // Long computation while holding lock
    var result = expensive_computation(value)

    concurrent_map.set("result", result)
    concurrent_map.unlock()
}

// Good: minimize lock time
function good_example(concurrent_map) {
    var value = concurrent_map.get("key")  // Short lock

    // Computation outside of lock
    var result = expensive_computation(value)

    // Note: "key" may have been updated since the read above; this
    // last-writer-wins pattern is fine when stale input is acceptable.
    concurrent_map.set("result", result)   // Short lock
}

3. Use Batch Operations

// Instead of multiple individual operations
for item in large_list {
    concurrent_set.add(item)  // Many lock/unlock cycles
}

// Use batch operations
concurrent_set.add_all(large_list)  // Single lock/unlock

4. Handle Contention Gracefully

function handle_high_contention(concurrent_collection) {
    var max_retries = 3
    var retry_count = 0
    var delay = 10

    for retry_count < max_retries {
        if concurrent_collection.try_operation() {
            return  // Success
        }

        retry_count = retry_count + 1
        sleep(delay)
        delay = delay * 2  // Exponential backoff: 10, 20, 40 ms
    }

    fmt.Println("Operation failed after retries")
}

5. Monitor Performance

// Track performance metrics
var start_time = current_time_millis()
concurrent_map.set("key", "value")
var end_time = current_time_millis()

if end_time - start_time > 100 {
    fmt.Println("High latency detected:", end_time - start_time, "ms")
}

Migration from Non-Concurrent Collections

Simple Replacement

// Old code
var shared_data = collections.new_map()

// New code - just change constructor
var shared_data = collections.new_concurrent_map()
// All other operations remain the same

Adding Proper Synchronization

// Old code with manual locking
var data = collections.new_map()
var mutex = sync.new_mutex()

function update_data(key, value) {
    mutex.lock()
    data.set(key, value)
    mutex.unlock()
}

// New code - built-in synchronization
var data = collections.new_concurrent_map()

function update_data(key, value) {
    data.set(key, value)  // Automatically thread-safe
}

Concurrent collections provide robust, thread-safe data structures that maintain performance while ensuring data integrity in multi-threaded applications. They handle the complexity of synchronization internally, allowing you to focus on your application logic rather than low-level threading concerns.