Apache Beam入门及Java SDK开发初体验

作者: 南瓜慢说

Apache Beam入门及Java SDK开发初体验{cb_post_title_url}

1 什么是Apache Beam

Apache Beam是一个开源的统一的大数据编程模型,它本身并不提供执行引擎,而是支持各种平台如GCP Dataflow、Spark、Flink等。通过Apache Beam来定义批处理或流处理,就可以放在各种执行引擎上运行了。 目前支持的SDK语言也很丰富,有Java、Python、Go等。

1.1 一些基础概念

  • PCollection:可理解为数据包,数据处理就是在对各种PCollection进行转换和处理。

  • PTransform:代表数据处理,用来定义数据是怎么被处理的,用来处理PCollection。

  • Pipeline:流水线,是由PTransform和PCollection组成的集合,可以理解为它定义了数据处理从源到目标的整个过程。

  • Runner:数据处理引擎。 一个最简单的Pipeline例子如下: 从数据库读数据为PCollection,经过转化成为另一个PCollection,然后写回到数据库中去。 可以有多个PTransform处理同一个PCollection: 一个PTransform也可以生成多个PCollection:

    2 Java开发初体验

    们通过使用Java SDK来开发一个WordCount感受一下。 先引入必要的依赖,版本为2.32.0:

    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>$</version>
    </dependency>
    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>$</version>
    </dependency>
    

    写Java主程序如下:

    public class WordCountDirect {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        PCollection<String> lines = pipeline.apply("read from file",
                TextIO.read().from("pkslow.txt"));
        PCollection<List<String>> wordList = lines.apply(MapElements.via(new SimpleFunction<String, List<String>>() {
            @Override
            public List<String> apply(String input) {
                List<String> result = new ArrayList<>();
               char[] chars = input.toCharArray();
                for (char c:chars) {
                    result.add(String.valueOf(c));
                }
              return result;
            }
        }));
        PCollection<String> words = wordList.apply(Flatten.iterables());
        PCollection<KV<String, Long>> wordCount = words.apply(Count.perElement());
        wordCount.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
            @Override
            public String apply(KV<String, Long> count) {
                return String.format("%s : %s", count.getKey(), count.getValue());
            }
        })).apply(TextIO.write().to("word-count-result"));
        pipeline.run().waitUntilFinish();
    }
    }
    

    直接运行,默认是通过DirectRunner来执行的,即在本地即可执行,不用搭建。非常方便开发和测试Pipeline。 整个程序大概流程是: 从pkslow.txt文件里读取所有行,然后将每一行拆分为多个字符,计算每个字符出现的次数,输出到文件中word-count-result。 pkslow.txt文件内容如下: 执行后的结果文件如下所示:

    3 总结

    简单体验了一下,基于Beam的模型开发还是很简单,很好理解的。但它在各种平台上的执行效率如何,就还需要深挖了。 代码请查看:https://github.com/LarryDpk/pkslow-samples
    posted on 2021-10-17 11:54 南瓜慢说 阅读(15) 评论(0) 编辑 收藏 举报 {post_comment_count}

    原文创作:南瓜慢说

    原文链接:https://www.cnblogs.com/larrydpk/p/15416590.html

更多推荐

更多
  • Pulsar消息队列-一套高可用实时消息系统实现 实时消息【即时通信】系统,有群聊和单聊两种方式,其形态异于消息队列:1 大量的 group 信息变动,群聊形式的即时通信系统在正常服务形态下,瞬时可能有大量用户登入登出。2 ...
  • Pulsar消息队列-Pulsar对比Kafka笔记 很多人查看 Pulsar 之前可能对 Kafka 很熟悉,参照上图可见二者内部结构的区别,Pulsar 和 Kafka 都是以 Topic 描述一个基本的数据集合,Topic 数据又分为若干 Partition,即对数据进行逻辑上的 ...
  • Pulsar消息队列-对 2017 年一套 IM 系统的反思 信系统的开发,前前后后参与或者主导了六七个 IM 系统的研发。上一次开发的 IM 系统的时间点还是 2018 年,关于该系统的详细描述见 [一套高可用实时消息系统实现][1] ...
  • Apache APISIX文档-快速入门指南-如何构建 Apache APISIX 如何构建 Apache APISIX,步骤1:安装 Apache APISIX,步骤2:安装 etcd,步骤3:管理 Apache APISIX 服务,步骤4:运行测试案例,步骤5:修改 Admin API key,步骤6:为 Apac
  • Apache APISIX文档-快速入门指南-快速入门指南 快速入门指南,概述,前提条件,第一步:安装 Apache APISIX,第二步:创建路由,第三步:验证,进阶操作,工作原理,创建上游服务Upstream,绑定路由与上游服务,添加身份验证,为路由添加前缀,APISIX Dashboard
  • Apache APISIX文档-架构设计-APISIX APISIX,软件架构,插件加载流程,插件内部结构,配置 APISIX,插件加载流程,比如指定 APISIX 默认监听端口为 8000,并且设置 etcd 地址为 http://foo:2379, 其他配置保持默认。在 ...
  • Apache APISIX文档-架构设计-Service Service 是某类 API 的抽象(也可以理解为一组 Route 的抽象)。它通常与上游服务抽象是一一对应的,Route 与 Service 之间,通常是 N:1 的关系,参看下图。不同 Route 规则同时绑定到一个 Service ...
  • Apache APISIX文档-架构设计-Plugin Config 如果你想要复用一组通用的插件配置,你可以把它们提取成一个 Plugin config,并绑定到对应的路由上。举个例子,你可以这么做:创建 Plugin config,如果这个路由已经配置了 plugins,那么 Plugin config ...
  • Apache APISIX文档-架构设计-Debug Mode 注意:在 APISIX 2.10 之前,开启基本调试模式曾经是设置 conf/config.yaml 中的 apisix.enable_debug 为 true。设置 conf/debug.yaml 中的选项,开启高级调试模式。由于 ...
  • Apache APISIX文档-架构设计-Consumer 如上图所示,作为 API 网关,需要知道 API Consumer(消费方)具体是谁,这样就可以对不同 API Consumer 配置不同规则。授权认证:比如有 [key-auth] 等。获取 consumer_...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多