Hadoop3
@Test public void testDeletel1() throws IOException { fs.delete(new Path(\"youxi/csgo\"),false);
出现报错
org.apache.hadoop.fs.PathlsNotEmptyDirectoryException: \"/youxilwangzherongyao is non
empty: Directory is not empty
解决方法
@Test //删除非空文件夹,第二个参数选为true,会删除这个文件夹下所有的文件和这个文件夹 public void testDeletel1() throws IOException { fs.delete(new Path(\"youxi/csgo\"),false);
@Test public void testFileFiles() throws IOException { RemoteIterator ListFile = fs.listFiles(new Path(\"/xiudiuderen\"), true); while (ListFile.hasNext()) { LocatedFileStatus status = ListFiles.next(); System.out.println(status.getPath().getName()); System.out.println(status.getLen()); System.out.println(status.getPermission()); System.out.println(status.getGroup()); System.out.println(\"------beatiful----------\"); BlockLocation[] blockLocations = status.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.println(host); } } System.out.println(\"------yitiaoxinxijieshu----------\"); } }
判断是文件还是文件夹
@Test public void testLisStatus() throws IOException{ FileStatus[] listStatus = fs.listStatus(new Path(\"/input\")); for (FileStatus fileStatus:listStatus){ if (fileStatus.isFile()){ System.out.println(\"file;\"+fileStatus.getPath().getName()); }else { System.out.println(\"dirc:\"+fileStatus.getPath().getName()); } } }
1.客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已经存在,检查目录结构和权限。
2.NameNode检查后返回是否可以上传。
3.客户端请求第一个Block上传到哪几个DataNode服务器上。
4.NameNode返回三个(副本为多少,就返回多少个)DataNode节点,如上图返回DataNode1,DataNode2,DataNode3.
5.客户端通过FSDataOutputStream模块请求的、DataNode1上传数据,DataNode1收到请求后会继续调用DataNode2,然后DataNode2调用DataNode3,将这个通信管道建立完成。
6.DataNode3,DataNode2,DataNode1逐级答应客户端。
7.客户端开始往DataNode1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet单位,DataNode1收到一个Packet就会传给DataNode2,DataNode2传给DataNode3;DataNode1每传一个Packet会放入一个应答队列等待应答。
8.当一个Block传输完成后,客户端再次请求上传第二个Block的服务器(就是第3步),如果有多个Block就会一直重复3-8步,直到所有Block上传完成。
网络拓扑图
hdfs写入数据的过程中,Namenode会选择距离待上传数据最近距离的DataNode接收数据。
距离怎么计算?怎么知道远近?
节点距离:两个节点到达最近的共同祖先的距离总和。
Distance(d1/r1/n0,d1/r1/n0)=0;
Distance(d1/r1/n1,d1/r1/n2)=1+1=2;
Distance(d1/r2/n1,d1/r1/n2)=1+1=2;
Distance(d1/r2/n1,d1/r3/n2)=2+2=4;
Distance(d1/r2/n0,d1/r4/n1)=3+3=6;
1.客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件所在的DataNode。(Shell脚本)查询文件存储位置。
hdfs fsck /input -files -blocks -location
2.挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。(负载均衡)
3.DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
4.客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
Namenode和Secondarynamenode
在namenode节点,我们用下面的命令可以看到:
cd /usr/local/hadoop/tmp/dfs/name/current
1.Fsimage(镜像文件):NameNode内存中元数据序列化后形成的文件。
2.Edits(日志):记录客户端更新元数据信息的每一步操作。
3.seen_txid文件保存的是一个数字,就是最后一个edits_的数字
4.每次NameNode启动的时候,都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动的时候就会将Fsimage和Edits文件进行合并。
NameNode工作机制
第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
第二阶段:Secondary NameNode工作
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重命名为fsimage。
DataNode工作机制
(1) 一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
(2) DataNode启动后向NameNode注册,通过后,周期性的向NameNode上报所有的块信息。
(3) 心跳是每3秒一次,心跳返回结果带有Name Node给该DataNode的命令如复制缺数据到另台机器,或者删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
(4)集群运行中可以安全加入和推出一些机器。
Mapreduce
1.1MapReduce定义
MapReduce是一种简化并行计算的变成模型,用于进行大数据量的计算。
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
1.2MapReduce优缺点
1.2.1优点
(1)MapReduce易于编程:
他简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得流行起来。
(2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,就是要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
(4)擅长对PB级以上海量数据进行离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1.2.2缺点
(1)不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
(2)不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。就是因为MapReduce自身的设计特点决定了数据源必须是静态的。
(3)不擅长DAG(有向图)计算
多个应用程序存在一定的依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3MapReduce进程
一个完整的MapReduce程序在分布式运算时有三类实例进程:
1.MrAppMaster:负责整个程序的过程调度以及状态协调。
2.MapTask:负责Map阶段的整个数据处理流程。
3.ReduceTask:负责Reduce阶段的整个数据处理流程。
1.4常用数据序列化类型
常用的数据类型对应的Hadoop数据序列化类型
1.5MapReduce编程规范
1.Mapper阶段
(1)用户自定义的Mapper要继承
2.Reduce阶段
(1)用户自定义的Reduce要继承自己的父类
(2)Reduce的输入数据类型对应的是Mapper的输出数据类型,也是kv
(3)Reduce的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同的k的组调用一次reduce()方法
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。
1.6WordCount案例操作
1.需求
给定文本文件,统计文本文件中单词的个数,输出每个单词出现的总次数。
(1)输入数据
I am somebodyI am smart and kindI am importantI am starve of educationI have places to goI have people to
(2)输出数据
2.需求分析
Mapper部分
key代表偏移量,v1这一行的内容,text
输入 k1=0,v1=\"I am somebody\"
k1=14,v1=\"I am smart and kind\"
3.1将MapTask传给我们的文本内容先转换成String
I am somebody
3.2根据空格将这一行切分成单词
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key1,Text value1,Context context) throws IOException,InterruptedException{ //1.将数据转成String类型 String data=value1.toString(); //2.分词 String[] words=data.split(\" \"); for (String word:words){ //输出 key2,value2 context.write(new text(word),new IntWritable(1)); } }}
图
如下:
如下:
v3是v2的集合
如下:
Reduce阶段:
输入:,v3是v2同一个key的集合
输出:
1.汇总各个key的个数
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException { //1. Configuration configuration=new Configuration(); Job job=Job.getInstance(configuration); //2. job.setJarByClass(WordCountDriver.class); //3. job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordcountReduce.class); //4. job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5. job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6. FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); FileInputFormat.setInputPaths(job,new Path(\"/root/IdeaProjects/mapReduceText/lotus.txt\")); FileOutputFormat.setOutputPath(job,new Path(\"/root/IdeaProjects/mapReduceText/WordCountoutput\")); //7. boolean result=job.waitForCompletion(true); System.exit(result?0:1); }}
3.1输入数据处理InputFormat
作用:1.数据切分 2.为Mapper提供输入数据
3.1.1切片与MapTask并行度决定机制
1.MapTask的并行度决定Map阶段的任务处理并发度,进而影响到Job的处理速度。
MapTask不是越多越好。
2.MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分层一块一块。
数据切片:数据切片只是在逻辑上对输入进行切片,并不会在磁盘上将其切分成片进行存储。
1.一个Job的map阶段并行度由客户端在提交Job时的切片数决定
2.每一个切片分配一个MapTask并行实例处理
3.默认情况下,切片大小=blocksize(128M)
4.切片时不考虑数据集整体情况,而是逐个针对每一个文件单独切片(不让文件合并再切,不管你几个文件,一个文件一个文件处理)
3.1.3FileInputFormat实现类
1.FileInputFormat常见的接口实现类包括:TextInputFormat,KeyValueTextInput,NLineFormat,CombineTextInputFormat和自定义InputFormat等。
TextInputFormat是默认的FileInputFormat实现类。
MapReduce的默认输入格式是?TextInputFormat
3.2Shuffle
3.3输出数据OutputFormat
3.3.1OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。
OutputFormat的实现类有TextOutputFormat,SequenceFileOutputFormat,自定义OutputFormat。
默认的输出格式是TextOutputFormat,他把每条记录都写为文本行。
3.3.2自定义OutputFormat
1.使用场景
为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。
例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。
2.自定义OutputFormat步骤
(1)自定义一个类继承FileOutputFormat
(2)改写RecordWriter,具体改写输出数据的方法write()。
Partition定义:
Mapper任务划分数据的过程称作Partition。
负责实现数据的类称作Partitioner,默认的分区是Hash分区(Hash Partition)。Partition作用:将map阶段产生的所有对分配给不同的Reducer处理,可以将Reduce阶段的处理负载进行分摊。
→ Partition的数量决定Reducer的数量。
整个过程涉及五个独立的实体:
(1)客户端:提交MapReduce作业
(2)YARN Resource Manager:负责协调集群上计算机资源的分配
(3)YARN Node Manager:负责启动和监视集群中机器上的计算容器(container)
(4) MapReduce的Application Master,负责协调运行MapReduce作业的任务。它和
MapReduce任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理
(5)分布式文件系统(一般为HDFS):共享作业文件
YARN是一种通用资源调度框架,不仅仅支持MapReduce
(3)NodeManager(节点管理器)
NodeManager是每个节点上的资源和任务管理器。
定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;接收并处理来自ApplicationManager的Container启动/停止等请求。
(4)ApplicationMaster (主应用)
ApplicationMaster是一个详细的框架库,它结合从 ResourceManager 获得的资源和NodeManager 协同工作来运行和监控任务。
用户提交的每一个应用程序均包含一个ApplicationMaster.主要功能包括:
1)、与ResourceManager调度器协商以获取抽象资源 (Container);
2)、负责应用的监控,跟踪应用执行状态,重启失败任务等;
3)、并且与NodeManager协同工作完成Task的执行和监控。
(1)作业调度
(2)任务进度监控(跟踪任务,重启失败任务,记录任务的流水)
ResourceManager作业调度
(1)作业调度
(2)任务进度监控(跟踪任务,重启失败任务,记录任务的流水)
YARN调度器
YARN调度器分为三种:(1)FIFO Scheduler:先进先出调度器 (2)Capacity Scheduler:容器调度器 (3)Fair Scheduler:公平调度器
(1) FIFO Scheduler:先进先出调度器,是最简单的调度器,任务是按顺序执行的,哪个任务先提交,就先执行哪个任务,而且任务执行时资源利用率为100%。
(2)Capacity Schedule:容量调度器 分成多个队列来执行不同的任务,每个队列占用一定资源,可以看作是FIFO Scheduler的多队列版本。每个队列可以限制资源使用量。但是,队列间的资源分配以使用量作为排列依据,使得容量小的队列有竞争优势。需要注意的是:如果不限制某队列最大容量,则运行过程中,它可以占用全部资源。
YARN默认采用CapacityScheduler(容量调度器)
(3)Fair Scheduler:公平调度器所谓的“公平”,强调的是任务按队列公平的使用YARN资源,即,队列内的任务公平使用队列中的资源。需要注意的是:假设每个任务具有相同的优先级,采用公平调度器将平均分配系统的资源