本文共 9774 字,大约阅读时间需要 32 分钟。
场景:需要将从ODPS数仓中计算得到的大额可疑交易信息导入到业务系统的mysql中供业务系统审核。最简单的方式是用阿里云的组件自动进行数据同步了。但是本系统是开放是为了产品化,要保证不同环境的可移植性,同时同步的表也就6个表,那么就利用现有的基于jdbc的规则引擎工程来自己实现数据的同步。
完整的工程代码可以参考我的github https://github.com/intsmaze/SqlAdapter
查询数据,将结果映射到javabean对象中ps = conn.prepareStatement(select id,name,age from intsmaze);rs = ps.executeQuery();Intsmaze intsmaze = null;while (rs.next()) { intsmaze = new Intsmaze(); intsmaze.setId(rs.getInt(1)); intsmaze.setName(rs.getString(2)); intsmaze.setAge(rs.getInt(3));}添加数据,将javabean对象字段映射到对应的表列String sql = "insert into intsmaze(id,name,age) values (?,?,?)";ps = conn.prepareStatement(sql);ps.setInt(1, intsmaze.getId());ps.setString(2, intsmaze.getName());ps.setInt(3, intsmaze.getAge());ps.executeUpdate();
使用JDBC大家都会使用上面的方式进行开发,但是如果我们的表的字段有50个,而且查询不能使用 select * from 必须指定列名呢?添加数据不能使用insert into intsmaze values()必须指定插入的列名呢?你干脆杀了我吧。
50个字段你要做2次字段列名映射,稍有不慎就会将字段列名映射到错误的位置,导致最后数据错误,最可怕的是,还要编写sql语句,如果后面有新增或删除列名,那么你又要去看一眼映射关系,看看是否影响到。这就是一种费力却没有技术含量的事情,而且还很容易出错。下面就是我们要做的各种映射,你真的很考验我的眼神。
result.getObjectByName("FieldName")javabean.setFieldValue()PreparedStatement.setString(number,FieldVale)
根据javabean自动生成insert,select语句,完成字段列名映射
当初开发时,一看到这么多字段映射我烦躁不安,然后花了半天用反射把代码重新编写了下,后面有新的表要进行同步时,用一个工具类生成javabean的java文件,然后直接就在下面模板代码中替换javabean类就完成了数据同步,整个操作10分钟搞定,是不是很爽。
当然你可以引入orm框架,但是除了hibernate框架,mybatis框架虽然免去了select和insert的映射,但是还是要编写前缀列名,而且我就一个小工程,我再引入ORM框架,麻不麻烦啊,有这时间还不如自己写一写。
public class ModelServer extends ApplicationServer { private static final Logger logger = LoggerFactory.getLogger(ExportBlockCustomerServer.class); private final static String SQL = "get_customer_infor"; private MysqlServicemysqlService = new MysqlService (); @Override public String[] getPaths() { return new String[] { "com/hand/service/exe/blocktrade/blocktrade.xml" }; } private int date = 1; @Override public void addOptions(Options options) { options.addOption("date", true, "天数"); } @Override public void setupOptionValue(CommandLine cmd) { date = Integer.parseInt(cmd.getOptionValue("date", "1")); logger.debug("date is {}", date); } public void service() throws Exception { mysqlService.setMysqlDao(this.getMysqlDao()); AmlException exception = null; for (int i = 1; i <= date; i++) { String exeSql = (String) this.getSqlMap().get(SQL); Result result = this.getSqlAdapter().select(this.getDao(),"SELECT * from test_group");//向odps数据仓库查询数据,并导入到mysql中 String[] names = FilesNameUtils.getFiledName(new TestGroup());//得到这个bean类的所有字段名称,要保证bean类的字段名称和数据库表的列名一致 String insertSql = SqlUtils.getInsertSql("test_group", names);//组装成insert into test_group (id,cny,d,party_id,age) values (?,?,?,?,?)语句 List list = new ArrayList (100); int number = 0; while (result.hasNext()) { result.next(); TestGroup br = (TestGroup) tableToBean(result, i,names);//将odps查询的数据反射到TestGroup类中,不用反射见重载函数 list.add(br); number++; if (number % 100 == 0) { try { mysqlService.insert(insertSql, list, names); } catch (Exception e) { exception = new AmlException(e); } finally { list.clear(); } } } try { mysqlService.insert(insertSql, list, names); logger.info("insert data number is {}", number); } catch (Exception e) { logger.info("insert data number is {}", number); exception = new AmlException(e); } finally { result.close(); } } if (exception != null) { throw exception; } } public static void main(String[] args) throws Exception { ModelServer applicationServer = new ModelServer(); applicationServer.run(args); logger.info("execute sucess......"); System.exit(0); } private Object tableToBean(Result result, int i, String[] names) throws Exception { Class clazz = TestGroup.class; TestGroup testGroup = (TestGroup) clazz.newInstance(); for (int j = 0; j < names.length; j++) { Object object = result.getObjectByName(names[j]); if (object instanceof String) { Field f = clazz.getDeclaredField(names[j]); f.setAccessible(true); f.set(testGroup, object); } else if (object instanceof Date) { Field f = clazz.getDeclaredField(names[j]); f.setAccessible(true); f.set(testGroup, new java.sql.Date(((Date)object).getTime())); } else if (object instanceof Long) { Field f = clazz.getDeclaredField(names[j]); f.setAccessible(true); f.set(testGroup, object); } } return testGroup; } private Bigamountreport tableToBean(Result result, int i) throws SQLException { Bigamountreport bigamountreport = new Bigamountreport(); bigamountreport.setSeqno((String) result.getObjectByName("aml_id")); bigamountreport.setCustomerId((String) result .getObjectByName("party_id")); ......疯狂的set操作return bigamountreport; }}
获得传入对象的字段名称的字符串数据,为了拼接sql使用
public static String[] getFiledName(Object o) { Field[] fields = o.getClass().getDeclaredFields(); String[] fieldNames = new String[fields.length]; for (int i = 0; i < fields.length; i++) { fieldNames[i] = fields[i].getName(); } return fieldNames; }
拼接insert的sql语句
public static String getInsertSql(String tableName ,String[] names) { String insertSql = StringUtils.join("insert into ",tableName ," (#{field_name}) values (#{field_value})"); String fieldName=""; String fieldValue=""; for(int j = 0; j < names.length; j++) { if(j==names.length-1) { fieldName=StringUtils.join(fieldName,names[j]); fieldValue=StringUtils.join(fieldValue,"?"); } else { fieldName=StringUtils.join(fieldName,names[j],","); fieldValue=StringUtils.join(fieldValue,"?",","); } } insertSql=insertSql.replace("#{field_name}", fieldName).replace("#{field_value}", fieldValue); logger.debug("the insert sql is :{}",insertSql); return insertSql; }
最重要的是assembleBeantoPS方法,用于根据映射字段列名
public class MysqlService{ private static final Logger logger = LoggerFactory.getLogger(MysqlService.class); private MysqlDao mysqlDao; public void assembleBeantoPS(PreparedStatement ps, int number, String FileName, Object bean) throws Exception { Type fileType = FilesNameUtils.getFieldType(FileName, bean);//根据属性名称返回字段类型 if (fileType == String.class) { ps.setString(number + 1, (String) FilesNameUtils.getFieldValueByName(FileName, bean)); } else if ("long".equals(fileType.toString()+"")) { ps.setLong(number + 1,(Long) FilesNameUtils.getFieldValueByName(FileName, bean)); } else if (fileType == Long.class) { ps.setLong(number + 1,(Long) FilesNameUtils.getFieldValueByName(FileName, bean)); } else if (fileType == Date.class) { ps.setDate(number + 1,new java.sql.Date(((Date)FilesNameUtils.getFieldValueByName(FileName, bean)).getTime())); } } /** * @author:YangLiu * @date:2017年12月25日 下午3:52:20 * @describe: */ public void insert(String sql, List list, String[] names) throws Exception { boolean iserror=false; PreparedStatement ps = null; try { ps = mysqlDao.getConnection().prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); for (int i = 0; i < list.size(); i++) { T bigamount = list.get(i); try{ for (int j = 0; j < names.length; j++) { assembleBeantoPS(ps, j, names[j], bigamount); } ps.executeUpdate(); }catch (Exception e) { iserror=true; logger.error("插入数据发生错误, occur {} ", e); logger.error("异常数据 {} ", bigamount); } } } catch (Exception e) { mysqlDao.getInstance().free(null, ps, mysqlDao.getConnection()); logger.error("the sql: {} occur {} ", sql, e); throw new AmlException("mysql建立连接时发生异常"); } finally { mysqlDao.getInstance().free(null, ps); if(iserror) { throw new AmlException("向mysql中导入数据时发生异常"); } } } /** * @deprecated * @author:YangLiu * @date:2017年12月25日 下午3:52:20 * @describe:SB写法 */ public boolean insertBatchBigamountrecord(String sql, List list) throws Exception { PreparedStatement ps = null; try { // dseqno,transOrgId,policyNo,antPolicyNo,periodPrem,transactionAmountCny,transactionAmountUsd ps = mysqlDao.getConnection().prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); for (int i = 0; i < list.size(); i++) { Bigamountrecord bigamountrecord = list.get(i); int j = 1; ps.setString(j++, bigamountrecord.getDseqno()); ps.setString(j++, bigamountrecord.getTransOrgId()); ps.setString(j++, bigamountrecord.getPolicyNo()); ......疯狂的set操作 ps.addBatch(); } ps.executeBatch(); } catch (SQLException e) { mysqlDao.getInstance().free(null, ps, mysqlDao.getConnection()); return false; } finally { mysqlDao.getInstance().free(null, ps); } return true; }}