`

如何编写Hadoop调度器

阅读更多

中国第一个在线Hadoop教育平台—小象学院,推荐给Hadoop初学者和实践者,网址是:

本博客微信公共账号:hadoop123(微信号为:hadoop-123),分享hadoop技术内幕,hadoop最新技术进展,发布hadoop相关职位和求职信息,hadoop技术交流聚会、讲座以及会议等。二维码如下:


1. 编写目的

 

在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法。

2. Hadoop调度框架

Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheduler和JobTracker的关系。

(1) TaskScheduler

如果用户要编写自己的调度器,需要继承抽象类TaskScheduler,该类的接口如下:

abstract class TaskScheduler implements Configurable {

protected Configuration conf; //配置文件

protected TaskTrackerManager taskTrackerManager; //一般会设为JobTracker

public Configuration getConf() {

  return conf;

}

public void setConf(Configuration conf) {

  this.conf = conf;

}

public synchronized void setTaskTrackerManager(

TaskTrackerManager taskTrackerManager) {

  this.taskTrackerManager = taskTrackerManager;

}

public void start() throws IOException { //初始化函数,如加载配置文件等

  // do nothing

}

public void terminate() throws IOException { //结束函数

// do nothing

}

//最重要的函数,为该taskTracker分配合适的task

public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)

throws IOException;

  //根据队列名字获job列表

public abstract Collection<JobInProgress> getJobs(String queueName);

}

(2) JobTracker

JobTracker是Hadoop最核心的组件,它监控整个集群中的作业运行情况并对资源进行管理和调度。

每个TaskTracker每个3s(默认值,可配置)通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量,内存剩余量,正在运行的task,空闲的slot数目等,一旦JobTracker发现该TaskTracker出现了空闲的slot,便会调用调度器中的AssignTasks方法为该TaskTracker分配task。

下面分析JobTracker调用TaskScheduler的具体流程:

……

private final TaskScheduler taskScheduler; //声明调度器对象

……

public static JobTracker startTracker(JobConf conf, String identifier) {

  …….

  result = new JobTracker(conf, identifier);

  result.taskScheduler.setTaskTrackerManager(result); //设置调度器的manager

  ……

}

//创建调度器

JobTracker(JobConf conf, String identifier) {

  ……

  // Create the scheduler

  Class<? extends TaskScheduler> schedulerClass

  = conf.getClass("mapred.jobtracker.taskScheduler",

    JobQueueTaskScheduler.class, TaskScheduler.class);

  taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

  …..

}

//run forever

public void offerService() {

  ……

  taskScheduler.start(); //启动调度器

  ……

}

。。。。。

HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

  …….

  // Check for new tasks to be executed on the tasktracker

  if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {

    ……

    //使用调度器,为该taskTracker分配作业

    tasks = taskScheduler.assignTasks(taskTrackerStatus);

    ……

  }

}

从上面的分析可以知道,Scheduler和JobTracker之间会相互包含(实际上是组合模式),Scheduler中要包含JobTracker(实际上就是TaskTrackerManager)对象,以便获取整个Hadoop集群的一些信息,如slot总数,QueueManager对象,添加JobInProgressListener以便增加或删除job时,通知Scheduler;JobTracker中要包含Scheduler对象,以便可以对每个TaskTracker分配task。

3. 编写Hadoop调度器

假设我们要编写一个新的调度器,为MyHadoopScheduler,需要进行以下工作:

(1) 用户需要自己实现的类

@ MyHadoopSchedulerConf:配置文件管理类,读取你自己的配置文件,并保存到合适的数据结构中,一般而言,这个类应该支持动态加载配置文件。

@ MyHadoopSchedulerListener:编写自己的JobInProgressListener,并调用JobTracker的addJobInProgressListener(),将之加到系统的Listener队列中,以便系统中添加或删除job后,JobTracker可立刻告诉调度器。

@ MyHadoopScheduler:调度器的核心实现算法

(2) 用户要用到的系统类

@ JobTracker:JobTracker在startTracker函数中,会将MyHadoopScheduler的taskTrackerManager赋值为JobTracker对象,这样,在MyHadoopScheduler中,可调用Jobracker中的所有public方法和成员变量,常用的有:

$ getClusterStatus():获取集群的状态,如tasktracker列表,map slot总数,reduce slot总数,当前正在运行的map/reduce task总数等

$ getQueueManager():如果MyHadoopScheduler支持多队列,那么需要使用该方法获取QueueManager对象,通过该对象,会用可以获取系统的所有队列名称,每个队列的ACL(Access Control List),具体参考:http://hadoop.apache.org/common/docs/current/service_level_auth.html

$ killJob:可以调用该函数杀死某个job

$ killTask:如果调度器支持资源抢占,可调用该函数 杀死某个task以便进行资源抢占。

@ JobInprogress:用户向Hadoop中提交一个job后,Hadoop会为该job创建一个叫JobInProgress的对象,该对象中包含了job相关的基本信息,且它会伴随某个job的一生(与job共存亡)。该对象中包含的job信息有:该job包含的所有task的信息(如:正在运行的task列表,已经完成的task列表,尚未运行的task列表等),作业的优先级,作业的提交时间,开始运行时间,运行结束时间等信息。

在JobInprogress的task列表中,每个task以对象TaskInProgress的形式保存,该对象中包含了每个task的基本信息,包括:task要处理的数据split,task创建时间,task开始执行时间,task结束时间等信息。这些信息肯定会在调度器中使用。

@ JobConf

每个作业的运行参数和配置选项被保存到一个JobConf对象中,该对象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml设置的选项和该作业的特有属性(用户名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想获取当前用户名,可以这样:

JobConf conf;

…….

String username = conf.get("user.name");

用户也可以通过该对象传递一些自己定义的全局属性,如用户自己定义了一个属性叫mapred.job.deadline(作业的deadline时间),用户可以在提交作业时设定该值:

hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \

-D mapred.job.deadline=100000 \

input output

然后在调度器中这样获取该属性的值:

JobConf conf;

…….

int deadline=conf.getInt("mapred.job.deadline", -1); //获取mapred.job.deadline属性,如果没有设置,则返回-1

4. 总结

调度器是Hadoop的中枢,其重要性可想而知。用户如果要设计Hadoop调度器,需要对Hadoop的整个框架有比较深入的理解,同时需阅读一些很重要的类(如JobTracker和JobInprogress等)的源码,以便利用这些类完成你的调度算法。

Hadoop目前自带了三个比较常用的调度器,分别为JobQueueTaskScheduler (FIFO,但队列调度器),Capacity Scheduler(多队列多用户调度器)和Fair Scheduler(多队列多用户调度器),它们是你学习Hadoop调度器的最好资料。

5. 参考资料

(1) Hadoop-0.20.2源代码

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/

作者:Dong,作者介绍:http://dongxicheng.org/about/

本博客的文章集合:

 

分享到:
评论

相关推荐

    Hadoop任务调度器

    Hadoop任务调度器 基础知识 • Hadoop调度流程 • Hadoop自带调度器介绍 • 编写自己的Hadoop调度器 • 总结

    Hadoop实战中文版

    8.11 多用户作业的调度 8.11.1 多个JobTracker 8.11.2 公平调度器 8.12 小结第三部分 Hadoop也疯狂 第9章 在云上运行Hadoop 9.1 Amazon Web Services 简介 9.2 安装AWS 9.2.1 获得AWS身份认证凭据 9.2.2 ...

    Hadoop权威指南 第二版(中文版)

     作业的调度  Fair Scheduler  Capacity Scheduler  shuffle和排序  map端  reduce端  配置的调优  任务的执行  推测式执行  重用JVM  跳过坏记录  任务执行环境 第7章 MapReduce的类型与格式  ...

    hebo:基于 cascalog 的 hadoop 任务数据流调度器

    一个基于 cascalog 的数据流调度器,用于 Hadoop 任务相互依赖。 一种以优雅方式编写单个 Hadoop 数据处理任务的 DSL。 设计理念 在实践中,时间序列数据可以按不同的时间粒度进行管理,例如每年、每月、每天或每...

    一、Hadoop简介 和 Hadoop结构介绍

    是Apache公司使用Java语言编写的开源的,分布式系统的基础架构 分布式就是,当储存数据很多很大时,一台机器储存不了时,需要将数据切成块,使用多台计算机分布式储存这些数据。 由于专业的大数据的服务器比较昂贵,...

    Hadoop权威指南(中文版)2015上传.rar

    作业的调度 Fair Scheduler Capacity Scheduler shuffle和排序 map端 reduce端 配置的调优 任务的执行 推测式执行 重用JVM 跳过坏记录 任务执行环境 第7章 MapReduce的类型与格式 MapReduce的类型 默认的MapReduce...

    Hadoop实战中文版.PDF

    1568.11 多用户作业的调度 1578.11.1 多个JobTracker 1588.11.2 公平调度器 1588.12 小结 160第三部分 Hadoop也疯狂第9章 在云上运行Hadoop 1629.1 Amazon Web Services简介 1629.2 安装AWS 1639.2.1...

    Hadoop实战(第2版)

    技术点34 定位reduce 端数据倾斜问题技术点35 确定reduce 任务是否存在整体吞吐量过低技术点36 缓慢的洗牌(shuffle)和排序 .6.2.4 任务的一般性能问题技术点37 作业竞争和调度器限制技术点38 使用堆转储...

    Hadoop实战

    1528.6 删减DataNode 1528.7 增加DataNode 1538.8 管理NameNode和SNN 1538.9 恢复失效的NameNode 1558.10 感知网络布局和机架的设计 1568.11 多用户作业的调度 1578.11.1 多个JobTracker 1588.11.2 公平调度器 ...

    大数据与Hadoop.doc

    Hadoop YARN:一个作业调度和群集资源管理框架。 Hadoop MapReduce:基于YARN的大型数据分布式并行编程模式和程序执行框架,是Google的Map Reduce的开源实现。它帮助用户编写处理大型数据集的并行运行程序。...

    Hadoop实战(陆嘉恒)译

    管理Hadoop8.1 为实际应用设置特定参数值8.2 系统体检8.3 权限设置8.4 配额管理8.5 启用回收站8.6 删减DataNode8.7 增加DataNode8.8 管理NameNode 和SNN8.9 恢复失效的NameNode8.10 感知网络布局和机架的设计8.11 多...

    一种基于小型Hadoop集群的数据分层调度处理算法研究

    针对当前抓取调度数据量巨大且计算复杂耗时长的问题,根据数据集的维度特征属性,通过凝聚层次聚类的方式对数据进行分层处理,并将其运用到小型Hadoop分布式系统中,通过服务器Master来对一般数据库MySQL数据库进行...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点37 作业竞争和调度器限制 技术点38 使用堆转储来查找未优化的用户代码 6.2.5 硬件性能问题 技术点39 查找硬件的失效 技术点40 CPU 竞争 . 技术点41 内存交换 技术点42 磁盘健康 技术点43 网络 6.3...

    kite-apps:基于Kite和Hadoop说明性应用程序

    该库处理所有调度工作,生成并部署所需的Oozie协调器,工作流和应用程序库本身。 该库仍在日趋成熟,并且可能会进行非被动更改。 已在CDH 5.4上进行了测试。编写风筝应用程序该库的用户使用两个主要概念: ...

    罗盘(Compass) 大数据任务诊断平台

    支持多种调度平台(海豚调度器、气流或自研等) 支持 Spark 2.x 或 3.x、Hadoop 2.x 或 3.x 故障排除。 支持工作流层异常诊断,识别各种故障并基线耗时异常 问题。 支持 Spark 引擎层异常诊断,包括数据倾斜、大...

    从 Oozie 工作流到 Airflow DAG 的迁移工具_python_代码_下载

    它是一个以编程方式编写、调度和监控工作流的平台。气流工作流被设计为Python 中任务的有向无环图 (DAG)。Airflow 调度程序在遵循指定依赖项的同时在一组工作人员上执行您的任务。 Apache Oozie 是一个用于管理 ...

    TitanDataOperationSystem:《 Titan数据运营系统》,本项目是一个全栈闭环系统,我们有利用数据可视化的网络系统,然后用flume-kafaka-flume进行日志的读取,在蜂巢设计数仓,编写spark代码进行数仓表之间的转换以及ads层表到mysql的迁移,使用azkaban进行定时任务的调度,使用技术:JavaScala语言,Hadoop,Spark,Hive,Kafka,Flume,Azkaban,SpringBoot,Bootstrap,Echart等;

    我们先用flume-kafaka-flume对埋点日志服务器中日志进行读取,然后将日志放到我们的hdfs,然后在hive设计数仓,编写spark代码进行数仓表之间的转换以及ads层表到mysql的迁移,之后使用azkaban进行定时任务的调度,...

    大数据面试题.doc

    Hadoop 默认调度器策略为 FIFO( ) 27. 集群内每个节点都应该配 RAID,这样避免单磁盘损坏,影响整个节点运行。( ) 28. 因为 HDFS 有多个副本,所以 NameNode 是不存在单点问题的。( ) 29. 每个 map 槽就是一个线程...

    大数据面试题(1).doc

    Hadoop 默认调度器策略为 FIFO( ) 27. 集群内每个节点都应该配 RAID,这样避免单磁盘损坏,影响整个节点运行。( ) 28. 因为 HDFS 有多个副本,所以 NameNode 是不存在单点问题的。( ) 29. 每个 map 槽就是一个线程...

    大数据面试题.docx

    Hadoop 默认调度器策略为 FIFO( ) 27. 集群内每个节点都应该配 RAID,这样避免单磁盘损坏,影响整个节点运行。( ) 28. 因为 HDFS 有多个副本,所以 NameNode 是不存在单点问题的。( ) 29. 每个 map 槽就是一个线程...

Global site tag (gtag.js) - Google Analytics