SNOW-1947501 Support AST Request Internally #2102

Open · wants to merge 15 commits into base: master
40 changes: 40 additions & 0 deletions src/main/java/net/snowflake/client/core/QueryExecDTO.java
@@ -6,6 +6,8 @@
public class QueryExecDTO {
private String sqlText;

private String dataframeAst;

@Deprecated private Integer sequenceId;

private Map<String, ParameterBindingDTO> bindings;
@@ -39,7 +41,35 @@ public QueryExecDTO(
long querySubmissionTime,
boolean internal,
boolean asyncExec) {
this(
sqlText,
describeOnly,
sequenceId,
bindings,
bindStage,
parameters,
queryContext,
querySubmissionTime,
internal,
asyncExec,
null);
}

@SnowflakeJdbcInternalApi
public QueryExecDTO(
String sqlText,
boolean describeOnly,
Integer sequenceId,
Map<String, ParameterBindingDTO> bindings,
String bindStage,
Map<String, Object> parameters,
QueryContextDTO queryContext,
long querySubmissionTime,
boolean internal,
boolean asyncExec,
String dataframeAst) {
this.sqlText = sqlText;
this.dataframeAst = dataframeAst;
this.describeOnly = describeOnly;
this.sequenceId = sequenceId;
this.bindings = bindings;
@@ -59,6 +89,16 @@ public void setSqlText(String sqlText) {
this.sqlText = sqlText;
}

@SnowflakeJdbcInternalApi
public String getDataframeAst() {
return this.dataframeAst;
}

@SnowflakeJdbcInternalApi
public void setDataframeAst(String dataframeAst) {
this.dataframeAst = dataframeAst;
}

@Deprecated
public Integer getSequenceId() {
return sequenceId;
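For orientation, here is a minimal usage sketch of the extended constructor and the new accessor pair. It is not part of the diff; the placeholder SQL, the empty maps, and the null bind stage / query context values are illustrative assumptions:

import java.util.HashMap;
import net.snowflake.client.core.QueryExecDTO;

class QueryExecDTOAstExample {
  // Builds a request body that carries both the SQL text and an encoded dataframe AST;
  // passing null as the last argument yields a plain SQL request, exactly as before.
  static QueryExecDTO buildRequestBody(String encodedAst) {
    return new QueryExecDTO(
        "SELECT 1",                  // sqlText
        false,                       // describeOnly
        null,                        // sequenceId (deprecated)
        new HashMap<>(),             // bindings
        null,                        // bindStage
        new HashMap<>(),             // parameters
        null,                        // queryContext
        System.currentTimeMillis(),  // querySubmissionTime
        false,                       // internal
        false,                       // asyncExec
        encodedAst);                 // dataframeAst
  }
}

The getDataframeAst()/setDataframeAst() pair behaves like the other field accessors; a null value leaves the serialized request body unchanged.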
24 changes: 24 additions & 0 deletions src/main/java/net/snowflake/client/core/SFBaseStatement.java
@@ -79,6 +79,30 @@ public abstract SFBaseResultSet execute(
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException;

/**
* Executes the given SQL string together with an encoded dataframe AST.
*
* @param sql the SQL string to execute synchronously
* @param dataframeAst encoded string representation of the dataframe AST
* @param parametersBinding parameters to bind
* @param caller the JDBC interface method that called this method, if any
* @param execTimeData OOB telemetry object to record timings
* @return the query result set
* @throws SQLException if failed to execute the sql
* @throws SFException exception raised from Snowflake components
*/
@SnowflakeJdbcInternalApi
public SFBaseResultSet execute(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
throw new UnsupportedOperationException();
}

/**
* Execute sql asynchronously. Note that at a minimum, this does not have to be supported; if
* executeAsyncQuery() is called from SnowflakeStatement and the SFConnectionHandler's
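As a rough, hedged sketch of how driver-internal code might call the new overload above (the helper class, the EXECUTE_QUERY caller tag, and the encodedAst argument are assumptions; only implementations that override the method avoid the UnsupportedOperationException default):

import java.sql.SQLException;
import java.util.HashMap;
import net.snowflake.client.core.ExecTimeTelemetryData;
import net.snowflake.client.core.SFBaseResultSet;
import net.snowflake.client.core.SFBaseStatement;
import net.snowflake.client.core.SFException;

class AstExecuteExample {
  // Runs a query through the AST-aware overload; a null AST falls back to the plain SQL path.
  static SFBaseResultSet runWithAst(SFBaseStatement statement, String sql, String encodedAst)
      throws SQLException, SFException {
    return statement.execute(
        sql,
        encodedAst,
        new HashMap<>(),                              // no bind parameters in this sketch
        SFBaseStatement.CallingMethod.EXECUTE_QUERY,  // assumed caller tag
        new ExecTimeTelemetryData());
  }
}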
137 changes: 110 additions & 27 deletions src/main/java/net/snowflake/client/core/SFStatement.java
@@ -109,41 +109,47 @@ private void sanityCheckQuery(String sql) throws SQLException {
* Execute SQL query with an option for describe only
*
* @param sql sql statement
* @param dataframeAst encoded string representation of the dataframe AST
* @param describeOnly true if describe only
* @return query result set
* @throws SQLException if connection is already closed
* @throws SFException if result set is null
*/
private SFBaseResultSet executeQuery(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
boolean describeOnly,
boolean asyncExec,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
// if dataframeAst is passed, the sql checks can be skipped
if (dataframeAst == null) {
  sanityCheckQuery(sql);

  String trimmedSql = sql.trim();

  // snowflake specific client side commands
  if (isFileTransfer(trimmedSql)) {
    // If either the server-side value or the connection string value is false, the PUT/GET command is disabled
    if ((session != null && !(session.getJdbcEnablePutGet() && session.getEnablePutGet()))) {
      // PUT/GET command disabled either on server side or in the client connection string
      logger.debug("Executing file transfer locally is disabled: {}", sql);
      throw new SnowflakeSQLException("File transfers have been disabled.");
    }

    // PUT/GET command
    logger.debug("Executing file transfer locally: {}", sql);

    return executeFileTransfer(sql);
  }
}

// NOTE: It is intentional that two describeOnly arguments are specified.
return executeQueryInternal(
sql,
dataframeAst,
parametersBinding,
describeOnly,
describeOnly, // internal query if describeOnly is true
@@ -163,7 +169,7 @@ private SFBaseResultSet executeQuery(
@Override
public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLException {
SFBaseResultSet baseResultSet =
executeQuery(sql, null, null, true, false, null, new ExecTimeTelemetryData());

describeJobUUID = baseResultSet.getQueryId();

@@ -182,6 +188,7 @@ public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLException
* <p>
*
* @param sql sql statement
* @param dataframeAst encoded string representation of the dataframe AST
* @param parameterBindings binding information
* @param describeOnly true if just showing result set metadata
* @param internal true if internal command not showing up in the history
@@ -192,6 +199,7 @@ public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLException
*/
SFBaseResultSet executeQueryInternal(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parameterBindings,
boolean describeOnly,
boolean internal,
@@ -210,6 +218,7 @@ SFBaseResultSet executeQueryInternal(
Object result =
executeHelper(
sql,
dataframeAst,
StmtUtil.SF_MEDIA_TYPE,
parameterBindings,
describeOnly,
@@ -329,6 +338,36 @@ public Object executeHelper(
boolean asyncExec,
ExecTimeTelemetryData execTimeData)
throws SnowflakeSQLException, SFException {
return executeHelper(
sql, null, mediaType, bindValues, describeOnly, internal, asyncExec, execTimeData);
}

/**
* A helper method to build the URL and submit the SQL to Snowflake for execution
*
* @param sql sql statement
* @param dataframeAst encoded string representation of the dataframe AST
* @param mediaType media type
* @param bindValues map of binding values
* @param describeOnly whether to only show the result set metadata
* @param internal whether to run as an internal query that does not show up in history
* @param asyncExec whether to execute asynchronously
* @param execTimeData OOB telemetry object to record timings
* @return raw json response
* @throws SFException if query is canceled
* @throws SnowflakeSQLException if query is already running
*/
@SnowflakeJdbcInternalApi
public Object executeHelper(
String sql,
String dataframeAst,
String mediaType,
Map<String, ParameterBindingDTO> bindValues,
boolean describeOnly,
boolean internal,
boolean asyncExec,
ExecTimeTelemetryData execTimeData)
throws SnowflakeSQLException, SFException {
ScheduledExecutorService executor = null;

try {
@@ -399,6 +438,7 @@ public Object executeHelper(
StmtUtil.StmtInput stmtInput = new StmtUtil.StmtInput();
stmtInput
.setSql(sql)
.setDataframeAst(dataframeAst)
.setMediaType(mediaType)
.setInternal(internal)
.setDescribeOnly(describeOnly)
@@ -697,6 +737,18 @@ public SFBaseResultSet execute(
return execute(sql, false, parametersBinding, caller, execTimeData);
}

@SnowflakeJdbcInternalApi
@Override
public SFBaseResultSet execute(
String sql,
String dataframeAst,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, dataframeAst, false, parametersBinding, caller, execTimeData);
}

/**
* A helper method to build URL and cancel the SQL for exec
*
@@ -759,24 +811,55 @@ public SFBaseResultSet execute(
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
return execute(sql, null, asyncExec, parametersBinding, caller, execTimeData);
}

/**
* Execute sql
*
* @param sql sql statement.
* @param dataframeAst encoded string representation of the dataframe AST
* @param asyncExec whether to execute asynchronously
* @param parametersBinding parameters to bind
* @param caller the JDBC interface method that called this method, if any
* @param execTimeData ExecTimeTelemetryData
* @return the query result set
* @throws SQLException if failed to execute the sql
* @throws SFException exception raised from Snowflake components
*/
@SnowflakeJdbcInternalApi
public SFBaseResultSet execute(
String sql,
String dataframeAst,
boolean asyncExec,
Map<String, ParameterBindingDTO> parametersBinding,
CallingMethod caller,
ExecTimeTelemetryData execTimeData)
throws SQLException, SFException {
TelemetryService.getInstance().updateContext(session.getSnowflakeConnectionString());

// if dataframeAst is passed, the sql checks are not needed and can be skipped
if (dataframeAst == null) {
  sanityCheckQuery(sql);

  session.injectedDelay();

  if (session.getPreparedStatementLogging()) {
    logger.info("Execute: {}", sql);
  } else {
    logger.debug("Execute: {}", sql);
  }

  String trimmedSql = sql.trim();

  if (trimmedSql.length() >= 20 && trimmedSql.toLowerCase().startsWith("set-sf-property")) {
    executeSetProperty(sql);
    return null;
  }
}
return executeQuery(
    sql, dataframeAst, parametersBinding, false, asyncExec, caller, execTimeData);
}

private SFBaseResultSet executeFileTransfer(String sql) throws SQLException, SFException {
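The net behavioral change in SFStatement can be summarized with a short sketch (illustrative only, not part of the diff; sfStatement, bindings, caller, execTimeData, and encodedAst are assumed to be in scope):

// Plain SQL: the sanity check, PUT/GET detection, and set-sf-property handling
// all run against the SQL text, exactly as before this change.
SFBaseResultSet plain =
    sfStatement.execute(
        "SELECT * FROM my_table", null, false, bindings, caller, execTimeData);

// Encoded dataframe AST supplied: the client-side SQL checks are skipped and the
// AST is forwarded to the server in the request body alongside the SQL text.
SFBaseResultSet withAst =
    sfStatement.execute(
        "SELECT * FROM my_table", encodedAst, false, bindings, caller, execTimeData);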
9 changes: 8 additions & 1 deletion src/main/java/net/snowflake/client/core/StmtUtil.java
@@ -51,6 +51,7 @@ public class StmtUtil {
/** Input for executing a statement on server */
static class StmtInput {
String sql;
String dataframeAst = null;

// default to snowflake (a special json format for snowflake query result
String mediaType = SF_MEDIA_TYPE;
Expand Down Expand Up @@ -99,6 +100,11 @@ public StmtInput setSql(String sql) {
return this;
}

public StmtInput setDataframeAst(String dataframeAst) {
this.dataframeAst = dataframeAst;
return this;
}

public StmtInput setMediaType(String mediaType) {
this.mediaType = mediaType;
return this;
@@ -328,7 +334,8 @@ public static StmtOutput execute(StmtInput stmtInput, ExecTimeTelemetryData execTimeData)
stmtInput.queryContextDTO,
stmtInput.querySubmissionTime,
stmtInput.describeOnly || stmtInput.internal,
stmtInput.asyncExec,
stmtInput.dataframeAst);

if (!stmtInput.describeOnly) {
sqlJsonBody.setDescribedJobId(stmtInput.describedJobId);
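Roughly, the wiring that SFStatement.executeHelper now performs looks like the following. This is a sketch for code inside the net.snowflake.client.core package (StmtInput is package-private); encodedAst is an assumed variable, and the server URL, session token, request id, and other fields required by StmtUtil.execute are omitted for brevity:

// Carry the encoded dataframe AST alongside the SQL text; a null AST leaves
// the serialized request body identical to what the driver sends today.
StmtUtil.StmtInput stmtInput = new StmtUtil.StmtInput();
stmtInput
    .setSql("SELECT 1")
    .setDataframeAst(encodedAst)
    .setMediaType(StmtUtil.SF_MEDIA_TYPE)
    .setInternal(false)
    .setDescribeOnly(false);

// stmtInput is then passed to StmtUtil.execute(stmtInput, execTimeData), which copies
// stmtInput.dataframeAst into the new final argument of the QueryExecDTO constructor.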