21
21
#include "tanalytics.h"
22
22
#include "taoserror.h"
23
23
#include "tcommon.h"
24
- // #include "tcompare.h"
25
24
#include "tdatablock.h"
26
- // #include "tfill.h"
27
25
#include "tmsg.h"
28
- // #include "ttime.h"
29
26
30
27
#ifdef USE_ANALYTICS
31
28
@@ -45,7 +42,7 @@ typedef struct {
45
42
typedef struct {
46
43
char algoName [TSDB_ANALYTIC_ALGO_NAME_LEN ];
47
44
char algoUrl [TSDB_ANALYTIC_ALGO_URL_LEN ];
48
- char algoOpt [ TSDB_ANALYTIC_ALGO_OPTION_LEN ] ;
45
+ char * pOptions ;
49
46
int64_t maxTs ;
50
47
int64_t minTs ;
51
48
int64_t numOfRows ;
@@ -195,7 +192,7 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
195
192
code = taosAnalyBufWriteDataEnd (pBuf );
196
193
if (code != 0 ) return code ;
197
194
198
- code = taosAnalyBufWriteOptStr (pBuf , "option" , pSupp -> algoOpt );
195
+ code = taosAnalyBufWriteOptStr (pBuf , "option" , pSupp -> pOptions );
199
196
if (code != 0 ) return code ;
200
197
201
198
code = taosAnalyBufWriteOptStr (pBuf , "algo" , pSupp -> algoName );
@@ -226,7 +223,7 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
226
223
code = taosAnalyBufWriteOptFloat (pBuf , "conf" , pSupp -> conf );
227
224
if (code != 0 ) return code ;
228
225
229
- int32_t len = strlen (pSupp -> algoOpt );
226
+ int32_t len = strlen (pSupp -> pOptions );
230
227
int64_t every = (pSupp -> setEvery != 0 ) ? pSupp -> every : ((pSupp -> maxTs - pSupp -> minTs ) / (pSupp -> numOfRows - 1 ));
231
228
code = taosAnalyBufWriteOptInt (pBuf , "every" , every );
232
229
if (code != 0 ) return code ;
@@ -538,6 +535,18 @@ static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
538
535
return code ;
539
536
}
540
537
538
+ static bool existInList (SForecastSupp * pSupp , int32_t slotId ) {
539
+ for (int32_t j = 0 ; j < taosArrayGetSize (pSupp -> pCovariateSlotList ); ++ j ) {
540
+ SColumn * pCol = taosArrayGet (pSupp -> pCovariateSlotList , j );
541
+
542
+ if (pCol -> slotId == slotId ) {
543
+ return true;
544
+ }
545
+ }
546
+
547
+ return false;
548
+ }
549
+
541
550
static int32_t forecastParseInput (SForecastSupp * pSupp , SNodeList * pFuncs , const char * id ) {
542
551
int32_t code = 0 ;
543
552
SNode * pNode = NULL ;
@@ -571,7 +580,11 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const
571
580
pSupp -> targetValType = pTarget -> node .resType .type ;
572
581
573
582
// let's add the holtwinters as the default forecast algorithm
574
- tstrncpy (pSupp -> algoOpt , "algo=holtwinters" , TSDB_ANALYTIC_ALGO_OPTION_LEN );
583
+ pSupp -> pOptions = taosStrdup ("algo=holtwinters" );
584
+ if (pSupp -> pOptions == NULL ) {
585
+ qError ("%s failed to dup forecast option, code:%s" , id , tstrerror (terrno ));
586
+ return terrno ;
587
+ }
575
588
} else {
576
589
SColumnNode * pTarget = (SColumnNode * )nodesListGetNode (pFunc -> pParameterList , 0 );
577
590
bool assignTs = false;
@@ -594,7 +607,7 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const
594
607
} else if (nodeType (pNode ) == QUERY_NODE_VALUE ) {
595
608
if (!assignOpt ) {
596
609
SValueNode * pOptNode = (SValueNode * )pNode ;
597
- tstrncpy ( pSupp -> algoOpt , pOptNode -> literal , sizeof ( pSupp -> algoOpt ) );
610
+ pSupp -> pOptions = taosStrdup ( pOptNode -> literal );
598
611
assignOpt = true;
599
612
continue ;
600
613
}
@@ -603,7 +616,11 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const
603
616
604
617
if (!assignOpt ) {
605
618
// set the default forecast option
606
- tstrncpy (pSupp -> algoOpt , "algo=holtwinters" , TSDB_ANALYTIC_ALGO_OPTION_LEN );
619
+ pSupp -> pOptions = taosStrdup ("algo=holtwinters" );
620
+ if (pSupp -> pOptions == NULL ) {
621
+ qError ("%s failed to dup forecast option, code:%s" , id , tstrerror (terrno ));
622
+ return terrno ;
623
+ }
607
624
}
608
625
609
626
pSupp -> pCovariateSlotList = taosArrayInit (4 , sizeof (SColumn ));
@@ -616,6 +633,15 @@ static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const
616
633
break ;
617
634
}
618
635
636
+ if (p -> slotId == pSupp -> targetValSlot ) {
637
+ continue ; // duplicate the target column, ignore it
638
+ }
639
+
640
+ bool exist = existInList (pSupp , p -> slotId );
641
+ if (exist ) {
642
+ continue ; // duplicate column, ignore it
643
+ }
644
+
619
645
SColumn col = {.slotId = p -> slotId ,
620
646
.colType = p -> colType ,
621
647
.type = p -> node .resType .type ,
@@ -671,7 +697,7 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
671
697
672
698
initForecastOpt (pSupp );
673
699
674
- code = taosAnalyGetOpts (pSupp -> algoOpt , & pHashMap );
700
+ code = taosAnalyGetOpts (pSupp -> pOptions , & pHashMap );
675
701
if (code != TSDB_CODE_SUCCESS ) {
676
702
return code ;
677
703
}
@@ -682,7 +708,13 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
682
708
return code ;
683
709
}
684
710
685
- code = taosAnalysisParseAlgo (pSupp -> algoOpt , pSupp -> algoName , pSupp -> algoUrl , ANALY_ALGO_TYPE_FORECAST ,
711
+ if (taosHashGetSize (pHashMap ) == 0 ) {
712
+ code = TSDB_CODE_INVALID_PARA ;
713
+ qError ("%s no valid options for forecast, failed to exec" , id );
714
+ return code ;
715
+ }
716
+
717
+ code = taosAnalysisParseAlgo (pSupp -> pOptions , pSupp -> algoName , pSupp -> algoUrl , ANALY_ALGO_TYPE_FORECAST ,
686
718
tListLen (pSupp -> algoUrl ), pHashMap , id );
687
719
TSDB_CHECK_CODE (code , lino , _end );
688
720
@@ -705,7 +737,7 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
705
737
pSupp -> forecastRows = v ;
706
738
qDebug ("%s forecast rows:%" PRId64 , id , pSupp -> forecastRows );
707
739
} else {
708
- qDebug ("%s forecast rows not found:%s, use default:%" PRId64 , id , pSupp -> algoOpt , pSupp -> forecastRows );
740
+ qDebug ("%s forecast rows not found:%s, use default:%" PRId64 , id , pSupp -> pOptions , pSupp -> forecastRows );
709
741
}
710
742
711
743
if (pSupp -> forecastRows > ANALY_FORECAST_RES_MAX_ROWS ) {
@@ -730,7 +762,7 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
730
762
qDebug ("%s forecast conf:%.2f" , id , pSupp -> conf );
731
763
}
732
764
} else {
733
- qDebug ("%s forecast conf not found:%s, use default:%.2f" , id , pSupp -> algoOpt , pSupp -> conf );
765
+ qDebug ("%s forecast conf not found:%s, use default:%.2f" , id , pSupp -> pOptions , pSupp -> conf );
734
766
}
735
767
736
768
// extract the start timestamp
@@ -780,8 +812,11 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
780
812
781
813
void * pCol = taosHashGet (pHashMap , nameBuf , strlen (nameBuf ));
782
814
if (pCol == NULL ) {
783
- qError ("%s dynamic real column related:%s column name:%s not specified" , id , pKey , nameBuf );
784
- code = TSDB_CODE_ANA_INTERNAL_ERROR ;
815
+ char * pTmp = taosStrndupi (pKey , keyLen );
816
+ qError ("%s dynamic real column related:%s column name:%s not specified" , id , pTmp , nameBuf );
817
+
818
+ taosMemoryFree (pTmp );
819
+ code = TSDB_CODE_INVALID_PARA ;
785
820
goto _end ;
786
821
} else {
787
822
// build dynamic_real_feature
@@ -803,7 +838,8 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
803
838
804
839
if (index == -1 ) {
805
840
qError ("%s not found the required future dynamic real column:%s" , id , d .pName );
806
- code = TSDB_CODE_ANA_INTERNAL_ERROR ;
841
+ code = TSDB_CODE_INVALID_PARA ;
842
+ taosMemoryFree (d .pName );
807
843
goto _end ;
808
844
}
809
845
@@ -812,15 +848,20 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
812
848
d .data .info .type = pColx -> type ;
813
849
d .data .info .bytes = pColx -> bytes ;
814
850
815
- char * buf = taosStrndup (pVal , taosHashGetValueSize ((void * )pVal ));
851
+ int32_t len = taosHashGetValueSize ((void * )pVal );
852
+ char * buf = taosStrndupi (pVal , len );
816
853
int32_t unused = strdequote ((char * )buf );
817
854
818
855
int32_t num = 0 ;
819
856
char * * pList = strsplit (buf , " " , & num );
820
857
if (num != pSupp -> forecastRows ) {
821
858
qError ("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64 ,
822
859
id , num , pSupp -> forecastRows );
823
- code = TSDB_CODE_ANA_INTERNAL_ERROR ;
860
+ code = TSDB_CODE_INVALID_PARA ;
861
+
862
+ taosMemoryFree (d .pName );
863
+ taosMemoryFree (pList );
864
+ taosMemoryFree (buf );
824
865
goto _end ;
825
866
}
826
867
@@ -890,6 +931,9 @@ static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
890
931
891
932
}
892
933
934
+ taosMemoryFree (pList );
935
+ taosMemoryFree (buf );
936
+
893
937
void * noret = taosArrayPush (pSupp -> pDynamicRealList , & d );
894
938
if (noret == NULL ) {
895
939
qError ("%s failed to add column info in dynamic real column info" , id );
@@ -1047,7 +1091,7 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
1047
1091
1048
1092
* pOptrInfo = pOperator ;
1049
1093
1050
- qDebug ("%s forecast env is initialized, option:%s" , pId , pSupp -> algoOpt );
1094
+ qDebug ("%s forecast env is initialized, option:%s" , pId , pSupp -> pOptions );
1051
1095
return TSDB_CODE_SUCCESS ;
1052
1096
1053
1097
_error :
@@ -1060,6 +1104,12 @@ int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNo
1060
1104
return code ;
1061
1105
}
1062
1106
1107
+ static void destroyColFutureData (void * p ) {
1108
+ SColFutureData * pData = p ;
1109
+ taosMemoryFree (pData -> pName );
1110
+ colDataDestroy (& pData -> data );
1111
+ }
1112
+
1063
1113
static void destroyForecastInfo (void * param ) {
1064
1114
SForecastOperatorInfo * pInfo = (SForecastOperatorInfo * )param ;
1065
1115
@@ -1069,6 +1119,11 @@ static void destroyForecastInfo(void* param) {
1069
1119
taosArrayDestroy (pInfo -> forecastSupp .pCovariateSlotList );
1070
1120
pInfo -> forecastSupp .pCovariateSlotList = NULL ;
1071
1121
1122
+ taosArrayDestroyEx (pInfo -> forecastSupp .pDynamicRealList , destroyColFutureData );
1123
+ pInfo -> forecastSupp .pDynamicRealList = NULL ;
1124
+
1125
+ taosMemoryFree (pInfo -> forecastSupp .pOptions );
1126
+
1072
1127
cleanupExprSupp (& pInfo -> scalarSup );
1073
1128
taosAnalyBufDestroy (& pInfo -> forecastSupp .analyBuf );
1074
1129
taosMemoryFreeClear (param );
0 commit comments