用户登录
用户注册

分享至

cassandra联姻spark

  • 作者: 青山青水
  • 来源: 51数据库
  • 2020-09-29
Hadoop(大数据分析领域无可争辩的王者)专注于批处理。这种模型对许多情形(比如为网页建立索引)已经足够,但还存在其他一些使用模型,它们需要来自高度动态的来源的实时信息。为了解决这个问题,就得借助 Nathan Marz 推出的 Storm(现在在 Twitter 中称为 BackType)。Storm 不处理静态数据,但它处理预计会连续的流数据。考虑到 Twitter 用户每天生成 1.4 亿条推文 (tweet),那么就很容易看到此技术的巨大用途。
但 Storm 不只是一个传统的大数据分析系统:它是复杂事件处理 (CEP) 系统的一个示例。CEP 系统通常分类为计算和面向检测,其中每个系统都可通过用户定义的算法在 Storm 中实现。举例而言,CEP 可用于识别事件洪流中有意义的事件,然后实时地处理这些事件。
Nathan Marz 提供了在 Twitter 中使用 Storm 的大量示例。一个最有趣的示例是生成趋势信息。Twitter 从海量的推文中提取所浮现的趋势,并在本地和国家级别维护它们。这意味着当一个案例开始浮现时,Twitter 的趋势主题算法就会实时识别该主题。这种实时算法在 Storm 中实现为 Twitter 数据的一种连续分析。
什么是 “大数据”?
大数据 指的是海量无法通过传统方式管理的数据。互联网范围的数据正在推动能够处理这类新数据的新架构和应用程序的创建。这些架构高度可扩展,且能够跨无限多的服务器并行、高效地处理数据。
Storm 与传统的大数据
Storm 与其他大数据解决方案的不同之处在于它的处理方式。Hadoop 在本质上是一个批处理系统。数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理。当处理完成时,结果数据返回到 HDFS 供始发者使用。Storm 支持创建拓扑结构来转换没有终点的数据流。不同于 Hadoop 作业,这些转换从不停止,它们会持续处理到达的数据。
回页首
大数据实现
Hadoop 的核心是使用 Java? 语言编写的,但支持使用各种语言编写的数据分析应用程序。最新的应用程序的实现采用了更加深奥的路线,以充分利用现代语言和它们的特性。例如,位于伯克利的加利福尼亚大学 (UC) 的 Spark 是使用 Scala 语言实现的,而 Twitter Storm 是使用 Clojure(发音同 closure)语言实现的。
Clojure 是 Lisp 语言的一种现代方言。类似于 Lisp,Clojure 支持一种功能性编程风格,但 Clojure 还引入了一些特性来简化多线程编程(一种对创建 Storm 很有用的特性)。Clojure 是一种基于虚拟机 (VM) 的语言,在 Java 虚拟机上运行。但是,尽管 Storm 是使用 Clojure 语言开发的,您仍然可以在 Storm 中使用几乎任何语言编写应用程序。所需的只是一个连接到 Storm 的架构的适配器。已存在针对 Scala、JRuby、Perl 和 PHP 的适配器,但是还有支持流式传输到 Storm 拓扑结构中的结构化查询语言适配器。
回页首
Storm 的关键属性
Storm 实现的一些特征决定了它的性能和可靠性的。Storm 使用 ZeroMQ 传送消息,这就消除了中间的排队过程,使得消息能够直接在任务自身之间流动。在消息的背后,是一种用于序列化和反序列化 Storm 的原语类型的自动化且高效的机制。
Storm 的一个最有趣的地方是它注重容错和管理。Storm 实现了有保障的消息处理,所以每个元组都会通过该拓扑结构进行全面处理;如果发现一个元组还未处理,它会自动从喷嘴处重放。Storm 还实现了任务级的故障检测,在一个任务发生故障时,消息会自动重新分配以快速重新开始处理。Storm 包含比 Hadoop 更智能的处理管理,流程会由监管员来进行管理,以确保资源得到充分使用。
回页首
Storm 模型
Storm 实现了一种数据流模型,其中数据持续地流经一个转换实体网络(参见 图 1)。一个数据流的抽象称为一个流,这是一个无限的元组序列。元组就像一种使用一些附加的序列化代码来表示标准数据类型(比如整数、浮点和字节数组)或用户定义类型的结构。每个流由一个惟一 ID 定义,这个 ID 可用于构建数据源和接收器 (sink) 的拓扑结构。流起源于喷嘴,喷嘴将数据从外部来源流入 Storm 拓扑结构中。
图 1. 一个普通的 Storm 拓扑结构的概念性架构
接收器(或提供转换的实体)称为螺栓。螺栓实现了一个流上的单一转换和一个 Storm 拓扑结构中的所有处理。螺栓既可实现 MapReduce 之类的传统功能,也可实现更复杂的操作(单步功能),比如过滤、聚合或与数据库等外部实体通信。典型的 Storm 拓扑结构会实现多个转换,因此需要多个具有独立元组流的螺栓。喷嘴和螺栓都实现为 Linux? 系统中的一个或多个任务。
可使用 Storm 为词频轻松地实现 MapReduce 功能。如 图 2 中所示,喷嘴生成文本数据流,螺栓实现 Map 功能(令牌化一个流的各个单词)。来自 “map” 螺栓的流然后流入一个实现 Reduce 功能的螺栓中(以将单词聚合到总数中)。
图 2. MapReduce 功能的简单 Storm 拓扑结构
请注意,螺栓可将数据传输到多个螺栓,也可接受来自多个来源的数据。Storm 拥有流分组 的概念,流分组实现了混排 (shuffling)(随机但均等地将元组分发到螺栓)或字段分组(根据流的字段进行流分区)。还存在其他流分组,包括生成者使用自己的内部逻辑路由元组的能力。
但是,Storm 架构中一个最有趣的特性是有保障的消息处理。Storm 可保证一个喷嘴发射出的每个元组都会处理;如果它在超时时间内没有处理,Storm 会从该喷嘴重放该元组。此功能需要一些聪明的技巧来在拓扑结构中跟踪元素,也是 Storm 的重要的附加价值之一。
除了支持可靠的消息传送外,Storm 还使用 ZeroMQ 最大化消息传送性能(删除中间排队,实现消息在任务间的直接传送)。ZeroMQ 合并了拥塞检测并调整了它的通信,以优化可用的带宽。



  动手实验apache spark的最好方式是使用交互式shell命令行,spark目前有python shell和scala shell两种交互式命令行。 可以从 这里下载apache spark,下载时选择最近预编译好的版本以便能够立即运行shell。 目前最新的apache spark版本是1.5.0,发布时间是2015年9月9日。 tar -xvzf ~/spark-1.5.0-bin-hadoop2.4.tgz 运行python shell cd spark-1.5.0-bin-hadoop2.4 ./bin/pyspark 在本节中不会使用python shell进行演示。 scala交互式命令行由于运行在jvm上,能够使用java库。 运行scala shell cd spark-1.5.0-bin-hadoop2.4 ./bin/spark-shell 执行完上述命令行,你可以看到下列输出: scala shell欢迎信息 welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ &apos;_/ /___/ .__/\_,_/_/ /_/\_\ version 1.5.0 /_/ using scala version 2.10.4 (java hotspot(tm) 64-bit server vm, java 1.8.0_25) type in expressions to have them evaluated. type :help for more information. 15/08/24 21:58:29 info sparkcontext: running spark version 1.5.0 下面是一些简单的练习以便帮助使用shell。也许你现在不能理解我们做的是什么,但在后面我们会对此进行详细分析。在scala shell中,执行下列操作: 在spark中使用readme 文件创建textfilerdd val textfile = sc.textfile("readme.md") 获取textfile rdd的第一个元素 textfile.first() res3: string = # apache spark 对textfile rdd中的数据进行过滤操作,返回所有包含“spark”关键字的行,操作完成后会返回一个新的rdd,操作完成后可以对返回的rdd的行进行计数 筛选出包括spark关键字的rdd然后进行行计数 val lineswithspark = textfile.filter(line => line.contains("spark")) lineswithspark.count() res10: long = 19 要找出rdd lineswithspark单词出现最多的行,可以使用下列操作。使用map方法,将rdd中的各行映射成一个数,然后再使用reduce方法找出包含单词数最多的行。 找出rdd textfile 中包含单词数最多的行 textfile.map(line => line.split(" ").size) .reduce((a, b) => if (a > b) a else b) res11: int = 14 返回结果表明第14行单词数最多。 也可以引入其它java包,例如 math.max()方法,因为map和reduce方法接受scala函数字面量作为参数。 在scala shell中引入java方法 import java.lang.math textfile.map(line => line.split(" ").size) .reduce((a, b) => math.max(a, b)) res12: int = 14 我们可以很容易地将数据缓存到内存当中。 将rdd lineswithspark 缓存,然后进行行计数 lineswithspark.cache() res13: lineswithspark.type = mappartitionsrdd[8] at filter at <console>:23 lineswithspark.count() res15: long = 19 上面简要地给大家演示的了如何使用spark交互式命令行。 弹性分布式数据集(rdds) spark在集群中可以并行地执行任务,并行度由spark中的主要组件之一——rdd决定。弹性分布式数据集(resilient distributed data, rdd)是一种数据表示方式,rdd中的数据被分区存储在集群中(碎片化的数据存储方式),正是由于数据的分区存储使得任务可以并行执行。分区数量越多,并行越高。下图给出了rdd的表示: display- edit 想像每列均为一个分区(partition ),你可以非常方便地将分区数据分配给集群中的各个节点。 为创建rdd,可以从外部存储中读取数据,例如从cassandra、amazon简单存储服务(amazon simple storage service)、hdfs或其它hadoop支持的输入数据格式中读取。也可以通过读取文件、数组或json格式的数据来创建rdd。另一方面,如果对于应用来说,数据是本地化的,此时你仅需要使用parallelize方法便可以将spark的特性作用于相应数据,并通过apache spark集群对数据进行并行化分析。为验证这一点,我们使用scala spark shell进行演示:
软件
前端设计
程序设计
Java相关