|
19 | 19 |
|
20 | 20 | package com.antgroup.geaflow.dsl.connector.jdbc.util;
|
21 | 21 |
|
| 22 | +import com.antgroup.geaflow.common.type.IType; |
22 | 23 | import com.antgroup.geaflow.common.type.Types;
|
| 24 | +import com.antgroup.geaflow.common.type.primitive.BinaryStringType; |
| 25 | +import com.antgroup.geaflow.common.type.primitive.StringType; |
23 | 26 | import com.antgroup.geaflow.dsl.common.data.Row;
|
24 | 27 | import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
|
25 | 28 | import com.antgroup.geaflow.dsl.common.exception.GeaFlowDSLException;
|
|
29 | 32 | import java.sql.SQLException;
|
30 | 33 | import java.sql.Statement;
|
31 | 34 | import java.util.ArrayList;
|
32 |
| -import java.util.Arrays; |
33 | 35 | import java.util.List;
|
34 | 36 | import java.util.stream.Collectors;
|
35 | 37 | import org.apache.calcite.sql.type.SqlTypeName;
|
@@ -72,14 +74,35 @@ public static void createTemporaryTable(Statement statement, String tableName,
|
72 | 74 | public static void insertIntoTable(Statement statement, String tableName,
|
73 | 75 | List<TableField> fields, Row row) throws SQLException {
|
74 | 76 | Object[] values = new Object[fields.size()];
|
| 77 | + boolean isFirst = true; |
| 78 | + StringBuilder builder = new StringBuilder(); |
75 | 79 | for (int i = 0; i < fields.size(); i++) {
|
| 80 | + if (isFirst) { |
| 81 | + isFirst = false; |
| 82 | + } else { |
| 83 | + builder.append(","); |
| 84 | + } |
| 85 | + IType type = fields.get(i).getType(); |
| 86 | + Object value = row.getField(i, type); |
| 87 | + if (value == null) { |
| 88 | + if (fields.get(i).isNullable()) { |
| 89 | + builder.append("null"); |
| 90 | + } else { |
| 91 | + throw new RuntimeException("filed " + fields.get(i).getName() + " can not be null"); |
| 92 | + } |
| 93 | + } else if (type.getClass() == BinaryStringType.class || type.getClass() == StringType.class) { |
| 94 | + builder.append("'").append(value).append("'"); |
| 95 | + } else { |
| 96 | + builder.append(value); |
| 97 | + } |
76 | 98 | values[i] = row.getField(i, fields.get(i).getType());
|
77 | 99 | }
|
78 |
| - String insertIntoValues = StringUtils.join( |
79 |
| - Arrays.stream(values).map(value -> "'" + value + "'").collect(Collectors.toList()), |
80 |
| - ","); |
81 |
| - String insertIntoTableQuery = String.format("INSERT INTO %s VALUES (%s);", tableName, |
82 |
| - insertIntoValues); |
| 100 | + |
| 101 | + String insertIntoValues = builder.toString(); |
| 102 | + String insertColumns = StringUtils.join(fields.stream().map( |
| 103 | + field -> field.getName()).collect(Collectors.toList()), ","); |
| 104 | + String insertIntoTableQuery = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, insertColumns, |
| 105 | + insertIntoValues); |
83 | 106 | statement.execute(insertIntoTableQuery);
|
84 | 107 | }
|
85 | 108 |
|
|
0 commit comments