Skip to content

Commit b2321dc

Browse files
DbEntity sync nhl#131
1 parent 80f7da1 commit b2321dc

15 files changed

+671
-42
lines changed

link-move/src/main/java/com/nhl/link/move/runtime/task/ITaskService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.nhl.link.move.runtime.task;
22

3+
import com.nhl.link.move.runtime.task.createorupdatedb.CreateOrUpdateDbBuilder;
34
import org.apache.cayenne.DataObject;
45

56
import com.nhl.link.move.CreateOrUpdateBuilder;
@@ -15,6 +16,8 @@ public interface ITaskService {
1516
*/
1617
<T extends DataObject> CreateOrUpdateBuilder<T> createOrUpdate(Class<T> type);
1718

19+
CreateOrUpdateDbBuilder createOrUpdate(String dbEntityName);
20+
1821
/**
1922
* Returns a builder of target delete ETL synchronization task.
2023
*

link-move/src/main/java/com/nhl/link/move/runtime/task/MapperBuilder.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,31 @@
11
package com.nhl.link.move.runtime.task;
22

3-
import java.util.ArrayList;
4-
import java.util.Collection;
5-
import java.util.Collections;
6-
import java.util.HashSet;
7-
import java.util.LinkedHashMap;
8-
import java.util.List;
9-
import java.util.Map;
10-
import java.util.Set;
11-
3+
import com.nhl.link.move.mapper.KeyAdapter;
4+
import com.nhl.link.move.mapper.Mapper;
5+
import com.nhl.link.move.mapper.MultiPathMapper;
6+
import com.nhl.link.move.mapper.PathMapper;
7+
import com.nhl.link.move.mapper.SafeMapKeyMapper;
8+
import com.nhl.link.move.runtime.key.IKeyAdapterFactory;
9+
import com.nhl.link.move.runtime.path.EntityPathNormalizer;
1210
import org.apache.cayenne.exp.ExpressionFactory;
1311
import org.apache.cayenne.exp.Property;
1412
import org.apache.cayenne.exp.parser.ASTDbPath;
1513
import org.apache.cayenne.map.DbAttribute;
14+
import org.apache.cayenne.map.DbEntity;
1615
import org.apache.cayenne.map.ObjAttribute;
1716
import org.apache.cayenne.map.ObjEntity;
1817
import org.apache.cayenne.map.ObjRelationship;
1918

20-
import com.nhl.link.move.mapper.KeyAdapter;
21-
import com.nhl.link.move.mapper.Mapper;
22-
import com.nhl.link.move.mapper.MultiPathMapper;
23-
import com.nhl.link.move.mapper.PathMapper;
24-
import com.nhl.link.move.mapper.SafeMapKeyMapper;
25-
import com.nhl.link.move.runtime.key.IKeyAdapterFactory;
26-
import com.nhl.link.move.runtime.path.EntityPathNormalizer;
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.HashSet;
23+
import java.util.LinkedHashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Objects;
27+
import java.util.Optional;
28+
import java.util.Set;
2729

2830
/**
2931
* @since 1.3
@@ -33,11 +35,22 @@ public class MapperBuilder {
3335
private IKeyAdapterFactory keyAdapterFactory;
3436
private EntityPathNormalizer pathNormalizer;
3537

36-
private ObjEntity entity;
38+
private Optional<ObjEntity> objEntity;
39+
private DbEntity dbEntity;
3740
private Set<String> paths;
3841

3942
public MapperBuilder(ObjEntity entity, EntityPathNormalizer pathNormalizer, IKeyAdapterFactory keyAdapterFactory) {
40-
this.entity = entity;
43+
this(entity, entity.getDbEntity(), pathNormalizer, keyAdapterFactory);
44+
}
45+
46+
public MapperBuilder(DbEntity entity, EntityPathNormalizer pathNormalizer, IKeyAdapterFactory keyAdapterFactory) {
47+
this(null, entity, pathNormalizer, keyAdapterFactory);
48+
}
49+
50+
private MapperBuilder(ObjEntity objEntity, DbEntity dbEntity, EntityPathNormalizer pathNormalizer, IKeyAdapterFactory keyAdapterFactory) {
51+
this.objEntity = Optional.ofNullable(objEntity);
52+
this.dbEntity = Objects.requireNonNull(dbEntity);
53+
4154
this.keyAdapterFactory = keyAdapterFactory;
4255
this.pathNormalizer = pathNormalizer;
4356

@@ -74,9 +87,9 @@ public MapperBuilder matchBy(Property<?>... paths) {
7487

7588
public MapperBuilder matchById() {
7689

77-
Collection<DbAttribute> pks = entity.getDbEntity().getPrimaryKeys();
90+
Collection<DbAttribute> pks = dbEntity.getPrimaryKeys();
7891
if (pks.isEmpty()) {
79-
throw new IllegalStateException("Target entity has no PKs defined: " + entity.getDbEntityName());
92+
throw new IllegalStateException("Target entity has no PKs defined: " + dbEntity.getName());
8093
}
8194

8295
for (DbAttribute pk : pks) {
@@ -92,6 +105,10 @@ public Mapper build() {
92105

93106
@SuppressWarnings("deprecation")
94107
Mapper createSafeKeyMapper(Mapper unsafe) {
108+
if (!objEntity.isPresent()) {
109+
return unsafe;
110+
}
111+
95112
KeyAdapter keyAdapter;
96113

97114
if (paths.size() > 1) {
@@ -102,7 +119,7 @@ Mapper createSafeKeyMapper(Mapper unsafe) {
102119
keyAdapter = keyAdapterFactory.adapter(List.class);
103120
} else {
104121

105-
Object attributeOrRelationship = ExpressionFactory.exp(paths.iterator().next()).evaluate(entity);
122+
Object attributeOrRelationship = ExpressionFactory.exp(paths.iterator().next()).evaluate(objEntity.get());
106123

107124
Class<?> type;
108125

link-move/src/main/java/com/nhl/link/move/runtime/task/TaskService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.nhl.link.move.runtime.task;
22

3+
import com.nhl.link.move.runtime.task.createorupdatedb.CreateOrUpdateDbBuilder;
34
import org.apache.cayenne.DataObject;
45
import org.apache.cayenne.di.Inject;
56
import org.apache.cayenne.map.ObjEntity;
@@ -44,6 +45,11 @@ public <T extends DataObject> CreateOrUpdateBuilder<T> createOrUpdate(Class<T> t
4445
keyAdapterFactory, pathNormalizer, writerService);
4546
}
4647

48+
@Override
49+
public CreateOrUpdateDbBuilder createOrUpdate(String dbEntityName) {
50+
return new CreateOrUpdateDbBuilder(dbEntityName, targetCayenneService, extractorService, tokenManager, keyAdapterFactory);
51+
}
52+
4753
@Override
4854
public <T extends DataObject> SourceKeysBuilder extractSourceKeys(Class<T> type) {
4955
ObjEntity targetEntity = targetCayenneService.entityResolver().getObjEntity(type);
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package com.nhl.link.move.runtime.task.createorupdatedb;
2+
3+
import com.nhl.link.move.LmRuntimeException;
4+
import com.nhl.link.move.LmTask;
5+
import com.nhl.link.move.annotation.AfterSourceRowsConverted;
6+
import com.nhl.link.move.annotation.AfterSourcesMapped;
7+
import com.nhl.link.move.annotation.AfterTargetsCommitted;
8+
import com.nhl.link.move.annotation.AfterTargetsMatched;
9+
import com.nhl.link.move.annotation.AfterTargetsMerged;
10+
import com.nhl.link.move.extractor.model.ExtractorModel;
11+
import com.nhl.link.move.extractor.model.ExtractorName;
12+
import com.nhl.link.move.mapper.Mapper;
13+
import com.nhl.link.move.runtime.cayenne.ITargetCayenneService;
14+
import com.nhl.link.move.runtime.extractor.IExtractorService;
15+
import com.nhl.link.move.runtime.key.IKeyAdapterFactory;
16+
import com.nhl.link.move.runtime.task.BaseTaskBuilder;
17+
import com.nhl.link.move.runtime.task.ListenersBuilder;
18+
import com.nhl.link.move.runtime.task.MapperBuilder;
19+
import com.nhl.link.move.runtime.task.createorupdate.CreateOrUpdateStatsListener;
20+
import com.nhl.link.move.runtime.task.createorupdate.SourceMapper;
21+
import com.nhl.link.move.runtime.token.ITokenManager;
22+
import org.apache.cayenne.map.DbEntity;
23+
24+
import java.util.Objects;
25+
26+
/**
27+
* A builder of an ETL task that matches source data with target data based on a
28+
* certain unique attribute on both sides.
29+
*/
30+
public class CreateOrUpdateDbBuilder extends BaseTaskBuilder {
31+
32+
private IExtractorService extractorService;
33+
private ITargetCayenneService targetCayenneService;
34+
private ITokenManager tokenManager;
35+
private Mapper mapper;
36+
private ListenersBuilder stageListenersBuilder;
37+
private DbEntity entity;
38+
39+
private ExtractorName extractorName;
40+
41+
public CreateOrUpdateDbBuilder(String dbEntityName,
42+
ITargetCayenneService targetCayenneService,
43+
IExtractorService extractorService,
44+
ITokenManager tokenManager,
45+
IKeyAdapterFactory keyAdapterFactory) {
46+
47+
this.targetCayenneService = targetCayenneService;
48+
this.extractorService = extractorService;
49+
this.tokenManager = tokenManager;
50+
51+
Objects.requireNonNull(dbEntityName);
52+
DbEntity entity = targetCayenneService.entityResolver().getDbEntity(dbEntityName);
53+
if (entity == null) {
54+
throw new LmRuntimeException("DbEntity '" + dbEntityName + "' is not mapped in Cayenne");
55+
}
56+
this.entity = entity;
57+
58+
this.mapper = new MapperBuilder(entity, null, keyAdapterFactory).matchById().build();
59+
60+
this.stageListenersBuilder = new ListenersBuilder(AfterSourceRowsConverted.class, AfterSourcesMapped.class,
61+
AfterTargetsMatched.class, AfterTargetsMerged.class, AfterTargetsCommitted.class);
62+
63+
// always add stats listener..
64+
stageListener(CreateOrUpdateStatsListener.instance());
65+
}
66+
67+
public CreateOrUpdateDbBuilder sourceExtractor(String location, String name) {
68+
this.extractorName = ExtractorName.create(location, name);
69+
return this;
70+
}
71+
72+
public CreateOrUpdateDbBuilder sourceExtractor(String location) {
73+
// v.1 model style config
74+
return sourceExtractor(location, ExtractorModel.DEFAULT_NAME);
75+
}
76+
77+
public CreateOrUpdateDbBuilder matchById() {
78+
// already created
79+
return this;
80+
}
81+
82+
public CreateOrUpdateDbBuilder batchSize(int batchSize) {
83+
this.batchSize = batchSize;
84+
return this;
85+
}
86+
87+
public CreateOrUpdateDbBuilder stageListener(Object listener) {
88+
stageListenersBuilder.addListener(listener);
89+
return this;
90+
}
91+
92+
public LmTask task() throws IllegalStateException {
93+
94+
if (extractorName == null) {
95+
throw new IllegalStateException("Required 'extractorName' is not set");
96+
}
97+
98+
return new CreateOrUpdateDbTask(extractorName, batchSize, targetCayenneService, extractorService, tokenManager,
99+
createProcessor());
100+
}
101+
102+
private CreateOrUpdateSegmentProcessor createProcessor() {
103+
104+
SourceMapper sourceMapper = new SourceMapper(mapper);
105+
TargetMatcher targetMatcher = new TargetMatcher(entity, mapper);
106+
CreateOrUpdateMerger merger = new CreateOrUpdateMerger(mapper);
107+
RowConverter rowConverter = new RowConverter();
108+
109+
return new CreateOrUpdateSegmentProcessor(rowConverter, sourceMapper, targetMatcher, merger,
110+
stageListenersBuilder.getListeners());
111+
}
112+
113+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.nhl.link.move.runtime.task.createorupdatedb;
2+
3+
import com.nhl.link.move.CountingRowReader;
4+
import com.nhl.link.move.Execution;
5+
import com.nhl.link.move.Row;
6+
import com.nhl.link.move.RowReader;
7+
import com.nhl.link.move.batch.BatchProcessor;
8+
import com.nhl.link.move.batch.BatchRunner;
9+
import com.nhl.link.move.extractor.Extractor;
10+
import com.nhl.link.move.extractor.model.ExtractorName;
11+
import com.nhl.link.move.runtime.cayenne.ITargetCayenneService;
12+
import com.nhl.link.move.runtime.extractor.IExtractorService;
13+
import com.nhl.link.move.runtime.task.BaseTask;
14+
import com.nhl.link.move.runtime.token.ITokenManager;
15+
import org.apache.cayenne.ObjectContext;
16+
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
/**
21+
* A task that reads streamed source data and creates/updates records in a
22+
* target DB.
23+
*/
24+
public class CreateOrUpdateDbTask extends BaseTask {
25+
26+
private ExtractorName extractorName;
27+
private int batchSize;
28+
private ITargetCayenneService targetCayenneService;
29+
private IExtractorService extractorService;
30+
private CreateOrUpdateSegmentProcessor processor;
31+
32+
public CreateOrUpdateDbTask(ExtractorName extractorName,
33+
int batchSize,
34+
ITargetCayenneService targetCayenneService,
35+
IExtractorService extractorService,
36+
ITokenManager tokenManager,
37+
CreateOrUpdateSegmentProcessor processor) {
38+
39+
super(tokenManager);
40+
41+
this.extractorName = extractorName;
42+
this.batchSize = batchSize;
43+
this.targetCayenneService = targetCayenneService;
44+
this.extractorService = extractorService;
45+
this.processor = processor;
46+
}
47+
48+
@Override
49+
public Execution run(Map<String, ?> params) {
50+
51+
if (params == null) {
52+
throw new NullPointerException("Null params");
53+
}
54+
55+
try (Execution execution = new Execution("CreateOrUpdateTask:" + extractorName, params);) {
56+
57+
BatchProcessor<Row> batchProcessor = createBatchProcessor(execution);
58+
59+
try (RowReader data = getRowReader(execution, params)) {
60+
BatchRunner.create(batchProcessor).withBatchSize(batchSize).run(data);
61+
}
62+
63+
return execution;
64+
}
65+
}
66+
67+
protected BatchProcessor<Row> createBatchProcessor(final Execution execution) {
68+
return new BatchProcessor<Row>() {
69+
70+
ObjectContext context = targetCayenneService.newContext();
71+
72+
@Override
73+
public void process(List<Row> rows) {
74+
processor.process(execution, new CreateOrUpdateSegment(context, rows));
75+
}
76+
};
77+
}
78+
79+
/**
80+
* Returns a RowReader obtained from a named extractor and wrapped in a read
81+
* stats counter.
82+
*/
83+
protected RowReader getRowReader(Execution execution, Map<String, ?> extractorParams) {
84+
Extractor extractor = extractorService.getExtractor(extractorName);
85+
return new CountingRowReader(extractor.getReader(extractorParams), execution.getStats());
86+
}
87+
88+
}

0 commit comments

Comments
 (0)