概述

SparkStreaming 是用于流式数据的处理。数据输入后可以用高级抽象原语(就是 SparkCore 中的算子,这里只是为了区分),如 map、reduce、window 等进行计算。

SparkStreaming 使用离散化流(discretized stream)作为抽象表示(DStream)。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,是对 RDD 在实时数据处理场景的一种封装。

为了更好的协同数据接收速率和资源处理能力,SparkStreaming 引入了背压机制(Spark Streaming Backpressuere):根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。

WordCount 案例

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))

// 获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordToOne: DStream[(String, Int)] = words.map((_, 1))
val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
wordToCount.print()

// 由于 SparkStreaming 采集器是长期执行的任务,所以不能直接关
// 如果 main 方法执行完毕,应用也会自动结束,所以不能让 main 方法关闭
// ssc.stop()
// 1. 启动采集器
ssc.start()
// 2. 等待采集器关闭
ssc.awaitTermination()

运行程序,然后在命令行中打开 9999 端口 nc -l 9999,并输入数据。

Stream 创建

RDD 队列

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))

// 创建 RDD 队列
val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()

// 创建 QueueInputDStream
val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)

// 处理队列中的 RDD 数据
val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)

reducedStream.print()

ssc.start()

for (i <- 1 to 5) {
  rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  Thread.sleep(2000)
}

ssc.awaitTermination()

自定义数据源

需要继承 Receiver,并实现 onStartonStop 方法来自定义数据源采集。

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(3))

  val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
  messageDS.print()

  ssc.start()
  ssc.awaitTermination()
}


/*
  自定义数据采集器
  1. 继承Receiver,定义泛型, 传递参数
  2. 重写方法
 */
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var flag = true

  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        while (flag) {
          val message: String = "采集的数据为:" + new Random().nextInt(10).toString
          // 存储数据,底层自动封装为指定的StorageLevel.MEMORY_ONLY
          store(message)
          Thread.sleep(500)
        }
      }
    }).start()
  }

  override def onStop(): Unit = {
    flag = false
  }
}

Kafka 数据源

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>
// 定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka1:9001,kafka2:9002,kafka3:9003",
  ConsumerConfig.GROUP_ID_CONFIG -> "groupId",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)

// 读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
    // 由哪个 Executor 负责采集数据,由框架自己选择
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Set("topicName"), kafkaPara))

// 将kafka每条消息的 value 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

// 计算 WordCount
valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

DStream 转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operation(输出)两种,此外转换操作还有一些比较特殊的原语,如 updateStateByKey()、transform() 和各种 window 原语。

与 SparkCore 转化操作不同的是 DStream 转换操作有状态的概念。看是否保存了某个周期的计算结果,如果保存了就是有状态,如果不保存就是无状态。

无状态转换

无状态转换操作就是把简单的 RDD 转换操作应用到每个批次上,也就是转换 DStream 中的每个 RDD。

// 使用有状态操作,需要设定检查点路径
ssc.checkpoint("checkpoint")

val datas = ssc.socketTextStream("localhost", 9999)
val wordToOne = datas.map((_, 1))
// 无状态数据操作,只对当前的采集周期内的数据进行处理
// 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
val wordToCount = wordToOne.reduceByKey(_ + _)

// updateStateByKey:根据 Key 对数据的状态进行更新
// 传递的参数中含有两个值
// 第一个值:相同 Key 的 Value 数据
// 第二个值:缓冲区相同 Key 的 Value 数据
val state = wordToOne.updateStateByKey(
  (seq: Seq[Int], buff: Option[Int]) => {
    val newCount = buff.getOrElse(0) + seq.sum
    Option(newCount)
  }
)

state.print()
// wordToCount.print()

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过这个函数可以扩展 Spark API,这个函数每一批次调度一次。

// transform方法可以将底层RDD获取到后进行操作
// transform使用场景
// 1. DStream功能不完善
// 2. 需要代码周期性的执行

// Code : Driver端
val newDS: DStream[String] = lines.transform(
  rdd => {
    // Code : Driver端,(周期性执行)
    rdd.map(
      str => {
        // Code : Executor端
        str
      }
    )
  }
)
// Code : Driver端
val newDS1: DStream[String] = lines.map(
  data => {
    // Code : Executor端
    data
  }
)

Join

两个流之间的 Join 需要两个流的批次大小一致,这样才能做到同时触发。计算过程就是对当前批次的两个流中各自的 RDD 进行 Join,与两个 RDD 的 Join 效果相同。

val data9999 = ssc.socketTextStream("localhost", 9999)
val data8888 = ssc.socketTextStream("localhost", 8888)

val map9999: DStream[(String, Int)] = data9999.map((_,9))
val map8888: DStream[(String, Int)] = data8888.map((_,8))

// 所谓的DStream的Join操作,其实就是两个RDD的join
val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
joinDS.print()

有状态转换

Window Operations

可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Streaming 的允许状态。

  • 窗口时长:计算内容的时间范围
  • 滑动步长:隔多久触发一次

这两个参数都必须是采集周期的整数倍。

val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_, 1))

// 窗口的范围应该是采集周期的整数倍
// 窗口是可以滑动的,但是默认情况下,以一个采集周期进行滑动
// 为了避免重复数据的计算,可以改变滑动的幅度(第二个参数)
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))

val wordToCount = windowDS.reduceByKey(_ + _)
wordToCount.print()
// 设置检查点
ssc.checkpoint("checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))

// reduceByKeyAndWindow : 当窗口范围比较大,但是滑动幅度比较小,那么可以采用增加数据和删除数据的方式
// 无需重复计算,提升性能。
val windowDS: DStream[(String, Int)] =
wordToOne.reduceByKeyAndWindow(
  (x:Int, y:Int) => { x + y}, // 窗口中增加的数据
  (x:Int, y:Int) => {x - y}, // 窗口中减少的数据
  Seconds(9), Seconds(3))

windowDS.print()

原文地址:http://www.cnblogs.com/fireonfire/p/16819874.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性