  • Send test messages to the Kafka topics top1 and top2 with a simple producer:
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    public class MyProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("metadata.broker.list", "localhost:9092");
            props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);
            // Create the producer
            Producer<String, String> producer = new Producer<String, String>(config);
            // Build one message for each topic
            KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1", "test kafka");
            KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2", "hello world");
            try {
                int i = 1;
                while (i < 100) {
                    // Send the messages, one pair per second
                    producer.send(data1);
                    producer.send(data2);
                    i++;
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
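  • Note: this producer uses the old kafka.javaapi.producer API that shipped with Kafka 0.8; later Kafka releases deprecated and eventually removed it in favor of org.apache.kafka.clients.producer.KafkaProducer. Before running it, ZooKeeper and a Kafka broker must be up, and the topics top1 and top2 must already exist unless the broker allows automatic topic creation (auto.create.topics.enable, true by default).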
  • Receive data from the specified topics in Spark Streaming and count the words:
  • package streaming;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    import com.google.common.collect.Lists;
    public class KafkaStreamingWordCount {
        public static void main(String[] args) {
            // Words are separated by single spaces
            final Pattern SPACE = Pattern.compile(" ");
            // ZooKeeper quorum the Kafka receiver connects to
            String zkQuorum = "localhost:2181";
            // Consumer group the topics belong to
            String group = "1";
            // Topic names, separated by ","
            String topics = "top1,top2";
            // Number of receiver threads per topic
            int numThreads = 2;
            SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
            // jssc.checkpoint("checkpoint"); // enable checkpointing, needed for stateful operations (see the note after this listing)
            // Map each topic to its number of receiver threads
            Map<String, Integer> topicmap = new HashMap<>();
            String[] topicsArr = topics.split(",");
            for (int i = 0; i < topicsArr.length; i++) {
                topicmap.put(topicsArr[i], numThreads);
            }
            // Create an input DStream of (key, message) pairs pulled from Kafka
            JavaPairReceiverInputDStream<String, String> lines =
                    KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
            // Split each message into words
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
                    return Lists.newArrayList(SPACE.split(tuple._2()));
                }
            });
            // Count each word within the batch
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String s) {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });
            // Print the results
            wordCounts.print();
            jssc.start();
            jssc.awaitTermination();
        }
    }
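  • The jssc.checkpoint(...) line is commented out above because a per-batch count needs only stateless transformations. To keep a running total across batches instead, Spark Streaming offers updateStateByKey, which requires a checkpoint directory. A minimal sketch, assuming Spark 1.x (whose Java streaming API uses Guava's Optional), that would go in place of the wordCounts.print() call:
    // Additional imports needed for this sketch:
    // import java.util.List;
    // import com.google.common.base.Optional;
    jssc.checkpoint("checkpoint"); // updateStateByKey requires checkpointing to be enabled
    JavaPairDStream<String, Integer> totals = wordCounts.updateStateByKey(
            new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                @Override
                public Optional<Integer> call(List<Integer> newCounts, Optional<Integer> state) {
                    int sum = state.or(0); // previous total, or 0 the first time the key is seen
                    for (Integer c : newCounts) {
                        sum += c;
                    }
                    return Optional.of(sum);
                }
            });
    totals.print();
  • To compile and run both programs, the Kafka client library and the Spark Streaming Kafka connector matching your Spark and Scala versions (for Spark 1.x, for example, org.apache.spark:spark-streaming-kafka_2.10) need to be on the classpath, alongside spark-core and spark-streaming.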