Skip to content

Commit 4668e9d

Browse files
dbus版本升级到0.6.0
canal升级到1.1.4,废弃mysql-extractor模块 sinker功能完善,添加根据table订阅功能 文档完善
1 parent 68482a0 commit 4668e9d

File tree

168 files changed

+2925
-1854
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

168 files changed

+2925
-1854
lines changed

README.md

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,13 @@ DBus的主要潜在客户包括:
3232

3333
专注于数据的收集及实时数据流计算,通过简单灵活的配置,以无侵入的方式对源端数据进行采集,采用高可用的流式计算框架,对公司各个IT系统在业务流程中产生的数据进行汇聚,经过转换处理后成为统一JSON的数据格式(UMS),提供给不同数据使用方订阅和消费,充当数仓平台、大数据分析平台、实时报表和实时营销等业务的数据源。
3434

35-
### 快速开始
36-
37-
全套DBus包含诸多组件(Canal,zk,kafka,storm,mysql,influxdb,grafana),为了简单化,我们准备了All in One 包,包含了预先安装数据和一键启动脚本, 用于快速尝试。 请参考 [Quick Start](docs/quick-start.md)
38-
3935
### 相关文档
4036

41-
详细介绍 DBus请参考 [wiki](docs/index.md)
37+
详细介绍 DBus请参考 [wiki](https://bridata.github.io/DBus/index.html)
4238

43-
常见问题可参考 [FAQ](docs/more-faq.md)
39+
常见问题可参考 [FAQ](https://bridata.github.io/DBus/more-faq.html)
4440

45-
系统介绍参考 [system architecture](docs/more-system-architecture.md)
41+
系统介绍参考 [system architecture](https://bridata.github.io/DBus/more-system-architecture.html)
4642

4743
### 系统架构和工作原理
4844

@@ -110,19 +106,19 @@ DBUS源端数据采集大体来说分为2部分:
110106

111107
##### 编译打包代码
112108

113-
关于编译代码,参考 [compile](docs/more-compile-code.md)
109+
关于编译代码,参考 [compile](https://bridata.github.io/DBus/more-compile-code.html)
114110

115111
##### 版本相关:
116112

117-
建议版本:0.6.0
113+
建议版本:0.6.x
118114

119115
下载发布包:请参考:[downloads](https://github.com/BriData/DBus/releases)
120116

121117
##### 版权声明
122118

123119
DBus 自身使用 Apache v2.0 协议
124120

125-
关于DBus 自身协议,修改第三方包代码,以及三方包协议参考: [License](docs/more-license.md)
121+
关于DBus 自身协议,修改第三方包代码,以及三方包协议参考: [License](https://bridata.github.io/DBus/more-license.html)
126122

127123
##### 其他相关资料:
128124

dbus-auto-check/allinone-auto-check/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>dbus-auto-check</artifactId>
77
<groupId>com.creditease.dbus</groupId>
8-
<version>0.6.0</version>
8+
<version>0.6.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

dbus-auto-check/dbus-canal-auto/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>dbus-auto-check</artifactId>
77
<groupId>com.creditease.dbus</groupId>
8-
<version>0.6.0</version>
8+
<version>0.6.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

dbus-auto-check/dbus-canal-auto/src/main/java/com/creditease/dbus/canal/auto/AddLine.java

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@
2222
package com.creditease.dbus.canal.auto;
2323

2424

25+
import com.creditease.dbus.canal.utils.CanalUtils;
2526
import org.apache.commons.cli.*;
27+
import org.apache.commons.lang3.StringUtils;
2628

2729
import java.io.BufferedWriter;
2830
import java.io.FileOutputStream;
2931
import java.io.OutputStreamWriter;
32+
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.List;
35+
import java.util.stream.Collectors;
36+
37+
import static com.creditease.dbus.canal.utils.FileUtils.getValueFromFile;
38+
import static com.creditease.dbus.canal.utils.FileUtils.writeProperties;
3039

3140
/**
3241
* This is Description
@@ -35,27 +44,133 @@
3544
* @date 2018/12/12
3645
*/
3746
public class AddLine {
47+
public static String type = "newLine";
3848
public static String dsName = null;
3949
public static String zkString = null;
4050
public static String address = null;
4151
public static String user = null;
4252
public static String pass = null;
4353
public static Integer slaveId = null;
4454
public static String bootstrapServers = null;
55+
public static String tableNames = null;
4556
public static String userDir = System.getProperty("user.dir");
46-
57+
public static String DEFAULT_FILTER = ".*\\\\..*";
4758

4859
public static void main(String[] args) {
4960
try {
5061
parseCommandArgs(args);
5162
autoDeploy();
52-
AutoDeployStart.main(null);
5363
} catch (Exception e) {
5464
e.printStackTrace();
5565
}
5666
}
5767

5868
private static void autoDeploy() throws Exception {
69+
switch (type) {
70+
case "newLine":
71+
newLine();
72+
AutoDeployStart.main(null);
73+
break;
74+
case "editFilter":
75+
editFilter();
76+
break;
77+
case "initFilter":
78+
initFilter();
79+
break;
80+
case "deleteFilter":
81+
deleteFilter();
82+
break;
83+
}
84+
}
85+
86+
private static void deleteFilter() throws Exception {
87+
if (StringUtils.isNotBlank(tableNames)) {
88+
System.out.println("delete filter." + tableNames);
89+
deleteTableFromParamFile();
90+
}
91+
restart();
92+
}
93+
94+
private static void initFilter() throws Exception {
95+
if (StringUtils.isNotBlank(tableNames)) {
96+
System.out.println("init filter." + tableNames);
97+
addAllTableToParamFile();
98+
}
99+
restart();
100+
}
101+
102+
private static void editFilter() throws Exception {
103+
if (StringUtils.isNotBlank(tableNames)) {
104+
System.out.println("edit filter." + tableNames);
105+
addTableToParamFile();
106+
}
107+
restart();
108+
}
109+
110+
private static void addAllTableToParamFile() throws Exception {
111+
String userdir = System.getProperty("user.dir");
112+
String paramFilePath = String.format("%s/canal-%s/conf/%s/instance.properties", userdir, dsName, dsName);
113+
if (StringUtils.isBlank(tableNames)) {
114+
tableNames = DEFAULT_FILTER;
115+
}
116+
writeProperties(paramFilePath, "canal.instance.filter.regex", "canal.instance.filter.regex=" + tableNames);
117+
}
118+
119+
private static void deleteTableFromParamFile() throws Exception {
120+
String userdir = System.getProperty("user.dir");
121+
String paramFilePath = String.format("%s/canal-%s/conf/%s/instance.properties", userdir, dsName, dsName);
122+
String filterRegex = getValueFromFile(paramFilePath, "canal.instance.filter.regex");
123+
if (StringUtils.equals(DEFAULT_FILTER, filterRegex)) {
124+
return;
125+
}
126+
List<String> oldFilterList = Arrays.asList(StringUtils.split(filterRegex, ","));
127+
// 删除的表
128+
List<String> tableNameList = Arrays.asList(StringUtils.split(tableNames, ","));
129+
// 新的filter表
130+
List<String> newFilterList = new ArrayList<>();
131+
for (String tableName : oldFilterList) {
132+
if (!tableNameList.contains(tableName)) {
133+
newFilterList.add(tableName);
134+
}
135+
}
136+
String newfilterRegex = newFilterList.stream().collect(Collectors.joining(","));
137+
if (StringUtils.isBlank(newfilterRegex)) {
138+
newfilterRegex = DEFAULT_FILTER;
139+
}
140+
writeProperties(paramFilePath, "canal.instance.filter.regex", "canal.instance.filter.regex=" + newfilterRegex);
141+
}
142+
143+
private static void restart() throws Exception {
144+
String canalPath = String.format("%s/canal-%s", System.getProperty("user.dir"), dsName);
145+
CanalUtils.start(canalPath);
146+
}
147+
148+
private static void addTableToParamFile() throws Exception {
149+
String userdir = System.getProperty("user.dir");
150+
String paramFilePath = String.format("%s/canal-%s/conf/%s/instance.properties", userdir, dsName, dsName);
151+
String filterRegex = getValueFromFile(paramFilePath, "canal.instance.filter.regex");
152+
if (StringUtils.equals(DEFAULT_FILTER, filterRegex)) {
153+
filterRegex = "";
154+
}
155+
List<String> oldFilterList = Arrays.asList(StringUtils.split(filterRegex, ","));
156+
// 新添加的表
157+
List<String> tableNameList = Arrays.asList(StringUtils.split(tableNames, ","));
158+
// 新的filter表
159+
List<String> newFilterList = new ArrayList<>();
160+
for (String tableName : tableNameList) {
161+
if (!oldFilterList.contains(tableName)) {
162+
newFilterList.add(tableName);
163+
}
164+
}
165+
newFilterList.addAll(oldFilterList);
166+
String newfilterRegex = newFilterList.stream().collect(Collectors.joining(","));
167+
if (StringUtils.isBlank(newfilterRegex)) {
168+
newfilterRegex = ".*\\\\..*";
169+
}
170+
writeProperties(paramFilePath, "canal.instance.filter.regex", "canal.instance.filter.regex=" + newfilterRegex);
171+
}
172+
173+
private static void newLine() throws Exception {
59174
BufferedWriter bw = null;
60175
try {
61176
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(userDir + "/conf/canal-auto.properties")));
@@ -89,18 +204,23 @@ private static void autoDeploy() throws Exception {
89204
private static void parseCommandArgs(String[] args) throws Exception {
90205
Options options = new Options();
91206

207+
options.addOption("t", "type", true, "newSchema");
92208
options.addOption("dn", "dsName", true, "");
93209
options.addOption("zk", "zkString", true, "");
94210
options.addOption("a", "address", true, "");
95211
options.addOption("u", "user", true, "");
96212
options.addOption("p", "pass", true, "");
97213
options.addOption("s", "slaveId", true, "");
98214
options.addOption("bs", "bootstrap.servers", true, "");
215+
options.addOption("tn", "tableNames", true, "");
99216

100217

101218
CommandLineParser parser = new DefaultParser();
102219
try {
103220
CommandLine line = parser.parse(options, args);
221+
if (line.hasOption("type")) {
222+
type = line.getOptionValue("type");
223+
}
104224
if (line.hasOption("dsName")) {
105225
dsName = line.getOptionValue("dsName");
106226
}
@@ -122,6 +242,9 @@ private static void parseCommandArgs(String[] args) throws Exception {
122242
if (line.hasOption("bootstrap.servers")) {
123243
bootstrapServers = line.getOptionValue("bootstrap.servers");
124244
}
245+
if (line.hasOption("tableNames")) {
246+
tableNames = line.getOptionValue("tableNames");
247+
}
125248

126249
} catch (ParseException e) {
127250
throw e;

dbus-auto-check/dbus-canal-auto/src/main/java/com/creditease/dbus/canal/utils/CanalUtils.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,17 @@ public static void start(String canalPath) throws Exception {
4040
try {
4141
String startPath = canalPath + "/bin/" + "startup.sh";
4242
String stopPath = canalPath + "/bin/" + "stop.sh";
43+
4344
String cmd = "sh " + stopPath;
4445
writeAndPrint("exec: " + cmd);
45-
4646
//停止已存在
4747
exec(cmd);
48+
4849
cmd = "sh " + startPath;
4950
writeAndPrint("exec: " + cmd);
50-
5151
exec(cmd);
52-
5352
} catch (Exception e) {
5453
writeAndPrint("************************************* START CANAL FAIL ************************************** ");
55-
5654
throw e;
5755
}
5856
}
@@ -100,7 +98,7 @@ public static String exec(Object cmd) throws Exception {
10098

10199
try {
102100
if (cmd instanceof String) {
103-
process = Runtime.getRuntime().exec(cmd.toString());
101+
process = Runtime.getRuntime().exec(((String) cmd));
104102
} else {
105103
String[] cmd2 = (String[]) cmd;
106104
process = Runtime.getRuntime().exec(cmd2);

dbus-auto-check/dbus-canal-auto/src/main/java/com/creditease/dbus/canal/utils/FileUtils.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
10+
*
1111
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
12+
*
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -45,9 +45,8 @@ public static void init(BufferedWriter bufferedWriter) {
4545
}
4646

4747
public static DeployPropsBean readProps(String path) throws Exception {
48-
try {
48+
try (InputStream ins = new BufferedInputStream(new FileInputStream(path))) {
4949
Properties deployProps = new Properties();
50-
InputStream ins = new BufferedInputStream(new FileInputStream(path));
5150
deployProps.load(ins);
5251
DeployPropsBean props = new DeployPropsBean();
5352
props.setDsName(deployProps.getProperty("dsname").trim());
@@ -68,6 +67,20 @@ public static DeployPropsBean readProps(String path) throws Exception {
6867
}
6968
}
7069

70+
public static String getValueFromFile(String path, String key) throws Exception {
71+
try (BufferedReader br = new BufferedReader(new FileReader(path))) {
72+
String line;
73+
while ((line = br.readLine()) != null) {
74+
if (line.matches(key + "\\s*=.*")) {
75+
return StringUtils.trim(StringUtils.split(line, "=")[1]);
76+
}
77+
}
78+
return null;
79+
} catch (Exception e) {
80+
throw e;
81+
}
82+
}
83+
7184
/**
7285
* 更新properties文件某key的value
7386
*/

dbus-auto-check/dbus-ogg-auto/pom.xml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>dbus-auto-check</artifactId>
77
<groupId>com.creditease.dbus</groupId>
8-
<version>0.6.0</version>
8+
<version>0.6.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

@@ -45,10 +45,6 @@
4545
<groupId>org.apache.curator</groupId>
4646
<artifactId>curator-recipes</artifactId>
4747
</exclusion>
48-
<exclusion>
49-
<groupId>mysql</groupId>
50-
<artifactId>mysql-connector-java</artifactId>
51-
</exclusion>
5248
</exclusions>
5349
</dependency>
5450
<dependency>

dbus-auto-check/log-auto-check/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>dbus-auto-check</artifactId>
77
<groupId>com.creditease.dbus</groupId>
8-
<version>0.6.0</version>
8+
<version>0.6.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

dbus-auto-check/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>dbus-main</artifactId>
77
<groupId>com.creditease.dbus</groupId>
8-
<version>0.6.0</version>
8+
<version>0.6.1</version>
99
</parent>
1010

1111
<modelVersion>4.0.0</modelVersion>

dbus-commons/pom.xml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>com.creditease.dbus</groupId>
88
<artifactId>dbus-main</artifactId>
9-
<version>0.6.0</version>
9+
<version>0.6.1</version>
1010
</parent>
1111
<artifactId>dbus-commons</artifactId>
1212

@@ -78,16 +78,10 @@
7878
<!-- 你需要添加 mysql 依赖在这里 mysql-connector-java -->
7979

8080
<!-- 你需要添加 oracle 依赖在这里 ojdbc7 -->
81-
<!--
82-
dbcp从1.4升级到2.6.0对应的oracle需要升级,否在报以下错误
83-
Exception in thread "emit-heartbeat-event-xinghuo" java.lang.AbstractMethodError
84-
at org.apache.commons.dbcp2.DelegatingConnection.isValid(DelegatingConnection.java:874)
85-
-->
8681
<!-- mvn install:install-file -DgroupId=com.oracle -DartifactId=ojdbc7 -Dversion=12.1.0.2 -Dpackaging=jar -Dfile=ojdbc7-12.1.0.2.jar -DgeneratePom=true -->
8782

8883

8984
<!-- 你需要添加 db2 依赖在这里 db2jcc4 -->
90-
<!-- Install driver jar to your local repository with the flowing command.-->
9185
<!-- mvn install:install-file -DgroupId=com.ibm.db2.jcc -DartifactId=db2jcc4 -Dversion=4.23.42 -Dpackaging=jar -Dfile=db2jcc4-4.23.42.jar -DgeneratePom=true-->
9286

9387
</dependencies>

0 commit comments

Comments
 (0)