Skip to content

Commit 617e87e

Browse files
ben-Draegerbelagertem
authored andcommitted
Fix for the SequenceId Ordering Problem (Draegerwerk#221)
SequenceIds are currently ordered lexicographically, which is wrong as the SDC standards define no order on them. We hence order them by the timestamp of the first message that they were used in. # Checklist The following aspects have been respected by the author of this pull request, confirmed by both pull request assignee **and** reviewer: * Adherence to coding conventions * [x] Pull Request Assignee * [x] Reviewer (midtuna) * Adherence to javadoc conventions * [x] Pull Request Assignee * [x] Reviewer (midtuna) * Changelog update (necessity checked and entry added or not added respectively) * [x] Pull Request Assignee * [x] Reviewer (midtuna) * README update (necessity checked and entry added or not added respectively) * [x] Pull Request Assignee * [x] Reviewer (midtuna) * config update (necessity checked and entry added or not added respectively) * [x] Pull Request Assignee * [x] Reviewer (midtuna) * SDCcc executable ran against a test device (if necessary) * [x] Pull Request Assignee * [x] Reviewer (midtuna) --------- Co-authored-by: midttuna <[email protected]> (cherry picked from commit 729356b) Signed-off-by: Malte Grebe-Lüth <[email protected]>
1 parent d786e0a commit 617e87e

File tree

4 files changed

+171
-42
lines changed

4 files changed

+171
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99
### Fixed
1010

1111
- biceps:C-11, biceps:C-12, biceps:C-13, biceps:C-14 and biceps:C-15 no longer fails incorrectly if a DescriptionModificationReport with the same MdibVersion has been sent for the respective EpisodicReport.
12+
- SequenceIds are now ordered by the timestamp of the first message that used them
1213

1314
## [9.0.0] - 2024-02-23
1415

sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,12 @@
7676
import org.apache.commons.io.input.BOMInputStream;
7777
import org.apache.logging.log4j.LogManager;
7878
import org.apache.logging.log4j.Logger;
79+
import org.hibernate.ScrollMode;
7980
import org.hibernate.Session;
8081
import org.hibernate.SessionFactory;
8182
import org.hibernate.Transaction;
83+
import org.hibernate.query.spi.ScrollableResultsImplementor;
84+
import org.hibernate.query.spi.StreamDecorator;
8285
import org.somda.sdc.dpws.CommunicationLog;
8386
import org.somda.sdc.dpws.soap.ApplicationInfo;
8487
import org.somda.sdc.dpws.soap.CommunicationContext;
@@ -1028,6 +1031,7 @@ private void awaitFlushBarrier() {
10281031

10291032
/**
10301033
* Retrieves all SequenceId attribute values that have been seen.
1034+
* Orders them by the timestamp of the first message that used the respective SequenceId.
10311035
*
10321036
* @return stream of all SequenceId attribute values that have been seen
10331037
* @throws IOException if storage is closed
@@ -1039,17 +1043,20 @@ public Stream<String> getUniqueSequenceIds() throws IOException {
10391043
throw new IOException(GET_UNIQUE_SEQUENCE_IDS_CALLED_ON_CLOSED_STORAGE);
10401044
}
10411045

1042-
final CriteriaQuery<String> criteria;
1043-
1046+
final CriteriaQuery<String> messageContentQuery;
10441047
try (final Session session = sessionFactory.openSession()) {
1048+
session.beginTransaction();
1049+
10451050
final CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder();
1046-
criteria = criteriaBuilder.createQuery(String.class);
1047-
final Root<MdibVersionGroupEntity> mdibVersionGroupEntityRoot = criteria.from(MdibVersionGroupEntity.class);
1048-
criteria.select(mdibVersionGroupEntityRoot.get(MdibVersionGroupEntity_.sequenceId));
1049-
criteria.distinct(true);
1051+
messageContentQuery = criteriaBuilder.createQuery(String.class);
1052+
final Root<MessageContent> messageContentRoot = messageContentQuery.from(MessageContent.class);
1053+
messageContentQuery.select(
1054+
messageContentRoot.join(MessageContent_.mdibVersionGroups).get(MdibVersionGroupEntity_.sequenceId));
1055+
1056+
messageContentQuery.orderBy(criteriaBuilder.asc(messageContentRoot.get(MessageContent_.nanoTimestamp)));
10501057
}
10511058

1052-
return this.getQueryResult(criteria);
1059+
return this.getOrderedQueryResult(messageContentQuery).distinct();
10531060
}
10541061

10551062
/**
@@ -1649,11 +1656,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimeIntervalAndBodyType(
16491656
}
16501657

16511658
final boolean present;
1652-
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
1659+
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
16531660
present = countingStream.findAny().isPresent();
16541661
}
16551662

1656-
return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
1663+
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
16571664
}
16581665

16591666
/**
@@ -1729,11 +1736,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimestampAndBodyType(
17291736
}
17301737

17311738
final boolean present;
1732-
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
1739+
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
17331740
present = countingStream.findAny().isPresent();
17341741
}
17351742

1736-
return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
1743+
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
17371744
}
17381745

17391746
/**
@@ -1773,11 +1780,11 @@ public GetterResult<ManipulationData> getManipulationDataByManipulation(final St
17731780
}
17741781

17751782
final boolean present;
1776-
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
1783+
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
17771784
present = countingStream.findAny().isPresent();
17781785
}
17791786

1780-
return new GetterResult<>(this.getQueryResult(criteria), present);
1787+
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
17811788
}
17821789

17831790
/**
@@ -1842,10 +1849,10 @@ public GetterResult<ManipulationData> getManipulationDataByParametersAndManipula
18421849
criteriaBuilder.and(parameterExistPredicates.toArray(new Predicate[0]))));
18431850
}
18441851
final boolean present;
1845-
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
1852+
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
18461853
present = countingStream.findAny().isPresent();
18471854
}
1848-
return new GetterResult<>(this.getQueryResult(criteria), present);
1855+
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
18491856
}
18501857

18511858
private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
@@ -1858,6 +1865,16 @@ private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
18581865
.onClose(resultIterator::close);
18591866
}
18601867

1868+
private <T> Stream<T> getOrderedQueryResult(final CriteriaQuery<T> criteriaQuery) {
1869+
final Session session = sessionFactory.openSession();
1870+
final Stream<T> results = getOrderedStreamForQuery(session, criteriaQuery);
1871+
1872+
final ResultIterator<T> resultIterator = new ResultIterator<>(session, results);
1873+
1874+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
1875+
.onClose(resultIterator::close);
1876+
}
1877+
18611878
// be aware, that this does not use evict on cached objects
18621879
private <T> Stream<T> getStreamForQuery(final CriteriaQuery<T> criteriaQuery) {
18631880
final Session session = sessionFactory.openSession();
@@ -1882,6 +1899,23 @@ private <T> Stream<T> getStreamForQuery(final Session session, final CriteriaQue
18821899
.stream();
18831900
}
18841901

1902+
// be aware, that this does not use evict on cached objects
1903+
private <T> Stream<T> getOrderedStreamForQuery(final Session session, final CriteriaQuery<T> criteriaQuery) {
1904+
// The stream provided by Hibernate does not have the ORDERED characteristic.
1905+
// We hence build our own.
1906+
final ScrollableResultsImplementor scrollableResults =
1907+
(ScrollableResultsImplementor) session.createQuery(criteriaQuery)
1908+
.setReadOnly(true)
1909+
.setCacheable(false)
1910+
.setFetchSize(FETCH_SIZE)
1911+
.scroll(ScrollMode.FORWARD_ONLY);
1912+
final OrderedStreamIterator<T> iterator = new OrderedStreamIterator<>(scrollableResults);
1913+
final Spliterator<T> spliterator =
1914+
Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.ORDERED);
1915+
1916+
return (Stream<T>) new StreamDecorator(StreamSupport.stream(spliterator, false), scrollableResults::close);
1917+
}
1918+
18851919
private void transmit(final List<DatabaseEntry> results) {
18861920
try (final Session session = sessionFactory.openSession()) {
18871921
final Transaction transaction = session.beginTransaction();
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* This Source Code Form is subject to the terms of the MIT License.
3+
* Copyright (c) 2023-2024 Draegerwerk AG & Co. KGaA.
4+
*
5+
* SPDX-License-Identifier: MIT
6+
*/
7+
8+
package com.draeger.medical.sdccc.messages
9+
10+
import org.hibernate.query.spi.CloseableIterator
11+
import org.hibernate.query.spi.ScrollableResultsImplementor
12+
13+
/**
14+
* Iterator to be used to create Streams with the ORDERED characteristic from ScrollableResults.
15+
*/
16+
class OrderedStreamIterator<T>(private val results: ScrollableResultsImplementor) : CloseableIterator<T> {
17+
18+
override fun close() {
19+
results.close()
20+
}
21+
22+
override fun remove() {
23+
throw UnsupportedOperationException(
24+
"this stream does not support the" +
25+
" remove operation"
26+
)
27+
}
28+
29+
override fun hasNext(): Boolean {
30+
if (results.isClosed) {
31+
return false
32+
}
33+
return results.next()
34+
}
35+
36+
override fun next(): T {
37+
val element = results.get()
38+
@Suppress("UNCHECKED_CAST")
39+
return if (element.size == 1) {
40+
element[0]
41+
} else {
42+
element
43+
} as T
44+
}
45+
}

sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import javax.xml.namespace.QName;
5757
import org.apache.commons.io.ByteOrderMark;
5858
import org.apache.commons.lang3.tuple.Pair;
59+
import org.jetbrains.annotations.NotNull;
5960
import org.junit.jupiter.api.BeforeEach;
6061
import org.junit.jupiter.api.Test;
6162
import org.junit.jupiter.api.io.TempDir;
@@ -155,15 +156,7 @@ public void testMdibVersionOverflow(@TempDir final File dir) throws IOException,
155156
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
156157
final ListMultimap<String, String> multimap = ArrayListMultimap.create();
157158

158-
final String transactionId = "transactionId";
159-
final String requestUri = "requestUri";
160-
161-
final X509Certificate certificate = CertificateUtil.getDummyCert();
162-
final CommunicationContext headerContext = new CommunicationContext(
163-
new HttpApplicationInfo(multimap, transactionId, requestUri),
164-
new TransportInfo(
165-
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
166-
null);
159+
final CommunicationContext headerContext = getCommunicationContext(multimap);
167160

168161
try (final Message message = new Message(
169162
CommunicationLog.Direction.INBOUND,
@@ -196,15 +189,7 @@ public void testMdibVersionCloseToOverflow(@TempDir final File dir) throws IOExc
196189
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
197190
final ListMultimap<String, String> multimap = ArrayListMultimap.create();
198191

199-
final String transactionId = "transactionId";
200-
final String requestUri = "requestUri";
201-
202-
final X509Certificate certificate = CertificateUtil.getDummyCert();
203-
final CommunicationContext headerContext = new CommunicationContext(
204-
new HttpApplicationInfo(multimap, transactionId, requestUri),
205-
new TransportInfo(
206-
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
207-
null);
192+
final CommunicationContext headerContext = getCommunicationContext(multimap);
208193

209194
try (final Message message = new Message(
210195
CommunicationLog.Direction.INBOUND,
@@ -248,15 +233,7 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
248233
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
249234
final ListMultimap<String, String> multimap = ArrayListMultimap.create();
250235

251-
final String transactionId = "transactionId";
252-
final String requestUri = "requestUri";
253-
254-
final X509Certificate certificate = CertificateUtil.getDummyCert();
255-
final CommunicationContext headerContext = new CommunicationContext(
256-
new HttpApplicationInfo(multimap, transactionId, requestUri),
257-
new TransportInfo(
258-
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
259-
null);
236+
final CommunicationContext headerContext = getCommunicationContext(multimap);
260237

261238
try (final Message message = new Message(
262239
CommunicationLog.Direction.INBOUND,
@@ -302,6 +279,78 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
302279
}
303280
}
304281

282+
/**
283+
* Tests whether SequenceId values are ordered by the timestamp of the first message they appear in.
284+
*
285+
* @param dir message storage directory
286+
* @throws IOException on io exceptions
287+
* @throws CertificateException on certificate exceptions
288+
*/
289+
@Test
290+
public void testGetUniqueSequenceIdsOrdering(@TempDir final File dir) throws IOException, CertificateException {
291+
try (final MessageStorage messageStorage = new MessageStorage(
292+
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
293+
final ListMultimap<String, String> multimap = ArrayListMultimap.create();
294+
295+
final CommunicationContext headerContext = getCommunicationContext(multimap);
296+
297+
try (final Message message = new Message(
298+
CommunicationLog.Direction.INBOUND,
299+
CommunicationLog.MessageType.REQUEST,
300+
headerContext,
301+
messageStorage)) {
302+
message.write(String.format(
303+
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "3"))
304+
.getBytes(StandardCharsets.UTF_8));
305+
}
306+
messageStorage.flush();
307+
308+
try (final Message message = new Message(
309+
CommunicationLog.Direction.INBOUND,
310+
CommunicationLog.MessageType.REQUEST,
311+
headerContext,
312+
messageStorage)) {
313+
message.write(String.format(
314+
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "2"))
315+
.getBytes(StandardCharsets.UTF_8));
316+
}
317+
messageStorage.flush();
318+
319+
try (final Message message = new Message(
320+
CommunicationLog.Direction.INBOUND,
321+
CommunicationLog.MessageType.REQUEST,
322+
headerContext,
323+
messageStorage)) {
324+
message.write(String.format(
325+
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "1"))
326+
.getBytes(StandardCharsets.UTF_8));
327+
}
328+
messageStorage.flush();
329+
330+
try (final Stream<String> sequenceIdStream = messageStorage.getUniqueSequenceIds()) {
331+
assertEquals(List.of("urn:uuid:3", "urn:uuid:2", "urn:uuid:1"), sequenceIdStream.toList());
332+
}
333+
334+
try (final MessageStorage.GetterResult<MessageContent> inboundMessages =
335+
messageStorage.getInboundMessages()) {
336+
assertEquals(3, inboundMessages.getStream().count());
337+
}
338+
}
339+
}
340+
341+
private static @NotNull CommunicationContext getCommunicationContext(final ListMultimap<String, String> multimap)
342+
throws CertificateException, IOException {
343+
final String transactionId = "transactionId";
344+
final String requestUri = "requestUri";
345+
346+
final X509Certificate certificate = CertificateUtil.getDummyCert();
347+
return new CommunicationContext(
348+
new HttpApplicationInfo(multimap, transactionId, requestUri),
349+
new TransportInfo(
350+
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
351+
null);
352+
}
353+
305354
/**
306355
* Tests whether headers and the transaction id are stored properly.
307356
*

0 commit comments

Comments
 (0)