如何构建高效的storm计算模型
计算机制简介
Storm采用流式计算的模型,和shell类似让数据在一个个“管道”中进行处理。
- Spout负责从数据源拉取数据,相当于整个系统的生产者。
- Bolt负责消费数据并将tuple发送给下一个计算单元。Bolt可以接受多个spout和bolt的数据。
- 每个spout,bolt可以设置并行度excuter相当于多进程,每个excuter可以设置多个task
- shuffle grouping,它随机将tuple发给任何一个task;fields grouping,相同field值的tuple发送给同一个task。
数据完整性
当spout发送一个数据的时候为每一个tuple产生一个唯一的message id。当数据被完整处理的时候bolt会产生一个应答ack(成功)或fail(失败),如果数据超过(默认30s)则视为超时然后丢弃掉(可以通过操纵fail方法重新发送数据,不过这带来很高的计算成本)。同时受spout发射tuple最大数的限制bole的处理速度会影响spout的发射速度。因此如果保证数据被快速消费掉成为影响流式计算速度的关键所在。
stom计算模型
一个简单的storm计算模型基本包括3部分:从数据源拉取数据,关联离线的维表,将结果写入数据库。
我们假设需要统计一个购物网站商品分类目的点击人数次数,而且这个网站数据量非常大。大致步骤如下:
A. FF负责产生商品点击数据
B. 关联商品类目
C.将结果写入hbase
商品id:auc_id 用户id:user_id
A.拉取数据
你的任务跑的很快,资源占用也少但是数据为啥数据也这么少呢?不好,数据全堆积在FF数据源了。ok,加大spout的task数,并行度为1。但是为啥数据还是这么少,来看看我们的代码。
public void nextTuple() { while (true) { LogData log = null; try { log = queue.take(); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); return; } if(log.getType() == null) continue; collector.emit(streamId, new Values(log.getData(), uid)); } }
问题的关键在于take为阻塞方法,而storm的多线程由同一个excuter来控制,相当于一个循环在多个task之间切换。当一个task阻塞的时候其他task也无法执行,但大部分线程是可以拿到数据的,整体相当于只有单线程在执行。解决办法是改用无阻塞的poll方法从队列中拿到数据。
如果增加并行度呢?改成多 excuter单task之后即使单个task的take 方法阻塞也不会对其他task产生影响,而且效率也比多task高。但随之引来一个新问题:由于task的阻塞导致任务超时失败率增加部分数据被丢弃,因此乖乖改成无阻塞的poll方法吧。
如果还想快一点呢?那就直接去掉应答,因为应答本身也消耗资源,但是统计不到失败率的,慎用。
conf.setNumAckers(0);
总结起来就是拉数据提高并行度,task数设为1,取数据采用无阻塞方法,数据量大去掉应答。
B. 关联商品类目
离线商品表?听起来很大的样子。这时候我们需要一个缓存,LRUCache是个不错的选择,他是一个双向链表的数据结构,查询次数越高会靠前,查询次数低会排在后面,甚至舍弃。商品表太大导入hbase很慢?分表吧。我们需要做的就是将商品表哈希到n个小表然后批量导入。查询的的时候如果没有命中缓存则将auc_id哈希到对应的商品表进行查询。
这时候你会发现查询商品表,累加,然后将结果存入hbase是一个很长的过程,而这很可能造成你的处理超时然后数据被丢掉。这里我们引入BlockingQueue,如果BlockingQueue是空的,取数操作会阻断进入等待状态,直到有值才被唤醒,存数时如果队列是满的,则阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒。我们将user_id哈希到n个BlockingQueue(为最大利用cpu n为cpu数),将用户数据插入到对应的BlockingQueue后直接应答,这样storm就可以快速进行下一步处理。同时该BlockingQueue对应的线程负责消费数据,所有线程共享LRUCach商品表缓存。
C.将结果写入hbase
这时我们需要一个队列存储结果,ConcurrentHashMap是一个线程安全的无阻塞数组。前面说到数据是分散到不同的线程进行计算的。每个线程将结果插入到同一个ConcurrentHashMap(插入读写无阻塞),然后通过ScheduledThreadPoolExecutor定时将输入批量导入到hbase。
D.计算点击人数
这里讨论另外一个问题大数据去重,比较简单的方法是直接建立user_id缓存,但是这样很耗资源。通过bloomfilter可以损失很小的准确性的情况下完成去重。具体参考http://blog.csdn.net/jiaomeng/article/details/1495500
转自:http://blog.csdn.net/z_l_l_m/article/details/8251785
知识点
相关教程
更多Storm 实战:构建大数据实时计算
Storm 实战:构建大数据实时计算(阿里巴巴集团技术丛书,大数据丛书。大型互联网公司大数据实时处理干货分享!来自淘宝一线技术团队的丰富实践,快速掌握Storm技术精髓!) 阿里巴巴集团数据平台事业部商家数据业务部 编著 ISBN 978-7-121-22649-6 2014年8月出版 定价:59.00元 184页 16开 编辑推荐 Storm以其简单、灵活、健壮而著称。随着大数据实时处理需求
利用Ant构建Hadoop高效开发环境
最近Hadoop的研究中,都是利用Mockito来模拟数据进行,下一个阶段需要在Hadoop服务器上做大量的运行进行验证,同时也要为正式使用做准备。 今天考虑使用Ant来搭建一个Hadoop的开发和调试环境,不使用hadoop自带的插件。 思路如下: 1、 利用Ant在开发机器上将代码编译、打包,最终得到可执行的jar包。 2、 利用Ant的SSH属性,将jar包传到hadoop服务器的指定工作目
【转】Spark:一个高效的分布式计算系统
原文地址:http://tech.uc.cn/?p=2116 概述 什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDF
构建Disruptor实例-生产消费模型完成整个入门示例
初始化Disruptor,构建Disruptor只要需要以下几个参数:1 eventFactory: 消息(event)工厂对象,2 ringBufferSize: 容器的长度,3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler,4 ProducerType: 单生产者 还是 多生产者, 5 waitStrategy: 等待策略2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听。3、然后启动D...
跟我一起云计算(1)——storm
概述 最近要做一个实时分析的项目,所以需要深入一下storm。 为什么storm 综合下来,有以下几点: 1. 生逢其时 MapReduce 计算模型打开了分布式计算的另一扇大门,极大的降低了实现分布式计算的门槛。有了MapReduce架构的支持,开发者只需要把注意力集中在如何使用 MapReduce的语义来解决具体的业务逻辑,而不用头疼诸如容错,可扩展性,可靠性等一系列硬骨头。一时间,人们拿
[zz]流式计算之Storm简介
转载自:http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中
流式计算之Storm简介
http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html 流式计算之Storm简介 (2011-11-04 14:51:39) 转载▼ Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于
流式计算之Storm简介
Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。 主要商业应用及案例:Twitter Storm的优
Storm数据流模型的分析及讨论
转自:http://www.cnblogs.com/panfeng412/archive/2012/07/29/storm-stream-model-analysis-and-discussion.html Storm数据流模型的分析及讨论 本文首先介绍了Storm的基本概念和数据流模型,然后结合一个典型应用场景来说明Storm支持Topology之间数据流订阅的必要性,最后对比了S
Twitter Storm 使用maven构建storm项目
Maven 要开发拓扑,你需要在classpath环境变量设置storm的相关jar文件路径。你也应把未打包的jar文件放到你项目的classpath,或使用maven来管理storm相关的开发依赖包。Storm托管在Clojars(一个maven仓库)。为了在项目中包含storm相关的开发依赖包,在pom.xml文件中添加如下代码: <repository&
顶 Storm[技术博客] Storm数据流模型的分析以及讨论
Storm 是一个开源的实时计算系统,它提供了一系列的基本元素用于进行计算: 1 Topology 2 Stream 3 spout 4 bolt 在我们提交我们的topology的时候,一旦你提交了你的topology到你的集群之中后,除非你显示的去停止任务 集群中间的topology会一直的在运行 计算任务Topology是由不同的Spouts 和 bolts,通过数据流 Stream连接起来
Storm -- 实时计算平台
1.1 实时流计算 互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率。正因为大家对信息实时响应、实时交互的需求,软件行业除了个人操作系统之外,数据库(更精确的说是关系型数据库)应该是软件行业发展最快、收益最为丰厚的产品了。记得十年前,很多银行别说实时转账,连实时查询都做不到,但是数据库和高速网络改变了这个情况。 随着互联网的更进一步发展,从Porta
【译】流式计算框架-Storm简介
在当前的数据分析领域,对实时数据的计算需求越来越强烈,在此领域,出现了各类计算框架,如:Storm、S4等。目前本土公司对这些流式计算框架的应用也比较广泛,但苦于相关文档英文居多,缺少成系列且与官方相对应的中文手册。本系列试图从官方文档翻译入手,给大家呈现较为完备的中文资料,同时也是对自身知识的总结沉淀。 在这个系列博客中,我们选择了twitter的Storm框架,原因很简单,因为本人长期使用的就
storm实时流式计算框架集群搭建过程
Storm集群安装配置过程 ——by comaple.zhang 这几天在其他同事的帮助下,调研了twitter的开源流式计算框架storm的使用,下面分享一下storm集群的安装配置过程。以作备忘之用。 我的实验机器为:195和196 如果转载请注明出处:comaple的博客 首先:安装依赖包 1, ZeroMQ 2.1.7 2, JZMQ 3, Java 4, Python 5, Unzi
Storm分布式实时流计算框架相关技术总结
Storm作为一个开源的分布式实时流计算框架,其内部实现使用了一些常用的技术,这里是对这些技术及其在Storm中作用的概括介绍。以此为基础,后续再深入了解Storm的内部实现细节。 1. Zookeeper集群 Zookeeper是一个针对大型分布式系统的可靠协调服务系统,其采用类似Unix文件系统树形层次结构的数据模型(如:/zoo/a,/zoo/b),节点内可存储少量数据(<1M,当节点
最新教程
更多java线程状态详解(6种)
java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞
redis从库只读设置-redis集群管理
默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave. 127.0.0.1:6382> set k3 111 (error) READONLY You can't write against a read only slave. 如果你要开启从库
Netty环境配置
netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。
Netty基于流的传输处理
在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据
Netty入门实例-使用POJO代替ByteBuf
使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。
Netty入门实例-时间服务器
Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现
Netty入门实例-编写服务器端程序
channelRead()处理程序方法实现如下
Netty开发环境配置
最新版本的Netty 4.x和JDK 1.6及更高版本
电商平台数据库设计
电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表
HttpClient 上传文件
我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。
MongoDB常用命令
查看当前使用的数据库 > db test 切换数据库 > use foobar switched to db foobar 插入文档 > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new
快速了解MongoDB【基本概念与体系结构】
什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
windows系统安装MongoDB
安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我
Spring boot整合MyBatis-Plus 之二:增删改查
基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId; import com.baomidou.mybatisplus.annotations.TableName; import com.baom
分布式ID生成器【snowflake雪花算法】
基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案