directDstream是什么-古蔺大橙子建站
RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
directDstream是什么

这篇文章主要为大家展示了“direct Dstream是什么”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“direct Dstream是什么”这篇文章吧。

创新互联建站专注于企业营销型网站建设、网站重做改版、招远网站定制设计、自适应品牌网站建设、H5场景定制成都商城网站开发、集团公司官网建设、成都外贸网站建设公司、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为招远等各大城市提供网站开发制作服务。

前言

前面,有分享过基于receiver的,实际上,看到receiver based Dstream大家就对阅读提不起兴趣了,实际上这是错误的,基于receiver的才是spark streaming根本,虽然direct stream才更合适。但是,我们从基于receiver可以学到很多内容,最重要的spark streaming实现原理,数据本地性等。

direct dstream运行架构图

direct Dstream是什么

对比

对比receiver based的Dstream和direct Dstream

   a无需启动receiver,减少不必要的cpu占用

   b减少了receiver接收数据,写入blockmanager,然后运行时再通过blockid,网络传输,磁盘读区,来获取数据这个过程。提升了效率。

   c无需wal,进一步减少磁盘读写。

   d可以通过手动维护offset来实现精确的一次消费。

   eDstream中生成的RDD,并不是blockrdd,而是kafkardd,kafkardd是和kafka分区一一对应的,更便于我们把控并行度。

   f数据本地性的问题,导致receiver存在的机器会运行过多的任务,会导致有些executor空闲。

而kafkardd,在compute函数里,会使用simpleconsumer,根据指定的topic,分区,offset范围,去kafka读取数据。010版本以后,又存在假如kafka和spark运行于同一集群,会有数据本性的概念。

数据本地性

spark streaming与kafka 082结合生成的rdd,数据本地性计算方式如下:

override def getPreferredLocations(thePart: Partition): Seq[String] = {
 val part = thePart.asInstanceOf[KafkaRDDPartition]
 // TODO is additional hostname resolution necessary here
 Seq(part.host)
}

spark streaming 与kafka 010结合生成的rdd,数据本地性计算方式如下:

override def getPreferredLocations(thePart: Partition): Seq[String] = {
 // The intention is best-effort consistent executor for a given topicpartition,
 // so that caching consumers can be effective.
 // TODO what about hosts specified by ip vs name
 val part = thePart.asInstanceOf[KafkaRDDPartition]
 val allExecs = executors()
 val tp = part.topicPartition
 val prefHost = preferredHosts.get(tp)
 val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
 val execs = if (prefExecs.isEmpty) allExecs else prefExecs
 if (execs.isEmpty) {
   Seq.empty
 } else {
   // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
   val index = Math.floorMod(tp.hashCode, execs.length)
   val chosen = execs(index)
   Seq(chosen.toString)
 }
}

对于 与kafka010结合的注意事项,实际上以前浪尖也翻译过一篇文章。

必读:Spark与kafka010整合

限速

限速,很多人使用姿势不对,详细的原理可以参看

Spark的PIDController源码赏析及backpressure详解

具体配置参数详解,可以参考:

  1. spark.streaming.backpressure.enabled 默认是false,设置为true,就开启了背压机制。

  2. spark.streaming.backpressure.initialRate 默认没设置,初始速率。第一次启动的时候每个receiver接受数据的最大值。

  3. spark.streaming.receiver.maxRate 默认值没设置。每个接收器将接收数据的最大速率(每秒记录数)。 实际上,每个流每秒最多将消费此数量的记录。 将此配置设置为0或负数将不会对速率进行限制。

  4. spark.streaming.kafka.maxRatePerPartition 使用新Kafka direct API时从每个Kafka分区读取数据的最大速率(每秒记录数)。

以上是“direct Dstream是什么”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!


分享名称:directDstream是什么
文章地址:http://scgulin.cn/article/igjhjd.html