本文源码:GitHub·点这里 || gitEE·点这里一、数据同步简介1、场景描述如果经常接触数据开发,会有这样一个场景,服务A提供一个数据源,假设称为动态数据源A,需要读取该数据源下的数据;服务B提供一个数据源,假设称为动态数据源B,
如果经常接触数据开发,会有这样一个场景,服务A提供一个数据源,假设称为动态数据源A,需要读取该数据源下的数据;服务B提供一个数据源,假设称为动态数据源B,需要写入数据到该数据源。这个场景通常描述为数据同步,或者数据搬运。
基于上述流程图,整体步骤如下:
Java中JDBC下执行数据库操作的一个重要接口,在已经建立数据库连接的基础上,向数据库发送要执行的SQL语句。
继承Statement接口,且实现SQL预编译,可以提高批量处理效率。常应用于批量数据写入场景。
存储JDBC查询结果集的对象,ResultSet接口提供从当前行检索列值的方法。
提供一个数据源管理的Factory,当前场景下主要管理一个读库即数据源A,和一个写库即数据源B,数据源连接验证通过,放入容器中。
@Componentpublic class ConnectionFactory { private volatile Map<String, Connection> connectionMap = new HashMap<>(); @Resource private JdbcConfig jdbcConfig ; @PostConstruct public void init (){ ConnectionEntity read = new ConnectionEntity( "MySql","jdbc:Mysql://localhost:3306/data_read","user01","123"); if (jdbcConfig.getConnection(read) != null){ connectionMap.put(JdbcConstant.READ,jdbcConfig.getConnection(read)); } ConnectionEntity write = new ConnectionEntity( "mysql","jdbc:mysql://localhost:3306/data_write","user01","123"); if (jdbcConfig.getConnection(write) != null){ connectionMap.put(JdbcConstant.WRITE,jdbcConfig.getConnection(write)); } } public Connection getByKey (final String key){ return connectionMap.get(key) ; }}
基础SQL管理
主要提供SQL的基础模板,例如全表查,分页查,表结构查询。
public class BaseSql { public static String READ_SQL = "SELECT * FROM %s LIMIT 1"; public static String WRITE_SQL = "INSERT INTO %s (SELECT * FROM %s WHERE 1=0)" ; public static String CREATE_SQL = "SHOW CREATE TABLE %s" ; public static String SELECT_SQL = "SELECT * FROM %s" ; public static String COUNT_SQL = "SELECT COUNT(1) countNum FROM %s" ; public static String PAGE_SQL = "SELECT * FROM %s LIMIT %s,%s" ; public static String STRUCT_SQL (){ StringBuffer sql = new StringBuffer() ; sql.append(" SELECT "); sql.append(" COLUMN_NAME, "); sql.append(" IS_NULLABLE, "); sql.append(" COLUMN_TYPE, "); sql.append(" COLUMN_KEY, "); sql.append(" COLUMN_COMMENT "); sql.append(" FROM "); sql.append(" infORMation_schema.COLUMNS "); sql.append(" WHERE "); sql.append(" table_schema = '%s' "); sql.append(" AND table_name = '%s' "); return String.valueOf(sql) ; }}
SQL参数拼接
根据SQL模板中缺失的参数,进行动态补全,生成完成SQL语句。
public class BuildSql { public static String buildReadSql(String table) { String readSql = null ; if (StringUtils.isNotEmpty(table)){ readSql = String.format(BaseSql.READ_SQL, table); } return readSql; } public static String buildWriteSql(String table){ String writeSql = null ; if (StringUtils.isNotEmpty(table)){ writeSql = String.format(BaseSql.WRITE_SQL, table,table); } return writeSql ; } public static String buildStructSql (String table){ String structSql = null ; if (StringUtils.isNotEmpty(table)){ structSql = String.format(BaseSql.CREATE_SQL, table); } return structSql ; } public static String buildTableSql (String schema,String table){ String structSql = null ; if (StringUtils.isNotEmpty(table)){ structSql = String.format(BaseSql.STRUCT_SQL(), schema,table); } return structSql ; } public static String buildSelectSql (String table){ String selectSql = null ; if (StringUtils.isNotEmpty(table)){ selectSql = String.format(BaseSql.SELECT_SQL,table); } return selectSql ; } public static String buildCountSql (String table){ String countSql = null ; if (StringUtils.isNotEmpty(table)){ countSql = String.format(BaseSql.COUNT_SQL,table); } return countSql ; } public static String buildPageSql (String table,int offset,int size){ String pageSql = null ; if (StringUtils.isNotEmpty(table)){ pageSql = String.format(BaseSql.PAGE_SQL,table,offset,size); } return pageSql ; }}
读库尝试一次单条数据读取,写库尝试一次不成立条件的写入,如果没有权限,会抛出相应异常。
@RestControllerpublic class CheckController { @Resource private ConnectionFactory connectionFactory ; // MySQLSyntaxErrorException: SELECT command denied to user @GetMapping("/checkRead") public String checkRead (){ try { String sql = BuildSql.buildReadSql("rw_read") ; ExecuteSqlUtil.query(connectionFactory.getByKey(JdbcConstant.READ),sql) ; return "success" ; } catch (SQLException e) { e.printStackTrace(); } return "fail" ; } // MySQLSyntaxErrorException: INSERT command denied to user @GetMapping("/checkWrite") public String checkWrite (){ try { String sql = BuildSql.buildWriteSql("rw_read") ; ExecuteSqlUtil.update(connectionFactory.getByKey(JdbcConstant.WRITE),sql) ; return "success" ; } catch (SQLException e) { e.printStackTrace(); } return "fail" ; }}
这里执行最简单操作,把读库表创建语句查询出来,丢到写库中执行。
@RestControllerpublic class StructController { @Resource private ConnectionFactory connectionFactory ; @GetMapping("/syncStruct") public String syncStruct (){ try { String sql = BuildSql.buildStructSql("rw_read") ; ResultSet resultSet = ExecuteSqlUtil.query(connectionFactory.getByKey(JdbcConstant.READ),sql) ; String createTableSql = null ; while (resultSet.next()){ createTableSql = resultSet.getString("Create Table") ; } if (StringUtils.isNotEmpty(createTableSql)){ ExecuteSqlUtil.update(connectionFactory.getByKey(JdbcConstant.WRITE),createTableSql) ; } return "success" ; } catch (SQLException e) { e.printStackTrace(); } return "fail" ; }}
读库的表数据读取,批量放入写库中。这里特别说一个方法:statement.setObject();在不知道参数个数和类型时,自动适配数据类型。
@RestControllerpublic class DataSyncController { @Resource private ConnectionFactory connectionFactory ; @GetMapping("/dataSync") public List<RwReadEntity> dataSync (){ List<RwReadEntity> rwReadEntities = new ArrayList<>() ; try { Connection readConnection = connectionFactory.getByKey(JdbcConstant.READ) ; String sql = BuildSql.buildSelectSql("rw_read") ; ResultSet resultSet = ExecuteSqlUtil.query(readConnection,sql) ; while (resultSet.next()){ RwReadEntity rwReadEntity = new RwReadEntity() ; rwReadEntity.setId(resultSet.getInt("id")); rwReadEntity.setSign(resultSet.getString("sign")); rwReadEntities.add(rwReadEntity) ; } if (rwReadEntities.size() > 0){ Connection writeConnection = connectionFactory.getByKey(JdbcConstant.WRITE) ; writeConnection.setAutoCommit(false); PreparedStatement statement = writeConnection.prepareStatement("INSERT INTO rw_read VALUES(?,?)"); // 基于动态获取列,和statement.setObject();自动适配数据类型 for (int i = 0 ; i < rwReadEntities.size() ; i++){ RwReadEntity rwReadEntity = rwReadEntities.get(i) ; statement.setInt(1,rwReadEntity.getId()) ; statement.setString(2,rwReadEntity.getSign()) ; statement.addBatch(); if (i>0 && i%2==0){ statement.executeBatch() ; } } // 处理最后一批数据 statement.executeBatch(); writeConnection.commit(); } return rwReadEntities ; } catch (SQLException e) { e.printStackTrace(); } return null ; }}
提供一个分页查询工具,在数据量大的情况下不能一次性读取大量的数据,避免资源占用过高。
public class PageUtilEntity { public static PageHelperEntity<Object> pageResult (int total, int pageSize,int currentPage, List dataList){ PageHelperEntity<Object> pageBean = new PageHelperEntity<Object>(); // 总页数 int totalPage = PageHelperEntity.countTotalPage(pageSize,total) ; // 分页列表 List<Integer> pageList = PageHelperEntity.pageList(currentPage,pageSize,total) ; // 上一页 int prevPage = 0 ; if (currentPage==1){ prevPage = currentPage ; } else if (currentPage>1&¤tPage<=totalPage){ prevPage = currentPage -1 ; } // 下一页 int nextPage =0 ; if (totalPage==1){ nextPage = currentPage ; } else if (currentPage<=totalPage-1){ nextPage = currentPage+1 ; } pageBean.setDataList(dataList); pageBean.setTotal(total); pageBean.setPageSize(pageSize); pageBean.setCurrentPage(currentPage); pageBean.setTotalPage(totalPage); pageBean.setPageList(pageList); pageBean.setPrevPage(prevPage); pageBean.setNextPage(nextPage); pageBean.initjudge(); return pageBean ; }}
很多复杂度偏高的业务,越是需要借助基础API解决,因为复杂度高,不容易抽象化统一封装,如果数据同步这块业务,可以适配多种数据库,完全可以独立封装为中间件,开源项目中关于多方数据同步或计算的中间件也有好多,可以自行了解下,增长眼界开阔思路。
GitHub·地址https://github.com/cicadasmile/data-manage-parentGitEE·地址Https://gitee.com/cicadasmile/data-manage-parent
推荐相关阅读 |
---|
数据源管理:主从库动态路由,aop模式读写分离 |
数据源管理:基于JDBC模式,适配和管理动态数据源 |
--结束END--
本文标题: 数据源管理 | 动态权限校验,表结构和数据迁移流程
本文链接: https://lsjlt.com/news/229812.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0