其他分享
首页 > 其他分享 > mapreduce案例--求出每天访问的去重用户数、会员数、session数

mapreduce案例--求出每天访问的去重用户数、会员数、session数

作者:互联网

数据类似
(此处原有一张数据示例图片)数据在百度网盘:
链接:https://pan.baidu.com/s/1kIpnSroPntL3ZoswdApKng
提取码:9dgc
输出格式:2018-07-12 用户数:400000 会员数:238 session:400001

数据解释:每行以制表符分隔。第一列是用户访问的 IP 地址,第二列是时间戳(单位:秒),第三列是网址,第四列中的 u_ud、u_mid、u_sd 参数分别对应题目中的用户、会员、session。

话不多说!!!!来上代码
第一种方法: 没有自定义类

/**
 * MapReduce job that reports, for every calendar day found in an access log,
 * the number of distinct users ({@code u_ud}), members ({@code u_mid}) and
 * sessions ({@code u_sd}).
 *
 * <p>Input lines are tab-separated; column 2 is an epoch timestamp in seconds
 * and column 4 carries the tracking parameters after a literal {@code ???}
 * marker, formatted as {@code name=value} pairs joined by {@code &}.
 */
public class exam01 {

    /**
     * Emits one {@code (day, "name:value")} pair for each tracked parameter
     * ({@code u_ud}, {@code u_mid}, {@code u_sd}) that has a non-empty value.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // One formatter per mapper instance instead of one per record.
        // It formats in the JVM's default time zone, exactly like the per-record
        // formatter it replaces.
        private final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
        // Reused output holders; context.write serializes them immediately.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Parses one log line and writes the tagged identifiers it contains.
         * Lines that cannot be parsed are counted and skipped rather than
         * failing the whole task.
         *
         * @param key     byte offset of the line (unused)
         * @param value   the raw log line
         * @param context Hadoop task context used for output and counters
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] columns = value.toString().split("\t");
            if (columns.length < 4) {
                context.getCounter("exam01", "MALFORMED_RECORDS").increment(1L);
                return;
            }

            long epochSeconds;
            try {
                epochSeconds = Long.parseLong(columns[1].trim());
            } catch (NumberFormatException e) {
                context.getCounter("exam01", "MALFORMED_RECORDS").increment(1L);
                return;
            }

            // The tracking parameters follow a literal "???" marker.
            String[] sections = columns[3].split("\\?\\?\\?");
            if (sections.length < 2) {
                context.getCounter("exam01", "MALFORMED_RECORDS").increment(1L);
                return;
            }

            outKey.set(dayFormat.format(new Date(epochSeconds * 1000L)));
            for (String param : sections[1].split("&")) {
                // Limit 2 keeps any '=' inside the value; "name=" yields an
                // empty value, which is treated as "not present".
                String[] pair = param.split("=", 2);
                if (pair.length < 2 || pair[1].isEmpty()) {
                    continue;
                }
                String name = pair[0];
                if (name.equals("u_ud") || name.equals("u_mid") || name.equals("u_sd")) {
                    outValue.set(name + ":" + pair[1]);
                    context.write(outKey, outValue);
                }
            }
        }
    }

    /**
     * Collects the distinct identifiers of each kind for one day and writes
     * the three counts as a single line.
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * Counts distinct users, members and sessions for one day.
         *
         * @param key     the day, formatted {@code yyyy-MM-dd}
         * @param values  tagged identifiers of the form {@code name:value}
         * @param context Hadoop task context used for output
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The sets are local on purpose: one reducer instance handles many
            // keys, so instance-level sets would leak identifiers from earlier
            // days into later days' counts.
            TreeSet<String> users = new TreeSet<String>();
            TreeSet<String> members = new TreeSet<String>();
            TreeSet<String> sessions = new TreeSet<String>();

            for (Text tagged : values) {
                // Limit 2 so an identifier that itself contains ':' stays intact.
                String[] parts = tagged.toString().split(":", 2);
                if (parts.length < 2) {
                    continue;
                }
                if (parts[0].equals("u_ud")) {
                    users.add(parts[1]);
                } else if (parts[0].equals("u_mid")) {
                    members.add(parts[1]);
                } else if (parts[0].equals("u_sd")) {
                    sessions.add(parts[1]);
                }
            }
            context.write(key, new Text("用户数:" + users.size() + "\t会员数:" + members.size() + "\tsession:" + sessions.size()));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPath, outputPath]}; when omitted the
     *             original hard-coded local paths are used
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(exam01.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String inputPath = args.length > 0 ? args[0] : "E:/hadoop/examdata/logdata.log";
        String outputPath = args.length > 1 ? args[1] : "E:/hadoop/examout/exam01";
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

第二种方法:自定义 Writable 类,在 map 和 reduce 之间传递数据

/**
 * Hadoop value type carrying the three identifiers extracted from one log
 * record: user ({@code u_ud}), member ({@code u_mid}) and session
 * ({@code u_sd}). Any of them may be {@code null} when the record does not
 * contain that identifier.
 */
public class Demo01_userBean implements Writable {
    // Left public because it was public originally; prefer the accessor.
    public String u_ud;
    private String u_mid;
    private String u_sd;

    @Override
    public String toString() {
        return "Demo01_userBean{" +
                "u_ud='" + u_ud + '\'' +
                ", u_mid='" + u_mid + '\'' +
                ", u_sd='" + u_sd + '\'' +
                '}';
    }

    /** No-argument constructor; all three identifiers start as {@code null}. */
    public Demo01_userBean() {
    }

    /**
     * Creates a bean with the given identifiers.
     *
     * @param u_ud  user identifier, or {@code null} if absent
     * @param u_mid member identifier, or {@code null} if absent
     * @param u_sd  session identifier, or {@code null} if absent
     */
    public Demo01_userBean(String u_ud, String u_mid, String u_sd) {
        this.u_ud = u_ud;
        this.u_mid = u_mid;
        this.u_sd = u_sd;
    }

    public String getU_ud() {
        return u_ud;
    }

    public void setU_ud(String u_ud) {
        this.u_ud = u_ud;
    }

    public String getU_mid() {
        return u_mid;
    }

    public void setU_mid(String u_mid) {
        this.u_mid = u_mid;
    }

    public String getU_sd() {
        return u_sd;
    }

    public void setU_sd(String u_sd) {
        this.u_sd = u_sd;
    }

    /**
     * Serializes the three identifiers in the order u_ud, u_mid, u_sd.
     * Each one is preceded by a presence flag because
     * {@link DataOutput#writeUTF(String)} throws on {@code null}.
     *
     * @param out the stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        writeNullable(out, this.u_ud);
        writeNullable(out, this.u_mid);
        writeNullable(out, this.u_sd);
    }

    /**
     * Restores the three identifiers written by {@link #write(DataOutput)}.
     * Every field is overwritten, so a reused instance keeps no stale value.
     *
     * @param in the stream to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.u_ud = readNullable(in);
        this.u_mid = readNullable(in);
        this.u_sd = readNullable(in);
    }

    /** Writes a presence flag and, when the value is non-null, the value itself. */
    private static void writeNullable(DataOutput out, String value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            out.writeUTF(value);
        }
    }

    /** Reads a value written by {@code writeNullable}; returns null when the flag is false. */
    private static String readNullable(DataInput in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }
}

package com.ali.TwoTest;

import com.ali.mapreduce.WordCountTest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * MapReduce job that reports, for every calendar day found in an access log,
 * the number of distinct users ({@code u_ud}), members ({@code u_mid}) and
 * sessions ({@code u_sd}), using {@code Demo01_userBean} as the map output
 * value.
 *
 * <p>Input lines are tab-separated; column 2 is an epoch timestamp in seconds
 * and column 4 holds {@code name=value} pairs joined by {@code &}.
 */
public class Demo01_user {

    /** Emits one {@code (day, bean)} pair per parsable log line. */
    public static class MyMapper extends Mapper<LongWritable,Text, Text,Demo01_userBean> {
        public Text k = new Text();
        // One formatter per mapper instance instead of one per record.
        // It formats in the JVM's default time zone, exactly like the per-record
        // formatter it replaces.
        private final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");

        /**
         * Parses one log line into a bean keyed by its day. Lines that cannot
         * be parsed are counted and skipped rather than failing the task.
         *
         * @param key     byte offset of the line (unused)
         * @param value   the raw log line
         * @param context Hadoop task context used for output and counters
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            if (split.length < 4) {
                context.getCounter("Demo01_user", "MALFORMED_RECORDS").increment(1L);
                return;
            }

            long epochMillis;
            try {
                epochMillis = Long.parseLong(split[1].trim()) * 1000L;
            } catch (NumberFormatException e) {
                context.getCounter("Demo01_user", "MALFORMED_RECORDS").increment(1L);
                return;
            }

            // NOTE(review): the sibling exam01 job treats everything before a
            // literal "???" as a prefix to discard. Without stripping it here
            // the first parameter's key would be "<prefix>???u_ud" and never
            // match. Confirm against the real data.
            String query = split[3];
            int marker = query.indexOf("???");
            if (marker >= 0) {
                query = query.substring(marker + 3);
            }

            Map<String,String> params = new HashMap<String,String>();
            for (String pair : query.split("&")) {
                // Limit 2 keeps any '=' inside the value; "name=" yields an
                // empty value, which is treated as "not present".
                String[] kv = pair.split("=", 2);
                if (kv.length == 2 && !kv[1].isEmpty()) {
                    params.put(kv[0], kv[1]);
                }
            }

            // Identifiers missing from the record stay null; the reducer skips nulls.
            Demo01_userBean bean = new Demo01_userBean();
            bean.setU_ud(params.get("u_ud"));
            bean.setU_mid(params.get("u_mid"));
            bean.setU_sd(params.get("u_sd"));

            k.set(dayFormat.format(new Date(epochMillis)));
            context.write(k, bean);
        }
    }

    /** Counts the distinct identifiers of each kind for one day. */
    public static class MyReduce extends Reducer<Text,Demo01_userBean,Text,Text> {

        /**
         * Writes one summary line with the distinct user, member and session
         * counts for the given day.
         *
         * @param key     the day, formatted {@code yyyy-MM-dd}
         * @param values  beans emitted by the mapper for that day
         * @param context Hadoop task context used for output
         */
        @Override
        protected void reduce(Text key, Iterable<Demo01_userBean> values, Context context) throws IOException, InterruptedException {
            // Local sets: each call starts from an empty state for its own day.
            TreeSet<String> users = new TreeSet<String>();
            TreeSet<String> members = new TreeSet<String>();
            TreeSet<String> sessions = new TreeSet<String>();

            for (Demo01_userBean bean : values) {
                // Strings are copied out immediately, so it does not matter
                // whether the framework reuses the bean instance.
                if (bean.getU_ud() != null) {
                    users.add(bean.getU_ud());
                }
                if (bean.getU_mid() != null) {
                    members.add(bean.getU_mid());
                }
                if (bean.getU_sd() != null) {
                    sessions.add(bean.getU_sd());
                }
            }

            String res = "用户数为:" + users.size() + "  会员数为:" + members.size() + "  session数为:" + sessions.size();
            context.write(key, new Text(res));
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code [inputPath, outputPath]}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: Demo01_user <inputPath> <outputPath>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"test01");
        job.setJarByClass(Demo01_user.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Demo01_userBean.class);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output locations come from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, stream its progress, and report failure through a
        // non-zero exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

标签:String,job,mid,mapreduce,案列,session,new,ud,sd
来源: https://blog.csdn.net/weidajiangjiang/article/details/100627903