> 技术文档 > Hadoop3

Hadoop3

@Test public void testDeletel1() throws IOException { fs.delete(new Path(\"youxi/csgo\"),false);

 出现报错

org.apache.hadoop.fs.PathlsNotEmptyDirectoryException: \"/youxilwangzherongyao is non
empty: Directory is not empty

解决方法

 @Test //删除非空文件夹,第二个参数选为true,会删除这个文件夹下所有的文件和这个文件夹 public void testDeletel1() throws IOException { fs.delete(new Path(\"youxi/csgo\"),false);
@Test public void testFileFiles() throws IOException { RemoteIterator ListFile = fs.listFiles(new Path(\"/xiudiuderen\"), true); while (ListFile.hasNext()) { LocatedFileStatus status = ListFiles.next(); System.out.println(status.getPath().getName()); System.out.println(status.getLen()); System.out.println(status.getPermission()); System.out.println(status.getGroup()); System.out.println(\"------beatiful----------\"); BlockLocation[] blockLocations = status.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { String[] hosts = blockLocation.getHosts(); for (String host : hosts) {  System.out.println(host); } } System.out.println(\"------yitiaoxinxijieshu----------\"); } }

 判断是文件还是文件夹

@Test public void testLisStatus() throws IOException{ FileStatus[] listStatus = fs.listStatus(new Path(\"/input\")); for (FileStatus fileStatus:listStatus){ if (fileStatus.isFile()){ System.out.println(\"file;\"+fileStatus.getPath().getName()); }else { System.out.println(\"dirc:\"+fileStatus.getPath().getName()); } } }

1.客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已经存在,检查目录结构和权限。

2.NameNode检查后返回是否可以上传。

3.客户端请求第一个Block上传到哪几个DataNode服务器上。

4.NameNode返回三个(副本为多少,就返回多少个)DataNode节点,如上图返回DataNode1,DataNode2,DataNode3.

5.客户端通过FSDataOutputStream模块请求的、DataNode1上传数据,DataNode1收到请求后会继续调用DataNode2,然后DataNode2调用DataNode3,将这个通信管道建立完成。

6.DataNode3,DataNode2,DataNode1逐级答应客户端。

7.客户端开始往DataNode1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet单位,DataNode1收到一个Packet就会传给DataNode2,DataNode2传给DataNode3;DataNode1每传一个Packet会放入一个应答队列等待应答。

8.当一个Block传输完成后,客户端再次请求上传第二个Block的服务器(就是第3步),如果有多个Block就会一直重复3-8步,直到所有Block上传完成。

网络拓扑图

hdfs写入数据的过程中,Namenode会选择距离待上传数据最近距离的DataNode接收数据。

距离怎么计算?怎么知道远近?

    节点距离:两个节点到达最近的共同祖先的距离总和。

Distance(d1/r1/n0,d1/r1/n0)=0;

Distance(d1/r1/n1,d1/r1/n2)=1+1=2;

Distance(d1/r2/n1,d1/r1/n2)=1+1=2;

Distance(d1/r2/n1,d1/r3/n2)=2+2=4;

Distance(d1/r2/n0,d1/r4/n1)=3+3=6;

1.客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件所在的DataNode。(Shell脚本)查询文件存储位置。

hdfs fsck /input -files -blocks -location

 2.挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。(负载均衡)

3.DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

4.客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

Namenode和Secondarynamenode

在namenode节点,我们用下面的命令可以看到:

cd /usr/local/hadoop/tmp/dfs/name/current

1.Fsimage(镜像文件):NameNode内存中元数据序列化后形成的文件。

2.Edits(日志):记录客户端更新元数据信息的每一步操作。

3.seen_txid文件保存的是一个数字,就是最后一个edits_的数字

4.每次NameNode启动的时候,都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动的时候就会将Fsimage和Edits文件进行合并。

NameNode工作机制

第一阶段:NameNode启动

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode记录操作日志,更新滚动日志。

(4)NameNode在内存中对元数据进行增删改。

第二阶段:Secondary NameNode工作

(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。

(2)Secondary请求执行CheckPoint。

(3)NameNode滚动正在写的Edits日志。

(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。

(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint。

(7)拷贝fsimage.chkpoint到NameNode。

(8)NameNode将fsimage.chkpoint重命名为fsimage。

DataNode工作机制

(1) 一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(2) DataNode启动后向NameNode注册,通过后,周期性的向NameNode上报所有的块信息。
(3) 心跳是每3秒一次,心跳返回结果带有Name Node给该DataNode的命令如复制缺数据到另台机器,或者删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
(4)集群运行中可以安全加入和推出一些机器。

Mapreduce

1.1MapReduce定义

MapReduce是一种简化并行计算的变成模型,用于进行大数据量的计算。

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

1.2MapReduce优缺点

1.2.1优点

(1)MapReduce易于编程:

他简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得流行起来。

(2)良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

(3)高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,就是要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

(4)擅长对PB级以上海量数据进行离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

1.2.2缺点

(1)不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

(2)不擅长流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。就是因为MapReduce自身的设计特点决定了数据源必须是静态的。

(3)不擅长DAG(有向图)计算

多个应用程序存在一定的依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入磁盘,会造成大量的磁盘IO,导致性能非常的低下。

1.3MapReduce进程

一个完整的MapReduce程序在分布式运算时有三类实例进程:

1.MrAppMaster:负责整个程序的过程调度以及状态协调。

2.MapTask:负责Map阶段的整个数据处理流程。

3.ReduceTask:负责Reduce阶段的整个数据处理流程。

1.4常用数据序列化类型

常用的数据类型对应的Hadoop数据序列化类型

Java类型 Hadoop Writable类型 Boolean BooleanWritable Byte ByteWritable Int IntWritable Float FloatWritable Long LongWritable Double DoubleWritable String Text Map MapWritable Array ArrayWritable

 1.5MapReduce编程规范

1.Mapper阶段

(1)用户自定义的Mapper要继承

2.Reduce阶段

(1)用户自定义的Reduce要继承自己的父类

(2)Reduce的输入数据类型对应的是Mapper的输出数据类型,也是kv

(3)Reduce的业务逻辑写在reduce()方法中

(4)ReduceTask进程对每一组相同的k的组调用一次reduce()方法

3.Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。

1.6WordCount案例操作

1.需求

给定文本文件,统计文本文件中单词的个数,输出每个单词出现的总次数。

(1)输入数据

I am somebodyI am smart and kindI am importantI am starve of educationI have places to goI have people to 

(2)输出数据

2.需求分析

Mapper部分

key代表偏移量,v1这一行的内容,text

输入 k1=0,v1=\"I am somebody\"

k1=14,v1=\"I am smart and kind\"

0 1 2 3 4 5 6 7 8 9 10 11 12 13 I a m s o m e b o d y 换行符

3.1将MapTask传给我们的文本内容先转换成String

I am somebody

3.2根据空格将这一行切分成单词

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key1,Text value1,Context context) throws IOException,InterruptedException{ //1.将数据转成String类型 String data=value1.toString(); //2.分词 String[] words=data.split(\" \"); for (String word:words){ //输出 key2,value2 context.write(new text(word),new IntWritable(1)); } }}

 如下:

如下:

v3是v2的集合

如下:

Reduce阶段:

输入:,v3是v2同一个key的集合

输出:

1.汇总各个key的个数

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException { //1. Configuration configuration=new Configuration(); Job job=Job.getInstance(configuration); //2. job.setJarByClass(WordCountDriver.class); //3. job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordcountReduce.class); //4. job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5. job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6. FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); FileInputFormat.setInputPaths(job,new Path(\"/root/IdeaProjects/mapReduceText/lotus.txt\")); FileOutputFormat.setOutputPath(job,new Path(\"/root/IdeaProjects/mapReduceText/WordCountoutput\")); //7. boolean result=job.waitForCompletion(true); System.exit(result?0:1); }}

3.1输入数据处理InputFormat

作用:1.数据切分 2.为Mapper提供输入数据

3.1.1切片与MapTask并行度决定机制

1.MapTask的并行度决定Map阶段的任务处理并发度,进而影响到Job的处理速度。

   MapTask不是越多越好。

2.MapTask并行度决定机制

   数据块:Block是HDFS物理上把数据分层一块一块。

   数据切片:数据切片只是在逻辑上对输入进行切片,并不会在磁盘上将其切分成片进行存储。

1.一个Job的map阶段并行度由客户端在提交Job时的切片数决定

2.每一个切片分配一个MapTask并行实例处理

3.默认情况下,切片大小=blocksize(128M)

4.切片时不考虑数据集整体情况,而是逐个针对每一个文件单独切片(不让文件合并再切,不管你几个文件,一个文件一个文件处理)

3.1.3FileInputFormat实现类

1.FileInputFormat常见的接口实现类包括:TextInputFormat,KeyValueTextInput,NLineFormat,CombineTextInputFormat和自定义InputFormat等。

TextInputFormat是默认的FileInputFormat实现类。

MapReduce的默认输入格式是?TextInputFormat

3.2Shuffle

3.3输出数据OutputFormat

3.3.1OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。

OutputFormat的实现类有TextOutputFormat,SequenceFileOutputFormat,自定义OutputFormat。

默认的输出格式是TextOutputFormat,他把每条记录都写为文本行。

3.3.2自定义OutputFormat

1.使用场景

    为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

    例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

2.自定义OutputFormat步骤

(1)自定义一个类继承FileOutputFormat 

(2)改写RecordWriter,具体改写输出数据的方法write()。

Partition定义:
Mapper任务划分数据的过程称作Partition。
负责实现数据的类称作Partitioner,默认的分区是Hash分区(Hash Partition)。Partition作用:将map阶段产生的所有对分配给不同的Reducer处理,可以将Reduce阶段的处理负载进行分摊。
→ Partition的数量决定Reducer的数量。

整个过程涉及五个独立的实体:
(1)客户端:提交MapReduce作业
(2)YARN Resource Manager:负责协调集群上计算机资源的分配
(3)YARN Node Manager:负责启动和监视集群中机器上的计算容器(container)
(4) MapReduce的Application Master,负责协调运行MapReduce作业的任务。它和
MapReduce任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理
(5)分布式文件系统(一般为HDFS):共享作业文件

YARN是一种通用资源调度框架,不仅仅支持MapReduce

(3)NodeManager(节点管理器)

NodeManager是每个节点上的资源和任务管理器。
定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;接收并处理来自ApplicationManager的Container启动/停止等请求。
(4)ApplicationMaster (主应用)
ApplicationMaster是一个详细的框架库,它结合从 ResourceManager 获得的资源和NodeManager 协同工作来运行和监控任务。
用户提交的每一个应用程序均包含一个ApplicationMaster.主要功能包括:
1)、与ResourceManager调度器协商以获取抽象资源 (Container);
2)、负责应用的监控,跟踪应用执行状态,重启失败任务等;
3)、并且与NodeManager协同工作完成Task的执行和监控。

分工 MapReduce1 Yarn

(1)作业调度

(2)任务进度监控(跟踪任务,重启失败任务,记录任务的流水)

JobTracker

ResourceManager作业调度

(1)作业调度

(2)任务进度监控(跟踪任务,重启失败任务,记录任务的流水)

JobTracker ApplicationMaster任务进度监控 任务执行(或节点资源管理) TaskTracker NodeManager 资源调配单元(CPU,内存等) Slot(槽) Container(容器)

YARN调度器 

YARN调度器分为三种:(1)FIFO Scheduler:先进先出调度器 (2)Capacity Scheduler:容器调度器 (3)Fair Scheduler:公平调度器

(1) FIFO Scheduler:先进先出调度器,是最简单的调度器,任务是按顺序执行的,哪个任务先提交,就先执行哪个任务,而且任务执行时资源利用率为100%。
(2)Capacity Schedule:容量调度器 分成多个队列来执行不同的任务,每个队列占用一定资源,可以看作是FIFO Scheduler的多队列版本。每个队列可以限制资源使用量。但是,队列间的资源分配以使用量作为排列依据,使得容量小的队列有竞争优势。需要注意的是:如果不限制某队列最大容量,则运行过程中,它可以占用全部资源。

YARN默认采用CapacityScheduler(容量调度器)

(3)Fair Scheduler:公平调度器所谓的“公平”,强调的是任务按队列公平的使用YARN资源,即,队列内的任务公平使用队列中的资源。需要注意的是:假设每个任务具有相同的优先级,采用公平调度器将平均分配系统的资源