storm事件管理器EventManager源码分析-event.clj

storm事件管理器定义在event.clj中,主要功能就是通过独立线程执行"事件处理函数"。我们可以将"事件处理函数"添加到EventManager的阻塞队列中,EventManager的事件处理线程不断地从阻塞队列中获取"事件处理函数"并执行。

EventManager协议

协议就是一组函数定义的集合,协议中函数的第一个参数必须为实现该协议的实例本身,类似于java中实例方法的第一个参数为this;协议类似于java中的接口。

 

( defprotocol EventManager
 ( add [ this event-fn ])
 ( waiting? [ this ])
 ( shutdown [ this ]))
 
event-manager函数

( defn event-manager
  "Creates a thread to respond to events. Any error will cause process to halt"
  ;; daemon?表示是否将事件处理线程设置成守护线程
  [ daemon? ]
  ;; added表示已添加的"事件处理函数"的个数
 ( let [ added ( atom 0)
      ;; processed表示已处理的"事件处理函数"的个数
        processed ( atom 0)
      ;; queue绑定事件管理器的阻塞队列LinkedBlockingQueue
        ^ LinkedBlockingQueue queue ( LinkedBlockingQueue.)
      ;; 设置事件管理器的状态为"running"
        running ( atom true)
      ;; 创建事件处理线程。Clojure函数实现了Runnable和Callable接口,所以可以将Clojure函数作为参数传递给java.lang.Thread类的构造函数
        runner ( Thread.
               ;; 事件处理线程循环检查事件处理器的状态是否是"running",如果是,就从阻塞队列中获取"事件处理函数",并执行;然后将processed加1
                ( fn []
                  ( try-cause
                    ( while @ running
                      ( let [ r ( .take queue )]
                        ( r)
                        ( swap! processed inc)))
                    ( catch InterruptedException t
                      ( log-message "Event manager interrupted"))
                    ( catch Throwable t
                      ( log-error t "Error when processing event")
                      ( exit-process! 20 "Error when processing an event" )))))]
   ( .setDaemon runner daemon?)
    ;; 启动事件处理线程
   ( .start runner)
    ;; 返回一个实现了EventManager协议的实例
   ( reify
      EventManager
      ;; add函数将"事件处理函数"添加到事件处理器的阻塞队列中
     ( add
        [ this event-fn ]
        ;; should keep track of total added and processed to know if this is finished yet
       ( when-not @ running
         ( throw ( RuntimeException. "Cannot add events to a shutdown event manager")))
       ( swap! added inc)
       ( .put queue event-fn))
      ;; waiting?判断事件处理线程是否处于等待状态
     ( waiting?
        [ this ]
       ( or ( Time/isThreadWaiting runner)
           ( = @ processed @ added)))
      ;; 关闭事件管理器
     ( shutdown
        [ this ]
       ( reset! running false)
       ( .interrupt runner)
       ( .join runner)))))

 


转自:http://www.cnblogs.com/ierbar0604/p/3947580
2019-03-02 23:47

知识点

相关教程

更多

storm定时器timer源码分析-timer.clj

storm定时器与java.util.Timer定时器比较相似。java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks;storm定时器也有一个线程负责调度所拥有的"定时任务"。storm定时器的"定时任务"是一个vector类型的数据[time, callback, uuid],内有会有三个值,分别是时间、函数、和uuid

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zo

【Storm-kafka】接口:PartitionManager 分区管理器

阅读背景:对于java内部类有一个粗浅的认识 阅读目的:了解kafka 分区是如何在Storm接口之中进行管理的 最终主题:详尽的梳理PartitionManager的整个过程 packagecom.mixbox.storm.kafka;importbacktype.storm.Config;importbacktype.storm.metric.api.CombinedMetric;import

grab依赖管理器


                            

eclipse SDK更新管理器安装插件

1.在Eclipse的主菜单,点击help,选择Install New Software 2.Work with:Indigo - http://download.eclipse.org/releases/indigo 3.勾选需要的插件点击Next更新即可       大小: 70.3 KB      查看图片附件

Storm-源码分析汇总

Storm Features  Storm 简介 Storm Topology的并发度 Storm - Guaranteeing message processing Storm - Transactional-topologies Twitter Storm – DRPC Storm 多语言支持 Storm Starter  Storm starter - Overview Storm star

Storm-源码分析- Storm中Zookeeper的使用

在backtype.storm.cluster.clj中, 定义了storm对于Zookeeper的使用 ClusterState  首先定义操作Zookeeper集群的interface    (defprotocol ClusterState  (set-ephemeral-node [this path data])  (delete-node [this path])  (create-s

Storm-源码分析- Multimethods使用例子

1. storm通过multimethods来区分local和distributed模式  当调用launch-worker的时候, clojure会自动根据defmulti里面定义的fn来判断是调用哪个版本的launch-worker    (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervis

Storm TimeCacheMap RotatingMap源码分析

TimeCacheMap是Twitter Storm里面一个类, Storm使用它来保存那些最近活跃的对象,并且可以自动删除那些已经过期的对象。 不过在storm0.8之后TimeCacheMap被弃用了,取而代之的是RotatingMap。   RotatingMap与TimeCacheMap的区别如下:   1.前者去掉了自动清理的线程,让用户自己去控制清理过期的数据,控制清理数据用rotat

Storm-源码分析- metric

首先定义一系列metric相关的interface, IMetric, IReducer, ICombiner (backtype.storm.metric.api) 在task中, 创建一系列builtin-metrics, (backtype.storm.daemon.builtin-metrics), 并注册到topology context里面 task会不断的利用如spout-acked

Storm-源码分析- Thrift的使用

1 IDL  首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service  然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码 比如对于nimbus service  在IDL的定义为,     service Nimbus {  void submitTopology(1: string name, 2: str

Storm-源码分析- Disruptor在storm中的使用

Disruptor 2.0, (http://ifeve.com/disruptor-2-change/)  Disruptor为了更便于使用, 在2.0做了比较大的调整, 比较突出的是更换了几乎所有的概念名 老版本,    新版本,   从左到右的变化如下, 1. Producer –> Publisher  2. ProducerBarrier被integrate到RingBuffer里

Storm-源码分析-Topology Submit-Nimbus

Nimbus Server   Nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus”来启动  看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main  nimbus是用clojure实现的, 但

Storm-源码分析-Topology Submit-Worker

1 mk-worker   和其他的daemon一样, 都是通过defserverfn macro来创建worker    (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]  (log-message "Launching worker for " stor

【第九章】 Spring的事务 之 9.2 事务管理器 ——跟我学spring3

Spring框架支持事务管理的核心是事务管理器抽象,对于不同的数据访问框架(如Hibernate)通过实现策略接口PlatformTransactionManager,从而能支持各种数据访问框架的事务管理,PlatformTransactionManager接口定义如下:

最新教程

更多

java线程状态详解(6种)

java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

redis从库只读设置-redis集群管理

默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty基于流的传输处理

​在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-编写服务器端程序

channelRead()处理程序方法实现如下

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

电商平台数据库设计

电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

HttpClient 上传文件

我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

MongoDB常用命令

查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

快速了解MongoDB【基本概念与体系结构】

什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

windows系统安装MongoDB

安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

Spring boot整合MyBatis-Plus 之二:增删改查

基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
import com.baomidou.mybatisplus.annotations.TableName;
import com.baom

分布式ID生成器【snowflake雪花算法】

基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案