并发编程 Concurrent Programming
并发编程是系统设计中的重要主题，涉及多线程、同步、死锁预防等多个方面。本指南涵盖并发编程的核心概念和实践，示例代码基于 Python 的 threading、queue 和 concurrent.futures 模块。
基础概念 Basic Concepts
1. 进程与线程 Process and Thread
- 进程:独立的执行单元,拥有独立的内存空间
- 线程:进程内的执行单元,共享进程的内存空间
- 协程:用户态的轻量级线程
2. 同步与互斥 Synchronization and Mutual Exclusion
- 临界区：访问共享资源、必须互斥执行的代码段
- 死锁:多个线程互相等待对方持有的资源
- 饥饿:线程长期无法获得所需资源
同步原语 Synchronization Primitives
1. 互斥锁 Mutex
from threading import Lock
class SafeCounter:
    """Integer counter whose reads and updates are serialized by one mutex."""

    def __init__(self):
        self._lock = Lock()
        self._counter = 0

    def increment(self):
        """Add one to the counter while holding the lock."""
        self._lock.acquire()
        try:
            self._counter = self._counter + 1
        finally:
            self._lock.release()

    def get_value(self):
        """Return the current count, read while holding the lock."""
        self._lock.acquire()
        try:
            snapshot = self._counter
        finally:
            self._lock.release()
        return snapshot
2. 信号量 Semaphore
from threading import Semaphore
import time
class BoundedQueue:
    """Fixed-capacity FIFO queue built from two counting semaphores.

    ``producer_sem`` counts free slots and ``consumer_sem`` counts filled
    slots, so ``produce`` blocks while the queue is full and ``consume``
    blocks while it is empty. ``lock`` protects the underlying deque.

    Raises:
        ValueError: if ``capacity`` is less than 1.
    """

    def __init__(self, capacity):
        # deque removes from the left in O(1); list.pop(0) is O(n).
        from collections import deque

        if capacity < 1:
            # Semaphore(0) free slots would make every produce() block forever.
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.queue = deque()
        self.producer_sem = Semaphore(capacity)  # free slots
        self.consumer_sem = Semaphore(0)         # filled slots
        self.lock = Lock()

    def produce(self, item):
        """Append *item*, blocking until a free slot is available."""
        self.producer_sem.acquire()
        with self.lock:
            self.queue.append(item)
        # Signal only after the item is actually stored.
        self.consumer_sem.release()

    def consume(self):
        """Remove and return the oldest item, blocking while the queue is empty."""
        self.consumer_sem.acquire()
        with self.lock:
            item = self.queue.popleft()
        self.producer_sem.release()
        return item
3. 条件变量 Condition Variable
from threading import Condition
class Buffer:
    """Bounded circular buffer guarded by a single Condition variable.

    ``put`` waits while the buffer is full and ``get`` waits while it is
    empty. Producers and consumers wait on the same condition, so every state
    change wakes all waiters with ``notify_all``. A single ``notify`` could
    wake a thread of the same kind, for example one consumer waking another.
    The thread that could actually proceed would then sleep forever.

    Raises:
        ValueError: if ``size`` is less than 1.
    """

    def __init__(self, size):
        if size < 1:
            # With size 0 both put() and get() would wait forever.
            raise ValueError("size must be at least 1")
        self.buffer = [None] * size
        self.size = size
        self.count = 0    # number of items currently stored
        self.in_pos = 0   # slot the next put() writes to
        self.out_pos = 0  # slot the next get() reads from
        self.condition = Condition()

    def put(self, item):
        """Store *item*, blocking while the buffer is full."""
        with self.condition:
            while self.count == self.size:
                self.condition.wait()
            self.buffer[self.in_pos] = item
            self.in_pos = (self.in_pos + 1) % self.size
            self.count += 1
            self.condition.notify_all()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.condition:
            while self.count == 0:
                self.condition.wait()
            item = self.buffer[self.out_pos]
            # Drop the slot's reference so consumed items are not kept alive.
            self.buffer[self.out_pos] = None
            self.out_pos = (self.out_pos + 1) % self.size
            self.count -= 1
            self.condition.notify_all()
            return item
线程池 Thread Pool
1. 基本线程池
from concurrent.futures import ThreadPoolExecutor
import threading
class SimpleThreadPool:
    """Thin wrapper over ThreadPoolExecutor that remembers submitted futures.

    ``wait_all`` blocks until every future submitted since the previous call
    has finished. The pool can also be used as a context manager, which shuts
    the executor down on exit.
    """

    def __init__(self, max_workers):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tasks = []

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its Future."""
        future = self.executor.submit(fn, *args, **kwargs)
        self.tasks.append(future)
        return future

    def wait_all(self):
        """Wait for every tracked future, then re-raise the first failure.

        All tracked futures are waited on and the tracking list is cleared,
        even if some tasks failed. Afterwards, the exception of the
        earliest-submitted failing task is re-raised.

        Raises:
            Exception: whatever the first failing task raised.
        """
        # Snapshot and reset first, so a failure cannot leave stale futures
        # behind that every later call would re-raise.
        pending = list(self.tasks)
        self.tasks.clear()
        first_error = None
        for future in pending:
            error = future.exception()  # blocks until this future is done
            if error is not None and first_error is None:
                first_error = error
        if first_error is not None:
            raise first_error

    def shutdown(self, wait=True):
        """Shut down the underlying executor; see ThreadPoolExecutor.shutdown."""
        self.executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown(wait=True)
        return False
2. 任务队列 Task Queue
from queue import Queue
from threading import Thread
class WorkerThread(Thread):
    """Daemon thread that runs ``(task, args, kwargs)`` tuples from a queue.

    If a task raises, the error is logged and the worker keeps running. One
    bad task therefore cannot shrink the pool, or leave ``task_queue.join()``
    waiting on work that no thread will ever pick up.
    """

    def __init__(self, task_queue):
        super().__init__()
        self.task_queue = task_queue
        # Daemon: the worker loops forever and must not keep the process alive.
        self.daemon = True

    def run(self):
        import logging

        logger = logging.getLogger(__name__)
        while True:
            task, args, kwargs = self.task_queue.get()
            try:
                task(*args, **kwargs)
            except Exception:
                # Report the failure and keep serving the queue instead of
                # letting the exception terminate this thread.
                logger.exception("Task %r failed", task)
            finally:
                # Always mark the item done so Queue.join() can return.
                self.task_queue.task_done()
class TaskQueue:
    """Run submitted callables on a fixed set of WorkerThread instances.

    All workers pull ``(task, args, kwargs)`` tuples from one shared
    ``Queue``. ``wait_completion`` blocks until every queued item has been
    marked done.
    """

    def __init__(self, num_workers):
        self.tasks = Queue()
        self.workers = [WorkerThread(self.tasks) for _ in range(num_workers)]
        for worker in self.workers:
            worker.start()

    def add_task(self, task, *args, **kwargs):
        """Queue ``task(*args, **kwargs)`` for execution by some worker."""
        job = (task, args, kwargs)
        self.tasks.put(job)

    def wait_completion(self):
        """Block until every task added so far has been processed."""
        self.tasks.join()
并发模式 Concurrent Patterns
1. 生产者-消费者模式
from queue import Queue
from threading import Thread
import time
class ProducerConsumer:
    """Demonstrate the producer-consumer pattern over a bounded Queue.

    A producer thread puts items into a ``Queue`` of capacity ``buffer_size``,
    and a consumer thread takes them out. ``put`` blocks while the queue is
    full and ``get`` blocks while it is empty. ``produce_delay`` and
    ``consume_delay`` (seconds) simulate per-item work.
    """

    # Private end-of-stream marker. A unique object (rather than None) lets
    # None be an ordinary item without stopping the consumer early.
    _STOP = object()

    def __init__(self, buffer_size, produce_delay=0.1, consume_delay=0.2):
        self.queue = Queue(buffer_size)
        self.produce_delay = produce_delay
        self.consume_delay = consume_delay

    def producer(self, items):
        """Put each element of *items* on the queue, in order."""
        for item in items:
            self.queue.put(item)
            print(f"Produced: {item}")
            time.sleep(self.produce_delay)

    def consumer(self):
        """Take and print items until the stop marker is received."""
        while True:
            item = self.queue.get()
            try:
                if item is self._STOP:
                    break
                print(f"Consumed: {item}")
                time.sleep(self.consume_delay)
            finally:
                # Runs for the stop marker too, so queue.join() never hangs.
                self.queue.task_done()

    def run(self, items):
        """Produce *items* on one thread and consume them on another."""
        producer = Thread(target=self.producer, args=(items,))
        consumer = Thread(target=self.consumer)
        consumer.start()
        producer.start()
        producer.join()
        self.queue.put(self._STOP)
        consumer.join()
2. 读写锁模式
from threading import Lock, Thread
class ReadWriteLock:
    """Readers-preference read/write lock built from two mutexes.

    Any number of readers may hold the lock at once, while a writer needs it
    exclusively. The first reader in takes ``write_lock`` for the whole group
    of readers, and the last reader out gives it back. That reader may be a
    different thread, which ``threading.Lock`` permits. While readers keep
    overlapping, ``write_lock`` is never released, so a waiting writer can
    starve.
    """

    def __init__(self):
        self.read_count = 0
        self.read_lock = Lock()   # guards read_count
        self.write_lock = Lock()  # held by a writer, or by the reader group

    def acquire_read(self):
        """Enter as a reader; the first reader blocks out writers."""
        self.read_lock.acquire()
        try:
            self.read_count += 1
            first_reader = self.read_count == 1
            if first_reader:
                self.write_lock.acquire()
        finally:
            self.read_lock.release()

    def release_read(self):
        """Leave as a reader; the last reader lets writers back in."""
        with self.read_lock:
            self.read_count -= 1
            if not self.read_count:
                self.write_lock.release()

    def acquire_write(self):
        """Take exclusive access, blocking while readers or a writer hold it."""
        self.write_lock.acquire()

    def release_write(self):
        """Give up exclusive access."""
        self.write_lock.release()
死锁预防 Deadlock Prevention
1. 死锁检测
from threading import Lock
import threading
class DeadlockDetector:
    """Flag lock requests that break a global acquisition order.

    Locks are ordered by ``id()``. A thread may not request a lock whose id
    is lower than that of any lock it is recorded as holding. Out-of-order
    requests raise, because two threads taking the same pair of locks in
    opposite orders is the classic circular-wait deadlock. This class only
    keeps bookkeeping; callers still acquire and release the real locks
    themselves.
    """

    def __init__(self):
        # thread_id -> set of locks currently recorded as held by that thread
        self.locks = {}
        # Not read by the checks below; kept so existing references still work.
        self.lock_order = {}

    def acquire(self, lock, thread_id):
        """Record that *thread_id* takes *lock*.

        Raises:
            RuntimeError: if the thread already holds a lock whose ``id()``
                is greater than that of *lock* (an out-of-order request).
                Nothing is recorded in that case.
        """
        held = self.locks.get(thread_id, ())
        if any(id(other) > id(lock) for other in held):
            raise RuntimeError("Potential deadlock detected")
        self.locks.setdefault(thread_id, set()).add(lock)

    def release(self, lock, thread_id):
        """Forget that *thread_id* holds *lock*.

        Raises:
            KeyError: if the thread has no record of holding *lock*.
        """
        held = self.locks[thread_id]
        held.remove(lock)
        if not held:
            # Drop empty entries so self.locks lists only active holders.
            del self.locks[thread_id]
2. 资源分级分配
class HierarchicalLock:
    """Mutex tagged with a level, enforcing a strict lock hierarchy per thread.

    While a thread holds hierarchical locks, it may only acquire another one
    whose level is strictly lower than every level it currently holds.
    Acquiring locks in one consistent order rules out circular wait. Usable
    as a context manager.
    """

    # Shared by *all* instances, and per thread, so the check sees every
    # hierarchical lock the thread holds. A threading.local per instance would
    # only ever see its own lock and miss violations across different locks.
    _thread_state = threading.local()

    def __init__(self, level):
        self.lock = Lock()
        self.level = level
        # Attribute name kept for existing callers; it refers to the shared state.
        self.current_level = HierarchicalLock._thread_state

    def _held_levels(self):
        """Return the calling thread's stack of held levels (newest last)."""
        state = HierarchicalLock._thread_state
        if not hasattr(state, "held"):
            state.held = []
        return state.held

    def acquire(self):
        """Acquire the lock, blocking if another thread holds it.

        Raises:
            RuntimeError: if this thread already holds a hierarchical lock
                whose level is less than or equal to ``self.level``.
        """
        held = self._held_levels()
        # Levels on the stack strictly decrease, so the top is the lowest.
        if held and held[-1] <= self.level:
            raise RuntimeError("Lock hierarchy violation")
        self.lock.acquire()
        held.append(self.level)

    def release(self):
        """Release the lock and remove its level from this thread's stack."""
        self.lock.release()
        held = self._held_levels()
        if self.level in held:
            # Remove the most recent entry for this level.
            del held[len(held) - 1 - held[::-1].index(self.level)]

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()
        return False
并发数据结构 Concurrent Data Structures
1. 线程安全队列
from queue import Queue
import threading
class SafeQueue:
    """FIFO queue whose dequeue never blocks: it returns None when empty."""

    def __init__(self):
        self.queue = Queue()
        # queue.Queue is internally synchronized, so the methods below do not
        # need this lock; the attribute is kept for existing callers.
        self.lock = Lock()

    def enqueue(self, item):
        """Append *item* to the tail of the queue."""
        self.queue.put(item)

    def dequeue(self):
        """Remove and return the head item, or None if the queue is empty."""
        from queue import Empty

        # get_nowait() never blocks. With an empty()-then-get() pair, get()
        # could block forever if another thread drained the queue between
        # the check and the removal.
        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    def size(self):
        """Return the approximate number of queued items (see Queue.qsize)."""
        return self.queue.qsize()
2. 并发字典
from threading import RLock
class ConcurrentDict:
    """Dictionary wrapper whose operations each run under one re-entrant lock."""

    def __init__(self):
        self._lock = RLock()
        self._dict = {}

    def get(self, key, default=None):
        """Return the value stored for *key*, or *default* if it is absent."""
        with self._lock:
            found = self._dict.get(key, default)
        return found

    def put(self, key, value):
        """Store *value* under *key*, replacing any previous value."""
        with self._lock:
            self._dict[key] = value

    def remove(self, key):
        """Delete *key* if present; a missing key is silently ignored."""
        with self._lock:
            self._dict.pop(key, None)
性能优化 Performance Optimization
1. 细粒度锁
class FineGrainedDict:
    """Hash map split into independently locked shards (lock striping).

    Each key maps to one of ``num_shards`` plain dicts, and each dict has its
    own Lock. Operations on keys that land in different shards therefore do
    not contend for the same lock.

    Raises:
        ValueError: if ``num_shards`` is less than 1.
    """

    def __init__(self, num_shards=16):
        if num_shards < 1:
            # Otherwise every access would fail (modulo by zero / empty list).
            raise ValueError("num_shards must be at least 1")
        self.num_shards = num_shards
        self.shards = [{} for _ in range(num_shards)]
        self.locks = [Lock() for _ in range(num_shards)]

    def _get_shard(self, key):
        """Return the index of the shard responsible for *key*."""
        return hash(key) % self.num_shards

    def get(self, key, default=None):
        """Return the value for *key*, or *default* (None) if it is absent."""
        shard_id = self._get_shard(key)
        with self.locks[shard_id]:
            return self.shards[shard_id].get(key, default)

    def put(self, key, value):
        """Store *value* under *key*, locking only that key's shard."""
        shard_id = self._get_shard(key)
        with self.locks[shard_id]:
            self.shards[shard_id][key] = value

    def remove(self, key):
        """Delete *key* if present; a missing key is silently ignored."""
        shard_id = self._get_shard(key)
        with self.locks[shard_id]:
            self.shards[shard_id].pop(key, None)
2. 无锁数据结构（CAS 模式示例：此处用锁模拟原子的"比较并交换"，因此并不是真正的无锁实现）
from threading import Lock
import threading
class LockFreeCounter:
    """Counter updated through a compare-and-swap (CAS) retry loop.

    The standard library offers no atomic compare-and-swap, so ``_cas``
    emulates one with a Lock. The class shows the shape of lock-free code but
    is not itself lock-free.
    """

    def __init__(self):
        self._count = 0
        self._lock = Lock()  # makes the compare-and-set step in _cas atomic

    def increment(self):
        """Add one, retrying if another thread changed the count meanwhile."""
        while True:
            # Unlocked snapshot; _cas rejects it if it has gone stale.
            old_count = self._count
            if self._cas(old_count, old_count + 1):
                break

    def get_value(self):
        """Return the current count."""
        with self._lock:
            return self._count

    def _cas(self, old_value, new_value):
        """Set the count to *new_value* only if it still equals *old_value*.

        Returns:
            True if the swap happened, False if the count had changed.
        """
        with self._lock:
            if self._count == old_value:
                self._count = new_value
                return True
            return False
实践建议 Practice Tips
- 锁的使用
  - 最小化锁的范围
  - 避免嵌套锁；确需嵌套时，所有线程按同一固定顺序加锁
  - 使用合适的锁类型
- 线程安全
  - 识别共享资源
  - 保护临界区
  - 避免竞态条件
- 性能考虑
  - 合理使用线程池
  - 避免过度同步
  - 选择适当的并发级别
- 调试技巧
  - 使用日志记录
  - 死锁检测
  - 性能分析
常见问题 Common Issues
- 竞态条件
  - 资源竞争
  - 数据不一致
  - 顺序依赖
- 死锁与活性问题
  - 循环等待（导致死锁）
  - 资源饥饿
  - 优先级反转
- 性能问题
  - 过度同步
  - 线程开销
  - 上下文切换
- 可伸缩性
  - 负载均衡
  - 资源利用
  - 并发度控制