博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[hadoop](1) MapReduce:ChainMapper
阅读量:5311 次
发布时间:2019-06-14

本文共 4018 字,大约阅读时间需要 13 分钟。

前言

本章主要讲述的是对于hadoop生态系统中,MapReduce写的ChainMapper的学习。MapReduce是hadoop集群数据处理的默认框架。而对于数据集中所有的数据必然有一些不友好的数据,我们需要将其丢弃。我们称之为数据的预处理。所以我们需要将预处理模块与数据处理逻辑分开,以便以后可以复用数据预处理模块。以下是一个mapper的通用模式:

  • 丢弃无用的已损坏的数据
  • 处理有效数据,提取感兴趣的字段
  • 针对这些字段,输出我们感兴趣的数据

准备工作

数据集:ufo-60000条记录,这个数据集有一系列包含下列字段的UFO目击事件记录组成,每条记录的字段都是以tab键分割,文件名为ufo.tsv,这里就不提供下载连接了

  • sighting date:UFO目击事件发生时间
  • Recorded date:报告目击事件的时间
  • Location:目击事件发生的地点
  • Shape:UFO形状
  • Duration:目击事件持续时间
  • Dexcription:目击事件的大致描述

例子: 

19950915 19950915 Redmond, WA 6 min. Young man w/ 2 co-workers witness tiny, distinctly white round disc drifting slowly toward NE. Flew in dir. 90 deg. to winds.

 

 

ChainMapper介绍

全限定名: org.apache.hadoop.mapred.lib.ChainMapper 

作用:顺序的执行多个mapper,并且最后一个mapper的输出会传递给reducer。

 

ChainMapper的使用

题目:通过使用 ChainMapper 类验证数据集的记录是否有效,即判断每条记录是否都可以划分为6个字符串

  • 上传ufo.tsv到hadoop
hadoop dfs -put ufo.tsv ufo.tsv
  • 编写 UFORecordValidationMapper.java 
import java.io.IOException;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.mapred.lib.*;public class UFORecordValidationMapper extends MapReduceBase implements Mapper
{ public void map(LongWritable key, Text value, OutputCollector
output, Reporter reporter) throws IOException { String line = value.toString(); if(validate(line)) { output.collect(key, value); } } private boolean validate(String str) { String[] parts = str.split("\t"); if(parts.length != 6) { return false; } return true; }}

 

  •  编写 UFOLocation.java 
import java.io.IOException;import java.util.Iterator;import java.util.regex.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.mapred.lib.*;public class UFOLocation {    public static class MapClass extends MapReduceBase implements Mapper
{ private final static LongWritable one = new LongWritable(1); private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$"); public void map(LongWritable key, Text value, OutputCollector
output, Reporter reporter) throws IOException { String line = value.toString(); String[] fields = line.split("\t"); String location = fields[2].trim(); if(location.length() >= 2) { Matcher matcher = locationPattern.matcher(location); if(matcher.find()) { int start = matcher.start(); String state = location.substring(start, start + 2); output.collect(new Text(state.toUpperCase()), one); } } } } public static void main(String...args) throws Exception { Configuration config = new Configuration(); JobConf conf = new JobConf(config, UFOLocation.class); conf.setJobName("UFOLocation"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(LongWritable.class); JobConf mapconf1 = new JobConf(false); ChainMapper.addMapper(conf, UFORecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, mapconf1); JobConf mapconf2 = new JobConf(false); ChainMapper.addMapper(conf, MapClass.class, LongWritable.class, Text.class, Text.class, LongWritable.class, true, mapconf2); conf.setMapperClass(ChainMapper.class); conf.setCombinerClass(LongSumReducer.class); conf.setReducerClass(LongSumReducer.class); FileInputFormat.setInputPaths(conf, args[0]); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); }}

 

  • 编译上述两个文件
javac UFORecordValidationMapper.java UFOLocation.java
  • 将编译好的文件打包成jar
jar cvf ufo.jar UFO*class
  • 提交打包好的jar包到hadoop上运行
hadoop jar ufo.jar UFOLocation ufo.tsv output
  • 从hadoop上获取结果到本地
hadoop dfs -get output/part-00000 ufo_result.txt
  • 查看结果
more ufo_result.txt

转载于:https://www.cnblogs.com/cafebabe-yun/p/8679994.html

你可能感兴趣的文章
关于 linux 的 limit 的设置
查看>>
HDU(4528),BFS,2013腾讯编程马拉松初赛第五场(3月25日)
查看>>
vim中文帮助教程
查看>>
SpringMvc拦截器运行原理。
查看>>
MySQL基础3
查看>>
云计算数据与信息安全防护
查看>>
全局设置导航栏
查看>>
RxJS & Angular
查看>>
面向对象(多异常的声明与处理)
查看>>
MTK笔记
查看>>
ERROR: duplicate key value violates unique constraint "xxx"
查看>>
激活office 365 的启动文件
查看>>
9款免费的Windows远程协助软件
查看>>
Maven(八) Maven项目和testng结合应用
查看>>
iOS 的 set.get.构造方法
查看>>
无法根据中文查找
查看>>
文件编码,文件或文件名编码格式转换(转)
查看>>
[简讯]phpMyAdmin项目已迁移至GitHub
查看>>
转载 python多重继承C3算法
查看>>
【题解】 bzoj1597: [Usaco2008 Mar]土地购买 (动态规划+斜率优化)
查看>>