如何利用Flink实现从MySQL批量读取数据并写入Elasticsearch?

Flink 批量读取 MySQL 写入 ES(Elasticsearch)

flink批量读取mysql写入es

在大数据处理和实时分析领域,Apache Flink 是一个非常强大的工具,它可以从多种数据源中提取数据,进行实时处理,并将结果输出到不同的存储系统中,本文将介绍如何使用 Flink 从 MySQL 数据库中批量读取数据,并将其写入到 Elasticsearch(ES)。

1. 环境准备

确保你已经安装了以下软件:

Java Development Kit (JDK)

Apache Flink

MySQL

Elasticsearch

flink批量读取mysql写入es

还需要相应的连接器库:

flink-connector-jdbc

flink-connector-elasticsearch7

2. Maven 依赖配置

在你的 Flink 项目的pom.xml 文件中添加必要的依赖:

<dependencies>
    <!-Flink 核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-Flink JDBC 连接器 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-Flink Elasticsearch 连接器 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

3. Flink 程序实现

3.1 创建 Flink 流执行环境

flink批量读取mysql写入es

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
public class FlinkMySQLToES {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 调用方法进行数据处理
        process(env);
    }
}

3.2 定义数据源与数据接收器

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.types.Row;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class FlinkMySQLToES {
    private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/your_database";
    private static final String MYSQL_USERNAME = "your_username";
    private static final String MYSQL_PASSWORD = "your_password";
    private static final String ELASTICSEARCH_HOST = "localhost";
    private static final int ELASTICSEARCH_PORT = 9200;
    private static final String ELASTICSEARCH_INDEX = "your_index";
    private static final String ELASTICSEARCH_TYPE = "_doc";
    private static final String ELASTICSEARCH_ID_FIELD = "id";
    private static RestHighLevelClient createEsClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", "my-cluster")
                .build();
        return new PreBuiltTransportClient(settings, new TransportAddress(TransportAddress.parse("localhost:9300")));
    }
    private static DataStream<Row> readFromMySQL(StreamExecutionEnvironment env) {
        Properties properties = new Properties();
        properties.setProperty("driver", "com.mysql.jdbc.Driver");
        properties.setProperty("url", MYSQL_URL);
        properties.setProperty("username", MYSQL_USERNAME);
        properties.setProperty("password", MYSQL_PASSWORD);
        properties.setProperty("fetchSize", "5000"); // 每次获取的记录数
        JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl(MYSQL_URL)
                .setUsername(MYSQL_USERNAME)
                .setPassword(MYSQL_PASSWORD)
                .setQuery("SELECT * FROM your_table")
                .setRowTypeInfo(new RowSqlType(new String[]{"id", "name", "value"}, new TypeInformation[]{Types.INT, Types.STRING, Types.STRING}))
                .finish();
        return env.createInput(jdbcInputFormat);
    }
    private static void writeToElasticSearch(DataStream<Row> dataStream, StreamExecutionEnvironment env) {
        RestHighLevelClient esClient = createEsClient();
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost(ELASTICSEARCH_HOST, ELASTICSEARCH_PORT, "http"));
        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, new ElasticsearchSinkFunction<Row>() {
            public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                Map<String, Object> jsonMap = new HashMap<>();
                jsonMap.put("id", element.getField(0)); // id field
                jsonMap.put("name", element.getField(1)); // name field
                jsonMap.put("value", element.getField(2)); // value field
                indexer.add(Requests.indexRequest()
                        .index(ELASTICSEARCH_INDEX)
                        .type(ELASTICSEARCH_TYPE)
                        .source(jsonMap, XContentType.JSON));
            }
        });
        esSinkBuilder.setBulkFlushMaxActions(1); // 每条记录立即刷新到ES
        dataStream.addSink(esSinkBuilder.build());
    }
}

3.3 主程序入口

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.elasticsearch.client.RestHighLevelClient;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.elasticsearch.client.Requests;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.*;
import org.apache.http.HttpHost;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.*;
import org.elasticsearch.*;
import org.elasticsearch.*;
import org.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java.*;
import java();]]]--}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}$$

小伙伴们,上文介绍了“flink批量读取mysql写入es”的内容,你了解清楚吗?希望对你有所帮助,任何问题可以给我留言,让我们下期再见吧。

原创文章,作者:K-seo,如若转载,请注明出处:https://www.kdun.cn/ask/730350.html

Like (0)
Donate 微信扫一扫 微信扫一扫
K-seoK-seo
Previous 2024-12-13 09:13
Next 2024-12-13 09:18

相关推荐

  • Oracle Linux 6.8安装 mysql 5.7.17的详细教程

    在Oracle Linux 6.8上安装MySQL 5.7.17的详细教程如下:准备工作1、确保系统已经安装了Oracle Linux 6.8,并且网络连接正常。2、下载MySQL 5.7.17的安装包,访问MySQL官方网站(https://dev.mysql.com/downloads/mysql/5.7.html)下载对应的安装……

    2024-03-02
    0241
  • 如何在Linux系统中登录MySQL数据库?

    要在Linux上登录MySQL,首先确保已安装MySQL服务器。通过命令行输入mysql u 用户名 p,接着输入密码即可登录。如果未指定用户名,默认为root用户。

    2024-08-06
    074
  • Spring Boot框架怎么配置MySQL

    Spring Boot框架怎么配置MySQL在Spring Boot项目中,我们可以使用MySQL作为数据库,本文将介绍如何在Spring Boot项目中配置MySQL,包括添加依赖、配置数据源、创建实体类、创建Repository接口以及使用JdbcTemplate进行数据库操作。添加依赖1、在项目的pom.xml文件中,添加以下依……

    2024-01-02
    0160
  • mysql怎么赋予用户所有权限

    使用GRANT ALL PRIVILEGES ON *.* TO '用户名'@'localhost';命令,将用户的所有权限赋予指定用户。

    2024-05-21
    092
  • 如何在MySQL中创建数据库并实现函数访问?

    要建立MySQL数据库,首先需要登录到MySQL服务器,然后使用CREATE DATABASE语句创建数据库。,,``sql,CREATE DATABASE 数据库名;,``,,要访问MySQL数据库,可以使用以下方法:,,1. 使用命令行工具,如mysql命令行客户端。,2. 使用图形化管理工具,如phpMyAdmin、MySQL Workbench等。,3. 通过编程语言的MySQL库进行连接和操作,如Python的mysqlconnectorpython库、PHP的mysqli扩展等。

    2024-08-10
    045
  • Net对接MySQL极致的Web开发体验

    在现代Web开发中,后端数据库的选择对于应用的性能和可维护性至关重要,MySQL作为一个广泛使用的开源关系型数据库管理系统,因其高性能、高可靠性和易用性而受到开发者的青睐,为了实现与MySQL的高效对接,.NET 平台提供了多种方式来实现极致的Web开发体验。使用Entity Framework Core (EF Core)Entit……

    网站运维 2024-04-04
    0171

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

免备案 高防CDN 无视CC/DDOS攻击 限时秒杀,10元即可体验  (专业解决各类攻击)>>点击进入