A collection of common Hadoop questions
Published: 2019-06-14


 

1  Hadoop Configuration.addResource

http://stackoverflow.com/questions/16017538/how-does-configuration-addresource-method-work-in-hadoop

From Stack Overflow: "How does the Configuration.addResource() method work in Hadoop?"

Question: Does Configuration.addResource() load resource files the way Java's ClassLoader does, or does it just wrap the ClassLoader class? A String such as "../resource.xml" cannot be passed to addResource() to load a resource file from outside the classpath, which is the same behavior as ClassLoader.

Answer: Browsing the Javadocs and source code for Configuration, String arguments are assumed to be classpath resources rather than paths relative to the file system. To reference a file on the local file system, use a URL instead:

    conf.addResource(new File("../resource.xml").toURI().toURL());
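A minimal sketch of the path-to-URL conversion the answer relies on. Only the conversion itself is runnable plain Java; the Hadoop Configuration call is shown in a comment since it needs Hadoop on the classpath, and the file name resource.xml is just an illustration:

```java
import java.io.File;
import java.net.URL;

public class AddResourceExample {
    public static void main(String[] args) throws Exception {
        // Convert a relative file-system path into a file: URL.
        // Passed as a plain String, Hadoop would look it up on the classpath instead.
        URL url = new File("../resource.xml").toURI().toURL();
        System.out.println(url.getProtocol());  // file

        // With Hadoop on the classpath (assumed, not runnable here):
        // Configuration conf = new Configuration();
        // conf.addResource(url);
    }
}
```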

 

2  Reading parameters in Hadoop MapReduce

First, a quick comparison of the ways to share global variables or files across a Hadoop job:

1     Use Configuration's set method; only suitable when the shared data is small.
2     Put the shared file on HDFS and read it on every access; relatively inefficient.
3     Put the shared file in the DistributedCache; after initializing once in setup() it can be read many times. The drawback is that it is read-only and cannot be modified.
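For option 1, a small list can be packed into a single configuration string and unpacked again in the mapper's setup(). A plain-Java sketch of the pack/unpack step; the conf.set/conf.get calls appear only in comments because they need Hadoop on the classpath, and the key name shared.items is an illustration:

```java
import java.util.Arrays;
import java.util.List;

public class ConfShareSketch {
    // Pack a small list into one string, suitable for conf.set("shared.items", packed)
    static String pack(List<String> items) {
        return String.join(",", items);
    }

    // Unpack in setup() after reading conf.get("shared.items")
    static List<String> unpack(String packed) {
        return Arrays.asList(packed.split(","));
    }

    public static void main(String[] args) {
        String packed = pack(Arrays.asList("a", "b", "c"));
        System.out.println(packed);                 // a,b,c
        System.out.println(unpack(packed).get(1));  // b
    }
}
```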

Option 3 is described below.

Alternative to the deprecated DistributedCache class in Hadoop 2.2.0

As of Hadoop 2.2.0, if you use the org.apache.hadoop.filecache.DistributedCache class to load files into your job's distributed cache, the compiler warns that the class is deprecated. In earlier versions of Hadoop, DistributedCache was used as follows to make files locally available to all mappers and reducers:

```java
// In the main driver class, using the new mapreduce API
Configuration conf = getConf();
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf);
...
Job job = new Job(conf);
...

// In the mapper class, usually in the setup method
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
```

With Hadoop 2.2.0, the functionality for adding files to the distributed cache has moved to the org.apache.hadoop.mapreduce.Job class. The Job constructor we used to call has also been deprecated; the factory method getInstance(Configuration conf) should be used instead. The replacement looks like this:

```java
// In the main driver class, using the new mapreduce API
Configuration conf = getConf();
...
Job job = Job.getInstance(conf);
...
job.addCacheFile(new URI(filename));

// In the mapper class, usually in the setup method
URI[] localPaths = context.getCacheFiles();
```

 

Original article:  http://www.bigdataspeak.com/2014/06/alternative-to-deprecated.html

 

Other reference links:

Hadoop DistributedCache is deprecated - what is the preferred API?  http://stackoverflow.com/questions/21239722/hadoop-distributedcache-is-deprecated-what-is-the-preferred-api
Large matrix multiplication  http://www.cnblogs.com/zhangchaoyang/articles/4646315.html
How to use Hadoop's DistributedCache  http://blog.itpub.net/29755051/viewspace-1220340/
Notes on DistributedCache  http://www.cnblogs.com/xuxm2007/p/3344930.html
Iterative MapReduce solutions (part 2): DistributedCache  http://hongweiyi.com/2012/02/iterative-mapred-distcache/


3  The Hadoop Mapper class

The Mapper class has four methods:

(1) protected void setup(Context context)
(2) protected void map(KEYIN key, VALUEIN value, Context context)
(3) protected void cleanup(Context context)
(4) public void run(Context context)

setup() is where user code does one-time initialization when the Mapper is instantiated (opening a global file, establishing a database connection, and so on). cleanup() does the teardown work, such as closing files or distributing any remaining key/value pairs after map() has finished. map() carries the main processing work and is the method we usually override. The core of the default run() method looks like this:

```java
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}
```

setup() and cleanup() are invoked by the framework as callbacks exactly once each, when the Mapper instance is initialized and when the map task ends; they are not executed on every call to map(). Keep this in mind if you plan to post-process values accumulated in map() inside cleanup().

Before the Mapper's output reaches the reduce phase, there are several customizable steps:

(1) A combiner can pre-aggregate the key/value pairs emitted on each node.
(2) After combining, the assignment of keys to reducers (the shuffle) can be customized through a Partitioner class.
(3) If you want a custom sort order on the keys, a sort comparator can be supplied as well.
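The once-per-task nature of setup() and cleanup() can be illustrated with a plain-Java sketch of the run() loop. This is a simplified stand-in for Hadoop's Mapper lifecycle, not the real class:

```java
import java.util.Arrays;
import java.util.List;

public class MapperLifecycle {
    static int setupCalls = 0, mapCalls = 0, cleanupCalls = 0;

    static void setup()            { setupCalls++; }   // once per task
    static void map(String record) { mapCalls++; }     // once per input record
    static void cleanup()          { cleanupCalls++; } // once per task

    // Mirrors the structure of Mapper.run(): setup once, map per record, cleanup once.
    static void run(List<String> records) {
        setup();
        for (String r : records) {
            map(r);
        }
        cleanup();
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "b", "c"));
        System.out.println(setupCalls + " " + mapCalls + " " + cleanupCalls);  // 1 3 1
    }
}
```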

 

4  Hadoop ChainMapper and ChainReducer

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        ChainMapper.addMapper(job, WCMapper.class, LongWritable.class, Text.class, Text.class, LongWritable.class, conf);
        ChainReducer.setReducer(job, WCReduce.class, Text.class, LongWritable.class, Text.class, LongWritable.class, conf);
        ChainReducer.addMapper(job, WCMapper2.class, Text.class, LongWritable.class, LongWritable.class, Text.class, conf);

        job.waitForCompletion(true);
    }

    public static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lineSet = line.split(" ");
            for (String e : lineSet) {
                context.write(new Text(e), new LongWritable(1));
            }
        }
    }

    public static class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable outVla = new LongWritable();

        @Override
        protected void reduce(Text k1, Iterable<LongWritable> v1, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable e : v1) {
                sum += e.get();
            }
            outVla.set(sum);
            context.write(k1, outVla);
        }
    }

    public static class WCMapper2 extends Mapper<Text, LongWritable, LongWritable, Text> {
        @Override
        protected void map(Text key, LongWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(value, key);
        }
    }
}
```

 

5  Hadoop JobControl

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCount2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Configuration for the first job
        Job job1 = Job.getInstance(conf, "wordcount1");
        job1.setJarByClass(WCount2.class);
        job1.setMapperClass(WCMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(LongWritable.class);
        job1.setReducerClass(WCReduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);

        // Input and output paths for job1
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Wrap job1 in a controlled job
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);

        // Configuration for the second job
        Job job2 = Job.getInstance(conf, "wordcount2");
        job2.setJarByClass(WCount2.class);
        job2.setMapperClass(WCMapper2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);

        // Wrap job2 in a controlled job
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        // Declare the dependency between the jobs:
        // job2 starts only after job1 has completed
        ctrljob2.addDependingJob(ctrljob1);

        // Input and output paths for job2
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // The master controller that manages the two sub-jobs
        JobControl JC = new JobControl("wordcount");
        JC.addJob(ctrljob1);
        JC.addJob(ctrljob2);

        // JobControl must be run in its own thread -- do not forget this
        Thread t = new Thread(JC);
        t.start();

        while (true) {
            if (JC.allFinished()) {
                // On completion, print the successful jobs and stop
                System.out.println(JC.getSuccessfulJobList());
                JC.stop();
                break;
            }
        }
    }

    public static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lineSet = line.split(" ");
            for (String e : lineSet) {
                context.write(new Text(e), new LongWritable(1));
            }
        }
    }

    public static class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable outVla = new LongWritable();

        @Override
        protected void reduce(Text k1, Iterable<LongWritable> v1, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable e : v1) {
                sum += e.get();
            }
            outVla.set(sum);
            context.write(k1, outVla);
        }
    }

    public static class WCMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outval = new Text();
        private Text outkey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lineSet = line.split("\t");
            outkey.set(lineSet[1]);
            outval.set(lineSet[0]);
            context.write(outkey, outval);
        }
    }
}
```

6  Hadoop "Filesystem closed"

From Stack Overflow: a workflow running in Oozie has two actions -- the first is a MapReduce job that generates files in HDFS, and the second is a job that should copy the data from those files into a database. Both parts complete successfully, but Oozie throws an exception at the end that marks the workflow as failed:

    2014-05-20 17:29:32,242 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:lpinsight (auth:SIMPLE) cause:java.io.IOException: Filesystem closed
    2014-05-20 17:29:32,243 WARN org.apache.hadoop.mapred.Child: Error running child
    java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:565)
        at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:589)
        at java.io.FilterInputStream.close(FilterInputStream.java:155)
        at org.apache.hadoop.util.LineReader.close(LineReader.java:149)
        at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:243)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:222)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:421)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.hadoop.mapred.Child.main(Child.java:262)
    2014-05-20 17:29:32,256 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task

Any idea?
Answer 1: Use the following configuration when accessing the file system:

    Configuration conf = new Configuration();
    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
    FileSystem fileSystem = FileSystem.get(conf);

Answer 2: I had encountered a similar java.io.IOException: Filesystem closed, and eventually found that I was closing the file system somewhere else. The Hadoop FileSystem API returns the same cached object, so if I close one file system, all of them are closed.
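The shared-cache behavior behind the second answer can be illustrated with a plain-Java sketch. This is a simplified stand-in for FileSystem.get(), not Hadoop's real implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class FsCacheSketch {
    // A stand-in for FileSystem: closable, and handed out from a cache.
    static class Fs {
        boolean closed = false;
        void close() { closed = true; }
    }

    static final Map<String, Fs> CACHE = new HashMap<>();

    // Like FileSystem.get(conf): returns the same cached instance per key.
    static Fs get(String key) {
        return CACHE.computeIfAbsent(key, k -> new Fs());
    }

    public static void main(String[] args) {
        Fs a = get("hdfs://nn");
        Fs b = get("hdfs://nn");  // same object as a
        a.close();                // closes it for b as well
        System.out.println(a == b);    // true
        System.out.println(b.closed);  // true
    }
}
```

This is why disabling the cache with fs.hdfs.impl.disable.cache gives each caller its own instance and avoids the error; FileSystem.newInstance(conf) is another way to get an uncached instance.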


References

How to control the number of maps in Hadoop

Big-data technology blog   http://lxw1234.com/

The Mapper class   http://blog.csdn.net/witsmakemen/article/details/8445133

Using bundled jars   http://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file

Hadoop cluster stuck at "Running job"   http://bbs.csdn.net/topics/391031853

Container is running beyond memory limits   http://stackoverflow.com/questions/21005643/container-is-running-beyond-memory-limits

http://www.cnblogs.com/manhua/p/4136138.html

The difference between hadoop jar and java -cp   http://blog.csdn.net/aaa1117a8w5s6d/article/details/34120003

Hadoop map reading from a database

Virtual memory beyond limits

 

Reposted from: https://www.cnblogs.com/hdu-2010/p/4929894.html
