MySQL备份恢复与安全管理
数据是企业的核心资产,确保数据安全性和可恢复性是DBA最重要的职责。今天,我们将深入探讨MySQL的备份恢复策略和安全管理制度,帮助你构建既安全又可靠的数据库环境。
1. 备份策略与实施
逻辑备份:mysqldump实用技巧
基础备份命令:
# 完整数据库备份
mysqldump -u root -p --all-databases --single-transaction --master-data=2 --flush-logs > full_backup_$(date +%Y%m%d).sql
# 单个数据库备份
mysqldump -u root -p --databases company --single-transaction --routines --triggers --events > company_backup_$(date +%Y%m%d).sql
# 单个表备份
mysqldump -u root -p company employees departments --single-transaction --where="salary>5000" > high_salary_employees.sql
# 压缩备份
mysqldump -u root -p --all-databases --single-transaction | gzip > full_backup_$(date +%Y%m%d).sql.gz
高级备份选项:
# 生产环境完整备份脚本
mysqldump -u backup_user -p'secure_password' \
--all-databases \
--single-transaction \
--master-data=2 \
--flush-logs \
--routines \
--triggers \
--events \
--hex-blob \
--complete-insert \
--extended-insert \
--max-allowed-packet=1G \
--set-gtid-purged=ON \
--result-file=/backup/full_backup_$(date +%Y%m%d_%H%M%S).sql
# 分库备份脚本
for DB in $(mysql -u root -p'password' -e "SHOW DATABASES;" | grep -Ev "(Database|information_schema|performance_schema|sys)")
do
mysqldump -u root -p'password' --databases $DB --single-transaction --routines --triggers > /backup/${DB}_backup_$(date +%Y%m%d).sql
done
备份验证脚本:
#!/bin/bash
# backup_verify.sh
BACKUP_FILE=$1
LOG_FILE="/var/log/mysql/backup_verify.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
verify_backup() {
local file=$1
log "开始验证备份文件: $file"
# 检查文件是否存在
if [ ! -f "$file" ]; then
log "错误: 备份文件不存在 - $file"
return 1
fi
# 检查文件大小
local file_size=$(stat -f%z "$file" 2>/dev/null || stat -c%s "$file" 2>/dev/null)
if [ "$file_size" -lt 1024 ]; then
log "错误: 备份文件过小 - $file"
return 1
fi
# 验证SQL文件完整性
if [[ "$file" == *.sql ]]; then
# 检查SQL文件头
if ! head -n 10 "$file" | grep -q "MySQL dump"; then
log "错误: 无效的SQL备份文件 - $file"
return 1
fi
# 检查SQL文件尾
if ! tail -n 5 "$file" | grep -q "Dump completed"; then
log "警告: 备份文件可能不完整 - $file"
fi
fi
# 验证压缩文件
if [[ "$file" == *.gz ]]; then
if ! gzip -t "$file" 2>/dev/null; then
log "错误: 压缩文件损坏 - $file"
return 1
fi
fi
log "备份文件验证通过: $file"
return 0
}
# 执行验证
verify_backup "$BACKUP_FILE"
exit $?
物理备份:XtraBackup实战
完整备份与恢复:
# 安装XtraBackup
# Ubuntu/Debian
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get update
sudo apt-get install percona-xtrabackup-80
# 完整备份
xtrabackup --backup --user=backup_user --password='secure_password' --target-dir=/backup/full_$(date +%Y%m%d_%H%M%S)
# 准备备份(应用日志)
xtrabackup --prepare --target-dir=/backup/full_20231201_120000
# 恢复备份
systemctl stop mysql
mv /var/lib/mysql /var/lib/mysql_old
xtrabackup --copy-back --target-dir=/backup/full_20231201_120000
chown -R mysql:mysql /var/lib/mysql
systemctl start mysql
增量备份策略:
#!/bin/bash
# incremental_backup.sh
BASE_DIR="/backup"
FULL_BACKUP_DIR="$BASE_DIR/full_$(date +%Y%m%d)"
INCREMENTAL_DIR="$BASE_DIR/inc_$(date +%Y%m%d_%H%M%S)"
BACKUP_USER="backup_user"
BACKUP_PASSWORD="secure_password"
LOG_FILE="/var/log/mysql/xtrabackup.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
# 检查基础备份是否存在
find_base_backup() {
find $BASE_DIR -name "full_*" -type d | sort -r | head -1
}
perform_full_backup() {
log "开始完整备份"
xtrabackup --backup --user=$BACKUP_USER --password=$BACKUP_PASSWORD --target-dir=$FULL_BACKUP_DIR
if [ $? -eq 0 ]; then
log "完整备份完成: $FULL_BACKUP_DIR"
echo $FULL_BACKUP_DIR > $BASE_DIR/latest_full_backup
else
log "完整备份失败"
exit 1
fi
}
perform_incremental_backup() {
local base_dir=$1
log "开始增量备份,基于: $base_dir"
xtrabackup --backup --user=$BACKUP_USER --password=$BACKUP_PASSWORD \
--target-dir=$INCREMENTAL_DIR \
--incremental-basedir=$base_dir
if [ $? -eq 0 ]; then
log "增量备份完成: $INCREMENTAL_DIR"
else
log "增量备份失败"
exit 1
fi
}
# 主逻辑
BASE_BACKUP=$(find_base_backup)
if [ -z "$BASE_BACKUP" ] || [ $(find $BASE_BACKUP -name "xtrabackup_checkpoints" -mtime +7 | wc -l) -gt 0 ]; then
# 没有基础备份或基础备份超过7天,执行完整备份
perform_full_backup
else
# 执行增量备份
perform_incremental_backup $BASE_BACKUP
fi
备份恢复演练:
#!/bin/bash
# disaster_recovery_drill.sh
RECOVERY_DIR="/recovery"
BACKUP_SOURCE="/backup"
MYSQL_DATA_DIR="/var/lib/mysql"
LOG_FILE="/var/log/mysql/recovery_drill.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
prepare_recovery_environment() {
log "准备恢复环境"
# 停止MySQL服务
systemctl stop mysql
# 备份当前数据
mv $MYSQL_DATA_DIR ${MYSQL_DATA_DIR}_backup_$(date +%Y%m%d_%H%M%S)
# 创建恢复目录
mkdir -p $RECOVERY_DIR
}
restore_from_backup() {
local backup_dir=$1
log "从备份恢复: $backup_dir"
# 准备备份
xtrabackup --prepare --apply-log-only --target-dir=$backup_dir
# 恢复备份
xtrabackup --copy-back --target-dir=$backup_dir
# 设置权限
chown -R mysql:mysql $MYSQL_DATA_DIR
}
verify_recovery() {
log "验证恢复结果"
# 启动MySQL
systemctl start mysql
# 等待服务启动
sleep 30
# 基础验证
if mysql -u root -p'password' -e "SELECT 1;" > /dev/null 2>&1; then
log "MySQL服务启动成功"
# 验证关键表
local table_count=$(mysql -u root -p'password' -N -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema NOT IN ('mysql','information_schema','performance_schema','sys');")
log "发现 $table_count 个用户表"
# 验证数据完整性
mysql -u root -p'password' -e "CHECK TABLE company.employees EXTENDED;" >> $LOG_FILE
return 0
else
log "MySQL服务启动失败"
return 1
fi
}
# 执行恢复演练
prepare_recovery_environment
# 查找最新的完整备份
LATEST_FULL_BACKUP=$(find $BACKUP_SOURCE -name "full_*" -type d | sort -r | head -1)
if [ -n "$LATEST_FULL_BACKUP" ]; then
restore_from_backup $LATEST_FULL_BACKUP
verify_recovery
else
log "错误: 未找到完整备份"
exit 1
fi
增量备份与差异备份
二进制日志备份:
-- 启用二进制日志
-- 在my.cnf中配置
/*
[mysqld]
log_bin = /var/lib/mysql/mysql-bin
expire_logs_days = 7
max_binlog_size = 100M
*/
-- 查看二进制日志状态
SHOW BINARY LOGS;
/*
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin.000001 | 194 |
| mysql-bin.000002 | 456 |
| mysql-bin.000003 | 123 |
+------------------+-----------+
*/
-- 刷新日志(创建新的二进制日志文件)
FLUSH BINARY LOGS;
-- 查看当前正在使用的二进制日志
SHOW MASTER STATUS;
自动化二进制日志备份:
#!/bin/bash
# binlog_backup.sh
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
BACKUP_DIR="/backup/binlog"
LOG_FILE="/var/log/mysql/binlog_backup.log"
RETENTION_DAYS=7
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
backup_binlog() {
log "开始二进制日志备份"
# 获取当前二进制日志文件
CURRENT_BINLOG=$(mysql -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SHOW MASTER STATUS" | awk '{print $1}')
# 备份所有未备份的二进制日志
for BINLOG in $(mysql -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SHOW BINARY LOGS" | awk '{print $1}' | grep -v "$CURRENT_BINLOG"); do
if [ ! -f "$BACKUP_DIR/$BINLOG" ]; then
log "备份二进制日志: $BINLOG"
cp /var/lib/mysql/$BINLOG $BACKUP_DIR/
# 验证备份
if cmp /var/lib/mysql/$BINLOG $BACKUP_DIR/$BINLOG; then
log "备份验证成功: $BINLOG"
else
log "备份验证失败: $BINLOG"
fi
fi
done
}
purge_old_backups() {
log "清理过期备份(保留 $RETENTION_DAYS 天)"
find $BACKUP_DIR -name "mysql-bin.*" -mtime +$RETENTION_DAYS -delete
}
# 创建备份目录
mkdir -p $BACKUP_DIR
# 执行备份
backup_binlog
purge_old_backups
log "二进制日志备份完成"
备份压缩与加密
加密备份方案:
#!/bin/bash
# encrypted_backup.sh
BACKUP_DIR="/backup/encrypted"
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
ENCRYPTION_KEY="/etc/mysql/backup.key"
LOG_FILE="/var/log/mysql/encrypted_backup.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
generate_encryption_key() {
if [ ! -f "$ENCRYPTION_KEY" ]; then
log "生成加密密钥"
openssl rand -base64 32 > $ENCRYPTION_KEY
chmod 600 $ENCRYPTION_KEY
fi
}
create_encrypted_backup() {
local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).xb.enc"
log "创建加密备份: $backup_file"
# 使用XtraBackup创建备份并立即加密
xtrabackup --backup --user=$MYSQL_USER --password=$MYSQL_PASSWORD --stream=xbstream | \
openssl enc -aes-256-cbc -salt -pass file:$ENCRYPTION_KEY -out $backup_file
if [ $? -eq 0 ]; then
log "加密备份创建成功: $backup_file"
else
log "加密备份创建失败"
exit 1
fi
}
verify_encrypted_backup() {
local backup_file=$1
log "验证加密备份: $backup_file"
# 尝试解密备份头信息
if openssl enc -aes-256-cbc -d -pass file:$ENCRYPTION_KEY -in $backup_file | head -c 100 | strings | grep -q "MySQL"; then
log "加密备份验证成功"
return 0
else
log "加密备份验证失败"
return 1
fi
}
# 主逻辑
generate_encryption_key
mkdir -p $BACKUP_DIR
create_encrypted_backup
# 验证最新的备份
LATEST_BACKUP=$(ls -t $BACKUP_DIR/*.enc | head -1)
if [ -n "$LATEST_BACKUP" ]; then
verify_encrypted_backup $LATEST_BACKUP
fi
压缩备份优化:
#!/bin/bash
# compressed_backup.sh
BACKUP_DIR="/backup/compressed"
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
COMPRESSION_LEVEL=6 # 1-9,数字越大压缩率越高但速度越慢
LOG_FILE="/var/log/mysql/compressed_backup.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
create_compressed_backup() {
local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz"
log "创建压缩备份 (级别: $COMPRESSION_LEVEL)"
# 使用mysqldump和gzip创建压缩备份
mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction --routines --triggers --events | \
gzip -$COMPRESSION_LEVEL > $backup_file
local backup_size=$(du -h $backup_file | cut -f1)
log "压缩备份完成: $backup_file (大小: $backup_size)"
}
create_parallel_compressed_backup() {
local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz"
log "创建并行压缩备份"
# 使用pigz进行并行压缩(如果可用)
if command -v pigz >/dev/null 2>&1; then
mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction | \
pigz -p 4 -$COMPRESSION_LEVEL > $backup_file
else
mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction | \
gzip -$COMPRESSION_LEVEL > $backup_file
fi
local backup_size=$(du -h $backup_file | cut -f1)
log "并行压缩备份完成: $backup_file (大小: $backup_size)"
}
# 创建备份目录
mkdir -p $BACKUP_DIR
# 根据系统资源选择备份方式
if [ $(nproc) -gt 2 ]; then
create_parallel_compressed_backup
else
create_compressed_backup
fi
云环境备份方案
AWS S3备份方案:
#!/bin/bash
# s3_backup.sh
BACKUP_DIR="/backup/s3_upload"
S3_BUCKET="my-company-mysql-backups"
S3_PATH="mysql/$(hostname)"
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
RETENTION_DAYS=30
LOG_FILE="/var/log/mysql/s3_backup.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
create_backup() {
local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz"
log "创建备份文件: $backup_file"
mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction --routines --triggers --events | \
gzip > $backup_file
echo $backup_file
}
upload_to_s3() {
local backup_file=$1
local s3_key="$S3_PATH/$(basename $backup_file)"
log "上传到S3: s3://$S3_BUCKET/$s3_key"
if aws s3 cp $backup_file s3://$S3_BUCKET/$s3_key; then
log "S3上传成功"
return 0
else
log "S3上传失败"
return 1
fi
}
cleanup_old_backups() {
log "清理本地过期备份"
find $BACKUP_DIR -name "*.sql.gz" -mtime +7 -delete
log "清理S3过期备份"
aws s3 ls s3://$S3_BUCKET/$S3_PATH/ | while read line; do
create_date=$(echo $line | awk '{print $1" "$2}')
create_date_epoch=$(date -d "$create_date" +%s)
retention_epoch=$(date -d "$RETENTION_DAYS days ago" +%s)
if [ $create_date_epoch -lt $retention_epoch ]; then
file_name=$(echo $line | awk '{print $4}')
aws s3 rm s3://$S3_BUCKET/$S3_PATH/$file_name
log "删除过期S3备份: $file_name"
fi
done
}
verify_s3_backup() {
local backup_file=$1
local s3_key="$S3_PATH/$(basename $backup_file)"
log "验证S3备份完整性"
# 下载备份文件
local temp_file="/tmp/verify_$(basename $backup_file)"
aws s3 cp s3://$S3_BUCKET/$s3_key $temp_file
# 比较本地和S3的文件
if cmp $backup_file $temp_file; then
log "S3备份验证成功"
rm $temp_file
return 0
else
log "S3备份验证失败"
rm $temp_file
return 1
fi
}
# 主逻辑
mkdir -p $BACKUP_DIR
BACKUP_FILE=$(create_backup)
if [ -n "$BACKUP_FILE" ]; then
if upload_to_s3 $BACKUP_FILE; then
verify_s3_backup $BACKUP_FILE
fi
fi
cleanup_old_backups
2. 数据恢复与灾难恢复
基于时间点的恢复(PITR)
PITR恢复流程:
#!/bin/bash
# pitr_recovery.sh
RESTORE_TIME="2023-12-01 14:30:00"
BACKUP_DIR="/backup"
BINLOG_DIR="/var/lib/mysql"
RECOVERY_DIR="/recovery"
MYSQL_DATA_DIR="/var/lib/mysql"
LOG_FILE="/var/log/mysql/pitr_recovery.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
find_relevant_backup() {
log "查找适用于时间点 $RESTORE_TIME 的备份"
# 查找在恢复时间之前的最新完整备份
for BACKUP in $(ls -t $BACKUP_DIR/full_* 2>/dev/null); do
local backup_time=$(stat -c %y $BACKUP/xtrabackup_info | cut -d' ' -f1,2 | cut -d'.' -f1)
local backup_epoch=$(date -d "$backup_time" +%s)
local restore_epoch=$(date -d "$RESTORE_TIME" +%s)
if [ $backup_epoch -le $restore_epoch ]; then
echo $BACKUP
return 0
fi
done
log "错误: 未找到合适的完整备份"
exit 1
}
extract_binlog_events() {
local start_time=$1
local stop_time=$2
local output_file=$3
log "提取二进制日志事件: $start_time 到 $stop_time"
# 查找包含时间范围的二进制日志文件
for BINLOG in $(ls -tr $BINLOG_DIR/mysql-bin.* 2>/dev/null | grep -v '.index'); do
local first_event_time=$(mysqlbinlog $BINLOG | grep -m1 "end_log_pos" | awk '{print $1, $2}' | tr -d '#')
local last_event_time=$(mysqlbinlog $BINLOG | tail -10 | grep "end_log_pos" | tail -1 | awk '{print $1, $2}' | tr -d '#')
if [ -n "$first_event_time" ] && [ -n "$last_event_time" ]; then
local first_epoch=$(date -d "$first_event_time" +%s 2>/dev/null || echo 0)
local last_epoch=$(date -d "$last_event_time" +%s 2>/dev/null || echo 0)
local start_epoch=$(date -d "$start_time" +%s)
local stop_epoch=$(date -d "$stop_time" +%s)
if [ $last_epoch -ge $start_epoch ] && [ $first_epoch -le $stop_epoch ]; then
log "处理二进制日志: $BINLOG"
mysqlbinlog --start-datetime="$start_time" --stop-datetime="$stop_time" $BINLOG >> $output_file
fi
fi
done
}
perform_pitr_recovery() {
local base_backup=$1
log "执行时间点恢复"
# 准备恢复环境
systemctl stop mysql
mv $MYSQL_DATA_DIR ${MYSQL_DATA_DIR}_backup_$(date +%Y%m%d_%H%M%S)
# 恢复基础备份
xtrabackup --copy-back --target-dir=$base_backup
chown -R mysql:mysql $MYSQL_DATA_DIR
# 启动MySQL到恢复模式
systemctl start mysql
# 获取备份时间
local backup_time=$(stat -c %y $base_backup/xtrabackup_info | cut -d' ' -f1,2 | cut -d'.' -f1)
# 提取和应用二进制日志
local binlog_events="/tmp/binlog_events.sql"
echo "" > $binlog_events
extract_binlog_events "$backup_time" "$RESTORE_TIME" $binlog_events
# 应用二进制日志事件
if [ -s $binlog_events ]; then
log "应用二进制日志事件"
mysql -u root -p'password' < $binlog_events
else
log "没有需要应用的二进制日志事件"
fi
log "时间点恢复完成"
}
# 主逻辑
BASE_BACKUP=$(find_relevant_backup)
perform_pitr_recovery $BASE_BACKUP
误操作数据恢复方案
Flashback工具使用:
-- 安装mysqlbinlog_flashback工具
-- 使用my2sql或binlog2sql进行闪回
-- 示例:恢复误删除的数据
# 使用binlog2sql解析二进制日志
python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uroot -p'password' -dcompany -temployees --start-file='mysql-bin.000001' --start-pos=4 --stop-pos=1000 -B
-- 输出闪回SQL
/*
INSERT INTO `company`.`employees`(`create_time`, `phone`, `name`, `id`, `email`) VALUES ('2023-01-01 10:00:00', '13800138000', '张三', 1, 'zhangsan@company.com');
INSERT INTO `company`.`employees`(`create_time`, `phone`, `name`, `id`, `email`) VALUES ('2023-01-02 11:00:00', '13900139000', '李四', 2, 'lisi@company.com');
*/
基于备份的误操作恢复:
#!/bin/bash
# point_in_time_restore.sh
DB_NAME="company"
TABLE_NAME="employees"
BACKUP_DIR="/backup"
RESTORE_TIME="2023-12-01 10:00:00" # 误操作之前的时间
LOG_FILE="/var/log/mysql/point_restore.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
create_restore_database() {
local restore_db="${DB_NAME}_restore_$(date +%Y%m%d_%H%M%S)"
log "创建恢复数据库: $restore_db"
mysql -u root -p'password' -e "CREATE DATABASE $restore_db;"
echo $restore_db
}
restore_table_to_point() {
local restore_db=$1
local backup_file=$(find $BACKUP_DIR -name "*${DB_NAME}*" -type f | sort -r | head -1)
if [ -z "$backup_file" ]; then
log "错误: 未找到备份文件"
exit 1
fi
log "从备份恢复表结构"
# 提取表结构
if [[ $backup_file == *.sql.gz ]]; then
gunzip -c $backup_file | sed -n "/^-- Table structure for table \`$TABLE_NAME\`/,/^-- Table structure/p" | \
mysql -u root -p'password' $restore_db
else
sed -n "/^-- Table structure for table \`$TABLE_NAME\`/,/^-- Table structure/p" $backup_file | \
mysql -u root -p'password' $restore_db
fi
# 应用二进制日志到指定时间点
log "应用二进制日志到时间点: $RESTORE_TIME"
local binlog_events="/tmp/binlog_events_$restore_db.sql"
mysqlbinlog --database=$DB_NAME --stop-datetime="$RESTORE_TIME" /var/lib/mysql/mysql-bin.* | \
sed -n "/^### INSERT INTO \`$DB_NAME\`.\`$TABLE_NAME\`/,/^### INSERT INTO/p" | \
sed 's/^### //' > $binlog_events
mysql -u root -p'password' $restore_db < $binlog_events
rm $binlog_events
log "表恢复完成: $restore_db.$TABLE_NAME"
}
compare_and_restore() {
local restore_db=$1
log "比较并恢复数据"
# 生成恢复SQL
local restore_sql="/tmp/restore_data.sql"
cat > $restore_sql << EOF
-- 插入缺失的记录
INSERT INTO $DB_NAME.$TABLE_NAME
SELECT * FROM $restore_db.$TABLE_NAME r
WHERE NOT EXISTS (
SELECT 1 FROM $DB_NAME.$TABLE_NAME c
WHERE c.id = r.id
);
-- 更新被修改的记录
UPDATE $DB_NAME.$TABLE_NAME c
JOIN $restore_db.$TABLE_NAME r ON c.id = r.id
SET
c.name = r.name,
c.email = r.email,
c.phone = r.phone,
c.updated_at = NOW()
WHERE c.name != r.name
OR c.email != r.email
OR c.phone != r.phone;
EOF
mysql -u root -p'password' < $restore_sql
rm $restore_sql
log "数据恢复完成"
}
# 主逻辑
RESTORE_DB=$(create_restore_database)
restore_table_to_point $RESTORE_DB
compare_and_restore $RESTORE_DB
# 清理恢复数据库
mysql -u root -p'password' -e "DROP DATABASE $RESTORE_DB;"
主从切换与数据重建
计划内主从切换:
#!/bin/bash
# planned_failover.sh
CURRENT_MASTER="192.168.1.100"
NEW_MASTER="192.168.1.101"
MYSQL_USER="repl_user"
MYSQL_PASSWORD="repl_password"
LOG_FILE="/var/log/mysql/planned_failover.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
check_replication_health() {
log "检查复制健康状况"
# 检查主库
local master_status=$(mysql -h $CURRENT_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW MASTER STATUS\G")
if [ $? -ne 0 ]; then
log "错误: 无法连接主库 $CURRENT_MASTER"
exit 1
fi
# 检查从库延迟
local slave_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G")
local seconds_behind=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}')
if [ "$seconds_behind" != "0" ]; then
log "警告: 从库有延迟 ($seconds_behind 秒)"
read -p "是否继续? (y/n): " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
exit 1
fi
fi
log "复制健康状况良好"
}
perform_failover() {
log "开始主从切换"
# 1. 设置原主库为只读
log "设置原主库为只读模式"
mysql -h $CURRENT_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = ON;"
# 2. 等待从库应用所有日志
log "等待从库应用所有日志"
while true; do
local slave_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G")
local seconds_behind=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}')
local io_running=$(echo "$slave_status" | grep "Slave_IO_Running" | awk '{print $2}')
local sql_running=$(echo "$slave_status" | grep "Slave_SQL_Running" | awk '{print $2}')
if [ "$seconds_behind" = "0" ] && [ "$io_running" = "Yes" ] && [ "$sql_running" = "Yes" ]; then
break
fi
sleep 1
done
# 3. 停止从库复制
log "停止新主库的复制"
mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "STOP SLAVE;"
# 4. 记录新主库的二进制日志位置
local new_master_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW MASTER STATUS\G")
local new_master_file=$(echo "$new_master_status" | grep "File" | awk '{print $2}')
local new_master_position=$(echo "$new_master_status" | grep "Position" | awk '{print $2}')
# 5. 设置新主库为可写
log "设置新主库为可写模式"
mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = OFF;"
# 6. 配置其他从库指向新主库
log "重新配置其他从库"
# 这里可以添加其他从库的重新配置逻辑
log "主从切换完成"
log "新主库二进制日志位置: $new_master_file $new_master_position"
}
# 主逻辑
check_replication_health
perform_failover
灾难恢复演练
完整灾难恢复演练:
#!/bin/bash
# disaster_recovery_test.sh
DR_SITE_MYSQL="192.168.2.100"
BACKUP_SERVER="192.168.3.100"
MYSQL_USER="dr_user"
MYSQL_PASSWORD="dr_password"
LOG_FILE="/var/log/mysql/dr_test.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}
verify_dr_environment() {
log "验证灾备环境"
# 检查网络连通性
if ! ping -c 3 $DR_SITE_MYSQL > /dev/null 2>&1; then
log "错误: 无法连接到灾备MySQL服务器"
return 1
fi
# 检查MySQL服务
if ! mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1;" > /dev/null 2>&1; then
log "错误: 灾备MySQL服务不可用"
return 1
fi
log "灾备环境验证通过"
return 0
}
restore_to_dr_site() {
log "开始恢复到灾备站点"
# 1. 停止灾备站点MySQL服务
log "停止灾备站点MySQL服务"
ssh root@$DR_SITE_MYSQL "systemctl stop mysql"
# 2. 备份当前数据
log "备份灾备站点当前数据"
ssh root@$DR_SITE_MYSQL "mv /var/lib/mysql /var/lib/mysql_backup_$(date +%Y%m%d_%H%M%S)"
# 3. 从备份服务器获取最新备份
log "获取最新备份"
local latest_backup=$(ssh root@$BACKUP_SERVER "ls -t /backup/full_* | head -1")
if [ -z "$latest_backup" ]; then
log "错误: 未找到备份文件"
return 1
fi
# 4. 传输备份到灾备站点
log "传输备份文件"
scp -r root@$BACKUP_SERVER:$latest_backup /tmp/dr_restore/
# 5. 准备备份
log "准备备份"
ssh root@$DR_SITE_MYSQL "xtrabackup --prepare --target-dir=/tmp/dr_restore/"
# 6. 恢复备份
log "恢复备份"
ssh root@$DR_SITE_MYSQL "xtrabackup --copy-back --target-dir=/tmp/dr_restore/"
# 7. 设置权限并启动服务
log "启动MySQL服务"
ssh root@$DR_SITE_MYSQL "chown -R mysql:mysql /var/lib/mysql && systemctl start mysql"
log "灾备恢复完成"
}
verify_dr_data() {
log "验证灾备数据"
# 检查数据库列表
local db_count=$(mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema NOT IN ('mysql','information_schema','performance_schema','sys');")
if [ "$db_count" -gt 0 ]; then
log "数据验证成功: 发现 $db_count 个用户表"
return 0
else
log "数据验证失败: 未发现用户表"
return 1
fi
}
perform_failover_test() {
log "执行故障切换测试"
# 模拟应用连接灾备数据库
local test_result=$(mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "CREATE DATABASE dr_test; USE dr_test; CREATE TABLE test_table (id INT); INSERT INTO test_table VALUES (1); SELECT * FROM test_table;" 2>&1)
if echo "$test_result" | grep -q "1"; then
log "故障切换测试成功"
# 清理测试数据
mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "DROP DATABASE dr_test;"
return 0
else
log "故障切换测试失败"
return 1
fi
}
# 主逻辑
if verify_dr_environment; then
restore_to_dr_site
if verify_dr_data; then
perform_failover_test
fi
fi
备份恢复监控告警
备份状态监控:
-- 创建备份监控表
CREATE TABLE backup_monitor (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
backup_type ENUM('FULL', 'INCREMENTAL', 'BINLOG') NOT NULL,
backup_file VARCHAR(500) NOT NULL,
backup_size BIGINT,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NULL,
status ENUM('RUNNING', 'COMPLETED', 'FAILED') DEFAULT 'RUNNING',
error_message TEXT,
checksum VARCHAR(64)
);
-- 创建备份告警表
CREATE TABLE backup_alerts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
alert_type VARCHAR(50) NOT NULL,
alert_message TEXT NOT NULL,
severity ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
resolved_at TIMESTAMP NULL,
resolved_by VARCHAR(100)
);
-- 备份状态检查存储过程
DELIMITER //
CREATE PROCEDURE CheckBackupStatus()
BEGIN
DECLARE last_full_backup TIMESTAMP;
DECLARE backup_age_hours INT;
DECLARE failed_backups INT;
-- 检查最近完整备份的时间
SELECT MAX(start_time) INTO last_full_backup
FROM backup_monitor
WHERE backup_type = 'FULL' AND status = 'COMPLETED';
SET backup_age_hours = TIMESTAMPDIFF(HOUR, last_full_backup, NOW());
-- 如果超过24小时没有完整备份,发出告警
IF backup_age_hours > 24 THEN
INSERT INTO backup_alerts (alert_type, alert_message, severity)
VALUES ('BACKUP_MISSING',
CONCAT('超过', backup_age_hours, '小时没有完整备份'),
'HIGH');
END IF;
-- 检查失败的备份
SELECT COUNT(*) INTO failed_backups
FROM backup_monitor
WHERE status = 'FAILED' AND start_time > NOW() - INTERVAL 24 HOUR;
IF failed_backups > 0 THEN
INSERT INTO backup_alerts (alert_type, alert_message, severity)
VALUES ('BACKUP_FAILED',
CONCAT('过去24小时有', failed_backups, '个备份失败'),
'HIGH');
END IF;
END //
DELIMITER ;
3. 安全与权限管理
用户权限体系设计
最小权限原则实施:
-- 创建应用用户(遵循最小权限原则)
CREATE USER 'app_readonly'@'192.168.1.%' IDENTIFIED BY 'secure_password_123';
GRANT SELECT ON company.* TO 'app_readonly'@'192.168.1.%';
CREATE USER 'app_readwrite'@'192.168.1.%' IDENTIFIED BY 'secure_password_456';
GRANT SELECT, INSERT, UPDATE, DELETE ON company.* TO 'app_readwrite'@'192.168.1.%';
CREATE USER 'app_report'@'192.168.1.%' IDENTIFIED BY 'secure_password_789';
GRANT SELECT ON company.employees TO 'app_report'@'192.168.1.%';
GRANT SELECT ON company.departments TO 'app_report'@'192.168.1.%';
-- 创建管理用户
CREATE USER 'db_admin'@'localhost' IDENTIFIED BY 'admin_secure_password';
GRANT ALL PRIVILEGES ON *.* TO 'db_admin'@'localhost' WITH GRANT OPTION;
-- 创建备份用户
CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'backup_secure_password';
GRANT SELECT, RELOAD, PROCESS, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'backup_user'@'localhost';
-- 查看用户权限
SHOW GRANTS FOR 'app_readonly'@'192.168.1.%';
数据库权限审计:
-- 创建权限审计表
CREATE TABLE privilege_audit (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(100) NOT NULL,
host_pattern VARCHAR(100) NOT NULL,
database_name VARCHAR(100),
table_name VARCHAR(100),
privilege_type VARCHAR(50) NOT NULL,
granted_by VARCHAR(100),
granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_revoked BOOLEAN DEFAULT FALSE,
revoked_at TIMESTAMP NULL,
revoked_by VARCHAR(100)
);
-- 权限审计存储过程
DELIMITER //
CREATE PROCEDURE AuditUserPrivileges()
BEGIN
DECLARE done INT DEFAULT 0;
DECLARE v_user, v_host, v_db, v_table, v_privilege VARCHAR(100);
DECLARE cur CURSOR FOR
SELECT User, Host, Db, Table_name, Privilege
FROM information_schema.table_privileges;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
OPEN cur;
read_loop: LOOP
FETCH cur INTO v_user, v_host, v_db, v_table, v_privilege;
IF done THEN
LEAVE read_loop;
END IF;
-- 检查权限是否已经记录
IF NOT EXISTS (
SELECT 1 FROM privilege_audit
WHERE username = v_user
AND host_pattern = v_host
AND database_name = v_db
AND table_name = v_table
AND privilege_type = v_privilege
AND is_revoked = FALSE
) THEN
-- 记录新权限
INSERT INTO privilege_audit (username, host_pattern, database_name, table_name, privilege_type)
VALUES (v_user, v_host, v_db, v_table, v_privilege);
END IF;
END LOOP;
CLOSE cur;
-- 标记已撤销的权限
UPDATE privilege_audit pa
LEFT JOIN information_schema.table_privileges tp
ON pa.username = tp.User
AND pa.host_pattern = tp.Host
AND pa.database_name = tp.Db
AND pa.table_name = tp.Table_name
AND pa.privilege_type = tp.Privilege
SET pa.is_revoked = TRUE,
pa.revoked_at = CURRENT_TIMESTAMP
WHERE pa.is_revoked = FALSE
AND tp.User IS NULL;
END //
DELIMITER ;
角色管理与权限继承
MySQL 8.0角色管理:
-- 创建角色
CREATE ROLE read_only_role;
CREATE ROLE read_write_role;
CREATE ROLE dba_role;
-- 为角色分配权限
GRANT SELECT ON company.* TO read_only_role;
GRANT SELECT, INSERT, UPDATE, DELETE ON company.* TO read_write_role;
GRANT ALL PRIVILEGES ON *.* TO dba_role;
-- 创建用户并分配角色
CREATE USER 'report_user'@'%' IDENTIFIED BY 'report_password';
CREATE USER 'app_user'@'%' IDENTIFIED BY 'app_password';
CREATE USER 'admin_user'@'localhost' IDENTIFIED BY 'admin_password';
-- 分配角色给用户
GRANT read_only_role TO 'report_user'@'%';
GRANT read_write_role TO 'app_user'@'%';
GRANT dba_role TO 'admin_user'@'localhost';
-- 设置默认角色
SET DEFAULT ROLE read_only_role TO 'report_user'@'%';
SET DEFAULT ROLE read_write_role TO 'app_user'@'%';
SET DEFAULT ROLE dba_role TO 'admin_user'@'localhost';
-- 激活角色
SET ROLE ALL;
-- 查看角色权限
SHOW GRANTS FOR 'report_user'@'%' USING read_only_role;
-- 创建层次化角色
CREATE ROLE junior_dba;
CREATE ROLE senior_dba;
GRANT junior_dba TO senior_dba;
GRANT SELECT, INSERT, UPDATE, DELETE ON mysql.* TO junior_dba;
GRANT ALL PRIVILEGES ON *.* TO senior_dba;
动态权限管理:
-- 创建存储过程管理用户权限
DELIMITER //
CREATE PROCEDURE ManageUserAccess(
IN p_username VARCHAR(100),
IN p_host_pattern VARCHAR(100),
IN p_database_name VARCHAR(100),
IN p_action ENUM('GRANT_READ', 'GRANT_WRITE', 'REVOKE_ACCESS')
)
BEGIN
DECLARE user_exists INT;
-- 检查用户是否存在
SELECT COUNT(*) INTO user_exists
FROM mysql.user
WHERE User = p_username AND Host = p_host_pattern;
IF user_exists = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '用户不存在';
END IF;
CASE p_action
WHEN 'GRANT_READ' THEN
SET @grant_sql = CONCAT('GRANT SELECT ON ', p_database_name, '.* TO ''', p_username, '''@''', p_host_pattern, '''');
PREPARE stmt FROM @grant_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- 记录权限变更
INSERT INTO privilege_audit (username, host_pattern, database_name, privilege_type)
VALUES (p_username, p_host_pattern, p_database_name, 'SELECT');
WHEN 'GRANT_WRITE' THEN
SET @grant_sql = CONCAT('GRANT SELECT, INSERT, UPDATE, DELETE ON ', p_database_name, '.* TO ''', p_username, '''@''', p_host_pattern, '''');
PREPARE stmt FROM @grant_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
INSERT INTO privilege_audit (username, host_pattern, database_name, privilege_type)
VALUES (p_username, p_host_pattern, p_database_name, 'READ_WRITE');
WHEN 'REVOKE_ACCESS' THEN
SET @revoke_sql = CONCAT('REVOKE ALL PRIVILEGES ON ', p_database_name, '.* FROM ''', p_username, '''@''', p_host_pattern, '''');
PREPARE stmt FROM @revoke_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
UPDATE privilege_audit
SET is_revoked = TRUE, revoked_at = NOW()
WHERE username = p_username
AND host_pattern = p_host_pattern
AND database_name = p_database_name
AND is_revoked = FALSE;
END CASE;
END //
DELIMITER ;
数据加密:透明加密与列加密
InnoDB表空间加密:
-- 安装密钥环组件(MySQL 8.0)
INSTALL COMPONENT "file://component_keyring_file";
SET GLOBAL keyring_file_data = '/var/lib/mysql-keyring/keyring';
-- 创建加密表空间
CREATE TABLESPACE encrypted_ts
ADD DATAFILE 'encrypted_ts.ibd'
ENGINE=InnoDB
ENCRYPTION='Y';
-- 在加密表空间中创建表
CREATE TABLE sensitive_data (
id INT PRIMARY KEY,
secret_data VARCHAR(500)
) TABLESPACE encrypted_ts;
-- 加密现有表
ALTER TABLE existing_sensitive_table ENCRYPTION='Y';
-- 查看加密状态
SELECT
TABLE_SCHEMA,
TABLE_NAME,
CREATE_OPTIONS
FROM information_schema.TABLES
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';
列级加密:
-- 创建加密函数
DELIMITER //
CREATE FUNCTION aes_encrypt(data TEXT, key_str VARCHAR(255))
RETURNS VARBINARY(500)
DETERMINISTIC
BEGIN
RETURN AES_ENCRYPT(data, key_str);
END //
CREATE FUNCTION aes_decrypt(encrypted_data VARBINARY(500), key_str VARCHAR(255))
RETURNS TEXT
DETERMINISTIC
BEGIN
RETURN AES_DECRYPT(encrypted_data, key_str);
END //
DELIMITER ;
-- 创建存储加密数据的表
CREATE TABLE user_secrets (
user_id INT PRIMARY KEY,
-- 加密存储的敏感数据
ssn VARBINARY(500),
credit_card VARBINARY(500),
medical_info VARBINARY(500),
-- 加密密钥(在实际应用中应该安全存储)
encryption_key VARCHAR(255) DEFAULT 'default_encryption_key'
);
-- 插入加密数据
INSERT INTO user_secrets (user_id, ssn, credit_card)
VALUES (
1,
aes_encrypt('123-45-6789', 'user1_key'),
aes_encrypt('4111111111111111', 'user1_key')
);
-- 查询解密数据
SELECT
user_id,
aes_decrypt(ssn, 'user1_key') as decrypted_ssn,
aes_decrypt(credit_card, 'user1_key') as decrypted_credit_card
FROM user_secrets
WHERE user_id = 1;
密钥管理策略:
-- 创建密钥管理表
CREATE TABLE encryption_keys (
key_id VARCHAR(100) PRIMARY KEY,
key_value VARBINARY(500) NOT NULL,
key_type ENUM('COLUMN', 'TABLE', 'BACKUP') NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by VARCHAR(100),
is_active BOOLEAN DEFAULT TRUE,
rotated_at TIMESTAMP NULL
);
-- 密钥轮换存储过程
DELIMITER //
CREATE PROCEDURE RotateEncryptionKey(
IN p_key_id VARCHAR(100),
IN p_new_key_value VARBINARY(500)
)
BEGIN
DECLARE old_key_value VARBINARY(500);
DECLARE done INT DEFAULT 0;
DECLARE v_user_id INT;
DECLARE v_ssn, v_credit_card VARBINARY(500);
-- 获取旧密钥
SELECT key_value INTO old_key_value
FROM encryption_keys
WHERE key_id = p_key_id AND is_active = TRUE;
IF old_key_value IS NULL THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '未找到活动的密钥';
END IF;
-- 使用游标处理所有需要重新加密的数据
DECLARE cur CURSOR FOR
SELECT user_id, ssn, credit_card
FROM user_secrets;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
OPEN cur;
read_loop: LOOP
FETCH cur INTO v_user_id, v_ssn, v_credit_card;
IF done THEN
LEAVE read_loop;
END IF;
-- 解密并使用新密钥重新加密
UPDATE user_secrets
SET ssn = aes_encrypt(aes_decrypt(v_ssn, old_key_value), p_new_key_value),
credit_card = aes_encrypt(aes_decrypt(v_credit_card, old_key_value), p_new_key_value)
WHERE user_id = v_user_id;
END LOOP;
CLOSE cur;
-- 停用旧密钥,激活新密钥
UPDATE encryption_keys SET is_active = FALSE, rotated_at = NOW() WHERE key_id = p_key_id;
INSERT INTO encryption_keys (key_id, key_value, key_type) VALUES (p_key_id, p_new_key_value, 'COLUMN');
END //
DELIMITER ;
审计日志与安全监控
MySQL企业版审计:
-- 安装审计插件(企业版)
INSTALL PLUGIN audit_log SONAME 'audit_log.so';
-- 配置审计日志(在my.cnf中)
/*
[mysqld]
audit_log_format=JSON
audit_log_file=/var/log/mysql/audit.log
audit_log_policy=ALL
audit_log_rotate_on_size=100000000
audit_log_rotations=5
*/
-- 查看审计日志状态
SHOW VARIABLES LIKE 'audit_log%';
-- 查询审计日志
SELECT
JSON_EXTRACT(audit_record, '$.timestamp') as timestamp,
JSON_EXTRACT(audit_record, '$.class') as event_class,
JSON_EXTRACT(audit_record, '$.event') as event_type,
JSON_EXTRACT(audit_record, '$.connection_id') as connection_id,
JSON_EXTRACT(audit_record, '$.user') as user,
JSON_EXTRACT(audit_record, '$.query') as query
FROM mysql.audit_log
WHERE JSON_EXTRACT(audit_record, '$.query') IS NOT NULL
ORDER BY timestamp DESC
LIMIT 10;
社区版审计方案:
-- 使用通用日志实现基础审计
SET GLOBAL general_log = 1;
SET GLOBAL log_output = 'TABLE';
-- 创建自定义审计表
CREATE TABLE custom_audit_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
event_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_host VARCHAR(200) NOT NULL,
thread_id BIGINT NOT NULL,
server_id INT NOT NULL,
command_type VARCHAR(64) NOT NULL,
argument TEXT NOT NULL,
client_ip VARCHAR(45),
database_name VARCHAR(100),
execution_time DECIMAL(10,6),
rows_affected INT
);
-- 审计触发器示例
DELIMITER //
CREATE TRIGGER audit_user_changes
AFTER INSERT ON mysql.user
FOR EACH ROW
BEGIN
INSERT INTO custom_audit_log (user_host, thread_id, server_id, command_type, argument, client_ip)
VALUES (USER(), CONNECTION_ID(), @@server_id, 'CREATE_USER',
CONCAT('Created user: ', NEW.User, '@', NEW.Host),
SUBSTRING_INDEX(USER(), '@', -1));
END //
CREATE TRIGGER audit_privilege_changes
AFTER INSERT ON mysql.db
FOR EACH ROW
BEGIN
INSERT INTO custom_audit_log (user_host, thread_id, server_id, command_type, argument, database_name)
VALUES (USER(), CONNECTION_ID(), @@server_id, 'GRANT_PRIVILEGE',
CONCAT('Granted privileges on ', NEW.Db, ' to ', NEW.User),
NEW.Db);
END //
DELIMITER ;
安全监控仪表板:
-- 创建安全监控视图
CREATE VIEW security_dashboard AS
SELECT
'Failed Logins' as metric_name,
COUNT(*) as metric_value,
MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE argument LIKE '%Access denied%'
AND event_time > NOW() - INTERVAL 1 HOUR
UNION ALL
SELECT
'New Users Created' as metric_name,
COUNT(*) as metric_value,
MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE command_type = 'CREATE_USER'
AND event_time > NOW() - INTERVAL 24 HOUR
UNION ALL
SELECT
'Privilege Changes' as metric_name,
COUNT(*) as metric_value,
MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE command_type IN ('GRANT_PRIVILEGE', 'REVOKE_PRIVILEGE')
AND event_time > NOW() - INTERVAL 24 HOUR
UNION ALL
SELECT
'Sensitive Data Access' as metric_name,
COUNT(*) as metric_value,
MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE argument LIKE '%user_secrets%'
AND event_time > NOW() - INTERVAL 1 HOUR;
SQL注入防护与安全开发
预处理语句使用:
-- 不安全的查询(容易SQL注入)
SET @user_input = "1'; DROP TABLE users; --";
SET @sql = CONCAT("SELECT * FROM users WHERE id = '", @user_input, "'");
PREPARE stmt FROM @sql;
EXECUTE stmt;
-- 安全的预处理语句
PREPARE safe_stmt FROM "SELECT * FROM users WHERE id = ?";
SET @user_id = "1";
EXECUTE safe_stmt USING @user_id;
-- 存储过程参数化查询
DELIMITER //
CREATE PROCEDURE GetUserByEmail(IN p_email VARCHAR(255))
BEGIN
-- 直接使用参数,避免拼接
SELECT * FROM users WHERE email = p_email;
END //
DELIMITER ;
输入验证函数:
DELIMITER //
CREATE FUNCTION ValidateEmail(email VARCHAR(255))
RETURNS BOOLEAN
DETERMINISTIC
BEGIN
-- 简单的邮箱格式验证
IF email REGEXP '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,4}$' THEN
RETURN TRUE;
ELSE
RETURN FALSE;
END IF;
END //
CREATE FUNCTION SanitizeInput(input_text TEXT)
RETURNS TEXT
DETERMINISTIC
BEGIN
-- 移除潜在的SQL注入字符
SET input_text = REPLACE(input_text, "'", "''");
SET input_text = REPLACE(input_text, ";", "");
SET input_text = REPLACE(input_text, "--", "");
SET input_text = REPLACE(input_text, "/*", "");
SET input_text = REPLACE(input_text, "*/", "");
RETURN input_text;
END //
DELIMITER ;
安全开发规范检查:
-- 检查存储过程的安全问题
SELECT
ROUTINE_NAME,
ROUTINE_DEFINITION
FROM information_schema.ROUTINES
WHERE ROUTINE_DEFINITION LIKE '%CONCAT(%'
OR ROUTINE_DEFINITION LIKE '%EXECUTE%IMMEDIATE%'
OR ROUTINE_DEFINITION LIKE '%PREPARE%'
OR ROUTINE_DEFINITION LIKE '%sp_executesql%';
-- 查找可能包含动态SQL的代码
SELECT
TABLE_NAME,
COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'your_database'
AND (COLUMN_NAME LIKE '%sql%' OR COLUMN_NAME LIKE '%query%')
AND TABLE_NAME NOT LIKE '%audit%';
总结
通过本篇的深入学习,我们掌握了MySQL备份恢复和安全管理的完整体系:
- 备份策略:逻辑备份、物理备份、增量备份的实战应用
- 恢复技术:时间点恢复、误操作恢复、灾难恢复的完整流程
- 安全管理:权限体系、数据加密、审计监控的全面方案
- 安全开发:SQL注入防护、输入验证的安全编码实践
关键安全原则:
- 最小权限:用户只拥有完成工作所需的最小权限
- 纵深防御:多层安全措施,避免单点失效
- 定期审计:持续监控和审查安全状态
- 应急准备:完善的备份和恢复预案
备份恢复最佳实践:
- 3-2-1规则:3个副本,2种介质,1个离线存储
- 定期恢复演练:确保备份可用性
- 监控备份状态:及时发现问题
- 加密敏感数据:保护数据隐私
在下一篇中,我们将探讨MySQL在云原生环境中的应用,包括容器化部署、微服务架构集成等现代技术。
动手练习:
- 设计并实施完整的备份策略,包括完整备份和增量备份
- 执行时间点恢复演练,验证备份的可用性
- 建立权限管理体系,实施最小权限原则
- 配置数据加密和审计日志,增强安全性
- 进行安全代码审查,修复潜在的SQL注入漏洞
欢迎在评论区分享你的备份恢复实践和安全加固经验!