Skip to content

Commit 0264b4a

Browse files
committed
move sharding-orchestration-reg-etcd from apache repo to here for license reason
0 parents  commit 0264b4a

File tree

11 files changed

+1691
-0
lines changed

11 files changed

+1691
-0
lines changed

pom.xml

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.shardingsphere</groupId>
23+
<artifactId>sharding-orchestration-reg</artifactId>
24+
<version>4.0.0-RC1-SNAPSHOT</version>
25+
</parent>
26+
<artifactId>sharding-orchestration-reg-etcd</artifactId>
27+
<name>${project.artifactId}</name>
28+
29+
<dependencies>
30+
<dependency>
31+
<groupId>org.apache.shardingsphere</groupId>
32+
<artifactId>sharding-orchestration-reg-api</artifactId>
33+
<version>${project.version}</version>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>io.grpc</groupId>
38+
<artifactId>grpc-netty</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>io.grpc</groupId>
42+
<artifactId>grpc-protobuf</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>io.grpc</groupId>
46+
<artifactId>grpc-stub</artifactId>
47+
</dependency>
48+
<dependency>
49+
<groupId>com.github.rholder</groupId>
50+
<artifactId>guava-retrying</artifactId>
51+
</dependency>
52+
</dependencies>
53+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.orchestration.reg.etcd;
19+
20+
import com.google.common.base.Optional;
21+
import com.google.common.base.Splitter;
22+
import com.google.protobuf.ByteString;
23+
import etcdserverpb.KVGrpc;
24+
import etcdserverpb.KVGrpc.KVFutureStub;
25+
import etcdserverpb.LeaseGrpc;
26+
import etcdserverpb.LeaseGrpc.LeaseFutureStub;
27+
import etcdserverpb.Rpc.LeaseGrantRequest;
28+
import etcdserverpb.Rpc.PutRequest;
29+
import etcdserverpb.Rpc.RangeRequest;
30+
import etcdserverpb.Rpc.RangeResponse;
31+
import etcdserverpb.Rpc.WatchCreateRequest;
32+
import etcdserverpb.Rpc.WatchRequest;
33+
import etcdserverpb.WatchGrpc;
34+
import etcdserverpb.WatchGrpc.WatchStub;
35+
import io.grpc.Channel;
36+
import mvccpb.Kv.KeyValue;
37+
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
38+
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
39+
import org.apache.shardingsphere.orchestration.reg.etcd.internal.channel.EtcdChannelFactory;
40+
import org.apache.shardingsphere.orchestration.reg.etcd.internal.keepalive.KeepAlive;
41+
import org.apache.shardingsphere.orchestration.reg.etcd.internal.retry.EtcdRetryEngine;
42+
import org.apache.shardingsphere.orchestration.reg.etcd.internal.watcher.EtcdWatchStreamObserver;
43+
import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException;
44+
import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener;
45+
46+
import java.util.ArrayList;
47+
import java.util.Arrays;
48+
import java.util.Collections;
49+
import java.util.List;
50+
import java.util.concurrent.Callable;
51+
import java.util.concurrent.ExecutionException;
52+
import java.util.concurrent.TimeUnit;
53+
import java.util.concurrent.TimeoutException;
54+
55+
/**
56+
* Etcd based registry center.
57+
*
58+
* @author junxiong
59+
*/
60+
public final class EtcdRegistryCenter implements RegistryCenter {
61+
62+
private RegistryCenterConfiguration config;
63+
64+
private EtcdRetryEngine etcdRetryEngine;
65+
66+
private KVFutureStub kvStub;
67+
68+
private LeaseFutureStub leaseStub;
69+
70+
private WatchStub watchStub;
71+
72+
private KeepAlive keepAlive;
73+
74+
@Override
75+
public void init(final RegistryCenterConfiguration config) {
76+
this.config = config;
77+
etcdRetryEngine = new EtcdRetryEngine(config);
78+
Channel channel = EtcdChannelFactory.getInstance(Splitter.on(',').trimResults().splitToList(config.getServerLists()));
79+
kvStub = KVGrpc.newFutureStub(channel);
80+
leaseStub = LeaseGrpc.newFutureStub(channel);
81+
watchStub = WatchGrpc.newStub(channel);
82+
keepAlive = new KeepAlive(channel, config.getTimeToLiveSeconds());
83+
}
84+
85+
@Override
86+
public String get(final String key) {
87+
final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).build();
88+
return etcdRetryEngine.execute(new Callable<String>() {
89+
90+
@Override
91+
public String call() throws InterruptedException, ExecutionException, TimeoutException {
92+
RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
93+
return response.getKvsCount() > 0 ? response.getKvs(0).getValue().toStringUtf8() : null;
94+
}
95+
}).orNull();
96+
}
97+
98+
@Override
99+
public String getDirectly(final String key) {
100+
return get(key);
101+
}
102+
103+
@Override
104+
public boolean isExisted(final String key) {
105+
return null != get(key);
106+
}
107+
108+
@Override
109+
public List<String> getChildrenKeys(final String key) {
110+
String path = key + "/";
111+
final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(path)).setRangeEnd(getRangeEnd(path)).build();
112+
Optional<List<String>> result = etcdRetryEngine.execute(new Callable<List<String>>() {
113+
114+
@Override
115+
public List<String> call() throws InterruptedException, ExecutionException, TimeoutException {
116+
RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
117+
List<String> result = new ArrayList<>();
118+
for (KeyValue each : response.getKvsList()) {
119+
String childFullPath = each.getKey().toStringUtf8();
120+
result.add(childFullPath.substring(childFullPath.lastIndexOf("/") + 1));
121+
}
122+
return result;
123+
}
124+
});
125+
return result.isPresent() ? result.get() : Collections.<String>emptyList();
126+
}
127+
128+
@Override
129+
public void persist(final String key, final String value) {
130+
final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build();
131+
etcdRetryEngine.execute(new Callable<Void>() {
132+
133+
@Override
134+
public Void call() throws InterruptedException, ExecutionException, TimeoutException {
135+
kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
136+
return null;
137+
}
138+
});
139+
}
140+
141+
@Override
142+
public void update(final String key, final String value) {
143+
persist(key, value);
144+
}
145+
146+
@Override
147+
public void persistEphemeral(final String key, final String value) {
148+
final Optional<Long> leaseId = lease();
149+
if (!leaseId.isPresent()) {
150+
throw new RegistryCenterException("Unable to set up heat beat for key %s", key);
151+
}
152+
final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setLease(leaseId.get()).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build();
153+
etcdRetryEngine.execute(new Callable<Void>() {
154+
155+
@Override
156+
public Void call() throws InterruptedException, ExecutionException, TimeoutException {
157+
kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS);
158+
return null;
159+
}
160+
});
161+
}
162+
163+
private Optional<Long> lease() {
164+
final LeaseGrantRequest request = LeaseGrantRequest.newBuilder().setTTL(config.getTimeToLiveSeconds()).build();
165+
return etcdRetryEngine.execute(new Callable<Long>() {
166+
167+
@Override
168+
public Long call() throws InterruptedException, ExecutionException, TimeoutException {
169+
long leaseId = leaseStub.leaseGrant(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS).getID();
170+
keepAlive.heartbeat(leaseId);
171+
return leaseId;
172+
}
173+
});
174+
}
175+
176+
@Override
177+
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
178+
WatchCreateRequest createWatchRequest = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).setRangeEnd(getRangeEnd(key)).build();
179+
final WatchRequest request = WatchRequest.newBuilder().setCreateRequest(createWatchRequest).build();
180+
etcdRetryEngine.execute(new Callable<Void>() {
181+
182+
@Override
183+
public Void call() {
184+
watchStub.watch(new EtcdWatchStreamObserver(dataChangedEventListener)).onNext(request);
185+
return null;
186+
}
187+
});
188+
}
189+
190+
@Override
191+
public void close() {
192+
keepAlive.close();
193+
}
194+
195+
private ByteString getRangeEnd(final String key) {
196+
byte[] noPrefix = {0};
197+
byte[] endKey = key.getBytes().clone();
198+
for (int i = endKey.length - 1; i >= 0; i--) {
199+
if (endKey[i] < 0xff) {
200+
endKey[i] = (byte) (endKey[i] + 1);
201+
return ByteString.copyFrom(Arrays.copyOf(endKey, i + 1));
202+
}
203+
}
204+
return ByteString.copyFrom(noPrefix);
205+
}
206+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.orchestration.reg.etcd.internal.channel;
19+
20+
import io.grpc.Channel;
21+
import io.grpc.netty.NettyChannelBuilder;
22+
import io.grpc.util.RoundRobinLoadBalancerFactory;
23+
import lombok.AccessLevel;
24+
import lombok.NoArgsConstructor;
25+
26+
import java.util.List;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
29+
/**
30+
* Etcd channel factory.
31+
*
32+
* @author zhangliang
33+
*/
34+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
35+
public final class EtcdChannelFactory {
36+
37+
private static final String TARGET = "etcd";
38+
39+
private static ConcurrentHashMap<List<String>, Channel> etcdChannels = new ConcurrentHashMap<>();
40+
41+
/**
42+
* Get etcd channel instance.
43+
*
44+
* @param endpoints etcd endpoints
45+
* @return etcd channel
46+
*/
47+
public static Channel getInstance(final List<String> endpoints) {
48+
if (etcdChannels.containsKey(endpoints)) {
49+
return etcdChannels.get(endpoints);
50+
}
51+
Channel channel = NettyChannelBuilder.forTarget(TARGET)
52+
.usePlaintext(true)
53+
.nameResolverFactory(new EtcdNameSolverFactory(TARGET, endpoints))
54+
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
55+
.build();
56+
Channel result = etcdChannels.putIfAbsent(endpoints, channel);
57+
return null == result ? channel : result;
58+
}
59+
}

0 commit comments

Comments
 (0)