Flink DataStream-Apache Cassandra Connector

Author: Apache Flink

This connector provides sinks that write data into an Apache Cassandra database. To use this connector, add the following dependency to your project. Note that the streaming connectors are currently NOT part of the binary distribution; see how to link with them for cluster execution here.
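A minimal Maven sketch of that dependency is shown below; the coordinates are illustrative, since the exact artifact id and version must match your Flink release (older releases use a Scala-suffixed artifact such as flink-connector-cassandra_2.12).

<dependency>
    <!-- illustrative coordinates: adjust artifact id and version to your Flink release -->
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>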

Installing Apache Cassandra

There are multiple ways to bring up a Cassandra instance on a local machine:

  1. Follow the instructions on the Cassandra Getting Started page.
  2. Launch a container running Cassandra from the official Docker repository.

Cassandra Sinks

Configurations

Flink's Cassandra sinks are created using the static CassandraSink.addSink(DataStream<IN> input) method. This method returns a CassandraSinkBuilder, which offers methods to further configure the sink and finally build() the sink instance. The following configuration methods can be used:

  1. setQuery(String query)
    • Sets the upsert query that is executed for every record the sink receives.
    • The query is internally treated as a CQL statement.
    • DO set the upsert query for processing Tuple data type.
    • DO NOT set the query for processing POJO data types.
  2. setClusterBuilder(ClusterBuilder clusterBuilder)
    • Sets the cluster builder that is used to configure the connection to Cassandra with more sophisticated settings such as consistency level, retry policy, etc. (see the sketch after this list).
  3. setHost(String host[, int port])
    • A simpler version of setClusterBuilder() that only takes host/port information for connecting to Cassandra instances.
  4. setMapperOptions(MapperOptions options)
    • Sets the mapper options that are used to configure the DataStax ObjectMapper.
    • Only applies when processing POJO data types.
  5. setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout)
    • Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute.
    • Only applies when enableWriteAheadLog() is not configured.
  6. enableWriteAheadLog([CheckpointCommitter committer])
    • An optional setting
    • Allows exactly-once processing for non-deterministic algorithms.
  7. setFailureHandler([CassandraFailureHandler failureHandler])
    • An optional setting
    • Sets the custom failure handler.
  8. setDefaultKeyspace(String keyspace)
    • Sets the default keyspace to be used.
  9. enableIgnoreNullFields()
    • Enables ignoring null values: null values are treated as unset, so null fields are not written and no tombstones are created.
  10. build()
    • Finalizes the configuration and constructs the CassandraSink instance.
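As an example, a minimal sketch of a sink configured through setClusterBuilder() could look like the following. It assumes a DataStream<Tuple2<String, Long>> named result as in the examples further below; ClusterBuilder comes from org.apache.flink.streaming.connectors.cassandra, while Cluster, QueryOptions, and ConsistencyLevel come from the DataStax driver (com.datastax.driver.core). The contact point and QUORUM consistency level are illustrative choices, not requirements.

CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setClusterBuilder(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                // configure the DataStax cluster: contact point plus QUORUM consistency
                return builder
                        .addContactPoint("127.0.0.1")
                        .withQueryOptions(new QueryOptions()
                                .setConsistencyLevel(ConsistencyLevel.QUORUM))
                        .build();
            }
        })
        .build();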

Write-ahead Log

A checkpoint committer stores additional information about completed checkpoints in some resource. This information is used to prevent a full replay of the last completed checkpoint in case of a failure. You can use a CassandraCommitter to store these in a separate table in Cassandra. Note that this table will NOT be cleaned up by Flink.

Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple times without changing the result) and checkpointing is enabled. In case of a failure the failed checkpoint will be replayed completely.

Furthermore, for non-deterministic programs the write-ahead log has to be enabled. For such a program the replayed checkpoint may be completely different than the previous attempt, which may leave the database in an inconsistent state since part of the first attempt may already be written. The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt. Note that enabling this feature will have an adverse impact on latency.

Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.
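If you do enable it, a minimal sketch might look like the following, assuming a DataStream<Tuple2<String, Long>> named result, a locally running node, and a CassandraCommitter constructed from a ClusterBuilder; the committer keeps its bookkeeping in a separate Cassandra table that Flink does not clean up.

CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .enableWriteAheadLog(new CassandraCommitter(new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                // contact point used by the committer for its bookkeeping table
                return builder.addContactPoint("127.0.0.1").build();
            }
        }))
        .build();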

Checkpointing and Fault Tolerance

With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the C* instance. More details are available in the [checkpoints docs].
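For example, a minimal sketch of enabling checkpointing on the execution environment (the 5000 ms interval is an illustrative value, not a recommendation):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// trigger a checkpoint every 5 seconds so the sink's at-least-once guarantee applies
env.enableCheckpointing(5000);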

Examples

The Cassandra sink currently supports both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of those streaming data types, please refer to [Supported Data Types]. We show two implementations based on {{< gh_link file="flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java" name="SocketWindowWordCount" >}}, for the Tuple and POJO data types respectively. In all these examples, we assume the associated keyspace example and table wordcount have been created:

CREATE KEYSPACE IF NOT EXISTS example
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS example.wordcount (
    word text,
    count bigint,
    PRIMARY KEY(word)
);

Cassandra Sink Example for Streaming Tuple Data Type

While storing the result with the Java/Scala Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery("stmt")) to persist each record back to the database. With the upsert query cached as a PreparedStatement, each Tuple element is converted to parameters of the statement. For details about PreparedStatement and BoundStatement, please visit the DataStax Java Driver manual.

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<Tuple2<String, Long>> result = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");
                // emit the pairs
                for (String word : words) {
                    //Do not accept empty word, since word is defined as primary key in C* table
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<String, Long>(word, 1L));
                    }
                }
            }
        })
        .keyBy(value -> value.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .sum(1);
CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .build();

The same example written in Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
// parse the data, group it, window it, and aggregate the counts
val result: DataStream[(String, Long)] = text
  // split up the lines in pairs (2-tuples) containing: (word,1)
  .flatMap(_.toLowerCase.split("\\s"))
  .filter(_.nonEmpty)
  .map((_, 1L))
  // group by the tuple field "0" and sum up tuple field "1"
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .sum(1)
CassandraSink.addSink(result)
  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
  .setHost("127.0.0.1")
  .build()
result.print().setParallelism(1)

Cassandra Sink Example for Streaming POJO Data Type

An example of streaming a POJO data type and storing the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow the DataStax Java Driver Manual to annotate the class, as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver com.datastax.driver.mapping.Mapper class. The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on Definition of Mapped Classes and CQL Data Types.

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text
        .flatMap(new FlatMapFunction<String, WordCount>() {
            public void flatMap(String value, Collector<WordCount> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");
                // emit the pairs
                for (String word : words) {
                    if (!word.isEmpty()) {
                        //Do not accept empty word, since word is defined as primary key in C* table
                        out.collect(new WordCount(word, 1L));
                    }
                }
            }
        })
        .keyBy(WordCount::getWord)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce(new ReduceFunction<WordCount>() {
            @Override
            public WordCount reduce(WordCount a, WordCount b) {
                return new WordCount(a.getWord(), a.getCount() + b.getCount());
            }
        });
CassandraSink.addSink(result)
        .setHost("127.0.0.1")
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();
@Table(keyspace = "example", name = "wordcount")
public class WordCount {
    @Column(name = "word")
    private String word = "";
    @Column(name = "count")
    private long count = 0;
    public WordCount() {}
    public WordCount(String word, long count) {
        this.setWord(word);
        this.setCount(count);
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
    public long getCount() {
        return count;
    }
    public void setCount(long count) {
        this.count = count;
    }
    @Override
    public String toString() {
        return getWord() + " : " + getCount();
    }
}
