Skip to content

Commit 54c24a0

Browse files
committed
Add message cancellation and Entity-Component-System pattern support
1 parent c3a1640 commit 54c24a0

13 files changed

+584
-89
lines changed

CHANGES

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
[1.6.0]
2+
- Add EntityMessageData for transportation of entities in an Entity-Component-System pattern
3+
- Allow for cancellation of messages in the bus queue
4+
- Add method to notify message bus of deleted entities so that messages are cancelled
5+
- Add CancelledMessageHandler that can be notified of cancelled messages
6+
17
[1.5.0]
28
- Add LockProvider interface to allow for alternative locking implementations
39

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ if(project.hasProperty('ossrhUser') && project.hasProperty("release")) {
2727
}
2828

2929
group = 'org.mini2Dx'
30-
version = '1.5.1-SNAPSHOT'
30+
version = '1.6.0-SNAPSHOT'
3131
description = 'A lightweight message bus library for Java-based game engines'
3232

3333
repositories {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* The MIT License (MIT)
3+
*
4+
* Copyright (c) 2020 See AUTHORS file
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
package org.mini2Dx.minibus;
25+
26+
/**
27+
* Common interface for processing cancelled messages
28+
*/
29+
public interface CancelledMessageHandler {
30+
31+
/**
32+
* Called when a message is cancelled
33+
*
34+
* @param messageType The string representing the message type
35+
* @param source
36+
* The {@link MessageExchange} that sent the message
37+
* @param receiver
38+
* The {@link MessageExchange} that received the message
39+
* @param messageData
40+
* The {@link MessageData} that was received if any (i.e. possibly null)
41+
*/
42+
public void onMessageCancelled(String messageType, MessageExchange source, MessageExchange receiver, MessageData messageData);
43+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* The MIT License (MIT)
3+
*
4+
* Copyright (c) 2020 See AUTHORS file
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
package org.mini2Dx.minibus;
25+
26+
/**
27+
* Interface for message data that contains an entity in an Entity-Component-System pattern
28+
*/
29+
public interface EntityMessageData extends MessageData {
30+
31+
public int getEntityId();
32+
}

src/main/java/org/mini2Dx/minibus/MessageBus.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class MessageBus {
4747
public static LockProvider LOCK_PROVIDER = new JvmLockProvider();
4848

4949
final List<MessageExchange> exchangers = new SnapshotArrayList<MessageExchange>();
50+
final List<CancelledMessageHandler> cancelledMessageHandlers = new SnapshotArrayList<CancelledMessageHandler>();
5051
final MessageTransmissionPool transmissionPool = new MessageTransmissionPool();
5152

5253
private final MessageExchange anonymousExchange;
@@ -371,7 +372,72 @@ public void broadcastQuery(String messageType, MessageData messageData, String r
371372
broadcast(queryMessageExchange, messageType, messageData);
372373
}
373374

374-
void dispose(MessageExchange messageExchange) {
375+
/**
376+
* Cancels all messages in the bus.
377+
*/
378+
public void cancelAllMessages() {
379+
cancelAllMessages(true);
380+
}
381+
382+
/**
383+
* Cancels all messages in the bus
384+
* @param notify True if {@link CancelledMessageHandler}s should be notified
385+
*/
386+
public void cancelAllMessages(boolean notify) {
387+
for(int i = exchangers.size() - 1; i >= 0; i--) {
388+
if(i >= exchangers.size()) {
389+
continue;
390+
}
391+
exchangers.get(i).cancelAllMessages(notify);
392+
}
393+
}
394+
395+
/**
396+
* Cancels all messages of a specific type in the bus
397+
* @param messageType The message type to cancel
398+
*/
399+
public void cancelAllMessages(String messageType) {
400+
cancelAllMessages(messageType, true);
401+
}
402+
403+
/**
404+
* Cancels all messages of a specific type in the bus
405+
* @param messageType The message type to cancel
406+
* @param notify True if {@link CancelledMessageHandler}s should be notified
407+
*/
408+
public void cancelAllMessages(String messageType, boolean notify) {
409+
for(int i = exchangers.size() - 1; i >= 0; i--) {
410+
if(i >= exchangers.size()) {
411+
continue;
412+
}
413+
exchangers.get(i).cancelAllMessages(messageType, notify);
414+
}
415+
}
416+
417+
/**
418+
* Notify message exchanges of a deleted entity
419+
* @param entityId The entity ID
420+
*/
421+
public void entityDeleted(int entityId) {
422+
for(int i = exchangers.size() - 1; i >= 0; i--) {
423+
if(i >= exchangers.size()) {
424+
continue;
425+
}
426+
exchangers.get(i).entityDeleted(entityId);
427+
}
428+
}
429+
430+
void notifyMessageCancelled(String messageType, MessageExchange source, MessageExchange receiver, MessageData messageData) {
431+
for(int i = cancelledMessageHandlers.size() - 1; i >= 0; i--) {
432+
if(i >= cancelledMessageHandlers.size()) {
433+
continue;
434+
}
435+
final CancelledMessageHandler handler = cancelledMessageHandlers.get(i);
436+
handler.onMessageCancelled(messageType, source, receiver, messageData);
437+
}
438+
}
439+
440+
void dispose(MessageExchange messageExchange) {
375441
exchangers.remove(messageExchange);
376442
}
377443

@@ -403,6 +469,14 @@ public int getTotalActiveExchanges() {
403469
return exchangers.size();
404470
}
405471

472+
public void addCancelledMessageHandler(CancelledMessageHandler handler) {
473+
cancelledMessageHandlers.add(handler);
474+
}
475+
476+
public void removeCancelledMessageHandler(CancelledMessageHandler handler) {
477+
cancelledMessageHandlers.remove(handler);
478+
}
479+
406480
/**
407481
* An internal {@link MessageExchange} for anonymous message sending
408482
*/

src/main/java/org/mini2Dx/minibus/MessageExchange.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public abstract class MessageExchange {
4242

4343
protected final MessageBus messageBus;
4444
protected final MessageTransmissionPool messageTransmissionPool;
45-
protected final Queue<MessageTransmission> messageQueue = new SynchronizedQueue<>();
45+
protected final SynchronizedQueue<MessageTransmission> messageQueue = new SynchronizedQueue<>();
4646

4747
private final int id;
4848

@@ -67,6 +67,51 @@ public MessageExchange(MessageBus messageBus, MessageHandler... messageHandlers)
6767
}
6868
}
6969

70+
void entityDeleted(int entityId) {
71+
for(int i = messageQueue.size() - 1; i >= 0; i--) {
72+
if(i >= messageQueue.size()) {
73+
continue;
74+
}
75+
final MessageTransmission messageTransmission = messageQueue.get(i);
76+
if(messageTransmission.getMessage() instanceof EntityMessageData) {
77+
final EntityMessageData entityMessageData = (EntityMessageData) messageTransmission.getMessage();
78+
if(entityMessageData.getEntityId() != entityId) {
79+
continue;
80+
}
81+
messageQueue.remove(i);
82+
messageBus.notifyMessageCancelled(messageTransmission.getMessageType(), messageTransmission.getSource(), this, messageTransmission.getMessage());
83+
messageTransmission.release();
84+
}
85+
}
86+
}
87+
88+
public void cancelAllMessages(boolean notify) {
89+
while(!messageQueue.isEmpty()) {
90+
final MessageTransmission messageTransmission = messageQueue.poll();
91+
if(notify) {
92+
messageBus.notifyMessageCancelled(messageTransmission.getMessageType(), messageTransmission.getSource(), this, messageTransmission.getMessage());
93+
}
94+
messageTransmission.release();
95+
}
96+
}
97+
98+
public void cancelAllMessages(String messageType, boolean notify) {
99+
for(int i = messageQueue.size() - 1; i >= 0; i--) {
100+
if(i >= messageQueue.size()) {
101+
continue;
102+
}
103+
final MessageTransmission messageTransmission = messageQueue.get(i);
104+
if(!messageTransmission.getMessageType().equals(messageType)) {
105+
continue;
106+
}
107+
messageQueue.remove(i);
108+
if(notify) {
109+
messageBus.notifyMessageCancelled(messageTransmission.getMessageType(), messageTransmission.getSource(), this, messageTransmission.getMessage());
110+
}
111+
messageTransmission.release();
112+
}
113+
}
114+
70115
/**
71116
* An overidable method for processing a {@link MessageTransmission} before
72117
* queueing into this {@link MessageExchange}
@@ -223,4 +268,12 @@ public int getId() {
223268
public boolean isAnonymous() {
224269
return false;
225270
}
271+
272+
/**
273+
* Returns the current amount of messages queued
274+
* @return 0 if no messages queued
275+
*/
276+
public int getMessageQueueSize() {
277+
return messageQueue.size();
278+
}
226279
}

src/main/java/org/mini2Dx/minibus/util/SnapshotArrayList.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,20 @@ public class SnapshotArrayList<T> implements List<T> {
3232
private final Object iteratorLock = new Object();
3333
private final ReadWriteLock lock = MessageBus.LOCK_PROVIDER.newReadWriteLock();
3434
private final Queue<SnapshotIterator<T>> iteratorPool = new ArrayDeque<SnapshotIterator<T>>();
35+
36+
private final boolean ordered;
3537
private Object[] array = new Object[32];
3638

3739
private int size = 0;
3840

41+
public SnapshotArrayList() {
42+
this(false);
43+
}
44+
45+
public SnapshotArrayList(boolean ordered) {
46+
this.ordered = ordered;
47+
}
48+
3949
private void ensureCapacity(int capacity) {
4050
ArrayList list = new ArrayList();
4151
if(capacity <= array.length) {
@@ -119,8 +129,14 @@ public boolean remove(Object o) {
119129
if(!array[i].equals(o)) {
120130
continue;
121131
}
122-
array[i] = array[size - 1];
123-
array[size - 1] = null;
132+
if(ordered) {
133+
remove(i);
134+
i--;
135+
} else {
136+
array[i] = array[size - 1];
137+
array[size - 1] = null;
138+
}
139+
124140
size--;
125141
result = true;
126142
break;
@@ -207,26 +223,46 @@ public void add(int index, T element) {
207223
throw new UnsupportedOperationException();
208224
}
209225

210-
@Override
211-
public T remove(int index) {
226+
private T remove(int index, boolean throwException) {
212227
Object result = null;
213228
lock.writeLock().lock();
214229
if(index < 0) {
215230
lock.writeLock().unlock();
216-
throw new IndexOutOfBoundsException();
231+
if(throwException) {
232+
throw new IndexOutOfBoundsException();
233+
}
234+
return null;
217235
}
218236
if(index >= size) {
219237
lock.writeLock().unlock();
220-
throw new IndexOutOfBoundsException();
238+
if(throwException) {
239+
throw new IndexOutOfBoundsException();
240+
}
241+
return null;
221242
}
222243

223244
result = array[index];
224-
array[index] = array[size - 1];
245+
246+
if(ordered) {
247+
System.arraycopy(array, index + 1, array, index, size - 1 - index);
248+
} else {
249+
array[index] = array[size - 1];
250+
}
251+
225252
size--;
226253
lock.writeLock().unlock();
227254
return (T) result;
228255
}
229256

257+
public T safeRemove(int index) {
258+
return remove(index, false);
259+
}
260+
261+
@Override
262+
public T remove(int index) {
263+
return remove(index, true);
264+
}
265+
230266
@Override
231267
public int indexOf(Object o) {
232268
lock.readLock().lock();

0 commit comments

Comments
 (0)