Skip to content

Commit 1bdd974

Browse files
authored
Merge 13eae6e into a69ac70
2 parents a69ac70 + 13eae6e commit 1bdd974

9 files changed

+134
-39
lines changed

src/searchd.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7248,7 +7248,7 @@ void sphHandleMysqlUpdate ( StmtErrorReporter_i & tOut, const SqlStmt_t & tStmt,
72487248

72497249
// connect to remote agents and query them
72507250
std::unique_ptr<RequestBuilder_i> pRequestBuilder = CreateRequestBuilder ( sQuery, tStmt );
7251-
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iUpdated, iWarns );
7251+
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iUpdated, iWarns, dFails );
72527252
iSuccesses += PerformRemoteTasks ( dAgents, pRequestBuilder.get (), pReplyParser.get () );
72537253
}
72547254
}
@@ -7994,7 +7994,7 @@ void sphHandleMysqlDelete ( StmtErrorReporter_i & tOut, const SqlStmt_t & tStmt,
79947994

79957995
// connect to remote agents and query them
79967996
std::unique_ptr<RequestBuilder_i> pRequestBuilder = CreateRequestBuilder ( sQuery, tStmt );
7997-
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iGot, iWarns );
7997+
std::unique_ptr<ReplyParser_i> pReplyParser = CreateReplyParser ( tStmt.m_bJson, iGot, iWarns, dErrors );
79987998
PerformRemoteTasks ( dAgents, pRequestBuilder.get (), pReplyParser.get () );
79997999

80008000
// FIXME!!! report error & warnings from agents
@@ -11507,9 +11507,17 @@ void HandleCommandJson ( ISphOutputBuffer & tOut, WORD uVer, InputBuffer_c & tRe
1150711507
// parse request
1150811508
CSphString sEndpoint = tReq.GetString ();
1150911509
CSphString sCommand = tReq.GetString ();
11510+
11511+
OptionsHash_t hOptions;
11512+
hOptions.AddUnique ( "endpoint" ) = sEndpoint;
11513+
if ( uVer>=0x102 )
11514+
{
11515+
hOptions.AddUnique ( "raw_query" ) = tReq.GetString ();
11516+
hOptions.AddUnique ( "full_url" ) = tReq.GetString ();
11517+
}
1151011518

1151111519
CSphVector<BYTE> dResult;
11512-
sphProcessHttpQueryNoResponce ( sEndpoint, sCommand, dResult );
11520+
ProcessHttpJsonQuery ( sCommand, hOptions, dResult );
1151311521

1151411522
auto tReply = APIAnswer ( tOut, VER_COMMAND_JSON );
1151511523
tOut.SendString ( sEndpoint.cstr() );

src/searchdaemon.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ enum SearchdCommandV_e : WORD
150150
VER_COMMAND_STATUS = 0x101,
151151
VER_COMMAND_FLUSHATTRS = 0x100,
152152
VER_COMMAND_SPHINXQL = 0x100,
153-
VER_COMMAND_JSON = 0x101,
153+
VER_COMMAND_JSON = 0x102,
154154
VER_COMMAND_PING = 0x100,
155155
VER_COMMAND_UVAR = 0x100,
156156
VER_COMMAND_CALLPQ = 0x100,
@@ -1254,10 +1254,11 @@ class StmtErrorReporter_i
12541254
class QueryParser_i;
12551255
class RequestBuilder_i;
12561256
class ReplyParser_i;
1257+
class SearchFailuresLog_c;
12571258

12581259
std::unique_ptr<QueryParser_i> CreateQueryParser ( bool bJson ) noexcept;
12591260
std::unique_ptr<RequestBuilder_i> CreateRequestBuilder ( Str_t sQuery, const SqlStmt_t & tStmt );
1260-
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings );
1261+
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings, SearchFailuresLog_c & dFails );
12611262
StmtErrorReporter_i * CreateHttpErrorReporter();
12621263

12631264
enum class EHTTP_STATUS : BYTE
@@ -1313,7 +1314,8 @@ bool sphCheckWeCanModify ( RowBuffer_i& tOut );
13131314
bool PollOptimizeRunning ( const CSphString & sIndex );
13141315
void FixPathAbsolute ( CSphString & sPath );
13151316

1316-
void sphProcessHttpQueryNoResponce ( const CSphString& sEndpoint, const CSphString& sQuery, CSphVector<BYTE> & dResult );
1317+
using OptionsHash_t = SmallStringHash_T<CSphString>;
1318+
void ProcessHttpJsonQuery ( const CSphString & sQuery, OptionsHash_t & hOptions, CSphVector<BYTE> & dResult );
13171319
void sphHttpErrorReply ( CSphVector<BYTE> & dData, EHTTP_STATUS eCode, const char * szError );
13181320
void LoadCompatHttp ( const char * sData );
13191321
void SaveCompatHttp ( JsonEscapedBuilder & tOut );

src/searchdhttp.cpp

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -899,9 +899,11 @@ static void HttpHandlerIndexPage ( CSphVector<BYTE> & dData )
899899
class JsonRequestBuilder_c : public RequestBuilder_i
900900
{
901901
public:
902-
JsonRequestBuilder_c ( const char* szQuery, CSphString sEndpoint )
903-
: m_sEndpoint ( std::move ( sEndpoint ) )
904-
, m_tQuery ( szQuery )
902+
JsonRequestBuilder_c ( const char* szQuery, CSphString sEndpoint, const CSphString & sRawQuery, const CSphString & sFullUrl )
903+
: m_sEndpoint ( std::move ( sEndpoint ) )
904+
, m_tQuery ( szQuery )
905+
, m_sRawQuery ( sRawQuery )
906+
, m_sFullUrl ( sFullUrl )
905907
{
906908
// fixme: we can implement replacing indexes in a string (without parsing) if it becomes a performance issue
907909
}
@@ -917,23 +919,28 @@ class JsonRequestBuilder_c : public RequestBuilder_i
917919
auto tWr = APIHeader ( tOut, SEARCHD_COMMAND_JSON, VER_COMMAND_JSON ); // API header
918920
tOut.SendString ( m_sEndpoint.cstr() );
919921
tOut.SendString ( sRequest.cstr() );
922+
tOut.SendString ( m_sRawQuery.cstr() );
923+
tOut.SendString ( m_sFullUrl.cstr() );
920924
}
921925

922926
private:
923927
CSphString m_sEndpoint;
924928
mutable JsonObj_c m_tQuery;
929+
const CSphString & m_sRawQuery;
930+
const CSphString & m_sFullUrl;
925931
};
926932

927933

928934
class JsonReplyParser_c : public ReplyParser_i
929935
{
930936
public:
931-
JsonReplyParser_c ( int & iAffected, int & iWarnings )
937+
JsonReplyParser_c ( int & iAffected, int & iWarnings, SearchFailuresLog_c & tFails )
932938
: m_iAffected ( iAffected )
933939
, m_iWarnings ( iWarnings )
940+
, m_tFails ( tFails )
934941
{}
935942

936-
bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & ) const final
943+
bool ParseReply ( MemInputBuffer_c & tReq, AgentConn_t & tAgent ) const final
937944
{
938945
CSphString sEndpoint = tReq.GetString();
939946
EHTTP_ENDPOINT eEndpoint = StrToHttpEndpoint ( sEndpoint );
@@ -945,12 +952,18 @@ class JsonReplyParser_c : public ReplyParser_i
945952
tReq.GetBytes ( dResult.Begin(), (int)uLength );
946953
dResult[uLength] = '\0';
947954

948-
return sphGetResultStats ( (const char *)dResult.Begin(), m_iAffected, m_iWarnings, eEndpoint==EHTTP_ENDPOINT::JSON_UPDATE );
955+
CSphString sError;
956+
bool bOk = sphGetResultStats ( (const char *)dResult.Begin(), m_iAffected, m_iWarnings, eEndpoint==EHTTP_ENDPOINT::JSON_UPDATE, sError );
957+
if ( !sError.IsEmpty() )
958+
m_tFails.Submit ( tAgent.m_tDesc.m_sIndexes, nullptr, sError.cstr() );
959+
960+
return bOk;
949961
}
950962

951963
protected:
952964
int & m_iAffected;
953965
int & m_iWarnings;
966+
SearchFailuresLog_c & m_tFails;
954967
};
955968

956969
std::unique_ptr<QueryParser_i> CreateQueryParser ( bool bJson ) noexcept
@@ -963,17 +976,17 @@ std::unique_ptr<RequestBuilder_i> CreateRequestBuilder ( Str_t sQuery, const Sql
963976
if ( tStmt.m_bJson )
964977
{
965978
assert ( !tStmt.m_sEndpoint.IsEmpty() );
966-
return std::make_unique<JsonRequestBuilder_c> ( sQuery.first, tStmt.m_sEndpoint );
979+
return std::make_unique<JsonRequestBuilder_c> ( sQuery.first, tStmt.m_sEndpoint, tStmt.m_sRawQuery, tStmt.m_sFullUrl );
967980
} else
968981
{
969982
return std::make_unique<SphinxqlRequestBuilder_c> ( sQuery, tStmt );
970983
}
971984
}
972985

973-
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings )
986+
std::unique_ptr<ReplyParser_i> CreateReplyParser ( bool bJson, int & iUpdated, int & iWarnings, SearchFailuresLog_c & tFails )
974987
{
975988
if ( bJson )
976-
return std::make_unique<JsonReplyParser_c> ( iUpdated, iWarnings );
989+
return std::make_unique<JsonReplyParser_c> ( iUpdated, iWarnings, tFails );
977990
else
978991
return std::make_unique<SphinxqlReplyParser_c> ( &iUpdated, &iWarnings );
979992
}
@@ -1815,15 +1828,28 @@ class HttpJsonUpdateTraits_c
18151828
const ResultSetFormat_e m_eFormat = ResultSetFormat_e::MntSearch;
18161829
};
18171830

1831+
static void SetQueryOptions ( const OptionsHash_t & hOpts, SqlStmt_t & tStmt )
1832+
{
1833+
if ( tStmt.m_eStmt==STMT_UPDATE || tStmt.m_eStmt==STMT_DELETE )
1834+
{
1835+
const CSphString * pRawQuery = hOpts ( "raw_query" );
1836+
if ( pRawQuery )
1837+
tStmt.m_sRawQuery = *pRawQuery;
1838+
const CSphString * pFullUrl = hOpts ( "full_url" );
1839+
if ( pFullUrl )
1840+
tStmt.m_sFullUrl = *pFullUrl;
1841+
}
1842+
}
18181843

1819-
class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
1844+
class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c, HttpOptionTrait_t
18201845
{
18211846
protected:
18221847
Str_t m_sQuery;
18231848

18241849
public:
1825-
explicit HttpHandler_JsonUpdate_c ( Str_t sQuery )
1826-
: m_sQuery ( sQuery )
1850+
explicit HttpHandler_JsonUpdate_c ( Str_t sQuery, const OptionsHash_t & tOptions )
1851+
: HttpOptionTrait_t ( tOptions )
1852+
, m_sQuery ( sQuery )
18271853
{}
18281854

18291855
bool Process () final
@@ -1833,6 +1859,7 @@ class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
18331859
tStmt.m_bJson = true;
18341860
tStmt.m_tQuery.m_eQueryType = QUERY_JSON;
18351861
tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_UPDATE );
1862+
tStmt.m_eStmt = STMT_UPDATE;
18361863

18371864
DocID_t tDocId = 0;
18381865
if ( !ParseQuery ( tStmt, tDocId ) )
@@ -1841,6 +1868,8 @@ class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
18411868
return false;
18421869
}
18431870

1871+
SetQueryOptions ( m_tOptions, tStmt );
1872+
18441873
JsonObj_c tResult = JsonNull;
18451874
bool bResult = ProcessQuery ( tStmt, tDocId, tResult );
18461875

@@ -1868,14 +1897,15 @@ class HttpHandler_JsonUpdate_c : public HttpHandler_c, HttpJsonUpdateTraits_c
18681897
class HttpHandler_JsonDelete_c final : public HttpHandler_JsonUpdate_c
18691898
{
18701899
public:
1871-
explicit HttpHandler_JsonDelete_c ( Str_t sQuery )
1872-
: HttpHandler_JsonUpdate_c ( sQuery )
1900+
explicit HttpHandler_JsonDelete_c ( Str_t sQuery, const OptionsHash_t & tOptions )
1901+
: HttpHandler_JsonUpdate_c ( sQuery, tOptions )
18731902
{}
18741903

18751904
protected:
18761905
bool ParseQuery ( SqlStmt_t & tStmt, DocID_t & tDocId ) final
18771906
{
18781907
tStmt.m_sEndpoint = HttpEndpointToStr ( EHTTP_ENDPOINT::JSON_DELETE );
1908+
tStmt.m_eStmt = STMT_DELETE;
18791909
return sphParseJsonDelete ( m_sQuery, tStmt, tDocId, m_sError );
18801910
}
18811911

@@ -2090,6 +2120,8 @@ class HttpHandler_JsonBulk_c : public HttpHandler_c, public HttpJsonUpdateTraits
20902120
iLastTxStartLine = iCurLine;
20912121
}
20922122

2123+
SetQueryOptions ( m_tOptions, tStmt );
2124+
20932125
switch ( tStmt.m_eStmt )
20942126
{
20952127
case STMT_INSERT:
@@ -2309,14 +2341,14 @@ static std::unique_ptr<HttpHandler_c> CreateHttpHandler ( EHTTP_ENDPOINT eEndpoi
23092341
if ( tSource.GetError() )
23102342
return nullptr;
23112343
else
2312-
return std::make_unique<HttpHandler_JsonUpdate_c> ( sQuery ); // json
2344+
return std::make_unique<HttpHandler_JsonUpdate_c> ( sQuery, tOptions ); // json
23132345

23142346
case EHTTP_ENDPOINT::JSON_DELETE:
23152347
SetQuery ( tSource.ReadAll() );
23162348
if ( tSource.GetError() )
23172349
return nullptr;
23182350
else
2319-
return std::make_unique<HttpHandler_JsonDelete_c> ( sQuery ); // json
2351+
return std::make_unique<HttpHandler_JsonDelete_c> ( sQuery, tOptions ); // json
23202352

23212353
case EHTTP_ENDPOINT::JSON_BULK:
23222354
return std::make_unique<HttpHandler_JsonBulk_c> ( tSource, tOptions ); // json
@@ -2394,15 +2426,14 @@ HttpProcessResult_t ProcessHttpQuery ( CharStream_c & tSource, Str_t & sSrcQuery
23942426
return tRes;
23952427
}
23962428

2397-
void sphProcessHttpQueryNoResponce ( const CSphString & sEndpoint, const CSphString & sQuery, CSphVector<BYTE> & dResult )
2429+
void ProcessHttpJsonQuery ( const CSphString & sQuery, OptionsHash_t & hOptions, CSphVector<BYTE> & dResult )
23982430
{
2399-
OptionsHash_t hOptions;
2400-
hOptions.Add ( sEndpoint, "endpoint" );
2431+
http_method eReqType = HTTP_POST;
24012432

24022433
BlobStream_c tQuery ( sQuery );
24032434
Str_t sSrcQuery;
2404-
HttpProcessResult_t tRes = ProcessHttpQuery ( tQuery, sSrcQuery, hOptions, dResult, false, HTTP_GET );
2405-
ProcessHttpQueryBuddy ( tRes, sSrcQuery, hOptions, dResult, false, HTTP_GET );
2435+
HttpProcessResult_t tRes = ProcessHttpQuery ( tQuery, sSrcQuery, hOptions, dResult, false, eReqType );
2436+
ProcessHttpQueryBuddy ( tRes, sSrcQuery, hOptions, dResult, false, eReqType );
24062437
}
24072438

24082439
static bool IsCompressed ( const OptionsHash_t & hOptions )
@@ -3282,6 +3313,8 @@ bool HttpHandlerEsBulk_c::ProcessTnx ( const VecTraits_T<BulkTnx_t> & dTnx, VecT
32823313
bool bAction = false;
32833314
JsonObj_c tResult = JsonNull;
32843315

3316+
SetQueryOptions ( m_hOpts, tStmt );
3317+
32853318
switch ( tStmt.m_eStmt )
32863319
{
32873320
case STMT_INSERT:

src/searchdhttp.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#include "searchdaemon.h"
1717
#include "http/http_parser.h"
1818

19-
using OptionsHash_t = SmallStringHash_T<CSphString>;
2019
class AsyncNetInputBuffer_c;
2120

2221
class HttpRequestParser_c : public ISphNoncopyable

src/searchdsql.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ struct SqlStmt_t
327327

328328
bool m_bJson = false;
329329
CSphString m_sEndpoint;
330+
CSphString m_sRawQuery;
331+
CSphString m_sFullUrl;
330332

331333
CSphVector<CSphString> m_dStringSubkeys;
332334
CSphVector<int64_t> m_dIntSubkeys;

src/sphinxjsonquery.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2928,7 +2928,7 @@ JsonObj_c sphEncodeInsertErrorJson ( const char * szIndex, const char * szError,
29282928
}
29292929

29302930

2931-
bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate )
2931+
bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate, CSphString & sError )
29322932
{
29332933
JsonObj_c tJsonRoot ( szResult );
29342934
if ( !tJsonRoot )
@@ -2937,23 +2937,33 @@ bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings
29372937
// no warnings in json results for now
29382938
iWarnings = 0;
29392939

2940+
CSphString sParseError;
29402941
if ( tJsonRoot.HasItem("error") )
29412942
{
2943+
JsonObj_c tReplyError = tJsonRoot.GetItem ( "error" );
2944+
if ( tReplyError.IsObj() )
2945+
{
2946+
JsonObj_c tReason = tReplyError.GetItem ( "reason" );
2947+
if ( tReason && tReason.IsStr() )
2948+
sError = tReason.StrVal();
2949+
}
2950+
if ( sError.IsEmpty() )
2951+
sError = tReplyError.AsString();
2952+
29422953
iAffected = 0;
2943-
return true;
2954+
return false;
29442955
}
29452956

29462957
// its either update or delete
2947-
CSphString sError;
2948-
JsonObj_c tAffected = tJsonRoot.GetIntItem ( bUpdate ? "updated" : "deleted", sError );
2958+
JsonObj_c tAffected = tJsonRoot.GetIntItem ( bUpdate ? "updated" : "deleted", sParseError );
29492959
if ( tAffected )
29502960
{
29512961
iAffected = (int)tAffected.IntVal();
29522962
return true;
29532963
}
29542964

29552965
// it was probably a query with an "id"
2956-
JsonObj_c tId = tJsonRoot.GetIntItem ( "id", sError );
2966+
JsonObj_c tId = tJsonRoot.GetIntItem ( "id", sParseError );
29572967
if ( tId )
29582968
{
29592969
iAffected = 1;

src/sphinxjsonquery.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ JsonObj_c sphEncodeDeleteResultJson ( const char * szIndex, DocID_t tDocId, in
8787
JsonObj_c sphEncodeInsertErrorJson ( const char * szIndex, const char * szError, ResultSetFormat_e eFormat );
8888
JsonObj_c sphEncodeTxnResultJson ( const char* szIndex, DocID_t tDocId, int iInserts, int iDeletes, int iUpdates, ResultSetFormat_e eFormat );
8989

90-
bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate );
90+
bool sphGetResultStats ( const char * szResult, int & iAffected, int & iWarnings, bool bUpdate, CSphString & sError );
9191

9292
bool NonEmptyQuery ( const JsonObj_c & tQuery );
9393
bool CheckRootNode ( const JsonObj_c & tRoot, CSphString & sError );
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
http update + distributed table + wrong replication cluster = crash
2+
#3481
3+
4+
––– comment –––
5+
Start Manticore Search
6+
––– input –––
7+
rm -f /var/log/manticore/searchd.log; stdbuf -oL searchd --stopwait > /dev/null; stdbuf -oL searchd --logdebugv > /dev/null
8+
––– output –––
9+
––– input –––
10+
if timeout 10 grep -qm1 'accepting connections' <(tail -n 1000 -f /var/log/manticore/searchd.log); then echo 'Manticore started!'; else echo 'Timeout or failed!'; fi
11+
––– output –––
12+
Manticore started!
13+
––– comment –––
14+
Execute the MRE commands
15+
––– input –––
16+
mysql -v -P9306 -h0 -e "drop table if exists t; create table t(f int); drop table if exists d; CREATE TABLE d type='distributed' agent='127.0.0.1:9312:t'"
17+
––– output –––
18+
--------------
19+
drop table if exists t
20+
--------------
21+
--------------
22+
create table t(f int)
23+
--------------
24+
--------------
25+
drop table if exists d
26+
--------------
27+
--------------
28+
CREATE TABLE d type='distributed' agent='127.0.0.1:9312:t'
29+
--------------
30+
––– comment –––
31+
Test HTTP update with wrong cluster - should not crash
32+
The noop result here is normal due to this is clustered table
33+
The issue about it: https://github.com/manticoresoftware/manticoresearch/issues/3537
34+
––– input –––
35+
curl -s -X POST http://localhost:9312/update -d '{"cluster": "unknown_cluster", "table": "d", "id": 2, "doc": {"f": 5}}'
36+
––– output –––
37+
{"error":{"type":"action_request_validation_exception","reason":"","table":"d"},"status":409}
38+
––– input –––
39+
sleep 2; cat /var/log/manticore/searchd.log | grep 'response data' | cut -d' ' -f12-
40+
––– output –––
41+
{"version":3,"type":"json response","message":{"error":{"type":"action_request_validation_exception","reason":"","table":"d"},"status":409},"meta":null,"error_code":0}

0 commit comments

Comments
 (0)