下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

软件设计:spark存储管理之磁盘存储--DiskStore

作者:the.forgotten     来源: https://www.cnblogs.com/zhuge134/p/11007328.html点击数:917发布时间: 2020-04-15 15:08:07

标签: Hadoop数据处理软件设计

  ApacheSpark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)所开源的类HadoopMapReduce的通用并行框架,Spark,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是--Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

  DiskStore

  接着上一篇,本篇,我们分析一下实现磁盘存储的功能类DiskStore,这个类相对简单。在正式展开之前,我觉得有必要大概分析一下BlockManager的背景,或者说它的运行环境,运行的作用范围。Blockmanager这个类其实在运行时的每个节点都会有一个实例(包括driver和executor进程),因为不论是driver端进行广播变量的创建,还是executor端shuffle过程中写shuffle块,或者是任务运行时结果太大需要通过BlockManager传输,或者是RDD的缓存,其实在每个运行节点上都会通过Blockmanager来管理程序内部对于本地的内存和磁盘的读写,所以综上,我想表达的核心意思就是每个进程(driver和executor)都有一Blockmanager实例,而这些Blockmanager实例是通过BlockManagerId类来进行唯一区分的,BlockManagerId实际上是对进程物理位置的封装。

  DiskStore.put

  首先我们来看一个最常用的写入方法

  defput(blockId:BlockId)(writeFunc:WritableByteChannel=>Unit):Unit={

  //通过DiskBlockManager对象检查这个blockId对应的文件名的文件是否存在

  if(contains(blockId)){

  thrownewIllegalStateException(s"Block$blockIdisalreadypresentinthediskstore")

  }

  logdebug(s"Attemptingtoputblock$blockId")

  valstartTime=System.currentTimeMillis

  //通过DiskBlockManager获取一个文件用于写入数据

  valfile=diskManager.getFile(blockId)

  //用CountingWritableChannel包装一下,以便于记录写入的字节数

  valout=newCountingWritableChannel(openForWrite(file))

  varthrewException:Boolean=true

  try{

  writeFunc(out)

  //关键步骤,记录到内部的map结构中

  blockSizes.put(blockId,out.getCount)

  threwException=false

  }finally{

  try{

  out.close()

  }catch{

  caseioe:IOException=>

  if(!threwException){

  threwException=true

  throwioe

  }

  }finally{

  if(threwException){

  remove(blockId)

  }

  }

  }

  valfinishTime=System.currentTimeMillis

  logDebug("Block%sstoredas%sfileondiskin%dms".format(

  file.getName,

  Utils.bytesToString(file.length()),

  finishTime-startTime))

  }

  这个方法很简单,没什么好说的,但是调用了一个比较重要的类DiskBlockManager,这个类的功能就是对磁盘上的目录和文件进行管理,会在磁盘上按照一定规则创建一些目录和子目录,在分配文件名时也会尽量均匀第分配在这些目录和子目录下。

  DiskStore.putBytes

  这个方法就不说了,简单处理一下直接调用put方法。

  defputBytes(blockId:BlockId,bytes:ChunkedByteBuffer):Unit={

  put(blockId){channel=>

  bytes.writeFully(channel)

  }

  }

  DiskStore.getBytes

  我们来看一下这个方法,首先通过DiskBlockManager获取对应的文件名,然后将其包装成一个BlockData对象,分为加密和不加密两种。

  defgetBytes(blockId:BlockId):BlockData={

  valfile=diskManager.getFile(blockId.name)

  valblockSize=getSize(blockId)

  securityManager.getIOEncryptionKey()match{

  caseSome(key)=>

  //Encryptedblockscannotbememorymapped;returnaspecialobjectthatdoesdecryption

  //andprovidesInputStream/FileRegionimplementationsforreadingthedata.

  newEncryptedBlockData(file,blockSize,conf,key)

  case_=>

  //看一下DiskBlockData

  newDiskBlockData(minMemoryMapBytes,maxMemoryMapBytes,file,blockSize)

  }

  }

  DiskBlockData

  这个类作为磁盘文件的包装类,主要功能是提供了几个方便的接口,将磁盘文件中的数据读取出来并生成缓冲对象。

  这个类中有两个重要的方法toChunkedByteBuffer和toByteBuffer,toByteBuffer就不说了,调用ReadableByteChannel.read(ByteBufferdst)方法读取文件数据,我们看一下toChunkedByteBuffer

  DiskBlockData.toChunkedByteBuffer

  这个方法也很简单,在数据量比较大的时候,由于每次申请的内存块大小有限制maxMemoryMapBytes,所以需要切分成多个块

  overridedeftoChunkedByteBuffer(allocator:(Int)=>ByteBuffer):ChunkedByteBuffer={

  //Utils.tryWithResource调用保证在使用完资源后关闭资源

  //基本等同于java中的try{}finally{}

  Utils.tryWithResource(open()){channel=>

  varremaining=blockSize

  valchunks=newListBuffer[ByteBuffer]()

  while(remaining>0){

  //这里取剩余大小和maxMemoryMapBytes的较小值,

  //也就是说每次申请的内存块大小不超过maxMemoryMapBytes

  valchunkSize=math.min(remaining,maxMemoryMapBytes)

  valchunk=allocator(chunkSize.toInt)

  remaining-=chunkSize

  JavaUtils.readFully(channel,chunk)

  chunk.flip()

  chunks+=chunk

  }

  newChunkedByteBuffer(chunks.toArray)

  }

  }

  DiskBlockManager

  这个类之前也分析过,主要是用来管理spark运行过程中写入的一些临时文件,以及目录的管理。

  1首先会根据参数配置创建本地目录(可以是逗号分隔的多个目录),参数的优先顺序是:如果是运行在yarn上,则会使用yarn参数LOCAL_DIRS配置的本地目录;否则获取环境变量SPARK_LOCAL_DIRS的值;否则获取spark.local.dir参数的值;最后如果都没有配置,那么就用java系统参数java.io.tmpdir的值作为临时目录。

  2其次,关于文件在目录之间分配的问题,使用文件名的hash值对目录数量取余的方法来尽量将文件均匀地分配到不同的目录下。

  3另外一点要说的是文件名的命名规则,是根据不同作用的Block来区别命名的,例如RDD缓存写入的block的id就是RDDBlockId,它的文件名拼接规则是"rdd_"+rddId+"_"+splitIndex

  Spark是一种与Hadoop相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使Spark在某些工作负载方面表现得更加优越,换句话说,Spark启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。

赞(9)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程