
News Website Project Case Analysis Based on Spark Streaming

Contents: 1. Requirements Analysis · 2. Data Preparation: (1) Data Format, (2) A Real-Time Data Generator in Java · 3. Implementation

1. Requirements Analysis

Requirements for the news website:

PV (page views), UV (unique visitors), the number of registered users, and the popular sections.

Data processing flow:

data source -> Kafka -> Spark Streaming

2. Data Preparation

(1) Data Format

Website log format (fields are space-separated, one record per line):

date,timestamp,userid,pageid,section,action

Log field descriptions:

date: the date, in yyyy-MM-dd format
timestamp: millisecond timestamp
userid: user ID
pageid: page ID
section: the news section
action: the user action, one of two kinds: viewing a page (view) or registering (register)

Sample data:

2020-12-20 1608451521565 364 422 fashion view
2020-12-20 1608451521565 38682 708 aviation view
2020-12-20 1608451521565 65444 270 internet view
2020-12-20 1608451521565 4805 250 tv-show view
2020-12-20 1608451521565 1130 743 movie view
2020-12-20 1608451521565 85320 605 carton view
2020-12-20 1608451521565 null 581 movie view
2020-12-20 1608451521565 null null null register
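Since each record is a single space-separated line, the six fields can be recovered with a plain split. A quick stand-alone check (not part of the original project code) of how one of the sample records maps to the fields:

public class LogLineCheck {
    public static void main(String[] args) {
        // one of the sample records shown above
        String record = "2020-12-20 1608451521565 364 422 fashion view";
        String[] fields = record.split(" ");
        System.out.println("date      = " + fields[0]);   // 2020-12-20
        System.out.println("timestamp = " + fields[1]);   // 1608451521565
        System.out.println("userid    = " + fields[2]);   // 364
        System.out.println("pageid    = " + fields[3]);   // 422
        System.out.println("section   = " + fields[4]);   // fashion
        System.out.println("action    = " + fields[5]);   // view
    }
}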

Start a Kafka console consumer to watch the incoming records:

bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m04:9092 --topic spark
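The consumer assumes the spark topic already exists. If it does not, it can be created first; the partition and replication settings below are only an example, and older Kafka releases use --zookeeper instead of --bootstrap-server for this tool:

bin/kafka-topics.sh --create --bootstrap-server bigdata-pro-m04:9092 --topic spark --partitions 1 --replication-factor 1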

(2) A Real-Time Data Generator in Java

The generator produces the real-time data stream and feeds it to Kafka: every second it randomly generates 1,000 log records.

package com.kfk.spark.news_analysis_project;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka producer for access logs
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/12
 * @time : 7:51 PM
 */
public class AccessProducer extends Thread {

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private static String date;

    // news sections
    private static String[] sections = new String[] {
            "country", "international", "sport", "entertainment", "movie",
            "carton", "tv-show", "technology", "internet", "car",
            "military", "funny", "fashion", "aviation", "government"};

    private static Random random = new Random();

    // used to decide whether a generated record belongs to a new user (roughly 1 in 10)
    private static int[] newOldUserArr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    private Producer<String, String> producer;
    private String topic;

    /**
     * Constructor
     * @param topic target Kafka topic
     */
    public AccessProducer(String topic) {
        this.topic = topic;
        producer = new KafkaProducer<String, String>(createProducerConfig());
        date = simpleDateFormat.format(new Date());
    }

    /**
     * createProducerConfig
     * @return producer properties
     */
    public Properties createProducerConfig() {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", "bigdata-pro-m04:9092");
        return properties;
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            // generate 1000 access records per second
            // NOTE: the exact record-building logic is an assumption that matches the
            // sample data above: about one record in ten is a "register" event with
            // null fields, the rest are "view" events with random user/page/section values
            for (int i = 0; i < 1000; i++) {
                String log;
                if (newOldUserArr[random.nextInt(newOldUserArr.length)] == 1) {
                    // a new user registers: userid, pageid and section are null
                    log = date + " " + System.currentTimeMillis() + " null null null register";
                } else {
                    // an existing user views a page
                    log = date + " " + System.currentTimeMillis() + " "
                            + random.nextInt(100000) + " "
                            + random.nextInt(1000) + " "
                            + sections[random.nextInt(sections.length)] + " view";
                }
                producer.send(new ProducerRecord<String, String>(topic, log));
                counter++;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
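
To start feeding Kafka, the generator thread has to be launched from an entry point. A minimal launcher might look like the following; the class name is only a placeholder, and the topic name spark matches the consumer command above:

package com.kfk.spark.news_analysis_project;

public class AccessProducerApp {
    public static void main(String[] args) {
        // start the generator thread; it sends 1000 random log lines per second to the "spark" topic
        new AccessProducer("spark").start();
    }
}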

3. Implementation

The Spark Streaming application below consumes the access log from the Kafka topic spark and computes the page PV, the page UV, the number of registered users, and the popular sections.

package com.kfk.spark.news_analysis_project;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Real-time news analysis
 * NOTE: the class name, the batch interval and the Kafka consumer group used below
 * are assumptions; adjust them to your own environment.
 */
public class NewsRealTime {

    public static void main(String[] args) throws Exception {

        // local mode for testing; drop setMaster() when submitting to a cluster
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NewsRealTime");

        // batch interval: 5 seconds (assumption)
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "news_group");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // direct stream over the "spark" topic
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList("spark"), kafkaParams));

        /**
         * stream -> map -> JavaDStream<String>
         */
        JavaDStream<String> accessDstream = stream.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> v1) throws Exception {
                return v1.value();
            }
        });

        /**
         * accessDstream -> filter -> action(view)
         */
        JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                String[] lines = v1.split(" ");
                String action = lines[5];
                String actionValue = "view";
                if (actionValue.equals(action)) {
                    return true;
                } else {
                    return false;
                }
            }
        });

        // page PV
        calculatePagePV(filterDstream);

        // page UV
        calculatePageUV(filterDstream);

        // number of registered users
        calculateRegistercount(accessDstream);

        // popular sections
        calculateUserSectionPV(accessDstream);

        jssc.start();
        jssc.awaitTermination();
    }

    /**
     * Compute the page PV
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data transformation:
     * filterDstream -> mapToPair -> (date_pageid, 1) -> reduceByKey -> (date_pageid, pv)
     *
     * @param filterDstream
     */
    public static void calculatePagePV(JavaDStream<String> filterDstream) {

        // filterDstream -> mapToPair -> (date_pageid, 1)
        JavaPairDStream<String, Integer> pairDstream = filterDstream.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String, Integer>(line[0] + "_" + line[3], 1);
            }
        });

        // pairDstream -> reduceByKey -> (date_pageid, pv)
        JavaPairDStream<String, Integer> pvStream = pairDstream.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        pvStream.print();
        /**
         * Sample output:
         * (2020-12-21_16,1)
         * (2020-12-21_548,1)
         * (2020-12-21_881,1)
         * (2020-12-21_27,1)
         * (2020-12-21_771,1)
         * (2020-12-21_344,2)
         * (2020-12-21_313,1)
         * (2020-12-21_89,1)
         * (2020-12-21_14,1)
         * (2020-12-21_366,1)
         * ...
         */
    }

    /**
     * Compute the page UV
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 365 422 fashion view
     * 2020-12-20 1608451521565 366 422 fashion view
     * 2020-12-20 1608451521565 367 422 fashion view
     * 2020-12-20 1608451521565 367 453 fashion view
     *
     * Data transformation:
     * step 1: map
     * (2020-12-20,364,422)
     * (2020-12-20,364,422)
     * (2020-12-20,365,422)
     * (2020-12-20,366,422)
     * (2020-12-20,367,422)
     * (2020-12-20,367,453)
     *
     * step 2: rdd -> distinct
     * (2020-12-20,364,422)
     * (2020-12-20,365,422)
     * (2020-12-20,366,422)
     * (2020-12-20,367,422)
     * (2020-12-20,367,453)
     *
     * step 3: mapToPair -> (date_pageid, 1)
     * step 4: reduceByKey -> (date_pageid, uv)
     *
     * @param filterDstream
     */
    public static void calculatePageUV(JavaDStream<String> filterDstream) {

        // filterDstream -> map -> "2020-12-20,364,422"
        JavaDStream<String> mapDstream = filterDstream.map(new Function<String, String>() {
            @Override
            public String call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return line[0] + "," + line[2] + "," + line[3];
            }
        });

        // mapDstream -> distinct (per batch, via transform on the underlying RDD)
        JavaDStream<String> distinctDstream = mapDstream.transform(
                new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> lines) throws Exception {
                return lines.distinct();
            }
        });

        // distinctDstream -> mapToPair -> (date_pageid, 1)
        JavaPairDStream<String, Integer> pairDstream = distinctDstream.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(",");
                return new Tuple2<String, Integer>(line[0] + "_" + line[2], 1);
            }
        });

        // pairDstream -> reduceByKey -> (date_pageid, uv)
        JavaPairDStream<String, Integer> uvStream = pairDstream.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        uvStream.print();
        /**
         * Sample output:
         * (2020-12-21_492,1)
         * (2020-12-21_85,2)
         * (2020-12-21_18,1)
         * (2020-12-21_27,2)
         * (2020-12-21_825,1)
         * (2020-12-21_366,1)
         * (2020-12-21_89,1)
         * (2020-12-21_14,2)
         * (2020-12-21_69,1)
         * (2020-12-21_188,1)
         * ...
         */
    }

    /**
     * Compute the number of registered users: filter for action = register
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data transformation:
     * accessDstream -> filter -> action(register) -> mapToPair -> reduceByKey
     *
     * @param accessDstream
     */
    public static void calculateRegistercount(JavaDStream<String> accessDstream) {

        // accessDstream -> filter -> action(register)
        JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                String[] lines = v1.split(" ");
                String action = lines[5];
                String actionValue = "register";
                if (actionValue.equals(action)) {
                    return true;
                } else {
                    return false;
                }
            }
        });

        // filterDstream -> mapToPair -> (date_register, 1)
        JavaPairDStream<String, Integer> pairDstream = filterDstream.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String, Integer>(line[0] + "_" + line[5], 1);
            }
        });

        // pairDstream -> reduceByKey -> (date_register, count)
        JavaPairDStream<String, Integer> registerCountStream = pairDstream.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        registerCountStream.print();
        /**
         * Sample output:
         * (2020-12-21_register,11)
         */
    }

    /**
     * Compute the popular sections
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data transformation:
     * filterDstream -> mapToPair -> (date_section, 1) -> reduceByKey -> (date_section, pv)
     *
     * @param filterDstream
     */
    public static void calculateUserSectionPV(JavaDStream<String> filterDstream) {

        // filterDstream -> mapToPair -> (date_section, 1)
        JavaPairDStream<String, Integer> pairDstream = filterDstream.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String, Integer>(line[0] + "_" + line[4], 1);
            }
        });

        // pairDstream -> reduceByKey -> (date_section, pv)
        JavaPairDStream<String, Integer> pvStream = pairDstream.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        pvStream.print();
        /**
         * Sample output:
         * (2020-12-21_internet,16)
         * (2020-12-21_military,24)
         * (2020-12-21_aviation,21)
         * (2020-12-21_carton,19)
         * (2020-12-21_government,25)
         * (2020-12-21_tv-show,19)
         * (2020-12-21_country,14)
         * (2020-12-21_movie,13)
         * (2020-12-21_international,16)
         * ...
         */
    }
}
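
To run the job, the spark-streaming-kafka-0-10 integration must be on the classpath in addition to spark-streaming and kafka-clients. Assuming the application is packaged into a jar with its dependencies (the class and jar names below are only placeholders), it can be submitted with something like:

bin/spark-submit --class com.kfk.spark.news_analysis_project.NewsRealTime --master local[2] news-analysis-jar-with-dependencies.jar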

No deeper business analysis is done here; you can extend the project with what you learned earlier by flexibly combining the operators, for example with a windowed variant of the popular-sections statistic, as sketched below.
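
One possible extension (a sketch, not part of the original listing): compute the popular sections over a sliding window with reduceByKeyAndWindow. The method below would be added to the class above and reuses its imports; the window and slide durations are arbitrary choices, but both must be multiples of the batch interval.

    /**
     * Popular sections over the last 60 seconds, updated every 10 seconds (sketch)
     * @param filterDstream the DStream of "view" records
     */
    public static void calculateHotSectionByWindow(JavaDStream<String> filterDstream) {

        // key each view record by its section
        JavaPairDStream<String, Integer> sectionPairs = filterDstream.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] fields = line.split(" ");
                return new Tuple2<String, Integer>(fields[4], 1);
            }
        });

        // count views per section over a 60-second window, sliding every 10 seconds
        JavaPairDStream<String, Integer> windowedCounts = sectionPairs.reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Durations.seconds(60), Durations.seconds(10));

        windowedCounts.print();
    }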

