-
Notifications
You must be signed in to change notification settings - Fork 180
Snow-1936378 add support for vector type for loader #2161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 13 commits
d5a4587
44b33ab
7ca75fb
9d9e4c8
30076e8
374751c
75a65b6
ff62bc0
be0c4b0
2126ae4
a60567f
8dfda07
c32b59a
920de5a
3726af7
c2bdd21
a82121c
1239a7b
bc1e910
05373b4
1c1694e
cbf77d4
b0165ee
a6c3d0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,12 +5,14 @@ | |
import java.io.IOException; | ||
import java.sql.Connection; | ||
import java.sql.DatabaseMetaData; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.text.DateFormat; | ||
import java.text.SimpleDateFormat; | ||
import java.util.ArrayList; | ||
import java.util.Calendar; | ||
import java.util.GregorianCalendar; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
|
@@ -83,6 +85,8 @@ public class StreamLoader implements Loader, Runnable { | |
|
||
private List<String> _columns; | ||
|
||
private Map<String, Integer> _vectorColumns = new HashMap<String, Integer>(); | ||
|
||
private List<String> _keys; | ||
|
||
private long _batchRowSize = DEFAULT_BATCH_ROW_SIZE; | ||
|
@@ -178,6 +182,7 @@ public void setProperty(LoaderProperty property, Object value) { | |
typeCheckedColumns.add((String) e); | ||
} | ||
_columns = typeCheckedColumns; | ||
setVectorColumns(); | ||
} | ||
break; | ||
case keys: | ||
|
@@ -598,6 +603,23 @@ private void truncateTargetTable() { | |
} | ||
} | ||
|
||
public void setVectorColumns() { | ||
try { | ||
DatabaseMetaData dbmd = _processConn.getMetaData(); | ||
for (String col : _columns) { | ||
try (ResultSet rs = metadata.getColumns(_database, _schema, _table, col)) { | ||
// Check if column type is VECTOR, if true, add column name and size to vector column map. | ||
if (rs.getString(6).equalsIgnoreCase("vector")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we extract the logic of this condition and name this check instead of a comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be better to store metadata for all columns, not only for the vector, in my opinion. |
||
_vectorColumns.put(col, rs.getInt(7)); | ||
} | ||
} | ||
} | ||
} catch (SQLException e) { | ||
logger.error(e.getMessage(), e); | ||
abort(new Loader.ConnectionError(Utils.getCause(e))); | ||
} | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
|
@@ -750,6 +772,10 @@ List<String> getColumns() { | |
return this._columns; | ||
} | ||
|
||
Map<String, Integer> getVectorColumns() { | ||
return this._vectorColumns; | ||
} | ||
|
||
String getColumnsAsString() { | ||
// comma separate list of column names | ||
StringBuilder sb = new StringBuilder("\""); | ||
|
@@ -904,4 +930,28 @@ public int getSubmittedRowCount() { | |
void setTestMode(boolean mode) { | ||
this._testMode = mode; | ||
} | ||
|
||
public String getStageColumnsAsString() { | ||
// if there are no vector columns in the target table just select * is needed from the staging | ||
// table. | ||
if (_vectorColumns.isEmpty()) { | ||
return "*"; | ||
} | ||
|
||
StringBuilder sb = new StringBuilder(); | ||
for (int i = 0; i < _columns.size(); i++) { | ||
String colName = _columns.get(i); | ||
if (_vectorColumns.containsKey(colName)) { | ||
sb.append(colName + "::VECTOR(FLOAT, " + _vectorColumns.get(colName) + ")"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about INT types in a vector? |
||
} else { | ||
sb.append("\""); | ||
sb.append(colName); | ||
sb.append("\""); | ||
} | ||
if (i != _columns.size() - 1) { | ||
sb.append(", "); | ||
} | ||
} | ||
return sb.toString(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Date; | ||
import net.snowflake.client.annotations.DontRunOnGithubActions; | ||
import net.snowflake.client.category.TestTags; | ||
import org.junit.jupiter.api.Tag; | ||
import org.junit.jupiter.api.Test; | ||
|
@@ -201,4 +202,66 @@ public void testKeyClusteringTable() throws Exception { | |
} | ||
} | ||
} | ||
|
||
@Test | ||
@DontRunOnGithubActions | ||
private void testVectorColumnInTable() throws Exception { | ||
String tableName = "VECTOR_TABLE"; | ||
try { | ||
testConnection | ||
.createStatement() | ||
.execute( | ||
String.format("CREATE OR REPLACE TABLE %s (vector_col VECTOR(FLOAT, 3))", tableName)); | ||
|
||
TestDataConfigBuilder tdcb = new TestDataConfigBuilder(testConnection, putConnection); | ||
tdcb.setOperation(Operation.INSERT) | ||
.setStartTransaction(true) | ||
.setTruncateTable(true) | ||
.setTableName(tableName) | ||
.setColumns(Arrays.asList("vector_col")); | ||
StreamLoader loader = tdcb.getStreamLoader(); | ||
TestDataConfigBuilder.ResultListener listener = tdcb.getListener(); | ||
loader.start(); | ||
|
||
loader.submitRow(new Object[] {"[12, 14.0, 100]"}); | ||
loader.finish(); | ||
int submitted = listener.getSubmittedRowCount(); | ||
assertThat("submitted rows", submitted, equalTo(1)); | ||
|
||
} finally { | ||
testConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s", tableName)); | ||
} | ||
} | ||
|
||
@Test | ||
@DontRunOnGithubActions | ||
private void testMultipleVectorColumnsInTable() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should also test the table with mixed types (the best with all types). |
||
String tableName = "VECTOR_TABLE"; | ||
try { | ||
testConnection | ||
.createStatement() | ||
.execute( | ||
String.format( | ||
"CREATE OR REPLACE TABLE %s (vec1 VECTOR(FLOAT, 3), vec2 VECTOR(FLOAT, 3))", | ||
tableName)); | ||
|
||
TestDataConfigBuilder tdcb = new TestDataConfigBuilder(testConnection, putConnection); | ||
tdcb.setOperation(Operation.INSERT) | ||
.setStartTransaction(true) | ||
.setTruncateTable(true) | ||
.setTableName(tableName) | ||
.setColumns(Arrays.asList("vector_col")); | ||
StreamLoader loader = tdcb.getStreamLoader(); | ||
TestDataConfigBuilder.ResultListener listener = tdcb.getListener(); | ||
loader.start(); | ||
|
||
loader.submitRow(new Object[] {"[12, 14.0, 100]", "[12, 14.0, 100]"}); | ||
loader.finish(); | ||
int submitted = listener.getSubmittedRowCount(); | ||
assertThat("submitted rows", submitted, equalTo(1)); | ||
|
||
} finally { | ||
testConnection.createStatement().execute(String.format("DROP TABLE IF EXISTS %s", tableName)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to update the name of this map to better communicate its function: keep column size.