Skip to content

Commit 4d2d9e1

Browse files
Multiple connectors for a single extractor #120
1 parent 490cd25 commit 4d2d9e1

File tree

17 files changed

+223
-30
lines changed

17 files changed

+223
-30
lines changed

RELEASE-NOTES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Release 2.2
44
#114 Targets mapped event before merging update
55
#116 tokenManager is null in DefaultDeleteBuilder
66
#117 JdbcNormalizer for java.time classes
7+
#120 Multiple connectors for a single extractor
78

89
Release 2.1
910

link-move-csv/src/test/java/com/nhl/link/move/runtime/extractor/BaseExtractorFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void setUpExtractorFactory() {
3434
@Before
3535
public void setUpExtractorModel() {
3636
model = new MutableExtractorModel("testExtractorConfig");
37-
model.setConnectorId(CONNECTOR_ID);
37+
model.addConnectorId(CONNECTOR_ID);
3838
model.setAttributes(new BaseRowAttribute[0]);
3939
}
4040

link-move/src/main/java/com/nhl/link/move/extractor/model/ContainerAwareExtractorModel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.nhl.link.move.extractor.model;
22

3+
import java.util.Collection;
34
import java.util.Map;
45

56
import com.nhl.link.move.RowAttribute;
@@ -43,6 +44,11 @@ public String getConnectorId() {
4344
return connectorId != null ? connectorId : parent.getConnectorId();
4445
}
4546

47+
@Override
48+
public Collection<String> getConnectorIds() {
49+
return delegate.getConnectorIds().isEmpty() ? parent.getConnectorIds() : delegate.getConnectorIds();
50+
}
51+
4652
@Override
4753
public Map<String, String> getProperties() {
4854
return delegate.getProperties();

link-move/src/main/java/com/nhl/link/move/extractor/model/ExtractorModel.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.nhl.link.move.extractor.model;
22

3+
import java.util.Collection;
34
import java.util.Map;
45

56
import com.nhl.link.move.RowAttribute;
@@ -12,14 +13,21 @@
1213
*/
1314
public interface ExtractorModel {
1415

15-
public static final String DEFAULT_NAME = "default_extractor";
16+
String DEFAULT_NAME = "default_extractor";
1617

1718
String getName();
1819

1920
String getType();
2021

22+
@Deprecated
2123
String getConnectorId();
2224

25+
/**
26+
* @return Collection of connector IDs
27+
* @since 2.2
28+
*/
29+
Collection<String> getConnectorIds();
30+
2331
Map<String, String> getProperties();
2432

2533
RowAttribute[] getAttributes();

link-move/src/main/java/com/nhl/link/move/extractor/model/ExtractorModelContainer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,15 @@ public interface ExtractorModelContainer {
1515

1616
String getType();
1717

18+
@Deprecated
1819
String getConnectorId();
1920

21+
/**
22+
* @return Collection of connector IDs
23+
* @since 2.2
24+
*/
25+
Collection<String> getConnectorIds();
26+
2027
ExtractorModel getExtractor(String name);
2128

2229
Collection<String> getExtractorNames();

link-move/src/main/java/com/nhl/link/move/extractor/model/MutableExtractorModel.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package com.nhl.link.move.extractor.model;
22

3+
import java.util.Collection;
4+
import java.util.Collections;
35
import java.util.HashMap;
6+
import java.util.HashSet;
47
import java.util.Map;
8+
import java.util.Set;
59

610
import com.nhl.link.move.RowAttribute;
711

@@ -12,13 +16,14 @@ public class MutableExtractorModel implements ExtractorModel {
1216

1317
private String name;
1418
private String type;
15-
private String connectorId;
19+
private Set<String> connectorIds;
1620
private long loadedOn;
1721
private RowAttribute[] attributes;
1822
private Map<String, String> properties;
1923

2024
public MutableExtractorModel(String name) {
2125
this.name = name;
26+
this.connectorIds = new HashSet<>();
2227
this.properties = new HashMap<>();
2328
}
2429

@@ -39,7 +44,18 @@ public String getType() {
3944

4045
@Override
4146
public String getConnectorId() {
42-
return connectorId;
47+
if (connectorIds.isEmpty()) {
48+
// connector IDs can be missing if this model is a part of a model container
49+
return null;
50+
} if (connectorIds.size() == 1) {
51+
return connectorIds.iterator().next();
52+
}
53+
throw new IllegalStateException("Multiple connector IDs specified in model");
54+
}
55+
56+
@Override
57+
public Collection<String> getConnectorIds() {
58+
return connectorIds;
4359
}
4460

4561
@Override
@@ -52,8 +68,11 @@ public long getLoadedOn() {
5268
return loadedOn;
5369
}
5470

55-
public void setConnectorId(String connectorId) {
56-
this.connectorId = connectorId;
71+
/**
72+
* @since 2.2
73+
*/
74+
public void addConnectorId(String connectorId) {
75+
this.connectorIds.add(connectorId);
5776
}
5877

5978
public void setAttributes(RowAttribute... rowKeys) {

link-move/src/main/java/com/nhl/link/move/extractor/model/MutableExtractorModelContainer.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.util.Collection;
44
import java.util.Collections;
5+
import java.util.HashSet;
56
import java.util.Map;
7+
import java.util.Set;
68
import java.util.TreeMap;
79

810
/**
@@ -12,14 +14,15 @@ public class MutableExtractorModelContainer implements ExtractorModelContainer {
1214

1315
private String location;
1416
private String type;
15-
private String connectorId;
17+
private Set<String> connectorIds;
1618
private long loadedOn;
1719

1820
private Map<String, ExtractorModel> extractors;
1921

2022
public MutableExtractorModelContainer(String location) {
2123
this.location = location;
2224

25+
this.connectorIds = new HashSet<>();
2326
// make sure the map is sorted for consistent iteration over
2427
// extractors...
2528
this.extractors = new TreeMap<>();
@@ -47,7 +50,18 @@ public String getType() {
4750

4851
@Override
4952
public String getConnectorId() {
50-
return connectorId;
53+
if (connectorIds.isEmpty()) {
54+
// connector IDs can be missing
55+
return null;
56+
} if (connectorIds.size() == 1) {
57+
return connectorIds.iterator().next();
58+
}
59+
throw new IllegalStateException("Multiple connector IDs specified in model");
60+
}
61+
62+
@Override
63+
public Collection<String> getConnectorIds() {
64+
return connectorIds;
5165
}
5266

5367
@Override
@@ -60,8 +74,8 @@ public void addExtractor(String name, ExtractorModel extractor) {
6074
extractors.put(name, extractor);
6175
}
6276

63-
public void setConnectorId(String connectorId) {
64-
this.connectorId = connectorId;
77+
public void addConnectorId(String connectorId) {
78+
this.connectorIds.add(connectorId);
6579
}
6680

6781
public void setType(String type) {

link-move/src/main/java/com/nhl/link/move/extractor/parser/ExtractorModelParser_v1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected void doParse(Element rootElement, MutableExtractorModelContainer conta
7575
case "connectorId":
7676
// enough to set this at the container level.. extractor
7777
// will inherit the connectorId
78-
container.setConnectorId(e.getTextContent());
78+
container.addConnectorId(e.getTextContent());
7979
break;
8080
case "attributes":
8181
processAttributes(e, extractor);

link-move/src/main/java/com/nhl/link/move/extractor/parser/ExtractorModelParser_v2.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,12 @@ protected void parseContainer(MutableExtractorModelContainer container, NodeList
6565
container.setType(e.getTextContent());
6666
break;
6767
case "connectorId":
68-
container.setConnectorId(e.getTextContent());
68+
container.addConnectorId(e.getTextContent());
6969
break;
70-
70+
case "connectorIds": {
71+
processConnectorIds(e, container);
72+
break;
73+
}
7174
case "extractor":
7275
parseExtractor(container, e);
7376
break;
@@ -113,8 +116,12 @@ protected void parseExtractor(MutableExtractorModelContainer container, Element
113116
extractor.setType(e.getTextContent());
114117
break;
115118
case "connectorId":
116-
extractor.setConnectorId(e.getTextContent());
119+
extractor.addConnectorId(e.getTextContent());
120+
break;
121+
case "connectorIds": {
122+
processConnectorIds(e, extractor);
117123
break;
124+
}
118125
case "attributes":
119126
processAttributes(e, extractor);
120127
break;
@@ -128,6 +135,36 @@ protected void parseExtractor(MutableExtractorModelContainer container, Element
128135
container.addExtractor(extractor.getName(), new ContainerAwareExtractorModel(container, extractor));
129136
}
130137

138+
private void processConnectorIds(Element connectorIds, MutableExtractorModel extractor) {
139+
NodeList nodes = connectorIds.getChildNodes();
140+
int len = nodes.getLength();
141+
for (int i = 0; i < len; i++) {
142+
143+
Node c = nodes.item(i);
144+
if (Node.ELEMENT_NODE == c.getNodeType()) {
145+
Element e = (Element) c;
146+
if ("id".equals(e.getNodeName())) {
147+
extractor.addConnectorId(e.getTextContent());
148+
}
149+
}
150+
}
151+
}
152+
153+
private void processConnectorIds(Element connectorIds, MutableExtractorModelContainer container) {
154+
NodeList nodes = connectorIds.getChildNodes();
155+
int len = nodes.getLength();
156+
for (int i = 0; i < len; i++) {
157+
158+
Node c = nodes.item(i);
159+
if (Node.ELEMENT_NODE == c.getNodeType()) {
160+
Element e = (Element) c;
161+
if ("connectorId".equals(e.getNodeName())) {
162+
container.addConnectorId(e.getTextContent());
163+
}
164+
}
165+
}
166+
}
167+
131168
protected void processAttributes(Element attributesNode, MutableExtractorModel extractor)
132169
throws ClassNotFoundException, DOMException {
133170

link-move/src/main/java/com/nhl/link/move/runtime/extractor/ReloadableExtractor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package com.nhl.link.move.runtime.extractor;
22

3+
import java.util.Collection;
4+
import java.util.List;
35
import java.util.Map;
6+
import java.util.stream.Collectors;
47

58
import com.nhl.link.move.RowReader;
69
import com.nhl.link.move.connect.Connector;
710
import com.nhl.link.move.extractor.Extractor;
11+
import com.nhl.link.move.extractor.MultiExtractor;
812
import com.nhl.link.move.extractor.model.ExtractorModel;
913
import com.nhl.link.move.extractor.model.ExtractorName;
1014
import com.nhl.link.move.runtime.connect.IConnectorService;
@@ -64,7 +68,21 @@ private <T extends Connector> Extractor createExtractor(ExtractorModel model) {
6468
throw new IllegalStateException("No factory mapped for Extractor type of '" + model.getType() + "'");
6569
}
6670

67-
T connector = connectorService.getConnector(factory.getConnectorType(), model.getConnectorId());
71+
Collection<String> connectorIds = model.getConnectorIds();
72+
if (connectorIds.size() == 1) {
73+
return createExtractor(model, factory, connectorIds.iterator().next());
74+
} else {
75+
List<Extractor> extractors = connectorIds.stream()
76+
.map(id -> createExtractor(model, factory, id))
77+
.collect(Collectors.toList());
78+
return new MultiExtractor(extractors);
79+
}
80+
}
81+
82+
private <T extends Connector> Extractor createExtractor(ExtractorModel model,
83+
IExtractorFactory<T> factory,
84+
String connectorId) {
85+
T connector = connectorService.getConnector(factory.getConnectorType(), connectorId);
6886
return factory.createExtractor(connector, model);
6987
}
7088

link-move/src/main/resources/com/nhl/link/move/xsd/extractor_config_2.xsd

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
<element name="config" type="tns:ConfigType" />
1313

14-
1514
<complexType name="ConfigType">
1615

1716
<annotation>
@@ -31,16 +30,10 @@
3130
</annotation>
3231
</element>
3332

34-
<element name="connectorId" type="string" minOccurs="1" maxOccurs="1">
35-
<annotation>
36-
<documentation source="description">
37-
Defines a symbolic ID of the connector used by the extractor. Connector must
38-
be
39-
compatible with Extractor type specified above. ExtractorModelContainer's connectorId is inherited (and can be
40-
overridden) by extractors.
41-
</documentation>
42-
</annotation>
43-
</element>
33+
<choice minOccurs="1" maxOccurs="1">
34+
<element name="connectorId" type="tns:ConnectorIdType"/>
35+
<element name="connectorIds" type="tns:ConnectorIdsType"/>
36+
</choice>
4437

4538
<element name="extractor" type="tns:ExtractorType" minOccurs="0" maxOccurs="unbounded" />
4639

@@ -67,13 +60,15 @@
6760
</annotation>
6861
</element>
6962

70-
<element name="connectorId" type="string" minOccurs="0" maxOccurs="1">
63+
<choice minOccurs="0" maxOccurs="1">
7164
<annotation>
7265
<documentation source="description">
73-
Overrides "connectorId" of the container
66+
Overrides "connectorId"/"connectorIds" of the container
7467
</documentation>
7568
</annotation>
76-
</element>
69+
<element name="connectorId" type="tns:ConnectorIdType"/>
70+
<element name="connectorIds" type="tns:ConnectorIdsType"/>
71+
</choice>
7772

7873
<element name="attributes" minOccurs="0" maxOccurs="1">
7974
<annotation>
@@ -123,5 +118,22 @@
123118
</sequence>
124119
</complexType>
125120

121+
<complexType name="ConnectorIdsType">
122+
<sequence>
123+
<element name="connectorId" type="tns:ConnectorIdType" minOccurs="1" maxOccurs="unbounded"/>
124+
</sequence>
125+
</complexType>
126+
127+
<simpleType name="ConnectorIdType">
128+
<annotation>
129+
<documentation source="description">
130+
Defines a symbolic ID of the connector used by the extractor. Connector must
131+
be
132+
compatible with Extractor type specified above. ExtractorModelContainer's connectorId is inherited (and can be
133+
overridden) by extractors.
134+
</documentation>
135+
</annotation>
136+
<restriction base="string"/>
137+
</simpleType>
126138

127139
</schema>

0 commit comments

Comments
 (0)