使用mysql-router根据sql语句自动路由主库从库
时间: 2025-07-27 13:28:57 AIGC 浏览: 23
<think>我们使用MySQLRouter来实现读写分离,它会根据SQL语句自动路由到主库(写操作)或从库(读操作)。关键点:1.配置两个路由端口:一个用于写操作(主库),一个用于读操作(从库)。2.在应用层,我们需要将写操作(INSERT,UPDATE,DELETE等)发送到写端口,将读操作(SELECT)发送到读端口。3.MySQLRouter本身不解析SQL语句,因此它无法自动根据SQL语句类型路由。所以我们需要在应用层进行区分,或者使用中间件(如ProxySQL)来实现自动路由。但是,MySQLRouter8.0以后支持通过配置实现简单的自动路由,例如:-配置一个读写端口(如6446),当连接建立时,默认路由到主库。如果执行了读操作,并且支持从库读取,则可能会切换到从库(但需要应用设置会话属性)。-或者配置两个端口,一个只写(主库),一个只读(从库)。然而,MySQLRouter的默认行为是:-当配置为读写端口时,初始连接到主库,并且在整个会话期间保持连接到该主库,直到连接断开。它不会在同一个连接中自动将读操作切换到从库。-因此,要实现自动路由,通常需要应用层使用两个不同的连接:一个连接写端口用于写操作,另一个连接读端口用于读操作。如果我们希望MySQLRouter能够根据SQL语句自动切换,我们可以使用“routing_strategy”为“round-robin-with-fallback”并配合元数据缓存,但这样仍然不能在一个连接中自动切换。因此,常见的做法是:1.在应用层区分读写操作,使用两个不同的端口。2.使用连接池,配置两个数据源:一个指向Router的写端口,一个指向Router的读端口。下面,我将给出一个完整的MySQLRouter配置示例,以及如何在应用层(以Python为例)实现读写分离。步骤:1.配置MySQLRouter2.在应用程序中使用不同的端口进行读写操作注意:MySQLRouter也可以配置一个端口同时支持读写,但这样就不能自动将读操作负载均衡到从库,而是所有操作都发送到主库。所以通常我们配置两个端口。配置示例(mysqlrouter.conf):```ini[DEFAULT]logging_folder=/var/log/mysqlrouterruntime_folder=/var/run/mysqlrouterconfig_folder=/etc/mysqlrouterdata_folder=/var/lib/mysqlrouter[logger]level=INFO#写路由(主库)[routing:primary]bind_address=0.0.0.0bind_port=6446destinations=mysql_master:3306routing_strategy=first-available#读路由(从库)[routing:replicas]bind_address=0.0.0.0bind_port=6447destinations=mysql_slave1:3306,mysql_slave2:3306routing_strategy=round-robin```在应用程序中,我们需要根据操作类型选择不同的端口。以下是一个Python示例:```pythonimportmysql.connectorclassDatabase:def__init__(self):self.write_conn=mysql.connector.connect(host='localhost',port=6446,#写端口user='app_user',password='password',database='app_db')self.read_conn=mysql.connector.connect(host='localhost',port=6447,#读端口user='app_user',password='password',database='app_db')defexecute(self,query,params=None,operation='read'):conn=self.read_connifoperation=='read'elseself.write_conncursor=conn.cursor()cursor.execute(query,params)ifoperationin['write','update','delete']:conn.commit()result=cursor.fetchall()ifoperation=='read'elsecursor.rowcountcursor.close()returnresult#使用示例db=Database()#写操作insert_query="INSERTINTOusers(name,email)VALUES(%s,%s)"db.execute(insert_query,('John','[email protected]'),operation='write')#读操作select_query="SELECT*FROMusers"users=db.execute(select_query,operation='read')```但是,如果我们希望中间件能自动识别SQL语句类型并路由,我们可以使用更高级的中间件如ProxySQL。不过,如果我们坚持使用MySQLRouter,并且希望在一个连接中自动路由读操作到从库,这是不支持的。因为MySQLRouter的设计是:一旦连接建立,就会一直使用同一个后端MySQL服务器,直到连接关闭。因此,如果我们需要自动路由,可以考虑以下两种方案:方案一:在应用层进行分离(如上面的代码示例),使用不同的连接。方案二:使用支持SQL解析的中间件,如ProxySQL。下面,我将给出一个ProxySQL的简单配置示例,它可以自动根据SQL语句路由。但是,根据问题要求,我们使用MySQLRouter,因此我们采用应用层分离的方案。如果我们想要更自动化的方式,可以考虑使用ORM框架,并配置读写分离。例如,在Django中,我们可以配置多个数据库路由。总结:MySQLRouter本身不提供基于SQL语句的自动路由(在同一个连接中),需要应用层区分读写操作并连接不同的端口。下面,我将给出一个更完整的应用层示例,包括连接池管理(使用DBUtils)以提高性能。```pythonfromdbutils.pooled_dbimportPooledDBimportmysql.connector#配置连接池write_pool=PooledDB(creator=mysql.connector,host='localhost',port=6446,user='app_user',password='password',database='app_db',pool_size=5)read_pool=PooledDB(creator=mysql.connector,host='localhost',port=6447,user='app_user',password='password',database='app_db',pool_size=10)defexecute_query(query,params=None,operation='read'):ifoperationin['write','update','delete']:pool=write_poolelse:pool=read_poolconn=pool.connection()cursor=conn.cursor()cursor.execute(query,params)ifoperationin['write','update','delete']:conn.commit()result=cursor.rowcountelse:result=cursor.fetchall()cursor.close()conn.close()#实际上,PooledDB的连接close方法会将连接返回到连接池returnresult#示例使用#写操作execute_query("INSERTINTOlogs(message)VALUES(%s)",('errormessage',),'write')#读操作results=execute_query("SELECT*FROMlogs",operation='read')```这样,我们就实现了在应用层根据操作类型自动选择主库或从库,并且使用了连接池管理连接。注意:上面的代码中,我们显式指定了操作类型。为了进一步自动化,我们可以通过解析SQL语句的第一个单词来判断操作类型(但这有风险,因为SQL语句可能包含注释等)。或者,我们可以设计一个装饰器或上下文管理器来标记操作类型。例如,我们可以创建一个装饰器:```pythondefuse_master(original_func):defwrapper(*args,**kwargs):kwargs['operation']='write'returnoriginal_func(*args,**kwargs)returnwrapper@use_masterdefcreate_user(username,email):query="INSERTINTOusers(username,email)VALUES(%s,%s)"execute_query(query,(username,email))```但这样需要为每个函数标记。或者,我们可以创建一个函数,自动判断SQL语句的类型:```pythondefget_operation_type(query):first_word=query.strip().split()[0].lower()iffirst_wordin['select','show']:return'read'eliffirst_wordin['insert','update','delete','create','alter','drop']:return'write'else:return'read'#默认读defsmart_execute(query,params=None):operation=get_operation_type(query)returnexecute_query(query,params,operation)#使用smart_execute("SELECT*FROMusers")#自动使用读池smart_execute("INSERTINTOusers(name)VALUES('Alice')")#自动使用写池```注意:自动判断SQL类型可能不总是准确,例如在存储过程中或者子查询。因此,生产环境中建议显式指定操作类型。最后,我们还需要注意事务处理。在事务中,即使有读操作,也应该使用主库,因为从库可能有延迟。因此,在事务中,我们应该强制使用写连接。我们可以通过上下文管理器来管理事务:```pythonclassTransaction:def__init__(self):self.conn=write_pool.connection()self.conn.start_transaction()defexecute(self,query,params=None):cursor=self.conn.cursor()cursor.execute(query,params)result=cursor.rowcountcursor.close()returnresultdefquery(self,query,params=None):cursor=self.conn.cursor()cursor.execute(query,params)result=cursor.fetchall()cursor.close()returnresultdefcommit(self):self.conn.commit()self.conn.close()defrollback(self):self.conn.rollback()self.conn.close()#使用示例try:t=Transaction()t.execute("UPDATEaccountsSETbalance=balance-%sWHEREid=%s",(100,1))t.execute("UPDATEaccountsSETbalance=balance+%sWHEREid=%s",(100,2))t.commit()except:t.rollback()raise```这样,我们就实现了一个较为完整的读写分离方案。总结步骤:1.配置MySQLRouter的两个端口:一个指向主库(写),一个指向从库(读,负载均衡)。2.在应用程序中,根据操作类型(读/写)选择不同的端口连接。3.使用连接池管理连接。4.对于事务,使用写连接并在整个事务中保持同一个连接。注意:如果主库切换,需要更新MySQLRouter的配置(destinations)并重启或重新加载配置。相关问题:</think>### MySQL Router 根据 SQL 语句自动路由主从库解决方案
MySQL Router 本身不解析 SQL 语句内容,但可以通过多种方式实现基于 SQL 类型的自动路由。以下是完整的实现方案:
#### 方法 1:应用层路由(推荐)
在应用层根据 SQL 类型选择连接端口:
```python
import mysql.connector
class SmartRouterConnection:
def __init__(self, host, user, password, database):
self.host = host
self.user = user
self.password = password
self.database = database
self.write_conn = None
self.read_conn = None
def get_connection(self, sql):
"""根据 SQL 类型选择连接"""
sql_type = self.detect_sql_type(sql)
if sql_type == "WRITE":
if not self.write_conn or not self.write_conn.is_connected():
self.write_conn = mysql.connector.connect(
host=self.host,
port=6446, # 写端口
user=self.user,
password=self.password,
database=self.database
)
return self.write_conn
else:
if not self.read_conn or not self.read_conn.is_connected():
self.read_conn = mysql.connector.connect(
host=self.host,
port=6447, # 读端口
user=self.user,
password=self.password,
database=self.database
)
return self.read_conn
def detect_sql_type(self, sql):
"""检测 SQL 类型"""
sql = sql.strip().upper()
# 写操作检测
write_keywords = ["INSERT", "UPDATE", "DELETE", "CREATE", "ALTER", "DROP", "TRUNCATE", "LOCK"]
for keyword in write_keywords:
if sql.startswith(keyword):
return "WRITE"
# 显式主库读取
if "/* FORCE_MASTER */" in sql:
return "WRITE"
# 默认为读操作
return "READ"
def execute(self, sql, params=None):
conn = self.get_connection(sql)
cursor = conn.cursor()
try:
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
# 处理结果
if sql.strip().upper().startswith("SELECT"):
result = cursor.fetchall()
else:
conn.commit()
result = cursor.rowcount
return result
finally:
cursor.close()
# 使用示例
router = SmartRouterConnection(
host="localhost",
user="app_user",
password="AppPass123!",
database="app_db"
)
# 自动路由到写端口(主库)
router.execute("INSERT INTO users (name) VALUES ('John')")
# 自动路由到读端口(从库)
users = router.execute("SELECT * FROM users")
# 强制主库读取
router.execute("SELECT /* FORCE_MASTER */ * FROM orders WHERE id = 123")
```
#### 方法 2:MySQL Router + 自定义路由插件
创建自定义路由插件实现 SQL 解析:
```cpp
// mysqlrouter_custom_plugin.cc
#include <mysqlrouter/routing.h>
#include <mysqlrouter/plugin_config.h>
#include <regex>
class SQLRoutingPlugin : public mysqlrouter::BaseRoutingPlugin {
public:
SQLRoutingPlugin(const mysqlrouter::PluginConfig &config)
: BaseRoutingPlugin(config) {}
mysqlrouter::RoutingResult route(mysqlrouter::RoutingProtocol protocol,
const std::string &data) override {
// 检测 SQL 类型
std::regex write_pattern(R"((INSERT|UPDATE|DELETE|CREATE|ALTER|DROP|TRUNCATE|LOCK))",
std::regex_constants::icase);
if (std::regex_search(data, write_pattern)) {
// 路由到主库
return {"primary", 6446};
} else {
// 路由到从库
return {"replicas", 6447};
}
}
};
// 插件注册
extern "C" mysqlrouter::BasePlugin *init(mysqlrouter::PluginInfo info) {
info.plugin = new SQLRoutingPlugin(info.config);
return info.plugin;
}
```
编译安装插件:
```bash
# 编译插件
g++ -shared -fPIC -o libsql_router.so mysqlrouter_custom_plugin.cc \
-I /path/to/mysql-router/include
# 配置 MySQL Router
[logger]
level = DEBUG
[routing:custom]
plugin = libsql_router.so
bind_address = 0.0.0.0
bind_port = 6448
destinations = metadata-cache://cluster_metadata/default?role=PRIMARY_AND_SECONDARY
```
#### 方法 3:使用 MySQL Router 元数据缓存
配置元数据缓存实现智能路由:
```ini
[metadata_cache:cluster_metadata]
router_id=1
bootstrap_server_addresses=mysql-master:3306
user=mysql_router
metadata_cluster=cluster_metadata
ttl=5
# 读写分离路由
[routing:read_write]
bind_address = 0.0.0.0
bind_port = 6446
destinations = metadata-cache://cluster_metadata/default?role=PRIMARY
routing_strategy = round-robin
[routing:read_only]
bind_address = 0.0.0.0
bind_port = 6447
destinations = metadata-cache://cluster_metadata/default?role=SECONDARY
routing_strategy = round-robin
# 自动路由端口(需要应用指定路由提示)
[routing:auto_route]
bind_address = 0.0.0.0
bind_port = 6448
destinations = metadata-cache://cluster_metadata/default?role=PRIMARY_AND_SECONDARY
routing_strategy = round-robin
```
#### 方法 4:SQL 提示强制路由
在 SQL 中添加特殊注释指定路由:
```sql
-- 强制主库执行
SELECT /*+ PRIMARY_ROUTE */ * FROM orders;
-- 强制从库执行
SELECT /*+ REPLICA_ROUTE */ * FROM users;
```
在 MySQL Router 配置中解析提示:
```ini
[routing:primary]
bind_port = 6446
destinations = mysql-master:3306
routing_strategy = first-available
route_hint = PRIMARY_ROUTE
[routing:replica]
bind_port = 6447
destinations = mysql-slave1:3306,mysql-slave2:3306
routing_strategy = round-robin
route_hint = REPLICA_ROUTE
```
#### 综合测试脚本
```python
import mysql.connector
import threading
def test_auto_routing(router_port):
conn = mysql.connector.connect(
host="localhost",
port=router_port,
user="app_user",
password="AppPass123!",
database="app_db"
)
tests = [
("SELECT 'Read operation'", "READ"),
("INSERT INTO test VALUES (NULL, 'data')", "WRITE"),
("UPDATE test SET data = 'updated'", "WRITE"),
("DELETE FROM test", "WRITE"),
("SELECT /* FORCE_MASTER */ 'Master read'", "MASTER_READ"),
("CREATE TEMPORARY TABLE temp_table (id INT)", "WRITE")
]
for sql, expected in tests:
cursor = conn.cursor()
cursor.execute("SELECT @@hostname AS server")
server = cursor.fetchone()[0]
# 执行测试SQL
try:
cursor.execute(sql)
if sql.startswith("SELECT"):
cursor.fetchone()
except:
pass
print(f"SQL: {sql[:30]}... | Expected: {expected} | Server: {server}")
cursor.close()
# 启动多个客户端测试
threads = []
for i in range(5):
t = threading.Thread(target=test_auto_routing, args=(6448,))
threads.append(t)
t.start()
for t in threads:
t.join()
```
#### 性能优化建议
1. **连接池配置**:
```ini
[connection_pool]
max_idle_server_connections = 50
idle_timeout = 300
```
2. **缓存预处理语句**:
```python
# Python 应用层缓存
class StatementCache:
def __init__(self):
self.cache = {}
def prepare(self, sql):
if sql not in self.cache:
self.cache[sql] = {
'type': self.detect_sql_type(sql),
'statement': None
}
return self.cache[sql]
```
3. **动态负载均衡**:
```ini
[routing:replicas]
routing_strategy = adaptive-load-balance
min_threshold = 10
max_threshold = 100
```
#### 监控与诊断
```sql
-- 查看路由状态
SELECT * FROM performance_schema.routing_status;
-- 分析查询分布
SELECT
CASE
WHEN query LIKE 'SELECT%' THEN 'READ'
WHEN query LIKE 'INSERT%' THEN 'WRITE'
WHEN query LIKE 'UPDATE%' THEN 'WRITE'
ELSE 'OTHER'
END AS query_type,
COUNT(*) AS count,
AVG(query_time) AS avg_time
FROM mysql_query_history
GROUP BY query_type;
```
### 生产环境建议
1. **读写分离策略**:
```ini
[routing:strategy]
# 读操作延迟阈值 (毫秒)
max_replica_lag = 200
# 自动故障转移
failover_strategy = auto-reconnect
```
2. **安全加固**:
```ini
[security]
# 启用 TLS
ssl_mode = REQUIRED
# SQL 注入防护
sql_safe_updates = ON
```
3. **动态配置更新**:
```bash
# 热更新路由配置
mysqlrouter --set-option \
--section=routing:replicas \
--option=destinations \
--value="slave1:3306,slave2:3306,slave3:3306" \
--config /etc/mysqlrouter/mysqlrouter.conf
```
阅读全文
相关推荐



















