Passing arguments to the reducer in mrjob

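I'm using mrjob to run Hadoop streaming jobs against our HBase instance. For the life of me, I can't figure out how to pass arguments to my reducer. There are two arguments, startDate and endDate, that I want to hand to the reducer when I launch the job. This is what my reducer currently looks like: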

def reducer(self, groupId, meterList):
    """
    Print bucket.
    """
    sys.stderr.write("Working on group = " + str(groupId) + "\n")
    #print "Opening connection..."
    conn = open_connection(hostname)
    #print "Getting table..."
    table = get_table(conn, tableName)

    compositeDf = DataFrame()

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

I can't find a way to pass startDate and endDate into the reducer as arguments. The only way I can get at them is through global variables defined above the class:

startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)

class MRDataQuality(MRJob):
    """
    MapReduce job that does a data quality check on the meter data in HBase.
    """

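But that's dirty. I want to pass the values in when I invoke the job. I've tried a lot of things: setting them as instance variables, setting them as static class variables, writing an overloaded constructor for MRDataQualityJob... Nothing seems to work. I invoke the job programmatically from a top-level script, like this: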

if args.hadoop:
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
    mrdq_job = MRDataQuality(args=[meterFile])

with mrdq_job.make_runner() as runner:
    runner.run()

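No matter what I do to the mrdq_job instance, runner.run() seems to work with a fresh instance of the class, on which none of my instance or static variables are set. How do I pass my arguments to the reducer? In plain Hadoop streaming I can do it by passing a string: "-reducer reducer.py arg1 arg2". Does mrjob have an equivalent?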
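For reference, on the reducer side the plain Hadoop streaming approach amounts to reading the extra arguments from sys.argv. The sketch below assumes the two arguments are dates in YYYY-MM-DD format; the helper names parse_date_args and main are hypothetical, and the echo loop is only a placeholder for real reducer logic.

```python
import datetime
import sys


def parse_date_args(argv):
    """Parse START_DATE and END_DATE from a streaming reducer's command line.

    Hypothetical helper. With -reducer "reducer.py 2012-06-10 2012-06-11",
    the script sees something like
    ["reducer.py", "2012-06-10", "2012-06-11"] in sys.argv.
    """
    if len(argv) != 3:
        raise ValueError("usage: reducer.py START_DATE END_DATE")
    start = datetime.datetime.strptime(argv[1], "%Y-%m-%d")
    end = datetime.datetime.strptime(argv[2], "%Y-%m-%d")
    return start, end


def main():
    # Meant to be called from the reducer script's entry point.
    start_date, end_date = parse_date_args(sys.argv)
    sys.stderr.write("Reducing from %s to %s\n" % (start_date, end_date))
    for line in sys.stdin:
        # Placeholder logic: echo each "key<TAB>value" record unchanged.
        sys.stdout.write(line)
```

In reducer.py itself this would end with: if __name__ == "__main__": main()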

Solution:

How about passing the arguments through the job configuration and then reading them in the reducer with get_jobconf_value?

Something like this:

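import datetime
import sys

from mrjob.job import MRJob
from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer(self, groupId, meterList):
    ...
    # jobconf values arrive as strings, so parse them back into datetimes
    startDate = datetime.datetime.strptime(
        get_jobconf_value("my.job.settings.startdate"), "%Y-%m-%d")
    endDate = datetime.datetime.strptime(
        get_jobconf_value("my.job.settings.enddate"), "%Y-%m-%d")

    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, startDate, endDate)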
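Why this works: Hadoop streaming exports job configuration properties to each task as environment variables, with the dots in the property name replaced by underscores, and get_jobconf_value reads them from there (mrjob also tries alternate names that some properties have across Hadoop versions). The function below is a simplified, hypothetical stand-in written for illustration, not mrjob's actual implementation.

```python
import os


def read_jobconf(name, default=None):
    """Hypothetical, simplified jobconf lookup from inside a streaming task.

    Hadoop streaming exposes my.job.settings.startdate to the task as the
    environment variable my_job_settings_startdate.
    """
    return os.environ.get(name.replace(".", "_"), default)
```

Because the value comes from the environment, it is always a string (or the default), so it has to be converted before it can be used as a date.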

Then set the parameters in your code, the same way you were already setting them above:

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
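The line above only sets the dates in the Hadoop branch. A small helper that builds the argument list for both branches keeps the two invocations in sync. build_job_args is a hypothetical name, and the sketch assumes that the runner used for local runs also exposes --jobconf values to the tasks (worth checking for your mrjob version).

```python
import datetime


def build_job_args(meter_file: str, start_date: datetime.date,
                   end_date: datetime.date, hadoop: bool = False) -> list:
    """Build the args list for MRDataQuality (hypothetical helper).

    The date settings are added in both branches, so the reducer can read
    them with get_jobconf_value regardless of which runner is used.
    """
    args = []
    if hadoop:
        args += ["-r", "hadoop", "--conf-path", "mrjob.conf",
                 "--jobconf", "mapred.reduce.tasks=42"]
    args += [
        "--jobconf", "my.job.settings.startdate=" + start_date.strftime("%Y-%m-%d"),
        "--jobconf", "my.job.settings.enddate=" + end_date.strftime("%Y-%m-%d"),
        meter_file,
    ]
    return args
```

Both branches of the original if/else then collapse into one call: mrdq_job = MRDataQuality(args=build_job_args(meterFile, startDate, endDate, hadoop=args.hadoop))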

Tags: mapreduce, mrjob, python
Source: https://codeday.me/bug/20191030/1969039.html