irpas技术客

datax 操作pg,支持insert和update_Emrys02_datax update

大大的周 5394

1.如果需要pg支持update,则需pg版本在9.5以上

可用select version()查看。

可熟悉语法:ON CONFLICT?,PostgreSQL的ON CONFLICT

关键词:如果不存在则插入,存在则更新。

t_hdj_test3的联合主键是id,name;

INSERT INTO "warning_event"."t_hdj_test3" (id,name,sex) VALUES('1'::int4,'aaa'::varchar,'女'::varchar) ON CONFLICT ?( id ,name ) ?DO ?UPDATE SET sex=excluded.sex

修改 PostgresqlWriter.java

删除限制:

?

修改WriterUtil.java

添加postgresql 数据插入类型转换:???????

?

public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) { boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert") || writeMode.trim().toLowerCase().startsWith("replace") || writeMode.trim().toLowerCase().startsWith("update"); if (!isWriteModeLegal) { throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE, String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode)); } // && writeMode.trim().toLowerCase().startsWith("replace") String writeDataSqlTemplate; if (forceUseUpdate || ((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update")) ) { //update只在mysql下使用 writeDataSqlTemplate = new StringBuilder() .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")") .append(onDuplicateKeyUpdateString(columnHolders)) .toString(); } else { if (dataBaseType == DataBaseType.PostgreSQL) { StringBuilder sb = new StringBuilder().append("INSERT INTO %s (") .append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")"); if(writeMode.trim().toLowerCase().startsWith("update")){ sb.append(onConFlictDoString(writeMode, columnHolders)); } writeDataSqlTemplate = sb.toString(); } else { //这里是保护,如果其他错误的使用了update,需要更换为replace if (writeMode.trim().toLowerCase().startsWith("update")) { writeMode = "replace"; } writeDataSqlTemplate = new StringBuilder().append(writeMode) .append(" INTO %s (").append(StringUtils.join(columnHolders, ",")) .append(") VALUES(").append(StringUtils.join(valueHolders, ",")) .append(")").toString(); } } return writeDataSqlTemplate; } 增加onConFlictDoString方法: public static String onConFlictDoString(String conflict, List<String> columnHolders) { conflict = conflict.replace("update", ""); StringBuilder sb = new StringBuilder(); sb.append(" ON CONFLICT "); sb.append(conflict); sb.append(" DO "); if (columnHolders == null || columnHolders.size() < 1) { sb.append("NOTHING"); return sb.toString(); } String[] conflictFields = conflict.replace("(","").replace(")","").replace(" ","").split(","); Set<String> conflictFieldsSet = Sets.newHashSet(conflictFields); sb.append(" UPDATE SET "); boolean first = true; for (String column : columnHolders) { if(conflictFieldsSet.contains(column)){ continue; } if (!first) { sb.append(","); } else { first = false; } sb.append(column); sb.append("=excluded."); sb.append(column); } return sb.toString(); } 效果Json { "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name":"postgresqlreader", "parameter":{ "username":"111", "password":"222", "splitPk":"", "connection":[ { "querySql":["select id ,name,sex from warning_event.t_hdj_test2"], "jdbcUrl":[ "jdbc:postgresql://333" ] } ] } }, "writer": { "name": "postgresqlwriter", "parameter": { "writeMode": "update (id,name)", "username":"111", "password":"222", "column":["id","name","sex"], "preSql":[ ], "connection":[ { "table":["\"warning_event\".\"t_hdj_test3\""], "jdbcUrl":"jdbc:postgresql://333" } ] } } } ] } }

Datax运行日志:

此错误是pg库版本在9.5以下,

生成sql如下:INSERT INTO "warning_event"."t_hdj_test3" (id,name,sex) VALUES('1'::int4,'aaa'::varchar,'女'::varchar) ON CONFLICT ?( id ,name ) ?DO ?UPDATE SET sex=excluded.sex

?没有"writeMode": "update (id,name)", 则正常插入


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #dataX #update