如何构建高效的storm计算模型

计算机制简介

        Storm采用流式计算的模型,和shell类似让数据在一个个“管道”中进行处理。

  • Spout负责从数据源拉取数据,相当于整个系统的生产者。
  • Bolt负责消费数据并将tuple发送给下一个计算单元。Bolt可以接受多个spout和bolt的数据。
  • 每个spout,bolt可以设置并行度excuter相当于多进程,每个excuter可以设置多个task  
  • shuffle grouping,它随机将tuple发给任何一个task;fields grouping,相同field值的tuple发送给同一个task。

A Storm topology

数据完整性

        当spout发送一个数据的时候为每一个tuple产生一个唯一的message id。当数据被完整处理的时候bolt会产生一个应答ack(成功)或fail(失败),如果数据超过(默认30s)则视为超时然后丢弃掉(可以通过操纵fail方法重新发送数据,不过这带来很高的计算成本)。同时受spout发射tuple最大数的限制bole的处理速度会影响spout的发射速度。因此如果保证数据被快速消费掉成为影响流式计算速度的关键所在。

 

stom计算模型

        一个简单的storm计算模型基本包括3部分:从数据源拉取数据,关联离线的维表,将结果写入数据库。

我们假设需要统计一个购物网站商品分类目的点击人数次数,而且这个网站数据量非常大。大致步骤如下:

    A. FF负责产生商品点击数据

    B. 关联商品类目

    C.将结果写入hbase

    商品id:auc_id   用户id:user_id

 

A.拉取数据

         你的任务跑的很快,资源占用也少但是数据为啥数据也这么少呢?不好,数据全堆积在FF数据源了。ok,加大spout的task数,并行度为1。但是为啥数据还是这么少,来看看我们的代码。

    public  void nextTuple() {     

        while (true) {
            LogData log = null;
            try {
                log = queue.take(); 
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
                return;
            }            

            if(log.getType() == null)
                continue;            
            
            collector.emit(streamId, new Values(log.getData(), uid));
        }
    }

        问题的关键在于take为阻塞方法,而storm的多线程由同一个excuter来控制,相当于一个循环在多个task之间切换。当一个task阻塞的时候其他task也无法执行,但大部分线程是可以拿到数据的,整体相当于只有单线程在执行。解决办法是改用无阻塞的poll方法从队列中拿到数据。

        如果增加并行度呢?改成多  excuter单task之后即使单个task的take  方法阻塞也不会对其他task产生影响,而且效率也比多task高。但随之引来一个新问题:由于task的阻塞导致任务超时失败率增加部分数据被丢弃,因此乖乖改成无阻塞的poll方法吧。

        如果还想快一点呢?那就直接去掉应答,因为应答本身也消耗资源,但是统计不到失败率的,慎用。

   conf.setNumAckers(0);

        总结起来就是拉数据提高并行度,task数设为1,取数据采用无阻塞方法,数据量大去掉应答。

B. 关联商品类目

        离线商品表?听起来很大的样子。这时候我们需要一个缓存,LRUCache是个不错的选择,他是一个双向链表的数据结构,查询次数越高会靠前,查询次数低会排在后面,甚至舍弃。商品表太大导入hbase很慢?分表吧。我们需要做的就是将商品表哈希到n个小表然后批量导入。查询的的时候如果没有命中缓存则将auc_id哈希到对应的商品表进行查询。

        这时候你会发现查询商品表,累加,然后将结果存入hbase是一个很长的过程,而这很可能造成你的处理超时然后数据被丢掉。这里我们引入BlockingQueue,如果BlockingQueue是空的,取数操作会阻断进入等待状态,直到有值才被唤醒,存数时如果队列是满的,则阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒。我们将user_id哈希到n个BlockingQueue(为最大利用cpu n为cpu数),将用户数据插入到对应的BlockingQueue后直接应答,这样storm就可以快速进行下一步处理。同时该BlockingQueue对应的线程负责消费数据,所有线程共享LRUCach商品表缓存。

C.将结果写入hbase

        这时我们需要一个队列存储结果,ConcurrentHashMap是一个线程安全的无阻塞数组。前面说到数据是分散到不同的线程进行计算的。每个线程将结果插入到同一个ConcurrentHashMap(插入读写无阻塞),然后通过ScheduledThreadPoolExecutor定时将输入批量导入到hbase。

       

D.计算点击人数

        这里讨论另外一个问题大数据去重,比较简单的方法是直接建立user_id缓存,但是这样很耗资源。通过bloomfilter可以损失很小的准确性的情况下完成去重。具体参考http://blog.csdn.net/jiaomeng/article/details/1495500


转自:http://blog.csdn.net/z_l_l_m/article/details/8251785
2019-03-02 23:56

知识点

相关教程

更多

Storm 实战:构建大数据实时计算

Storm 实战:构建大数据实时计算(阿里巴巴集团技术丛书,大数据丛书。大型互联网公司大数据实时处理干货分享!来自淘宝一线技术团队的丰富实践,快速掌握Storm技术精髓!) 阿里巴巴集团数据平台事业部商家数据业务部 编著  ISBN 978-7-121-22649-6 2014年8月出版 定价:59.00元  184页 16开 编辑推荐 Storm以其简单、灵活、健壮而著称。随着大数据实时处理需求

利用Ant构建Hadoop高效开发环境

最近Hadoop的研究中,都是利用Mockito来模拟数据进行,下一个阶段需要在Hadoop服务器上做大量的运行进行验证,同时也要为正式使用做准备。 今天考虑使用Ant来搭建一个Hadoop的开发和调试环境,不使用hadoop自带的插件。 思路如下: 1、 利用Ant在开发机器上将代码编译、打包,最终得到可执行的jar包。 2、 利用Ant的SSH属性,将jar包传到hadoop服务器的指定工作目

【转】Spark:一个高效的分布式计算系统

原文地址:http://tech.uc.cn/?p=2116 概述 什么是Spark   Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDF

构建Disruptor实例-生产消费模型完成整个入门示例

初始化Disruptor,构建Disruptor只要需要以下几个参数:1 eventFactory: 消息(event)工厂对象,2 ringBufferSize: 容器的长度,​3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler,​4 ProducerType: 单生产者 还是 多生产者, ​5 waitStrategy: 等待策略2、初始化好Disruptor之后,通过该对象的handleEventsWith添加消费者的监听。3、然后启动D...

跟我一起云计算(1)——storm

概述  最近要做一个实时分析的项目,所以需要深入一下storm。 为什么storm  综合下来,有以下几点: 1. 生逢其时 MapReduce 计算模型打开了分布式计算的另一扇大门,极大的降低了实现分布式计算的门槛。有了MapReduce架构的支持,开发者只需要把注意力集中在如何使用 MapReduce的语义来解决具体的业务逻辑,而不用头疼诸如容错,可扩展性,可靠性等一系列硬骨头。一时间,人们拿

[zz]流式计算之Storm简介

转载自:http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中

流式计算之Storm简介

http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html  流式计算之Storm简介     (2011-11-04 14:51:39)     转载▼         Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于

流式计算之Storm简介

Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。 主要商业应用及案例:Twitter Storm的优

Storm数据流模型的分析及讨论

转自:http://www.cnblogs.com/panfeng412/archive/2012/07/29/storm-stream-model-analysis-and-discussion.html  Storm数据流模型的分析及讨论       本文首先介绍了Storm的基本概念和数据流模型,然后结合一个典型应用场景来说明Storm支持Topology之间数据流订阅的必要性,最后对比了S

Twitter Storm 使用maven构建storm项目

Maven    要开发拓扑,你需要在classpath环境变量设置storm的相关jar文件路径。你也应把未打包的jar文件放到你项目的classpath,或使用maven来管理storm相关的开发依赖包。Storm托管在Clojars(一个maven仓库)。为了在项目中包含storm相关的开发依赖包,在pom.xml文件中添加如下代码:              <repository&

顶 Storm[技术博客] Storm数据流模型的分析以及讨论

Storm 是一个开源的实时计算系统,它提供了一系列的基本元素用于进行计算: 1 Topology 2 Stream 3 spout 4 bolt 在我们提交我们的topology的时候,一旦你提交了你的topology到你的集群之中后,除非你显示的去停止任务 集群中间的topology会一直的在运行 计算任务Topology是由不同的Spouts 和 bolts,通过数据流 Stream连接起来

Storm -- 实时计算平台

1.1 实时流计算 互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率。正因为大家对信息实时响应、实时交互的需求,软件行业除了个人操作系统之外,数据库(更精确的说是关系型数据库)应该是软件行业发展最快、收益最为丰厚的产品了。记得十年前,很多银行别说实时转账,连实时查询都做不到,但是数据库和高速网络改变了这个情况。 随着互联网的更进一步发展,从Porta

【译】流式计算框架-Storm简介

在当前的数据分析领域,对实时数据的计算需求越来越强烈,在此领域,出现了各类计算框架,如:Storm、S4等。目前本土公司对这些流式计算框架的应用也比较广泛,但苦于相关文档英文居多,缺少成系列且与官方相对应的中文手册。本系列试图从官方文档翻译入手,给大家呈现较为完备的中文资料,同时也是对自身知识的总结沉淀。 在这个系列博客中,我们选择了twitter的Storm框架,原因很简单,因为本人长期使用的就

storm实时流式计算框架集群搭建过程

Storm集群安装配置过程  ——by comaple.zhang  这几天在其他同事的帮助下,调研了twitter的开源流式计算框架storm的使用,下面分享一下storm集群的安装配置过程。以作备忘之用。 我的实验机器为:195和196 如果转载请注明出处:comaple的博客 首先:安装依赖包 1, ZeroMQ 2.1.7 2, JZMQ 3, Java 4, Python 5, Unzi

Storm分布式实时流计算框架相关技术总结

Storm作为一个开源的分布式实时流计算框架,其内部实现使用了一些常用的技术,这里是对这些技术及其在Storm中作用的概括介绍。以此为基础,后续再深入了解Storm的内部实现细节。 1. Zookeeper集群 Zookeeper是一个针对大型分布式系统的可靠协调服务系统,其采用类似Unix文件系统树形层次结构的数据模型(如:/zoo/a,/zoo/b),节点内可存储少量数据(<1M,当节点

最新教程

更多

java线程状态详解(6种)

java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

redis从库只读设置-redis集群管理

默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty基于流的传输处理

​在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-编写服务器端程序

channelRead()处理程序方法实现如下

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

电商平台数据库设计

电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

HttpClient 上传文件

我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

MongoDB常用命令

查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

快速了解MongoDB【基本概念与体系结构】

什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

windows系统安装MongoDB

安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

Spring boot整合MyBatis-Plus 之二:增删改查

基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
import com.baomidou.mybatisplus.annotations.TableName;
import com.baom

分布式ID生成器【snowflake雪花算法】

基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案