irpas技术客

定义Mybatis拦截器动态切换postgre数据库schema_Elon.Yang_mybatis 动态schema

网络 4383

背景

随着业务的发展和合规要求,产品数据库将切换到Postgres。之前不同技术域,不同交付工程的数据分库管理的方式切换到PG数据库后将通过分schema管理。 ORM继续使用Mybatis,为使用迁移工作量尽可能小,现有的SQL代码不做大的修改。动态数据源实现考虑在Mybatis执行过程中做拦截,替换sql中的schema标识。

提取请求参数中的schema

约定rest接口请求Header参数中增加schema信息。通过切面技术从请求头中提取schema后保存到线程变量。

1. 提取schema package com.postgres.manager; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; /** * Schema切面, 提取header头中的schema保存到SchemaHolder中 * * @author elon * @since 2022-03-20 */ @Aspect @Component @Order(9999) public class SchemaAspect { @Pointcut("@annotation(org.springframework.web.bind.annotation.GetMapping) " + "|| @annotation(org.springframework.web.bind.annotation.PostMapping) " + "|| @annotation(org.springframework.web.bind.annotation.DeleteMapping) " + "|| @annotation(org.springframework.web.bind.annotation.RequestMapping)") void schema() { } /** * 从请求头提取 * * @param joinPoint */ @Before("schema()") public void setSchema(JoinPoint joinPoint) { String schema = getSchemaFromHeader(); SchemaHolder.set(schema); } @After("schema()") public void clearSchema(JoinPoint joinPoint) { SchemaHolder.clear(); } /** * 从请求头中后去schema信息 * * @return schema */ private String getSchemaFromHeader() { HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); String schema = request.getHeader("schema"); return schema; } } 2. 保存schema的线程变量类 package com.postgres.manager; /** * Schema持有类. 用于在异步线程或者跨多个方法传递schema信息 * * @author elon * @since 2022-03-19 */ public class SchemaHolder { private static ThreadLocal<String> schema = new ThreadLocal<>(); public static void set(String sch) { schema.set(sch); } public static String get() { return schema.get(); } public static void clear() { schema.remove(); } } 定义Mybatis拦截器 1. 定义拦截器注解,用于修饰DAO层级接口 package com.postgres.manager; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * schema拦截器注解。修饰mapper接口类,用以区分访问的pg数据库schema * * @author elon * @since 2022-03-20 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface SchemaInterceptAnnotation { /** * schema类型。取值范围:business, common * * @return */ String schemaType() default ""; }

在DAO层接口类加上该注解,拦截器会动态切换schema.

package com.postgres.mapper; import com.postgres.manager.SchemaInterceptAnnotation; import com.postgres.model.ExamResult; import com.postgres.model.User; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import java.util.List; @Mapper @SchemaInterceptAnnotation(schemaType = "business") public interface UserMapper { /** * 从schema获取user数据 * * @return user列表 */ List<User> getUserFromSchema(@Param("name") String name); /** * 插入用户数据到schema * * @param userList 用户列表 */ void insertUser2Schema(@Param("list") List<User> userList); /** * 获取测试成绩. * * @return 测试成绩列表 */ List<ExamResult> getExamResult(); } 2. 拦截器替换sql中的表名为schema.表名 package com.postgres.manager; import org.apache.ibatis.executor.statement.StatementHandler; import org.apache.ibatis.mapping.BoundSql; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.plugin.*; import org.apache.ibatis.reflection.DefaultReflectorFactory; import org.apache.ibatis.reflection.MetaObject; import org.apache.ibatis.reflection.SystemMetaObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.lang.reflect.Field; import java.sql.Connection; import java.util.Properties; /** * StatementHandler拦截器. 在prepare方法执行前拦截,修改sql语句,增加schema. * * @author elon * @since 2022-03-20 */ @Component @Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})}) public class StatementHandlerInterceptor implements Interceptor { private static final Logger LOGGER = LoggerFactory.getLogger(StatementHandlerInterceptor.class); /** * 业务数据分schema存储 */ private static final String BUSINESS_SCHEMA = "business"; /** * 公共的配置数据(不分schema), 固定库 */ private static final String COMMON_SCHEMA = "common"; @Override public Object intercept(Invocation invocation) throws Throwable { StatementHandler statementHandler = (StatementHandler) invocation.getTarget(); MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY, SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory()); MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement"); String mapperMethod = mappedStatement.getId(); BoundSql boundSql = statementHandler.getBoundSql(); String sql = boundSql.getSql(); String mapperClass = mapperMethod.substring(0, mappedStatement.getId().lastIndexOf(".")); Class<?> classType = Class.forName(mapperClass); SchemaInterceptAnnotation interceptAnnotation = classType.getAnnotation(SchemaInterceptAnnotation.class); String schemaType = interceptAnnotation.schemaType(); String newSql = replaceSqlWithSchema(schemaType, sql, mapperMethod); //通过反射修改sql语句 Field field = boundSql.getClass().getDeclaredField("sql"); field.setAccessible(true); field.set(boundSql, newSql); return invocation.proceed(); } @Override public Object plugin(Object object) { if (object instanceof StatementHandler) { return Plugin.wrap(object, this); } else { return object; } } @Override public void setProperties(Properties properties) { } private String replaceSqlWithSchema(String schemaType, String originalSql, String mapperMethod){ // 替换sql中的表名,加上schema if (BUSINESS_SCHEMA.equals(schemaType)) { String schema = SchemaHolder.get(); return originalSql.replaceAll(" t_", " " + schema + ".t_"); } else if (COMMON_SCHEMA.equals(schemaType)) { return originalSql.replaceAll(" t_", " " + COMMON_SCHEMA + ".t_"); } else { LOGGER.error("Invalid SchemaInterceptAnnotation. mapperMethod:{}", mapperMethod); throw new IllegalArgumentException("Invalid SchemaInterceptAnnotation."); } } } 2. 添加拦截器

加上如下处理, 拦截器才会生效

package com.postgres.config; import com.postgres.manager.StatementHandlerInterceptor; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.util.List; @Configuration public class InterceptorConfig { @Autowired private List<SqlSessionFactory> sqlSessionFactoryList; @PostConstruct public void addSqlInterceptor() { StatementHandlerInterceptor interceptor = new StatementHandlerInterceptor(); for (SqlSessionFactory sqlSessionFactory : sqlSessionFactoryList) { sqlSessionFactory.getConfiguration().addInterceptor(interceptor); } } }

完整的Demo代码还包括DataSource配置和XML中SQL,这些和普通的Spring Boot项目无异。参考github上的完整实现代码:https://github.com/ylforever/elon-postgres


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Mybatis #动态schema #1