SQL编程与高级特性
SQL不仅仅是简单的数据查询语言,它拥有强大的编程能力和高级特性。掌握这些特性可以让你写出更高效、更优雅的数据库操作代码。今天,我们将深入探讨MySQL的SQL编程能力,从基础查询到高级特性,帮助你成为SQL编程的高手。
1. SQL基础与高级查询
DDL、DML、DCL、TCL全面掌握
数据定义语言(DDL) - 定义数据结构:
-- 数据库操作
CREATE DATABASE company
CHARACTER SET utf8mb4
COLLATE utf8mb4_unicode_ci;
ALTER DATABASE company CHARACTER SET utf8mb4;
DROP DATABASE IF EXISTS old_company;
-- 表操作
CREATE TABLE employees (
emp_id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
emp_name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE,
salary DECIMAL(10,2) CHECK (salary > 0),
dept_id INT UNSIGNED,
hire_date DATE DEFAULT (CURRENT_DATE),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB COMMENT='员工表';
-- 表结构修改
ALTER TABLE employees
ADD COLUMN phone VARCHAR(20) AFTER email,
MODIFY COLUMN emp_name VARCHAR(150) NOT NULL,
ADD INDEX idx_dept_hire (dept_id, hire_date);
-- 表维护
ANALYZE TABLE employees; -- 更新统计信息
OPTIMIZE TABLE employees; -- 优化表存储
RENAME TABLE employees TO staff;
数据操作语言(DML) - 操作数据:
-- 插入数据
INSERT INTO employees (emp_name, email, salary, dept_id, hire_date)
VALUES
('张三', 'zhangsan@company.com', 8000.00, 1, '2023-01-15'),
('李四', 'lisi@company.com', 7500.00, 1, '2023-02-20'),
('王五', 'wangwu@company.com', 9000.00, 2, '2023-03-10');
-- 插入并忽略重复键
INSERT IGNORE INTO employees (emp_name, email, salary, dept_id)
VALUES ('赵六', 'zhangsan@company.com', 8500.00, 2); -- 邮箱重复,被忽略
-- 批量插入(高效方式)
INSERT INTO employees (emp_name, email, salary, dept_id)
SELECT
CONCAT('员工', num) as emp_name,
CONCAT('emp', num, '@company.com') as email,
5000 + (RAND() * 5000) as salary,
FLOOR(1 + RAND() * 3) as dept_id
FROM (
SELECT @row := @row + 1 as num
FROM information_schema.columns c1,
information_schema.columns c2,
(SELECT @row := 0) r
LIMIT 100
) numbers;
-- 更新数据
UPDATE employees
SET salary = salary * 1.1,
updated_at = CURRENT_TIMESTAMP
WHERE dept_id = 1
AND hire_date < '2023-06-01';
-- 使用JOIN更新
UPDATE employees e
JOIN departments d ON e.dept_id = d.dept_id
SET e.salary = e.salary * 1.05
WHERE d.dept_name = '技术部';
-- 删除数据
DELETE FROM employees
WHERE emp_id = 100;
-- 使用JOIN删除
DELETE e
FROM employees e
LEFT JOIN departments d ON e.dept_id = d.dept_id
WHERE d.dept_id IS NULL; -- 删除部门不存在的员工
数据控制语言(DCL) - 权限管理:
-- 用户管理
CREATE USER 'report_user'@'192.168.1.%'
IDENTIFIED BY 'secure_password_123';
CREATE USER 'app_user'@'%'
IDENTIFIED WITH mysql_native_password BY 'app_password';
-- 权限管理
GRANT SELECT ON company.* TO 'report_user'@'192.168.1.%';
GRANT SELECT, INSERT, UPDATE, DELETE
ON company.employees TO 'app_user'@'%';
GRANT EXECUTE ON PROCEDURE company.CalculateDepartmentStats
TO 'report_user'@'192.168.1.%';
-- 角色管理(MySQL 8.0+)
CREATE ROLE data_reader;
GRANT SELECT ON company.* TO data_reader;
GRANT data_reader TO 'report_user'@'192.168.1.%';
-- 权限回收
REVOKE DELETE ON company.employees FROM 'app_user'@'%';
-- 查看权限
SHOW GRANTS FOR 'report_user'@'192.168.1.%';
事务控制语言(TCL) - 事务管理:
-- 基本事务
START TRANSACTION;
INSERT INTO accounts (account_id, balance) VALUES (1, 1000.00);
INSERT INTO accounts (account_id, balance) VALUES (2, 2000.00);
COMMIT;
-- 复杂事务控制
START TRANSACTION;
SAVEPOINT before_transfer;
UPDATE accounts SET balance = balance - 500 WHERE account_id = 1;
-- 检查约束
SELECT balance INTO @bal FROM accounts WHERE account_id = 1 FOR UPDATE;
IF @bal < 0 THEN
ROLLBACK TO SAVEPOINT before_transfer;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '余额不足';
END IF;
UPDATE accounts SET balance = balance + 500 WHERE account_id = 2;
COMMIT;
复杂查询:子查询、连接查询、联合查询
子查询深度应用:
-- 标量子查询(返回单个值)
SELECT
emp_name,
salary,
(SELECT AVG(salary) FROM employees) as avg_salary,
salary - (SELECT AVG(salary) FROM employees) as diff_from_avg
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees);
-- 列子查询(返回一列)
SELECT
dept_name
FROM departments
WHERE dept_id IN (
SELECT DISTINCT dept_id
FROM employees
WHERE salary > 10000
);
-- 行子查询(返回一行)
SELECT
emp_name,
salary
FROM employees
WHERE (salary, dept_id) = (
SELECT MAX(salary), dept_id
FROM employees
WHERE dept_id = 1
);
-- 表子查询(在FROM中)
SELECT
dept_stats.dept_name,
dept_stats.avg_salary,
dept_stats.employee_count
FROM (
SELECT
d.dept_name,
AVG(e.salary) as avg_salary,
COUNT(e.emp_id) as employee_count
FROM departments d
LEFT JOIN employees e ON d.dept_id = e.dept_id
GROUP BY d.dept_id, d.dept_name
) dept_stats
WHERE dept_stats.avg_salary > 8000;
-- 关联子查询
SELECT
e1.emp_name,
e1.salary,
e1.dept_id
FROM employees e1
WHERE e1.salary > (
SELECT AVG(e2.salary)
FROM employees e2
WHERE e2.dept_id = e1.dept_id -- 关联外部查询
);
-- EXISTS子查询
SELECT
d.dept_name
FROM departments d
WHERE EXISTS (
SELECT 1
FROM employees e
WHERE e.dept_id = d.dept_id
AND e.salary > 15000
);
高级连接查询:
-- 内连接(INNER JOIN)
SELECT
e.emp_name,
d.dept_name,
p.project_name
FROM employees e
INNER JOIN departments d ON e.dept_id = d.dept_id
INNER JOIN projects p ON e.dept_id = p.dept_id
WHERE p.status = 'active';
-- 左外连接(LEFT JOIN)
SELECT
d.dept_name,
COUNT(e.emp_id) as employee_count
FROM departments d
LEFT JOIN employees e ON d.dept_id = e.dept_id
GROUP BY d.dept_id, d.dept_name;
-- 右外连接(RIGHT JOIN)
SELECT
e.emp_name,
p.project_name
FROM employees e
RIGHT JOIN project_assignments pa ON e.emp_id = pa.emp_id
RIGHT JOIN projects p ON pa.project_id = p.project_id;
-- 全外连接模拟(UNION + LEFT/RIGHT JOIN)
SELECT
e.emp_name,
d.dept_name
FROM employees e
LEFT JOIN departments d ON e.dept_id = d.dept_id
UNION
SELECT
e.emp_name,
d.dept_name
FROM employees e
RIGHT JOIN departments d ON e.dept_id = d.dept_id;
-- 自连接(查询员工和经理)
SELECT
emp.emp_name as employee_name,
mgr.emp_name as manager_name
FROM employees emp
LEFT JOIN employees mgr ON emp.manager_id = mgr.emp_id;
-- 交叉连接(CROSS JOIN)
SELECT
e.emp_name,
p.project_name
FROM employees e
CROSS JOIN projects p
WHERE e.dept_id = p.dept_id;
-- 自然连接(NATURAL JOIN)- 不推荐在生产环境使用
SELECT
emp_name,
dept_name
FROM employees
NATURAL JOIN departments;
联合查询与集合操作:
-- UNION(去重)
SELECT
emp_name as name,
'employee' as type,
salary
FROM employees
WHERE salary > 8000
UNION
SELECT
dept_name as name,
'department' as type,
NULL as salary
FROM departments
WHERE budget > 100000;
-- UNION ALL(不去重)
SELECT
emp_name,
dept_id
FROM employees
WHERE hire_date >= '2023-01-01'
UNION ALL
SELECT
emp_name,
dept_id
FROM former_employees
WHERE leave_date >= '2023-01-01';
-- INTERSECT模拟(MySQL 8.0.31+ 直接支持)
SELECT emp_name
FROM employees
WHERE dept_id = 1
INTERSECT
SELECT emp_name
FROM employees
WHERE salary > 8000;
-- 在旧版本中模拟INTERSECT
SELECT DISTINCT e1.emp_name
FROM employees e1
INNER JOIN employees e2 ON e1.emp_name = e2.emp_name
WHERE e1.dept_id = 1 AND e2.salary > 8000;
-- EXCEPT/MINUS模拟
SELECT emp_name
FROM employees
WHERE dept_id = 1
EXCEPT
SELECT emp_name
FROM employees
WHERE salary <= 8000;
-- 在旧版本中模拟EXCEPT
SELECT e1.emp_name
FROM employees e1
LEFT JOIN employees e2 ON e1.emp_name = e2.emp_name AND e2.salary <= 8000
WHERE e1.dept_id = 1 AND e2.emp_name IS NULL;
窗口函数:排名、分组、累计计算
排名窗口函数:
-- 基本排名
SELECT
emp_name,
salary,
dept_id,
ROW_NUMBER() OVER (ORDER BY salary DESC) as rank_all,
RANK() OVER (ORDER BY salary DESC) as rank_with_ties,
DENSE_RANK() OVER (ORDER BY salary DESC) as dense_rank_no_gaps
FROM employees;
-- 分区排名
SELECT
emp_name,
salary,
dept_id,
ROW_NUMBER() OVER (PARTITION BY dept_id ORDER BY salary DESC) as dept_rank,
RANK() OVER (PARTITION BY dept_id ORDER BY salary DESC) as dept_rank_with_ties
FROM employees;
-- 前N名查询
WITH ranked_employees AS (
SELECT
emp_name,
salary,
dept_id,
ROW_NUMBER() OVER (PARTITION BY dept_id ORDER BY salary DESC) as rn
FROM employees
)
SELECT *
FROM ranked_employees
WHERE rn <= 3; -- 每个部门前3名
聚合窗口函数:
-- 累计计算
SELECT
emp_name,
hire_date,
salary,
SUM(salary) OVER (
ORDER BY hire_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total,
AVG(salary) OVER (
PARTITION BY dept_id
ORDER BY hire_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3,
SUM(salary) OVER (
PARTITION BY dept_id
) as dept_total_salary
FROM employees
ORDER BY hire_date;
-- 前后值访问
SELECT
emp_name,
hire_date,
salary,
LAG(salary, 1) OVER (ORDER BY hire_date) as prev_salary,
LEAD(salary, 1) OVER (ORDER BY hire_date) as next_salary,
salary - LAG(salary, 1) OVER (ORDER BY hire_date) as salary_change
FROM employees;
-- 首尾值访问
SELECT
emp_name,
dept_id,
salary,
FIRST_VALUE(salary) OVER (
PARTITION BY dept_id
ORDER BY salary DESC
) as highest_in_dept,
LAST_VALUE(salary) OVER (
PARTITION BY dept_id
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as lowest_in_dept
FROM employees;
窗口帧详解:
-- 不同的窗口帧定义
SELECT
emp_name,
hire_date,
salary,
-- 从开始到当前行
SUM(salary) OVER (
ORDER BY hire_date
ROWS UNBOUNDED PRECEDING
) as running_total,
-- 最近3行(包括当前行)
AVG(salary) OVER (
ORDER BY hire_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3,
-- 前后各1行
AVG(salary) OVER (
ORDER BY hire_date
ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
) as centered_avg,
-- 分组内所有行
AVG(salary) OVER (
PARTITION BY dept_id
) as dept_avg
FROM employees
ORDER BY hire_date;
公用表表达式(CTE)与递归查询
普通CTE:
-- 简单CTE
WITH department_stats AS (
SELECT
dept_id,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM employees
GROUP BY dept_id
),
high_paid_employees AS (
SELECT
e.emp_name,
e.salary,
d.dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
JOIN department_stats ds ON e.dept_id = ds.dept_id
WHERE e.salary > ds.avg_salary * 1.2
)
SELECT
dept_name,
COUNT(*) as high_paid_count,
AVG(salary) as avg_high_salary
FROM high_paid_employees
GROUP BY dept_name
ORDER BY high_paid_count DESC;
-- 多CTE链式使用
WITH
employee_data AS (
SELECT
emp_id,
emp_name,
salary,
dept_id
FROM employees
WHERE status = 'active'
),
department_data AS (
SELECT
dept_id,
dept_name,
budget
FROM departments
),
combined_data AS (
SELECT
e.emp_name,
e.salary,
d.dept_name,
d.budget
FROM employee_data e
JOIN department_data d ON e.dept_id = d.dept_id
)
SELECT
dept_name,
AVG(salary) as avg_salary,
SUM(salary) / budget as salary_budget_ratio
FROM combined_data
GROUP BY dept_name, budget
HAVING salary_budget_ratio < 0.8;
递归CTE:
-- 组织结构递归查询
CREATE TABLE organization (
emp_id INT PRIMARY KEY,
emp_name VARCHAR(100),
manager_id INT,
title VARCHAR(100)
);
-- 递归CTE查询完整汇报链
WITH RECURSIVE employee_hierarchy AS (
-- 锚点:顶级管理者(没有经理)
SELECT
emp_id,
emp_name,
manager_id,
title,
0 as level,
CAST(emp_name AS CHAR(1000)) as hierarchy_path
FROM organization
WHERE manager_id IS NULL
UNION ALL
-- 递归部分:下属员工
SELECT
o.emp_id,
o.emp_name,
o.manager_id,
o.title,
eh.level + 1,
CONCAT(eh.hierarchy_path, ' -> ', o.emp_name)
FROM organization o
JOIN employee_hierarchy eh ON o.manager_id = eh.emp_id
)
SELECT
emp_id,
emp_name,
title,
level,
hierarchy_path
FROM employee_hierarchy
ORDER BY hierarchy_path;
-- 数字序列生成
WITH RECURSIVE number_sequence AS (
SELECT 1 as num
UNION ALL
SELECT num + 1
FROM number_sequence
WHERE num < 100
)
SELECT num FROM number_sequence;
-- 日期序列生成
WITH RECURSIVE date_sequence AS (
SELECT '2023-01-01' as date_val
UNION ALL
SELECT date_val + INTERVAL 1 DAY
FROM date_sequence
WHERE date_val < '2023-01-31'
)
SELECT
date_val,
DAYNAME(date_val) as day_name
FROM date_sequence;
JSON函数与空间数据查询
JSON函数深度应用:
-- JSON创建函数
SELECT
emp_name,
salary,
JSON_OBJECT(
'name', emp_name,
'salary', salary,
'department', dept_id,
'hire_year', YEAR(hire_date)
) as emp_json
FROM employees
LIMIT 5;
-- JSON数组操作
SELECT
dept_id,
JSON_ARRAYAGG(
JSON_OBJECT(
'name', emp_name,
'salary', salary
)
) as employees_json
FROM employees
GROUP BY dept_id;
-- JSON查询函数
SELECT
emp_name,
JSON_EXTRACT(profile, '$.contact.email') as email,
profile->>'$.contact.phone' as phone, -- 简写形式
JSON_UNQUOTE(JSON_EXTRACT(profile, '$.address.city')) as city
FROM employees
WHERE JSON_CONTAINS_PATH(profile, 'one', '$.skills')
AND JSON_LENGTH(profile->'$.skills') >= 3;
-- JSON修改函数
UPDATE employees
SET profile = JSON_SET(
profile,
'$.last_updated', CURRENT_TIMESTAMP,
'$.contact.phone', '+86-13800138000'
)
WHERE emp_id = 1001;
-- JSON搜索和索引
SELECT
emp_name,
profile->>'$.title' as job_title
FROM employees
WHERE JSON_SEARCH(profile, 'one', '%经理%') IS NOT NULL;
-- 创建JSON索引(MySQL 8.0.17+)
CREATE TABLE products (
id INT PRIMARY KEY AUTO_INCREMENT,
product_data JSON,
-- 函数索引
INDEX idx_product_name ((CAST(product_data->>'$.name' AS CHAR(100)))),
INDEX idx_product_price ((CAST(product_data->>'$.price' AS DECIMAL(10,2))))
);
空间数据查询:
-- 空间数据创建和查询
CREATE TABLE locations (
location_id INT PRIMARY KEY AUTO_INCREMENT,
location_name VARCHAR(100),
coordinates POINT NOT NULL,
area_boundary POLYGON,
SPATIAL INDEX idx_coordinates (coordinates),
SPATIAL INDEX idx_area (area_boundary)
);
-- 插入空间数据
INSERT INTO locations (location_name, coordinates, area_boundary)
VALUES (
'公司总部',
ST_GeomFromText('POINT(116.3974 39.9093)'),
ST_GeomFromText('POLYGON((116.396 39.908, 116.398 39.908, 116.398 39.910, 116.396 39.910, 116.396 39.908))')
);
-- 空间查询
SELECT
location_name,
ST_AsText(coordinates) as coordinates,
ST_X(coordinates) as longitude,
ST_Y(coordinates) as latitude
FROM locations;
-- 距离计算
SELECT
l1.location_name as place1,
l2.location_name as place2,
ST_Distance_Sphere(l1.coordinates, l2.coordinates) as distance_meters
FROM locations l1
CROSS JOIN locations l2
WHERE l1.location_id != l2.location_id;
-- 包含查询
SELECT
location_name
FROM locations
WHERE ST_Contains(
area_boundary,
ST_GeomFromText('POINT(116.3974 39.9093)')
);
-- 缓冲区查询
SELECT
location_name,
ST_AsText(ST_Buffer(coordinates, 1000)) as buffer_zone -- 1000米缓冲区
FROM locations;
2. 存储过程与函数
存储过程编写与调试
基础存储过程:
-- 创建存储过程
DELIMITER //
CREATE PROCEDURE GetEmployeeStatistics(
IN p_dept_id INT,
OUT p_employee_count INT,
OUT p_avg_salary DECIMAL(10,2),
OUT p_max_salary DECIMAL(10,2)
)
BEGIN
-- 声明局部变量
DECLARE v_total_budget DECIMAL(12,2);
-- 业务逻辑
SELECT
COUNT(*),
AVG(salary),
MAX(salary)
INTO
p_employee_count,
p_avg_salary,
p_max_salary
FROM employees
WHERE dept_id = p_dept_id
AND status = 'active';
-- 调试信息(在生产环境可注释)
SELECT CONCAT('部门 ', p_dept_id, ' 统计完成') as debug_info;
END //
DELIMITER ;
-- 调用存储过程
CALL GetEmployeeStatistics(1, @emp_count, @avg_sal, @max_sal);
SELECT @emp_count, @avg_sal, @max_sal;
带条件逻辑的存储过程:
DELIMITER //
CREATE PROCEDURE UpdateEmployeeSalary(
IN p_emp_id INT,
IN p_increase_percent DECIMAL(5,2),
OUT p_result VARCHAR(500)
)
BEGIN
DECLARE v_current_salary DECIMAL(10,2);
DECLARE v_new_salary DECIMAL(10,2);
DECLARE v_emp_name VARCHAR(100);
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
GET DIAGNOSTICS CONDITION 1
@sqlstate = RETURNED_SQLSTATE,
@errno = MYSQL_ERRNO,
@text = MESSAGE_TEXT;
SET p_result = CONCAT('错误: ', @errno, ' - ', @text);
ROLLBACK;
END;
START TRANSACTION;
-- 获取当前薪资
SELECT emp_name, salary
INTO v_emp_name, v_current_salary
FROM employees
WHERE emp_id = p_emp_id
FOR UPDATE; -- 加锁防止并发更新
IF v_current_salary IS NULL THEN
SET p_result = CONCAT('员工ID ', p_emp_id, ' 不存在');
ROLLBACK;
ELSE
-- 计算新薪资
SET v_new_salary = v_current_salary * (1 + p_increase_percent / 100);
-- 更新薪资
UPDATE employees
SET salary = v_new_salary,
updated_at = CURRENT_TIMESTAMP
WHERE emp_id = p_emp_id;
-- 记录薪资变更历史
INSERT INTO salary_history (emp_id, old_salary, new_salary, change_date, change_reason)
VALUES (p_emp_id, v_current_salary, v_new_salary, NOW(), '年度调薪');
SET p_result = CONCAT(
'员工 ', v_emp_name,
' 薪资从 ', v_current_salary,
' 调整为 ', v_new_salary,
' (涨幅 ', p_increase_percent, '%)'
);
COMMIT;
END IF;
END //
DELIMITER ;
游标使用:
DELIMITER //
CREATE PROCEDURE ProcessDepartmentSalaries(IN p_dept_id INT)
BEGIN
DECLARE v_done INT DEFAULT 0;
DECLARE v_emp_id INT;
DECLARE v_emp_name VARCHAR(100);
DECLARE v_current_salary DECIMAL(10,2);
DECLARE v_new_salary DECIMAL(10,2);
-- 声明游标
DECLARE emp_cursor CURSOR FOR
SELECT emp_id, emp_name, salary
FROM employees
WHERE dept_id = p_dept_id
AND status = 'active';
-- 声明结束处理程序
DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_done = 1;
-- 创建临时表存储结果
CREATE TEMPORARY TABLE IF NOT EXISTS salary_adjustments (
emp_id INT,
emp_name VARCHAR(100),
old_salary DECIMAL(10,2),
new_salary DECIMAL(10,2),
increase_amount DECIMAL(10,2)
);
OPEN emp_cursor;
emp_loop: LOOP
FETCH emp_cursor INTO v_emp_id, v_emp_name, v_current_salary;
IF v_done THEN
LEAVE emp_loop;
END IF;
-- 业务逻辑:根据规则调整薪资
IF v_current_salary < 5000 THEN
SET v_new_salary = v_current_salary * 1.15; -- 低薪员工涨15%
ELSEIF v_current_salary BETWEEN 5000 AND 10000 THEN
SET v_new_salary = v_current_salary * 1.10; -- 中等薪资涨10%
ELSE
SET v_new_salary = v_current_salary * 1.05; -- 高薪员工涨5%
END IF;
-- 更新薪资
UPDATE employees
SET salary = v_new_salary
WHERE emp_id = v_emp_id;
-- 记录调整结果
INSERT INTO salary_adjustments
VALUES (v_emp_id, v_emp_name, v_current_salary, v_new_salary, v_new_salary - v_current_salary);
END LOOP;
CLOSE emp_cursor;
-- 返回处理结果
SELECT * FROM salary_adjustments;
DROP TEMPORARY TABLE salary_adjustments;
END //
DELIMITER ;
自定义函数开发
标量函数:
DELIMITER //
CREATE FUNCTION CalculateTax(
p_salary DECIMAL(10,2),
p_tax_rate DECIMAL(5,3)
)
RETURNS DECIMAL(10,2)
DETERMINISTIC
READS SQL DATA
BEGIN
DECLARE v_tax_amount DECIMAL(10,2);
-- 计算税费(简单的线性计算)
SET v_tax_amount = p_salary * p_tax_rate;
-- 确保不为负数
IF v_tax_amount < 0 THEN
SET v_tax_amount = 0;
END IF;
RETURN v_tax_amount;
END //
DELIMITER ;
-- 使用自定义函数
SELECT
emp_name,
salary,
CalculateTax(salary, 0.1) as tax_amount,
salary - CalculateTax(salary, 0.1) as net_salary
FROM employees;
字符串处理函数:
DELIMITER //
CREATE FUNCTION FormatPhoneNumber(
p_phone VARCHAR(20)
)
RETURNS VARCHAR(20)
DETERMINISTIC
BEGIN
DECLARE v_clean_phone VARCHAR(20);
-- 移除所有非数字字符
SET v_clean_phone = REGEXP_REPLACE(p_phone, '[^0-9]', '');
-- 格式化手机号
IF LENGTH(v_clean_phone) = 11 THEN
RETURN CONCAT(
SUBSTR(v_clean_phone, 1, 3), '-',
SUBSTR(v_clean_phone, 4, 4), '-',
SUBSTR(v_clean_phone, 8, 4)
);
ELSE
RETURN p_phone; -- 无法格式化,返回原值
END IF;
END //
DELIMITER ;
复杂业务逻辑函数:
DELIMITER //
CREATE FUNCTION GetEmployeeLevel(
p_salary DECIMAL(10,2),
p_hire_date DATE,
p_performance_rating INT
)
RETURNS VARCHAR(20)
DETERMINISTIC
BEGIN
DECLARE v_years_worked INT;
DECLARE v_level_score DECIMAL(5,2);
-- 计算工作年限
SET v_years_worked = TIMESTAMPDIFF(YEAR, p_hire_date, CURDATE());
-- 计算级别分数(薪资权重40%,年限权重30%,绩效权重30%)
SET v_level_score =
(p_salary / 10000) * 0.4 + -- 每万元0.4分
(v_years_worked * 0.3) + -- 每年0.3分
(p_performance_rating * 0.3); -- 绩效评分权重
-- 根据分数确定级别
RETURN CASE
WHEN v_level_score >= 8 THEN '专家'
WHEN v_level_score >= 6 THEN '高级'
WHEN v_level_score >= 4 THEN '中级'
ELSE '初级'
END;
END //
DELIMITER ;
触发器设计与应用场景
审计触发器:
-- 创建审计表
CREATE TABLE employee_audit (
audit_id INT AUTO_INCREMENT PRIMARY KEY,
action_type ENUM('INSERT', 'UPDATE', 'DELETE'),
emp_id INT,
old_data JSON,
new_data JSON,
changed_by VARCHAR(100),
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 员工表更新触发器
DELIMITER //
CREATE TRIGGER before_employee_update
BEFORE UPDATE ON employees
FOR EACH ROW
BEGIN
DECLARE v_changes JSON DEFAULT JSON_OBJECT();
-- 检查哪些字段被修改了
IF OLD.emp_name != NEW.emp_name THEN
SET v_changes = JSON_SET(v_changes, '$.emp_name', JSON_OBJECT(
'old', OLD.emp_name,
'new', NEW.emp_name
));
END IF;
IF OLD.salary != NEW.salary THEN
SET v_changes = JSON_SET(v_changes, '$.salary', JSON_OBJECT(
'old', OLD.salary,
'new', NEW.salary
));
END IF;
IF OLD.dept_id != NEW.dept_id THEN
SET v_changes = JSON_SET(v_changes, '$.dept_id', JSON_OBJECT(
'old', OLD.dept_id,
'new', NEW.dept_id
));
END IF;
-- 如果有变化,记录审计日志
IF JSON_LENGTH(v_changes) > 0 THEN
INSERT INTO employee_audit (action_type, emp_id, old_data, new_data, changed_by)
VALUES (
'UPDATE',
OLD.emp_id,
JSON_OBJECT(
'emp_name', OLD.emp_name,
'salary', OLD.salary,
'dept_id', OLD.dept_id
),
v_changes,
USER()
);
END IF;
END //
DELIMITER ;
数据一致性触发器:
-- 部门预算检查触发器
DELIMITER //
CREATE TRIGGER before_department_update
BEFORE UPDATE ON departments
FOR EACH ROW
BEGIN
DECLARE v_total_salary DECIMAL(12,2);
-- 计算部门总薪资
SELECT COALESCE(SUM(salary), 0)
INTO v_total_salary
FROM employees
WHERE dept_id = NEW.dept_id
AND status = 'active';
-- 检查预算是否足够
IF NEW.budget < v_total_salary THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = '部门预算不能低于员工总薪资';
END IF;
END //
DELIMITER ;
派生数据触发器:
-- 维护部门统计信息的触发器
DELIMITER //
CREATE TRIGGER after_employee_change
AFTER INSERT OR UPDATE OR DELETE ON employees
FOR EACH ROW
BEGIN
DECLARE affected_dept_id INT;
-- 确定受影响的部门
IF INSERTING THEN
SET affected_dept_id = NEW.dept_id;
ELSEIF UPDATING THEN
-- 如果部门变更,两个部门都受影响
IF OLD.dept_id != NEW.dept_id THEN
CALL UpdateDepartmentStats(OLD.dept_id);
SET affected_dept_id = NEW.dept_id;
ELSE
SET affected_dept_id = NEW.dept_id;
END IF;
ELSE -- DELETING
SET affected_dept_id = OLD.dept_id;
END IF;
-- 更新部门统计
CALL UpdateDepartmentStats(affected_dept_id);
END //
DELIMITER ;
事件调度器实现定时任务
基础事件调度:
-- 启用事件调度器
SET GLOBAL event_scheduler = ON;
-- 创建每日统计事件
DELIMITER //
CREATE EVENT daily_department_stats
ON SCHEDULE EVERY 1 DAY
STARTS '2023-01-01 02:00:00'
COMMENT '每日部门统计'
DO
BEGIN
-- 防止事件重叠执行
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
INSERT INTO event_errors (event_name, error_message, occurred_at)
VALUES ('daily_department_stats', '执行失败', NOW());
END;
-- 更新部门统计表
REPLACE INTO department_daily_stats (stat_date, dept_id, employee_count, total_salary, avg_salary)
SELECT
CURDATE() as stat_date,
dept_id,
COUNT(*) as employee_count,
SUM(salary) as total_salary,
AVG(salary) as avg_salary
FROM employees
WHERE status = 'active'
GROUP BY dept_id;
-- 记录执行日志
INSERT INTO event_logs (event_name, executed_at, records_affected)
VALUES ('daily_department_stats', NOW(), ROW_COUNT());
END //
DELIMITER ;
复杂定时任务:
DELIMITER //
CREATE EVENT monthly_salary_report
ON SCHEDULE
EVERY 1 MONTH
STARTS TIMESTAMP(DATE_FORMAT(NOW() + INTERVAL 1 MONTH, '%Y-%m-01 03:00:00'))
COMMENT '月度薪资报告'
DO
BEGIN
DECLARE v_report_month DATE;
DECLARE v_previous_month DATE;
-- 设置报告月份(上个月)
SET v_report_month = DATE_FORMAT(NOW() - INTERVAL 1 MONTH, '%Y-%m-01');
SET v_previous_month = v_report_month - INTERVAL 1 MONTH;
-- 创建月度薪资报告
INSERT INTO monthly_salary_reports (report_month, dept_id, employee_count, total_salary, avg_salary, salary_growth)
SELECT
v_report_month as report_month,
dept_id,
COUNT(*) as employee_count,
SUM(salary) as total_salary,
AVG(salary) as avg_salary,
-- 计算薪资增长率
(AVG(salary) - COALESCE(
(SELECT avg_salary
FROM monthly_salary_reports
WHERE report_month = v_previous_month
AND dept_id = e.dept_id),
AVG(salary)
)) / COALESCE(
(SELECT avg_salary
FROM monthly_salary_reports
WHERE report_month = v_previous_month
AND dept_id = e.dept_id),
AVG(salary)
) * 100 as salary_growth_percent
FROM employees e
WHERE status = 'active'
GROUP BY dept_id;
-- 生成高管报告
INSERT INTO executive_reports (report_month, report_type, report_data)
SELECT
v_report_month,
'salary_analysis',
JSON_OBJECT(
'total_employees', (SELECT COUNT(*) FROM employees WHERE status = 'active'),
'total_payroll', (SELECT SUM(salary) FROM employees WHERE status = 'active'),
'department_breakdown', (
SELECT JSON_ARRAYAGG(
JSON_OBJECT(
'dept_id', dept_id,
'dept_name', dept_name,
'employee_count', employee_count,
'avg_salary', avg_salary
)
)
FROM monthly_salary_reports
WHERE report_month = v_report_month
)
)
FROM dual;
END //
DELIMITER ;
事件管理:
-- 查看事件状态
SHOW EVENTS;
-- 查看事件定义
SHOW CREATE EVENT monthly_salary_report;
-- 修改事件
ALTER EVENT monthly_salary_report
ON SCHEDULE EVERY 1 MONTH
STARTS CURRENT_TIMESTAMP + INTERVAL 1 DAY
ENABLE;
-- 暂停事件
ALTER EVENT monthly_salary_report DISABLE;
-- 删除事件
DROP EVENT IF EXISTS monthly_salary_report;
-- 事件监控
SELECT
event_schema as database_name,
event_name,
definer,
time_zone,
event_definition as sql_code,
execute_at,
interval_value,
interval_field,
created,
last_altered,
status
FROM information_schema.events
WHERE event_schema = 'company';
3. 事务与并发控制
ACID特性深度理解
原子性(Atomicity)实现:
-- 银行转账事务 - 原子性示例
START TRANSACTION;
-- 检查账户余额
SELECT balance INTO @current_balance
FROM accounts
WHERE account_id = 123
FOR UPDATE;
IF @current_balance < 1000 THEN
-- 余额不足,回滚事务
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '余额不足';
END IF;
-- 扣款
UPDATE accounts
SET balance = balance - 1000
WHERE account_id = 123;
-- 存款
UPDATE accounts
SET balance = balance + 1000
WHERE account_id = 456;
-- 记录交易
INSERT INTO transactions (from_account, to_account, amount, transaction_time)
VALUES (123, 456, 1000, NOW());
COMMIT;
一致性(Consistency)保证:
-- 使用约束保证一致性
CREATE TABLE orders (
order_id INT AUTO_INCREMENT PRIMARY KEY,
customer_id INT NOT NULL,
order_amount DECIMAL(10,2) NOT NULL CHECK (order_amount > 0),
order_date DATE NOT NULL,
status ENUM('pending', 'confirmed', 'shipped', 'delivered') DEFAULT 'pending',
-- 外键约束
FOREIGN KEY (customer_id) REFERENCES customers(customer_id),
-- 检查约束(MySQL 8.0.16+)
CONSTRAINT chk_order_date CHECK (order_date >= '2020-01-01')
);
-- 事务中的一致性检查
START TRANSACTION;
-- 业务逻辑检查
SELECT COUNT(*) INTO @active_products
FROM products
WHERE product_id IN (SELECT product_id FROM order_items WHERE order_id = 1001)
AND status = 'active';
IF @active_products = 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '订单中没有有效商品';
END IF;
-- 继续其他操作...
COMMIT;
事务隔离级别与实现原理
隔离级别对比:
-- 查看当前隔离级别
SELECT @@transaction_isolation;
-- 设置会话隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 不同隔离级别的现象演示
-- 1. 读未提交(READ UNCOMMITTED) - 脏读
-- 事务A
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
START TRANSACTION;
UPDATE accounts SET balance = 2000 WHERE account_id = 1; -- 未提交
-- 事务B(在另一个连接中)
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
START TRANSACTION;
SELECT balance FROM accounts WHERE account_id = 1; -- 可能读到2000(脏读)
-- 2. 读已提交(READ COMMITTED) - 避免脏读,但不可重复读
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 3. 可重复读(REPEATABLE READ) - MySQL默认级别
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 4. 串行化(SERIALIZABLE) - 最高隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;
隔离级别实战:
-- 可重复读级别下的幻读问题
-- 事务A
START TRANSACTION;
SELECT COUNT(*) FROM employees WHERE salary > 8000; -- 假设返回10
-- 事务B(在另一个连接中插入新员工)
START TRANSACTION;
INSERT INTO employees (emp_name, salary, dept_id)
VALUES ('新员工', 9000, 1);
COMMIT;
-- 事务A再次查询
SELECT COUNT(*) FROM employees WHERE salary > 8000; -- 在REPEATABLE READ中仍然返回10
-- 串行化级别解决幻读
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;
START TRANSACTION;
SELECT COUNT(*) FROM employees WHERE salary > 8000; -- 加锁,阻止其他事务插入
-- 事务B的插入会被阻塞,直到事务A提交
MVCC多版本并发控制机制
MVCC原理演示:
-- 查看InnoDB事务信息
SELECT * FROM information_schema.INNODB_TRX;
-- MVCC示例
-- 事务A
START TRANSACTION;
SELECT * FROM employees WHERE emp_id = 1; -- 读取当前版本
-- 事务B修改同一条记录
START TRANSACTION;
UPDATE employees SET salary = salary + 1000 WHERE emp_id = 1;
COMMIT;
-- 事务A再次读取(REPEATABLE READ级别下看到的是旧版本)
SELECT * FROM employees WHERE emp_id = 1; -- 薪资未变化
-- 提交事务A后看到新版本
COMMIT;
SELECT * FROM employees WHERE emp_id = 1; -- 看到更新后的薪资
MVCC与Undo Log:
-- Undo Log维护多版本数据
-- 当执行UPDATE时:
-- 1. 将旧数据复制到Undo Log
-- 2. 修改当前数据
-- 3. 更新DB_ROLL_PTR指向Undo Log中的旧版本
-- 长事务对Undo Log的影响
SELECT
t.trx_id,
t.trx_started,
TIMESTAMPDIFF(SECOND, t.trx_started, NOW()) as duration_seconds,
t.trx_state,
t.trx_operation_state
FROM information_schema.INNODB_TRX t
ORDER BY t.trx_started;
-- 监控Undo Log使用
SHOW ENGINE INNODB STATUS;
死锁检测与避免策略
死锁场景分析:
-- 死锁示例
-- 事务1
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1; -- 锁住账户1
-- 事务2
START TRANSACTION;
UPDATE accounts SET balance = balance - 200 WHERE account_id = 2; -- 锁住账户2
-- 事务1尝试锁住账户2
UPDATE accounts SET balance = balance + 100 WHERE account_id = 2; -- 等待事务2释放锁
-- 事务2尝试锁住账户1
UPDATE accounts SET balance = balance + 200 WHERE account_id = 1; -- 等待事务1释放锁,死锁!
-- InnoDB会自动检测到死锁并回滚其中一个事务
死锁避免策略:
-- 1. 按固定顺序访问资源
-- 好的做法:总是先访问ID小的账户
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE account_id = LEAST(1, 2);
UPDATE accounts SET balance = balance + 100 WHERE account_id = GREATEST(1, 2);
COMMIT;
-- 2. 使用锁超时
SET SESSION innodb_lock_wait_timeout = 10; -- 设置锁等待超时为10秒
-- 3. 使用NOWAIT和SKIP LOCKED(MySQL 8.0+)
START TRANSACTION;
SELECT * FROM accounts WHERE account_id = 1 FOR UPDATE NOWAIT; -- 如果锁被占用立即报错
SELECT * FROM accounts WHERE account_id = 1 FOR UPDATE SKIP LOCKED; -- 跳过被锁定的行
-- 4. 减少事务大小和时间
START TRANSACTION;
-- 尽快完成事务,减少锁持有时间
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE account_id = 2;
COMMIT; -- 立即提交
-- 死锁信息分析
SHOW ENGINE INNODB STATUS; -- 查看最近的死锁信息
分布式事务(XA事务)实战
XA事务基础:
-- XA事务示例(跨多个数据库)
-- 第一阶段:准备阶段
XA START 'xid1'; -- 开始XA事务
UPDATE accounts SET balance = balance - 1000 WHERE account_id = 123;
UPDATE accounts SET balance = balance + 1000 WHERE account_id = 456;
XA END 'xid1';
XA PREPARE 'xid1'; -- 准备提交
-- 第二阶段:提交阶段
XA COMMIT 'xid1'; -- 提交事务
-- 或者回滚
-- XA ROLLBACK 'xid1';
-- 恢复中断的XA事务
XA RECOVER; -- 查看PREPARED状态的XA事务
-- 对于PREPARED状态的事务,可以决定提交或回滚
XA COMMIT 'xid_recovered';
-- 或者 XA ROLLBACK 'xid_recovered';
分布式事务管理:
-- 监控XA事务
SELECT * FROM performance_schema.events_transactions_current
WHERE STATE = 'PREPARED';
-- XA事务错误处理
DELIMITER //
CREATE PROCEDURE DistributedTransfer(
IN p_from_account INT,
IN p_to_account INT,
IN p_amount DECIMAL(10,2)
)
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
-- 回滚XA事务
XA END 'transfer_xid';
XA ROLLBACK 'transfer_xid';
RESIGNAL;
END;
-- 开始XA事务
XA START 'transfer_xid';
-- 业务操作
UPDATE accounts SET balance = balance - p_amount
WHERE account_id = p_from_account;
UPDATE accounts SET balance = balance + p_amount
WHERE account_id = p_to_account;
-- 结束并准备
XA END 'transfer_xid';
XA PREPARE 'transfer_xid';
-- 检查所有参与者是否准备成功
-- 这里可以添加其他数据库的检查逻辑
-- 提交事务
XA COMMIT 'transfer_xid';
END //
DELIMITER ;
总结
通过本篇的深入学习,我们掌握了MySQL SQL编程的核心高级特性:
- 复杂查询能力:子查询、连接查询、窗口函数、CTE递归查询
- 存储程序开发:存储过程、函数、触发器、事件调度器
- 事务管理:ACID特性、隔离级别、MVCC机制、死锁处理
- 高级数据类型:JSON处理、空间数据查询
- 分布式事务:XA事务管理和恢复
关键收获:
- 窗口函数可以优雅地解决复杂的分析需求
- 存储过程和函数封装了业务逻辑,提高代码复用性
- 合理使用事务隔离级别可以平衡性能和数据一致性
- MVCC机制是MySQL高并发的基础
- 分布式事务保证了跨数据库操作的一致性
这些高级特性使得MySQL能够处理复杂的企业级应用场景,为构建高性能、高可用的系统提供了坚实基础。
动手练习:
- 使用窗口函数分析你业务数据的排名和趋势
- 编写存储过程实现复杂的业务逻辑
- 设计触发器实现数据变更的审计跟踪
- 配置事件调度器实现定时数据维护任务
- 测试不同事务隔离级别对并发性能的影响
欢迎在评论区分享你的SQL编程经验和遇到的问题!