并发编程 Concurrent Programming


并发编程 Concurrent Programming

并发编程是系统设计中的重要主题,涉及多线程、同步、死锁预防等多个方面。本指南涵盖并发编程的核心概念和实践。

基础概念 Basic Concepts

1. 进程与线程 Process and Thread

  • 进程:独立的执行单元,拥有独立的内存空间
  • 线程:进程内的执行单元,共享进程的内存空间
  • 协程:用户态的轻量级线程

2. 同步与互斥 Synchronization and Mutual Exclusion

  • 临界区:访问共享资源、同一时刻只允许一个线程执行的代码段
  • 死锁:多个线程互相等待对方持有的资源,导致都无法继续执行
  • 饥饿:线程长期无法获得所需资源

同步原语 Synchronization Primitives

1. 互斥锁 Mutex

from threading import Lock

class SafeCounter:
    """Integer counter whose every read and write happens under a mutex."""

    def __init__(self):
        # The lock guards all access to ``_counter``.
        self._lock = Lock()
        self._counter = 0

    def increment(self):
        """Add one to the counter while holding the lock."""
        self._lock.acquire()
        try:
            self._counter = self._counter + 1
        finally:
            self._lock.release()

    def get_value(self):
        """Return the current count, read while holding the lock."""
        self._lock.acquire()
        try:
            return self._counter
        finally:
            self._lock.release()

2. 信号量 Semaphore

import time
from collections import deque
from threading import Lock, Semaphore

class BoundedQueue:
    """Blocking FIFO queue with a fixed capacity, built from two semaphores.

    ``producer_sem`` counts the free slots and ``consumer_sem`` counts the
    stored items; ``lock`` protects the underlying container.
    """

    def __init__(self, capacity):
        """Create an empty queue that holds at most ``capacity`` items."""
        self.capacity = capacity
        # A deque removes from the left end in O(1); list.pop(0) is O(n).
        self.queue = deque()
        self.producer_sem = Semaphore(capacity)
        self.consumer_sem = Semaphore(0)
        self.lock = Lock()

    def produce(self, item):
        """Append ``item``, blocking while the queue is full."""
        # Reserve a free slot first; this is where a producer blocks.
        self.producer_sem.acquire()
        with self.lock:
            self.queue.append(item)
        # Signal that one more item is available to consumers.
        self.consumer_sem.release()

    def consume(self):
        """Remove and return the oldest item, blocking while the queue is empty."""
        # Wait for an item; this is where a consumer blocks.
        self.consumer_sem.acquire()
        with self.lock:
            item = self.queue.popleft()
        # Hand the freed slot back to producers.
        self.producer_sem.release()
        return item

3. 条件变量 Condition Variable

from threading import Condition

class Buffer:
    """Fixed-size circular buffer with blocking ``put``/``get``.

    A single condition variable is shared by producers and consumers, so
    every state change wakes *all* waiters; each one re-checks its own
    predicate in a ``while`` loop.
    """

    def __init__(self, size):
        """Create an empty buffer with ``size`` slots.

        Raises:
            ValueError: if ``size`` is smaller than 1. Such a buffer could
                never accept an item.
        """
        if size < 1:
            raise ValueError("size must be at least 1")
        self.buffer = [None] * size
        self.size = size
        self.count = 0      # number of items currently stored
        self.in_pos = 0     # index of the next slot to write
        self.out_pos = 0    # index of the next slot to read
        self.condition = Condition()

    def put(self, item):
        """Store ``item``, blocking while the buffer is full."""
        with self.condition:
            while self.count == self.size:
                self.condition.wait()
            self.buffer[self.in_pos] = item
            self.in_pos = (self.in_pos + 1) % self.size
            self.count += 1
            # notify() wakes one arbitrary waiter, which may be another
            # producer; that wakeup is then wasted and every thread can end
            # up asleep. notify_all() guarantees a consumer sees the change.
            self.condition.notify_all()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.condition:
            while self.count == 0:
                self.condition.wait()
            item = self.buffer[self.out_pos]
            self.out_pos = (self.out_pos + 1) % self.size
            self.count -= 1
            # Same reasoning as in put(): wake everyone so a blocked
            # producer is certain to notice the free slot.
            self.condition.notify_all()
            return item

线程池 Thread Pool

1. 基本线程池

from concurrent.futures import ThreadPoolExecutor
import threading

class SimpleThreadPool:
    """Thin wrapper around ``ThreadPoolExecutor`` that remembers submitted futures."""

    def __init__(self, max_workers):
        """Create a pool backed by an executor with ``max_workers`` threads."""
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tasks = []

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its future."""
        future = self.executor.submit(fn, *args, **kwargs)
        self.tasks.append(future)
        return future

    def wait_all(self):
        """Block until every tracked future has finished.

        All tracked futures are awaited even if some fail, and the tracking
        list is always emptied, so a failed task is reported exactly once
        rather than on every later call.

        Raises:
            Exception: the first exception (in submission order) raised by
                any of the awaited tasks.
        """
        # Detach the batch up front: futures submitted while we wait land
        # in the fresh list and are not silently dropped afterwards.
        pending, self.tasks = self.tasks, []
        first_error = None
        for future in pending:
            try:
                future.result()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    def shutdown(self, wait=True):
        """Shut down the underlying executor, optionally waiting for running tasks."""
        self.executor.shutdown(wait=wait)

2. 任务队列 Task Queue

from queue import Queue
from threading import Thread

class WorkerThread(Thread):
    """Daemon thread that endlessly runs ``(task, args, kwargs)`` tuples from a queue."""

    def __init__(self, task_queue):
        """Bind the worker to ``task_queue``; the thread is marked as a daemon."""
        super().__init__()
        self.task_queue = task_queue
        self.daemon = True

    def run(self):
        """Pull work items forever, calling ``task(*args, **kwargs)`` for each.

        A failing task is logged instead of being allowed to escape: an
        uncaught exception would end this thread, and once every worker is
        gone the items left in the queue are never processed, so
        ``Queue.join()`` would block forever.
        """
        import logging

        log = logging.getLogger(__name__)
        while True:
            item = self.task_queue.get()
            try:
                # Unpack inside the try so a malformed item is also
                # acknowledged and reported rather than killing the worker.
                task, args, kwargs = item
                task(*args, **kwargs)
            except Exception:
                log.exception("Unhandled exception in queued task %r", item)
            finally:
                # Always acknowledge the item so Queue.join() can return.
                self.task_queue.task_done()

class TaskQueue:
    """A shared ``Queue`` of work items served by a fixed set of ``WorkerThread``s."""

    def __init__(self, num_workers):
        """Create the queue and start ``num_workers`` worker threads on it."""
        self.workers = []
        self.tasks = Queue()
        for _index in range(num_workers):
            self.workers.append(self._launch_worker())

    def _launch_worker(self):
        """Create one worker bound to the shared queue, start it and return it."""
        thread = WorkerThread(self.tasks)
        thread.start()
        return thread

    def add_task(self, task, *args, **kwargs):
        """Enqueue ``task`` together with its positional and keyword arguments."""
        entry = (task, args, kwargs)
        self.tasks.put(entry)

    def wait_completion(self):
        """Block until ``join()`` on the underlying queue returns."""
        self.tasks.join()

并发模式 Concurrent Patterns

1. 生产者-消费者模式

from queue import Queue
from threading import Thread
import time

class ProducerConsumer:
    """One-producer / one-consumer demo built on a bounded ``Queue``.

    ``None`` is the stop sentinel, so ``None`` cannot be sent as a regular
    item: the consumer would stop early.
    """

    def __init__(self, buffer_size):
        """Create the shared queue holding at most ``buffer_size`` items."""
        self.queue = Queue(buffer_size)

    def producer(self, items):
        """Put every element of ``items`` on the queue, pausing briefly after each."""
        for item in items:
            self.queue.put(item)
            print(f"Produced: {item}")
            time.sleep(0.1)

    def consumer(self):
        """Consume items until the ``None`` sentinel is received."""
        while True:
            item = self.queue.get()
            try:
                if item is None:
                    break
                print(f"Consumed: {item}")
                time.sleep(0.2)
            finally:
                # Acknowledge every get(), including the sentinel and any
                # item whose processing failed; otherwise queue.join() would
                # block forever on the unacknowledged entry.
                self.queue.task_done()

    def run(self, items):
        """Run producer and consumer threads to completion for ``items``."""
        producer = Thread(target=self.producer, args=(items,))
        consumer = Thread(target=self.consumer)
        consumer.start()
        producer.start()
        producer.join()
        # All items are queued; tell the consumer to stop once it has
        # drained them.
        self.queue.put(None)
        consumer.join()

2. 读写锁模式

from threading import Lock, Thread

class ReadWriteLock:
    """Reader/writer lock assembled from two plain locks and a reader count.

    ``read_lock`` protects ``read_count``; ``write_lock`` is held either by
    a writer or on behalf of the group of active readers.
    """

    def __init__(self):
        self.read_count = 0
        self.write_lock = Lock()
        self.read_lock = Lock()

    def acquire_read(self):
        """Register a reader; the first reader also takes ``write_lock``."""
        with self.read_lock:
            was_idle = self.read_count == 0
            self.read_count = self.read_count + 1
            if was_idle:
                # First reader in: take the write lock for the whole group.
                self.write_lock.acquire()

    def release_read(self):
        """Unregister a reader; the last reader releases ``write_lock``."""
        with self.read_lock:
            remaining = self.read_count - 1
            self.read_count = remaining
            if remaining == 0:
                # Last reader out: give the write lock back.
                self.write_lock.release()

    def acquire_write(self):
        """Acquire ``write_lock`` for a writer."""
        self.write_lock.acquire()

    def release_write(self):
        """Release ``write_lock`` held by a writer."""
        self.write_lock.release()

死锁预防 Deadlock Prevention

1. 死锁检测

from threading import Lock
import threading

class DeadlockDetector:
    """Records which locks each thread holds and rejects out-of-order requests.

    Ordering is by ``id()`` of the lock objects: a request is refused when
    the thread already holds a lock whose ``id`` is greater than that of the
    requested lock.
    """

    def __init__(self):
        self.lock_order = {}
        self.locks = {}  # thread_id -> set of locks recorded for that thread

    def acquire(self, lock, thread_id):
        """Record that ``thread_id`` takes ``lock``.

        Raises:
            Exception: if the thread already holds a lock with a greater
                ``id`` than ``lock``.
        """
        already_held = self.locks.get(thread_id, ())
        requested = id(lock)
        if any(id(other) > requested for other in already_held):
            raise Exception("Potential deadlock detected")

        self.locks.setdefault(thread_id, set()).add(lock)

    def release(self, lock, thread_id):
        """Forget ``lock`` for ``thread_id`` and drop the entry once it is empty.

        Raises:
            KeyError: if the thread or the lock is not recorded.
        """
        owned = self.locks[thread_id]
        owned.remove(lock)
        if len(owned) == 0:
            del self.locks[thread_id]

2. 资源分级分配

class HierarchicalLock:
    """Mutex with a numeric level that enforces a per-thread acquisition order.

    A thread may only acquire a lock whose level is strictly lower than
    every level it currently holds. Requesting an equal or higher level
    raises instead of blocking, which rules out lock-order cycles.
    """

    # One thread-local object shared by ALL instances. A thread-local
    # created per instance cannot see which other locks the thread holds,
    # so levels would never be compared across different locks.
    _thread_state = threading.local()

    def __init__(self, level):
        """Create an unlocked lock at hierarchy position ``level``."""
        self.lock = Lock()
        self.level = level
        # Attribute name kept from the earlier design; it now refers to the
        # shared per-thread state, whose ``levels`` list holds the levels
        # the current thread has acquired.
        self.current_level = HierarchicalLock._thread_state

    def _held_levels(self):
        """Return the calling thread's list of held levels, creating it on first use."""
        state = self.current_level
        if not hasattr(state, 'levels'):
            state.levels = []
        return state.levels

    def acquire(self):
        """Acquire the lock, blocking if another thread holds it.

        Raises:
            RuntimeError: if the calling thread already holds a lock whose
                level is lower than or equal to this lock's level.
        """
        held = self._held_levels()
        if held and min(held) <= self.level:
            raise RuntimeError("Lock hierarchy violation")
        self.lock.acquire()
        # Record the level only once the lock is actually owned.
        held.append(self.level)

    def release(self):
        """Release the lock and forget its level for the calling thread.

        Raises:
            RuntimeError: if the underlying lock is not locked.
        """
        self.lock.release()
        held = self._held_levels()
        # A plain Lock may be released by a thread other than the one that
        # acquired it; in that case this thread has no record to remove.
        if self.level in held:
            held.remove(self.level)

    def __enter__(self):
        """Acquire the lock on entering a ``with`` block."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock on leaving a ``with`` block."""
        self.release()

并发数据结构 Concurrent Data Structures

1. 线程安全队列

from queue import Queue
import threading

class SafeQueue:
    """Unbounded FIFO queue whose operations all run under one extra lock."""

    def __init__(self):
        self.lock = Lock()
        self.queue = Queue()

    def enqueue(self, item):
        """Append ``item`` to the queue."""
        self.lock.acquire()
        try:
            self.queue.put(item)
        finally:
            self.lock.release()

    def dequeue(self):
        """Return the oldest item, or ``None`` when the queue is empty."""
        self.lock.acquire()
        try:
            # Guard clause: never call get() on an empty queue, since that
            # would block while the lock is held.
            if self.queue.empty():
                return None
            return self.queue.get()
        finally:
            self.lock.release()

    def size(self):
        """Return the number of queued items as reported by ``qsize()``."""
        self.lock.acquire()
        try:
            return self.queue.qsize()
        finally:
            self.lock.release()

2. 并发字典

from threading import RLock

class ConcurrentDict:
    """Dictionary wrapper whose operations each run under a re-entrant lock."""

    def __init__(self):
        self._lock = RLock()
        self._dict = {}

    def get(self, key, default=None):
        """Return the value stored for ``key``, or ``default`` if it is absent."""
        with self._lock:
            value = self._dict.get(key, default)
        return value

    def put(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        with self._lock:
            self._dict.update({key: value})

    def remove(self, key):
        """Delete ``key`` if it is present; do nothing otherwise."""
        with self._lock:
            self._dict.pop(key, None)

性能优化 Performance Optimization

1. 细粒度锁

class FineGrainedDict:
    """Dictionary split into shards, each protected by its own lock.

    A key is mapped to a shard by ``hash(key) % num_shards``; an operation
    takes only the lock of the shard that owns the key.
    """

    def __init__(self, num_shards=16):
        self.num_shards = num_shards
        self.shards = []
        self.locks = []
        for _ in range(num_shards):
            self.shards.append({})
            self.locks.append(Lock())

    def _get_shard(self, key):
        """Return the index of the shard responsible for ``key``."""
        bucket = hash(key)
        return bucket % self.num_shards

    def get(self, key):
        """Return the value stored for ``key``, or ``None`` if it is absent."""
        index = self._get_shard(key)
        guard = self.locks[index]
        with guard:
            shard = self.shards[index]
            return shard.get(key)

    def put(self, key, value):
        """Store ``value`` under ``key`` in the shard that owns the key."""
        index = self._get_shard(key)
        guard = self.locks[index]
        with guard:
            shard = self.shards[index]
            shard[key] = value

2. 无锁数据结构(Python 没有原生 CAS 原语,示例用锁模拟 CAS 操作)

from threading import Lock
import threading

class LockFreeCounter:
    """Counter incremented with a compare-and-swap retry loop.

    The compare-and-swap step itself is implemented with a ``Lock`` in
    ``_cas``.
    """

    def __init__(self):
        self._lock = Lock()
        self._count = 0

    def increment(self):
        """Add one to the counter, retrying until the swap succeeds."""
        swapped = False
        while not swapped:
            snapshot = self._count
            # The swap fails if ``_count`` no longer equals the snapshot;
            # in that case take a fresh snapshot and try again.
            swapped = self._cas(snapshot, snapshot + 1)

    def _cas(self, old_value, new_value):
        """Set ``_count`` to ``new_value`` only if it equals ``old_value``.

        Returns:
            ``True`` if the value was replaced, ``False`` otherwise.
        """
        with self._lock:
            if not self._count == old_value:
                return False
            self._count = new_value
            return True

实践建议 Practice Tips

  1. 锁的使用

    • 最小化锁的范围
    • 避免嵌套锁
    • 使用合适的锁类型
  2. 线程安全

    • 识别共享资源
    • 保护临界区
    • 避免竞态条件
  3. 性能考虑

    • 合理使用线程池
    • 避免过度同步
    • 选择适当的并发级别
  4. 调试技巧

    • 使用日志记录
    • 死锁检测
    • 性能分析

常见问题 Common Issues

  1. 竞态条件

    • 资源竞争
    • 数据不一致
    • 顺序依赖
  2. 死锁问题

    • 循环等待
    • 资源饥饿
    • 优先级反转
  3. 性能问题

    • 过度同步
    • 线程开销
    • 上下文切换
  4. 可伸缩性

    • 负载均衡
    • 资源利用
    • 并发度控制

参考资源 References

  1. Python并发编程
  2. Java并发编程实战
  3. Go语言并发模式