storm事件管理器EventManager源码分析-event.clj
storm事件管理器定义在event.clj中,主要功能就是通过独立线程执行"事件处理函数"。我们可以将"事件处理函数"添加到EventManager的阻塞队列中,EventManager的事件处理线程不断地从阻塞队列中获取"事件处理函数"并执行。
EventManager协议
协议就是一组函数定义的集合,协议中函数的第一个参数必须为实现该协议的实例本身,类似于java中实例方法的第一个参数为this;协议类似于java中的接口。
(
defprotocol
EventManager
( add [ this event-fn ])
( waiting? [ this ])
( shutdown [ this ]))
( add [ this event-fn ])
( waiting? [ this ])
( shutdown [ this ]))
event-manager函数
(
defn
event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
;; daemon?表示是否将事件处理线程设置成守护线程
[ daemon? ]
;; added表示已添加的"事件处理函数"的个数
( let [ added ( atom 0)
;; processed表示已处理的"事件处理函数"的个数
processed ( atom 0)
;; queue绑定事件管理器的阻塞队列LinkedBlockingQueue
^ LinkedBlockingQueue queue ( LinkedBlockingQueue.)
;; 设置事件管理器的状态为"running"
running ( atom true)
;; 创建事件处理线程。Clojure函数实现了Runnable和Callable接口,所以可以将Clojure函数作为参数传递给java.lang.Thread类的构造函数
runner ( Thread.
;; 事件处理线程循环检查事件处理器的状态是否是"running",如果是,就从阻塞队列中获取"事件处理函数",并执行;然后将processed加1
( fn []
( try-cause
( while @ running
( let [ r ( .take queue )]
( r)
( swap! processed inc)))
( catch InterruptedException t
( log-message "Event manager interrupted"))
( catch Throwable t
( log-error t "Error when processing event")
( exit-process! 20 "Error when processing an event" )))))]
( .setDaemon runner daemon?)
;; 启动事件处理线程
( .start runner)
;; 返回一个实现了EventManager协议的实例
( reify
EventManager
;; add函数将"事件处理函数"添加到事件处理器的阻塞队列中
( add
[ this event-fn ]
;; should keep track of total added and processed to know if this is finished yet
( when-not @ running
( throw ( RuntimeException. "Cannot add events to a shutdown event manager")))
( swap! added inc)
( .put queue event-fn))
;; waiting?判断事件处理线程是否处于等待状态
( waiting?
[ this ]
( or ( Time/isThreadWaiting runner)
( = @ processed @ added)))
;; 关闭事件管理器
( shutdown
[ this ]
( reset! running false)
( .interrupt runner)
( .join runner)))))
"Creates a thread to respond to events. Any error will cause process to halt"
;; daemon?表示是否将事件处理线程设置成守护线程
[ daemon? ]
;; added表示已添加的"事件处理函数"的个数
( let [ added ( atom 0)
;; processed表示已处理的"事件处理函数"的个数
processed ( atom 0)
;; queue绑定事件管理器的阻塞队列LinkedBlockingQueue
^ LinkedBlockingQueue queue ( LinkedBlockingQueue.)
;; 设置事件管理器的状态为"running"
running ( atom true)
;; 创建事件处理线程。Clojure函数实现了Runnable和Callable接口,所以可以将Clojure函数作为参数传递给java.lang.Thread类的构造函数
runner ( Thread.
;; 事件处理线程循环检查事件处理器的状态是否是"running",如果是,就从阻塞队列中获取"事件处理函数",并执行;然后将processed加1
( fn []
( try-cause
( while @ running
( let [ r ( .take queue )]
( r)
( swap! processed inc)))
( catch InterruptedException t
( log-message "Event manager interrupted"))
( catch Throwable t
( log-error t "Error when processing event")
( exit-process! 20 "Error when processing an event" )))))]
( .setDaemon runner daemon?)
;; 启动事件处理线程
( .start runner)
;; 返回一个实现了EventManager协议的实例
( reify
EventManager
;; add函数将"事件处理函数"添加到事件处理器的阻塞队列中
( add
[ this event-fn ]
;; should keep track of total added and processed to know if this is finished yet
( when-not @ running
( throw ( RuntimeException. "Cannot add events to a shutdown event manager")))
( swap! added inc)
( .put queue event-fn))
;; waiting?判断事件处理线程是否处于等待状态
( waiting?
[ this ]
( or ( Time/isThreadWaiting runner)
( = @ processed @ added)))
;; 关闭事件管理器
( shutdown
[ this ]
( reset! running false)
( .interrupt runner)
( .join runner)))))
转自:http://www.cnblogs.com/ierbar0604/p/3947580
2019-03-02 23:47
知识点
相关教程
更多storm定时器timer源码分析-timer.clj
storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid
storm操作zookeeper源码分析-cluster.clj
storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zo
【Storm-kafka】接口:PartitionManager 分区管理器
阅读背景:对于java内部类有一个粗浅的认识 阅读目的:了解kafka 分区是如何在Storm接口之中进行管理的 最终主题:详尽的梳理PartitionManager的整个过程 packagecom.mixbox.storm.kafka;importbacktype.storm.Config;importbacktype.storm.metric.api.CombinedMetric;import
grab依赖管理器
eclipse SDK更新管理器安装插件
1.在Eclipse的主菜单,点击help,选择Install New Software 2.Work with:Indigo - http://download.eclipse.org/releases/indigo 3.勾选需要的插件点击Next更新即可 大小: 70.3 KB 查看图片附件
Storm-源码分析汇总
Storm Features Storm 简介 Storm Topology的并发度 Storm - Guaranteeing message processing Storm - Transactional-topologies Twitter Storm – DRPC Storm 多语言支持 Storm Starter Storm starter - Overview Storm star
Storm-源码分析- Storm中Zookeeper的使用
在backtype.storm.cluster.clj中, 定义了storm对于Zookeeper的使用 ClusterState 首先定义操作Zookeeper集群的interface (defprotocol ClusterState (set-ephemeral-node [this path data]) (delete-node [this path]) (create-s
Storm-源码分析- Multimethods使用例子
1. storm通过multimethods来区分local和distributed模式 当调用launch-worker的时候, clojure会自动根据defmulti里面定义的fn来判断是调用哪个版本的launch-worker (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervis
Storm TimeCacheMap RotatingMap源码分析
TimeCacheMap是Twitter Storm里面一个类, Storm使用它来保存那些最近活跃的对象,并且可以自动删除那些已经过期的对象。 不过在storm0.8之后TimeCacheMap被弃用了,取而代之的是RotatingMap。 RotatingMap与TimeCacheMap的区别如下: 1.前者去掉了自动清理的线程,让用户自己去控制清理过期的数据,控制清理数据用rotat
Storm-源码分析- metric
首先定义一系列metric相关的interface, IMetric, IReducer, ICombiner (backtype.storm.metric.api) 在task中, 创建一系列builtin-metrics, (backtype.storm.daemon.builtin-metrics), 并注册到topology context里面 task会不断的利用如spout-acked
Storm-源码分析- Thrift的使用
1 IDL 首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service 然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码 比如对于nimbus service 在IDL的定义为, service Nimbus { void submitTopology(1: string name, 2: str
Storm-源码分析- Disruptor在storm中的使用
Disruptor 2.0, (http://ifeve.com/disruptor-2-change/) Disruptor为了更便于使用, 在2.0做了比较大的调整, 比较突出的是更换了几乎所有的概念名 老版本, 新版本, 从左到右的变化如下, 1. Producer –> Publisher 2. ProducerBarrier被integrate到RingBuffer里
Storm-源码分析-Topology Submit-Nimbus
Nimbus Server Nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus”来启动 看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main nimbus是用clojure实现的, 但
Storm-源码分析-Topology Submit-Worker
1 mk-worker 和其他的daemon一样, 都是通过defserverfn macro来创建worker (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id] (log-message "Launching worker for " stor
【第九章】 Spring的事务 之 9.2 事务管理器 ——跟我学spring3
Spring框架支持事务管理的核心是事务管理器抽象,对于不同的数据访问框架(如Hibernate)通过实现策略接口PlatformTransactionManager,从而能支持各种数据访问框架的事务管理,PlatformTransactionManager接口定义如下:
最新教程
更多java线程状态详解(6种)
java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞
redis从库只读设置-redis集群管理
默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave. 127.0.0.1:6382> set k3 111 (error) READONLY You can't write against a read only slave. 如果你要开启从库
Netty环境配置
netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。
Netty基于流的传输处理
在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据
Netty入门实例-使用POJO代替ByteBuf
使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。
Netty入门实例-时间服务器
Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现
Netty入门实例-编写服务器端程序
channelRead()处理程序方法实现如下
Netty开发环境配置
最新版本的Netty 4.x和JDK 1.6及更高版本
电商平台数据库设计
电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表
HttpClient 上传文件
我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。
MongoDB常用命令
查看当前使用的数据库 > db test 切换数据库 > use foobar switched to db foobar 插入文档 > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new
快速了解MongoDB【基本概念与体系结构】
什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
windows系统安装MongoDB
安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我
Spring boot整合MyBatis-Plus 之二:增删改查
基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId; import com.baomidou.mybatisplus.annotations.TableName; import com.baom
分布式ID生成器【snowflake雪花算法】
基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案