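HDFS Write Path: Method Call Logic and Source Comment Walkthrough
1. Architecture diagram
2. Source code walkthrough
2.1 The HDFS client creates a FileSystem object
2.1.1 Class comments (Javadoc)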
An abstract base class for a fairly generic filesystem. It
may be implemented as a distributed filesystem, or as a "local"
one that reflects the locally-connected disk. The local version
exists for small Hadoop instances and for testing.
All user code that may potentially use the Hadoop Distributed
File System should be written to use a FileSystem object. The
Hadoop DFS is a multi-machine system that appears as a single
disk. It's useful because of its fault tolerance and potentially
very large capacity.
2.1.2 Creating the Configuration object
    static {
      // print deprecation warning if hadoop-site.xml is found in classpath
      ClassLoader cL = Thread.currentThread().getContextClassLoader();
      if (cL == null) {
        cL = Configuration.class.getClassLoader();
      }
      if (cL.getResource("hadoop-site.xml") != null) {
        LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
            "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " +
            "mapred-site.xml and hdfs-site.xml to override properties of " +
            "core-default.xml, mapred-default.xml and hdfs-default.xml " +
            "respectively");
      }
      addDefaultResource("core-default.xml");
      addDefaultResource("core-site.xml");
    }
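The static block registers core-default.xml first and core-site.xml second, and the deprecation message says the site files override the defaults. The following is a minimal sketch of that layering idea in plain Java. It is not Hadoop's Configuration implementation: the class LayeredConfigSketch, its methods, and the sample property values are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for layered resources; NOT Hadoop's Configuration class.
class LayeredConfigSketch {
    // Resources in registration order; a later resource overrides an earlier one.
    private final List<Map<String, String>> resources = new ArrayList<>();

    void addResource(Map<String, String> properties) {
        resources.add(properties);
    }

    // Walk every resource in order so the last definition of a key wins.
    String get(String key) {
        String value = null;
        for (Map<String, String> resource : resources) {
            if (resource.containsKey(key)) {
                value = resource.get(key);
            }
        }
        return value;
    }

    // Builds a two-layer configuration with illustrative values.
    static LayeredConfigSketch demo() {
        Map<String, String> coreDefault = new LinkedHashMap<>();
        coreDefault.put("fs.defaultFS", "file:///");
        coreDefault.put("io.file.buffer.size", "4096");

        Map<String, String> coreSite = new LinkedHashMap<>();
        coreSite.put("fs.defaultFS", "hdfs://namenode.example:8020");

        LayeredConfigSketch conf = new LayeredConfigSketch();
        conf.addResource(coreDefault); // plays the role of core-default.xml
        conf.addResource(coreSite);    // plays the role of core-site.xml
        return conf;
    }
}
```

In this sketch, a key defined in both layers resolves to the site value, while a key defined only in the defaults falls through unchanged.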
2.1.3 Obtaining the FileSystem object
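    private static FileSystem createFileSystem(URI uri, Configuration conf)
        throws IOException {
      // Look up the implementation class registered for the URI scheme
      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
      // Instantiate it reflectively, then initialize it with the URI and configuration
      FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
      return fs;
    }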
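createFileSystem follows a common pattern: map the URI scheme to a class, instantiate that class reflectively, then initialize the instance. The sketch below reproduces the pattern in plain Java without any Hadoop dependency. The names FsFactorySketch, MiniFs, LocalFs, DistributedFs and the registry map are assumptions made for this example; Hadoop resolves the class from configuration rather than from a hard-coded map.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical scheme-to-class factory; an illustration, not Hadoop code.
class FsFactorySketch {
    interface MiniFs {
        void initialize(URI uri);
        String describe();
    }

    static class LocalFs implements MiniFs {
        private URI uri;
        public LocalFs() {}
        public void initialize(URI uri) { this.uri = uri; }
        public String describe() { return "local:" + uri; }
    }

    static class DistributedFs implements MiniFs {
        private URI uri;
        public DistributedFs() {}
        public void initialize(URI uri) { this.uri = uri; }
        public String describe() { return "distributed:" + uri; }
    }

    // Assumed registry; stands in for the lookup done by getFileSystemClass.
    static final Map<String, Class<? extends MiniFs>> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put("file", LocalFs.class);
        REGISTRY.put("hdfs", DistributedFs.class);
    }

    static MiniFs create(URI uri) {
        Class<? extends MiniFs> clazz = REGISTRY.get(uri.getScheme());
        if (clazz == null) {
            throw new IllegalArgumentException("No file system for scheme: " + uri.getScheme());
        }
        try {
            // Reflective instantiation, then explicit initialization.
            MiniFs fs = clazz.getDeclaredConstructor().newInstance();
            fs.initialize(uri);
            return fs;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }
}
```

Separating construction from initialize lets the factory use a no-argument constructor for every implementation and pass the URI afterwards.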
2.2 Creating metadata on the NameNode
2.2.1 Class comments (Javadoc)
Implementation of the abstract FileSystem for the DFS system.
This object is the way end-user code interacts with a Hadoop
DistributedFileSystem.
DFSClient can connect to a Hadoop Filesystem and
perform basic file tasks. It uses the ClientProtocol
to communicate with a NameNode daemon, and connects
directly to DataNodes to read/write block data.
Hadoop DFS users should obtain an instance of
DistributedFileSystem, which uses DFSClient to handle
filesystem tasks.
DFSOutputStream creates files from a stream of bytes.
The client application writes data that is cached internally by
this stream. Data is broken up into packets, each packet is
typically 64K in size. A packet comprises of chunks. Each chunk
is typically 512 bytes and has an associated checksum with it.
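The packet and chunk sizes quoted above can be turned into simple arithmetic. The sketch below estimates how many chunks fit into one packet and computes one checksum per chunk. It assumes a 4-byte CRC32 checksum per chunk and takes the packet header size as a parameter, because the comment above does not state either; PacketLayoutSketch and its methods are hypothetical names, not Hadoop APIs.

```java
import java.util.zip.CRC32;

// Hypothetical helper for packet/chunk arithmetic; not part of Hadoop.
class PacketLayoutSketch {
    // Each chunk travels with its checksum, so one chunk costs
    // bytesPerChunk + checksumSize bytes of packet space.
    static int chunksPerPacket(int packetSize, int headerSize,
                               int bytesPerChunk, int checksumSize) {
        int usable = packetSize - headerSize;
        return Math.max(usable / (bytesPerChunk + checksumSize), 1);
    }

    // Split data into chunks and compute a CRC32 value for each one.
    static long[] checksumChunks(byte[] data, int bytesPerChunk) {
        int chunkCount = (data.length + bytesPerChunk - 1) / bytesPerChunk;
        long[] sums = new long[chunkCount];
        for (int i = 0; i < chunkCount; i++) {
            int offset = i * bytesPerChunk;
            int length = Math.min(bytesPerChunk, data.length - offset);
            CRC32 crc = new CRC32();
            crc.update(data, offset, length);
            sums[i] = crc.getValue();
        }
        return sums;
    }
}
```

With a 64K packet, 512-byte chunks, 4-byte checksums and the header ignored, chunksPerPacket(64 * 1024, 0, 512, 4) evaluates to 127, since 127 × 516 = 65532 bytes. A real header reduces that number.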
2.2.2 Source walkthrough: creating the metadata
    public FSDataOutputStream create(Path f,
                                     boolean overwrite,
                                     int bufferSize,
                                     short replication,
                                     long blockSize,
                                     Progressable progress
                                     ) throws IOException {
      // Delegate to the overload that takes an explicit permission:
      // the default file permission with the configured umask applied
      return this.create(f, FsPermission.getFileDefault().applyUMask(
          FsPermission.getUMask(getConf())),
          overwrite, bufferSize, replication, blockSize, progress);
    }
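The call FsPermission.getFileDefault().applyUMask(...) masks the default file permission with the configured umask. Conceptually this is the usual POSIX-style rule: clear every permission bit that is set in the umask. The sketch below shows that rule on plain integers. UmaskSketch is a hypothetical helper, and the values 0666 and 022 are assumed as a typical default mode and umask for the example rather than taken from the excerpt.

```java
// Hypothetical illustration of umask arithmetic; not Hadoop's FsPermission.
class UmaskSketch {
    // Clear the bits named by the umask and keep only the rwx bits.
    static int applyUmask(int mode, int umask) {
        return mode & ~umask & 0777;
    }

    // Render a mode as three octal digits, e.g. 644.
    static String toOctal(int mode) {
        return String.format("%03o", mode);
    }
}
```

For example, applyUmask(0666, 022) yields 0644: the owner keeps read and write, while group and others lose write access.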
    @Override
    public FSDataOutputStream create(final Path f, final FsPermission permission,
        final EnumSet<CreateFlag> cflags, final int bufferSize,
        final short replication, final long blockSize,
        final Progressable progress, final ChecksumOpt checksumOpt)
        throws IOException {
      statistics.incrementWriteOps(1);
      Path absF = fixRelativePart(f);
      return new FileSystemLinkResolver<FSDataOutputStream>() {
        @Override
        public FSDataOutputStream doCall(final Path p)
            throws IOException, UnresolvedLinkException {
          // Ask DFSClient to create the file, then wrap the resulting stream
          final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
              cflags, replication, blockSize, progress, bufferSize,
              checksumOpt);
          return dfs.createWrappedOutputStream(dfsos, statistics);
        }
        // ... (the rest of the method is not shown in this excerpt)
    public DFSOutputStream create(String src,
                                  FsPermission permission,
                                  EnumSet<CreateFlag> flag,
                                  boolean createParent,
                                  short replication,
                                  long blockSize,
                                  Progressable progress,
                                  int buffersize,
                                  ChecksumOpt checksumOpt,
                                  InetSocketAddress[] favoredNodes) throws IOException {
      checkOpen();
      if (permission == null) {
        permission = FsPermission.getFileDefault();
      }
      FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
      if (LOG.isDebugEnabled()) {
        LOG.debug(src + ": masked=" + masked);
      }
      final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
          src, masked, flag, createParent, replication, blockSize, progress,
          buffersize, dfsClientConf.createChecksum(checksumOpt),
          getFavoredNodesStr(favoredNodes));
      // Start the lease for the newly created file
      beginFileLease(result.getFileId(), result);
      return result;
    }
    static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
        FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
        short replication, long blockSize, Progressable progress, int buffersize,
        DataChecksum checksum, String[] favoredNodes) throws IOException {
      TraceScope scope =
          dfsClient.getPathTraceScope("newStreamForCreate", src);
      try {
        HdfsFileStatus stat = null;

        // Retry the create if we get a RetryStartFileException up to a maximum
        // number of times
        boolean shouldRetry = true;
        int retryCount = CREATE_RETRY_COUNT;
        while (shouldRetry) {
          shouldRetry = false;
          try {
            stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
                new EnumSetWritable<CreateFlag>(flag), createParent, replication,
                blockSize, SUPPORTED_CRYPTO_VERSIONS);
            break;
          } catch (RemoteException re) {
            IOException e = re.unwrapRemoteException(
                AccessControlException.class,
                DSQuotaExceededException.class,
                FileAlreadyExistsException.class,
                FileNotFoundException.class,
                ParentNotDirectoryException.class,
                NSQuotaExceededException.class,
                RetryStartFileException.class,
                SafeModeException.class,
                UnresolvedPathException.class,
                SnapshotAccessControlException.class,
                UnknownCryptoProtocolVersionException.class);
            if (e instanceof RetryStartFileException) {
              if (retryCount > 0) {
                shouldRetry = true;
                retryCount--;
              } else {
                throw new IOException("Too many retries because of encryption" +
                    " zone operations", e);
              }
            } else {
              throw e;
            }
          }
        }
        Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
        final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
        out.start();
        return out;
      } finally {
        scope.close();
      }
    }
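The loop in newStreamForCreate retries only one specific failure (RetryStartFileException) a bounded number of times and rethrows everything else immediately. The sketch below isolates that control flow in plain Java. BoundedRetrySketch, Attempt, RetryableException and the demo method are hypothetical names for this illustration, and the retry limit is a parameter because the value of CREATE_RETRY_COUNT is not shown in the excerpt.

```java
import java.io.IOException;

// Hypothetical bounded-retry helper; mirrors the control flow, not Hadoop's API.
class BoundedRetrySketch {
    // Stand-in for the one exception type that is worth retrying.
    static class RetryableException extends IOException {
        RetryableException(String message) { super(message); }
    }

    interface Attempt<T> {
        T run() throws IOException;
    }

    static <T> T callWithRetries(Attempt<T> attempt, int maxRetries) throws IOException {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return attempt.run();
            } catch (RetryableException e) {
                if (retriesLeft > 0) {
                    retriesLeft--;          // try again
                } else {
                    throw new IOException("Too many retries", e);
                }
            }
            // Any other IOException propagates to the caller untouched.
        }
    }

    // Simulates a call that fails a given number of times before succeeding.
    static String demo(int failuresBeforeSuccess, int maxRetries) {
        int[] attempts = {0};
        try {
            String result = callWithRetries(() -> {
                attempts[0]++;
                if (attempts[0] <= failuresBeforeSuccess) {
                    throw new RetryableException("transient failure");
                }
                return "created";
            }, maxRetries);
            return result + " after " + attempts[0] + " attempts";
        } catch (IOException e) {
            return "failed after " + attempts[0] + " attempts";
        }
    }
}
```

Note that a limit of N retries allows N + 1 attempts in total, which matches the source: the counter is checked only after a failure has already happened.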
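Looking at the key code, we see that the stat object is produced by calling the create method on namenode. Ctrl + left-clicking namenode in the IDE shows that it is an instance of ClientProtocol, the protocol mentioned in the class comments above. ClientProtocol is an interface, and the implementation we are looking for is the class ClientNamenodeProtocolTranslatorPB. Searching through that class's methods, we finally find create. Its return value comes from calling create on rpcProxy, which relies on Google's Protobuf serialization.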
    @Override
    public HdfsFileStatus create(String src, FsPermission masked,
        String clientName, EnumSetWritable<CreateFlag> flag,
        boolean createParent, short replication, long blockSize,
        CryptoProtocolVersion[] supportedVersions)
        throws AccessControlException, AlreadyBeingCreatedException,
        DSQuotaExceededException, FileAlreadyExistsException,
        FileNotFoundException, NSQuotaExceededException,
        ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
        IOException {
      // Build the Protobuf request message from the Java arguments
      CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
          .setSrc(src)
          .setMasked(PBHelper.convert(masked))
          .setClientName(clientName)
          .setCreateFlag(PBHelper.convertCreateFlag(flag))
          .setCreateParent(createParent)
          .setReplication(replication)
          .setBlockSize(blockSize);
      builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
      CreateRequestProto req = builder.build();
      try {
        // Send the request through the RPC proxy and convert the response back
        CreateResponseProto res = rpcProxy.create(null, req);
        return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
      } catch (ServiceException e) {
        throw ProtobufHelper.getRemoteException(e);
      }
    }
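The create method above is a translator: it packs plain Java arguments into a request message with a builder, hands the message to an RPC proxy, and converts the optional response field back into a Java value or null. The sketch below models that shape with hand-written classes instead of Protobuf-generated ones. TranslatorSketch and everything inside it are hypothetical, and the "file status" is reduced to a String for brevity.

```java
// Hypothetical translator-over-proxy sketch; no Protobuf or Hadoop involved.
class TranslatorSketch {
    // Hand-written stand-in for a generated request message.
    static final class CreateRequest {
        final String src;
        final short replication;
        final long blockSize;

        private CreateRequest(Builder b) {
            this.src = b.src;
            this.replication = b.replication;
            this.blockSize = b.blockSize;
        }

        static Builder newBuilder() { return new Builder(); }

        static final class Builder {
            private String src;
            private short replication;
            private long blockSize;

            Builder setSrc(String v) { src = v; return this; }
            Builder setReplication(short v) { replication = v; return this; }
            Builder setBlockSize(long v) { blockSize = v; return this; }
            CreateRequest build() { return new CreateRequest(this); }
        }
    }

    // Stand-in for a response whose payload field is optional.
    static final class CreateResponse {
        private final String fs; // null means the field is absent
        CreateResponse(String fs) { this.fs = fs; }
        boolean hasFs() { return fs != null; }
        String getFs() { return fs; }
    }

    // Stand-in for the RPC proxy that would send the message over the wire.
    interface RpcProxy {
        CreateResponse create(CreateRequest request);
    }

    static final class Translator {
        private final RpcProxy rpcProxy;
        Translator(RpcProxy rpcProxy) { this.rpcProxy = rpcProxy; }

        String create(String src, short replication, long blockSize) {
            CreateRequest req = CreateRequest.newBuilder()
                .setSrc(src)
                .setReplication(replication)
                .setBlockSize(blockSize)
                .build();
            CreateResponse res = rpcProxy.create(req);
            return res.hasFs() ? res.getFs() : null;
        }
    }
}
```

Because RpcProxy is an interface, a test can substitute an in-memory lambda for the network call, which is one practical benefit of keeping the translation layer separate from the transport.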
2.3 Creating the FSDataOutputStream object
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    out.start();
    return out;

    private synchronized void start() {
      streamer.start();
    }
    @Override
    public void run() {
      long lastPacket = Time.monotonicNow();
      TraceScope scope = NullScope.INSTANCE;
      while (!streamerClosed && dfsClient.clientRunning) {
        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) {
          try {
            response.close();
            response.join();
            response = null;
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
        }

        DFSPacket one;
        try {
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
            doSleep = processDatanodeError();
          }

          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.monotonicNow();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning
                && dataQueue.size() == 0 &&
                (stage != BlockConstructionStage.DATA_STREAMING ||
                 stage == BlockConstructionStage.DATA_STREAMING &&
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep) {
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
                  timeout : 1000;
              try {
                dataQueue.wait(timeout);
              } catch (InterruptedException e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
              doSleep = false;
              now = Time.monotonicNow();
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = createHeartbeatPacket();
              assert one != null;
            } else {
              one = dataQueue.getFirst(); // regular data packet
              long parents[] = one.getTraceParents();
              if (parents.length > 0) {
                scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
                // TODO: use setParents API once it's available from HTrace 3.2
                // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
                // scope.getSpan().setParents(parents);
              }
            }
          }

          // get new block from namenode.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if (DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            setPipeline(nextBlockOutputStream());
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if (DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          }

          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " +
                lastByteOffsetInBlock +
                " Aborting file " + src);
          }

          if (one.isLastPacketInBlock()) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError &&
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                try {
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                } catch (InterruptedException e) {
                  DFSClient.LOG.warn("Caught exception ", e);
                }
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          }

          // send the packet
          Span span = null;
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              span = scope.detach();
              one.setTraceSpan(span);
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          TraceScope writeScope = Trace.startSpan("writeTo", span);
          try {
            one.writeTo(blockStream);
            blockStream.flush();
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to
            // write to primary DN. If a failed or restarting node has already
            // been recorded by the responder, the following call will have no
            // effect. Pipeline recovery can handle only one node error at a
            // time. If the primary node fails again during the recovery, it
            // will be taken out then.
            tryMarkPrimaryDatanodeFailed();
            throw e;
          } finally {
            writeScope.close();
          }
          lastPacket = Time.monotonicNow();

          // update bytesSent
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) {
            bytesSent = tmpBytesSent;
          }

          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }

          // Is this block full?
          if (one.isLastPacketInBlock()) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError &&
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000); // wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

            endBlock();
          }
          if (progress != null) { progress.progress(); }

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
            Thread.sleep(artificialSlowdown);
          }
        } catch (Throwable e) {
          // Log warning if there was a real error.
          if (restartingNodeIndex.get() == -1) {
            DFSClient.LOG.warn("DataStreamer Exception", e);
          }
          if (e instanceof IOException) {
            setLastException((IOException) e);
          } else {
            setLastException(new IOException("DataStreamer Exception: ", e));
          }
          hasError = true;
          if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
            // Not a datanode issue
            streamerClosed = true;
          }
        } finally {
          scope.close();
        }
      }
      closeInternal();
    }

    private void closeInternal() {
      closeResponder();       // close and join
      closeStream();
      streamerClosed = true;
      setClosed();
      synchronized (dataQueue) {
        dataQueue.notifyAll();
      }
    }
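The core bookkeeping in run() is the pair of queues: a data packet moves from dataQueue to ackQueue when it is sent, a heartbeat packet is sent when dataQueue is empty and never enters ackQueue, and the last packet of a block waits until ackQueue has drained. The sketch below models only that bookkeeping, single-threaded and without any network I/O. StreamerQueuesSketch is a hypothetical class, packets are reduced to sequence numbers, the ack method stands in for the responder thread that the excerpt does not show, and the heartbeat marker value is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the two queues; not Hadoop's DataStreamer.
class StreamerQueuesSketch {
    // Marker returned when there is nothing to send (assumed value for the sketch).
    static final long HEARTBEAT_SEQNO = -1L;

    private final Deque<Long> dataQueue = new ArrayDeque<>(); // waiting to be sent
    private final Deque<Long> ackQueue = new ArrayDeque<>();  // sent, not yet acked

    // The writing application hands over a packet.
    void enqueue(long seqno) {
        dataQueue.addLast(seqno);
    }

    // One iteration of the send step: returns the sequence number that was "sent".
    long sendNext() {
        if (dataQueue.isEmpty()) {
            return HEARTBEAT_SEQNO; // heartbeats are never tracked for acks
        }
        long seqno = dataQueue.removeFirst();
        ackQueue.addLast(seqno);    // now waiting for an acknowledgement
        return seqno;
    }

    // Stand-in for the responder: acks must arrive in send order.
    boolean ack(long seqno) {
        if (!ackQueue.isEmpty() && ackQueue.peekFirst() == seqno) {
            ackQueue.removeFirst();
            return true;
        }
        return false;
    }

    int pendingAcks() {
        return ackQueue.size();
    }

    // The block can be closed only when nothing is waiting for an ack.
    boolean canEndBlock() {
        return ackQueue.isEmpty();
    }
}
```

In the real code both queues are guarded by synchronized (dataQueue) and wait/notifyAll, because the application thread, the streamer thread and the responder thread all touch them concurrently; the sketch leaves that out to keep the state transitions easy to follow.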
Source: https://www.cnblogs.com/w950219/p/11907083.html