环境配置
- Ubuntu 18
- JDK 11
- Maven 3.9.3
- VSCode 最新版
- Flink 1.17.1
- Clickhouse 21.4.5.46
- Flink CDC 2.4.1
Flink 本地开发快速上手
JDK 11 和 JDK17
JDK17主要是为了VSCode java配置要求
sudo apt install openjdk-11-jdk
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
下载tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local/jdk/
解压- 环境配置,或之后一起配置
查看java位置
whereis java
ll -a /usr/bin/java
ll -a /etc/alternatives/java
Maven
- 解压在 /usr/local/maven
- 修改settings.xml
<localRepository>/home/bigdata/maven_repos</localRepository>
<mirror>
<id>aliyun-maven</id>
<mirrorOf>*</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
注意:/home/bigdata/maven_repos不要先创建,不然之后运行命令会构建失败
也可以直接sudo apt install maven
安装
- 配置环境变量
在/etc/proflie中配置
export JDK17_HOME=/usr/local/jdk/jdk-17.0.7
export JDK11_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export JAVA_HOME=${JDK11_HOME}
export MAVEN_HOME=/usr/local/maven/apache-maven-3.9.3
export PATH=${MAVEN_HOME}/bin:${JAVA_HOME}/bin:$PATH
source /etc/profile
使命令生效
Flink
wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz --no-check-certificate
tar -xvzf flink-1.17.1-bin-scala_2.12.tgz -C /usr/local/flink/
cd flink-1.17.1/
- 启动集群
./bin/start-cluster.sh
- 测试示例
./bin/flink run examples/streaming/WordCount.jar
- 查看效果
tail log/flink-*-taskexecutor-*.out
- 停止集群
./bin/stop-cluster.sh
远程开发
配置Frp
- frps:
[common]
bind_port = 7000
vhost_http_port = 50070
token = xxxxx
dashboard_port = xxxx
dashboard_user = xxxx
dashboard_pwd = xxxx
enable_prometheus = true
log_file = /var/log/frps.log
log_level = info
log_max_days = 3
- frpc
[common]
server_addr = 域名
server_port = 7000
tls_enable = true
token = xxxxx
[ssh]
type = tcp
local_ip = 127.0.0.1
local_port = 22
remote_port = xxxxx
[flink]
type = http
local_ip = 127.0.0.1
local_port = xxxx
custom_domains = xxxx
- 后台启动
./frpc -c frpc.ini &
- 停止
ps -aux|grep frp| grep -v grep
kill 进程
配置VSCode
然后这里我选择使用VSCode远程开发
下载Remote Development 拓展,然后连接即可进入远程环境,记得下载相应拓展,因为远程是没有本地的拓展的,需要重新下载
Flink初探
基于官方命令构建反欺诈检测系统
官方文档:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/
- 生成工程文件
在用户目录下创建一个workspace,然后在workspace下运行以下命令
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.17.1 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
会自动生成frauddetection工程文件,用tree
生成项目层级目录
.
├── pom.xml // 依赖管理文件
└── src
└── main
├── java // 相关java代码
│ └── spendreport
│ ├── FraudDetectionJob.java
│ └── FraudDetector.java
└── resources // 相关配置
└── log4j2.properties
- Flink编程模型
大数据通用:对接数据源 --> 使用引擎进行业务逻辑处理 --> 结果写到某个地方去
Flink: Source --> Transformation --> Sink - 在 IDE 中运行项目可能会导致 java.lang.NoClassDefFoundError 异常。这可能是因为您没有将所有必需的 Flink 依赖项隐式加载到类路径中。
解决办法:
- IntelliJ IDEA: Go to Run > Edit Configurations > Modify options > Select include dependencies with "Provided" scope.
- VSCode:将所有"Provided" scope注释掉。让他们能在运行时被编译。
注: "Provided" scope意味着打包的时候不会包含。
- 启用webUI
vim /flink-1.17.1/conf/flink-conf.yaml
修改这行为rest.bind-address: 0.0.0.0
- 额外知识点:
- CDC:实时抓取数据库日志文件,用于异构数据库同步。
- binlog:MySQL集群同步
- OLAP:侧重于查询
- OLTP:侧重于事务,两者很难兼顾
flinkCDC快速上手
Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: One or more required options are missing.
Missing required options are:
slot.name
官方文档有问题,正在修正。应该在pg建表的时候增加'slot.name' = 'flink'
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments',
'slot.name' = 'flink'
);
修改之后运行仍然报错:
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data
暂时用这个方法解决:(后注:不需要改,应该还是时间的原因)
类加载顺序问题,flink默认是child-first
在flink的flink-conf.yaml文件中添加
classloader.resolve-order: parent-first
重启集群即可。
sql确实执行成功了,但是job执行未成功,kibana里面也没有找到数据
问题已解决,咨询了官方群的大佬,原来是MySQL时间和ip地址?
的问题,排查过程:
建表之后运行select * from products;
报错:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
确定是时间问题:
修改时间vim /etc/timezone
将内容改为UTC
备注: 校准时间用timedatectl
Flink 编程
pom文件
主pom文件
// 依赖管理
<dependencyManagement>
<dependencies>
<dependency>
</dependency>
</dependencies>
</dependencyManagement>
// 插件管理
// 打包实际上需要插件来进行,不然jar包里只会包含代码,不包含依赖
<build>
<plugins>
<plugin>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
</plugin>
</plugins>
</pluginManagement>
</build>
次级pom文件(不需要版本)
<dependencies>
<dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
</dependency>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
</plugins>
</build>
注意点:
- 用
versions-maven-plugin
来管理jar版本,将他放到父模块。- 统一设置版本
mvn versions:set -DnewVersion=1.2
- 退回版本
mvn versions:revert
- 提交修改
mvn versions:commit
- 统一设置版本
- 两种CDC包的使用
- 使用flink-sql-connector-mysql-cdc更好,可以直接放在flink lib目录下,而不用打包。
- flink-connector-mysql-cdc包务必要打包,不要使用provided scope
测试
nc -lk 9527
这个命令是在本地启动一个TCP服务器,监听9627端口,并将接收到的数据输出到控制台。nc是netcat的缩写,-l选项表示监听模式,-k选项表示保持监听,不断开连接。
流批区别
批处理与流处理代码不同:
groupBy
与keyBy
- 批处理不需要
env.execute
大数据结构
基本都是主从结构Master-Slave结构
JobManager(HA高可用性)-TaskManager
注意事项
- 连接MySQL必须导入flink连接依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.1</version>
</dependency>
- 找不到主类错误:
Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: Error in serialization.
缺少依赖,需要table api依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
注意: 记得在模块的pom文件里也加上
Checkpoint
Flink中的Checkpoint用于实现作业的故障恢复和状态持久化,非常重要。关于Checkpoint可以总结如下几点:
-
Checkpoint用于保存作业运行状态,以便失败后可以恢复。它会定期保存状态快照。
-
触发Checkpoint可以是定时触发,也可以是处理某些记录数触发。需要设置checkpoint间隔参数来控制。
-
Checkpoint会产生一定的性能开销,需要平衡故障恢复需求和性能开销。
-
保存checkpoint的状态backend可以是内存级的,也可以是持久化的,如保存到HDFS等。
-
开启checkpoint后,需要设置state.backend参数来配置状态存储位置。
-
对于实时流作业,可以启用incremental checkpoint来优化性能。
-
exactly-once语义需要开启checkpoint并将sink设置为支持幂等写入。
-
设置checkpoint超时时间,如果checkpoint长时间未完成,会强制触发超时退出。
-
重启作业时从最新的checkpoint恢复状态。
exactly-once:
Flink 中的 exactly-once 表示 end-to-end 的精确一次语义,是 Flink 提供的最强的数据一致性保证。
它的工作原理可以概括为:
-
基于 checkpoint 实现atleast-once(至少一次)语义
Flink 的 checkpoint 机制可以在失败时恢复状态和数据,确保每条数据至少被处理一次。 -
幂等sink来避免重复写入
通过让 sink 更新支持幂等操作,可以避免从 checkpoint 恢复时重复写入导致的数据重复。 -
两者结合实现 exactly-once
checkpoint + 幂等sink 结合可以保证每条数据既不会丢失,也不会重复处理,即实现了精确一次语义。 -
对不同源的支持
Flink 为 Kafka、RabbitMQ 等源提供了支持 exactly-once 的内置连接器。 -
总结:
通过 checkpoint、幂等 sink、idempotent source 三者的配合,Flink 作业可以实现端到端精确一次的语义,使得流式处理结果和批处理一样精确。这是 Flink 的重要优势之一。
时区错误
Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
检查当前时间是否同步timedatectl
Local time: Tue 2023-07-18 11:25:51 CST
Universal time: Tue 2023-07-18 03:25:51 UTC
RTC time: Tue 2023-07-18 03:25:51
Time zone: Asia/Shanghai (CST, +0800)
System clock synchronized: yes
systemd-timesyncd.service active: yes
RTC in local TZ: no
解释:
- 系统本地时间为CST 中国标准时间
- 硬件RTC时间为UTC时间
- 系统时钟已与网络时间同步
- RTC时钟未使用本地时区
用这个命令修改:timedatectl set-timezone UTC
注: 尽量使用timedatectl
来修改时间,而不是修改/etc/timezone
文件,因为timedatectl
会自动修改多个配置文件。
额外知识点
- 可以选择自定义反序列化器
- 可以设置savepoint实现断点续传
Flink CDC采集MySQL binlog日志实时写入ClickHouse
读取从库binlog
报错
Caused by: java.net.ConnectException: Connection refused: connect
clickhouse 远程配置没开启
vim /etc/clickhouse-server/config.xml
把注释掉的<listen_host>::</listen_host>
取消注释,然后重启服务:
service clickhouse-server restart
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation
缺少权限,赋予权限 grant replication slave,replication client on *.* to 'bigdata'@'%';
java.lang.AbstractMethodError: Receiver class com.example.demo.CustomerDebeziumDeserializationSchema does not define or inherit an implementation of the resolved method 'abstract void deserialize(com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord, org.apache.flink.util.Collector)' of interface com.ververica.cdc.debezium.DebeziumDeserializationSchema.
知识点
- MySQL行设置为ROW模式,现在貌似都是默认ROW模式了
- 建表:
CREATE TABLE t_demo (
id Int32,
name String,
birthday Date32,
ts DateTime64
) ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMM(birthday)
ORDER BY id
CREATE TABLE t_demo (
id Int32,
name String,
birthday String
) ENGINE = ReplacingMergeTree(id)
order by id;
-
表引擎合并是延时的
optimize table t_demo
立刻合并排序键值相同的重复项
注: 要在查询时就看合并效果可以在查询语句后面加final
-
官网中文版缺乏大量资料,包括clickhouse-jdbc的使用教程,资料还是需要优先英文。
-
用
Nullable(String)
类型 可传入空字符串null -
反序列器
StringDebeziumDeserializationSchema()
最简单JsonDebeziumDeserializationSchema
教程中使用,简单的封装了一下,信息还是比较多- 还是要自己自定义反序列器
-
ClickHouseSink:继承 RichSinkFunction,重写sink。因为是异构数据库,且官方未提供clickhouse sink。
-
hikaricp连接池,官网示例里建议使用这个来管理连接(暂未使用)
-
sink建议并行度:1。技术主管说是因为要避免乱序,但是根据官方群里的回复,一开始全量阶段,多并行度会增加同步效率,基本不需要考虑顺序;增量阶段,都是一个并行度,结合flink的ckp,有序输出。所以设置为多并行度也没关系。
-
更新操作,有多种方案:
- 直接用插入语句,然后等待表引擎自动合并。不行,因为clickhouse要实时数据分析展示,这样旧数据会影响展示。
- 直接用更新语句。在table api中,读取更新是两条日志-U +U,不方便 (另外,UPDATE 在 MergeTree 引擎中是有限制的,例如无法更新数据表中的主键列。万一遇到有人修改主键的情况,就出问题了。)
- 先删除、再插入。 现在使用的是这个,代码实现也相当简单。
- Mutation操作执行是一个异步的过程,ALTER语句的变种。clickhouse无法实现真正意义上的更新和删除。
未解决的问题
- 无法传递时间类型LocalDateTime到clickhouse的datatime64里
- 在重写的
invoke
方法里死活写不了Integer id = data.getJSONObject("after").getInteger("birthday");
这样获取嵌套json里的数据的语句,birthday一直获取不了,显示为null,但是after是可以获取的。原因不明,现在在batchInsert
里获取,也算勉强能用 - 有时候表引擎手动合并莫名没反应,且通过jdbc写入的和在client里面写入的数据貌似没有合并;
评论区