Source code for hadoopy._runner

#!/usr/bin/env python
# (C) Copyright 2010 Brandyn A. White
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

__author__ = 'Brandyn A. White <bwhite@cs.umd.edu>'
__license__ = 'GPL V3'


import subprocess
import os
import shutil
import hadoopy._freeze
import json
import tempfile
import stat
import logging
import time
import select
import atexit

# These two globals are only used in the following function
WARNED_HADOOP_HOME = False
HADOOP_STREAMING_PATH_CACHE = None


def _find_hstreaming():
    """Finds the whole path to the hadoop streaming jar.

    If the environmental var HADOOP_HOME is specified, then start the search
    from there.

    Returns:
        Full path to the hadoop streaming jar if found, else return an empty
        string.
    """
    global WARNED_HADOOP_HOME, HADOOP_STREAMING_PATH_CACHE
    if HADOOP_STREAMING_PATH_CACHE:
        return HADOOP_STREAMING_PATH_CACHE
    try:
        search_root = os.environ['HADOOP_HOME']
    except KeyError:
        search_root = '/'
    cmd = 'find %s -name hadoop*streaming*.jar' % (search_root)
    p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    HADOOP_STREAMING_PATH_CACHE = p.communicate()[0].split('\n')[0]
    if search_root == '/' and not WARNED_HADOOP_HOME:
        WARNED_HADOOP_HOME = True
        hadoop_home = HADOOP_STREAMING_PATH_CACHE[:HADOOP_STREAMING_PATH_CACHE.rfind('/contrib/')]
        logging.warn('Set the HADOOP_HOME environment variable to your hadoop path to improve performance. Put the following [export HADOOP_HOME="%s"] in ~/.bashrc' % hadoop_home)
    return HADOOP_STREAMING_PATH_CACHE
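

# Illustrative sketch (not part of the original module): setting HADOOP_HOME
# before the first call restricts the 'find' above to that tree and avoids
# the warning; the install path below is hypothetical.
def _example_find_hstreaming():
    os.environ.setdefault('HADOOP_HOME', '/usr/lib/hadoop')
    # Returns the full jar path on success, '' if no streaming jar is found;
    # repeated calls hit the module-level cache instead of re-running find.
    return _find_hstreaming()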


def _listeq_to_dict(jobconfs):
    """Convert iterators of 'key=val' into a dictionary with later values taking priority."""
    if not isinstance(jobconfs, dict):
        jobconfs = dict(x.split('=', 1) for x in jobconfs)
    return dict((str(k), str(v)) for k, v in jobconfs.items())
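

# Illustrative sketch (not part of the original module): both accepted input
# forms normalize to a dict of strings; with duplicate 'key=val' entries the
# later value wins.  The jobconf name is just an example.
def _example_listeq_to_dict():
    assert _listeq_to_dict(['mapred.reduce.tasks=5',
                            'mapred.reduce.tasks=10']) == {'mapred.reduce.tasks': '10'}
    assert _listeq_to_dict({'mapred.reduce.tasks': 10}) == {'mapred.reduce.tasks': '10'}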


def _parse_info(script_path, python_cmd='python'):
    if not os.path.exists(script_path):
        raise ValueError('Script [%s] not found!' % script_path)
    p = subprocess.Popen([python_cmd, script_path, 'info'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    try:
        info = json.loads(stdout)
    except ValueError:
        raise ValueError('Cannot execute script [%s]!  Script output below...\nstdout\n%s\nstderr\n%s' % (script_path, stdout, stderr))
    info['jobconfs'] = _listeq_to_dict(info.get('jobconfs', ()))
    return info
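

# Illustrative sketch (not part of the original module): rough shape of the
# dict _parse_info() returns, inferred from the keys this module reads from
# it ('tasks', 'jobconfs', 'required_files', 'required_cmdenvs').  The script
# name 'wc.py' is hypothetical; a hadoopy job script prints this JSON when
# invoked as 'python wc.py info'.
def _example_parse_info():
    info = _parse_info('wc.py')
    # e.g. {'tasks': ['map', 'reduce'], 'jobconfs': {},
    #       'required_files': [], 'required_cmdenvs': []}
    return 'map' in info['tasks']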


def _check_requirements(files, cmdenvs, required_files, required_cmdenvs):
    files = set(os.path.basename(x) for x in files)
    cmdenvs = set(cmdenvs)
    required_files = set(required_files)
    required_cmdenvs = set(required_cmdenvs)
    missing_files = required_files - files
    missing_cmdenvs = required_cmdenvs - cmdenvs
    if missing_files or missing_cmdenvs:
        error_out = []
        if missing_files:
            error_out.append('Missing required file(s), include them using the "files" argument: [%s]' % ', '.join(missing_files))
        if missing_cmdenvs:
            error_out.append('Missing required cmdenv(s), include them using the "cmdenvs" argument: [%s]' % ', '.join(missing_cmdenvs))
        raise ValueError('\n'.join(error_out))
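

# Illustrative sketch (not part of the original module): the check compares
# file basenames and cmdenv keys, so a required 'model.pkl' is satisfied by
# a path like 'data/model.pkl'.  All names below are hypothetical.
def _example_check_requirements():
    _check_requirements(files=['data/model.pkl', 'wc.py'],
                        cmdenvs={'MODEL_FN': 'model.pkl'},
                        required_files=['model.pkl'],
                        required_cmdenvs=['MODEL_FN'])  # passes silently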


def _check_script(script_path, files, python_cmd):
    logging.info('Sanity checking script in a temp directory... (disable with check_script=False)')
    orig_pwd = os.path.abspath('.')
    try:
        temp_dir = tempfile.mkdtemp()
        for f in files:
            if f.endswith('.py') or f.endswith('.pyc'):
                shutil.copy(f, temp_dir)
        os.chdir(temp_dir)
        try:
            _parse_info(os.path.basename(script_path), python_cmd)
        except ValueError:
            logging.error('Sanity check failed: This is often due to local imports not included in the "files" argument.')
            raise
    finally:
        shutil.rmtree(temp_dir)
        os.chdir(orig_pwd)
    

def launch(in_name, out_name, script_path, partitioner=False, files=(), jobconfs=(),
           cmdenvs=(), libjars=(), input_format=None, output_format=None,
           copy_script=True, wait=True, hstreaming=None, name=None,
           use_typedbytes=True, use_seqoutput=True, use_autoinput=True,
           remove_output=False, add_python=True, config=None, pipe=True,
           python_cmd="python", num_mappers=None, num_reducers=None,
           script_dir='', remove_ext=False, check_script=True,
           make_executable=True, required_files=(), required_cmdenvs=(), **kw):
    """Run Hadoop given the parameters

    :param in_name: Input path (string or list)
    :param out_name: Output path
    :param script_path: Path to the script (e.g., script.py)
    :param partitioner: If set (truthy), the value is passed as the '-partitioner' argument.
    :param files: Extra files (other than the script) (iterator).  NOTE: Hadoop copies the files into the working directory
    :param jobconfs: Extra jobconf parameters (iterator of strings or dict)
    :param cmdenvs: Extra cmdenv parameters (iterator of strings or dict)
    :param libjars: Extra jars to include with the job (iterator of strings).
    :param input_format: Custom input format (if set, overrides use_autoinput)
    :param output_format: Custom output format (if set, overrides use_seqoutput)
    :param copy_script: If True, the script is added to the files list.
    :param wait: If True, wait until the process is completed (default True).  Setting this to False is useful if you want to run multiple jobs concurrently by using the 'process' entry in the output.
    :param hstreaming: The full hadoop streaming path to call.
    :param name: Set the job name to this (default None, job name is the script name)
    :param use_typedbytes: If True (default), use typedbytes IO.
    :param use_seqoutput: If True (default), output a sequence file.  If False, output is text.  If output_format is set, this is not used.
    :param use_autoinput: If True (default), sets the input format to auto.  If input_format is set, this is not used.
    :param remove_output: If True, the output directory is removed if it exists (default False).
    :param add_python: If True, use 'python script_name.py'
    :param config: If a string, set the hadoop config path
    :param pipe: If True (default), call user code through a pipe to isolate it and stop bugs when printing to stdout.  See project docs.
    :param python_cmd: The python command to use.  The default is "python".  Can be used to override the system default python, e.g. python_cmd = "python2.6"
    :param num_mappers: The number of mappers to use (i.e., jobconf mapred.map.tasks=num_mappers).
    :param num_reducers: The number of reducers to use (i.e., jobconf mapred.reduce.tasks=num_reducers).
    :param script_dir: Where the script is relative to the working dir; it is prefixed to the script name with a / (default '' is the current dir)
    :param remove_ext: If True, remove the script extension (default False)
    :param check_script: If True, copy the script and .py(c) files to a temporary directory and verify that it can be executed.  This catches the majority of errors caused by locally imported files not being included (default True).
    :param make_executable: If True, ensure that the script is executable and has a #! line at the top.
    :param required_files: Iterator of files that must be specified (verified against the files argument)
    :param required_cmdenvs: Iterator of cmdenvs that must be specified (verified against the cmdenvs argument)
    :rtype: Dictionary with some of the following entries (depending on options)
    :returns: freeze_cmds: Freeze command(s) ran
    :returns: frozen_tar_path: HDFS path to frozen file
    :returns: hadoop_cmds: Hadoopy command(s) ran
    :returns: process: subprocess.Popen object
    :returns: output: Iterator of (key, value) pairs
    :raises: subprocess.CalledProcessError: Hadoop error.
    :raises: OSError: Hadoop streaming not found.
    :raises: TypeError: Input types are not correct.
    :raises: ValueError: Script not found or check_script failed
    """
    if isinstance(files, (str, unicode)) or isinstance(jobconfs, (str, unicode)) or isinstance(cmdenvs, (str, unicode)):
        raise TypeError('files, jobconfs, and cmdenvs must be iterators of strings and not strings!')
    jobconfs = _listeq_to_dict(jobconfs)
    cmdenvs = _listeq_to_dict(cmdenvs)
    libjars = list(libjars)
    script_info = _parse_info(script_path, python_cmd)
    if make_executable and script_path.endswith('.py') and pipe:
        script_path = hadoopy._runner._make_script_executable(script_path)
    job_name = os.path.basename(script_path).rsplit('.', 1)[0]
    script_name = os.path.basename(script_path)
    # Add required cmdenvs/files from the script
    required_cmdenvs = list(script_info.get('required_cmdenvs', ())) + list(required_cmdenvs)
    required_files = list(script_info.get('required_files', ())) + list(required_files)
    _check_requirements(files, cmdenvs, required_files, required_cmdenvs)
    try:
        hadoop_cmd = 'hadoop jar ' + hstreaming
    except TypeError:
        hadoop_cmd = 'hadoop jar ' + _find_hstreaming()
    if remove_ext:
        script_name = script_name.rsplit('.', 1)[0]
    if add_python:
        script_name = '%s %s' % (python_cmd, script_name)
    if script_dir:
        script_name = ''.join([script_dir, '/', script_name])
    if 'map' in script_info['tasks']:
        c = 'pipe map' if pipe else 'map'
        mapper = ' '.join((script_name, c))
    if 'reduce' in script_info['tasks']:
        c = 'pipe reduce' if pipe else 'reduce'
        reducer = ' '.join((script_name, c))
    else:
        reducer = None
    if 'combine' in script_info['tasks']:
        c = 'pipe combine' if pipe else 'combine'
        combiner = ' '.join((script_name, c))
    else:
        combiner = None
    cmd = [hadoop_cmd]
    # Add libjars
    if libjars:
        cmd += ['--libjars', ','.join(libjars)]
    cmd += ['-output', out_name]
    # Add inputs
    if isinstance(in_name, (str, unicode)):
        in_name = [in_name]
    for f in in_name:
        cmd += ['-input', f]
    # Add mapper/reducer
    cmd += ['-mapper', '"%s"' % (mapper)]
    if reducer:
        cmd += ['-reducer', '"%s"' % (reducer)]
    else:
        cmd += ['-reducer', 'NONE']
    if combiner:
        cmd += ['-combiner', '"%s"' % (combiner)]
    if partitioner:
        cmd += ['-partitioner', '"%s"' % (partitioner)]
    if num_mappers is not None:
        if 'mapred.map.tasks' not in jobconfs:
            jobconfs['mapred.map.tasks'] = str(num_mappers)
    if num_reducers is not None:
        if 'mapred.reduce.tasks' not in jobconfs:
            jobconfs['mapred.reduce.tasks'] = str(num_reducers)
    # Add files
    if copy_script:
        files = list(files)
        files.append(script_path)
    # BUG: CDH3 doesn't copy directories properly, so this enumerates them
    new_files = []
    for f in files:
        if os.path.isdir(f):
            new_files += ['%s/%s' % (f, x) for x in os.listdir(f)]
        else:
            new_files.append(f)
    files = new_files
    del new_files
    # END BUG
    if check_script:
        _check_script(script_path, files, python_cmd)
    for f in files:
        cmd += ['-file', f]
    # Add jobconfs
    if name is None:
        jobconfs['mapred.job.name'] = job_name
    else:
        jobconfs['mapred.job.name'] = str(name)
    # Handle additional jobconfs listed in the job itself.  These go at the
    # beginning of the list as later jobconfs override them; launch-specified
    # confs override job-specified ones because Hadoop takes the last one
    # you provide.
    jobconfs_all = dict(script_info['jobconfs'])
    jobconfs_all.update(jobconfs)
    jobconfs = jobconfs_all
    for x in jobconfs_all.items():
        cmd += ['-jobconf', '"%s=%s"' % x]
    # Add cmdenv
    for x in cmdenvs.items():
        cmd += ['-cmdenv', '"%s=%s"' % x]
    # Add IO
    if use_typedbytes:
        cmd += ['-io', 'typedbytes']
    # Add OutputFormat
    if output_format is not None:
        cmd += ['-outputformat', output_format]
    else:
        if use_seqoutput:
            cmd += ['-outputformat', 'org.apache.hadoop.mapred.SequenceFileOutputFormat']
    # Add InputFormat
    if input_format is not None:
        cmd += ['-inputformat', input_format]
    else:
        if use_autoinput:
            cmd += ['-inputformat', 'AutoInputFormat']
    # Add config
    if config:
        cmd += ['--config', config]
    # Remove output
    if remove_output and hadoopy.exists(out_name):
        logging.warn('Removing output directory [%s]' % out_name)
        hadoopy.rmr(out_name)
    # Run the command and wait until it has completed
    hadoop_start_time = time.time()
    logging.info('/\\%s%s Output%s/\\' % ('-' * 10, 'Hadoop', '-' * 10))
    logging.info('hadoopy: Running[%s]' % (' '.join(cmd)))
    out = {}
    out['process'] = process = subprocess.Popen(' '.join(cmd), shell=True,
                                                stdout=subprocess.PIPE,
                                                stderr=subprocess.PIPE)
    # NOTE(brandyn): Read a line from stdout/stderr and log it.  This allows us
    # to use the logging system instead of printing to the console.
    if wait:
        def check_fps():
            fps = select.select([process.stdout, process.stderr], [], [], 1.)[0]
            for fp in fps:
                line = fp.readline()
                if line:
                    # NOTE(brandyn): Should have at least a newline
                    line = line[:-1]
                    if line.find(' ERROR ') != -1:
                        logging.error(line)
                    elif line.find(' WARN ') != -1:
                        logging.warn(line)
                    else:
                        logging.info(line)
        while process.poll() is None:
            check_fps()
        check_fps()
        if process.wait():
            raise subprocess.CalledProcessError(process.returncode, ' '.join(cmd))
    logging.info('Hadoop took [%f] seconds' % (time.time() - hadoop_start_time))
    logging.info('\\/%s%s Output%s\\/' % ('-' * 10, 'Hadoop', '-' * 10))
    # NOTE(brandyn): Postpones calling readtb
    def _read_out():
        for x in hadoopy.readtb(out_name):
            yield x
    out['output'] = _read_out()
    out['hadoop_cmds'] = [' '.join(cmd)]
    return out
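

# Illustrative sketch (not part of the original module): a minimal launch()
# call.  The HDFS paths, 'wc.py', and 'stopwords.txt' are hypothetical;
# 'wc.py' must be a hadoopy job script (i.e., it responds to 'info').
def _example_launch():
    import hadoopy
    out = hadoopy.launch('/user/me/wc_input', '/user/me/wc_output', 'wc.py',
                         files=['stopwords.txt'],
                         jobconfs={'mapred.reduce.tasks': '4'},
                         remove_output=True)
    # out['output'] is a lazy iterator over the job's (key, value) pairs
    return dict(out['output'])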


def launch_frozen(in_name, out_name, script_path, frozen_tar_path=None,
                  temp_path='_hadoopy_temp', cache=True, check_script=False, **kw):
    """Freezes a script and then launches it.

    This function will freeze your python program and place it on HDFS in
    'temp_path'.  It will not remove it afterwards: the archives are typically
    small, you can easily reuse/debug them, and this avoids any risks involved
    with removing the file.

    :param in_name: Input path (string or list)
    :param out_name: Output path
    :param script_path: Path to the script (e.g., script.py)
    :param frozen_tar_path: If not None, use this path to a previously frozen archive.  You can get such a path from the return value of this function; it is particularly helpful in iterative programs.
    :param cache: If True (default), use previously frozen scripts.  The cache is stored in memory (not persistent).
    :param temp_path: HDFS path that we can use to store temporary files (default '_hadoopy_temp')
    :param partitioner: If set (truthy), the value is passed as the '-partitioner' argument.
    :param wait: If True, wait until the process is completed (default True).  Setting this to False is useful if you want to run multiple jobs concurrently by using the 'process' entry in the output.
    :param files: Extra files (other than the script) (iterator).  NOTE: Hadoop copies the files into the working directory
    :param jobconfs: Extra jobconf parameters (iterator)
    :param cmdenvs: Extra cmdenv parameters (iterator)
    :param hstreaming: The full hadoop streaming path to call.
    :param name: Set the job name to this (default None, job name is the script name)
    :param use_typedbytes: If True (default), use typedbytes IO.
    :param use_seqoutput: If True (default), output a sequence file.  If False, output is text.
    :param use_autoinput: If True (default), sets the input format to auto.
    :param config: If a string, set the hadoop config path
    :param pipe: If True (default), call user code through a pipe to isolate it and stop bugs when printing to stdout.  See project docs.
    :param python_cmd: The python command to use.  The default is "python".  Can be used to override the system default python, e.g. python_cmd = "python2.6"
    :param num_mappers: The number of mappers to use, i.e. the argument given to 'numMapTasks'.  If None, do not specify this argument to hadoop streaming.
    :param num_reducers: The number of reducers to use, i.e. the argument given to 'numReduceTasks'.  If None, do not specify this argument to hadoop streaming.
    :param check_script: If True, copy the script and .py(c) files to a temporary directory and verify that it can be executed.  This catches the majority of errors caused by locally imported files not being included.  The default is False when using launch_frozen, as the freeze process packages local files.
    :rtype: Dictionary with some of the following entries (depending on options)
    :returns: freeze_cmds: Freeze command(s) ran
    :returns: frozen_tar_path: HDFS path to frozen file
    :returns: hadoop_cmds: Hadoopy command(s) ran
    :returns: process: subprocess.Popen object
    :returns: output: Iterator of (key, value) pairs
    :raises: subprocess.CalledProcessError: Hadoop error.
    :raises: OSError: Hadoop streaming not found.
    :raises: TypeError: Input types are not correct.
    :raises: ValueError: Script not found
    """
    if (('files' in kw and isinstance(kw['files'], (str, unicode))) or
            ('jobconfs' in kw and isinstance(kw['jobconfs'], (str, unicode))) or
            ('cmdenvs' in kw and isinstance(kw['cmdenvs'], (str, unicode)))):
        raise TypeError('files, jobconfs, and cmdenvs must be iterators of strings and not strings!')
    if 'jobconfs' in kw:
        kw['jobconfs'] = _listeq_to_dict(kw['jobconfs'])
    if 'cmdenvs' in kw:
        kw['cmdenvs'] = _listeq_to_dict(kw['cmdenvs'])
    cmds = []
    if not frozen_tar_path:
        freeze_out = hadoopy.freeze_script(script_path, temp_path=temp_path, cache=cache)
        frozen_tar_path = freeze_out['frozen_tar_path']
        cmds += freeze_out['cmds']
    jobconfs = kw.get('jobconfs', {})
    jobconfs['mapred.cache.archives'] = '%s#_frozen' % frozen_tar_path
    jobconfs['mapreduce.job.cache.archives'] = '%s#_frozen' % frozen_tar_path
    kw['copy_script'] = False
    kw['add_python'] = False
    kw['jobconfs'] = jobconfs
    out = launch(in_name, out_name, script_path, script_dir='_frozen',
                 remove_ext=True, check_script=check_script,
                 make_executable=False, **kw)
    out['freeze_cmds'] = cmds
    out['frozen_tar_path'] = frozen_tar_path
    return out
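

# Illustrative sketch (not part of the original module): reusing the frozen
# archive across iterations so the (slow) freeze step runs only once.  The
# paths and 'cluster.py' are hypothetical.
def _example_launch_frozen():
    import hadoopy
    out = hadoopy.launch_frozen('/user/me/iter_0', '/user/me/iter_1', 'cluster.py')
    for i in range(1, 5):
        out = hadoopy.launch_frozen('/user/me/iter_%d' % i,
                                    '/user/me/iter_%d' % (i + 1), 'cluster.py',
                                    frozen_tar_path=out['frozen_tar_path'])
    return out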


def _make_script_executable(script_path, temp_copy=True):
    cur_mode = os.stat(script_path).st_mode & 07777
    script_data = open(script_path).read()
    if not stat.S_IXUSR & cur_mode or script_data[:2] != '#!':
        if temp_copy:
            logging.warn('Script is not executable which is a requirement when pipe=True. A temporary copy will be modified to correct this.')
            temp_fp = tempfile.NamedTemporaryFile(suffix=os.path.basename(script_path))
            shutil.copy(script_path, temp_fp.name)
            # NOTE(brandyn): This keeps the file from being deleted when wait=False in launch
            atexit.register(temp_fp.close)
            script_path = temp_fp.name
        if not stat.S_IXUSR & cur_mode:
            logging.warn('Making script [%s] executable.' % script_path)
            os.chmod(script_path, stat.S_IXUSR | cur_mode)
        if script_data[:2] != '#!':
            logging.warn('Adding "#!/usr/bin/env python" to script [%s]. This will make line numbers off by one from the original.' % script_path)
            with open(script_path, 'w') as fp:
                fp.write('#!/usr/bin/env python\n' + script_data)
    return script_path