Custom InputFormat and OutputFormat
Published: 2019-06-12


1. Custom InputFormat

1.1 Requirement

Both HDFS and MapReduce lose efficiency when dealing with small files, yet in practice it is hard to avoid scenarios involving large numbers of small files, so a suitable solution is needed.

 

1.2 Analysis

Small-file optimization essentially comes down to the following approaches:

1. At data-collection time, merge small files or small batches of data into large files before uploading them to HDFS.

2. Before business processing, run a MapReduce program on HDFS to merge the small files.

3. At MapReduce processing time, use CombineFileInputFormat (for example its concrete subclass CombineTextInputFormat) to improve efficiency; a minimal sketch follows this list.
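The sketch below illustrates approach 3, assuming Hadoop 2.x where org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat is available; the class name CombineSmallFilesDemo, the 4 MB split cap, and the identity-mapper setup are illustrative choices, not part of the original article.

package cn.itcast.bigdata.combinefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver (not from the original article) illustrating approach 3:
// CombineTextInputFormat packs many small text files into a few large splits.
public class CombineSmallFilesDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine small files demo");
        job.setJarByClass(CombineSmallFilesDemo.class);

        // Use the combining input format instead of the default TextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Gather small files into splits of at most roughly 4 MB each
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);

        // Identity mapper and no reducer: the job simply re-emits the input lines
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because many small files land in one split, the number of map tasks drops accordingly, which is where the efficiency gain comes from.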

 

1.3 Implementation

 

This section implements the second approach above.

 

The core mechanism of the program:

 

Define a custom InputFormat.

 

Override the RecordReader so that each call reads one complete file and wraps it as a single key/value pair.

 

On output, use SequenceFileOutputFormat to write the merged file.

 

 

 

The code is as follows:

 

The custom InputFormat:

 

 

package cn.itcast.bigdata.combinefile;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Each small file must be read as a whole, so never split it
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

 

 

The custom RecordReader:

package cn.itcast.bigdata.combinefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Core logic of a RecordReader:
 * nextKeyValue() reads the data and builds the key/value to be returned;
 * getCurrentKey() and getCurrentValue() hand back the key and value built above.
 */
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the whole file into a single BytesWritable value
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Report the current progress: the single file is either done or not.
     */
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to close
    }
}
The driver program:

package cn.itcast.bigdata.combinefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the path of the current file as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /* System.setProperty("HADOOP_USER_NAME", "hadoop"); */
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setJarByClass(SmallFilesToSequenceFileConverter.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Hard-coded local paths for testing; they override any command-line arguments
        args = new String[] { "c:/wordcount/smallinput", "c:/wordcount/smallout" };
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}

 

2. Custom OutputFormat

2.1 Requirement

Some raw logs need to be parsed and enhanced. The workflow:

1. Read records from the raw log files.

2. Use a URL field in each record to look up information in an external knowledge base and append it to the raw record.

3. If the enhancement succeeds, write the record to the enhanced-results directory; if it fails, extract the URL field from the raw record and write it to the to-crawl directory.

 

 

2.2 Analysis

The key point is that a single MapReduce program must write two kinds of results to different directories depending on the data. Such a flexible output requirement can be met with a custom OutputFormat.

 

2.3 Implementation

Key implementation points:

1. Access an external resource (the rule database) from within MapReduce.

2. Define a custom OutputFormat, override its RecordWriter, and override the write() method that actually emits the data.

 

The code is as follows:

The utility that loads the rules from the database:

package cn.itcast.bigdata.mr.logenhance;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class DBLoader {

    public static void dbLoader(Map<String, String> ruleMap) throws Exception {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/urldb", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from url_rule");
            // Load every url -> content rule into the map
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

 

The custom OutputFormat:

package cn.itcast.bigdata.mr.logenhance;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * When a map task or reduce task writes its final output, it first calls the
 * OutputFormat's getRecordWriter() to obtain a RecordWriter, then calls
 * write(k, v) on that RecordWriter for every record.
 */
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path enhancePath = new Path("D:/temp/en/log.dat");
        Path tocrawlPath = new Path("D:/temp/crw/url.dat");
        FSDataOutputStream enhancedOs = fs.create(enhancePath);
        FSDataOutputStream tocrawlOs = fs.create(tocrawlPath);
        return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
    }

    /**
     * A custom RecordWriter that routes each record to one of two output files.
     */
    static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> {
        FSDataOutputStream enhancedOs = null;
        FSDataOutputStream tocrawlOs = null;

        public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
            super();
            this.enhancedOs = enhancedOs;
            this.tocrawlOs = tocrawlOs;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            String result = key.toString();
            if (result.contains("tocrawl")) {
                // The record is a URL still to be crawled: write it to the to-crawl list file
                tocrawlOs.write(result.getBytes());
            } else {
                // The record is an enhanced log line: write it to the enhanced-log file
                enhancedOs.write(result.getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (tocrawlOs != null) {
                tocrawlOs.close();
            }
            if (enhancedOs != null) {
                enhancedOs.close();
            }
        }
    }
}
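Note that getRecordWriter() above opens two fixed paths, which works for this single-map-task demo but would let parallel tasks overwrite each other's files. A hedged variant of just that method, assuming the standard FileOutputFormat.getOutputPath() helper, derives per-task file names under the job's output directory instead:

    // Hypothetical variant (not from the original article): write per-task files under
    // the job's output directory so that parallel tasks do not collide.
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path outDir = FileOutputFormat.getOutputPath(context);
        String taskId = context.getTaskAttemptID().getTaskID().toString();
        FSDataOutputStream enhancedOs = fs.create(new Path(outDir, "enhancedlog/log-" + taskId + ".dat"));
        FSDataOutputStream tocrawlOs = fs.create(new Path(outDir, "tocrawl/url-" + taskId + ".dat"));
        return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
    }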

 

The mapper and driver:

package cn.itcast.bigdata.mr.logenhance;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhance {

    static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Map<String, String> ruleMap = new HashMap<String, String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        // Load the rule information from the database into ruleMap
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                DBLoader.dbLoader(ruleMap);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // A counter (group name, counter name) that records malformed log lines
            Counter counter = context.getCounter("malformed", "malformedline");
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            try {
                String url = fields[26];
                String content_tag = ruleMap.get(url);
                // If there is no content tag for this URL, output only the URL to the
                // to-crawl list; otherwise append the tag and output an enhanced log line
                if (content_tag == null) {
                    k.set(url + "\t" + "tocrawl" + "\n");
                    context.write(k, v);
                } else {
                    k.set(line + "\t" + content_tag + "\n");
                    context.write(k, v);
                }
            } catch (Exception exception) {
                counter.increment(1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogEnhance.class);
        job.setMapperClass(LogEnhanceMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Route different records to different destinations with a custom OutputFormat
        job.setOutputFormatClass(LogEnhanceOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:/srcdata/webloginput/"));
        // Although a custom OutputFormat is used, it extends FileOutputFormat, which
        // requires an output path (where it also writes a _SUCCESS marker), so the
        // output path must still be set
        FileOutputFormat.setOutputPath(job, new Path("D:/temp/output/"));

        // No reducer is needed
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);
        System.exit(0);
    }
}
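Since the mapper counts malformed lines, the driver could also report that counter once the job finishes. A small sketch (an addition, not in the original) that could replace the last two lines of main():

        // Report how many malformed lines the mapper skipped, using the same
        // counter group/name as in LogEnhanceMapper.map()
        boolean ok = job.waitForCompletion(true);
        long bad = job.getCounters().findCounter("malformed", "malformedline").getValue();
        System.out.println("malformed lines skipped: " + bad);
        System.exit(ok ? 0 : 1);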

 

 

Reposted from: https://www.cnblogs.com/duan2/p/7545070.html
