Skip to content

Commit dc4bf4c

Browse files
DbEntity sync nhl#131
1 parent 34ef81f commit dc4bf4c

File tree

10 files changed

+131
-68
lines changed

10 files changed

+131
-68
lines changed

link-move/src/main/java/com/nhl/link/move/runtime/task/TaskService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public <T extends DataObject> CreateOrUpdateBuilder<T> createOrUpdate(Class<T> t
4848
@Override
4949
public CreateOrUpdateDbBuilder createOrUpdate(String dbEntityName) {
5050
return new CreateOrUpdateDbBuilder(dbEntityName, targetCayenneService, extractorService, tokenManager,
51-
pathNormalizer, keyAdapterFactory);
51+
pathNormalizer, keyAdapterFactory, writerService);
5252
}
5353

5454
@Override

link-move/src/main/java/com/nhl/link/move/runtime/task/createorupdatedb/CreateOrUpdateDbBuilder.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.nhl.link.move.runtime.task.createorupdate.RowConverter;
2323
import com.nhl.link.move.runtime.task.createorupdate.SourceMapper;
2424
import com.nhl.link.move.runtime.token.ITokenManager;
25+
import com.nhl.link.move.writer.ITargetPropertyWriterService;
2526
import org.apache.cayenne.dba.TypesMapping;
2627
import org.apache.cayenne.map.DbEntity;
2728
import org.apache.cayenne.map.ObjAttribute;
@@ -41,8 +42,10 @@ public class CreateOrUpdateDbBuilder extends BaseTaskBuilder {
4142
private MapperBuilder mapperBuilder;
4243
private Mapper mapper;
4344
private ListenersBuilder stageListenersBuilder;
44-
private DbEntity entity;
45+
private DbEntity dbEntity;
46+
private ObjEntity objEntity;
4547
private EntityPathNormalizer entityPathNormalizer;
48+
private ITargetPropertyWriterService writerService;
4649

4750
private ExtractorName extractorName;
4851

@@ -51,7 +54,8 @@ public CreateOrUpdateDbBuilder(String dbEntityName,
5154
IExtractorService extractorService,
5255
ITokenManager tokenManager,
5356
IPathNormalizer pathNormalizer,
54-
IKeyAdapterFactory keyAdapterFactory) {
57+
IKeyAdapterFactory keyAdapterFactory,
58+
ITargetPropertyWriterService writerService) {
5559

5660
this.targetCayenneService = targetCayenneService;
5761
this.extractorService = extractorService;
@@ -62,7 +66,7 @@ public CreateOrUpdateDbBuilder(String dbEntityName,
6266
if (entity == null) {
6367
throw new LmRuntimeException("DbEntity '" + dbEntityName + "' is not mapped in Cayenne");
6468
}
65-
this.entity = entity;
69+
this.dbEntity = entity;
6670

6771
ObjEntity objEntity = new ObjEntity(entity.getName() + "_temp");
6872
entity.getAttributes().forEach(a -> {
@@ -72,12 +76,14 @@ public CreateOrUpdateDbBuilder(String dbEntityName,
7276
objEntity.addAttribute(objAttribute);
7377
}
7478
});
75-
// entity.getRelationships()
79+
// dbEntity.getRelationships()
7680
objEntity.setDbEntity(entity);
7781
targetCayenneService.entityResolver().getDataMap("datamap-targets").addObjEntity(objEntity);
82+
this.objEntity = objEntity;
7883

7984
this.entityPathNormalizer = pathNormalizer.normalizer(entity);
8085
this.mapperBuilder = new MapperBuilder(entity, entityPathNormalizer, keyAdapterFactory);
86+
this.writerService = writerService;
8187

8288
this.stageListenersBuilder = new ListenersBuilder(AfterSourceRowsConverted.class, AfterSourcesMapped.class,
8389
AfterTargetsMatched.class, AfterTargetsMerged.class, AfterTargetsCommitted.class);
@@ -138,8 +144,8 @@ private CreateOrUpdateSegmentProcessor createProcessor() {
138144
Mapper mapper = this.mapper != null ? this.mapper : mapperBuilder.build();
139145

140146
SourceMapper sourceMapper = new SourceMapper(mapper);
141-
TargetMatcher targetMatcher = new TargetMatcher(entity, mapper);
142-
CreateOrUpdateMerger merger = new CreateOrUpdateMerger(mapper);
147+
TargetMatcher targetMatcher = new TargetMatcher(dbEntity, mapper);
148+
CreateOrUpdateMerger merger = new CreateOrUpdateMerger(objEntity, mapper, writerService.getWriterFactory(objEntity));
143149
RowConverter rowConverter = new RowConverter(entityPathNormalizer);
144150

145151
return new CreateOrUpdateSegmentProcessor(rowConverter, sourceMapper, targetMatcher, merger,

link-move/src/main/java/com/nhl/link/move/runtime/task/createorupdatedb/CreateOrUpdateMerger.java

Lines changed: 66 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,20 @@
33
import com.nhl.link.move.LmRuntimeException;
44
import com.nhl.link.move.mapper.Mapper;
55
import com.nhl.link.move.runtime.task.createorupdate.CreateOrUpdateTuple;
6-
import org.apache.cayenne.CayenneDataObject;
6+
import com.nhl.link.move.writer.TargetPropertyWriter;
7+
import com.nhl.link.move.writer.TargetPropertyWriterFactory;
78
import org.apache.cayenne.DataObject;
89
import org.apache.cayenne.DataRow;
910
import org.apache.cayenne.ObjectContext;
10-
import org.apache.cayenne.ObjectId;
11-
import org.apache.cayenne.PersistenceState;
1211
import org.apache.cayenne.access.DataContext;
12+
import org.apache.cayenne.map.ObjEntity;
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
1515

1616
import java.util.ArrayList;
1717
import java.util.HashMap;
1818
import java.util.List;
1919
import java.util.Map;
20-
import java.util.Objects;
21-
import java.util.function.Function;
22-
import java.util.stream.Collectors;
2320

2421
/**
2522
* @since 1.3
@@ -28,31 +25,35 @@ public class CreateOrUpdateMerger {
2825

2926
private static final Logger LOGGER = LoggerFactory.getLogger(CreateOrUpdateMerger.class);
3027

28+
private ObjEntity objEntity;
3129
private Mapper mapper;
30+
private TargetPropertyWriterFactory<?> writerFactory;
3231

33-
public CreateOrUpdateMerger(Mapper mapper) {
32+
public CreateOrUpdateMerger(ObjEntity objEntity, Mapper mapper, TargetPropertyWriterFactory<?> writerFactory) {
33+
this.objEntity = objEntity;
3434
this.mapper = mapper;
35+
this.writerFactory = writerFactory;
3536
}
3637

37-
public void merge(List<CreateOrUpdateTuple<DataRow>> mapped) {
38-
for (CreateOrUpdateTuple<DataRow> t : mapped) {
38+
public void merge(List<CreateOrUpdateTuple<DataObject>> mapped) {
39+
for (CreateOrUpdateTuple<DataObject> t : mapped) {
3940
if (!t.isCreated()) {
4041
merge(t.getSource(), t.getTarget());
4142
}
4243
}
4344
}
4445

45-
public List<CreateOrUpdateTuple<DataRow>> map(ObjectContext context,
46+
public List<CreateOrUpdateTuple<DataObject>> map(ObjectContext context,
4647
Map<Object, Map<String, Object>> mappedSources,
4748
List<DataRow> matchedTargets) {
4849

4950
// clone mappedSources as we are planning to truncate it in this method
5051
Map<Object, Map<String, Object>> localMappedSources = new HashMap<>(mappedSources);
5152

52-
List<CreateOrUpdateTuple<DataRow>> result = new ArrayList<>();
53+
List<CreateOrUpdateTuple<DataObject>> result = new ArrayList<>();
5354

5455
for (DataRow t : matchedTargets) {
55-
DataObject object = ((DataContext) context).objectFromDataRow("etl11t_temp", t);
56+
DataObject object = ((DataContext) context).objectFromDataRow(objEntity.getName(), t);
5657
Object key = mapper.keyForTarget(object);
5758

5859
Map<String, Object> src = localMappedSources.remove(key);
@@ -64,61 +65,89 @@ public List<CreateOrUpdateTuple<DataRow>> map(ObjectContext context,
6465
}
6566

6667
// skip phantom updates...
67-
if (willUpdate(src, t)) {
68-
result.add(new CreateOrUpdateTuple<>(src, t, false));
68+
if (willUpdate(src, object)) {
69+
result.add(new CreateOrUpdateTuple<>(src, create((DataContext) context, t, false), false));
6970
}
7071
}
7172

7273
// everything that's left are new objects
7374
for (Map.Entry<Object, Map<String, Object>> e : localMappedSources.entrySet()) {
7475

75-
DataRow t = create((DataContext) context, e.getValue());
76+
DataObject t = create((DataContext) context, e.getValue(), true);
7677

7778
result.add(new CreateOrUpdateTuple<>(e.getValue(), t, true));
7879
}
7980

8081
return result;
8182
}
8283

83-
protected boolean willUpdate(Map<String, Object> source, DataRow target) {
84+
protected boolean willUpdate(Map<String, Object> source, DataObject target) {
8485

8586
if (source.isEmpty()) {
8687
return false;
8788
}
8889

8990
for (Map.Entry<String, Object> e : source.entrySet()) {
90-
// TODO: check for properties that are absent in the target
91-
String attribute = e.getKey();
92-
if (!target.containsKey(attribute) || !Objects.equals(e.getValue(), target.get(attribute))) {
91+
TargetPropertyWriter writer = writerFactory.getOrCreateWriter(e.getKey());
92+
if (writer == null) {
93+
LOGGER.info("Source contains property not mapped in the target: " + e.getKey() + ". Skipping...");
94+
continue;
95+
}
96+
97+
if (writer.willWrite(target, e.getValue())) {
9398
return true;
9499
}
95100
}
96101

97102
return false;
98103
}
99104

100-
protected DataRow create(DataContext context, Map<String, Object> source) {
101-
source = source.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().substring(3), Map.Entry::getValue));
102-
DataRow row = new DataRow(source);
103-
// DataObject object = context.objectFromDataRow("etl11t_temp", row);
105+
protected DataObject create(DataContext context, Map<String, Object> source, boolean shouldRegister) {
106+
// Map<String, Object> target = new HashMap<>();
107+
// for (Map.Entry<String, Object> e : source.entrySet()) {
108+
// if (e.getValue() != null) {
109+
// target.put(e.getKey()/*.substring(3)*/, e.getValue());
110+
// }
111+
// }
112+
DataObject target = (DataObject) context.newObject(objEntity.getName());
113+
// TODO: ID
104114
// object.setObjectId(new ObjectId("etl11t_temp", "id", source.get("id")));
105-
// context.registerNewObject(object);
106-
DataObject object = (DataObject) context.newObject("etl11t_temp");
107-
object.setObjectId(new ObjectId("etl11t_temp", "id", source.get("id")));
108-
context.registerNewObject(object);
109-
// object.getObjectId().getIdSnapshot().clear();
110-
// object.getObjectId().getIdSnapshot().put("id", source.get("id"));
111-
source.forEach((k, v) -> {
112-
if (!k.equals("id")) {
113-
object.writePropertyDirectly(k, v);
115+
if (shouldRegister) {
116+
context.registerNewObject(target);
117+
}
118+
// target.forEach((k, v) -> {
119+
// if (!k.equals("id")) {
120+
// object.writePropertyDirectly(k, v);
121+
// }
122+
// });
123+
124+
for (Map.Entry<String, Object> e : source.entrySet()) {
125+
TargetPropertyWriter writer = writerFactory.getOrCreateWriter(e.getKey());
126+
if (writer == null) {
127+
LOGGER.info("Source contains property not mapped in the target: " + e.getKey() + ". Skipping...");
128+
continue;
129+
}
130+
if (writer.willWrite(target, e.getValue())) {
131+
writer.write(target, e.getValue());
114132
}
115-
});
133+
}
116134

117-
return row;
135+
return target;
118136
}
119137

120-
private void merge(Map<String, Object> source, DataRow target) {
121-
// TODO: check for properties that are absent in the target
122-
target.putAll(source);
138+
private void merge(Map<String, Object> source, DataObject target) {
139+
140+
if (source.isEmpty()) {
141+
return;
142+
}
143+
144+
for (Map.Entry<String, Object> e : source.entrySet()) {
145+
TargetPropertyWriter writer = writerFactory.getOrCreateWriter(e.getKey());
146+
if (writer == null) {
147+
LOGGER.info("Source contains property not mapped in the target: " + e.getKey() + ". Skipping...");
148+
continue;
149+
}
150+
writer.write(target, e.getValue());
151+
}
123152
}
124153
}

link-move/src/main/java/com/nhl/link/move/runtime/task/createorupdatedb/CreateOrUpdateSegmentProcessor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.nhl.link.move.runtime.task.createorupdate.CreateOrUpdateSegment;
1212
import com.nhl.link.move.runtime.task.createorupdate.RowConverter;
1313
import com.nhl.link.move.runtime.task.createorupdate.SourceMapper;
14+
import org.apache.cayenne.DataObject;
1415
import org.apache.cayenne.DataRow;
1516

1617
import java.lang.annotation.Annotation;
@@ -43,9 +44,10 @@ public CreateOrUpdateSegmentProcessor(RowConverter rowConverter,
4344
this.listeners = stageListeners;
4445
}
4546

46-
public void process(Execution exec, CreateOrUpdateSegment<DataRow> segment) {
47+
public void process(Execution exec, CreateOrUpdateSegment segment) {
4748

4849
// execute create-or-update pipeline stages
50+
// TODO: using raw types because segment's matchedTargets and merged have different types in this scenario
4951
convertSrc(exec, segment);
5052
mapSrc(exec, segment);
5153
matchTarget(exec, segment);
@@ -54,12 +56,12 @@ public void process(Execution exec, CreateOrUpdateSegment<DataRow> segment) {
5456
commitTarget(exec, segment);
5557
}
5658

57-
private void convertSrc(Execution exec, CreateOrUpdateSegment<DataRow> segment) {
59+
private void convertSrc(Execution exec, CreateOrUpdateSegment<?> segment) {
5860
segment.setSources(rowConverter.convert(segment.getSourceRows()));
5961
notifyListeners(AfterSourceRowsConverted.class, exec, segment);
6062
}
6163

62-
private void mapSrc(Execution exec, CreateOrUpdateSegment<DataRow> segment) {
64+
private void mapSrc(Execution exec, CreateOrUpdateSegment<?> segment) {
6365
segment.setMappedSources(mapper.map(segment.getSources()));
6466
notifyListeners(AfterSourcesMapped.class, exec, segment);
6567
}
@@ -69,12 +71,15 @@ private void matchTarget(Execution exec, CreateOrUpdateSegment<DataRow> segment)
6971
notifyListeners(AfterTargetsMatched.class, exec, segment);
7072
}
7173

72-
private void mapToTarget(Execution exec, CreateOrUpdateSegment<DataRow> segment) {
73-
segment.setMerged(merger.map(segment.getContext(), segment.getMappedSources(), segment.getMatchedTargets()));
74+
@SuppressWarnings("unchecked")
75+
private void mapToTarget(Execution exec, CreateOrUpdateSegment segment) {
76+
// TODO: using raw types because segment.setMerged uses DataObject instead of DataRow
77+
List<DataRow> matchedTargets = (List<DataRow>) segment.getMatchedTargets();
78+
segment.setMerged(merger.map(segment.getContext(), segment.getMappedSources(), matchedTargets));
7479
notifyListeners(AfterTargetsMapped.class, exec, segment);
7580
}
7681

77-
private void mergeToTarget(Execution exec, CreateOrUpdateSegment<DataRow> segment) {
82+
private void mergeToTarget(Execution exec, CreateOrUpdateSegment<DataObject> segment) {
7883
merger.merge(segment.getMerged());
7984
notifyListeners(AfterTargetsMerged.class, exec, segment);
8085
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.nhl.link.move.writer;
22

3+
import org.apache.cayenne.map.ObjEntity;
4+
35
/**
46
* @since 1.6
57
*/
68
public interface ITargetPropertyWriterService {
79

810
<T> TargetPropertyWriterFactory<T> getWriterFactory(Class<T> type);
11+
12+
TargetPropertyWriterFactory<?> getWriterFactory(ObjEntity entity);
913
}

link-move/src/main/java/com/nhl/link/move/writer/TargetPropertyWriterService.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.cayenne.reflect.ToManyProperty;
1212
import org.apache.cayenne.reflect.ToOneProperty;
1313

14+
import java.util.Objects;
1415
import java.util.concurrent.ConcurrentHashMap;
1516
import java.util.concurrent.ConcurrentMap;
1617

@@ -32,13 +33,26 @@ public TargetPropertyWriterService(@Inject ITargetCayenneService targetCayenneSe
3233
@Override
3334
public <T> TargetPropertyWriterFactory<T> getWriterFactory(Class<T> type) {
3435

35-
if (targetCayenneService.entityResolver().getObjEntity(type) == null) {
36+
ObjEntity entity = targetCayenneService.entityResolver().getObjEntity(type);
37+
if (entity == null) {
3638
throw new LmRuntimeException("Java class " + type.getName() + " is not mapped in Cayenne");
3739
}
3840

41+
return getOrCreateWriterFactory(entity, type);
42+
}
43+
44+
@Override
45+
public TargetPropertyWriterFactory<?> getWriterFactory(ObjEntity entity) {
46+
Objects.requireNonNull(entity);
47+
return getOrCreateWriterFactory(entity, Object.class);
48+
}
49+
50+
@SuppressWarnings("unchecked")
51+
private <T> TargetPropertyWriterFactory<T> getOrCreateWriterFactory(ObjEntity entity, Class<T> type) {
52+
3953
TargetPropertyWriterFactory<T> writerFactory = (TargetPropertyWriterFactory<T>) writerFactories.get(type);
4054
if (writerFactory == null) {
41-
writerFactory = createWriterFactory(type);
55+
writerFactory = createWriterFactory(entity, type);
4256
TargetPropertyWriterFactory existing = writerFactories.putIfAbsent(type, writerFactory);
4357
if (existing != null) {
4458
writerFactory = existing;
@@ -47,9 +61,8 @@ public <T> TargetPropertyWriterFactory<T> getWriterFactory(Class<T> type) {
4761
return writerFactory;
4862
}
4963

50-
private <T> TargetPropertyWriterFactory<T> createWriterFactory(Class<T> type) {
64+
private <T> TargetPropertyWriterFactory<T> createWriterFactory(ObjEntity entity, Class<T> type) {
5165

52-
ObjEntity entity = targetCayenneService.entityResolver().getObjEntity(type);
5366
final TargetPropertyWriterFactory<T> writerFactory = new TargetPropertyWriterFactory<>(type, entity);
5467
ClassDescriptor descriptor = targetCayenneService.entityResolver().getClassDescriptor(entity.getName());
5568

0 commit comments

Comments
 (0)