必威体育Betway必威体育官网
当前位置:首页 > IT技术

3333

时间:2019-06-07 20:44:09来源:IT技术作者:seo实验室小编阅读:62次「手机版」
 

3333

package bigdata.spark.SparkStreaming.kafka010

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
1, 复制配置文件到 resource目录下
2,配置环境变量
3,spark-defaults.conf
 */
object kafka010QuickStart {

   def main(args: Array[String]) {
      //    创建一个批处理时间是2s的context 要增加环境变量
      val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
        .set("yarn.resourcemanager.hostname", "mt-mdh.local")
        .set("spark.executor.instances","2")
        .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
          ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
          ,"/opt/jars/kafka-clients-0.10.2.2.jar"
          ,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
      val ssc = new StreamingContext(sparkConf, Seconds(5))


      //    使用broker和topic创建DirectStream
      val topicsSet = "test".split(",").toSet
      val kafkaParams = Map[String, Object]("bootstrap.servers" -> "mt-mdh.local:9093",
        "key.deserializer"->classOf[StringDeserializer],
        "value.deserializer"-> classOf[StringDeserializer],
        "group.id"->"test",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit"->(false: java.lang.Boolean))

      val messages = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

      // Get the lines, split them into words, count the words and print
      val lines = messages.map(_.value)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
      wordCounts.print()

      //    启动流
      ssc.start()
      ssc.awaitTermination()
    }
}

相关阅读

分享到:

栏目导航

推荐阅读

热门阅读