HDFS caching exists to reduce repeated access requests for the same data, and it is implemented through cache blocks.
A cache block is converted from an ordinary file block and lives in the memory of the DataNode that will serve the read. When a read hits the cache, no disk access is needed at all, which can greatly speed up file reads.
Inside the cache-block implementation, the internal State enum looks like this:
private enum State {
  CACHING,            // block is being cached
  CACHING_CANCELLED,  // caching was cancelled before it completed
  CACHED,             // block is fully cached
  UNCACHING;          // block is being removed from the cache

  public boolean shouldAdvertise() {
    return this == CACHED;
  }
}
As you can see, the four states are CACHING (the block is being cached), CACHING_CANCELLED (caching was cancelled), CACHED (the block is fully cached), and UNCACHING (the block is being removed from the cache). Note that shouldAdvertise() returns true only for CACHED, so only fully cached blocks are reported.
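The methods discussed below imply a small state machine over these four states: CACHING can move to CACHED or CACHING_CANCELLED, and CACHED can move to UNCACHING. The following is a minimal sketch of that machine (the `canTransitionTo` helper is hypothetical, not part of the HDFS source):

```java
// Sketch of the cache-block state machine implied by cacheBlock/uncacheBlock.
enum CacheState {
    CACHING, CACHING_CANCELLED, CACHED, UNCACHING;

    // Only fully cached blocks are advertised, mirroring shouldAdvertise().
    boolean shouldAdvertise() {
        return this == CACHED;
    }

    // Hypothetical helper: is `next` a legal successor of this state?
    boolean canTransitionTo(CacheState next) {
        switch (this) {
            case CACHING: return next == CACHED || next == CACHING_CANCELLED;
            case CACHED:  return next == UNCACHING;
            default:      return false; // terminal in this sketch
        }
    }
}
```

This makes it easy to see why a block that is still CACHING is cancelled rather than uncached: it has never reached the CACHED state that UNCACHING undoes.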
The cache state of each block is kept in Value, an inner class of FsDatasetCache:
private static final class Value {
  final State state;
  final MappableBlock mappableBlock;

  Value(MappableBlock mappableBlock, State state) {
    this.mappableBlock = mappableBlock;
    this.state = state;
  }
}
Each entry pairs a block-ID key object (ExtendedBlockId) with a Value object, and the resulting cache map is held in the mappableBlockMap field:
private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
    new HashMap<ExtendedBlockId, Value>();
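The key type matters here: the same numeric block ID can exist in different block pools, so ExtendedBlockId identifies a block by both the block ID and the block pool ID. A minimal stand-in (BlockKey is a hypothetical name, not the HDFS class) shows the equals/hashCode contract such a composite key must satisfy to work as a HashMap key:

```java
import java.util.Objects;

// Stand-in for ExtendedBlockId: a block is identified by (blockId, bpid).
final class BlockKey {
    final long blockId;
    final String bpid; // block pool id

    BlockKey(long blockId, String bpid) {
        this.blockId = blockId;
        this.bpid = bpid;
    }

    // Two keys are equal only if BOTH fields match; equal keys must
    // also produce equal hash codes, or HashMap lookups would miss.
    @Override public boolean equals(Object o) {
        if (!(o instanceof BlockKey)) return false;
        BlockKey k = (BlockKey) o;
        return blockId == k.blockId && bpid.equals(k.bpid);
    }

    @Override public int hashCode() {
        return Objects.hash(blockId, bpid);
    }
}
```

With this contract in place, `mappableBlockMap.get(key)` finds the entry for a block even though the lookup key is a freshly constructed object.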
To cache a block, the cacheBlock method takes the block's parameters, records a blockId → Value entry in mappableBlockMap, and kicks off the actual caching work:
synchronized void cacheBlock(long blockId, String bpid, String blockFileName,
    long length, long genstamp, Executor volumeExecutor) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  if (prevValue != null) {
    LOG.debug("Block with id {}, pool {} already exists in the "
        + "FsDatasetCache with state {}", blockId, bpid, prevValue.state);
    numBlocksFailedToCache.incrementAndGet();
  } else {
    mappableBlockMap.put(key, new Value(null, State.CACHING));
    volumeExecutor.execute(
        new CachingTask(key, blockFileName, length, genstamp));
    LOG.debug("Initiating caching for Block with id {}, pool {}",
        blockId, bpid);
  }
}
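The important detail is the guard at the top: a second cache request for a block that is already in the map (whether CACHING or CACHED) is not cached twice but counted as a failure. A minimal sketch of just that guard, with simplified hypothetical types standing in for the real map of Value objects:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the duplicate-request guard in cacheBlock().
public class CacheGuard {
    private final Map<String, String> mappableBlockMap = new HashMap<>();
    int numBlocksFailedToCache = 0;

    // Returns true if a new caching task would be started for this key.
    synchronized boolean cacheBlock(String key) {
        if (mappableBlockMap.containsKey(key)) {
            // Already CACHING or CACHED: count as a failed cache request.
            numBlocksFailedToCache++;
            return false;
        }
        // Record the CACHING placeholder; the real code then submits a
        // CachingTask to the per-volume executor.
        mappableBlockMap.put(key, "CACHING");
        return true;
    }
}
```

Because the method is synchronized, the check-then-put pair is atomic, so two concurrent requests for the same block cannot both start a CachingTask.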
Conversely, removing a block from the cache goes through uncacheBlock, which follows the same pattern but performs the opposite operation:
synchronized void uncacheBlock(String bpid, long blockId) {
  ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
  Value prevValue = mappableBlockMap.get(key);
  boolean deferred = false;
  if (!dataset.datanode.getShortCircuitRegistry()
      .processBlockMunlockRequest(key)) {
    // A short-circuit reader still has the block mmapped ("anchored").
    deferred = true;
  }
  if (prevValue == null) {
    LOG.debug("Block with id {}, pool {} does not need to be uncached, "
        + "because it is not currently in the mappableBlockMap.",
        blockId, bpid);
    numBlocksFailedToUncache.incrementAndGet();
    return;
  }
  switch (prevValue.state) {
  case CACHING:
    LOG.debug("Cancelling caching for block with id {}, pool {}.",
        blockId, bpid);
    mappableBlockMap.put(key,
        new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
    break;
  case CACHED:
    mappableBlockMap.put(key,
        new Value(prevValue.mappableBlock, State.UNCACHING));
    if (deferred) {
      LOG.debug("{} is anchored, and can't be uncached now. "
          + "Scheduling it for uncaching in {} ", key,
          DurationFormatUtils.formatDurationHMS(revocationPollingMs));
      deferredUncachingExecutor.schedule(
          new UncachingTask(key, revocationMs),
          revocationPollingMs, TimeUnit.MILLISECONDS);
    } else {
      LOG.debug("{} has been scheduled for immediate uncaching.", key);
      uncachingExecutor.execute(new UncachingTask(key, 0L));
    }
    break;
  default:
    LOG.debug("Block with id {}, pool {} does not need to be uncached, "
        + "because it is in state {}.", blockId, bpid, prevValue.state);
    numBlocksFailedToUncache.incrementAndGet();
  }
}
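Notice the two uncaching paths in the CACHED branch: if the block is still anchored by a short-circuit reader, the UncachingTask cannot run immediately and is instead re-scheduled after revocationPollingMs. A simplified sketch of that scheduling decision (class and method names here are hypothetical, and the results are plain strings rather than real uncaching work):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the deferred vs. immediate uncaching paths in uncacheBlock().
public class UncacheScheduler {
    private final ScheduledExecutorService deferredUncachingExecutor =
        Executors.newScheduledThreadPool(1);
    private final long revocationPollingMs = 50; // assumed polling interval

    String uncache(boolean anchored) {
        try {
            if (anchored) {
                // Deferred path: try again after revocationPollingMs.
                return deferredUncachingExecutor.schedule(
                    () -> "uncached after deferral",
                    revocationPollingMs, TimeUnit.MILLISECONDS).get();
            }
            // Immediate path: the real code hands an UncachingTask
            // straight to uncachingExecutor.execute().
            return "uncached immediately";
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() {
        deferredUncachingExecutor.shutdown();
    }
}
```

Deferring instead of failing means a client that still has the block mmapped is given time to release it before the memory is actually unlocked.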
The configuration key dfs.datanode.max.locked.memory sets the maximum amount of memory, in bytes, that a DataNode may use for caching blocks. The operating-system limit RLIMIT_MEMLOCK must be at least as large as this value, otherwise the DataNode will fail to start. The default is 0, which means in-memory caching is disabled.
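To enable caching, the property goes into hdfs-site.xml on each DataNode; the value below (256 MB) is just an example, not a recommendation:

```xml
<property>
  <name>dfs.datanode.max.locked.memory</name>
  <!-- 268435456 bytes = 256 MB; must not exceed the memlock ulimit -->
  <value>268435456</value>
</property>
```

You can check the current RLIMIT_MEMLOCK for the DataNode user with `ulimit -l`; if it reports a value (in KB) smaller than this setting, the DataNode will refuse to start.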
HDFS caching is best suited to two kinds of data: hot shared resource files and short-lived hot data files.
One kind is shared resource files: global resources placed on HDFS for many jobs to use, such as common jar packages. The other kind is short-lived hot data: files read intensively for a limited period, such as the previous day's data pulled in each morning for report analysis (next-day retention, day-over-day comparisons, and so on).
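For both use cases, caching is driven through HDFS centralized cache management: an administrator creates a cache pool and adds cache directives for the hot paths. A sketch of the commands (the pool name and path here are made up for illustration):

```
# Create a cache pool for shared resources
hdfs cacheadmin -addPool common-jars

# Ask HDFS to cache a hot shared jar in that pool
hdfs cacheadmin -addDirective -path /share/libs/app.jar -pool common-jars

# Inspect current directives and how much of each path is cached
hdfs cacheadmin -listDirectives
```

Once a directive exists, the NameNode instructs the DataNodes holding the block replicas to cache them, which is exactly where the cacheBlock path described above gets invoked.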