8
8
9
9
package org .opensearch .ingest ;
10
10
11
+ import org .opensearch .OpenSearchParseException ;
11
12
import org .opensearch .test .OpenSearchTestCase ;
12
13
13
14
import java .util .ArrayList ;
14
15
import java .util .Arrays ;
15
16
import java .util .Collections ;
17
+ import java .util .HashMap ;
16
18
import java .util .List ;
19
+ import java .util .Map ;
17
20
import java .util .function .Consumer ;
18
21
19
22
public class AbstractBatchingProcessorTests extends OpenSearchTestCase {
@@ -87,6 +90,29 @@ public void testBatchExecute_defaultBatchSize() {
87
90
assertEquals (wrapperList .subList (2 , 3 ), processor .getSubBatches ().get (2 ));
88
91
}
89
92
93
+ public void testFactory_invalidBatchSize () {
94
+ Map <String , Object > config = new HashMap <>();
95
+ config .put ("batch_size" , 0 );
96
+ DummyProcessor .DummyProcessorFactory factory = new DummyProcessor .DummyProcessorFactory ("DummyProcessor" );
97
+ OpenSearchParseException exception = assertThrows (OpenSearchParseException .class , () -> factory .create (config ));
98
+ assertEquals ("[batch_size] batch size must be a positive integer" , exception .getMessage ());
99
+ }
100
+
101
+ public void testFactory_defaultBatchSize () throws Exception {
102
+ Map <String , Object > config = new HashMap <>();
103
+ DummyProcessor .DummyProcessorFactory factory = new DummyProcessor .DummyProcessorFactory ("DummyProcessor" );
104
+ DummyProcessor processor = (DummyProcessor ) factory .create (config );
105
+ assertEquals (1 , processor .batchSize );
106
+ }
107
+
108
+ public void testFactory_callNewProcessor () throws Exception {
109
+ Map <String , Object > config = new HashMap <>();
110
+ config .put ("batch_size" , 3 );
111
+ DummyProcessor .DummyProcessorFactory factory = new DummyProcessor .DummyProcessorFactory ("DummyProcessor" );
112
+ DummyProcessor processor = (DummyProcessor ) factory .create (config );
113
+ assertEquals (3 , processor .batchSize );
114
+ }
115
+
90
116
static class DummyProcessor extends AbstractBatchingProcessor {
91
117
private List <List <IngestDocumentWrapper >> subBatches = new ArrayList <>();
92
118
@@ -113,5 +139,22 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
113
139
public String getType () {
114
140
return null ;
115
141
}
142
+
143
+ public static class DummyProcessorFactory extends Factory {
144
+
145
+ protected DummyProcessorFactory (String processorType ) {
146
+ super (processorType );
147
+ }
148
+
149
+ public AbstractBatchingProcessor create (Map <String , Object > config ) throws Exception {
150
+ final Map <String , org .opensearch .ingest .Processor .Factory > processorFactories = new HashMap <>();
151
+ return super .create (processorFactories , "tag" , "description" , config );
152
+ }
153
+
154
+ @ Override
155
+ protected AbstractBatchingProcessor newProcessor (String tag , String description , int batchSize , Map <String , Object > config ) {
156
+ return new DummyProcessor (batchSize );
157
+ }
158
+ }
116
159
}
117
160
}
0 commit comments