Python 数据库连接池:原理、实现与最佳实践
2026/5/11 3:56:32
数据库连接池是后端开发中至关重要的组件,它通过复用数据库连接来提高应用性能和资源利用率。
本文将深入探讨Python中数据库连接池的原理、实现方式和最佳实践。
# 不使用连接池的问题 def query_user(user_id): # 每次请求都建立新连接 conn = create_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) result = cursor.fetchone() conn.close() # 关闭连接 return result # 问题: # 1. 连接建立/关闭开销大 # 2. 无法控制并发连接数 # 3. 可能耗尽数据库连接# 连接池结构 class ConnectionPool: def __init__(self, max_size=10): self.pool = [] self.max_size = max_size self.lock = threading.Lock() def get_connection(self): """获取连接""" with self.lock: if self.pool: return self.pool.pop() if len(self.pool) < self.max_size: return self._create_connection() # 等待可用连接 return None def release_connection(self, conn): """释放连接回池""" with self.lock: if len(self.pool) < self.max_size: self.pool.append(conn) def _create_connection(self): """创建新连接""" return psycopg2.connect(DB_URL)from sqlalchemy import create_engine # 创建连接池 engine = create_engine( "postgresql://user:password@localhost/db", pool_size=20, # 连接池大小 max_overflow=10, # 最大溢出连接数 pool_timeout=30, # 获取连接超时时间 pool_recycle=3600, # 连接回收时间 echo=True # 打印SQL语句 ) # 使用连接 with engine.connect() as conn: result = conn.execute("SELECT * FROM users") print(result.fetchall())# 参数说明 engine = create_engine( "mysql+pymysql://user:password@localhost/db", # 核心参数 pool_size=10, # 连接池维护的最小连接数 max_overflow=20, # 超出pool_size的临时连接数 pool_timeout=10, # 获取连接的等待超时(秒) pool_recycle=1800, # 连接自动回收时间(秒) pool_pre_ping=True, # 获取连接前检查连接可用性 # 连接参数 connect_args={ "connect_timeout": 5, "read_timeout": 30, } )import threading import queue import time class ThreadSafeConnectionPool: def __init__(self, max_size=10, idle_timeout=300): self.max_size = max_size self.idle_timeout = idle_timeout self.pool = queue.Queue(maxsize=max_size) self.connection_count = 0 self.lock = threading.Lock() self._start_cleanup_thread() def _create_connection(self): """创建新数据库连接""" import psycopg2 return psycopg2.connect("dbname=test user=postgres") def _start_cleanup_thread(self): """启动空闲连接清理线程""" def cleanup(): while True: time.sleep(60) self._cleanup_idle_connections() thread 
= threading.Thread(target=cleanup, daemon=True) thread.start() def _cleanup_idle_connections(self): """清理超时的空闲连接""" current_time = time.time() # 实现清理逻辑... def get(self): """获取连接""" try: # 先尝试从队列获取 conn = self.pool.get(timeout=1) return conn except queue.Empty: # 创建新连接 with self.lock: if self.connection_count < self.max_size: self.connection_count += 1 return self._create_connection() raise Exception("连接池已满") def put(self, conn): """放回连接""" try: self.pool.put(conn, block=False) except queue.Full: # 池已满,直接关闭连接 conn.close() with self.lock: self.connection_count -= 1# 使用连接池 pool = ThreadSafeConnectionPool(max_size=5) def get_user(user_id): conn = pool.get() try: cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) return cursor.fetchone() finally: pool.put(conn) # 并发测试 import threading def query_users(): for i in range(10): get_user(i) threads = [threading.Thread(target=query_users) for _ in range(3)] for t in threads: t.start() for t in threads: t.join()import time class MonitoredConnectionPool(ThreadSafeConnectionPool): def __init__(self, max_size=10): super().__init__(max_size) self.stats = { 'total_requests': 0, 'wait_time': 0, 'connections_created': 0, 'connections_reused': 0, } def get(self): start = time.time() conn = super().get() wait = time.time() - start self.stats['total_requests'] += 1 self.stats['wait_time'] += wait return conn def get_stats(self): avg_wait = self.stats['wait_time'] / self.stats['total_requests'] if self.stats['total_requests'] > 0 else 0 return { 'total_requests': self.stats['total_requests'], 'average_wait_ms': avg_wait * 1000, 'pool_size': self.connection_count, } # 使用监控 pool = MonitoredConnectionPool() # ... 执行操作 ... 
print(pool.get_stats())# 根据应用特点调整参数 def create_optimized_pool(): # 高并发读场景 if is_read_heavy(): return create_engine( DB_URL, pool_size=30, max_overflow=20, pool_recycle=1800 ) # 写密集场景 if is_write_heavy(): return create_engine( DB_URL, pool_size=10, max_overflow=5, pool_pre_ping=True ) # 默认配置 return create_engine(DB_URL)# 使用context manager管理连接 def safe_query(sql): """安全的数据库查询""" with engine.connect() as conn: with conn.begin(): result = conn.execute(sql) return result.fetchall() # 自定义context manager class DatabaseSession: def __init__(self, pool): self.pool = pool self.conn = None def __enter__(self): self.conn = self.pool.get() return self.conn def __exit__(self, exc_type, exc_val, exc_tb): if self.conn: if exc_type: self.conn.rollback() else: self.conn.commit() self.pool.put(self.conn) # 使用示例 with DatabaseSession(pool) as conn: cursor = conn.cursor() cursor.execute("INSERT INTO logs VALUES ('test')")def validate_connection(conn): """验证连接是否可用""" try: cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() return True except Exception: return False class ValidatingPool(ThreadSafeConnectionPool): def get(self): conn = super().get() if not validate_connection(conn): conn.close() return self._create_connection() return conndef robust_query(sql, retries=3): """带重试的查询""" for attempt in range(retries): try: with engine.connect() as conn: return conn.execute(sql).fetchall() except Exception as e: if attempt < retries - 1: time.sleep(1) continue raise e # 使用 result = robust_query("SELECT * FROM users")数据库连接池的关键要点:
在实际项目中,建议:
思考:在你的项目中,连接池遇到过哪些挑战?欢迎分享!