flink 流式处理中如何集成mybatis框架

flink 流式处理中如何集成mybatis框架flink 中自身虽然实现了大量的connectors,如下图所示,也实现了jdbc的connector,可以通过jdbc 去操作数据库,但是flink-jdbc包中对数据库的操作是以ROW来操作并且

flink 中自身虽然实现了大量的connectors,如下图所示,也实现了jdbc的connector,可以通过jdbc 去操作数据库,但是flink-jdbc包中对数据库的操作是以ROW来操作并且对数据库事务的控制比较死板,有时候操作关系型数据库我们会非常怀念在java web应用开发中的非常优秀的mybatis框架,那么其实flink中是可以自己集成mybatis进来的。 我们这里以flink 1.9版本为例来进行集成。

flink 流式处理中如何集成mybatis框架

如下图为flink内部自带的flink-jdbc:

 flink 流式处理中如何集成mybatis框架

创建一个flink的流式处理项目,引入flink的maven依赖和mybatis依赖(注意这里引入的是非spring版本,也就是mybatis的单机版):

<properties>

<flink.version>1.9.0</flink.version>
</properties>
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>3.5.2</version>
</dependency>
<!-- flink java 包 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

代码100分

maven依赖引入以后,那么需要在resources下面定义mybatis-config.xml 配置:

flink 流式处理中如何集成mybatis框架

mybatis-config.xml 需要定义如下配置:

代码100分<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <typeAliases>
        <typeAlias alias="BankBillPublic" type="xxxx.xx.xx.BankBillPublic" />
    </typeAliases>
    <environments default="development">
        <environment id="development">
            <transactionManager type="JDBC" />
            <dataSource type="POOLED">
                <property name="driver" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://xx.xx.xx.xx:3306/hue?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&autoReconnect=true" />
                <property name="username" value="xxxx" />
                <property name="password" value="xxxx*123%" />
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/xxxxxMapper.xml" />
    </mappers>
</configuration>

typeAlias 标签中为自定义的数据类型,然后在xxxxxMapper.xml 中parameterType或者resultType就可以直接用这种定义的数据类型。

<mappers> 下面为定义的mybatis 的xxxxxMapper文件。里面放置的都是sql语句。

本文作者张永清,转载请注明出处:flink 流式处理中如何集成mybatis框架

xxxxxMapper.xml 中的sql示例:

代码100分<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="xx.xx.bigdata.flink.xx.xx.mapper.UserRelaInfoMapper">
    <!--查询关键字匹配 -->
    <select id="queryUserRelaInfo" parameterType="String" resultType="UserRelaInfo">
        SELECT id AS id,
        USER_NAME AS userName,
        APPL_IDCARD AS applIdCard,
        PEER_USER AS peerUser,
        RELA_TYPE AS relaType,
        CREATE_USER AS createUser,
        CREATE_TIME AS createTime
        FROM USER_RELA_INFO
        <where>
            <if test="applIdCard != null">
                APPL_IDCARD=#{applIdCard}
            </if>
            <if test="peerUser != null">
            AND PEER_USER=#{peerUser}
            </if>
        </where>
    </select>
</mapper>

 定义Mapper,一般可以定义一个interface ,和xxxxxMapper.xml中的namespace保持一致

注意传入的参数一般加上@Param 注解,传入的参数和xxxxxMapper.xml中需要的参数保持一致

public interface UserRelaInfoMapper {
    List<UserRelaInfo> queryUserRelaInfo(@Param("applIdCard")String applIdCard,@Param("peerUser") String peerUser);
}

定义SessionFactory工厂(单例模式):

/**
 *
 *  sqlsession factory 单例  事务设置为手动提交
 */
public class MybatisSessionFactory {
    private static final Logger LOG = LoggerFactory.getLogger(MybatisSessionFactory.class);
    private static SqlSessionFactory sqlSessionFactory;
    private MybatisSessionFactory(){
        super();
    }
    public synchronized static SqlSessionFactory getSqlSessionFactory(){
        if(null==sqlSessionFactory){
            InputStream inputStream=null;
            try{
                inputStream = MybatisSessionFactory.class.getClassLoader().getResourceAsStream("mybatis-config.xml");
                sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
            }
            catch (Exception e){
                LOG.error("create MybatisSessionFactory read mybatis-config.xml cause Exception",e);
            }
            if(null!=sqlSessionFactory){
                LOG.info("get Mybatis sqlsession sucessed....");
            }
            else {
                LOG.info("get Mybatis sqlsession failed....");
            }
        }
        return sqlSessionFactory;
    }
}

  

使用mybatis 对数据库进行操作:

        SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession();
        UserRelaInfoMapper  userRelaInfoMapper  = sqlSession.getMapper(UserRelaInfoMapper .class);
		//调用对应的方法
		userRelaInfoMapper.xxxx();
		//提交事务
		sqlSession.commit();
		//回滚事务,一般可以捕获异常,在发生Exception的时候,事务进行回滚
		sqlSession.rollback();
		
		
		

这里以mysql为示例,写一个flink下mysql的sink示例,可以自己来灵活控制事务的提交:

public class MysqlSinkFunction<IN> extends RichSinkFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MysqlSinkFunction.class);
    @Override
    public void invoke(Object value, Context context) throws Exception{
        SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession();
        try{
                            //插入
                            LOG.info("MysqlSinkFunction start to do insert data...");
                            xxx.xxx();
							//更新
                            LOG.info("MysqlSinkFunction start to do update data...");
							xxx.xxx();
                            //删除
                            LOG.info("MysqlSinkFunction start to do delete data...");
							xxx.xxx();

                    
                
                sqlSession.commit();
                LOG.info("MysqlSinkFunction commit transaction success...");
        }
        catch (Throwable e){
            sqlSession.rollback();
            LOG.error("MysqlSinkFunction cause Exception,sqlSession transaction rollback...",e);
        }
    }
}  

相信您如果以前在spring中用过mybatis的话,对上面的这些操作一定不会陌生。由此你也可以发现,在大数据中可以完美的集成mybatis,这样可以发挥mybatis框架对数据库操作的优势,使用起来也非常简单方便。
一旦集成了mybaitis后,在flink中就可以方便的对各种各样的关系型数据库进行操作了。

本文作者张永清,转载请注明出处:flink 流式处理中如何集成mybatis框架

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/10902.html

(0)
上一篇 2022-12-19
下一篇 2022-12-19

相关推荐

  • Python实现简单的命令行工具

    Python实现简单的命令行工具a href=”https://beian.miit.gov.cn/”苏ICP备2023018380号-1/a Copyright www.python100.com .Some Rights Reserved.

    2024-02-26
    93
  • Python注释的重要性

    Python注释的重要性Python注释是指在程序代码中,使用特定格式的注释语句对程序代码进行说明、解释和补充说明。在Python程序设计中,注释可适用于单行注释和多行注释两种情况。在单行注释中,使用#作为注释符号,而在多行注释中,使用”’ 及 ”’ 或者是 “”” 及 “””作为文本的边界符号。

    2024-05-27
    48
  • Navicat 创建数据库连接提示1045 access denied for user

    Navicat 创建数据库连接提示1045 access denied for user navicat 中创建数据库连接,提示: 原因:密码不正确。 情况1:新装的mysql服务。解决方案:(1)取消使用密码登录(2)dos进入MySQL后修改密码 情况2:密码过期。解决方案:…

    2023-03-28
    161
  • 使用Python读取JSON数据

    使用Python读取JSON数据a href=”https://beian.miit.gov.cn/”苏ICP备2023018380号-1/a Copyright www.python100.com .Some Rights Reserved.

    2024-05-26
    56
  • springboot~redis-cluster动态感应的配置[亲测有效]

    springboot~redis-cluster动态感应的配置[亲测有效]redis-cluster是一个高可用,可分片的分布式redis集群解决方案,建议使用springboot2.3及以上版本的脚手架,如果是<2.3版本,你需要手动添加LettuceConnect

    2023-06-12
    130
  • NumPy数组添加元素方法详解

    NumPy数组添加元素方法详解NumPy是Python中用于科学计算的核心库之一,它提供了高性能的多维数组对象以及相关工具。在NumPy中,向数组中添加元素是一个经常需要用到的操作。因此,本文将详细介绍使用NumPy数组添加元素的方法,以帮助读者更好地理解和使用NumPy库。

    2024-09-06
    4
  • Mycat 学习笔记「建议收藏」

    Mycat 学习笔记「建议收藏」概述 1. Mycat 是什么? Mycat 是数据库中间件,连接 Java 应用程序和数据库,它的作用如下: 读写分离 数据分片:垂直拆分(分库)、水平拆分(分表)、垂直+水平拆分(分库分表) 多数

    2023-05-10
    129
  • MySQL延迟问题和数据刷盘策略[亲测有效]

    MySQL延迟问题和数据刷盘策略[亲测有效]一、MySQL复制流程 官方文档流程图如下: 1、绝对的延时,相对的同步 2、纯写操作,线上标准配置下,从库压力大于主库,最起码从库有relaylog的写入。 二、MySQL延迟问题分析 1、主库D…

    2023-03-03
    139

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注