Storm 实现滑动窗口计数和TopN排序 【转】
< 返回列表时间: 2016-04-02来源:OSCHINA
计算top N words的topology, 用于比如trending topics or trending images on Twitter.

实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码


Topology

这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping

String spoutId = ""; String counterId = ""; String intermediateRankerId = ""; String totalRankerId = ""; builder.setSpout(spoutId, TestWordSpout(), 5); builder.setBolt(counterId, RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, Fields("")); builder.setBolt(intermediateRankerId, IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, Fields("")); builder.setBolt(totalRankerId, TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);

RollingCountBolt
首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的
RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds
new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes
1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面)
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间)
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间

2. TupleHelpers.isTickTuple(tuple), TickTuple
前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性.
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用
"__system" component会定时往task发送 "__tick" stream的tuple
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置
也可以在代码里面通过getComponentConfiguration()来进行配置,
Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); conf;
配置完成后, storm就会定期的往task发送ticktuple
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能
isTickTuple(Tuple tuple) { tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "" && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "" }


最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
obj, count(窗口内的计数和), 实际使用时间

SlotBasedCounter
基于slot的counter, 模板类, 可以指定被计数对象的类型T
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作
关键结构为Map<T, long []> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0
wipeZeros, 删除所有total count为0的obj, 以释放空间
SlotBasedCounter<T> Serializable { serialVersionUID = 4858185737378394432L; Map<T, []> objToCounts = HashMap<T, []>(); numSlots; SlotBasedCounter( numSlots) { (numSlots <= 0) { IllegalArgumentException("" + numSlots + ""); } .numSlots = numSlots; } incrementCount(T obj, slot) { [] counts = objToCounts.get(obj); (counts == ) { counts = [.numSlots]; objToCounts.put(obj, counts); } counts[slot]++; } getCount(T obj, slot) { [] counts = objToCounts.get(obj); (counts == ) { 0; } { counts[slot]; } } Map<T, Long> getCounts() { Map<T, Long> result = HashMap<T, Long>(); (T obj : objToCounts.keySet()) { result.put(obj, computeTotalCount(obj)); } result; } computeTotalCount(T obj) { [] curr = objToCounts.get(obj); total = 0; ( l : curr) { total += l; } total; } wipeSlot( slot) { (T obj : objToCounts.keySet()) { resetSlotCountToZero(obj, slot); } } resetSlotCountToZero(T obj, slot) { [] counts = objToCounts.get(obj); counts[slot] = 0; } shouldBeRemovedFromCounter(T obj) { computeTotalCount(obj) == 0; } wipeZeros() { Set<T> objToBeRemoved = HashSet<T>(); (T obj : objToCounts.keySet()) { (shouldBeRemovedFromCounter(obj)) { objToBeRemoved.add(obj); } } (T obj : objToBeRemoved) { objToCounts.remove(obj); } } }


SlidingWindowCounter
SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念
incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据
核心的操作为, getCountsThenAdvanceWindow
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口
advanceHead的实现, 如何在数组实现循环的滑动窗口
SlidingWindowCounter<T> Serializable { serialVersionUID = -2645063988768785810L; SlotBasedCounter<T> objCounter; headSlot; tailSlot; windowLengthInSlots; SlidingWindowCounter( windowLengthInSlots) { (windowLengthInSlots < 2) { IllegalArgumentException("" + windowLengthInSlots + ""); } .windowLengthInSlots = windowLengthInSlots; .objCounter = SlotBasedCounter<T>(.windowLengthInSlots); .headSlot = 0; .tailSlot = slotAfter(headSlot); } incrementCount(T obj) { objCounter.incrementCount(obj, headSlot); } Map<T, Long> getCountsThenAdvanceWindow() { Map<T, Long> counts = objCounter.getCounts(); objCounter.wipeZeros(); objCounter.wipeSlot(tailSlot); advanceHead(); counts; } advanceHead() { headSlot = tailSlot; tailSlot = slotAfter(tailSlot); } slotAfter( slot) { (slot + 1) % windowLengthInSlots; } }


IntermediateRankingsBolt
这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重
所以先通过IntermediateRankingsBolt, 过滤掉一些
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task
IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面)
并实现了updateRankingsWithTuple,
updateRankingsWithTuple(Tuple tuple) { Rankable rankable = RankableObjectWithFields.from(tuple); .getRankings().updateWith(rankable); }
逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表
参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去


Rankable
Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口
Rankable Comparable<Rankable> { Object getObject(); getCount(); }
RankableObjectWithFields
RankableObjectWithFields实现Rankable接口
1. 提供将Tuple转化为RankableObject
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中
2. 实现Rankable定义的getObject()和getCount()接口
3. 实现Comparable接口, 包含compareTo, equals
public class RankableObjectWithFields implements Rankable
RankableObjectWithFields from(Tuple tuple) { List<Object> otherFields = Lists.newArrayList(tuple.getValues()); Object obj = otherFields.remove(0); Long count = (Long) otherFields.remove(0); RankableObjectWithFields(obj, count, otherFields.toArray()); }
Rankings
Rankings维护需要排序的List, 并提供对List相应的操作
核心的数据结构如下, 用来存储rankable对象的list
List<Rankable> rankedItems = Lists.newArrayList();
提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)
核心的操作是,
updateWith(Rankable r) { addOrReplace(r); rerank(); shrinkRankingsIfNeeded(); }
上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的
1. 替换已有的, 或新增rankable对象(包含obj, count)
2. 从新排序(Collections.sort)
3. 由于只需要topN, 所以大于maxsize的需要删除
AbstractRankerBolt
首先以TopN为参数, 创建Rankings对象
Rankings rankings; AbstractRankerBolt( topN, emitFrequencyInSeconds) { count = topN; .emitFrequencyInSeconds = emitFrequencyInSeconds; rankings = Rankings(count); }
在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑
execute(Tuple tuple, BasicOutputCollector collector) { (TupleHelpers.isTickTuple(tuple)) { emitRankings(collector); } { updateRankingsWithTuple(tuple); } }
最终将整个rankings列表emit出去

emitRankings(BasicOutputCollector collector) { collector.emit( Values(rankings)); getLogger().info("" + rankings); }


TotalRankingsBolt
该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序.
TotalRankingsBolt同样继承自AbstractRankerBolt
updateRankingsWithTuple(Tuple tuple) { Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0); .getRankings().updateWith(rankingsToBeMerged); }
唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历

最终可以得到, 全局的TopN的Rankings列表






作者




晨色星空J2EE

已学习课程数: 12

已发表笔记数: 51





Ta的笔记


01 awk 实战使用
统计访问日志里面的IP数据,通过IP分类统计出每一个IP最近访问次数,并进行访问次数排序,日志文件格式如下,看似比较混乱的结构:{"referer":"http://www.52ipr.com/index.html","headers":{"X-Proxy":"nginx","Accept":"*/*"},"scheme":"http


02 Linux 学习笔记 Sed命令详解
简介sed 是一种在线编辑器,它一次处理一行内容。处理时,把当前处理的行存储在临时缓冲区中,称为“模式空间”(pattern space),接着用sed命令处理缓冲区中的内容,处理完成后,把缓冲区的内容送往屏幕。接着处理下一行,这样不断重复,直到文件末尾。文件内容并没有 改变,除非你使用重定向存储输出。Sed主要用来自动编辑一个或多个文件;简化对文件的反复操作;编写转换程序等。 sed使用参数 # sed 选项与参数: -n :使用安静(silent)模式。在一般 sed 的用法中,所有


03 Linux 学习笔记 Grep篇
grep 是一个很常见也很常用的命令,他最重要的功能就是进行字串数据的比对,然后将符合使用者需求的字串列印出来。需要说明的是『grep 在数据中查寻一个字串时,是以 "整行" 为单位来进行数据的撷取的!』也就是说,假如一个文件内有 10 行,其中有两行具有你所搜寻的字串,则将那两行显示在萤幕上,其他的就丢弃了!在关键字的显示方面,grep 可以使用 --color=auto 来将关键字部分使用颜色显示。这可是个很不错的功能啊!但是如果每次使用 grep 都得要自行加上 --color=


04 Spark中文手册 编程指南-3
共享变量一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上 的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)广播变量广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷


05 Spark中文手册 编程指南-2 弹性分布式数据集 (RDDs)
弹性分布式数据集 (RDDs)Spark 核心的概念是 Resilient Distributed Dataset (RDD):一个可并行操作的有容错机制的数据集合。有 2 种方式创建 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他 Hadoop 数据格式的数据源。并行集合外部数据集RDD 操作传递函数到 Spark使用键值对TransformationsActionsRDD持久化并行集合并行集合


06 Spark中文手册 编程指南-1
概论在高层中,每个 Spark 应用程序都由一个驱动程序(driver programe)构成,驱动程序在集群上运行用户的mian 函数来执行各种各样的并行操作(parallel operations)。Spark 的主要抽象是提供一个弹性分布式数据集(RDD),RDD 是指能横跨集群所有节点进行并行计算的分区元素集合。RDDs 从 Hadoop 的文件系统中的一个文件中创建而来(或其他 Hadoop 支持的文件系统),或者从一个已有的 Scala 集合转换得到。用户可以要求 Spark 将 R


07 Spark中文手册-快速上手
快速上手本节课程提供一个使用 Spark 的快速介绍,首先我们使用 Spark 的交互式 shell(用 Python 或 Scala) 介绍它的 API。当演示如何在 Java, Scala 和 Python 写独立的程序时,看编程指南里完整的参考。依照这个指南,首先从 Spark 网站下载一个 Spark 发行包。因为我们不会使用 HDFS,你可以下载任何 Hadoop 版本的包。Spark Shell独立应用程序开始翻滚吧!使用 Spark Shell基础Spark 的 shell 作为一


08 Spark 中文手册 总览
Spark 编程指南简体中文版Introduction快速上手Spark Shell独立应用程序开始翻滚吧!编程指南引入 Spark初始化 SparkSpark RDDs并行集合外部数据集RDD 操作传递函数到 Spark使用键值对TransformationsActionsRDD持久化共享变量从这里开始Spark Streaming一个快速的例子基本概念关联初始化StreamingContext离散流输入DStreamsDStream中的转换DStream的输出操作缓存或持久化Checkpointing


09 linux awk命令详解
简介awk是一个强大的文本分析工具,相对于grep的查找,sed的编辑,awk在其对数据分析并生成报告时,显得尤为强大。简单来说awk就是把文件逐行的读入,以空格为默认分隔符将每行切片,切开的部分再进行各种分析处理。awk有3个不同版本: awk、nawk和gawk,未作特别说明,一般指gawk,gawk 是 AWK 的 GNU 版本。awk其名称得自于它的创始人 Alfred Aho 、Peter Weinberger 和 Brian Kernighan 姓氏的首个字母。实际上 AWK 的确拥有自己的语


10 linux sort,uniq,cut,wc命令详解
sortsort 命令对 File 参数指定的文件中的行排序,并将结果写到标准输出。如果 File 参数指定多个文件,那么 sort 命令将这些文件连接起来,并当作一个文件进行排序。sort语法# sort 选项与参数: -f :忽略大小写的差异,例如 A 与 a 视为编码相同; -b :忽略最前面的空格符部分; -M :以月份的名字来排序,例如 JAN, DEC 等等的排序方法; -n :使用『纯数字』进行排序(默认是以文字型态来排序的); -r :反向排序; -u :就是




相关笔记


[无]



最新笔记


01 TaoKeeper监控ZK集群
ZK Web界面, node-zk-browserWeb展示每个path的属性和数据。需要安装Node.js 和 node-zookeeper taokeeper为zookeeper做了什么?1.CPU/MEM/LOAD的监控//load2. ZK日志目录所在磁盘剩余空间监控3. 单机连接数的峰值报警4. 单机 Watcher数的峰值报警5. 节点自检:是指对集群中每个IP所在ZK节点上的PATH:


02 java高手养成地:116804208,Java零基础到项目实战在线直播公开课
学习编程并没有那么困难,你只是需要一位大神来手把手来带领,专业的Java老师在线讲课,帮助你有效快速的掌握Java,真心想要学习java的伙伴可以加我们,想学习编程不要看网上的这些视频教程,没有任何意义,你操作过程中会遇到大量的问题,学习编程可以加群【116,804,208】这里有很多人指导你一起学习,Java零基础到项目实战公开课,我们的课程偏向实战性,想要学习Java的伙伴欢迎到我们的课堂来一起学习。


03 awk
类似的数据结构使用awk进行分析。第一种方式假定 ip地址在逗号分隔的第8段中,可以进行如下操作(当然这也的假定条件本身就是有问题的) awk: cat access_log.* | awk -F ',' '{print $8}' | awk -F ':' '{if($1=="\"ip\""){print $2}}' |sort |uniq -c |sort -rn cat


04 awk 实战使用
统计访问日志里面的IP数据,通过IP分类统计出每一个IP最近访问次数,并进行访问次数排序,日志文件格式如下,看似比较混乱的结构:{"referer":"http://www.52ipr.com/index.html","headers":{"X-Proxy":"nginx","Accept":"*/*"},"scheme":"http


05 hive 表跟mysql绑定以后hive表drop不掉解决办法
这个问题网上给出的一般都是修改编码集,但对有些朋友来说,貌似不好使,我自己就是受害者。 原因也确实是编码集,但是很多人像我都搞错了顺序呢!下面是原因以及解决! 我安装mysql的时候默认编码是utf8 ,然后我启动hive 它给我自动create了若干张表,然后我才把表的编码集改成latin,但是其实里面表的编码集还是utf8 导致后面的问题我把mysql下的database


06 CDH 运维笔记
1Hbase日常运维 1.1 监控Hbase运行状况1.1.1操作系统1.1.1.1 IOa.群集网络IO,磁盘IO,HDFS IOIO越大说明文件读写操作越多。当IO突然增加时,有可能:1.compact队列较大,集群正在进行大量压缩操作。2.正在执行mapreduce作业可以通过CDH前台查看整个集群综合的数据或进入指定机器的前台查看单台机器的数据:b.Io wait磁盘IO对集群的影响


07 • Storm优势
• Storm优势  1. 简单的编程模型。类似于MapReduce降低了并行批处 理复杂性,Storm降低了进行实时处理的复杂性。  2. 服务化,一个服务框架,支持热部署,即时上线或下线App.  3. 可以使用各种编程语言。你可以在Storm之上使用各种 编程语言。默认支持Clojure、Java、Ruby和Python。要 增加对其他语言的支持,只需实现一个简单的Storm通信 协议即可。  4. 容错性。Storm会管理工作进程和节点的故障。  5. 水平扩展。计算是在多个线程、进程和


08 如何在LATEX里高亮显示R代码
转自 如何在Latex中高亮显示R语言代码,请看下面的示例:\documentclass{beamer}