Neptune

Dark, cold, and whipped by supersonic winds, ice giant Neptune is the eighth and most distant planet in our solar system.

  menu
32 文章
0 浏览
ღゝ◡╹)ノ❤️

【bigdata】Hadoop总结

hadoop组成

image

hdfs架构

namenode:存储文件的元数据

datanode: 存储文件块数据
文件块大小,2以前64M,2之后是128M

secondary namenode:监控hdfs状态的辅助后台程序,每隔一段时间获取元数据的快照
image

hdfs写数据流程

image
1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。

2)NameNode返回是否可以上传。

3)客户端请求第一个 Block上传到哪几个DataNode服务器上。

4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。

5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。

6)dn1、dn2、dn3逐级应答客户端。

7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。

8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

hdfs读数据流程

image
1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。

2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

NameNode和SecondaryNameNode
工作机制

image

  1. 第一阶段:NameNode启动

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode记录操作日志,更新滚动日志。

(4)NameNode在内存中对数据进行增删改。
2. 第二阶段:Secondary NameNode工作

(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。

(2)Secondary NameNode请求执行CheckPoint。

(3)NameNode滚动正在写的Edits日志。

(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。

(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint。

(7)拷贝fsimage.chkpoint到NameNode。

(8)NameNode将fsimage.chkpoint重新命名成fsimage。
namenode故障处理

1.删除数据直接复制:杀掉namenode,删除数据,复制secondary namenode数据到namenode,重启

2.使用-importCheckpoint选项启动NameNode守护进程,从而将SecondaryNameNode中数据拷贝到NameNode目录中:

bin/hdfs namenode -importCheckpoint

datanode
工作机制

image
1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。

2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。

3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。

4)集群运行中可以安全加入和退出一些机器。

yarn架构

image

mapreduce架构概述

mapreduce将计算过程分成两个阶段:

1)map阶段并行处理输入数据

2)reduce阶段对map结果进行汇总

常用数据类型:

Java类型Hadoop Writable类型writable
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
StringText
mapMapWritable
arrayArrayWritable

mr编程(mapper,reducer,driver)

image
image
示例代码:
driver

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
Driver驱动
*/
public class WordcountDriver {

 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

  // 1 获取配置信息以及封装任务
  Configuration configuration = new Configuration();
  Job job = Job.getInstance(configuration);

  // 2 设置jar加载路径
  job.setJarByClass(WordcountDriver.class);

  // 3 设置map和reduce类
  job.setMapperClass(WordcountMapper.class);
  job.setReducerClass(WordcountReducer.class);

  // 4 设置map输出
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  // 5 设置最终输出kv类型
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  // 6 设置输入和输出路径
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  // 7 提交
  boolean result = job.waitForCompletion(true);

  System.exit(result ? 0 : 1);
 }
}

mapper

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 
 Text k = new Text();
 IntWritable v = new IntWritable(1);
 
 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  
  // 1 获取一行
  String line = value.toString();
  
  // 2 切割
  String[] words = line.split(" ");
  
  // 3 输出
  for (String word : words) {
   
   k.set(word);
   context.write(k, v);
  }
 }
}

reducer

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

int sum;
IntWritable v = new IntWritable();

 @Override
 protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
  
  // 1 累加求和
  sum = 0;
  for (IntWritable count : values) {
   sum += count.get();
  }
  
  // 2 输出
       v.set(sum);
  context.write(key,v);
 }
}

自定义bean对象实现序列化接口(Writable)

具体实现bean对象序列化步骤如下7步。

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
 super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
 out.writeLong(upFlow);
 out.writeLong(downFlow);
 out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
 upFlow = in.readLong();
 downFlow = in.readLong();
 sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

@Override
public int compareTo(FlowBean o) {
 // 倒序排列,从大到小
 return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
示例
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1 实现writable接口
public class FlowBean implements Writable{


 private long upFlow;//上行流量
 private long downFlow;//下行流量
 private long sumFlow;//总流量
 
 //2  反序列化时,需要反射调用空参构造函数,所以必须有
 public FlowBean() {
  super();
 }

 public FlowBean(long upFlow, long downFlow) {
  super();
  this.upFlow = upFlow;
  this.downFlow = downFlow;
  this.sumFlow = upFlow + downFlow;
 }
 
 //3  写序列化方法
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeLong(upFlow);
  out.writeLong(downFlow);
  out.writeLong(sumFlow);
 }
 
 //4 反序列化方法
 //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
 @Override
 public void readFields(DataInput in) throws IOException {
  this.upFlow  = in.readLong();
  this.downFlow = in.readLong();
  this.sumFlow = in.readLong();
 }

 // 6 编写toString方法,方便后续打印到文本
 @Override
 public String toString() {
  return upFlow + "\t" + downFlow + "\t" + sumFlow;
 }

 public long getUpFlow() {
  return upFlow;
 }

 public void setUpFlow(long upFlow) {
  this.upFlow = upFlow;
 }

 public long getDownFlow() {
  return downFlow;
 }

 public void setDownFlow(long downFlow) {
  this.downFlow = downFlow;
 }

 public long getSumFlow() {
  return sumFlow;
 }

 public void setSumFlow(long sumFlow) {
  this.sumFlow = sumFlow;
 }
}

Hadoop参数调优

1.dfs.namenode.handler.count==20×〖log〗_e^(Cluster Size)
默认值10,NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作,对于大集群或者有大量客户端的集群来说,通常需要增大参数

2.yarn参数调优,提高内存利用率:
单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小


标题:【bigdata】Hadoop总结
作者:凌陨心
地址:https://jditlee.github.io/articles/2023/03/16/1678937719444.html