Skip to content

Commit 18c7b54

Browse files
authored
fix: redis key generate (#432)
1 parent e438599 commit 18c7b54

File tree

25 files changed

+558
-930
lines changed

25 files changed

+558
-930
lines changed

arex-instrumentation/common/arex-common/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,23 @@
1919
<version>${reactor.version}</version>
2020
<scope>provided</scope>
2121
</dependency>
22+
<dependency>
23+
<groupId>io.projectreactor</groupId>
24+
<artifactId>reactor-test</artifactId>
25+
<version>${reactor.version}</version>
26+
<scope>test</scope>
27+
</dependency>
28+
<dependency>
29+
<groupId>com.fasterxml.jackson.core</groupId>
30+
<artifactId>jackson-databind</artifactId>
31+
<version>${jackson.version}</version>
32+
<scope>test</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>io.lettuce</groupId>
36+
<artifactId>lettuce-core</artifactId>
37+
<version>6.1.8.RELEASE</version>
38+
<scope>test</scope>
39+
</dependency>
2240
</dependencies>
2341
</project>

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,52 +8,46 @@
88
import java.util.ArrayList;
99
import java.util.List;
1010
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.function.Consumer;
1112
import java.util.function.UnaryOperator;
1213
import reactor.core.publisher.Flux;
13-
import java.util.function.Function;
1414

1515

16-
public class FluxRecordFunction implements UnaryOperator<Flux<?>> {
16+
public class FluxRecordFunction<T> implements UnaryOperator<Flux<T>> {
1717

18-
private final Function<FluxResult, Void> executor;
18+
private final Consumer<Object> consumer;
1919
private final TraceTransmitter traceTransmitter;
2020

21-
public FluxRecordFunction(Function<FluxResult, Void> executor) {
21+
public FluxRecordFunction(Consumer<Object> consumer) {
2222
this.traceTransmitter = TraceTransmitter.create();
23-
this.executor = executor;
23+
this.consumer = consumer;
2424
}
2525

2626
@Override
27-
public Flux<?> apply(Flux<?> responseFlux) {
27+
public Flux<T> apply(Flux<T> flux) {
2828
// use a list to record all elements
29-
List<FluxReplayUtil.FluxElementResult> fluxElementMockerResults = new ArrayList<>();
29+
List<FluxReplayUtil.FluxElementResult> results = new ArrayList<>();
3030
AtomicInteger index = new AtomicInteger(1);
31-
String responseType = TypeUtil.getName(responseFlux);
32-
return responseFlux
33-
// add element to list
34-
.doOnNext(element -> {
35-
try (TraceTransmitter tm = traceTransmitter.transmit()) {
36-
fluxElementMockerResults.add(
37-
getFluxElementMockerResult(index.getAndIncrement(), element));
38-
}
39-
})
40-
// add error to list
41-
.doOnError(error -> {
42-
try (TraceTransmitter tm = traceTransmitter.transmit()) {
43-
fluxElementMockerResults.add(
44-
getFluxElementMockerResult(index.getAndIncrement(), error));
45-
}
46-
})
47-
.doFinally(result -> {
48-
try (TraceTransmitter tm = traceTransmitter.transmit()) {
49-
FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults);
50-
executor.apply(fluxResult);
51-
}
52-
});
31+
String responseType = TypeUtil.getName(flux);
32+
return flux.doOnNext(element -> {
33+
try (TraceTransmitter tm = traceTransmitter.transmit()) {
34+
results.add(buildElementResult(index.getAndIncrement(), element));
35+
}
36+
}).doOnError(error -> {
37+
try (TraceTransmitter tm = traceTransmitter.transmit()) {
38+
results.add(buildElementResult(index.getAndIncrement(), error));
39+
}
40+
}).doFinally(result -> {
41+
try (TraceTransmitter tm = traceTransmitter.transmit()) {
42+
consumer.accept(new FluxResult(responseType, results));
43+
} catch (Exception ignore) {
44+
// ignore
45+
}
46+
});
5347
}
5448

5549

56-
private FluxReplayUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) {
50+
private FluxReplayUtil.FluxElementResult buildElementResult(int index, Object element) {
5751
String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER);
5852
return new FluxReplayUtil.FluxElementResult(index, content, TypeUtil.getName(element));
5953
}

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxReplayUtil.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class FluxReplayUtil{
1717

1818

1919

20-
public static Flux<?> restore(Object fluxObj) {
20+
public static <T> Flux<T> restore(Object fluxObj) {
2121
if (fluxObj == null) {
2222
return Flux.empty();
2323
}
@@ -32,14 +32,14 @@ public static Flux<?> restore(Object fluxObj) {
3232
String responseType = fluxResult.getResponseType();
3333
if (responseType != null) {
3434
if (FLUX_FROM_ITERATOR.equals(responseType)) {
35-
return Flux.fromIterable(resultList);
35+
return (Flux<T>) Flux.fromIterable(resultList);
3636
} else if (FLUX_FROM_ARRAY.equals(responseType)) {
37-
return Flux.fromArray(resultList.toArray());
37+
return (Flux<T>) Flux.fromArray(resultList.toArray());
3838
} else if (FLUX_FROM_STREAM.equals(responseType)) {
39-
return Flux.fromStream(resultList.stream());
39+
return (Flux<T>) Flux.fromStream(resultList.stream());
4040
}
4141
}
42-
return Flux.just(resultList);
42+
return (Flux<T>) Flux.fromIterable(resultList);
4343
}
4444

4545

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,34 @@
11
package io.arex.inst.common.util;
22

33
import io.arex.agent.bootstrap.ctx.TraceTransmitter;
4-
import java.util.function.Function;
4+
import java.util.function.Consumer;
55
import java.util.function.UnaryOperator;
66
import reactor.core.publisher.Mono;
77

8-
public class MonoRecordFunction implements UnaryOperator<Mono<?>> {
8+
public class MonoRecordFunction<T> implements UnaryOperator<Mono<T>> {
99

10-
private final Function<Object, Void> executor;
10+
private final Consumer<Object> consumer;
1111
private final TraceTransmitter traceTransmitter;
1212

13-
public MonoRecordFunction(Function<Object, Void> executor) {
13+
public MonoRecordFunction(Consumer<Object> consumer) {
1414
this.traceTransmitter = TraceTransmitter.create();
15-
this.executor = executor;
15+
this.consumer = consumer;
1616
}
1717

1818
@Override
19-
public Mono<?> apply(Mono<?> responseMono) {
20-
return responseMono
21-
.doOnSuccess(result -> {
22-
try (TraceTransmitter tm = traceTransmitter.transmit()) {
23-
executor.apply(result);
24-
}
25-
})
26-
.doOnError(error -> {
27-
try (TraceTransmitter tm = traceTransmitter.transmit()) {
28-
executor.apply(error);
29-
}
30-
});
19+
public Mono<T> apply(Mono<T> responseMono) {
20+
return responseMono.doOnSuccess(result -> {
21+
try (TraceTransmitter tm = traceTransmitter.transmit()) {
22+
consumer.accept(result);
23+
} catch (Exception ignore) {
24+
// ignore
25+
}
26+
}).doOnError(error -> {
27+
try (TraceTransmitter tm = traceTransmitter.transmit()) {
28+
consumer.accept(error);
29+
} catch (Exception ignore) {
30+
// ignore
31+
}
32+
});
3133
}
3234
}

arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxRecordFuntionTest.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertNull;
55
import static org.junit.jupiter.api.Assertions.assertThrows;
6-
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
6+
77
import io.arex.inst.runtime.config.ConfigBuilder;
88
import io.arex.inst.runtime.context.ContextManager;
9-
import java.util.function.Function;
9+
import java.util.function.Consumer;
1010
import org.junit.jupiter.api.AfterAll;
1111
import org.junit.jupiter.api.BeforeAll;
1212
import org.junit.jupiter.api.Test;
@@ -15,13 +15,18 @@
1515

1616
public class FluxRecordFuntionTest {
1717

18-
static FluxRecordFunction fluxRecordFunction;
18+
static FluxRecordFunction fluxRecordFunction;
1919
@BeforeAll
2020
static void setUp() {
2121
Mockito.mockStatic(ContextManager.class);
2222
ConfigBuilder.create("test").enableDebug(true).build();
23-
Function<FluxResult, Void> executor = result -> null;
24-
fluxRecordFunction = new FluxRecordFunction(executor);
23+
fluxRecordFunction = new FluxRecordFunction(new Consumer<Object>() {
24+
@Override
25+
public void accept(Object object) {
26+
System.out.println("record content: " + object);
27+
throw new RuntimeException("exception");
28+
}
29+
});
2530
}
2631

2732
@AfterAll
@@ -53,13 +58,13 @@ void record() {
5358
}
5459

5560
private static void testNormalFlux() {
56-
Flux flux = Flux.just(1, 2, 3, 4, 5)
61+
Object flux = Flux.just(1, 2, 3, 4, 5)
5762
.doOnNext(val -> System.out.println("val" + ":" + val))
5863
// doFinally performs some operations that have nothing to do with the value of the element.
5964
// If the doFinally operator is called multiple times, doFinally will be executed once at the end of each sequence.
6065
.doFinally(System.out::println);
61-
Flux subscribe = fluxRecordFunction.apply(flux);
62-
Flux blockFirst = fluxRecordFunction.apply(flux);
66+
Flux subscribe = fluxRecordFunction.apply((Flux<Object>) flux);
67+
Flux blockFirst = fluxRecordFunction.apply((Flux<Object>) flux);
6368
// record content: 1,2,3,4,5
6469
subscribe.subscribe();
6570
// record content: 1

arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxReplayUtilTest.java

Lines changed: 107 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,51 @@
44
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ITERATOR;
55
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_STREAM;
66
import static org.junit.jupiter.api.Assertions.assertEquals;
7-
import static org.junit.jupiter.api.Assertions.assertNotNull;
7+
8+
import com.fasterxml.jackson.core.JsonProcessingException;
9+
import com.fasterxml.jackson.databind.JavaType;
10+
import com.fasterxml.jackson.databind.ObjectMapper;
11+
import io.arex.agent.bootstrap.util.StringUtil;
812
import io.arex.inst.common.util.FluxReplayUtil.FluxElementResult;
913
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
14+
import io.arex.inst.runtime.serializer.Serializer;
15+
import io.arex.inst.runtime.serializer.StringSerializable;
1016
import io.arex.inst.runtime.util.TypeUtil;
17+
import io.lettuce.core.KeyValue;
18+
import java.lang.reflect.Type;
1119
import java.util.ArrayList;
1220
import java.util.List;
21+
import org.junit.jupiter.api.AfterAll;
22+
import org.junit.jupiter.api.BeforeAll;
1323
import org.junit.jupiter.api.Test;
24+
import org.mockito.Mockito;
1425
import reactor.core.publisher.Flux;
26+
import reactor.test.StepVerifier;
1527

1628
public class FluxReplayUtilTest {
29+
@BeforeAll
30+
static void setUp() {
31+
Serializer.builder(new TestJacksonSerializable()).build();
32+
}
1733

34+
@AfterAll
35+
static void tearDown() {
36+
Mockito.clearAllCaches();
37+
}
1838
@Test
19-
void FluxRecory() {
39+
void restore() {
40+
// flux is empty
41+
Flux<?> result = FluxReplayUtil.restore(null);
42+
StepVerifier.create(result)
43+
.expectComplete()
44+
.verify();
45+
2046
List<FluxElementResult> list = new ArrayList<>();
2147
FluxResult fluxResult = new FluxResult(null, list);
22-
// flux is empty
23-
assertNotNull(FluxReplayUtil.restore(null));
24-
Flux<?> result = FluxReplayUtil.restore(fluxResult);
25-
assertNotNull(result);
48+
result = FluxReplayUtil.restore(fluxResult);
49+
StepVerifier.create(result)
50+
.expectComplete()
51+
.verify();
2652

2753
// flux is not empty
2854
FluxElementResult fluxElement1 = new FluxElementResult(1, "1", "java.lang.Integer");
@@ -33,21 +59,95 @@ void FluxRecory() {
3359
// Flux.just()
3460
fluxResult = new FluxResult(null, list);
3561
result = FluxReplayUtil.restore(fluxResult);
36-
assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxJust-java.util.ArrayList-");
62+
assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxIterable-");
63+
StepVerifier.create(result)
64+
.expectNextMatches(item-> item.equals(1))
65+
.expectError(NullPointerException.class)
66+
.verify();
3767

3868
// Flux.fromIterable()
3969
fluxResult = new FluxResult(FLUX_FROM_ITERATOR, list);
4070
result = FluxReplayUtil.restore(fluxResult);
4171
assertEquals(TypeUtil.getName(result),FLUX_FROM_ITERATOR);
72+
StepVerifier.create(result)
73+
.expectNextMatches(item-> item.equals(1))
74+
.expectError(NullPointerException.class)
75+
.verify();
4276

4377
// Flux.fromArray()
4478
fluxResult = new FluxResult(FLUX_FROM_ARRAY, list);
4579
result = FluxReplayUtil.restore(fluxResult);
4680
assertEquals(TypeUtil.getName(result),FLUX_FROM_ARRAY);
81+
StepVerifier.create(result)
82+
.expectNextMatches(item-> item.equals(1))
83+
.expectError(NullPointerException.class)
84+
.verify();
4785

4886
// Flux.fromStream()
4987
fluxResult = new FluxResult(FLUX_FROM_STREAM, list);
5088
result = FluxReplayUtil.restore(fluxResult);
5189
assertEquals(TypeUtil.getName(result),FLUX_FROM_STREAM);
90+
StepVerifier.create(result)
91+
.expectNextMatches(item-> item.equals(1))
92+
.expectError(NullPointerException.class)
93+
.verify();
94+
}
95+
96+
@Test
97+
void testReactiveMGet() {
98+
List<FluxElementResult> fluxElementResults = new ArrayList<>(2);
99+
FluxElementResult elementResult1 = new FluxElementResult(1, "{\"key\":\"mget-key1\",\"value\":\"mget-value1-2024-04-02 16:37\"}", "io.lettuce.core.KeyValue-java.lang.String,java.lang.String");
100+
FluxElementResult elementResult2 = new FluxElementResult(1, "{\"key\":\"mget-key2\",\"value\":\"mget-value2-2024-04-02 16:37\"}", "io.lettuce.core.KeyValue-java.lang.String,java.lang.String");
101+
fluxElementResults.add(elementResult1);
102+
fluxElementResults.add(elementResult2);
103+
FluxResult fluxResult = new FluxResult("reactor.core.publisher.FluxSource-", fluxElementResults);
104+
Flux<KeyValue<String, String>> restore = FluxReplayUtil.restore(fluxResult);
105+
StepVerifier.create(restore)
106+
.expectNextMatches(item-> item.getKey().equals("mget-key1") && item.getValue().equals("mget-value1-2024-04-02 16:37"))
107+
.expectNextMatches(item-> item.getKey().equals("mget-key2") && item.getValue().equals("mget-value2-2024-04-02 16:37"))
108+
.expectComplete().verify();
109+
}
110+
111+
public static class TestJacksonSerializable implements StringSerializable {
112+
private final ObjectMapper MAPPER = new ObjectMapper();
113+
114+
@Override
115+
public boolean isDefault() {
116+
return true;
117+
}
118+
119+
@Override
120+
public String name() {
121+
return "jackson";
122+
}
123+
124+
@Override
125+
public String serialize(Object object) throws JsonProcessingException {
126+
return MAPPER.writeValueAsString(object);
127+
}
128+
129+
@Override
130+
public <T> T deserialize(String json, Class<T> clazz) throws JsonProcessingException {
131+
if (StringUtil.isEmpty(json) || clazz == null) {
132+
return null;
133+
}
134+
135+
return MAPPER.readValue(json, clazz);
136+
}
137+
138+
@Override
139+
public <T> T deserialize(String json, Type type) throws JsonProcessingException {
140+
if (StringUtil.isEmpty(json) || type == null) {
141+
return null;
142+
}
143+
144+
JavaType javaType = MAPPER.getTypeFactory().constructType(type);
145+
return MAPPER.readValue(json, javaType);
146+
}
147+
148+
@Override
149+
public StringSerializable reCreateSerializer() {
150+
return new TestJacksonSerializable();
151+
}
52152
}
53153
}

0 commit comments

Comments
 (0)