太阳2:MaxCompute Studio提升UDF和MapReduce开发体验,maxcomputemapreduce

责任编辑:

Shuffle阶段-分配Reducer:把Mapper输出的单词分发给Reducer。Reducer拿到数据后,再做一次排序。因为Reducer拿到的数据已经在Mapper里已经是排序过的了,所以这里的排序只是针对排序过的数据做合并排序。

发布UDF

好了,我们的MyLower.java测试通过了,接下来我们要将其打包成jar资源(这一步可以通过IDE打包,参考用户手册)上传到MaxComptute服务端上:

    1. 在MaxCompute菜单选择Add Resource菜单项:

太阳2 1

    1. 选择要上传到哪个MaxCompute
      project上,jar包路径,要注册的资源名,以及当资源或函数已存在时是否强制更新,然后点击OK。

太阳2 2

  • 3.
    jar包上传成功后,接下来就可以注册UDF了,在MaxCompute菜单选择Create
    Function菜单项。

太阳2 3

  • 4.
    选择需要使用的资源jar,选择主类(studio会自动解析资源jar中包含的主类供用户选择),输入函数名,然后点击OK。

太阳2 4

10. mr如何获取输入表的信息?

A:
参考:
使用Mapper.TaskContext的接口getInputTableInfo(),会得到输入表的TableInfo对象
每个map
worker只会处理来自单一表或分区的数据,在mapper的setup阶段获取该信息即可。


add jar C:test_mrtest_mr.jar -f;//添加资源

    …

关于MaxCompute

欢迎加入MaxCompute钉钉群讨论
太阳2 5

阅读原文请点击

Studio提升UDF和MapReduce开发体验,maxcomputemapreduce UDF全称User
Defined
Function,即用户自定义函数。MaxCompute提供了很多内建函数来满足用…

17. sdk如何通过instance获取logview url?

A: 可以使用如下的方式拿到logview的url

RunningJob rj = JobClient.runJob(job);
com.aliyun.odps.Instance instance = SessionState.get().getOdps().instances().get(rj.getInstanceID());
String logview = SessionState.get().getOdps().logview().generateLogView(instance, 7 * 24);
System.out.println(logview);

通过上述方法,我们可以在Dataworks上跑大于10M的MR作业。

这个命令发起作业。MapReduce的任务是运行在MaxCompute集群上的,客户端需要通过这个命令把任务运行相关的信息告诉集群。

单元测试

依赖于MaxCompute提供的Local
Run框架,您只需要像写普通的单测那样提供输入数据,断言输出就能方便的测试你自己的UDF或MR。在examples目录下会有各种类型的单测实例,可参考例子编写自己的unit
test。这里我们新建一个MyLowerTest的测试类,用于测试我们的MyLower:

太阳2 6

2. MR提交命令中-resources和-classpath的理解?

A:
在MaxCompute中类似MR这类分布式数据处理框架,用户的代码一般在以下两个地点执行:

  • 运行客户端的进程/子进程:这里的用户代码负责准备执行环境、配置任务参数、提交任务,入口通常是main
    class。它不受沙箱限制,执行逻辑由用户代码驱动。同样的,这里的classpath由用户配置,或在console中使用-classpath选项添加依赖的classpath路径。
  • 远程执行的worker进程:这里的代码负责执行数据处理逻辑,入口是mapper/reducer
    class。它受限沙箱限制,且执行逻辑由MaxCompute框架驱动。用户在命令行配置的-classpath在这里无效(显然,远程机器的路径和客户端机器的路径无法保证一致),任何第三方依赖必须作为resource提前上传至MaxCompute,并在提交任务时使用-resources选项或JobConf.setResources(String[])来设定。

摘要:
用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。
解决方案: jar -resources test_mr.

Reduce阶段

sample数据测试

很多用户的需求是能sample部分线上表的数据到本机来测试,而这studio也提供了支持。在editor中UDF类MyLower.java上右键,点击”运行”菜单,弹出run
configuration对话框,配置MaxCompute
project,table和column,这里我们想将hy_test表的name字段转换为小写:

太阳2 7

点击OK后,studio会先通过tunnel自动下载表的sample数据到本地warehouse(如图中高亮的data文件),接着读取指定列的数据并本地运行UDF,用户可以在控制台看到日志输出和结果打印:

太阳2 8

3. Mapper数目如何设置?

A:如果没有输入表是可以直接指定map数目setNumMapTasks
   
有输入表的话,setNumMapTasks不生效,需要通过setSplitSize来控制map数,默认是256M。


用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。

InputUtils.addTable(TableInfo table, JobConf conf)设置了输入的表。

创建UDF

假设我们要实现的UDF需求是将字符串转换为小写(内建函数TOLOWER已实现该逻辑,这里我们只是通过这个简单的需求来示例如何通过studio开发UDF)。studio提供了UDF|UDAF|UDTF|Mapper|Reducer|Driver的模板,这样用户只需要编写自己的业务代码,而框架代码会由模板自动填充。

    1. 在src目录右键 new | MaxCompute Java

太阳2 9

    1. 输入类名,如myudf.MyLower,选择类型,这里我们选择UDF,点击OK。

太阳2 10

  • 3.
    模板已自动填充框架代码,我们只需要编写将字符串转换成小写的函数代码即可。

太阳2 11

12. 如何设置Key排序列的顺序(ASC or DESC)?

A: 类似如下: 
//key按这些列排序
job.setOutputKeySortColumns(new String[] { "custid", "msgtype","amount" });
//设置每个列正序还是倒序
job.setOutputKeySortOrder(new SortOrder[]{SortOrder.ASC,SortOrder.ASC,SortOrder.DESC});


第三步:瘦身Jar,因为Dataworks执行MR作业的时候,一定要本地执行,所以保留个main就可以;

reduce(){

创建MaxCompute Java Module

首先,你得在intellij中创建一个用于开发MaxCompute
Java程序的module。具体的,File | new | module … module类型为MaxCompute
Java,配置Java JDK和MaxCompute
console的安装路径,点击next,输入module名,点击finish。

这里配置console的目的主要有两个:

  • 编写UDF和MR需要依赖MaxCompute框架的相关jar,而这些jar在console的lib目录均存在,studio能帮您将这些lib自动导入到module的依赖库中。

  • studio能集成console,一些动作通过console操作将十分方便。

太阳2 12

至此,一个能开发MaxCompute
java程序的module已建立,如下图的jDev。主要目录包括:

  • src(用户开发UDF|MR程序的源码目录)
  • examples(示例代码目录,包括单测示例,用户可参考这里的例子开发自己的程序或编写单测)
  • warehouse(本地运行需要的schema和data)

太阳2 13

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:


第二步:目前通过MaxCompute
CLI上传的资源,在Dataworks左侧资源列表是找不到的,只能通过list
resources查看确认资源;

JobConfig

MaxCompute Studio提升UDF和MapReduce开发体验,maxcomputemapreduce

UDF全称User Defined
Function,即用户自定义函数。MaxCompute提供了很多内建函数来满足用户的计算需求,同时用户还可以创建自定义函数来满足定制的计算需求。用户能扩展的UDF有三种:UDF(User
Defined Scalar Function),UDTF(User Defined Table Valued
Function)和UDAF(User Defined Aggregation Function)。

太阳2:MaxCompute Studio提升UDF和MapReduce开发体验,maxcomputemapreduce。同时,MaxCompute也提供了MapReduce编程接口,用户可以使用MapReduce提供的接口(Java
API)编写MapReduce程序处理MaxCompute中的数据。

通过MaxCompute
Studio提供的端到端的支持,用户能快速开始和熟悉开发自己的UDF和MapReduce,提高效率。下面我们就以一个例子来介绍如何使用Studio来开发自己的UDF:

14. 框架map或者reduce接口里的Record对象是复用的?

A:是的,为了减少对象的开销,框架对于map,
reduce接口里的Record对象是复用的,也就是说每次map或者reduce的每次迭代,Record对象没有变,只是里面的数据变化了。如果要保存上一次的Record需要toArray()拿到里面的数据对象进行保存。具体可以参考:


list resources;//查看资源

拓展MapReduce

生产使用

上传成功的jar资源和注册成功的function(在Project
Explorer相应project下的Resources和Functions节点中就能及时看到,双击也能显示反编译的源码)就能够实际生产使用了。我们打开studio的sql
editor,就能愉快的使用我们刚写好的mylower函数,语法高亮,函数签名显示都不在话下:

太阳2 14

7. 二次排序功能,MR相关配置解释,setMapOutputKeySchema? setOutputKeySortColumns? setPartitionColumns? setOutputGroupingColumns?

A:
通常情况下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key
schema中。

  • 在Map端,Mapper输出的Record会根据设置的PartitionColumns计算哈希值,决定分配到哪个Reducer,会根据KeySortColumns对Record进行排序。
  • 在Reduce端,输入Records在按照KeySortColumns排序好后,会根据GroupingColumns指定的列对输入的Records进行分组,即会顺序遍历输入的Records,把GroupingColumns所指定列相同的Records作为一次reduce函数调用的输入。

客户端配置AK、EndPoint:

资源表和文件可以让一些小表/小文件可以方便被读取。鉴于读取数据的限制需要小于64次,一般是在setup里读取后缓存起来,具体的例子可以参考这里。

MapReduce

studio对MapReduce的开发流程支持与开发UDF基本类似,主要区别有:

  • MapReduce程序是作用于整张表的,而且输入输出表在Driver中已指定,因此如果使用sample数据测试的话在run
    configuration里只需要指定project即可。

  • MapReduce开发好后,只需要打包成jar上传资源即可,没有注册这一步。

  • 对于MapReduce,如果想在生产实际运行,可以通过studio无缝集成的console来完成。具体的,在Project
    Explorer Window的project上右键,选择Open in
    Console,然后在console命令行中输入类似如下的命令:
    jar -libjars wordcount.jar -classpath D:odpscltwordcount.jar
    com.aliyun.odps.examples.mr.WordCount wc_in wc_out;

5. 报错java.lang.OutOfMemoryError: Java heap space,MR的内存设置问题?

A:mapper或reducer的内存由两部分组成,JVM的heap memory和JVM
之外的框架相关内存。
   
设置JVM内存的接口是(都是Java逻辑的话,调节内存是用下面两个接口):
    setMemoryForMapperJVMsetMemoryForReducerJVM (默认是1024
单位MB)
    设置框架内存(c++部分的)的接口是(一般不需要设置):
    setMemoryForMapTasksetMemoryForReduceTask(默认是2048 单位MB)


原标题:通过简单瘦身,解决Dataworks 10M文件限制问题

以WordCount为例,文档可以参考这里

测试UDF

UDF或MR开发好后,下一步就是要测试自己的代码,看是否符合预期。studio提供两种测试方式:

16. MR支持多路输入输出,应该怎么写这样的程序?

    A:参考:多路输入输出示例
对于多路输入,每个输入源对应单独的一个Map阶段,即一个map
task只会读取一个输入表的数据。可以指定一个表的多级分区列来作为一个输入,例如a,
b, c三分区列,指定分区时可以指定a=1/b=1/c=2类似这样。
   
如果同一级别的多个分区,则需要各自作为单独的分区输入,例如一个表的a=1和a=3分区作为多路输入的俩不同的输入,需要分别指定。
    maponly的作业也同样支持多路输入输出,实现方法类似。


​本文为云栖社区原创内容,未经允许不得转载。返回搜狐,查看更多

setReducerClass(Class theClass)设置Reducer使用的Java类。

4. Reducer数目如何设置?

A: 通过JobConf的接口setNumReduceTasks可以设置。
对于pipeline作业,Jobconf的接口同样可以设置,只不过设置后所有reduce阶段的个数都是同样的值。
如果要分阶段设置,设置方式如下:
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)

.addReducer(SumReducer.class).setNumTasks(5)

.addReducer(IdentityReducer.class).setNumTasks(1).createPipeline();


太阳2 15

后续为了更加清楚地说明问题,我会尽可能地在客户端上操作,而不用IDEA里已经集成的方法。

8. 请问mr job的map或者reduce如果想提前终止job, 执行什么代码?

A:
抛异常就可以,例如throw new RuntimeException("XXX"); 会导致job失败,job也就结束了。


第一步:大于10M的resources通过MaxCompute CLI客户端上传,

setMapOutputValueSchema(Column[] schema)设置 Mapper 输出到 Reducer 的
Value 行属性。和上个设置一起定义了Mapper到Reducer的数据格式。

20. MR代码里有JNI的调用该怎么写?

A:首先project要开通jni的相关权限,在编译准备好so文件后,需要将so以file类型的形式添加为Resource,并在MR作业提交的时候-resources参数里指定,例如:

add file libtestjni.so as libtestjni.so -f;
jar -resources testmr.jar,libtestjni.so -classpath testmr.jar Test.MRDriver xxx xxx;

在MR的java代码使用jni的时候要注意,使用方式如下:

System.loadLibrary("testjni");    // 这里不要写成libtestjni.so,否则会报错,原因是java会自动添加lib前缀和.so后缀的

jni的使用方法可以参考:


解决方案:

另外后续还需要用到客户端,可以参考文档安装。

9. 请问map阶段有时候为什么会有interrupted,但是map 最终仍然完成了?

A:因为有backup instance在跑,产生backup instance一般是因为有某些map
instances明显慢于其他的,就会在别的机器上启动一个同样的worker来跑,这个功能类似于hadoop的预测执行,只要其中某个成功跑完,其他的就可以停掉了(变为interrupted)


客户端下载地址:

    map();

11. 如何使用自定义partitioner ?

A: 参考如下代码:

import com.aliyun.odps.mapred.Partitioner;

...

public static class MyPartitioner extends Partitioner {

@Override
public int getPartition(Record key, Record value, int numPartitions) {
  // numPartitions即对应reducer的个数
  // 通过该函数决定map输出的key value去往哪个reducer
  String k = key.get(0).toString();
  return k.length() % numPartitions;
}
}

在jobconf里进行设置:jobconf.setPartitionerClass(MyPartitioner.class)
另外需要在jobconf里明确指定reducer的个数:jobconf.setNumReduceTasks(num)


作者:隐林

定时调度

6. mr 输出到表或某个分区里时,输出的模式时追加还是覆盖 ?

A: 会覆盖输出表或分区之前的内容


不支持反射/自定义类加载器(所以不支持一些第三方包)

21. MR作业读取表资源,Archive资源应该如何操作?

A: MaxCompute上的资源(file, table,
archive等)可以类比于Hadoop的DistributedCache来理解,同样是会分发到每个计算节点上去,worker再从本地来读取,因而资源文件不能过大,否则分发资源就是一个瓶颈,目前默认有2G的总资源大小限制。
读取资源表,Archive资源总体上来说和读取file类型资源是类似的,只是使用的接口不同。读取资源文件的方法可以参考文档:使用资源示例

对于表资源:
将表添加为资源表: add table xxx as xxx -f;
读资源表的接口为:TaskContext#readResourceTable

对于Archive资源:
将本地archive(.tar, .zip等archive文件)上传为资源: add archive as xxx
-f;
读archive资源的接口为:TaskContext#readResourceArchiveAsStream


…不断更新中…

MapReduce常见问题解答,odpsmaxcompute分区 本文用到的
阿里云数加-大数据计算服务MaxCompute产品地址:…

在odpscmd里执行

1. 作业出现ClassNotFoundException和NoClassDefFoundError异常失败?

A:
对于ClassNotFoundException异常,一般是依赖的class不在你的jar包中,需要把依赖的库打到作业jar包中或者单独上传,并在-resources中指定;
对于NoClassDefFoundError异常,先看看依赖class是否存在于你的jar包,很多情况下是由于版本冲突导致的,可能你依赖的库和服务端自带的jar有冲突。


setOutputKeySortOrder(JobConf.SortOrder[] order)设置 Key
排序列的顺序。

19. 不同的Mapper或者Reducer如何获取可区分的ID?

A:
有些业务场景需要区分不同的Mapper或Reducer,可以通过TaskContextgetTaskID接口获取到一个Mapper/Reducer独有的id。

String id = context.getTaskID().toString();

步骤为

MaxCompute(原ODPS) MapReduce常见问题解答,odpsmaxcompute分区

odpscmd  -u accessId  -p  accessKey  –project=testproject
–endpoint=  -e “jar -resources
aaa.jar -classpath ./aaa.jar com.XXX.A”

18.  MR作业如何指定输入表的Project名字?

A: 可以按如下的方式指定:

InputUtils.addTable(TableInfo.builder().projectName("test_project_name").tableName("test_table_name").build(), job);

通过TableInfo.builder()projectName接口来指定,如果不指定,默认值是在运行MR作业的那个project.


OutputUtils.addTable(TableInfo table, JobConf
conf)设置了输出的表。多路输入输出可以参考这里。

13. 报错kInstanceMonitorTimeout, usually caused by bad udf performance,怎么解决?

A:
报这个错的原因是mapper或者reducer有逻辑执行时间特别长,且没有从输入表的读数据或者写出数据,超过默认10min后,会报这个异常;有两种解决方法:

  • 将超时的时间调的更长一些,可以设置参数odps.function.timeout或者设置JobConf#setFunctionTimeout,最长可以设置为3600,即一个小时。
  • 定期向框架汇报心跳 TaskContext#progress(),注意progress不要调用过于频繁,否则有性能问题,能确保两次调用之间的时间小于设置的timeout时间即可。

将代码拷贝到IDE里,编译打包成mapreduce-examples.jar

15. 写完一条记录后,想把outputRecord里面的数据清空,这个要怎么弄,要不然,再写下一条记录的时候,如果某个字段没有值,就会用原来的记录填充?

   
A:如果写的Record对象是复用的,如果某个值没有新set,则还是保留着之前的值对象。目前没有直接可以清空的api可以用,可以通过Record.getColumnCount拿到column
count,用一个for 循环去一一set null即可。


不允许JNI调用

根据com.aliyun.odps.mapred.open.example.WordCount,找到main方法所在类的路径和名字

功能介绍

安全沙箱

具体的插件的安装方法步骤可以参考文档,本文不在赘言。

摘要:大数据计算服务(MaxCompute)的功能详解和使用心得

jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out

setCombinerClass(Class theClass)设置作业的 combiner。

客户端发起add jar/add
file等资源操作,把在客户端的机器(比如我测试的时候是从我的笔记本)上,运行任务涉及的资源文件传到服务器上。这样后面运行任务的时候,服务器上才能有对应的代码和文件可以用。如果以前已经传过了,这一步可以省略。

setOutputOverwrite(boolean
isOverwrite)设置对输出表是否进行覆盖。类似SQL里的Insert into/overwrite
Talbe的区别。

做数据准备,包括创建表和使用Tunnel命令行工具导入数据

setMapperClass(Class theClass)设置Mapper使用的Java类。

Shuffle-分配Reduce

输入阶段:根据工作量,生成几个Mapper,把这些表的数据分配给这些Mapper。每个Mapper分配到表里的一部分记录。

在JAVA代码里直接调用MapReduce作业,可以通过设置SessionState.setLocalRun(false); 实现,具体可以参考这里。

odpscmd
-e/-f:odpscmd的-e命令可以在shell脚本里直接运行一个odpscmd里的命令,所以可以在shell脚本里运行odpscmd
-e ‘jar -resources
xxxxxx’这样的命令,在shell脚本里调用MapReduce作业。一个完整的例子是

setOutputKeySortColumns(String[] cols)设置 Mapper 输出到 Reducer 的
Key 排序列。

工欲善其事,必先利其器。MR的开发提供了基于IDEA和Eclipse的插件。其中比较推荐用IDEA的插件,因为IDEA我们还在持续做迭代,而Eclipse已经停止做更新了。而且IDEA的功能也比较丰富。

setMapOutputKeySchema(Column[] schema)设置 Mapper 输出到 Reducer 的
Key 行属性。

线上运行

资源表/文件

无法起多线程/多进程

客户端做的就是给服务器发起任务的调度的指令。之前提到的jar命令就是一种方法。鉴于实际上运行场景的多样性,这里介绍其他的几种常见方法:

客户端先解析-classpath参数,找到main方法相关的jar包的位置

-f和-e一样,只是把命令写到文件里,然后用odpscmd -f
xxx.sql引用这个文件,那这个文件里的多个指令都会被执行。

Q:如何实现M->R->M->R这种逻辑呢

点此查看原文:http://click.aliyun.com/m/41384/

任务提交

在odpscmd里执行add jar命令:

详细的SDK的文档,可以在Maven里下载。这是下载地址。

前言

Map阶段:每个Mapper针对每条数据,解析里面的字符串,用空格切开字符串,得到一组单词。针对其中每个单词,写一条记录

输出阶段:输出Reduce的计算结果,写入到表里或者返回给客户端。

MapReduce

任务提交

不允许读本地文件(比如JSON里就用到了,就需要改用GSON)

大数据开发套件可以配置MapReduce作业。

setCombinerOptimizeEnable(boolean
isCombineOpt)设置是否对Combiner进行优化。

太阳2 16

产品限制

大数据开发套件的定时任务/工作流可以配置调度周期和任务依赖,配合前面提到的方法里的MapReduce作业/Shell作业,实现任务的调度。

如果在odpscmd的配置文件里已经配置好了,那只需要写-e的部分。

void setResources(String
resourceNames)有和jar命令的-resources一样的功能,但是优先级高于-resources(也就是说代码里的设置优先级比较高)

setMemoryForJVM(int mem)设置 JVM虚拟机的内存资源,单位:MB,默认值 1024.

生产及周期调度

功能解读

setOutputGroupingColumns(String[]
cols)数据在Reducer里排序好了后,是哪些数据进入到同一个reduce方法的,就是看这里的设置。一般来说,设置的和setPartitionColumns(String[]
cols)一样。可以看到二次排序的用法。

-resources告诉服务器,在运行任务的时候,需要用到的资源有哪些。

运行环境

add jar /JarPath/mapreduce-examples.jar -f;

wc_in wc_out是传给main方法的参数,通过解析main方法传入参数String[]
args获得这个参数

对比前面的快速开始,可以看到除去数据准备阶段,和MR相关的,有资源的上传(add
jar步骤)和jar命令启动MR作业两步。

JobConf定义了这个任务的细节,还是这个图,解释一下JobConf的其他设置项的用法。

如果Reduce后面还需要做进一步的Reduce计算,可以用拓展MapReduce模型(简称MRR)。MRR其实就是Reduce阶段结束后,不直接输出结果,而是再次经过Shuffle后接另外一个Reduce。

数据输出

Map阶段

说起MapReduce就少不了WordCount,我特别喜欢文档里的这个图片。

其他限制

Reduce阶段:Reducer拿前面已经排序好的输入,相同的单词的所有输入进入同一个Redue循环,在循环里,做个数的累加。

读表

详见MaxCompute MR
限制项汇总

setNumReduceTasks(int n)设置 Reducer 任务数,默认为 Mapper 任务数的
1/4。如果是Map
only的任务,需要设置成0。可以参考这里。

Map/Reduce

快速开始

沙箱是MaxCompute的一套安全体系,使得在MaxCompute上运行的作业无法获得其他用户的信息,也无法获得系统的一些信息。主要包括以下几点,完整的列表可以参考文档

A:在Reduce代码里直接嵌套上Map的逻辑就可以了,把第二个M的工作在前一个R里完成,而不是作为计算引擎调度层面上的一个单独步骤,比如

在一个Mapper里,只会读一张表,不同的表的数据会在不同的Mapper
worker上运行,所以可以用示例里的这个方法先获得这个Mapper读的是什么表。

com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out`

setPartitionColumns(String[]
cols)设置作业的分区列,定义了数据分配到Reducer的分配策略。

等待作业执行成功后,可以在SQL通过查询wc_out表的数据,看到执行的结果

MapReduce已经有文档,用户可以参考文档使用。本文是在文档的基础上做一些类似注解及细节解释上的工作。

Shuffle-合并排序

Shuffle阶段-合并排序:也是发生在Mapper上。会先对数据进行排序。比如WordCount的例子,会根据单词进行排序。排序后的合并,又称Combiner阶段,因为前面已经根据单词排序过了,相同的单词都是连在一起的。那可以把2个相邻的合并成1个。Combiner可以减少在后续Reduce端的计算量,也可以减少Mapper往Reducer的数据传输的工作量。

大数据开发套件可以配置Shell作业。可以在Shell作业里参考上面的方法用odpscmd
-e/-f来调度MapReduce作业。

任务的是在MaxComput(ODPS)上运行的,客户端通过jar命令发起请求。

}

setSplitSize(long size)通过调整分片大小来调整Mapper个数,单位
MB,默认256。Mapper个数不通过void setNumMapTasks(int n)设置。

无法访问外部数据源(不能当爬虫,不能读RDS等)

其他

比如有一张很大的表。表里有个String字段记录的是用空格分割开单词。最后需要统计所有记录中,每个单词出现的次数是多少。那整体的计算流程是

输入数据

最后通过JobClient.runJob(job);客户端往服务器发起了这个MapReduce作业。

`jar -resources mapreduce-examples.jar -classpath
mapreduce-examples.jar

这里的/JarPath/mapreduce-examples.jar的路径要替换成本地实际的文件路径。这个命令能把本地的jar包传到服务器上,-f是如果已经有同名的jar包就覆盖,实际使用中对于是报错还是覆盖需要谨慎考虑。

相关文章