Skip to content

Commit 1519283

Browse files
[state]: state api refactor. (#374)
* state api refactor. * [state]: fix code style. * [state]: change cstore test. * fix ut. * disable time-consuming test in native cstore. * fix code style. * fix comments. * fix comments.
1 parent 1417cf4 commit 1519283

File tree

73 files changed

+788
-1193
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+788
-1193
lines changed

geaflow-cstore/src/engine/store_engine.rs

+1
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ mod tests {
737737
}
738738

739739
#[test]
740+
#[ignore]
740741
fn test_engine_compact_local() {
741742
log_util::try_init(LogType::ConsoleAndFile, LogLevel::Debug, 0);
742743

geaflow-cstore/tests/test_engine_compact_without_fo.rs

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const COMPACT_ITERATIONS: [u32; 4] = [10000, 50000, 100000, 200000];
3434
const COMPACT_TEST_ITERATIONS: u32 = 10;
3535

3636
#[test]
37+
#[ignore]
3738
fn test_engine_compact_without_fo() {
3839
log_util::try_init(LogType::ConsoleAndFile, LogLevel::Debug, 0);
3940
let mut rng: IsaacRng = SeedableRng::from_entropy();

geaflow-cstore/tests/test_table_with_large_data.rs

+1
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ fn test_table(test_mode: &TestMode) {
204204
}
205205

206206
#[test]
207+
#[ignore]
207208
fn test_table_local_mode() {
208209
test_table(&TestMode::Local);
209210
}

geaflow-cstore/tests/test_two_store_sf10.rs

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use rustc_hash::FxHashMap;
2424
use crate::log_util::info;
2525

2626
#[test]
27+
#[ignore]
2728
fn test_two_store_with_ldbc_sf10_source() {
2829
log_util::try_init(LogType::ConsoleAndFile, LogLevel::Debug, 0);
2930
info!("start test_two_store_with_ldbc_sf10_source");

geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/MemoryClusterMetaKVStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import com.antgroup.geaflow.state.DataModel;
1818
import com.antgroup.geaflow.state.StoreType;
1919
import com.antgroup.geaflow.store.IStoreBuilder;
20+
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
2021
import com.antgroup.geaflow.store.api.key.IKVStore;
21-
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
2222
import com.antgroup.geaflow.store.context.StoreContext;
2323

2424
public class MemoryClusterMetaKVStore<V> implements IClusterMetaKVStore<String, V> {

geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/RocksdbClusterMetaKVStore.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import com.antgroup.geaflow.state.DataModel;
1818
import com.antgroup.geaflow.state.StoreType;
1919
import com.antgroup.geaflow.store.IStoreBuilder;
20-
import com.antgroup.geaflow.store.api.key.IKVStore;
21-
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
20+
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
21+
import com.antgroup.geaflow.store.api.key.IKVStatefulStore;
2222
import com.antgroup.geaflow.store.context.StoreContext;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
@@ -29,15 +29,16 @@ public class RocksdbClusterMetaKVStore<K, V> implements IClusterMetaKVStore<K, V
2929

3030
private static final Integer DEFAULT_VERSION = 1;
3131

32-
private IKVStore<K, Object> kvStore;
32+
private IKVStatefulStore<K, Object> kvStore;
3333
private transient long version;
3434
private String name;
3535

3636
@Override
3737
public void init(StoreContext storeContext) {
3838
IStoreBuilder builder = StoreBuilderFactory.build(StoreType.ROCKSDB.name());
3939
this.name = storeContext.getName();
40-
kvStore = (IKVStore<K, Object>) builder.getStore(DataModel.KV, storeContext.getConfig());
40+
kvStore = (IKVStatefulStore<K, Object>) builder.getStore(DataModel.KV,
41+
storeContext.getConfig());
4142
kvStore.init(storeContext);
4243

4344
// recovery

geaflow/geaflow-core/geaflow-engine/geaflow-highavailability/src/main/java/com/antgroup/geaflow/ha/service/RedisHAService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.antgroup.geaflow.state.StoreType;
2121
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
2222
import com.antgroup.geaflow.store.IStoreBuilder;
23-
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
23+
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
2424
import com.antgroup.geaflow.store.context.StoreContext;
2525
import com.antgroup.geaflow.store.redis.KVRedisStore;
2626

geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/function/OffsetStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
2929
import com.antgroup.geaflow.state.serializer.IKVSerializer;
3030
import com.antgroup.geaflow.store.IStoreBuilder;
31+
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
3132
import com.antgroup.geaflow.store.api.key.IKVStore;
32-
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
3333
import com.antgroup.geaflow.store.context.StoreContext;
3434
import com.antgroup.geaflow.store.rocksdb.RocksdbStoreBuilder;
3535
import java.nio.charset.StandardCharsets;

geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/sink/AsyncKvStoreWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import com.antgroup.geaflow.state.DataModel;
2626
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
2727
import com.antgroup.geaflow.store.IStoreBuilder;
28+
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
2829
import com.antgroup.geaflow.store.api.key.IKVStore;
29-
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
3030
import com.antgroup.geaflow.store.context.StoreContext;
3131
import java.util.LinkedList;
3232
import java.util.Queue;

geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/sink/SyncKvStoreWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import com.antgroup.geaflow.state.DataModel;
2323
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
2424
import com.antgroup.geaflow.store.IStoreBuilder;
25+
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
2526
import com.antgroup.geaflow.store.api.key.IKVStore;
26-
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
2727
import com.antgroup.geaflow.store.context.StoreContext;
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# GeaFlow Store API
2+
3+
GeaFlow Store API provides a unified storage interface for managing state data in graph computing.
4+
This document introduces how to use and implement these storage interfaces.
5+
6+
## Core Interfaces
7+
8+
GeaFlow Store API includes the following core interfaces:
9+
10+
### IStoreBuilder
11+
12+
Interface for building storage instances. Main methods:
13+
14+
- `getStoreDesc()`: Get storage description
15+
- `build()`: Build storage instance
16+
17+
### IBaseStore
18+
19+
The base interface for all storage implementations, defining basic storage operations:
20+
21+
- `init()`: Initialize storage
22+
- `flush()`: Flush data to storage
23+
- `close()`: Close storage connection
24+
25+
### IStatefulStore
26+
27+
State management storage interface, inheriting from IBaseStore, providing stateful management
28+
operations:
29+
30+
- `archive()`: Archive current state
31+
- `recovery()`: Recover state from specified position
32+
- `recoveryLatest()`: Recover latest version state
33+
- `compact()`: Compress/merge historical states
34+
- `drop()`: Drop all data
35+
36+
### IGraphStore
37+
38+
Graph data storage interface, inheriting from IStatefulStore, used for storing graph vertices and
39+
edges:
40+
41+
- `addVertex()`: Add vertex
42+
- `getVertex()`: Get vertex
43+
- `addEdge()`: Add edge
44+
- `getEdges()`: Get edges
45+
- `getOneDegreeGraph()`: Get one degree Graph.
46+
47+
## Usage Examples
48+
49+
### Basic Graph Storage Example
50+
51+
```java
52+
// 1. Create StoreBuilder
53+
IStoreBuilder builder = new MemoryStoreBuilder();
54+
55+
// 2. Build storage instance
56+
IGraphStore graphStore = (IGraphStore) builder.build();
57+
58+
// 3. Initialize storage
59+
graphStore.init(context);
60+
61+
// 4. Use storage
62+
// Add vertex
63+
graphStore.addVertex(vertex);
64+
65+
// Get vertex
66+
IVertex vertex = graphStore.getVertex(vertexId);
67+
68+
// Add edge
69+
graphStore.addEdge(edge);
70+
71+
// Get edges
72+
List<IEdge> edges = graphStore.getEdges(vertexId);
73+
74+
// 5. Close storage
75+
graphStore.close();
76+
```
77+
78+
### State Management Example
79+
80+
```java
81+
// 1. Create stateful storage instance
82+
IStatefulStore statefulStore = new MemoryStatefulStore();
83+
84+
// 2. Initialize
85+
statefulStore.init(context);
86+
87+
// 3. State management operations
88+
// Archive current state
89+
statefulStore.archive();
90+
91+
// Recover from specific version
92+
statefulStore.recovery(version);
93+
94+
// Compact historical states
95+
statefulStore.compact(version);
96+
97+
// 4. Close storage
98+
statefulStore.close();
99+
```
100+
101+
## Implementing Custom Storage
102+
103+
To implement custom storage, you need to:
104+
105+
1. Implement `IStoreBuilder` interface to create storage builder
106+
2. Implement corresponding storage interfaces based on requirements:
107+
- Basic storage: implement `IBaseStore`
108+
- State management storage: implement `IStatefulStore`
109+
- Graph data storage: implement `IGraphStore`
110+
3. Configure SPI service in `resources/META-INF/services`
111+
112+
For examples, refer to implementations in `geaflow-store-memory` module:
113+
114+
- `MemoryStoreBuilder`
115+
- `StaticGraphMemoryStore`
116+
117+
## Configuration
118+
119+
Specify storage implementation and related parameters through configuration:
120+
121+
```java
122+
Configuration conf = new Configuration();
123+
// Specify storage type
124+
conf.put(StoreConfig.STORE_TYPE, "memory");
125+
126+
// State management configurations
127+
conf.put(StoreConfig.STATE_RETENTION_TIME, "24h"); // State retention time
128+
conf.put(StoreConfig.COMPACT_INTERVAL, "6h"); // State compaction interval
129+
130+
// Other storage related configurations

geaflow/geaflow-plugins/geaflow-store/geaflow-store-api/src/main/java/com/antgroup/geaflow/store/AbstractBaseStore.java

-44
This file was deleted.

geaflow/geaflow-plugins/geaflow-store/geaflow-store-api/src/main/java/com/antgroup/geaflow/store/IBaseStore.java

+2-27
Original file line numberDiff line numberDiff line change
@@ -27,37 +27,12 @@ public interface IBaseStore {
2727
void init(StoreContext storeContext);
2828

2929
/**
30-
* archive current store data for persistence.
31-
*/
32-
void archive(long checkpointId);
33-
34-
/**
35-
* recovery the store data from persistent storage.
36-
*/
37-
void recovery(long checkpointId);
38-
39-
/**
40-
* recovery the latest store data.
41-
*/
42-
long recoveryLatest();
43-
44-
/**
45-
* compact the store data.
46-
*/
47-
void compact();
48-
49-
/**
50-
* flush the store data to disk or remote storage.
30+
* flush memory data to disk.
5131
*/
5232
void flush();
5333

5434
/**
55-
* close the store handler and ll other used resources.
35+
* close the store handler and all other used resources.
5636
*/
5737
void close();
58-
59-
/**
60-
* delete the disk data.
61-
*/
62-
void drop();
6338
}

geaflow/geaflow-plugins/geaflow-store/geaflow-store-api/src/main/java/com/antgroup/geaflow/store/ILocalStore.java

-26
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.antgroup.geaflow.store;
2+
3+
/**
4+
* IStateful store is stateful, which means it ensure data HA and can be recovered.
5+
*/
6+
public interface IStatefulStore extends IBaseStore {
7+
8+
/**
9+
* make a snapshot and ensure data HA.
10+
*/
11+
void archive(long checkpointId);
12+
13+
/**
14+
* recover the store data.
15+
*/
16+
void recovery(long checkpointId);
17+
18+
/**
19+
* recover the latest store data.
20+
*/
21+
long recoveryLatest();
22+
23+
/**
24+
* trigger manual store data compaction.
25+
*/
26+
void compact();
27+
28+
/**
29+
* delete the store data.
30+
*/
31+
void drop();
32+
}

0 commit comments

Comments
 (0)