Skip to main content

Spring-boot 整合flink 教程

· 4 min read
jeesk

Flink框架:Flink整合springboot

首先说一下, 为什么flink 需要集成flink, spring boot给我们带来了更好的框架整合, 同时使用spring的DI和IOC,能更好的使用bean,当然直接使用spring 整合也是一样。后来我们使用了google guice 来整合。

1. 实现原理

实现原理, spring 的启动 一般使用 AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(AppConfig.class); 即可启动spring 容器, 对么spring boot 呢, 看过源码的人或许知道

SpringApplication.run(arge); 只需要在启动flink之前启动sping boot 即可。

2. 代码实战

flink 整合spring boot 以及redission, 并将事件的id放入redis 中, 代码库 https://gitee.com/imomoda/flink-sprint-boot

1. spring boot 启动工具类
@SpringBootApplication(scanBasePackages = {"io.github.jeesk.flink"})
@Import(SpringUtil.class)
@Slf4j
@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
public class SpringBootApplicationUtil {


static SpringApplication springBootApplication = null;
static SpringApplicationBuilder springApplicationBuilder = null;

public static synchronized void run(String[] arge) {
if (springBootApplication == null) {
StandardEnvironment standardEnvironment = new StandardEnvironment();
MutablePropertySources propertySources = standardEnvironment.getPropertySources();
propertySources.addFirst(new SimpleCommandLinePropertySource(arge));
String startJarPath = SpringBootApplicationUtil.class.getResource("/").getPath().split("!")[0];
String[] activeProfiles = standardEnvironment.getActiveProfiles();
propertySources.addLast(new MapPropertySource("systemProperties", standardEnvironment.getSystemProperties()));
propertySources.addLast(new SystemEnvironmentPropertySource("systemEnvironment", standardEnvironment.getSystemEnvironment()));
if (springBootApplication == null) {
springApplicationBuilder = new SpringApplicationBuilder(SpringBootApplicationUtil.class);
// 这里可以通过命令行传入
springApplicationBuilder.profiles("dev");
springApplicationBuilder.sources(SpringBootApplicationUtil.class).web(WebApplicationType.NONE);
}
springBootApplication = springApplicationBuilder.build();
springBootApplication.run(arge);
}
}


}
package io.github.jeesk.flink;

import cn.hutool.extra.spring.SpringUtil;
import io.github.jeesk.flink.config.SpringBootApplicationUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.springframework.data.redis.core.StringRedisTemplate;

public class FraudDetectionJob {
public static void main(String[] args) throws Exception {

Configuration configuration = new Configuration();
if (args != null) {
configuration.setString("args", String.join(" ", args));
}
SpringBootApplicationUtil.run(args);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");

DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");

alerts
.addSink(new AlertSink())
.name("send-alerts");

env.execute("Fraud Detection");
}

static public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

private StringRedisTemplate redisTemplate = null;

@Override
public void open(Configuration parameters) throws Exception {
// 初始化bean
super.open(parameters);
SpringBootApplicationUtil.run(parameters.getString("arge", "").split(" "));
redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);

}

@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {

Alert alert = new Alert();
alert.setId(transaction.getAccountId());
// 将id 放入redis 中
redisTemplate.opsForSet().add("tmpKey", String.valueOf(alert.getId()));
collector.collect(alert);
}
}
}

  1. 服务器端处理: flink 的安装目录下面放入logback 的包,log4j-over-slf4j-1.7.15.jar,logback-classic-1.2.3.jar,logback-core-1.2.3.jar ,
  2. 然后删除lib下面关于log4j的包 log4j-1.2.17.jar及slf4j-log4j12-1.7.15.jar), 如果不懂这些包的作用可以仔细阅读: JAVA 常见日志依赖处理细节
  3. 在代码的pom文件里面排除log4j的包
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
<!--排除log4j-->
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.1</version>
<!--排除log4j-->
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
  1. 如果想修改flink 的logback的日志文件 , 可以在flink的conf目录下面修改下面的三个文件
logback-console.xml
logback-session.xml
logback.xml

3.参考内容