@@ -20,8 +20,9 @@
 import com.risingwave.connector.api.TableSchema;
 import com.risingwave.connector.api.sink.SinkBase;
 import com.risingwave.connector.api.sink.SinkFactory;
-import com.risingwave.java.utils.MinioUrlParser;
 import io.grpc.Status;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -38,11 +39,15 @@ public class IcebergSinkFactory implements SinkFactory {
     private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);
 
     public static final String SINK_MODE_PROP = "sink.mode";
-    public static final String LOCATION_TYPE_PROP = "location.type";
     public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
     public static final String DATABASE_NAME_PROP = "database.name";
     public static final String TABLE_NAME_PROP = "table.name";
+    public static final String S3_ACCESS_KEY_PROP = "s3.access.key";
+    public static final String S3_SECRET_KEY_PROP = "s3.secret.key";
+    public static final String S3_ENDPOINT_PROP = "s3.endpoint";
     public static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
+
+    // hadoop catalog config
     private static final String confEndpoint = "fs.s3a.endpoint";
     private static final String confKey = "fs.s3a.access.key";
     private static final String confSecret = "fs.s3a.secret.key";
@@ -56,83 +61,88 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
         validate(tableSchema, tableProperties);
 
         String mode = tableProperties.get(SINK_MODE_PROP);
-        String location = tableProperties.get(LOCATION_TYPE_PROP);
-        String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
+        String warehousePath = getWarehousePath(tableProperties);
         String databaseName = tableProperties.get(DATABASE_NAME_PROP);
         String tableName = tableProperties.get(TABLE_NAME_PROP);
 
+        String scheme = parseWarehousePathScheme(warehousePath);
+
         TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
-        HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
+        Configuration hadoopConf = createHadoopConf(scheme, tableProperties);
+        HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
         Table icebergTable;
         try {
             icebergTable = hadoopCatalog.loadTable(tableIdentifier);
         } catch (Exception e) {
-            LOG.error("load table error: {}", e);
             throw Status.FAILED_PRECONDITION
-                    .withDescription("failed to load iceberg table")
+                    .withDescription(
+                            String.format("failed to load iceberg table: %s", e.getMessage()))
                     .withCause(e)
                     .asRuntimeException();
         }
 
         if (mode.equals("append-only")) {
             return new IcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
         } else if (mode.equals("upsert")) {
-            return new UpsertIcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
+            return new UpsertIcebergSink(
+                    tableSchema, hadoopCatalog,
+                    icebergTable, FILE_FORMAT);
         }
         throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException();
     }
 
     @Override
     public void validate(TableSchema tableSchema, Map<String, String> tableProperties) {
         if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
-                || !tableProperties.containsKey(LOCATION_TYPE_PROP) // only local, s3, minio
                 || !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
                 || !tableProperties.containsKey(DATABASE_NAME_PROP)
                 || !tableProperties.containsKey(TABLE_NAME_PROP)) {
             throw INVALID_ARGUMENT
                     .withDescription(
                             String.format(
-                                    "%s, %s, %s, %s or %s is not specified",
+                                    "%s, %s, %s or %s is not specified",
                                     SINK_MODE_PROP,
-                                    LOCATION_TYPE_PROP,
                                     WAREHOUSE_PATH_PROP,
                                     DATABASE_NAME_PROP,
                                     TABLE_NAME_PROP))
                     .asRuntimeException();
         }
 
         String mode = tableProperties.get(SINK_MODE_PROP);
-        String location = tableProperties.get(LOCATION_TYPE_PROP);
-        String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
         String databaseName = tableProperties.get(DATABASE_NAME_PROP);
         String tableName = tableProperties.get(TABLE_NAME_PROP);
+        String warehousePath = getWarehousePath(tableProperties);
+
+        String schema = parseWarehousePathScheme(warehousePath);
 
         TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
-        HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
+        Configuration hadoopConf = createHadoopConf(schema, tableProperties);
+        HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
         Table icebergTable;
         try {
             icebergTable = hadoopCatalog.loadTable(tableIdentifier);
         } catch (Exception e) {
-            LOG.error("load table error: {}", e);
             throw Status.FAILED_PRECONDITION
-                    .withDescription("failed to load iceberg table")
+                    .withDescription(
+                            String.format("failed to load iceberg table: %s", e.getMessage()))
                     .withCause(e)
                     .asRuntimeException();
         }
         // check that all columns in tableSchema exist in the iceberg table
         for (String columnName : tableSchema.getColumnNames()) {
             if (icebergTable.schema().findField(columnName) == null) {
-                LOG.error("column not found: {}", columnName);
                 throw Status.FAILED_PRECONDITION
-                        .withDescription("table schema does not match")
+                        .withDescription(
+                                String.format(
+                                        "table schema does not match. Column %s not found in iceberg table",
+                                        columnName))
                         .asRuntimeException();
             }
         }
         // check that all required columns in the iceberg table exist in tableSchema
         Set<String> columnNames = Set.of(tableSchema.getColumnNames());
         for (Types.NestedField column : icebergTable.schema().columns()) {
             if (column.isRequired() && !columnNames.contains(column.name())) {
-                LOG.error("required column not found: {}", column.name());
                 throw Status.FAILED_PRECONDITION
                         .withDescription(
                                 String.format("missing a required field %s", column.name()))
@@ -153,26 +163,62 @@ public void validate(TableSchema tableSchema, Map<String, String> tableProperties) {
         }
     }
 
-    private HadoopCatalog createHadoopCatalog(String location, String warehousePath) {
-        Configuration hadoopConf = new Configuration();
-        switch (location) {
-            case "local":
-                return new HadoopCatalog(hadoopConf, warehousePath);
-            case "s3":
-                hadoopConf.set(confIoImpl, s3FileIOImpl);
-                String s3aPath = "s3a:" + warehousePath.substring(warehousePath.indexOf('/'));
-                return new HadoopCatalog(hadoopConf, s3aPath);
-            case "minio":
+    private static String getWarehousePath(Map<String, String> tableProperties) {
+        String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
+        // unify s3 and s3a
+        if (warehousePath.startsWith("s3://")) {
+            return warehousePath.replace("s3://", "s3a://");
+        }
+        return warehousePath;
+    }
+
+    private static String parseWarehousePathScheme(String warehousePath) {
+        try {
+            URI uri = new URI(warehousePath);
+            String scheme = uri.getScheme();
+            if (scheme == null) {
+                throw INVALID_ARGUMENT
+                        .withDescription("warehouse path should set scheme (e.g. s3a://)")
+                        .asRuntimeException();
+            }
+            return scheme;
+        } catch (URISyntaxException e) {
+            throw INVALID_ARGUMENT
+                    .withDescription(
+                            String.format("invalid warehouse path uri: %s", e.getMessage()))
+                    .withCause(e)
+                    .asRuntimeException();
+        }
+    }
+
+    private Configuration createHadoopConf(String scheme, Map<String, String> tableProperties) {
+        switch (scheme) {
+            case "file":
+                return new Configuration();
+            case "s3a":
+                Configuration hadoopConf = new Configuration();
                 hadoopConf.set(confIoImpl, s3FileIOImpl);
-                MinioUrlParser minioUrlParser = new MinioUrlParser(warehousePath);
-                hadoopConf.set(confEndpoint, minioUrlParser.getEndpoint());
-                hadoopConf.set(confKey, minioUrlParser.getKey());
-                hadoopConf.set(confSecret, minioUrlParser.getSecret());
                 hadoopConf.setBoolean(confPathStyleAccess, true);
-                return new HadoopCatalog(hadoopConf, "s3a://" + minioUrlParser.getBucket());
+                if (!tableProperties.containsKey(S3_ENDPOINT_PROP)) {
+                    throw INVALID_ARGUMENT
+                            .withDescription(
+                                    String.format(
+                                            "Should set %s for warehouse with scheme %s",
+                                            S3_ENDPOINT_PROP, scheme))
+                            .asRuntimeException();
+                }
+                hadoopConf.set(confEndpoint, tableProperties.get(S3_ENDPOINT_PROP));
+                if (tableProperties.containsKey(S3_ACCESS_KEY_PROP)) {
+                    hadoopConf.set(confKey, tableProperties.get(S3_ACCESS_KEY_PROP));
+                }
+                if (tableProperties.containsKey(S3_SECRET_KEY_PROP)) {
+                    hadoopConf.set(confSecret, tableProperties.get(S3_SECRET_KEY_PROP));
+                }
+                return hadoopConf;
             default:
                 throw UNIMPLEMENTED
-                        .withDescription("unsupported iceberg sink type: " + location)
+                        .withDescription(
+                                String.format("scheme %s not supported for warehouse path", scheme))
                         .asRuntimeException();
         }
     }
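
A minimal usage sketch, not part of this diff, showing the property set the reworked factory expects. The keys come from the constants this change introduces; the endpoint, credentials, names, and the tableSchema variable are hypothetical placeholders.

    // Hypothetical example; all values below are made up.
    Map<String, String> props =
            Map.of(
                    "sink.mode", "append-only", // or "upsert"
                    "warehouse.path", "s3://my-bucket/warehouse", // rewritten to s3a:// by getWarehousePath
                    "database.name", "demo_db",
                    "table.name", "demo_table",
                    "s3.endpoint", "http://127.0.0.1:9000", // required once the scheme is s3a
                    "s3.access.key", "my-access-key", // optional
                    "s3.secret.key", "my-secret-key"); // optional
    SinkBase sink = new IcebergSinkFactory().create(tableSchema, props); // tableSchema assumed in scope

With this change, the scheme of warehouse.path (file or s3a, with s3 normalized to s3a) drives the Hadoop configuration directly, replacing the removed location.type property and the MinioUrlParser-based credential parsing; any other scheme falls through to the UNIMPLEMENTED branch in createHadoopConf.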