HQueue:基于HBase的消息队列

机器学习 58 Views

数盟倡导“数据创造价值”,致力于打造最卓越的数据科学交流平台,为企业、个人提供最卓越的服务】

(部分文章来自于第三方平台,未能找到原作者,敬请谅解~sorry~)

1. HQueue简介

2. HQueue特性

3. HQueue系统设计及处理流程

图(1):HQueue系统结构

  • PartitionID:一个Queue可以有多个Partitions,目前最多支持Short.MAX_VALUE个Partitions。Partition ID可以不在创建Message对象时指定,而是在发送消息时设定,或者不指定而使用一个随机Partition ID。

  • MessageID:即消息ID,它由Timestamp和SequenceID两部分组成。Timestamp是消息写入HQueue时的时间戳,单位为毫秒。SequenceID是同一Timestamp下消息的顺序编号,目前最多支持同一Timestamp下Short.MAX_VALUE个Messages。

  • Column Family:HBase Column Family,此处为固定值“message”。

  • Message Topic :HBase Column Qualifier,消息Topic名称。用户可以根据需要将Message存储在不同的Topics之下,也可以从Queue中获取感兴趣的Topics消息数据。

  • QueueScanner:用于Scan Queue中全部Partitions的数据;

  • PartitionScanner:用于Scan Queue中指定Partition的数据;

  • CombinedPartitionScanner:用于Scan Queue中若干指定Partitions的数据。

  • Subscrier:即订阅者。主要完成向ZoeoKeeper写入订阅信息、启动监听、接收新消息并回调注册在其上的消息处理函数(MessageListener)等功能。

  • ZooKeeper:用于保存订阅者提交的订阅信息,主要包括订阅者订阅的Queue、Partitions和Topics;订阅者的地址和Checkpoint等信息,更为详细信息参见后续描述。

  • Coprocessor:主要完成从ZooKeeper获取订阅信息、使用InternalScanner从Queue中Scan最新的消息、将新消息发送至订阅者并将当前Checkpoint更新至ZooKeeper等功能。

  • 订阅者订阅的Queue名称;

  • 订阅者订阅的Queuee Partitions以及各Partition上消息的起始ID。一个订阅者可以订阅多个Partitions,如果没有指定,那么认为订阅该Queue的所有Partitions。

  • 订阅者订阅的消息Topics。一个订阅者可以订阅多个主题,如果没有指定,那么认为订阅该Queue上的所有Topics。

  • 订阅者的Addresss/Hostname和监听端口。用户创建订阅者时可以指定监听端口,如果没有指定,那么会随机选择一个当前可用端口作为监听端口。

  • SubscriptionZooKeeper:主要完成与ZooKeeper相关的工作,包括从ZooKeeper获取订阅信息并注册相关Watcher、SubscriptionWorker将Checkpoint更新至ZooKeeper等操作。

  • SubscriptionWorker又主要包括MessageScanner和MessageSender两部分,主要完成Scan新消息、发送消息至Subscriber和更新Checkpoint等操作。

  • 当缓冲队列中没有空闲空间时,MessageScanner会等待直至缓冲队列中的消息被MessageSender消费掉,腾出剩余空间。

  • 当Queue Partition中没有新消息时,MessageScanner会主动Sleep,当有新消息写入时,Coprocessor会通过SubscriptionWorker唤醒MessageScanner,开始新一轮Scan。

4. HQueue使用

  • queue_name:待创建的HQueue的名称,必选参数。

  • partition_count:待创建的HQueue的Partition个数,必选参数。

  • ttl:失效时间,必选参数。

  • Configuration Dictonary:可选配置参数。目前支持的配置参数为:(1)hbase.hqueue.partitionsPerRegion;(2)hbase.hregion.memstore.flush.size;(3)hbase.hregion.majorcompaction;(4)hbase.hstore.compaction.min;(5)hbase.hstore.compaction.max;(6)hbase.hqueue.compression;(7)hbase.hstore.blockingStoreFiles等。

EXAMPLES:

    • hqueue> create ‘q1′, 32, 86400

    • hqueue> create ‘q1′, 32, 86400, {‘hbase.hqueue.partitionsPerRegion’ => ’4′, ‘hbase.hstore.compaction.min’ => ’16′, ‘hbase.hstore.compaction.max’ => ’32′}

            • 1
              ScannerID messageScannerOpen(1:Text queueName,2:i16 partitionID,3:TMessageScan messageScan)
              根据Scan,打开Queue中某个Partition上的Scanner
              2
              TMessage messageScannerGet(1:ScannerID id)
              逐条获取Message
              3
              list<TMessage> messageScannerGetList(1:ScannerID id,2:i32 nbMessages)
              批量获取Messages
              4
              void messageScannerClose(1:ScannerID id)
              关闭ScannerID
              5
              void putMessage(1:Text queueName,2:TMessage tMessage)
              向Queue中写入Message,使用随机的Partition ID
              6
              void putMessages(1:Text queueName,2:list<TMessage> tMessages)
              向Queue中批量写入Messages,使用随机的Partition ID
              7
              void putMessageWithPid(1:Text queueName,2:i16 partitionID,3:TMessage tMessage)
              向Queue中写入Message,使用指定的Partition ID
              8
              void putMessagesWithPid(1:Text queueName,2:i16 partitionID,3:list<TMessage> tMessages)
              向Queue中批量写入Messages,使用指定的Partition ID
              9
              list<Text> getQueueLocations(1:Text queueName)
              获取Queue中所有Partition所在主机的地址
              

              5. 总结

              —————————————————

              数盟网站:www.dataunion.org

              数盟微博:@数盟社区

              数盟微信:DataScientistUnion

              数盟【大数据群】272089418

              数盟【数据可视化群】 179287077

              数盟【数据分析群】 174306879 ,110875722

              —————————————————

              点击阅读原文,更多技术、资讯~

如未说明则本站原创,转载请注明出处:NULL » HQueue:基于HBase的消息队列