MapReduce Programming: Search Engine Log Analysis

A search engine usually keeps a log file that records every user query. When a user performs a search, the log file records an entry of the form (search time, search keyword, user IP). Because search engines serve a very large number of users, this log file is often huge and is typically stored in a distributed file system such as HDFS. By analyzing the log file we can obtain the hot words of a recent period, i.e. the most frequently searched terms, for example the 50 keywords searched most often within one day, which are the 50 search keywords that appear most often in the log file.

Use the HDFS API to generate such a log file automatically. Each line of the log file must have the following format:

---------------------------------------------
   Date       Time       Keyword     IP address
---------------------------------------------
2011-10-26 06:11:35    云计算   210.77.23.12
2011-10-26 06:11:45    Hadoop   210.77.23.12
---------------------------------------------

Note: the date, time and IP address in each record must be valid, i.e. an IP address such as -1.-1.-1.-2 must not appear (random numbers are sufficient to produce valid IPs). The keyword can be any string and does not need to be meaningful; both Chinese and English are acceptable (it may be generated randomly).

Write a MapReduce program that analyzes the log file and finds the hot words: (1) the 50 keywords that appear most often across all log records; (2) the 50 keywords that appear most often within a given date range; (3) the 50 keywords that appear most often within a given IP address range (assuming the first three octets are fixed and the range is specified on the fourth octet).

The result must be written to a file in which each line is: keyword occurrence-count. From top to bottom, the keywords are sorted by count in descending order, for example:

云计算 5980
并行计算 5976
分布式 5878

The source code is as follows:

Log.java

package ac.ucas.SearchLogAnalyze;

import java.lang.Math;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

public class Log {
	private String path = "";		// path of the log file
	private long numOfRecord = 10;	// number of log records to generate
	
	public String getPath() {
		return path;
	}
	public void setPath(String path) {
		this.path = path;
	}
	public long getNumOfRecord() {
		return numOfRecord;
	}
	public void setNumOfRecord(long numOfRecord) {
		this.numOfRecord = numOfRecord;
	}
	/* Generate a random character: a digit or an upper/lower-case letter */
	private char randomChar(){
		int tmp = 0;
		do{
			tmp = (int)(Math.random() * 123);	// 0-122, so that 'z' (122) can also be produced
		}while( !( (48 <= tmp && tmp <= 57) || (65 <= tmp && tmp <= 90) || (97 <= tmp && tmp <= 122) ) );	// '0'-'9', 'A'-'Z', 'a'-'z'
		return (char)tmp;
	}
	
	/* Generate a random, valid IP address */
	private String randomIp(){
		// first octet 1-127 so the address cannot start with 0; remaining octets 0-127
		return Integer.toString(1 + (int)(Math.random() * 127)) + "." +
				Integer.toString((int)(Math.random() * 128)) + "." +
				Integer.toString((int)(Math.random() * 128)) + "." +
				Integer.toString((int)(Math.random() * 128));
	}
	
	/* Generate one random search record */
	private String generateRecord(){
		String result = "";
		int year = 2000 + (int)(Math.random() * 14);	// years 2000-2013
		int month = 1 + (int)(Math.random() * 12);		// months 1-12
		int day = 1 + (int)(Math.random() * 28);		// days 1-28, valid in every month
		int hour = (int)(Math.random() * 24);			// 0-23
		int minute = (int)(Math.random() * 60);			// 0-59
		int second = (int)(Math.random() * 60);			// 0-59
		
		String sYear = Integer.toString(year);
		String sMonth = (month < 10 ? "-0" : "-") + Integer.toString(month);
		String sDay = (day < 10 ? "-0" : "-") + Integer.toString(day);
		String sHour = (hour < 10 ? " 0" : " ") + Integer.toString(hour);
		String sMinute = (minute < 10 ? ":0" : ":") + Integer.toString(minute);
		String sSecond = (second < 10 ? ":0" : ":") + Integer.toString(second);
		
		String query = "";
		int lengthOfQuery = (int)(Math.random() * 4 + 1);	// random query length, 1-4 characters
		for(int i = 0; i < lengthOfQuery; i++)
		{
			char tmp = randomChar();
			query += tmp;
		}
		result = sYear + sMonth + sDay + sHour + sMinute + sSecond + " " + query + " " + randomIp();
		return result;
	}
	
	/* Write the randomly generated search records to the log file */
	public void logWriter() throws IOException{
		FileSystem fs = FileSystem.get( new Configuration() );
		FSDataOutputStream fos = fs.create(new Path(getPath()),true);
		Writer out = new OutputStreamWriter(fos,"utf-8");
		for(long i = 0; i < numOfRecord; i++){
			out.write(generateRecord() + "\n");
		}
		out.close();
		fos.close();
	}
	
	/* Read the log file and print it to stdout */
	public void logReader() throws IOException{
		FileSystem fs = FileSystem.get(new Configuration());
		FSDataInputStream fis = fs.open(new Path(getPath()));
		InputStreamReader isr = new InputStreamReader(fis,"utf-8");
		BufferedReader br = new BufferedReader(isr);
		String line = "";
		while((line = br.readLine()) != null){
			System.out.println(line);
		}
		br.close();
		isr.close();
		fis.close();
	}
}
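
For reference, the Log class can also be driven on its own before wiring it into the MapReduce driver. Below is a minimal sketch; the class name LogDemo, the HDFS path and the record count are illustrative placeholders, not values required by the assignment:

package ac.ucas.SearchLogAnalyze;

public class LogDemo {
	public static void main(String[] args) throws Exception {
		Log log = new Log();
		log.setPath("/input/SearchLog.txt");	// example HDFS path (placeholder)
		log.setNumOfRecord(1000);				// generate 1000 random records
		log.logWriter();						// write the log file to HDFS
		log.logReader();						// print the generated records to stdout
	}
}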

SearchLogAnalyze.java

package ac.ucas.SearchLogAnalyze;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.lang.Integer;
import java.lang.Long;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SearchLogAnalyze {
	//temporary directory that holds the output of the counting job
	static Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
	
	public static class SearchLogAnalyzeCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		
		private final static IntWritable one = new IntWritable(1);
	    private Text word = new Text();
	    
	    /* Check whether a record satisfies the filter conditions; only matching records are mapped */
	    public static boolean meetConditions(Context context, Date date, String ip){
	    	//last octet of the IP address
			int ipEnd = Integer.parseInt(ip.substring(ip.lastIndexOf(".") + 1));
			//filter conditions passed in through the job configuration
			String startDate = context.getConfiguration().get("StartDate");
			String endDate = context.getConfiguration().get("EndDate");
			String ipPrefix = context.getConfiguration().get("IpPrefix");
			String sStartIp = context.getConfiguration().get("StartIp");
			String sEndIp = context.getConfiguration().get("EndIp");
			int startIp = 0;
			int endIp = 127;
			if(sStartIp != null)
				startIp = Integer.parseInt(sStartIp);
			if(sEndIp != null)
				endIp = Integer.parseInt(sEndIp);

			try {
				//both the date range and the IP range are inclusive
				if(
						( startDate == null || !date.before(new SimpleDateFormat("yyyy-MM-dd").parse(startDate) ) ) &&
						( endDate == null || !date.after(new SimpleDateFormat("yyyy-MM-dd").parse(endDate) ) ) &&
						( ipPrefix == null || ipPrefix.equals(ip.substring(0, ip.lastIndexOf(".") ) ) ) &&
						( startIp <= ipEnd ) &&
						( endIp >= ipEnd )
						)
						return true;
			} catch (ParseException e) {
				e.printStackTrace();
			}
			return false;
		}
	    
	    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
	      //each input line has the form: date time keyword ip
	      StringTokenizer itr = new StringTokenizer(value.toString());
	      while (itr.hasMoreTokens()){
	    	  Date date = null;
	    	  try {
	    		  date = new SimpleDateFormat("yyyy-MM-dd").parse(itr.nextToken());
			} catch (ParseException e) {
				e.printStackTrace();
			}
	    	itr.nextToken();					//time, not used
	    	String query = itr.nextToken();		//search keyword
	    	String ip = itr.nextToken();		//IP address
	    	
	    	//emit (keyword, 1) only for records that satisfy the filter conditions
	    	if(date != null && meetConditions(context, date, ip)){
					word.set(query);
					context.write(word, one);
			}
	      }
	    }
	}
	
	public static class SearchLogAnalyzeCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

		private IntWritable result = new IntWritable();

	    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
	      int sum = 0;
	      for (IntWritable val : values) {
	        sum += val.get();
	      }
	      result.set(sum);
	      context.write(key, result);
	    }
	}
	
	public static class SearchLogAnalyzeSortMapper extends Mapper<Text, IntWritable, IntWritable, Text>{
	    //swap (keyword, count) to (count, keyword) so that the shuffle sorts the records by count
	    public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
	    	context.write(value, key);
	    }
	}
	
	public static class SearchLogAnalyzeSortReducer extends Reducer<IntWritable,Text,Text,IntWritable>{

	    //swap (count, keyword) back to (keyword, count) and keep only the top K results;
	    //the remaining quota is stored in the job configuration, which works because the
	    //sort job runs with a single reduce task, so all reduce() calls share one configuration
	    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
	    	int iTopK = Integer.parseInt(context.getConfiguration().get("TopK"));
	    	for (Text val : values) {
	    		if(iTopK > 0){
	    			context.write(val, key);
	    			iTopK--;
	    		}else{
	    			context.getConfiguration().set("TopK", "0");
	    			return;
	    		}
	    	}
	    	context.getConfiguration().set("TopK", Integer.toString(iTopK));
	    }
	}
	
	//Hadoop sorts map output keys in ascending order by default; to sort by count in descending order, a comparator that negates the default comparison is needed
	private static class IntWritableDecreasingComparator extends IntWritable.Comparator {  
        public int compare(@SuppressWarnings("rawtypes") WritableComparable a, @SuppressWarnings("rawtypes") WritableComparable b) {  
          return -super.compare(a, b);  
        }  
          
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
            return -super.compare(b1, s1, l1, b2, s2, l2);  
        }  
    }
	
	public static void main(String[] args) throws Exception {
		if(args.length < 2){
			if(args.length == 1 && args[0].equals("-h")){
				System.out.println("Usage: SearchLog.jar <in> <out> [-parameters]");
				System.out.println("-NumOfRecord	long		set the number of searchlog records(default:10)");
				System.out.println("-LogFileName	String		set the filename of logfile(default:<in>/SearchLog.txt)");
				System.out.println("-TopK		int		Return the top K results(default:50)");
				System.out.println("-StartDate	yyyy-MM-dd	set the StartDate of filter(default:2000-01-01)");
				System.out.println("-EndDate	yyyy-MM-dd	set the EndDate of filter(default:2014-12-30)");
				System.out.println("-IpPrefix	x.x.x		set the IpPrefix of filter(default:null)");
				System.out.println("-StartIp	int		set the StartIp of filter(default:0)");
				System.out.println("-EndIp		int		set the EndIp of filter(default:127)");
				System.exit(0);
			}
			System.err.println("<in> <out> directories are required; type -h for help");
			System.exit(1);
		}
		
		Log log = new Log();
		log.setPath(args[0] + "/SearchLog.txt");
		Configuration conf = new Configuration();
		conf.set("TopK", "50");
		
		for(int i = 2; i < args.length - 1;i += 2){
			switch(args[i]){
			case ("-NumOfRecord"):	log.setNumOfRecord(Long.parseLong(args[i + 1]));break;
			case ("-LogFileName"):	log.setPath(args[0] + "/" + args[i + 1]);break;
			case ("-TopK"):			conf.set("TopK", args[i + 1]);break;
			case ("-StartDate"):	conf.set("StartDate", args[i + 1]);break;
			case ("-EndDate"):		conf.set("EndDate", args[i + 1]);break;
			case ("-IpPrefix"):		conf.set("IpPrefix", args[i + 1]);break;
			case ("-StartIp"):		conf.set("StartIp", args[i + 1]);break;
			case ("-EndIp"):		conf.set("EndIp", args[i + 1]);break;
			default:				System.out.println("Unknown parameter: " + args[i] + " ignored");break;
			}
		}
		
		log.logWriter();
		//log.logReader();
		boolean success = false;
		try {
			Job countJob = Job.getInstance(conf, "SearchLogAnalyze-Count");	//counting job
			FileInputFormat.addInputPath(countJob, new Path(args[0]));
			FileOutputFormat.setOutputPath(countJob, tempDir);				//temporary directory as output
			countJob.setJarByClass(SearchLogAnalyze.class);
			countJob.setMapperClass(SearchLogAnalyzeCountMapper.class);		//mapper class
			countJob.setCombinerClass(SearchLogAnalyzeCountReducer.class);	//combiner class
			countJob.setReducerClass(SearchLogAnalyzeCountReducer.class);	//reducer class
			countJob.setOutputKeyClass(Text.class);
			countJob.setOutputValueClass(IntWritable.class);
			countJob.setOutputFormatClass(SequenceFileOutputFormat.class);	//SequenceFile output feeds the next job
			
			if(countJob.waitForCompletion(true)){
				Job sortJob = Job.getInstance(conf, "SearchLogAnalyze-Sort");//sorting job
				FileInputFormat.addInputPath(sortJob, tempDir);  			 //temporary directory as input
				sortJob.setInputFormatClass(SequenceFileInputFormat.class);	 //input format
				FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));  //final output path
				sortJob.setJarByClass(SearchLogAnalyze.class);
				sortJob.setMapperClass(SearchLogAnalyzeSortMapper.class);
				sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);	//sort counts in descending order
				sortJob.setNumReduceTasks(1);								 //a single reducer keeps the global order and the TopK quota
				sortJob.setMapOutputKeyClass(IntWritable.class);
				sortJob.setMapOutputValueClass(Text.class);
				sortJob.setReducerClass(SearchLogAnalyzeSortReducer.class);	 //reducer class
				sortJob.setOutputKeyClass(Text.class);
				sortJob.setOutputValueClass(IntWritable.class);
				success = sortJob.waitForCompletion(true);
			}
		} catch (Exception e){
			e.printStackTrace();
		} finally {
			//delete the intermediate output; System.exit() would skip a finally block,
			//so the program exits only after the cleanup below
			FileSystem.get(conf).delete(tempDir, true);
		}
		System.exit(success ? 0 : 1);
	}
}

The class view of the Log class is as follows:

Member variables:
	numOfRecord: the number of search records to generate, i.e. the number of queries
	path: the path of the file that stores the search log
Member methods:
get/set: getters and setters for the member variables
randomIp(): generate a random IP address
randomChar(): generate a random character (a digit or an upper/lower-case letter)
generateRecord(): generate one random search record
logWriter(): generate the search log and write it to a file in HDFS
logReader(): read the search log back from HDFS

The class view of the SearchLogAnalyze class is as follows:

IntWritableDecreasingComparator: descending-order comparator used in the sort phase after the map of the second job, so that the reduce output is ordered by search count from high to low
SearchLogAnalyzeCountMapper: mapper of the first job; parses each log record, applies the filter conditions and emits (keyword, 1) pairs for counting
SearchLogAnalyzeCountReducer: reducer of the first job; sums the counts of each qualifying keyword and writes the (keyword, count) pairs to a temporary HDFS directory that serves as the input of the second job
SearchLogAnalyzeSortMapper: mapper of the second job; reads the (keyword, count) pairs from the first job's output and swaps them to (count, keyword) so that the shuffle sorts them by count in descending order
SearchLogAnalyzeSortReducer: reducer of the second job; swaps key and value back so that each output line is a keyword followed by its count, keeping only the top K results
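
To make the cooperation of the two jobs concrete, the data takes roughly the following shape at each stage (a sketch using the counts from the sample output above, not actual program output):

count job output (SequenceFile in the temporary directory):  (云计算, 5980)  (并行计算, 5976)  (分布式, 5878)
sort job map output (key and value swapped):                 (5980, 云计算)  (5976, 并行计算)  (5878, 分布式)
shuffle with IntWritableDecreasingComparator:                keys reach the single reducer in descending order
sort job reduce output (top K lines of the result file):     云计算 5980
                                                             并行计算 5976
                                                             分布式 5878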

After starting Hadoop, run the relevant commands. For example, to print the usage information:

hadoop jar SearchLog.jar -h
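
Based on the println statements in main, the help output should look roughly like this:

Usage: SearchLog.jar <in> <out> [-parameters]
-NumOfRecord  long        set the number of searchlog records(default:10)
-LogFileName  String      set the filename of logfile(default:<in>/SearchLog.txt)
-TopK         int         Return the top K results(default:50)
-StartDate    yyyy-MM-dd  set the StartDate of filter(default:2000-01-01)
-EndDate      yyyy-MM-dd  set the EndDate of filter(default:2014-12-30)
-IpPrefix     x.x.x       set the IpPrefix of filter(default:null)
-StartIp      int         set the StartIp of filter(default:0)
-EndIp        int         set the EndIp of filter(default:127)

Hypothetical example invocations for the three analyses (the directories, record counts and filter values are placeholders; the -IpPrefix filter only yields results if that prefix actually occurs in the generated log, whose octets never exceed 127):

hadoop jar SearchLog.jar /log/in /log/out -NumOfRecord 100000 -TopK 50
hadoop jar SearchLog.jar /log/in /log/out -NumOfRecord 100000 -StartDate 2005-01-01 -EndDate 2010-12-31
hadoop jar SearchLog.jar /log/in /log/out -NumOfRecord 100000 -IpPrefix 100.20.30 -StartIp 0 -EndIp 63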
