diff --git a/core/src/main/java/dev/keva/core/aof/AOFContainer.java b/core/src/main/java/dev/keva/core/aof/AOFContainer.java index de696e2a..e8721b67 100644 --- a/core/src/main/java/dev/keva/core/aof/AOFContainer.java +++ b/core/src/main/java/dev/keva/core/aof/AOFContainer.java @@ -114,6 +114,7 @@ public List read() throws IOException { byte[][] objects = (byte[][]) input.readObject(); commands.add(Command.newInstance(objects, false)); } catch (EOFException e) { + log.error("Error while reading AOF command", e); fis.close(); return commands; } catch (ClassNotFoundException e) { diff --git a/core/src/main/java/dev/keva/core/command/impl/zset/ZAdd.java b/core/src/main/java/dev/keva/core/command/impl/zset/ZAdd.java new file mode 100644 index 00000000..f58c8790 --- /dev/null +++ b/core/src/main/java/dev/keva/core/command/impl/zset/ZAdd.java @@ -0,0 +1,123 @@ +package dev.keva.core.command.impl.zset; + +import dev.keva.core.command.annotation.CommandImpl; +import dev.keva.core.command.annotation.Execute; +import dev.keva.core.command.annotation.Mutate; +import dev.keva.core.command.annotation.ParamLength; +import dev.keva.ioc.annotation.Autowired; +import dev.keva.ioc.annotation.Component; +import dev.keva.protocol.resp.reply.BulkReply; +import dev.keva.protocol.resp.reply.ErrorReply; +import dev.keva.protocol.resp.reply.IntegerReply; +import dev.keva.protocol.resp.reply.Reply; +import dev.keva.store.KevaDatabase; +import dev.keva.util.DoubleUtil; +import dev.keva.util.hashbytes.BytesKey; + +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap.SimpleEntry; + +import static dev.keva.util.Constants.FLAG_CH; +import static dev.keva.util.Constants.FLAG_GT; +import static dev.keva.util.Constants.FLAG_INCR; +import static dev.keva.util.Constants.FLAG_LT; +import static dev.keva.util.Constants.FLAG_NX; +import static dev.keva.util.Constants.FLAG_XX; + +@Component +@CommandImpl("zadd") +@ParamLength(type = ParamLength.Type.AT_LEAST, value = 3) +@Mutate +public final class ZAdd { + private static final String XX = "xx"; + private static final String NX = "nx"; + private static final String GT = "gt"; + private static final String LT = "lt"; + private static final String INCR = "incr"; + private static final String CH = "ch"; + + private final KevaDatabase database; + + @Autowired + public ZAdd(KevaDatabase database) { + this.database = database; + } + + @Execute + public Reply execute(byte[][] params) { + // Parse the flags, if any + boolean xx = false, nx = false, gt = false, lt = false, incr = false; + int argPos = 1, flags = 0; + String arg; + while (argPos < params.length) { + arg = new String(params[argPos], StandardCharsets.UTF_8); + if (XX.equalsIgnoreCase(arg)) { + xx = true; + flags |= FLAG_XX; + } else if (NX.equalsIgnoreCase(arg)) { + nx = true; + flags |= FLAG_NX; + } else if (GT.equalsIgnoreCase(arg)) { + gt = true; + flags |= FLAG_GT; + } else if (LT.equalsIgnoreCase(arg)) { + lt = true; + flags |= FLAG_LT; + } else if (INCR.equalsIgnoreCase(arg)) { + incr = true; + flags |= FLAG_INCR; + } else if (CH.equalsIgnoreCase(arg)) { + flags |= FLAG_CH; + } else { + break; + } + ++argPos; + } + + int numMembers = params.length - argPos; + if (numMembers % 2 != 0) { + return ErrorReply.SYNTAX_ERROR; + } + numMembers /= 2; + + if (nx && xx) { + return ErrorReply.ZADD_NX_XX_ERROR; + } + if ((gt && nx) || (lt && nx) || (gt && lt)) { + return ErrorReply.ZADD_GT_LT_NX_ERROR; + } + if (incr && numMembers > 1) { + return ErrorReply.ZADD_INCR_ERROR; + } + + // Parse the key and value + final SimpleEntry[] members = new SimpleEntry[numMembers]; + double score; + String rawScore; + for (int memberIndex = 0; memberIndex < numMembers; ++memberIndex) { + try { + rawScore = new String(params[argPos++], StandardCharsets.UTF_8); + if (rawScore.equalsIgnoreCase("inf") || rawScore.equalsIgnoreCase("infinity") + || rawScore.equalsIgnoreCase("+inf") || rawScore.equalsIgnoreCase("+infinity") + ) { + score = Double.POSITIVE_INFINITY; + } else if (rawScore.equalsIgnoreCase("-inf") || rawScore.equalsIgnoreCase("-infinity")) { + score = Double.NEGATIVE_INFINITY; + } else { + score = Double.parseDouble(rawScore); + } + } catch (final NumberFormatException ignored) { + // return on first bad input + return ErrorReply.ZADD_SCORE_FLOAT_ERROR; + } + members[memberIndex] = new SimpleEntry<>(score, new BytesKey(params[argPos++])); + } + + if (incr) { + Double result = database.zincrby(params[0], members[0].getKey(), members[0].getValue(), flags); + return result == null ? BulkReply.NIL_REPLY : new BulkReply(DoubleUtil.toString(result)); + } + int result = database.zadd(params[0], members, flags); + return new IntegerReply(result); + } +} diff --git a/core/src/main/java/dev/keva/core/command/impl/zset/ZScore.java b/core/src/main/java/dev/keva/core/command/impl/zset/ZScore.java new file mode 100644 index 00000000..fae864de --- /dev/null +++ b/core/src/main/java/dev/keva/core/command/impl/zset/ZScore.java @@ -0,0 +1,36 @@ +package dev.keva.core.command.impl.zset; + +import dev.keva.core.command.annotation.CommandImpl; +import dev.keva.core.command.annotation.Execute; +import dev.keva.core.command.annotation.ParamLength; +import dev.keva.ioc.annotation.Autowired; +import dev.keva.ioc.annotation.Component; +import dev.keva.protocol.resp.reply.BulkReply; +import dev.keva.store.KevaDatabase; + +@Component +@CommandImpl("zscore") +@ParamLength(type = ParamLength.Type.EXACT, value = 2) +public final class ZScore { + private final KevaDatabase database; + + @Autowired + public ZScore(KevaDatabase database) { + this.database = database; + } + + @Execute + public BulkReply execute(byte[] key, byte[] member) { + final Double result = database.zscore(key, member); + if(result == null){ + return BulkReply.NIL_REPLY; + } + if (result.equals(Double.POSITIVE_INFINITY)) { + return BulkReply.POSITIVE_INFINITY_REPLY; + } + if (result.equals(Double.NEGATIVE_INFINITY)) { + return BulkReply.NEGATIVE_INFINITY_REPLY; + } + return new BulkReply(result.toString()); + } +} diff --git a/core/src/test/java/dev/keva/core/server/AOFTest.java b/core/src/test/java/dev/keva/core/server/AOFTest.java index 665725af..3f559226 100644 --- a/core/src/test/java/dev/keva/core/server/AOFTest.java +++ b/core/src/test/java/dev/keva/core/server/AOFTest.java @@ -24,7 +24,7 @@ Server startServer(int port) throws Exception { .persistence(false) .aof(true) .aofInterval(1000) - .workDirectory("./") + .workDirectory(System.getProperty("java.io.tmpdir")) .build(); val server = KevaServer.of(config); new Thread(() -> { diff --git a/core/src/test/java/dev/keva/core/server/AbstractServerTest.java b/core/src/test/java/dev/keva/core/server/AbstractServerTest.java index f7f1cbc4..c0f71cfc 100644 --- a/core/src/test/java/dev/keva/core/server/AbstractServerTest.java +++ b/core/src/test/java/dev/keva/core/server/AbstractServerTest.java @@ -9,10 +9,13 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import lombok.var; +import redis.clients.jedis.params.ZAddParams; import static org.junit.jupiter.api.Assertions.*; @@ -827,6 +830,101 @@ void setrange() { } } + @Test + void zaddWithXXAndNXErrs() { + assertThrows(JedisDataException.class, () -> { + jedis.zadd("zset", 1.0, "val", new ZAddParams().xx().nx()); + }); + } + + @Test + void zaddSingleWithNxAndGtErrs() { + assertThrows(JedisDataException.class, () -> { + jedis.zadd("zset", 1.0, "val", new ZAddParams().gt().nx()); + }); + } + + @Test + void zaddSingleWithNxAndLtErrs() { + assertThrows(JedisDataException.class, () -> { + jedis.zadd("zset", 1.0, "val", new ZAddParams().lt().nx()); + }); + } + + @Test + void zaddSingleWithGtAndLtErrs() { + assertThrows(JedisDataException.class, () -> { + jedis.zadd("zset", 1.0, "val", new ZAddParams().lt().gt()); + }); + } + + @Test + void zaddSingleWithoutOptions() { + try { + var result = jedis.zadd("zset", 1.0, "val"); + assertEquals(1, result); + + result = jedis.zadd("zset", 1.0, "val"); + assertEquals(0, result); + } catch (Exception e) { + fail(e); + } + } + + @Test + void zaddMultipleWithoutOptions() { + try { + Map members = new HashMap<>(); + int numMembers = 100; + for(int i=0; i +
+ SortedSet + +- ZADD +- ZSCORE + +
+
Pub/Sub diff --git a/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/BulkReply.java b/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/BulkReply.java index 3018a572..7d2b90db 100644 --- a/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/BulkReply.java +++ b/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/BulkReply.java @@ -11,6 +11,8 @@ public class BulkReply implements Reply { public static final BulkReply NIL_REPLY = new BulkReply(); + public static final BulkReply POSITIVE_INFINITY_REPLY = new BulkReply("inf"); + public static final BulkReply NEGATIVE_INFINITY_REPLY = new BulkReply("-inf"); public static final char MARKER = '$'; private final ByteBuf bytes; @@ -22,11 +24,7 @@ private BulkReply() { } public BulkReply(byte[] bytes) { - if (bytes.length == 0) { - this.bytes = Unpooled.EMPTY_BUFFER; - } else { - this.bytes = Unpooled.wrappedBuffer(bytes); - } + this.bytes = Unpooled.wrappedBuffer(bytes); capacity = bytes.length; } @@ -59,7 +57,7 @@ public void write(ByteBuf os) throws IOException { os.writeByte(MARKER); os.writeBytes(numToBytes(capacity, true)); if (capacity > 0) { - os.writeBytes(bytes); + os.writeBytes(bytes.array()); os.writeBytes(CRLF); } if (capacity == 0) { diff --git a/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/ErrorReply.java b/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/ErrorReply.java index b4d927c1..e723e498 100644 --- a/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/ErrorReply.java +++ b/resp-protocol/src/main/java/dev/keva/protocol/resp/reply/ErrorReply.java @@ -7,6 +7,13 @@ public class ErrorReply implements Reply { public static final char MARKER = '-'; + // Pre-defined errors + public static final ErrorReply SYNTAX_ERROR = new ErrorReply("ERR syntax error"); + public static final ErrorReply ZADD_NX_XX_ERROR = new ErrorReply("ERR XX and NX options at the same time are not compatible"); + public static final ErrorReply ZADD_GT_LT_NX_ERROR = new ErrorReply("GT, LT, and/or NX options at the same time are not compatible"); + public static final ErrorReply ZADD_INCR_ERROR = new ErrorReply("INCR option supports a single increment-element pair"); + public static final ErrorReply ZADD_SCORE_FLOAT_ERROR = new ErrorReply("value is not a valid float"); + private final String error; public ErrorReply(String error) { diff --git a/store/src/main/java/dev/keva/store/KevaDatabase.java b/store/src/main/java/dev/keva/store/KevaDatabase.java index a5cedb2d..d18bb0bd 100644 --- a/store/src/main/java/dev/keva/store/KevaDatabase.java +++ b/store/src/main/java/dev/keva/store/KevaDatabase.java @@ -1,5 +1,8 @@ package dev.keva.store; +import dev.keva.util.hashbytes.BytesKey; + +import java.util.AbstractMap; import java.util.concurrent.locks.Lock; public interface KevaDatabase { @@ -69,4 +72,9 @@ public interface KevaDatabase { byte[][] mget(byte[]... keys); + int zadd(byte[] key, AbstractMap.SimpleEntry[] members, int flags); + + Double zincrby(byte[] key, Double score, BytesKey e, int flags); + + Double zscore(byte[] key, byte[] member); } diff --git a/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java b/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java index d1cb266a..7412ffc8 100644 --- a/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java +++ b/store/src/main/java/dev/keva/store/impl/OffHeapDatabaseImpl.java @@ -1,11 +1,13 @@ package dev.keva.store.impl; +import dev.keva.store.type.ZSet; import dev.keva.util.hashbytes.BytesKey; import dev.keva.util.hashbytes.BytesValue; import dev.keva.store.DatabaseConfig; import dev.keva.store.KevaDatabase; import dev.keva.store.lock.SpinLock; import lombok.Getter; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import lombok.val; import lombok.var; @@ -16,9 +18,22 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Lock; +import static dev.keva.util.Constants.FLAG_CH; +import static dev.keva.util.Constants.FLAG_GT; +import static dev.keva.util.Constants.FLAG_LT; +import static dev.keva.util.Constants.FLAG_NX; +import static dev.keva.util.Constants.FLAG_XX; + @Slf4j public class OffHeapDatabaseImpl implements KevaDatabase { @Getter @@ -716,4 +731,100 @@ public byte[][] mget(byte[]... keys) { lock.unlock(); } } + + @Override + public int zadd(final byte[] key, @NonNull final AbstractMap.SimpleEntry[] members, final int flags) { + boolean xx = (flags & FLAG_XX) != 0; + boolean nx = (flags & FLAG_NX) != 0; + boolean lt = (flags & FLAG_LT) != 0; + boolean gt = (flags & FLAG_GT) != 0; + boolean ch = (flags & FLAG_CH) != 0; + + // Track both to eliminate conditional branch + int added = 0, changed = 0; + + lock.lock(); + try { + byte[] value = chronicleMap.get(key); + ZSet zSet; + zSet = value == null ? new ZSet() : (ZSet) SerializationUtils.deserialize(value); + for (AbstractMap.SimpleEntry member : members) { + Double currScore = zSet.getScore(member.getValue()); + if (currScore == null) { + if (xx) { + continue; + } + currScore = member.getKey(); + zSet.add(new AbstractMap.SimpleEntry<>(currScore, member.getValue())); + ++added; + ++changed; + continue; + } + Double newScore = member.getKey(); + if(nx || (lt && newScore >= currScore) || (gt && newScore <= currScore)) { + continue; + } + if (!newScore.equals(currScore)) { + zSet.removeByKey(member.getValue()); + zSet.add(member); + ++changed; + } + } + chronicleMap.put(key, SerializationUtils.serialize(zSet)); + return ch ? changed : added; + } finally { + lock.unlock(); + } + } + + @Override + public Double zincrby(byte[] key, Double incr, BytesKey e, int flags) { + lock.lock(); + try { + byte[] value = chronicleMap.get(key); + ZSet zSet; + zSet = value == null ? new ZSet() : (ZSet) SerializationUtils.deserialize(value); + Double currentScore = zSet.getScore(e); + if (currentScore == null) { + if ((flags & FLAG_XX) != 0) { + return null; + } + currentScore = incr; + zSet.add(new AbstractMap.SimpleEntry<>(currentScore, e)); + chronicleMap.put(key, SerializationUtils.serialize(zSet)); + return currentScore; + } + if ((flags & FLAG_NX) != 0) { + return null; + } + if ((flags & FLAG_LT) != 0 && (incr >= 0 || currentScore.isInfinite())) { + return null; + } + if ((flags & FLAG_GT) != 0 && (incr <= 0 || currentScore.isInfinite())) { + return null; + } + zSet.remove(new AbstractMap.SimpleEntry<>(currentScore, e)); + currentScore += incr; + zSet.add(new AbstractMap.SimpleEntry<>(currentScore, e)); + chronicleMap.put(key, SerializationUtils.serialize(zSet)); + return currentScore; + } finally { + lock.unlock(); + } + } + + @Override + public Double zscore(byte[] key, byte[] member) { + lock.lock(); + try { + byte[] value = chronicleMap.get(key); + if (value == null) { + return null; + } + ZSet zset = (ZSet) SerializationUtils.deserialize(value); + return zset.getScore(new BytesKey(member)); + } finally { + lock.unlock(); + } + } } diff --git a/store/src/main/java/dev/keva/store/impl/OnHeapDatabaseImpl.java b/store/src/main/java/dev/keva/store/impl/OnHeapDatabaseImpl.java index 75f3ca81..e28b1c80 100644 --- a/store/src/main/java/dev/keva/store/impl/OnHeapDatabaseImpl.java +++ b/store/src/main/java/dev/keva/store/impl/OnHeapDatabaseImpl.java @@ -1,5 +1,6 @@ package dev.keva.store.impl; +import dev.keva.store.type.ZSet; import dev.keva.util.hashbytes.BytesKey; import dev.keva.util.hashbytes.BytesValue; import dev.keva.store.KevaDatabase; @@ -10,9 +11,19 @@ import org.apache.commons.lang3.SerializationUtils; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Lock; +import static dev.keva.util.Constants.*; +import static dev.keva.util.Constants.FLAG_GT; + public class OnHeapDatabaseImpl implements KevaDatabase { @Getter private final Lock lock = new SpinLock(); @@ -674,4 +685,103 @@ public byte[][] mget(byte[]... keys) { lock.unlock(); } } + + @Override + public int zadd(byte[] key, AbstractMap.SimpleEntry[] members, int flags) { + boolean xx = (flags & FLAG_XX) != 0; + boolean nx = (flags & FLAG_NX) != 0; + boolean lt = (flags & FLAG_LT) != 0; + boolean gt = (flags & FLAG_GT) != 0; + boolean ch = (flags & FLAG_CH) != 0; + + // Track both to eliminate conditional branch + int added = 0, changed = 0; + + lock.lock(); + try { + final BytesKey mapKey = new BytesKey(key); + byte[] value = map.get(mapKey).getBytes(); + ZSet zSet; + zSet = value == null ? new ZSet() : (ZSet) SerializationUtils.deserialize(value); + for (AbstractMap.SimpleEntry member : members) { + Double currScore = zSet.getScore(member.getValue()); + if (currScore == null) { + if (xx) { + continue; + } + currScore = member.getKey(); + zSet.add(new AbstractMap.SimpleEntry<>(currScore, member.getValue())); + ++added; + ++changed; + continue; + } + Double newScore = member.getKey(); + if(nx || (lt && newScore >= currScore) || (gt && newScore <= currScore)) { + continue; + } + if (!newScore.equals(currScore)) { + zSet.removeByKey(member.getValue()); + zSet.add(member); + ++changed; + } + } + map.put(mapKey, new BytesValue(SerializationUtils.serialize(zSet))); + return ch ? changed : added; + } finally { + lock.unlock(); + } + } + + @Override + public Double zincrby(byte[] key, Double incr, BytesKey e, int flags) { + lock.lock(); + try { + final BytesKey mapKey = new BytesKey(key); + byte[] value = map.get(mapKey).getBytes(); + ZSet zSet; + zSet = value == null ? new ZSet() : (ZSet) SerializationUtils.deserialize(value); + Double currentScore = zSet.getScore(e); + if (currentScore == null) { + if ((flags & FLAG_XX) != 0) { + return null; + } + currentScore = incr; + zSet.add(new AbstractMap.SimpleEntry<>(currentScore, e)); + map.put(mapKey, new BytesValue(SerializationUtils.serialize(zSet))); + return currentScore; + } + if ((flags & FLAG_NX) != 0) { + return null; + } + if ((flags & FLAG_LT) != 0 && incr >= 0) { + return null; + } + if ((flags & FLAG_GT) != 0 && incr <= 0) { + return null; + } + zSet.remove(new AbstractMap.SimpleEntry<>(currentScore, e)); + currentScore += incr; + zSet.add(new AbstractMap.SimpleEntry<>(currentScore, e)); + map.put(mapKey, new BytesValue(SerializationUtils.serialize(zSet))); + return currentScore; + } finally { + lock.unlock(); + } + + } + + @Override + public Double zscore(byte[] key, byte[] member) { + lock.lock(); + try { + byte[] value = map.get(new BytesKey(key)).getBytes(); + if (value == null) { + return null; + } + ZSet zset = (ZSet) SerializationUtils.deserialize(value); + return zset.getScore(new BytesKey(member)); + } finally { + lock.unlock(); + } + } } diff --git a/store/src/main/java/dev/keva/store/type/ZSet.java b/store/src/main/java/dev/keva/store/type/ZSet.java new file mode 100644 index 00000000..ff55cb1a --- /dev/null +++ b/store/src/main/java/dev/keva/store/type/ZSet.java @@ -0,0 +1,203 @@ +package dev.keva.store.type; + +import dev.keva.util.hashbytes.BytesKey; +import lombok.NonNull; + +import java.io.Serializable; +import java.util.AbstractMap.SimpleEntry; +import java.util.AbstractSet; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.NavigableSet; +import java.util.TreeSet; + +/** + * A SortedSet implementation tailor made for Redis, and hence no generic definition. + * The current implementation uses TreeSet which internally used Balanced BST. + * In the future, if needed, we can implement a SkipList. + */ +public class ZSet extends AbstractSet> implements NavigableSet>, Serializable { + + private static final long serialVersionUID = 1L; + + private final HashMap keys = new HashMap<>(); + + private final TreeSet> scores = new TreeSet<>((Comparator> & Serializable)(e1, e2) -> { + int cmp = e1.getKey().compareTo(e2.getKey()); + if (cmp != 0) { + return cmp; + } + return e1.getValue().compareTo(e2.getValue()); + }); + + @Override + public SimpleEntry lower(SimpleEntry entry) { + return scores.lower(entry); + } + + @Override + public SimpleEntry floor(SimpleEntry entry) { + return scores.floor(entry); + } + + @Override + public SimpleEntry ceiling(SimpleEntry entry) { + return scores.ceiling(entry); + } + + @Override + public SimpleEntry higher(SimpleEntry entry) { + return scores.higher(entry); + } + + @Override + public SimpleEntry pollFirst() { + return scores.pollFirst(); + } + + @Override + public SimpleEntry pollLast() { + return scores.pollLast(); + } + + @Override + public int size() { + return keys.size(); + } + + @Override + public boolean isEmpty() { + return keys.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return scores.contains(o); + } + + @Override + public Iterator> iterator() { + return scores.iterator(); + } + + @Override + public Object[] toArray() { + return scores.toArray(); + } + + @Override + public T[] toArray(@NonNull T[] ts) { + return scores.toArray(ts); + } + + @Override + public boolean add(SimpleEntry entry) { + boolean result = true; + if (keys.containsKey(entry.getValue())){ + result = false; + scores.remove(new SimpleEntry<>(keys.get(entry.getValue()), entry.getValue())); + } + scores.add(new SimpleEntry<>(entry.getKey(), entry.getValue())); + keys.put(entry.getValue(), entry.getKey()); + return result; + } + + @Override + public boolean remove(@NonNull Object o) { + SimpleEntry entry = (SimpleEntry) o; + if (keys.containsKey(entry.getValue())){ + scores.remove(new SimpleEntry<>(entry.getKey(), entry.getValue())); + keys.remove(entry.getValue()); + return true; + } + return false; + } + + public boolean removeByKey(@NonNull BytesKey key) { + if (keys.containsKey(key)) { + scores.remove(new SimpleEntry<>(keys.get(key), key)); + keys.remove(key); + return true; + } + return false; + } + + @Override + public synchronized void clear() { + scores.clear(); + keys.clear(); + } + + @NonNull + @Override + public NavigableSet> descendingSet() { + return scores.descendingSet(); + } + + @NonNull + @Override + public Iterator> descendingIterator() { + return scores.descendingIterator(); + } + + @NonNull + @Override + public NavigableSet> subSet(SimpleEntry start, boolean b1, SimpleEntry end, boolean b2) { + return scores.subSet(start, b1, end, b2); + } + + @NonNull + @Override + public NavigableSet> headSet(SimpleEntry entry, boolean b) { + return scores.headSet(entry, b); + } + + @NonNull + @Override + public NavigableSet> tailSet(SimpleEntry entry, boolean b) { + return scores.tailSet(entry, b); + } + + @Override + public Comparator> comparator() { + return scores.comparator(); + } + + @NonNull + @Override + public java.util.SortedSet> subSet(SimpleEntry begin, SimpleEntry end) { + return scores.subSet(begin, end); + } + + @NonNull + @Override + public java.util.SortedSet> headSet(SimpleEntry entry) { + return scores.headSet(entry); + } + + @NonNull + @Override + public java.util.SortedSet> tailSet(SimpleEntry entry) { + return scores.tailSet(entry); + } + + @Override + public SimpleEntry first() { + return scores.first(); + } + + @Override + public SimpleEntry last() { + return scores.last(); + } + + public Double getScore(BytesKey key) { + return keys.get(key); + } + + @Override + public String toString() { + return keys.toString(); + } +} diff --git a/util/src/main/java/dev/keva/util/Constants.java b/util/src/main/java/dev/keva/util/Constants.java new file mode 100644 index 00000000..2ad4f815 --- /dev/null +++ b/util/src/main/java/dev/keva/util/Constants.java @@ -0,0 +1,14 @@ +package dev.keva.util; + +public final class Constants { + + public static final int FLAG_XX = 1; + public static final int FLAG_NX = 1 << 1; + public static final int FLAG_GT = 1 << 2; + public static final int FLAG_LT = 1 << 3; + public static final int FLAG_INCR = 1 << 4; + public static final int FLAG_CH = 1 << 5; + + private Constants() { + } +} diff --git a/util/src/main/java/dev/keva/util/DoubleUtil.java b/util/src/main/java/dev/keva/util/DoubleUtil.java new file mode 100644 index 00000000..fe194648 --- /dev/null +++ b/util/src/main/java/dev/keva/util/DoubleUtil.java @@ -0,0 +1,16 @@ +package dev.keva.util; + +public final class DoubleUtil { + + private DoubleUtil(){} + + public static String toString(Double d){ + if (d.equals(Double.POSITIVE_INFINITY)) { + return "inf"; + } + if (d.equals(Double.NEGATIVE_INFINITY)) { + return "-inf"; + } + return d.toString(); + } +}