忘忧的小站

  • 首页
  • 文章归档
  • 日志
  • 关于页面

  • 搜索
异常处理 AutoWrapper 入门 NoSql 数据库 sqlserver 1 分布式索引 索引 全文搜索 Lucene.Net GPS 音视频 过滤 AOP 时区 升级 ABP.Zero 数据备份 linux 阿里云盘 aliyunpan 面试题 Signalr S 汉字 css html 前端 拼音键盘 在线键盘 uniapp .Net Core XMLRPC Serilog LOKI Nlog 分布式日志 加密 总结 人生 Asp.Net Core Swagger Web Element-plus Quasar 匹配 JavaScript 正则 .Net 后台 架构师 Redis EF CORE MySQL 自考 英语 集群 Jenkins CI/DI 内网穿透 代理 ABP 学习 后端 软考

MySql入门:备份恢复与安全管理

发表于 2025-05-22 | 分类于 数据库 | 0 | 阅读次数 14755

MySQL备份恢复与安全管理

数据是企业的核心资产,确保数据安全性和可恢复性是DBA最重要的职责。今天,我们将深入探讨MySQL的备份恢复策略和安全管理制度,帮助你构建既安全又可靠的数据库环境。

1. 备份策略与实施

逻辑备份:mysqldump实用技巧

基础备份命令:

# 完整数据库备份
mysqldump -u root -p --all-databases --single-transaction --master-data=2 --flush-logs > full_backup_$(date +%Y%m%d).sql

# 单个数据库备份
mysqldump -u root -p --databases company --single-transaction --routines --triggers --events > company_backup_$(date +%Y%m%d).sql

# 单个表备份
mysqldump -u root -p company employees departments --single-transaction --where="salary>5000" > high_salary_employees.sql

# 压缩备份
mysqldump -u root -p --all-databases --single-transaction | gzip > full_backup_$(date +%Y%m%d).sql.gz

高级备份选项:

# 生产环境完整备份脚本
mysqldump -u backup_user -p'secure_password' \
  --all-databases \
  --single-transaction \
  --master-data=2 \
  --flush-logs \
  --routines \
  --triggers \
  --events \
  --hex-blob \
  --complete-insert \
  --extended-insert \
  --max-allowed-packet=1G \
  --set-gtid-purged=ON \
  --result-file=/backup/full_backup_$(date +%Y%m%d_%H%M%S).sql

# 分库备份脚本
for DB in $(mysql -u root -p'password' -e "SHOW DATABASES;" | grep -Ev "(Database|information_schema|performance_schema|sys)")
do
    mysqldump -u root -p'password' --databases $DB --single-transaction --routines --triggers > /backup/${DB}_backup_$(date +%Y%m%d).sql
done

备份验证脚本:

#!/bin/bash
# backup_verify.sh

BACKUP_FILE=$1
LOG_FILE="/var/log/mysql/backup_verify.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

verify_backup() {
    local file=$1
    
    log "开始验证备份文件: $file"
    
    # 检查文件是否存在
    if [ ! -f "$file" ]; then
        log "错误: 备份文件不存在 - $file"
        return 1
    fi
    
    # 检查文件大小
    local file_size=$(stat -f%z "$file" 2>/dev/null || stat -c%s "$file" 2>/dev/null)
    if [ "$file_size" -lt 1024 ]; then
        log "错误: 备份文件过小 - $file"
        return 1
    fi
    
    # 验证SQL文件完整性
    if [[ "$file" == *.sql ]]; then
        # 检查SQL文件头
        if ! head -n 10 "$file" | grep -q "MySQL dump"; then
            log "错误: 无效的SQL备份文件 - $file"
            return 1
        fi
        
        # 检查SQL文件尾
        if ! tail -n 5 "$file" | grep -q "Dump completed"; then
            log "警告: 备份文件可能不完整 - $file"
        fi
    fi
    
    # 验证压缩文件
    if [[ "$file" == *.gz ]]; then
        if ! gzip -t "$file" 2>/dev/null; then
            log "错误: 压缩文件损坏 - $file"
            return 1
        fi
    fi
    
    log "备份文件验证通过: $file"
    return 0
}

# 执行验证
verify_backup "$BACKUP_FILE"
exit $?

物理备份:XtraBackup实战

完整备份与恢复:

# 安装XtraBackup
# Ubuntu/Debian
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get update
sudo apt-get install percona-xtrabackup-80

# 完整备份
xtrabackup --backup --user=backup_user --password='secure_password' --target-dir=/backup/full_$(date +%Y%m%d_%H%M%S)

# 准备备份(应用日志)
xtrabackup --prepare --target-dir=/backup/full_20231201_120000

# 恢复备份
systemctl stop mysql
mv /var/lib/mysql /var/lib/mysql_old
xtrabackup --copy-back --target-dir=/backup/full_20231201_120000
chown -R mysql:mysql /var/lib/mysql
systemctl start mysql

增量备份策略:

#!/bin/bash
# incremental_backup.sh

BASE_DIR="/backup"
FULL_BACKUP_DIR="$BASE_DIR/full_$(date +%Y%m%d)"
INCREMENTAL_DIR="$BASE_DIR/inc_$(date +%Y%m%d_%H%M%S)"
BACKUP_USER="backup_user"
BACKUP_PASSWORD="secure_password"
LOG_FILE="/var/log/mysql/xtrabackup.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

# 检查基础备份是否存在
find_base_backup() {
    find $BASE_DIR -name "full_*" -type d | sort -r | head -1
}

perform_full_backup() {
    log "开始完整备份"
    xtrabackup --backup --user=$BACKUP_USER --password=$BACKUP_PASSWORD --target-dir=$FULL_BACKUP_DIR
    if [ $? -eq 0 ]; then
        log "完整备份完成: $FULL_BACKUP_DIR"
        echo $FULL_BACKUP_DIR > $BASE_DIR/latest_full_backup
    else
        log "完整备份失败"
        exit 1
    fi
}

perform_incremental_backup() {
    local base_dir=$1
    log "开始增量备份,基于: $base_dir"
    
    xtrabackup --backup --user=$BACKUP_USER --password=$BACKUP_PASSWORD \
        --target-dir=$INCREMENTAL_DIR \
        --incremental-basedir=$base_dir
    
    if [ $? -eq 0 ]; then
        log "增量备份完成: $INCREMENTAL_DIR"
    else
        log "增量备份失败"
        exit 1
    fi
}

# 主逻辑
BASE_BACKUP=$(find_base_backup)

if [ -z "$BASE_BACKUP" ] || [ $(find $BASE_BACKUP -name "xtrabackup_checkpoints" -mtime +7 | wc -l) -gt 0 ]; then
    # 没有基础备份或基础备份超过7天,执行完整备份
    perform_full_backup
else
    # 执行增量备份
    perform_incremental_backup $BASE_BACKUP
fi

备份恢复演练:

#!/bin/bash
# disaster_recovery_drill.sh

RECOVERY_DIR="/recovery"
BACKUP_SOURCE="/backup"
MYSQL_DATA_DIR="/var/lib/mysql"
LOG_FILE="/var/log/mysql/recovery_drill.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

prepare_recovery_environment() {
    log "准备恢复环境"
    
    # 停止MySQL服务
    systemctl stop mysql
    
    # 备份当前数据
    mv $MYSQL_DATA_DIR ${MYSQL_DATA_DIR}_backup_$(date +%Y%m%d_%H%M%S)
    
    # 创建恢复目录
    mkdir -p $RECOVERY_DIR
}

restore_from_backup() {
    local backup_dir=$1
    
    log "从备份恢复: $backup_dir"
    
    # 准备备份
    xtrabackup --prepare --apply-log-only --target-dir=$backup_dir
    
    # 恢复备份
    xtrabackup --copy-back --target-dir=$backup_dir
    
    # 设置权限
    chown -R mysql:mysql $MYSQL_DATA_DIR
}

verify_recovery() {
    log "验证恢复结果"
    
    # 启动MySQL
    systemctl start mysql
    
    # 等待服务启动
    sleep 30
    
    # 基础验证
    if mysql -u root -p'password' -e "SELECT 1;" > /dev/null 2>&1; then
        log "MySQL服务启动成功"
        
        # 验证关键表
        local table_count=$(mysql -u root -p'password' -N -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema NOT IN ('mysql','information_schema','performance_schema','sys');")
        log "发现 $table_count 个用户表"
        
        # 验证数据完整性
        mysql -u root -p'password' -e "CHECK TABLE company.employees EXTENDED;" >> $LOG_FILE
        
        return 0
    else
        log "MySQL服务启动失败"
        return 1
    fi
}

# 执行恢复演练
prepare_recovery_environment

# 查找最新的完整备份
LATEST_FULL_BACKUP=$(find $BACKUP_SOURCE -name "full_*" -type d | sort -r | head -1)

if [ -n "$LATEST_FULL_BACKUP" ]; then
    restore_from_backup $LATEST_FULL_BACKUP
    verify_recovery
else
    log "错误: 未找到完整备份"
    exit 1
fi

增量备份与差异备份

二进制日志备份:

-- 启用二进制日志
-- 在my.cnf中配置
/*
[mysqld]
log_bin = /var/lib/mysql/mysql-bin
expire_logs_days = 7
max_binlog_size = 100M
*/

-- 查看二进制日志状态
SHOW BINARY LOGS;
/*
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       194 |
| mysql-bin.000002 |       456 |
| mysql-bin.000003 |       123 |
+------------------+-----------+
*/

-- 刷新日志(创建新的二进制日志文件)
FLUSH BINARY LOGS;

-- 查看当前正在使用的二进制日志
SHOW MASTER STATUS;

自动化二进制日志备份:

#!/bin/bash
# binlog_backup.sh

MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
BACKUP_DIR="/backup/binlog"
LOG_FILE="/var/log/mysql/binlog_backup.log"
RETENTION_DAYS=7

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

backup_binlog() {
    log "开始二进制日志备份"
    
    # 获取当前二进制日志文件
    CURRENT_BINLOG=$(mysql -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SHOW MASTER STATUS" | awk '{print $1}')
    
    # 备份所有未备份的二进制日志
    for BINLOG in $(mysql -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SHOW BINARY LOGS" | awk '{print $1}' | grep -v "$CURRENT_BINLOG"); do
        if [ ! -f "$BACKUP_DIR/$BINLOG" ]; then
            log "备份二进制日志: $BINLOG"
            cp /var/lib/mysql/$BINLOG $BACKUP_DIR/
            
            # 验证备份
            if cmp /var/lib/mysql/$BINLOG $BACKUP_DIR/$BINLOG; then
                log "备份验证成功: $BINLOG"
            else
                log "备份验证失败: $BINLOG"
            fi
        fi
    done
}

purge_old_backups() {
    log "清理过期备份(保留 $RETENTION_DAYS 天)"
    find $BACKUP_DIR -name "mysql-bin.*" -mtime +$RETENTION_DAYS -delete
}

# 创建备份目录
mkdir -p $BACKUP_DIR

# 执行备份
backup_binlog
purge_old_backups

log "二进制日志备份完成"

备份压缩与加密

加密备份方案:

#!/bin/bash
# encrypted_backup.sh

BACKUP_DIR="/backup/encrypted"
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
ENCRYPTION_KEY="/etc/mysql/backup.key"
LOG_FILE="/var/log/mysql/encrypted_backup.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

generate_encryption_key() {
    if [ ! -f "$ENCRYPTION_KEY" ]; then
        log "生成加密密钥"
        openssl rand -base64 32 > $ENCRYPTION_KEY
        chmod 600 $ENCRYPTION_KEY
    fi
}

create_encrypted_backup() {
    local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).xb.enc"
    
    log "创建加密备份: $backup_file"
    
    # 使用XtraBackup创建备份并立即加密
    xtrabackup --backup --user=$MYSQL_USER --password=$MYSQL_PASSWORD --stream=xbstream | \
    openssl enc -aes-256-cbc -salt -pass file:$ENCRYPTION_KEY -out $backup_file
    
    if [ $? -eq 0 ]; then
        log "加密备份创建成功: $backup_file"
    else
        log "加密备份创建失败"
        exit 1
    fi
}

verify_encrypted_backup() {
    local backup_file=$1
    
    log "验证加密备份: $backup_file"
    
    # 尝试解密备份头信息
    if openssl enc -aes-256-cbc -d -pass file:$ENCRYPTION_KEY -in $backup_file | head -c 100 | strings | grep -q "MySQL"; then
        log "加密备份验证成功"
        return 0
    else
        log "加密备份验证失败"
        return 1
    fi
}

# 主逻辑
generate_encryption_key
mkdir -p $BACKUP_DIR
create_encrypted_backup

# 验证最新的备份
LATEST_BACKUP=$(ls -t $BACKUP_DIR/*.enc | head -1)
if [ -n "$LATEST_BACKUP" ]; then
    verify_encrypted_backup $LATEST_BACKUP
fi

压缩备份优化:

#!/bin/bash
# compressed_backup.sh

BACKUP_DIR="/backup/compressed"
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
COMPRESSION_LEVEL=6  # 1-9,数字越大压缩率越高但速度越慢
LOG_FILE="/var/log/mysql/compressed_backup.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

create_compressed_backup() {
    local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz"
    
    log "创建压缩备份 (级别: $COMPRESSION_LEVEL)"
    
    # 使用mysqldump和gzip创建压缩备份
    mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction --routines --triggers --events | \
    gzip -$COMPRESSION_LEVEL > $backup_file
    
    local backup_size=$(du -h $backup_file | cut -f1)
    log "压缩备份完成: $backup_file (大小: $backup_size)"
}

create_parallel_compressed_backup() {
    local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz"
    
    log "创建并行压缩备份"
    
    # 使用pigz进行并行压缩(如果可用)
    if command -v pigz >/dev/null 2>&1; then
        mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction | \
        pigz -p 4 -$COMPRESSION_LEVEL > $backup_file
    else
        mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction | \
        gzip -$COMPRESSION_LEVEL > $backup_file
    fi
    
    local backup_size=$(du -h $backup_file | cut -f1)
    log "并行压缩备份完成: $backup_file (大小: $backup_size)"
}

# 创建备份目录
mkdir -p $BACKUP_DIR

# 根据系统资源选择备份方式
if [ $(nproc) -gt 2 ]; then
    create_parallel_compressed_backup
else
    create_compressed_backup
fi

云环境备份方案

AWS S3备份方案:

#!/bin/bash
# s3_backup.sh

BACKUP_DIR="/backup/s3_upload"
S3_BUCKET="my-company-mysql-backups"
S3_PATH="mysql/$(hostname)"
MYSQL_USER="backup_user"
MYSQL_PASSWORD="secure_password"
RETENTION_DAYS=30
LOG_FILE="/var/log/mysql/s3_backup.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

create_backup() {
    local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz"
    
    log "创建备份文件: $backup_file"
    
    mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction --routines --triggers --events | \
    gzip > $backup_file
    
    echo $backup_file
}

upload_to_s3() {
    local backup_file=$1
    local s3_key="$S3_PATH/$(basename $backup_file)"
    
    log "上传到S3: s3://$S3_BUCKET/$s3_key"
    
    if aws s3 cp $backup_file s3://$S3_BUCKET/$s3_key; then
        log "S3上传成功"
        return 0
    else
        log "S3上传失败"
        return 1
    fi
}

cleanup_old_backups() {
    log "清理本地过期备份"
    find $BACKUP_DIR -name "*.sql.gz" -mtime +7 -delete
    
    log "清理S3过期备份"
    aws s3 ls s3://$S3_BUCKET/$S3_PATH/ | while read line; do
        create_date=$(echo $line | awk '{print $1" "$2}')
        create_date_epoch=$(date -d "$create_date" +%s)
        retention_epoch=$(date -d "$RETENTION_DAYS days ago" +%s)
        
        if [ $create_date_epoch -lt $retention_epoch ]; then
            file_name=$(echo $line | awk '{print $4}')
            aws s3 rm s3://$S3_BUCKET/$S3_PATH/$file_name
            log "删除过期S3备份: $file_name"
        fi
    done
}

verify_s3_backup() {
    local backup_file=$1
    local s3_key="$S3_PATH/$(basename $backup_file)"
    
    log "验证S3备份完整性"
    
    # 下载备份文件
    local temp_file="/tmp/verify_$(basename $backup_file)"
    aws s3 cp s3://$S3_BUCKET/$s3_key $temp_file
    
    # 比较本地和S3的文件
    if cmp $backup_file $temp_file; then
        log "S3备份验证成功"
        rm $temp_file
        return 0
    else
        log "S3备份验证失败"
        rm $temp_file
        return 1
    fi
}

# 主逻辑
mkdir -p $BACKUP_DIR

BACKUP_FILE=$(create_backup)
if [ -n "$BACKUP_FILE" ]; then
    if upload_to_s3 $BACKUP_FILE; then
        verify_s3_backup $BACKUP_FILE
    fi
fi

cleanup_old_backups

2. 数据恢复与灾难恢复

基于时间点的恢复(PITR)

PITR恢复流程:

#!/bin/bash
# pitr_recovery.sh

RESTORE_TIME="2023-12-01 14:30:00"
BACKUP_DIR="/backup"
BINLOG_DIR="/var/lib/mysql"
RECOVERY_DIR="/recovery"
MYSQL_DATA_DIR="/var/lib/mysql"
LOG_FILE="/var/log/mysql/pitr_recovery.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

find_relevant_backup() {
    log "查找适用于时间点 $RESTORE_TIME 的备份"
    
    # 查找在恢复时间之前的最新完整备份
    for BACKUP in $(ls -t $BACKUP_DIR/full_* 2>/dev/null); do
        local backup_time=$(stat -c %y $BACKUP/xtrabackup_info | cut -d' ' -f1,2 | cut -d'.' -f1)
        local backup_epoch=$(date -d "$backup_time" +%s)
        local restore_epoch=$(date -d "$RESTORE_TIME" +%s)
        
        if [ $backup_epoch -le $restore_epoch ]; then
            echo $BACKUP
            return 0
        fi
    done
    
    log "错误: 未找到合适的完整备份"
    exit 1
}

extract_binlog_events() {
    local start_time=$1
    local stop_time=$2
    local output_file=$3
    
    log "提取二进制日志事件: $start_time 到 $stop_time"
    
    # 查找包含时间范围的二进制日志文件
    for BINLOG in $(ls -tr $BINLOG_DIR/mysql-bin.* 2>/dev/null | grep -v '.index'); do
        local first_event_time=$(mysqlbinlog $BINLOG | grep -m1 "end_log_pos" | awk '{print $1, $2}' | tr -d '#')
        local last_event_time=$(mysqlbinlog $BINLOG | tail -10 | grep "end_log_pos" | tail -1 | awk '{print $1, $2}' | tr -d '#')
        
        if [ -n "$first_event_time" ] && [ -n "$last_event_time" ]; then
            local first_epoch=$(date -d "$first_event_time" +%s 2>/dev/null || echo 0)
            local last_epoch=$(date -d "$last_event_time" +%s 2>/dev/null || echo 0)
            local start_epoch=$(date -d "$start_time" +%s)
            local stop_epoch=$(date -d "$stop_time" +%s)
            
            if [ $last_epoch -ge $start_epoch ] && [ $first_epoch -le $stop_epoch ]; then
                log "处理二进制日志: $BINLOG"
                mysqlbinlog --start-datetime="$start_time" --stop-datetime="$stop_time" $BINLOG >> $output_file
            fi
        fi
    done
}

perform_pitr_recovery() {
    local base_backup=$1
    
    log "执行时间点恢复"
    
    # 准备恢复环境
    systemctl stop mysql
    mv $MYSQL_DATA_DIR ${MYSQL_DATA_DIR}_backup_$(date +%Y%m%d_%H%M%S)
    
    # 恢复基础备份
    xtrabackup --copy-back --target-dir=$base_backup
    chown -R mysql:mysql $MYSQL_DATA_DIR
    
    # 启动MySQL到恢复模式
    systemctl start mysql
    
    # 获取备份时间
    local backup_time=$(stat -c %y $base_backup/xtrabackup_info | cut -d' ' -f1,2 | cut -d'.' -f1)
    
    # 提取和应用二进制日志
    local binlog_events="/tmp/binlog_events.sql"
    echo "" > $binlog_events
    
    extract_binlog_events "$backup_time" "$RESTORE_TIME" $binlog_events
    
    # 应用二进制日志事件
    if [ -s $binlog_events ]; then
        log "应用二进制日志事件"
        mysql -u root -p'password' < $binlog_events
    else
        log "没有需要应用的二进制日志事件"
    fi
    
    log "时间点恢复完成"
}

# 主逻辑
BASE_BACKUP=$(find_relevant_backup)
perform_pitr_recovery $BASE_BACKUP

误操作数据恢复方案

Flashback工具使用:

-- 安装mysqlbinlog_flashback工具
-- 使用my2sql或binlog2sql进行闪回

-- 示例:恢复误删除的数据
# 使用binlog2sql解析二进制日志
python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uroot -p'password' -dcompany -temployees --start-file='mysql-bin.000001' --start-pos=4 --stop-pos=1000 -B

-- 输出闪回SQL
/*
INSERT INTO `company`.`employees`(`create_time`, `phone`, `name`, `id`, `email`) VALUES ('2023-01-01 10:00:00', '13800138000', '张三', 1, 'zhangsan@company.com'); 
INSERT INTO `company`.`employees`(`create_time`, `phone`, `name`, `id`, `email`) VALUES ('2023-01-02 11:00:00', '13900139000', '李四', 2, 'lisi@company.com');
*/

基于备份的误操作恢复:

#!/bin/bash
# point_in_time_restore.sh

DB_NAME="company"
TABLE_NAME="employees"
BACKUP_DIR="/backup"
RESTORE_TIME="2023-12-01 10:00:00"  # 误操作之前的时间
LOG_FILE="/var/log/mysql/point_restore.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

create_restore_database() {
    local restore_db="${DB_NAME}_restore_$(date +%Y%m%d_%H%M%S)"
    
    log "创建恢复数据库: $restore_db"
    
    mysql -u root -p'password' -e "CREATE DATABASE $restore_db;"
    echo $restore_db
}

restore_table_to_point() {
    local restore_db=$1
    local backup_file=$(find $BACKUP_DIR -name "*${DB_NAME}*" -type f | sort -r | head -1)
    
    if [ -z "$backup_file" ]; then
        log "错误: 未找到备份文件"
        exit 1
    fi
    
    log "从备份恢复表结构"
    
    # 提取表结构
    if [[ $backup_file == *.sql.gz ]]; then
        gunzip -c $backup_file | sed -n "/^-- Table structure for table \`$TABLE_NAME\`/,/^-- Table structure/p" | \
        mysql -u root -p'password' $restore_db
    else
        sed -n "/^-- Table structure for table \`$TABLE_NAME\`/,/^-- Table structure/p" $backup_file | \
        mysql -u root -p'password' $restore_db
    fi
    
    # 应用二进制日志到指定时间点
    log "应用二进制日志到时间点: $RESTORE_TIME"
    
    local binlog_events="/tmp/binlog_events_$restore_db.sql"
    mysqlbinlog --database=$DB_NAME --stop-datetime="$RESTORE_TIME" /var/lib/mysql/mysql-bin.* | \
    sed -n "/^### INSERT INTO \`$DB_NAME\`.\`$TABLE_NAME\`/,/^### INSERT INTO/p" | \
    sed 's/^### //' > $binlog_events
    
    mysql -u root -p'password' $restore_db < $binlog_events
    rm $binlog_events
    
    log "表恢复完成: $restore_db.$TABLE_NAME"
}

compare_and_restore() {
    local restore_db=$1
    
    log "比较并恢复数据"
    
    # 生成恢复SQL
    local restore_sql="/tmp/restore_data.sql"
    
    cat > $restore_sql << EOF
-- 插入缺失的记录
INSERT INTO $DB_NAME.$TABLE_NAME 
SELECT * FROM $restore_db.$TABLE_NAME r
WHERE NOT EXISTS (
    SELECT 1 FROM $DB_NAME.$TABLE_NAME c 
    WHERE c.id = r.id
);

-- 更新被修改的记录
UPDATE $DB_NAME.$TABLE_NAME c
JOIN $restore_db.$TABLE_NAME r ON c.id = r.id
SET 
    c.name = r.name,
    c.email = r.email,
    c.phone = r.phone,
    c.updated_at = NOW()
WHERE c.name != r.name 
   OR c.email != r.email 
   OR c.phone != r.phone;
EOF

    mysql -u root -p'password' < $restore_sql
    rm $restore_sql
    
    log "数据恢复完成"
}

# 主逻辑
RESTORE_DB=$(create_restore_database)
restore_table_to_point $RESTORE_DB
compare_and_restore $RESTORE_DB

# 清理恢复数据库
mysql -u root -p'password' -e "DROP DATABASE $RESTORE_DB;"

主从切换与数据重建

计划内主从切换:

#!/bin/bash
# planned_failover.sh

CURRENT_MASTER="192.168.1.100"
NEW_MASTER="192.168.1.101"
MYSQL_USER="repl_user"
MYSQL_PASSWORD="repl_password"
LOG_FILE="/var/log/mysql/planned_failover.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

check_replication_health() {
    log "检查复制健康状况"
    
    # 检查主库
    local master_status=$(mysql -h $CURRENT_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW MASTER STATUS\G")
    if [ $? -ne 0 ]; then
        log "错误: 无法连接主库 $CURRENT_MASTER"
        exit 1
    fi
    
    # 检查从库延迟
    local slave_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G")
    local seconds_behind=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}')
    
    if [ "$seconds_behind" != "0" ]; then
        log "警告: 从库有延迟 ($seconds_behind 秒)"
        read -p "是否继续? (y/n): " -n 1 -r
        echo
        if [[ ! $REPLY =~ ^[Yy]$ ]]; then
            exit 1
        fi
    fi
    
    log "复制健康状况良好"
}

perform_failover() {
    log "开始主从切换"
    
    # 1. 设置原主库为只读
    log "设置原主库为只读模式"
    mysql -h $CURRENT_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = ON;"
    
    # 2. 等待从库应用所有日志
    log "等待从库应用所有日志"
    while true; do
        local slave_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G")
        local seconds_behind=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}')
        local io_running=$(echo "$slave_status" | grep "Slave_IO_Running" | awk '{print $2}')
        local sql_running=$(echo "$slave_status" | grep "Slave_SQL_Running" | awk '{print $2}')
        
        if [ "$seconds_behind" = "0" ] && [ "$io_running" = "Yes" ] && [ "$sql_running" = "Yes" ]; then
            break
        fi
        sleep 1
    done
    
    # 3. 停止从库复制
    log "停止新主库的复制"
    mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "STOP SLAVE;"
    
    # 4. 记录新主库的二进制日志位置
    local new_master_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW MASTER STATUS\G")
    local new_master_file=$(echo "$new_master_status" | grep "File" | awk '{print $2}')
    local new_master_position=$(echo "$new_master_status" | grep "Position" | awk '{print $2}')
    
    # 5. 设置新主库为可写
    log "设置新主库为可写模式"
    mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = OFF;"
    
    # 6. 配置其他从库指向新主库
    log "重新配置其他从库"
    # 这里可以添加其他从库的重新配置逻辑
    
    log "主从切换完成"
    log "新主库二进制日志位置: $new_master_file $new_master_position"
}

# 主逻辑
check_replication_health
perform_failover

灾难恢复演练

完整灾难恢复演练:

#!/bin/bash
# disaster_recovery_test.sh

DR_SITE_MYSQL="192.168.2.100"
BACKUP_SERVER="192.168.3.100"
MYSQL_USER="dr_user"
MYSQL_PASSWORD="dr_password"
LOG_FILE="/var/log/mysql/dr_test.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE
}

verify_dr_environment() {
    log "验证灾备环境"
    
    # 检查网络连通性
    if ! ping -c 3 $DR_SITE_MYSQL > /dev/null 2>&1; then
        log "错误: 无法连接到灾备MySQL服务器"
        return 1
    fi
    
    # 检查MySQL服务
    if ! mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1;" > /dev/null 2>&1; then
        log "错误: 灾备MySQL服务不可用"
        return 1
    fi
    
    log "灾备环境验证通过"
    return 0
}

restore_to_dr_site() {
    log "开始恢复到灾备站点"
    
    # 1. 停止灾备站点MySQL服务
    log "停止灾备站点MySQL服务"
    ssh root@$DR_SITE_MYSQL "systemctl stop mysql"
    
    # 2. 备份当前数据
    log "备份灾备站点当前数据"
    ssh root@$DR_SITE_MYSQL "mv /var/lib/mysql /var/lib/mysql_backup_$(date +%Y%m%d_%H%M%S)"
    
    # 3. 从备份服务器获取最新备份
    log "获取最新备份"
    local latest_backup=$(ssh root@$BACKUP_SERVER "ls -t /backup/full_* | head -1")
    
    if [ -z "$latest_backup" ]; then
        log "错误: 未找到备份文件"
        return 1
    fi
    
    # 4. 传输备份到灾备站点
    log "传输备份文件"
    scp -r root@$BACKUP_SERVER:$latest_backup /tmp/dr_restore/
    
    # 5. 准备备份
    log "准备备份"
    ssh root@$DR_SITE_MYSQL "xtrabackup --prepare --target-dir=/tmp/dr_restore/"
    
    # 6. 恢复备份
    log "恢复备份"
    ssh root@$DR_SITE_MYSQL "xtrabackup --copy-back --target-dir=/tmp/dr_restore/"
    
    # 7. 设置权限并启动服务
    log "启动MySQL服务"
    ssh root@$DR_SITE_MYSQL "chown -R mysql:mysql /var/lib/mysql && systemctl start mysql"
    
    log "灾备恢复完成"
}

verify_dr_data() {
    log "验证灾备数据"
    
    # 检查数据库列表
    local db_count=$(mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema NOT IN ('mysql','information_schema','performance_schema','sys');")
    
    if [ "$db_count" -gt 0 ]; then
        log "数据验证成功: 发现 $db_count 个用户表"
        return 0
    else
        log "数据验证失败: 未发现用户表"
        return 1
    fi
}

perform_failover_test() {
    log "执行故障切换测试"
    
    # 模拟应用连接灾备数据库
    local test_result=$(mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "CREATE DATABASE dr_test; USE dr_test; CREATE TABLE test_table (id INT); INSERT INTO test_table VALUES (1); SELECT * FROM test_table;" 2>&1)
    
    if echo "$test_result" | grep -q "1"; then
        log "故障切换测试成功"
        
        # 清理测试数据
        mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "DROP DATABASE dr_test;"
        
        return 0
    else
        log "故障切换测试失败"
        return 1
    fi
}

# 主逻辑
if verify_dr_environment; then
    restore_to_dr_site
    if verify_dr_data; then
        perform_failover_test
    fi
fi

备份恢复监控告警

备份状态监控:

-- 创建备份监控表
CREATE TABLE backup_monitor (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    backup_type ENUM('FULL', 'INCREMENTAL', 'BINLOG') NOT NULL,
    backup_file VARCHAR(500) NOT NULL,
    backup_size BIGINT,
    start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    end_time TIMESTAMP NULL,
    status ENUM('RUNNING', 'COMPLETED', 'FAILED') DEFAULT 'RUNNING',
    error_message TEXT,
    checksum VARCHAR(64)
);

-- 创建备份告警表
CREATE TABLE backup_alerts (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    alert_type VARCHAR(50) NOT NULL,
    alert_message TEXT NOT NULL,
    severity ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    resolved_at TIMESTAMP NULL,
    resolved_by VARCHAR(100)
);

-- 备份状态检查存储过程
DELIMITER //

CREATE PROCEDURE CheckBackupStatus()
BEGIN
    DECLARE last_full_backup TIMESTAMP;
    DECLARE backup_age_hours INT;
    DECLARE failed_backups INT;
    
    -- 检查最近完整备份的时间
    SELECT MAX(start_time) INTO last_full_backup
    FROM backup_monitor
    WHERE backup_type = 'FULL' AND status = 'COMPLETED';
    
    SET backup_age_hours = TIMESTAMPDIFF(HOUR, last_full_backup, NOW());
    
    -- 如果超过24小时没有完整备份,发出告警
    IF backup_age_hours > 24 THEN
        INSERT INTO backup_alerts (alert_type, alert_message, severity)
        VALUES ('BACKUP_MISSING', 
                CONCAT('超过', backup_age_hours, '小时没有完整备份'), 
                'HIGH');
    END IF;
    
    -- 检查失败的备份
    SELECT COUNT(*) INTO failed_backups
    FROM backup_monitor
    WHERE status = 'FAILED' AND start_time > NOW() - INTERVAL 24 HOUR;
    
    IF failed_backups > 0 THEN
        INSERT INTO backup_alerts (alert_type, alert_message, severity)
        VALUES ('BACKUP_FAILED', 
                CONCAT('过去24小时有', failed_backups, '个备份失败'), 
                'HIGH');
    END IF;
    
END //

DELIMITER ;

3. 安全与权限管理

用户权限体系设计

最小权限原则实施:

-- 创建应用用户(遵循最小权限原则)
CREATE USER 'app_readonly'@'192.168.1.%' IDENTIFIED BY 'secure_password_123';
GRANT SELECT ON company.* TO 'app_readonly'@'192.168.1.%';

CREATE USER 'app_readwrite'@'192.168.1.%' IDENTIFIED BY 'secure_password_456';
GRANT SELECT, INSERT, UPDATE, DELETE ON company.* TO 'app_readwrite'@'192.168.1.%';

CREATE USER 'app_report'@'192.168.1.%' IDENTIFIED BY 'secure_password_789';
GRANT SELECT ON company.employees TO 'app_report'@'192.168.1.%';
GRANT SELECT ON company.departments TO 'app_report'@'192.168.1.%';

-- 创建管理用户
CREATE USER 'db_admin'@'localhost' IDENTIFIED BY 'admin_secure_password';
GRANT ALL PRIVILEGES ON *.* TO 'db_admin'@'localhost' WITH GRANT OPTION;

-- 创建备份用户
CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'backup_secure_password';
GRANT SELECT, RELOAD, PROCESS, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'backup_user'@'localhost';

-- 查看用户权限
SHOW GRANTS FOR 'app_readonly'@'192.168.1.%';

数据库权限审计:

-- 创建权限审计表
CREATE TABLE privilege_audit (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(100) NOT NULL,
    host_pattern VARCHAR(100) NOT NULL,
    database_name VARCHAR(100),
    table_name VARCHAR(100),
    privilege_type VARCHAR(50) NOT NULL,
    granted_by VARCHAR(100),
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_revoked BOOLEAN DEFAULT FALSE,
    revoked_at TIMESTAMP NULL,
    revoked_by VARCHAR(100)
);

-- 权限审计存储过程
DELIMITER //

CREATE PROCEDURE AuditUserPrivileges()
BEGIN
    DECLARE done INT DEFAULT 0;
    DECLARE v_user, v_host, v_db, v_table, v_privilege VARCHAR(100);
    DECLARE cur CURSOR FOR 
        SELECT User, Host, Db, Table_name, Privilege 
        FROM information_schema.table_privileges;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
    
    OPEN cur;
    
    read_loop: LOOP
        FETCH cur INTO v_user, v_host, v_db, v_table, v_privilege;
        IF done THEN
            LEAVE read_loop;
        END IF;
        
        -- 检查权限是否已经记录
        IF NOT EXISTS (
            SELECT 1 FROM privilege_audit 
            WHERE username = v_user 
              AND host_pattern = v_host 
              AND database_name = v_db 
              AND table_name = v_table 
              AND privilege_type = v_privilege 
              AND is_revoked = FALSE
        ) THEN
            -- 记录新权限
            INSERT INTO privilege_audit (username, host_pattern, database_name, table_name, privilege_type)
            VALUES (v_user, v_host, v_db, v_table, v_privilege);
        END IF;
    END LOOP;
    
    CLOSE cur;
    
    -- 标记已撤销的权限
    UPDATE privilege_audit pa
    LEFT JOIN information_schema.table_privileges tp 
        ON pa.username = tp.User 
        AND pa.host_pattern = tp.Host 
        AND pa.database_name = tp.Db 
        AND pa.table_name = tp.Table_name 
        AND pa.privilege_type = tp.Privilege
    SET pa.is_revoked = TRUE,
        pa.revoked_at = CURRENT_TIMESTAMP
    WHERE pa.is_revoked = FALSE
      AND tp.User IS NULL;
    
END //

DELIMITER ;

角色管理与权限继承

MySQL 8.0角色管理:

-- 创建角色
CREATE ROLE read_only_role;
CREATE ROLE read_write_role;
CREATE ROLE dba_role;

-- 为角色分配权限
GRANT SELECT ON company.* TO read_only_role;
GRANT SELECT, INSERT, UPDATE, DELETE ON company.* TO read_write_role;
GRANT ALL PRIVILEGES ON *.* TO dba_role;

-- 创建用户并分配角色
CREATE USER 'report_user'@'%' IDENTIFIED BY 'report_password';
CREATE USER 'app_user'@'%' IDENTIFIED BY 'app_password';
CREATE USER 'admin_user'@'localhost' IDENTIFIED BY 'admin_password';

-- 分配角色给用户
GRANT read_only_role TO 'report_user'@'%';
GRANT read_write_role TO 'app_user'@'%';
GRANT dba_role TO 'admin_user'@'localhost';

-- 设置默认角色
SET DEFAULT ROLE read_only_role TO 'report_user'@'%';
SET DEFAULT ROLE read_write_role TO 'app_user'@'%';
SET DEFAULT ROLE dba_role TO 'admin_user'@'localhost';

-- 激活角色
SET ROLE ALL;

-- 查看角色权限
SHOW GRANTS FOR 'report_user'@'%' USING read_only_role;

-- 创建层次化角色
CREATE ROLE junior_dba;
CREATE ROLE senior_dba;

GRANT junior_dba TO senior_dba;
GRANT SELECT, INSERT, UPDATE, DELETE ON mysql.* TO junior_dba;
GRANT ALL PRIVILEGES ON *.* TO senior_dba;

动态权限管理:

-- 创建存储过程管理用户权限
DELIMITER //

CREATE PROCEDURE ManageUserAccess(
    IN p_username VARCHAR(100),
    IN p_host_pattern VARCHAR(100),
    IN p_database_name VARCHAR(100),
    IN p_action ENUM('GRANT_READ', 'GRANT_WRITE', 'REVOKE_ACCESS')
)
BEGIN
    DECLARE user_exists INT;
    
    -- 检查用户是否存在
    SELECT COUNT(*) INTO user_exists
    FROM mysql.user 
    WHERE User = p_username AND Host = p_host_pattern;
    
    IF user_exists = 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '用户不存在';
    END IF;
    
    CASE p_action
        WHEN 'GRANT_READ' THEN
            SET @grant_sql = CONCAT('GRANT SELECT ON ', p_database_name, '.* TO ''', p_username, '''@''', p_host_pattern, '''');
            PREPARE stmt FROM @grant_sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
            
            -- 记录权限变更
            INSERT INTO privilege_audit (username, host_pattern, database_name, privilege_type)
            VALUES (p_username, p_host_pattern, p_database_name, 'SELECT');
            
        WHEN 'GRANT_WRITE' THEN
            SET @grant_sql = CONCAT('GRANT SELECT, INSERT, UPDATE, DELETE ON ', p_database_name, '.* TO ''', p_username, '''@''', p_host_pattern, '''');
            PREPARE stmt FROM @grant_sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
            
            INSERT INTO privilege_audit (username, host_pattern, database_name, privilege_type)
            VALUES (p_username, p_host_pattern, p_database_name, 'READ_WRITE');
            
        WHEN 'REVOKE_ACCESS' THEN
            SET @revoke_sql = CONCAT('REVOKE ALL PRIVILEGES ON ', p_database_name, '.* FROM ''', p_username, '''@''', p_host_pattern, '''');
            PREPARE stmt FROM @revoke_sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
            
            UPDATE privilege_audit 
            SET is_revoked = TRUE, revoked_at = NOW()
            WHERE username = p_username 
              AND host_pattern = p_host_pattern 
              AND database_name = p_database_name
              AND is_revoked = FALSE;
    END CASE;
    
END //

DELIMITER ;

数据加密:透明加密与列加密

InnoDB表空间加密:

-- 安装密钥环组件(MySQL 8.0)
INSTALL COMPONENT "file://component_keyring_file";
SET GLOBAL keyring_file_data = '/var/lib/mysql-keyring/keyring';

-- 创建加密表空间
CREATE TABLESPACE encrypted_ts 
ADD DATAFILE 'encrypted_ts.ibd' 
ENGINE=InnoDB
ENCRYPTION='Y';

-- 在加密表空间中创建表
CREATE TABLE sensitive_data (
    id INT PRIMARY KEY,
    secret_data VARCHAR(500)
) TABLESPACE encrypted_ts;

-- 加密现有表
ALTER TABLE existing_sensitive_table ENCRYPTION='Y';

-- 查看加密状态
SELECT 
    TABLE_SCHEMA,
    TABLE_NAME,
    CREATE_OPTIONS
FROM information_schema.TABLES 
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';

列级加密:

-- 创建加密函数
DELIMITER //

CREATE FUNCTION aes_encrypt(data TEXT, key_str VARCHAR(255))
RETURNS VARBINARY(500)
DETERMINISTIC
BEGIN
    RETURN AES_ENCRYPT(data, key_str);
END //

CREATE FUNCTION aes_decrypt(encrypted_data VARBINARY(500), key_str VARCHAR(255))
RETURNS TEXT
DETERMINISTIC
BEGIN
    RETURN AES_DECRYPT(encrypted_data, key_str);
END //

DELIMITER ;

-- 创建存储加密数据的表
CREATE TABLE user_secrets (
    user_id INT PRIMARY KEY,
    -- 加密存储的敏感数据
    ssn VARBINARY(500),
    credit_card VARBINARY(500),
    medical_info VARBINARY(500),
    -- 加密密钥(在实际应用中应该安全存储)
    encryption_key VARCHAR(255) DEFAULT 'default_encryption_key'
);

-- 插入加密数据
INSERT INTO user_secrets (user_id, ssn, credit_card)
VALUES (
    1,
    aes_encrypt('123-45-6789', 'user1_key'),
    aes_encrypt('4111111111111111', 'user1_key')
);

-- 查询解密数据
SELECT 
    user_id,
    aes_decrypt(ssn, 'user1_key') as decrypted_ssn,
    aes_decrypt(credit_card, 'user1_key') as decrypted_credit_card
FROM user_secrets 
WHERE user_id = 1;

密钥管理策略:

-- 创建密钥管理表
CREATE TABLE encryption_keys (
    key_id VARCHAR(100) PRIMARY KEY,
    key_value VARBINARY(500) NOT NULL,
    key_type ENUM('COLUMN', 'TABLE', 'BACKUP') NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by VARCHAR(100),
    is_active BOOLEAN DEFAULT TRUE,
    rotated_at TIMESTAMP NULL
);

-- 密钥轮换存储过程
DELIMITER //

CREATE PROCEDURE RotateEncryptionKey(
    IN p_key_id VARCHAR(100),
    IN p_new_key_value VARBINARY(500)
)
BEGIN
    DECLARE old_key_value VARBINARY(500);
    DECLARE done INT DEFAULT 0;
    DECLARE v_user_id INT;
    DECLARE v_ssn, v_credit_card VARBINARY(500);
    
    -- 获取旧密钥
    SELECT key_value INTO old_key_value
    FROM encryption_keys
    WHERE key_id = p_key_id AND is_active = TRUE;
    
    IF old_key_value IS NULL THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '未找到活动的密钥';
    END IF;
    
    -- 使用游标处理所有需要重新加密的数据
    DECLARE cur CURSOR FOR 
        SELECT user_id, ssn, credit_card 
        FROM user_secrets;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
    
    OPEN cur;
    
    read_loop: LOOP
        FETCH cur INTO v_user_id, v_ssn, v_credit_card;
        IF done THEN
            LEAVE read_loop;
        END IF;
        
        -- 解密并使用新密钥重新加密
        UPDATE user_secrets 
        SET ssn = aes_encrypt(aes_decrypt(v_ssn, old_key_value), p_new_key_value),
            credit_card = aes_encrypt(aes_decrypt(v_credit_card, old_key_value), p_new_key_value)
        WHERE user_id = v_user_id;
    END LOOP;
    
    CLOSE cur;
    
    -- 停用旧密钥,激活新密钥
    UPDATE encryption_keys SET is_active = FALSE, rotated_at = NOW() WHERE key_id = p_key_id;
    INSERT INTO encryption_keys (key_id, key_value, key_type) VALUES (p_key_id, p_new_key_value, 'COLUMN');
    
END //

DELIMITER ;

审计日志与安全监控

MySQL企业版审计:

-- 安装审计插件(企业版)
INSTALL PLUGIN audit_log SONAME 'audit_log.so';

-- 配置审计日志(在my.cnf中)
/*
[mysqld]
audit_log_format=JSON
audit_log_file=/var/log/mysql/audit.log
audit_log_policy=ALL
audit_log_rotate_on_size=100000000
audit_log_rotations=5
*/

-- 查看审计日志状态
SHOW VARIABLES LIKE 'audit_log%';

-- 查询审计日志
SELECT 
    JSON_EXTRACT(audit_record, '$.timestamp') as timestamp,
    JSON_EXTRACT(audit_record, '$.class') as event_class,
    JSON_EXTRACT(audit_record, '$.event') as event_type,
    JSON_EXTRACT(audit_record, '$.connection_id') as connection_id,
    JSON_EXTRACT(audit_record, '$.user') as user,
    JSON_EXTRACT(audit_record, '$.query') as query
FROM mysql.audit_log 
WHERE JSON_EXTRACT(audit_record, '$.query') IS NOT NULL
ORDER BY timestamp DESC 
LIMIT 10;

社区版审计方案:

-- 使用通用日志实现基础审计
SET GLOBAL general_log = 1;
SET GLOBAL log_output = 'TABLE';

-- 创建自定义审计表
CREATE TABLE custom_audit_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    event_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    user_host VARCHAR(200) NOT NULL,
    thread_id BIGINT NOT NULL,
    server_id INT NOT NULL,
    command_type VARCHAR(64) NOT NULL,
    argument TEXT NOT NULL,
    client_ip VARCHAR(45),
    database_name VARCHAR(100),
    execution_time DECIMAL(10,6),
    rows_affected INT
);

-- 审计触发器示例
DELIMITER //

CREATE TRIGGER audit_user_changes
AFTER INSERT ON mysql.user
FOR EACH ROW
BEGIN
    INSERT INTO custom_audit_log (user_host, thread_id, server_id, command_type, argument, client_ip)
    VALUES (USER(), CONNECTION_ID(), @@server_id, 'CREATE_USER', 
            CONCAT('Created user: ', NEW.User, '@', NEW.Host), 
            SUBSTRING_INDEX(USER(), '@', -1));
END //

CREATE TRIGGER audit_privilege_changes
AFTER INSERT ON mysql.db
FOR EACH ROW
BEGIN
    INSERT INTO custom_audit_log (user_host, thread_id, server_id, command_type, argument, database_name)
    VALUES (USER(), CONNECTION_ID(), @@server_id, 'GRANT_PRIVILEGE',
            CONCAT('Granted privileges on ', NEW.Db, ' to ', NEW.User),
            NEW.Db);
END //

DELIMITER ;

安全监控仪表板:

-- 创建安全监控视图
CREATE VIEW security_dashboard AS
SELECT 
    'Failed Logins' as metric_name,
    COUNT(*) as metric_value,
    MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE argument LIKE '%Access denied%'
  AND event_time > NOW() - INTERVAL 1 HOUR

UNION ALL

SELECT 
    'New Users Created' as metric_name,
    COUNT(*) as metric_value,
    MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE command_type = 'CREATE_USER'
  AND event_time > NOW() - INTERVAL 24 HOUR

UNION ALL

SELECT 
    'Privilege Changes' as metric_name,
    COUNT(*) as metric_value,
    MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE command_type IN ('GRANT_PRIVILEGE', 'REVOKE_PRIVILEGE')
  AND event_time > NOW() - INTERVAL 24 HOUR

UNION ALL

SELECT 
    'Sensitive Data Access' as metric_name,
    COUNT(*) as metric_value,
    MAX(event_time) as last_occurrence
FROM custom_audit_log
WHERE argument LIKE '%user_secrets%'
  AND event_time > NOW() - INTERVAL 1 HOUR;

SQL注入防护与安全开发

预处理语句使用:

-- 不安全的查询(容易SQL注入)
SET @user_input = "1'; DROP TABLE users; --";
SET @sql = CONCAT("SELECT * FROM users WHERE id = '", @user_input, "'");
PREPARE stmt FROM @sql;
EXECUTE stmt;

-- 安全的预处理语句
PREPARE safe_stmt FROM "SELECT * FROM users WHERE id = ?";
SET @user_id = "1";
EXECUTE safe_stmt USING @user_id;

-- 存储过程参数化查询
DELIMITER //

CREATE PROCEDURE GetUserByEmail(IN p_email VARCHAR(255))
BEGIN
    -- 直接使用参数,避免拼接
    SELECT * FROM users WHERE email = p_email;
END //

DELIMITER ;

输入验证函数:

DELIMITER //

CREATE FUNCTION ValidateEmail(email VARCHAR(255))
RETURNS BOOLEAN
DETERMINISTIC
BEGIN
    -- 简单的邮箱格式验证
    IF email REGEXP '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,4}$' THEN
        RETURN TRUE;
    ELSE
        RETURN FALSE;
    END IF;
END //

CREATE FUNCTION SanitizeInput(input_text TEXT)
RETURNS TEXT
DETERMINISTIC
BEGIN
    -- 移除潜在的SQL注入字符
    SET input_text = REPLACE(input_text, "'", "''");
    SET input_text = REPLACE(input_text, ";", "");
    SET input_text = REPLACE(input_text, "--", "");
    SET input_text = REPLACE(input_text, "/*", "");
    SET input_text = REPLACE(input_text, "*/", "");
    
    RETURN input_text;
END //

DELIMITER ;

安全开发规范检查:

-- 检查存储过程的安全问题
SELECT 
    ROUTINE_NAME,
    ROUTINE_DEFINITION
FROM information_schema.ROUTINES
WHERE ROUTINE_DEFINITION LIKE '%CONCAT(%'
   OR ROUTINE_DEFINITION LIKE '%EXECUTE%IMMEDIATE%'
   OR ROUTINE_DEFINITION LIKE '%PREPARE%'
   OR ROUTINE_DEFINITION LIKE '%sp_executesql%';

-- 查找可能包含动态SQL的代码
SELECT 
    TABLE_NAME,
    COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'your_database'
  AND (COLUMN_NAME LIKE '%sql%' OR COLUMN_NAME LIKE '%query%')
  AND TABLE_NAME NOT LIKE '%audit%';

总结

通过本篇的深入学习,我们掌握了MySQL备份恢复和安全管理的完整体系:

  1. 备份策略:逻辑备份、物理备份、增量备份的实战应用
  2. 恢复技术:时间点恢复、误操作恢复、灾难恢复的完整流程
  3. 安全管理:权限体系、数据加密、审计监控的全面方案
  4. 安全开发:SQL注入防护、输入验证的安全编码实践

关键安全原则:

  • 最小权限:用户只拥有完成工作所需的最小权限
  • 纵深防御:多层安全措施,避免单点失效
  • 定期审计:持续监控和审查安全状态
  • 应急准备:完善的备份和恢复预案

备份恢复最佳实践:

  • 3-2-1规则:3个副本,2种介质,1个离线存储
  • 定期恢复演练:确保备份可用性
  • 监控备份状态:及时发现问题
  • 加密敏感数据:保护数据隐私

在下一篇中,我们将探讨MySQL在云原生环境中的应用,包括容器化部署、微服务架构集成等现代技术。

动手练习:

  1. 设计并实施完整的备份策略,包括完整备份和增量备份
  2. 执行时间点恢复演练,验证备份的可用性
  3. 建立权限管理体系,实施最小权限原则
  4. 配置数据加密和审计日志,增强安全性
  5. 进行安全代码审查,修复潜在的SQL注入漏洞

欢迎在评论区分享你的备份恢复实践和安全加固经验!

  • 本文作者: 忘忧
  • 本文链接: /archives/2953
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# MySQL # 数据库
MySql入门:高可用与架构设计
SqlServer高频面试题(持续更新251114)
  • 文章目录
  • 站点概览
忘忧

忘忧

君子藏器于身,待时而动,何不利之有

59 日志
9 分类
67 标签
RSS
Github E-mail StackOverflow
Creative Commons
0%
© 2025 忘忧
由 Halo 强力驱动