hadoop8天课程——第二天,hdfs详解
< 返回列表时间: 2020-03-25来源:OSCHINA
hdfs的集群中的三种角色:Namenode(NN),Datanode(DN),SecondNamenode(SN)
Namenode的工作机制
NN是hdfs的管理节点,主要职责包括:1.管理文件系统的命名空间,维护着hdfs中的虚拟文件目录树结构。2.保存文件系统中所有文件和目录元数据信息(机制复杂,内存中一份完整的,fsimage和edits log加起来是一份完整的)。3.相应客户端的请求,无论读写hdfs都会先访问NN,节点的交互细节都被封装在FileSystem类中了。4.NN中也会记载每个文件的各个块所在数据节点信息,但是NN不会持久化保存文件块与节点的映射信息,因为这些信息会在系统启动时候根据数据节点汇报上来的信息进行重建。
客户端向集群中存储数据之前,首先需要访问NN申请写入文件,NN检测对应的虚拟文件是否存在,如果不存在就向客户端返回可以存入并返回分配的DN,客户端对文件进行切分,将各个block存入NN返回的DN列表中写入。而一个blk的多个副本是由DN向其他的节点写入的,而不是由客户端来直接写入的,当DN在写某个副本时候失败了,会将失败信息返回给NN,NN会重新分配DN进行副本写入。文件的最后一个blk可能不满128M。但是这样的一个blk也要在NN中含有一条元信息记录(一个blk的元数据信息大约150B)。所以hdfs不善于存储小文件,因为小文件耗费NN的存储空间,MapReduce的性能也降低。
向集群中读写文件,都需要访问NN,所以NN的负载很大。那么如何提高NN的相应速度呢?每次对NN的访问,都会涉及到元数据的读取,为了尽可能快速读取元数据,可以将元数据全放内存,但是内存是遗失性存储,这样做安全性无法得到保证,但是元数据全放磁盘则查询又会太慢。
所以hdfs维护了两份元数据:内存中一份元数据(为了提高查询速度,所以也可以看到,NN需要将元数据全部都放在内存中,所以内存大小可能会限制整个hdfs的文件存储规模),磁盘中一份数据fsimage(为了元数据持久化),还有一个用于记录hdfs集群最新操作日志的小文件edits log。内存中的元数据和fsimage不总是完全一致的,内存中的元数据总是领先fsimage一个edits log的内容。因为最新的元数据总是先写入edits log,写入成功后让客户端去写文件,文件写入成功之后,NN将这份最新的元数据加载到内存中。当SN执行checkpoint操作时,会将fsimage和editslog的内容合并成新的新的fsimage,这个时候fsimage和内存中的元数据几乎相同(可能仍然还差着一个edits.new)。
edits log是一个小文件(默认是64M),并不能存太多的数据,以免影响写入速度。所以当edits log写满了的时候,则应该将edits log中的数据全部合并到fsimage中。但是edits log是日志格式,与fsimage的格式不一样,所以需要一定的资源用于合并,如果这个任务交给NN,则会加大NN的压力,所以引入了SN来解决fsimage和edits log合并的问题。当edits log写满时或者到了一个额定时间(默认3600s),NN通知SN来做checkpoint操作合并fsimage和edits log。SN进行check point操作:首先通知NN不要继续向原来edits中写入新数据了,NN把最新的元数据写入到edits.new中,然后SN从NN下载fsimage和edits,SN将fsimage导入内存,然后对其应用edits中的日志操作,最后生成一个fsimage.checkpoint文件并保存,然后将fsimage.checkpoint上传到NN。NN用将fsimage.checkpoint重命名为fsimage,edits.new重命名为edits,以替换掉旧的fsimage和edits。
以上的这一套机制,只能做到数据可靠而无法保证系统高可用。猜想:双NN,其中一个作为完全热备,但是需要保证双NN上的元数据的一致性。
Datanode的工作机制
DN提供真实数据的存储服务和数据库检索服务,DN会定期向NN发送他们所存储的块的列表。hdfs参数dfs.block.size能够控制块大小,默认是128M。即使一个文件没有达到block的大小,仍然会占用一条元数据。所以说hdfs最好存大文件,小文件比较费NN的元数据存储空间。hdfs存储文件block不会添加任何额外的内容,完全就是按照字节数来切分,到一个大小就切,到一个大小就开始切。
HDFS的java客户端编写
启动centos的图形界面命令:init 5或者startx
eclipse添加依赖包的过程:java build path->Libraries->Add Library->User Library->User libraries->New->add extermal JARs import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.junit.Before; import org.junit.Test; public class HdfsUtil { FileSystem fs = null; public void init() throws Exception{ //读取classpath下的xxx-site.xml 配置文件,并解析其内容,封装到conf对象中 Configuration conf = new Configuration(); //也可以在代码中对conf中的配置信息进行手动设置,会覆盖掉配置文件中的读取的值 conf.set("fs.defaultFS", "hdfs://weekend110:9000/"); //根据配置信息,去获取一个具体文件系统的客户端操作实例对象,记得hdfs的权限问题 fs = FileSystem.get(new URI("hdfs://weekend110:9000/"),conf,"hadoop"); } /** * 上传文件,比较底层的写法 * * @throws Exception */ public void upload() throws Exception { init(); Path dst = new Path("hdfs://weekend110:9000/aa/qingshu.txt"); FSDataOutputStream os = fs.create(dst); FileInputStream is = new FileInputStream("c:/qingshu.txt"); IOUtils.copy(is, os); } /** * 上传文件,封装好的写法 * @throws Exception * @throws IOException */ public void upload2() throws Exception, IOException{ init(); fs.copyFromLocalFile(new Path("c:/qingshu.txt"), new Path("hdfs://weekend110:9000/aaa/bbb/ccc/qingshu2.txt")); } /** * 下载文件,比较底层的写法 * @throws Exception */ public void download2() throws Exception { init(); FSDataInputStream is = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); FileOutputStream os = new FileOutputStream("c:/jdk7.tgz"); IOUtils.copy(is, os); } /** * 下载文件,封装好的写法 * @throws Exception * @throws IllegalArgumentException */ public void download() throws Exception { init(); fs.copyToLocalFile(new Path("hdfs://weekend110:9000/aa/qingshu2.txt"), new Path("c:/qingshu2.txt")); } /** * 查看文件信息 * @throws IOException * @throws IllegalArgumentException * @throws FileNotFoundException * */ public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException { init(); // listFiles列出的只是文件信息,不会列出文件夹的信息,可以对文件夹递归遍历 RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true); while(files.hasNext()){ LocatedFileStatus file = files.next(); Path filePath = file.getPath(); String fileName = filePath.getName(); System.out.println(fileName); } System.out.println("---------------------------------"); //listStatus 可以列出文件和文件夹的信息,但是不提供自带的递归遍历 FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status: listStatus){ String name = status.getPath().getName(); System.out.println(name + (status.isDirectory()?" is dir":" is file")); } } /** * 创建文件夹 * @throws Exception * @throws IllegalArgumentException */ public void mkdir() throws IllegalArgumentException, Exception { init(); fs.mkdirs(new Path("/aaa/bbb/ccc")); } /** * 删除文件或文件夹 * @throws IOException * @throws IllegalArgumentException */ public void rm() throws IllegalArgumentException, IOException { init(); fs.delete(new Path("/aa"), true); } public static void main(String[] args) throws Exception { download(); } }
fileSystem设计思想
将所有的文件系统抽象成了一个FileSystem之后,访问具体的对象时候,只管调用方法,不关心底层的实例对象到底是谁,这样MapReduce程序与底层的文件系统解耦合了。
hadoop框架中的RPC调用机制
RPC 远程过程调用,主要应用于分布式系统。比如有三台机器,a,b,c。a是一个客户端,访问了b,b中没有请求的服务,这个服务在远程的c机器上。b通过RPC机制能够向服务在本地一样地去调用c上的服务程序。具体如下:b将调用的服务类,方法,方法的参数等信息封装以下通过本地的socket client发送给c上的socket server,然后c机器根据调用信息,使用反射机制,获得一个服务类对象,调用相应的方法,然后再将方法的执行结果通过本地socket client发送给b上的一个socket server,然后b将这个结果解析出来。
NN和client,NN和DN,DN和DN之间很多通信,使用了大量的RPC通信机制,例如DN会定期的向NN报告本身的block情况。HADOOP实现了一个RPC框架——代理类机制(使用了动态代理和反射和socket技术)。1.生成调用端socket程序的动态代理对象。2.调用动态代理对象的业务方法。3.调用socket的请求方法。4发送调用请求。5.服务端开始了:生成业务类的动态代理对象。6.调用业务类动态代理对象的具体业务方法。7.获取调用结果。8.返回调用结果给服务端。9.给原始调用者返回结果。
Hadoop的RPC框架调用
主要的以依赖包都在share/common中。使用hadoop的RPC框架的最基本的部分:1.协议(表现为java接口),代理类和业务类都需要实现这个接口已确保方法的一致性。值得注意的是在协议接口中需要指定一个 public static final long versionID 的协议版本号,后面代理类调用方法时候,会用到这个参数,用来验证代理类和业务类的协议版本的一致性。2.业务类:一般类名以Impl结尾,在业务类中需要实现协议接口中的所有方法,这些被实现的方法都会以服务的形式发布出去。3.controller类,controller类接收客户端的请求,利用RPC.getProxy()方法生成一个代理类对象,然后用这个代理类对象调用一个协议接口中的方法,通过RPC机制远程调用业务类的方法,得到结果再返回给客户端。4.Starter类这个类名随意,主要的作用是将业务类中实现的协议接口的方法发布为一个服务:新建一个RPC.Builder类对象,然后设置服务的地址,端口,协议以及业务类,之后利用builder对象获得一个server对象,再启动起来就ok了。
1.协议接口 接口实现: public interface LoginServiceInterface { public static final long versionID=1L;//用于协定协议的版本号,RPC调用的时候会用到 public String login(String username,String password); }
2.服务端 业务类的实现: public class LoginServiceImpl implements LoginServiceInterface { @Override public String login(String username, String password) { return username + " logged in successfully!"; } } 启动类的实现: import java.io.IOException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Builder; import org.apache.hadoop.ipc.RPC.Server; public class Starter { public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { Builder builder = new RPC.Builder(new Configuration()); builder.setBindAddress("weekend110").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl()); Server server = builder.build(); server.start(); } }
客户端 import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; public class LoginController { public static void main(String[] args) throws Exception { LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("weekend110", 10000), new Configuration()); String result = proxy.login("mijie", "123456"); System.out.println(result); } }
思考题:服务的动态转发和负载均衡???zookeeper
hdfs 源码分析 FileSystem对象的创建过程(创建与NN进行通信的RPC代理类) hdfs的一个核心类:FileSystem,fs必要的成员:1.rpcProxy代理类,这个代理类应该实现一个接口clientProtocal
源码重点:如何根据conf来获取对应的fs,获取fs时候,如何获得RPCProxy代理类对象。
首先获取conf中的URI,然后将URI解析成scheme(hdfs)和authority(weekend110:9000)
getInternal方法:单例模式,懒汉模式。先从一个Map中根据Key来获取,如果没有获取到,再去创建Filesysytem
createFileSystem方法:获取对应的fileSystem的class,在反射出来一个相应对象(这个对象是空的,里面的各个数据域都是基本值)
fs.initalize()方法:这是具体实现类的方法,设置DFS类的dfs,uri,workDir。设置dfs时候是构造了一个DFSClient对象。dfs的namenode就是rpc代理对象。
总结:根据conf对象,利用反射机制拿到DistributedFileSystem(是FileSystem的一个继承子类)对象,然后设置fs的各个数据域,其中有一个很重要的数据域 打开输入流的过程(创建与DN进行通信的RPC代理类)
先利用那个namenode代理类与NN通信,拿到文件的元信息(各个block的位置)
DFSInputStream的blockReader是与DN进行交互获得各个blk的详细过程。
locatedBlocks记录各个blk的详细信息。
热门排行