ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

实时--业务数据分析

2019-05-12 16:50:13  阅读:444  来源: 互联网

标签:数据分析 canal String keyword gmall 业务 实时 value type


业务数据部分:

mysql ---->canal---->kafka----->sparkStreaming(充当ETL)--->ES--->SpringBoot接口--->页面展示

单日订单量及收入

1. 搭建利用canal对mysql中的数据实时监控

canal的安装和部署:https://www.cnblogs.com/shengyang17/p/10834781.html 

2. 利用Java程序获得canal数据

创建gmall-canal模块

  搭建利用canal对mysql中的数据实时监控: 前提是canal服务端要一直打开这;

  搭建gmall-canal的maven模块: 利用Java程序获得canal数据:①CanalClient交互性连接,  ②handler是处理具体的业务; ③发送到Kafka

Canal客户端:CanalClient.java

public class CanalClient {
    public static void main(String[] args) {
        //连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop101", 11111), "example", "", "");
        while (true){
            canalConnector.connect();
            canalConnector.subscribe("gmall.order_info");//订阅
            Message message = canalConnector.get(100); //message:一次canal从日志中抓取的信息,一个message包含多个sql; 
            int size = message.getEntries().size();

            if (size == 0){
                System.out.println("没有数据休息5s");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else {
                for (CanalEntry.Entry entry : message.getEntries()) { //entry:相当于一个sql命令,一个sql可能会对多行记录造成影响:
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA){ // ①type用于区分是数据变化,还是事务变化
                        CanalEntry.RowChange rowChange = null;
                        try {
                           rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //②storevalue得到rowchange:entry经过反序列化得到的对象,包含了多行记录的变化值
                        } catch (InvalidProtocolBufferException e) {  //②.1 eventtype数据的变化类型  insert  update  delete  create  alter  drop;②.2 rowdatalist
                            e.printStackTrace();
                        }
                        String tableName = entry.getHeader().getTableName();    // ③header.tableName 表名
                        CanalEntry.EventType eventType = rowChange.getEventType(); //insert update delete
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();//行集, 数据
                        CanalHandler.handler(tableName, eventType, rowDatasList);
                        //RowDatas:一个rowchange里包含的数据变化集,其中每一个rowdata里面包含了一行的多个字段;包含afterColumnList和beforeColumnList
                        //column: 一个RowData里包含了多个column,每个column包含了 name和 value: columnName和columnValue
                    }

                }
            }

        }
    }
}
View Code

CanalHander.java

public class CanalHandler {
    public static void handler(String tableName, CanalEntry.EventType eventType, List<CanalEntry.RowData> rowDataList){
        //下单操作
        if ("order_info".equals(tableName) && CanalEntry.EventType.INSERT == eventType){
            //遍历行集
            for (CanalEntry.RowData rowData : rowDataList) {
                List<CanalEntry.Column> columnsList = rowData.getAfterColumnsList();
                JSONObject jsonObject = new JSONObject();
                for (CanalEntry.Column column : columnsList) {
                    System.out.println(column.getName() + ":" + column.getValue());
                    String propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());
                    jsonObject.put(propertyName, column.getValue());
                }
                MyKafkaSender.send(GmallConstant.KAFKA_TOPIC_ORDER, jsonObject.toJSONString());

            }
        }
    }
}
View Code

MyKafkaSender.java

public class MyKafkaSender {

    public static KafkaProducer<String, String> kafkaProducer = null;
    public static KafkaProducer<String, String> createKafkaProducer(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = null;

        try {
            producer = new KafkaProducer<String, String>(properties);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return producer;
    }
    public static void send(String topic, String msg){
        if (kafkaProducer == null){
            kafkaProducer = createKafkaProducer();
        }
        kafkaProducer.send(new ProducerRecord<String, String>(topic, msg));
    }
}
View Code

3. sparkstreaming消费kafka并保持到ES中

 

OrderAPP.java   从kafka获取数据到sparkStreaming中并做处理(对手机号做脱敏处理)且写入到ES中,:

object OrderApp {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val inputDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_ORDER, streamingContext)
/*    inputDStream.map(_.value()).foreachRDD{ rdd =>
      println(rdd.collect().mkString("\n")) //测试数据是否收集到
    }*/

    val orderInfoDStream: DStream[OrderInfo] = inputDStream.map {
      _.value()
    }.map { orderJson =>
      val orderInfo: OrderInfo = JSON.parseObject(orderJson, classOf[OrderInfo])
      //转变为Json对象
      val createTimeArr: Array[String] = orderInfo.createTime.split(" ")
      orderInfo.createDate = createTimeArr(0) //2019-05-03
      val timeArr: Array[String] = createTimeArr(1).split(":") //
      orderInfo.createHour = timeArr(0) //小时 02
      orderInfo.createHourMinute = timeArr(0) + ":" + timeArr(1)
      //02:54
      //收件人 电话 脱敏  //切完之后元组
      orderInfo.consigneeTel = "*******" + orderInfo.consigneeTel.splitAt(7)._2 //OrderInfo中consigneeTel应该是var可变得
      orderInfo
    }
    //保存到ES中
    orderInfoDStream.foreachRDD{rdd =>
      rdd.foreachPartition{orderItr: Iterator[OrderInfo] =>
        val list: List[OrderInfo] = orderItr.toList
        MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_ORDER, list)
      }
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
View Code

 

创建indexmapping结构
PUT gmall_order 
{
  "mappings" : {
    "_doc" : {
      "properties" : {
        "provinceId" : {
          "type" : "keyword"
        },
        "consignee" : {
          "type" : "keyword",
          "index":false
        },
        "consigneeTel" : {
          "type" : "keyword",
          "index":false
        },
        "createDate" : {
          "type" : "keyword"
        },
        "createHour" : {
          "type" : "keyword"
        },
        "createHourMinute" : {
          "type" : "keyword"
        },
        "createTime" : {
          "type" : "keyword"
        },
        "deliveryAddress" : {
          "type" : "keyword"
        },
        "expireTime" : {
          "type" : "keyword"
        },
        "id" : {
          "type" : "keyword"
        },
        "imgUrl" : {
          "type" : "keyword",
          "index":false
        },
        "operateTime" : {
          "type" : "keyword"
        },
        "orderComment" : {
          "type" : "keyword",
          "index":false
        },
        "orderStatus" : {
          "type" : "keyword"
        },
        "outTradeNo" : {
          "type" : "keyword",
          "index":false 
        },
        "parentOrderId" : {
          "type" : "keyword" 
        },
        "paymentWay" : {
          "type" : "keyword"
        },
        "totalAmount" : {
          "type" : "double"
        },
        "trackingNo" : {
          "type" : "keyword"
        },
        "tradeBody" : {
          "type" : "keyword",
          "index":false
        },
        "userId" : {
          "type" : "keyword"
        }
      }
    }
  }
}

  =========>>>
  {
    "acknowledged" : true,
    "shards_acknowledged" : true,
    "index" : "gmall_order"
  }

4. 从ES中查询数据,并根据接口发布出来

在Kibana中查询

##########做查询聚合操作###########

##当日总交易金额
GET gmall_order/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "createDate": "2019-05-12"
        }
      }
    }
  },
  "aggs": {
    "sum_totalAmount": {
      "sum": {
        "field": "totalAmount"
      }
    }
  }
}
==============>>>
 xxxxxxxxxxxx
  "aggregations" : {
    "sum_totalAmount" : {
      "value" : 15989.0
    }
  }

##分时交易;分组再聚合
GET gmall_order/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "createDate": "2019-05-12"
        }
      }
    }
  },
  "aggs": {
    "groupby_createHour": {
      "terms": {
        "field": "createHour",
        "size": 24
      },
      "aggs": {
        "sum_totalAmount": {
          "sum": {
            "field": "totalAmount"
          }
        }
      }
    }
  }
}

========>>>
xxxx
 "aggregations" : {
    "groupby_createHour" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "11",
          "doc_count" : 5,
          "sum_totalAmount" : {
            "value" : 1658.0
          }
        },
        {
          "key" : "10",
          "doc_count" : 3,
          "sum_totalAmount" : {
            "value" : 1936.0
          }
        },
        {
          "key" : "12",
          "doc_count" : 3,
          "sum_totalAmount" : {
            "value" : 1199.0
          }
        },
        {
          "key" : "21",
          "doc_count" : 3,
          "sum_totalAmount" : {
            "value" : 2245.0
          }
        },
        xxxxx

启动SpringBoot的主类: com.atguigu.gmall.publisher.GmallPublisherApplication

在浏览器中访问:

新增交易额: http://127.0.0.1:8070/realtime-total?date=2019-05-12

[{"name":"新增日活","id":"dau","value":0},{"name":"新增设备","id":"new_mid","value":233},{"name":"新增交易额","id":"order_amount","value":15989.0}]

分时:浏览器中访问:http://127.0.0.1:8070/realtime-hour?id=order_amount&&date=2019-05-12

{"yesterday":
  {"00":951.0,"11":1594.0,"22":707.0,"01":2682.0,"12":577.0,"02":1519.0,"13":1478.0,"03":1125.0,"14":1064.0,"04":1487.0,"15":1126.0,"05":517.0,"16":298.0,"06":1785.0,"07":1575.0,"18":463.0,"09":463.0,"20":691.0,"10":2327.0,"21":2283.0},
"today":
  {"11":1658.0,"22":1867.0,"12":1199.0,"13":520.0,"03":233.0,"04":1306.0,"15":495.0,"16":1461.0,"05":829.0,"06":811.0,"07":122.0,"18":171.0,"19":1136.0,"10":1936.0,"21":2245.0}}

5. 启动程序,页面展示 

启动:gmall-publisher--springBoot的主类: com.atguigu.gmall.publisher.GmallPublisherApplication,给chart的接口,启动

 启动:gmall--dw-chart---com.demo.DemoApplication的主类; 接接口展示数据的动态变化

启动:gmall-canal的com.atguigu.gmall.client.CanalClient的客户端,实时监控mysql的变化;

启动:gmall-realtime的com.atguigu.gmall.realtime.app.OrderApp,从kafka中获取数据并实时存到ES中

 



 

标签:数据分析,canal,String,keyword,gmall,业务,实时,value,type
来源: https://www.cnblogs.com/shengyang17/p/10836027.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有