博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink批量(batch)写入mysql/oracle
阅读量:3958 次
发布时间:2019-05-24

本文共 7089 字,大约阅读时间需要 23 分钟。

1、前言

博主之前分享过一篇文章,是flink高性能写入关系型数据库,那篇文章的效果虽然可以实现写入数据的高性能,但是牺牲了程序的健壮性,比如遇到不可控因素:数据库重启,连接失效,连接超时等,这样线上运行的程序可能就会出现问题,并且这样的问题可能只会日志打印error,并不会导致程序的挂掉,所以如果出现这样的问题,很难被发现。

接下来,博主分享一波源代码,实现流式处理批量写入关系型数据库。

整个程序的流量是这样的: kafka->flink->mysql

2、driver类描述:flink消费kafka数据源,利用window实现每10s进行一次sink。

package com.learn.flinkBatchMysql;import com.learn.metricsCounter.MyMapper;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import org.apache.flink.util.Collector;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Driver {    public static void main(String[] args) throws Exception {        //1、flink运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2、kafka数据源        Properties properties = new Properties();        properties.setProperty("bootstrap.servers","centos:9092");        properties.setProperty("group.id", "aa");        FlinkKafkaConsumer011
kafkaSource0 = new FlinkKafkaConsumer011<>("hhhh", new SimpleStringSchema(), properties); DataStreamSource
kafkaSource = env.addSource(kafkaSource0); //3、流式数据没10s做为一个批次,写入到mysql SingleOutputStreamOperator
> streamOperator = kafkaSource.timeWindowAll(Time.seconds(10)).apply(new AllWindowFunction
, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable
values, Collector
> out) throws Exception { ArrayList
students = Lists.newArrayList(values); if (students.size() > 0) { out.collect(students); } } }); //4、每批的数据批量写入到mysql streamOperator.addSink(new SinkToMySQL()); env.execute("metricsCounter"); }}

3、sink类描述:利用batch实现一个窗口内数据输出到mysql。

package com.learn.flinkBatchMysql;import org.apache.commons.dbcp2.BasicDataSource;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.PreparedStatement;import java.util.List;/** * 程序功能: 数据批量 sink 数据到 mysql * */public class SinkToMySQL extends RichSinkFunction
> { private PreparedStatement ps; private BasicDataSource dataSource; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); dataSource = new BasicDataSource(); connection = getConnection(dataSource); String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每批数据的插入都要调用一次 invoke() 方法 */ @Override public void invoke(List
value, Context context) throws Exception { //遍历数据集合 for (String student : value) { ps.setInt(1, 1); ps.setString(2, student); ps.setString(3, "123456"); ps.setInt(4, 18); ps.addBatch(); } int[] count = ps.executeBatch(); //批量后执行 System.out.println("成功了插入了" + count.length + "行数据"); } private static Connection getConnection(BasicDataSource dataSource) { dataSource.setDriverClassName("com.mysql.jdbc.Driver"); //注意,替换成自己本地的 mysql 数据库地址和用户名、密码 dataSource.setUrl("jdbc:mysql://localhost:3306/novel"); dataSource.setUsername("root"); dataSource.setPassword("root"); //设置连接池的一些参数 dataSource.setInitialSize(10); dataSource.setMaxTotal(50); dataSource.setMinIdle(2); Connection con = null; try { con = dataSource.getConnection(); System.out.println("创建连接池:" + con); } catch (Exception e) { System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; }}

4、sink类描述:利用batch实现一个窗口内数据输出到oracle。

package com.learn.flinkBatchMysql;import org.apache.commons.dbcp2.BasicDataSource;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.util.List;/** * 程序功能: 数据批量 sink 数据到 mysql * */public class SinkToMySQL extends RichSinkFunction
> { private PreparedStatement ps; private BasicDataSource dataSource; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); dataSource = new BasicDataSource(); connection = getConnection(dataSource); String sql = "insert into STUDENT(ID,NAME,PASSWORD,AAAA) values(?, ?, ?, ?)"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每批数据的插入都要调用一次 invoke() 方法 */ @Override public void invoke(List
value, Context context) throws Exception { //遍历数据集合 for (String student : value) { ps.setInt(1, 1); ps.setString(2, student); ps.setString(3, "123456"); ps.setInt(4, 18); ps.addBatch(); } int[] count = ps.executeBatch(); //批量后执行 System.out.println("成功了插入了" + count.length + "行数据"); } private static Connection getConnection(BasicDataSource dataSource) { //设置连接池的一些参数 dataSource.setInitialSize(10); dataSource.setMaxTotal(50); dataSource.setMinIdle(2); Connection con = null; try { Class.forName("oracle.jdbc.driver.OracleDriver"); con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:xe", "hr", "hr"); System.out.println("创建连接池:" + con); } catch (Exception e) { System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; }}

oracle的使用建议表名以及字段名全部使用大写,以免报错:如表或视图不存在或者无效的列。

5、总结:

网上很多的例子都是简单的 demo 形式,都是单条数据就创建数据库连接插入 MySQL,如果要写的数据量很大的话,会对 MySQL 的写有很大的压力,如果要提高性能必定要批量的写。就拿我们现在这篇文章来说,如果数据量大的话,聚合一分钟数据达万条,那么这样批量写会比来一条写一条性能提高不知道有多少。

转载地址:http://nimzi.baihongyu.com/

你可能感兴趣的文章
网络 https 握手
查看>>
去掉调试信息
查看>>
lsof 使用
查看>>
golang获取本机地址
查看>>
date 使用
查看>>
ipcalc
查看>>
网络 linux 禁止 ping
查看>>
ELF 格式详解
查看>>
chromium 使用
查看>>
linux 检测虚拟机类型
查看>>
go - 运行时:内存不足
查看>>
top 使用
查看>>
Linux Netlink通信机制详解
查看>>
rsync 远程同步
查看>>
nano使用
查看>>
c函数
查看>>
linux 链接
查看>>
centos6.x 添加开机启动服务
查看>>
zfs 简单使用
查看>>
linux EXT4格式分区扩容
查看>>