Skip to content

CLT: update tests to latest updates and fixes in buddy to autosharding #3465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/searchd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7248,7 +7248,7 @@ void sphHandleMysqlUpdate ( StmtErrorReporter_i & tOut, const SqlStmt_t & tStmt,

// connect to remote agents and query them
std::unique_ptr<RequestBuilder_i> pRequestBuilder = CreateRequestBuilder ( sQuery, tStmt );
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iUpdated, iWarns );
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iUpdated, iWarns, dFails );
iSuccesses += PerformRemoteTasks ( dAgents, pRequestBuilder.get (), pReplyParser.get () );
}
}
Expand Down Expand Up @@ -7994,7 +7994,7 @@ void sphHandleMysqlDelete ( StmtErrorReporter_i & tOut, const SqlStmt_t & tStmt,

// connect to remote agents and query them
std::unique_ptr<RequestBuilder_i> pRequestBuilder = CreateRequestBuilder ( sQuery, tStmt );
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iGot, iWarns );
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iGot, iWarns, dErrors );
PerformRemoteTasks ( dAgents, pRequestBuilder.get (), pReplyParser.get () );

// FIXME!!! report error & warnings from agents
Expand Down Expand Up @@ -11507,9 +11507,17 @@ void HandleCommandJson ( ISphOutputBuffer & tOut, WORD uVer, InputBuffer_c & tRe
// parse request
CSphString sEndpoint = tReq.GetString ();
CSphString sCommand = tReq.GetString ();

OptionsHash_t hOptions;
hOptions.AddUnique ( "endpoint" ) = sEndpoint;
if ( uVer>=0x102 )
{
hOptions.AddUnique ( "raw_query" ) = tReq.GetString ();
hOptions.AddUnique ( "full_url" ) = tReq.GetString ();
}

CSphVector<BYTE> dResult;
sphProcessHttpQueryNoResponce ( sEndpoint, sCommand, dResult );
ProcessHttpJsonQuery ( sCommand, hOptions, dResult );

auto tReply = APIAnswer ( tOut, VER_COMMAND_JSON );
tOut.SendString ( sEndpoint.cstr() );
Expand Down
8 changes: 5 additions & 3 deletions src/searchdaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ enum SearchdCommandV_e : WORD
VER_COMMAND_STATUS = 0x101,
VER_COMMAND_FLUSHATTRS = 0x100,
VER_COMMAND_SPHINXQL = 0x100,
VER_COMMAND_JSON = 0x101,
VER_COMMAND_JSON = 0x102,
VER_COMMAND_PING = 0x100,
VER_COMMAND_UVAR = 0x100,
VER_COMMAND_CALLPQ = 0x100,
Expand Down Expand Up @@ -1254,10 +1254,11 @@ class StmtErrorReporter_i
class QueryParser_i;
class RequestBuilder_i;
class ReplyParser_i;
class SearchFailuresLog_c;

std::unique_ptr<QueryParser_i> CreateQueryParser ( bool bJson ) noexcept;
std::unique_ptr<RequestBuilder_i> CreateRequestBuilder ( Str_t sQuery, const SqlStmt_t & tStmt );
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings );
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings, SearchFailuresLog_c & dFails );
StmtErrorReporter_i * CreateHttpErrorReporter();

enum class EHTTP_STATUS : BYTE
Expand Down Expand Up @@ -1313,7 +1314,8 @@ bool sphCheckWeCanModify ( RowBuffer_i& tOut );
bool PollOptimizeRunning ( const CSphString & sIndex );
void FixPathAbsolute ( CSphString & sPath );

void sphProcessHttpQueryNoResponce ( const CSphString& sEndpoint, const CSphString& sQuery, CSphVector<BYTE> & dResult );
using OptionsHash_t = SmallStringHash_T<CSphString>;
void ProcessHttpJsonQuery ( const CSphString & sQuery, OptionsHash_t & hOptions, CSphVector<BYTE> & dResult );
void sphHttpErrorReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * szError );
void LoadCompatHttp ( const char * sData );
void SaveCompatHttp ( JsonEscapedBuilder & tOut );
Expand Down
75 changes: 54 additions & 21 deletions src/searchdhttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,9 +899,11 @@ static void HttpHandlerIndexPage ( CSphVector<BYTE> & dData )
class JsonRequestBuilder_c : public RequestBuilder_i
{
public:
JsonRequestBuilder_c ( const char* szQuery, CSphString sEndpoint )
: m_sEndpoint ( std::move ( sEndpoint ) )
, m_tQuery ( szQuery )
JsonRequestBuilder_c ( const char* szQuery, CSphString sEndpoint, const CSphString & sRawQuery, const CSphString & sFullUrl )
: m_sEndpoint ( std::move ( sEndpoint ) )
, m_tQuery ( szQuery )
, m_sRawQuery ( sRawQuery )
, m_sFullUrl ( sFullUrl )
{
// fixme: we can implement replacing indexes in a string (without parsing) if it becomes a performance issue
}
Expand All @@ -917,23 +919,28 @@ class JsonRequestBuilder_c : public RequestBuilder_i
auto tWr = APIHeader ( tOut, SEARCHD_COMMAND_JSON, VER_COMMAND_JSON ); // API header
tOut.SendString ( m_sEndpoint.cstr() );
tOut.SendString ( sRequest.cstr() );
tOut.SendString ( m_sRawQuery.cstr() );
tOut.SendString ( m_sFullUrl.cstr() );
}

private:
CSphString m_sEndpoint;
mutable JsonObj_c m_tQuery;
const CSphString & m_sRawQuery;
const CSphString & m_sFullUrl;
};


class JsonReplyParser_c : public ReplyParser_i
{
public:
JsonReplyParser_c ( int & iAffected, int & iWarnings )
JsonReplyParser_c ( int & iAffected, int & iWarnings, SearchFailuresLog_c & tFails )
: m_iAffected ( iAffected )
, m_iWarnings ( iWarnings )
, m_tFails ( tFails )
{}

bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & ) const final
bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & tAgent ) const final
{
CSphString sEndpoint = tReq.GetString();
EHTTP_ENDPOINT eEndpoint = StrToHttpEndpoint ( sEndpoint );
Expand All @@ -945,12 +952,18 @@ class JsonReplyParser_c : public ReplyParser_i
tReq.GetBytes ( dResult.Begin(), (int)uLength );
dResult[uLength] = '\0';

return sphGetResultStats ( (const char *)dResult.Begin(), m_iAffected, m_iWarnings, eEndpoint==EHTTP_ENDPOINT::JSON_UPDATE );
CSphString sError;
bool bOk = sphGetResultStats ( (const char *)dResult.Begin(), m_iAffected, m_iWarnings, eEndpoint==EHTTP_ENDPOINT::JSON_UPDATE, sError );
if ( !sError.IsEmpty() )
m_tFails.Submit ( tAgent.m_tDesc.m_sIndexes, nullptr, sError.cstr() );

return bOk;
}

protected:
int & m_iAffected;
int & m_iWarnings;
SearchFailuresLog_c & m_tFails;
};

std::unique_ptr<QueryParser_i> CreateQueryParser ( bool bJson ) noexcept
Expand All @@ -963,17 +976,17 @@ std::unique_ptr<RequestBuilder_i> CreateRequestBuilder ( Str_t sQuery, const Sql
if ( tStmt.m_bJson )
{
assert ( !tStmt.m_sEndpoint.IsEmpty() );
return std::make_unique<JsonRequestBuilder_c> ( sQuery.first, tStmt.m_sEndpoint );
return std::make_unique<JsonRequestBuilder_c> ( sQuery.first, tStmt.m_sEndpoint, tStmt.m_sRawQuery, tStmt.m_sFullUrl );
} else
{
return std::make_unique<SphinxqlRequestBuilder_c> ( sQuery, tStmt );
}
}

std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings )
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings, SearchFailuresLog_c & tFails )
{
if ( bJson )
return std::make_unique<JsonReplyParser_c> ( iUpdated, iWarnings );
return std::make_unique<JsonReplyParser_c> ( iUpdated, iWarnings, tFails );
else
return std::make_unique<SphinxqlReplyParser_c> ( &iUpdated, &iWarnings );
}
Expand Down Expand Up @@ -1815,15 +1828,28 @@ class HttpJsonUpdateTraits_c
const ResultSetFormat_e m_eFormat = ResultSetFormat_e::MntSearch;
};

// Copies HTTP request context ("raw_query" and "full_url" options, when present)
// from the parsed request-options hash into the statement. JsonRequestBuilder_c
// later serializes tStmt.m_sRawQuery / m_sFullUrl to remote agents, so for
// distributed tables the agent sees the original HTTP request context.
// Only UPDATE and DELETE statements carry these fields; other statement
// types are left untouched.
static void SetQueryOptions ( const OptionsHash_t & hOpts, SqlStmt_t & tStmt )
{
if ( tStmt.m_eStmt==STMT_UPDATE || tStmt.m_eStmt==STMT_DELETE )
{
// both lookups are optional: absent keys leave the statement fields empty
const CSphString * pRawQuery = hOpts ( "raw_query" );
if ( pRawQuery )
tStmt.m_sRawQuery = *pRawQuery;
const CSphString * pFullUrl = hOpts ( "full_url" );
if ( pFullUrl )
tStmt.m_sFullUrl = *pFullUrl;
}
}

class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c, HttpOptionTrait_t
{
protected:
Str_t m_sQuery;

public:
explicit HttpHandler_JsonUpdate_c ( Str_t sQuery )
: m_sQuery ( sQuery )
explicit HttpHandler_JsonUpdate_c ( Str_t sQuery, const OptionsHash_t & tOptions )
: HttpOptionTrait_t ( tOptions )
, m_sQuery ( sQuery )
{}

bool Process () final
Expand All @@ -1833,6 +1859,7 @@ class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
tStmt.m_bJson = true;
tStmt.m_tQuery.m_eQueryType = QUERY_JSON;
tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
tStmt.m_eStmt = STMT_UPDATE;

DocID_t tDocId = 0;
if ( !ParseQuery ( tStmt, tDocId ) )
Expand All @@ -1841,6 +1868,8 @@ class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
return false;
}

SetQueryOptions ( m_tOptions, tStmt );

JsonObj_c tResult = JsonNull;
bool bResult = ProcessQuery ( tStmt, tDocId, tResult );

Expand Down Expand Up @@ -1868,14 +1897,15 @@ class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
class HttpHandler_JsonDelete_c final : public HttpHandler_JsonUpdate_c
{
public:
explicit HttpHandler_JsonDelete_c ( Str_t sQuery )
: HttpHandler_JsonUpdate_c ( sQuery )
explicit HttpHandler_JsonDelete_c ( Str_t sQuery, const OptionsHash_t & tOptions )
: HttpHandler_JsonUpdate_c ( sQuery, tOptions )
{}

protected:
bool ParseQuery ( SqlStmt_t & tStmt, DocID_t & tDocId ) final
{
tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
tStmt.m_eStmt = STMT_DELETE;
return sphParseJsonDelete ( m_sQuery, tStmt, tDocId, m_sError );
}

Expand Down Expand Up @@ -2090,6 +2120,8 @@ class HttpHandler_JsonBulk_c : public HttpHandler_c, public HttpJsonUpdateTraits
iLastTxStartLine = iCurLine;
}

SetQueryOptions ( m_tOptions, tStmt );

switch ( tStmt.m_eStmt )
{
case STMT_INSERT:
Expand Down Expand Up @@ -2309,14 +2341,14 @@ static std::unique_ptr<HttpHandler_c> CreateHttpHandler ( EHTTP_ENDPOINT eEndpoi
if ( tSource.GetError() )
return nullptr;
else
return std::make_unique<HttpHandler_JsonUpdate_c> ( sQuery ); // json
return std::make_unique<HttpHandler_JsonUpdate_c> ( sQuery, tOptions ); // json

case EHTTP_ENDPOINT::JSON_DELETE:
SetQuery ( tSource.ReadAll() );
if ( tSource.GetError() )
return nullptr;
else
return std::make_unique<HttpHandler_JsonDelete_c> ( sQuery ); // json
return std::make_unique<HttpHandler_JsonDelete_c> ( sQuery, tOptions ); // json

case EHTTP_ENDPOINT::JSON_BULK:
return std::make_unique<HttpHandler_JsonBulk_c> ( tSource, tOptions ); // json
Expand Down Expand Up @@ -2394,15 +2426,14 @@ HttpProcessResult_t ProcessHttpQuery ( CharStream_c & tSource, Str_t & sSrcQuery
return tRes;
}

void sphProcessHttpQueryNoResponce ( const CSphString & sEndpoint, const CSphString & sQuery, CSphVector<BYTE> & dResult )
void ProcessHttpJsonQuery ( const CSphString & sQuery, OptionsHash_t & hOptions, CSphVector<BYTE> & dResult )
{
OptionsHash_t hOptions;
hOptions.Add ( sEndpoint, "endpoint" );
http_method eReqType = HTTP_POST;

BlobStream_c tQuery ( sQuery );
Str_t sSrcQuery;
HttpProcessResult_t tRes = ProcessHttpQuery ( tQuery, sSrcQuery, hOptions, dResult, false, HTTP_GET );
ProcessHttpQueryBuddy ( tRes, sSrcQuery, hOptions, dResult, false, HTTP_GET );
HttpProcessResult_t tRes = ProcessHttpQuery ( tQuery, sSrcQuery, hOptions, dResult, false, eReqType );
ProcessHttpQueryBuddy ( tRes, sSrcQuery, hOptions, dResult, false, eReqType );
}

static bool IsCompressed ( const OptionsHash_t & hOptions )
Expand Down Expand Up @@ -3282,6 +3313,8 @@ bool HttpHandlerEsBulk_c::ProcessTnx ( const VecTraits_T<BulkTnx_t> & dTnx, VecT
bool bAction = false;
JsonObj_c tResult = JsonNull;

SetQueryOptions ( m_hOpts, tStmt );

switch ( tStmt.m_eStmt )
{
case STMT_INSERT:
Expand Down
1 change: 0 additions & 1 deletion src/searchdhttp.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "searchdaemon.h"
#include "http/http_parser.h"

using OptionsHash_t = SmallStringHash_T<CSphString>;
class AsyncNetInputBuffer_c;

class HttpRequestParser_c : public ISphNoncopyable
Expand Down
2 changes: 2 additions & 0 deletions src/searchdsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ struct SqlStmt_t

bool m_bJson = false;
CSphString m_sEndpoint;
CSphString m_sRawQuery;
CSphString m_sFullUrl;

CSphVector<CSphString> m_dStringSubkeys;
CSphVector<int64_t> m_dIntSubkeys;
Expand Down
20 changes: 15 additions & 5 deletions src/sphinxjsonquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2928,7 +2928,7 @@ JsonObj_c sphEncodeInsertErrorJson ( const char * szIndex, const char * szError,
}


bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate )
bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate, CSphString & sError )
{
JsonObj_c tJsonRoot ( szResult );
if ( !tJsonRoot )
Expand All @@ -2937,23 +2937,33 @@ bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings
// no warnings in json results for now
iWarnings = 0;

CSphString sParseError;
if ( tJsonRoot.HasItem("error") )
{
JsonObj_c tReplyError = tJsonRoot.GetItem ( "error" );
if ( tReplyError.IsObj() )
{
JsonObj_c tReason = tReplyError.GetItem ( "reason" );
if ( tReason && tReason.IsStr() )
sError = tReason.StrVal();
}
if ( sError.IsEmpty() )
sError = tReplyError.AsString();

iAffected = 0;
return true;
return false;
}

// its either update or delete
CSphString sError;
JsonObj_c tAffected = tJsonRoot.GetIntItem ( bUpdate ? "updated" : "deleted", sError );
JsonObj_c tAffected = tJsonRoot.GetIntItem ( bUpdate ? "updated" : "deleted", sParseError );
if ( tAffected )
{
iAffected = (int)tAffected.IntVal();
return true;
}

// it was probably a query with an "id"
JsonObj_c tId = tJsonRoot.GetIntItem ( "id", sError );
JsonObj_c tId = tJsonRoot.GetIntItem ( "id", sParseError );
if ( tId )
{
iAffected = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/sphinxjsonquery.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ JsonObj_c sphEncodeDeleteResultJson ( const char * szIndex, DocID_t tDocId, in
JsonObj_c sphEncodeInsertErrorJson ( const char * szIndex, const char * szError, ResultSetFormat_e eFormat );
JsonObj_c sphEncodeTxnResultJson ( const char* szIndex, DocID_t tDocId, int iInserts, int iDeletes, int iUpdates, ResultSetFormat_e eFormat );

bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate );
bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate, CSphString & sError );

bool NonEmptyQuery ( const JsonObj_c & tQuery );
bool CheckRootNode ( const JsonObj_c & tRoot, CSphString & sError );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
http update + distributed table + wrong replication cluster = crash
#3481

––– comment –––
Start Manticore Search
––– input –––
rm -f /var/log/manticore/searchd.log; stdbuf -oL searchd --stopwait > /dev/null; stdbuf -oL searchd --logdebugv > /dev/null
––– output –––
––– input –––
if timeout 10 grep -qm1 'accepting connections' <(tail -n 1000 -f /var/log/manticore/searchd.log); then echo 'Manticore started!'; else echo 'Timeout or failed!'; fi
––– output –––
Manticore started!
––– comment –––
Execute the MRE commands
––– input –––
mysql -v -P9306 -h0 -e "drop table if exists t; create table t(f int); drop table if exists d; CREATE TABLE d type='distributed' agent='127.0.0.1:9312:t'"
––– output –––
--------------
drop table if exists t
--------------
--------------
create table t(f int)
--------------
--------------
drop table if exists d
--------------
--------------
CREATE TABLE d type='distributed' agent='127.0.0.1:9312:t'
--------------
––– comment –––
Test HTTP update with wrong cluster - should not crash
The noop result here is normal because this is a clustered table
The issue about it: https://github.com/manticoresoftware/manticoresearch/issues/3537
––– input –––
curl -s -X POST http://localhost:9312/update -d '{"cluster": "unknown_cluster", "table": "d", "id": 2, "doc": {"f": 5}}'
––– output –––
{"error":{"type":"action_request_validation_exception","reason":"table t: table t: table 't' is not in any cluster, use just 't'","table":"d"},"status":409}
––– input –––
sleep 2; cat /var/log/manticore/searchd.log | grep 'response data' | head -n 1 | cut -d' ' -f12-
––– output –––
{"version":3,"type":"json response","message":{"error":{"type":"action_request_validation_exception","reason":"table t: table 't' is not in any cluster, use just 't'","table":"t"},"status":409},"meta":null,"error_code":0}
Loading
Loading