Source code for hadoopy._local

import hadoopy
import logging
import os
import sys
import select
import subprocess
import tempfile
import shutil
import contextlib


@contextlib.contextmanager
def chdir(path):
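    """Context manager that changes to 'path' and restores the original
    working directory on exit, even if the body raises."""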
    orig_pwd = os.path.abspath('.')
    try:
        os.chdir(path)
        yield
    finally:
        os.chdir(orig_pwd)


class LocalTask(object):

    def __init__(self, script_path, task, files=(), max_input=None, pipe=True,
                 python_cmd='python', remove_tempdir=True):
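        """
        Args:
            script_path: Path to the hadoopy job script (e.g., script.py).
            task: Task to run: 'map', 'reduce', or 'combine'.
            files: Extra files to copy into the temporary working directory
                (the script itself is always included).
            max_input: Maximum number of map inputs; ignored unless
                task == 'map'.  None (default) means unlimited.
            pipe: If True (default), run the task as 'pipe <task>' and make
                the script copy executable.
            python_cmd: Python command used to launch the script.
            remove_tempdir: If True (default), remove the temporary working
                directory when this object is deleted.
        """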
        self.remove_tempdir = remove_tempdir
        self.temp_dir = tempfile.mkdtemp()
        self.script_path = script_path
        self.task = task
        self.max_input = max_input if task == 'map' else None
        self.pipe = pipe
        self.python_cmd = python_cmd
        if not files:
            files = []
        else:
            files = list(files)
        files.append(script_path)
        files = [os.path.abspath(f) for f in files]
        self.files = files
        # Check on script
        script_info = hadoopy._runner._parse_info(script_path, python_cmd)
        assert task in script_info['tasks']
        self._setup()

    def _setup(self):
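        """Copy the script and extra files into the temporary working
        directory, making the script executable when running via pipe."""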
        with chdir(self.temp_dir):
            if self.files:
                for f in self.files:
                    shutil.copy(f, os.path.basename(f))
            if self.pipe:
                hadoopy._runner._make_script_executable(os.path.basename(self.script_path), temp_copy=False)

    def __del__(self):
        if self.remove_tempdir:
            shutil.rmtree(self.temp_dir)
        else:
            logging.warning('Temporary directory not removed [%s]' % self.temp_dir)

    def _setup_env(self, cmdenvs):
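        """Build the subprocess environment: typedbytes map input, flushed
        typedbytes writes, and any user-provided cmdenvs."""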
        cmdenvs = hadoopy._runner._listeq_to_dict(cmdenvs)
        env = dict(os.environ)
        env['stream_map_input'] = 'typedbytes'
        env['hadoopy_flush_tb_writes'] = '1'
        env.update(cmdenvs)
        return env

    def run_task(self, kvs, cmdenvs=(), poll=None):
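        """Run the task over kvs, yielding its output (key, value) pairs.

        Launches the script as a subprocess in the temporary working
        directory, feeds it typedbytes-encoded input over a pipe, and yields
        typedbytes-decoded output as it becomes available.

        Args:
            kvs: Iterator of (key, value) pairs to feed the task.
            cmdenvs: Extra environment variables for the task.
            poll: If not None, only advance to the next input kv when a call
                to poll() returns True.

        Yields:
            (key, value) pairs output by the task.

        Raises:
            RuntimeError: If the subprocess exits with a nonzero return code.
        """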
        env = self._setup_env(cmdenvs)
        # Setup pipes
        task = 'pipe %s' % self.task if self.pipe else self.task
        in_r_fd, in_w_fd = os.pipe()
        out_r_fd, out_w_fd = os.pipe()
        cmd = ('%s %s %s' % (self.python_cmd, self.script_path, task)).split()
        child_stdin = os.fdopen(in_r_fd, 'r')
        child_stdout = os.fdopen(out_w_fd, 'w')

        # Start the read/write loop
        try:
            with chdir(self.temp_dir):
                p = subprocess.Popen(cmd,
                                     stdin=child_stdin,
                                     stdout=child_stdout,
                                     close_fds=True,
                                     env=env)
            # Close our copies of the child's pipe ends so EOF propagates
            # correctly when the child exits
            child_stdin.close()
            child_stdout.close()
            with hadoopy.TypedBytesFile(read_fd=out_r_fd, unbuffered_reads=True) as tbfp_r:
                with hadoopy.TypedBytesFile(write_fd=in_w_fd, flush_writes=True) as tbfp_w:
                    for num, kv in enumerate(kvs):
                        if self.max_input is not None and self.max_input <= num:
                            break
                        timeout = None
                        wrote = False
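                        # Interleave I/O with select(): drain any output the
                        # child has ready, write the current kv exactly once,
                        # then use a short timeout to catch trailing output
                        # before moving on to the next input.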
                        while True:
                            r, w, _ = select.select([out_r_fd], [in_w_fd], [], timeout)
                            if r:  # If data is available to be read, then get it
                                yield tbfp_r.next()
                            elif w and not wrote:
                                tbfp_w.write(kv)
                                wrote = True
                                timeout = .0001
                            else:
                                if wrote and (poll is None or poll()):
                                    break
                # Get any remaining values
                while True:
                    try:
                        yield tbfp_r.next()
                    except EOFError:
                        break
        finally:
            try:
                returncode = p.wait()
                if returncode != 0:
                    raise RuntimeError('Process returned [%d]' % returncode)
            except NameError:  # If p isn't defined yet
                pass

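# Example (a minimal sketch): driving a single map task directly with
# LocalTask.  'wc.py' is a hypothetical hadoopy script with a map task; it
# is not part of this module.
#
#   task = LocalTask('wc.py', 'map')
#   for key, value in task.run_task(iter([('doc1', 'a b a')])):
#       print key, value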

def launch_local(in_name, out_name, script_path, poll=None, max_input=None,
                 files=(), cmdenvs=(), pipe=True, python_cmd='python',
                 remove_tempdir=True, identity_mapper=False, num_reducers=None,
                 **kw):
    """A simple local emulation of hadoop

    This doesn't run hadoop and it doesn't support many advanced features, it
    is intended for simple debugging.  The input/output uses HDFS if an HDFS
    path is given.  This allows for small tasks to be run locally (primarily
    while debugging).  A temporary working directory is used and removed.

    Support

    * Environmental variables
    * Map-only tasks
    * Combiner
    * Files
    * Pipe (see below)
    * Display of stdout/stderr
    * Iterator of KV pairs as input or output (bypassing HDFS)

    :param in_name: Input path (string or list of strings) or Iterator of
        (key, value).  If it is an iterator then no input is taken from HDFS.
    :param out_name: Output path or None.  If None then output is not placed
        on HDFS, it is available through the 'output' key of the return value.
    :param script_path: Path to the script (e.g., script.py)
    :param poll: If not None, then only attempt to get a kv pair from kvs if
        when called, poll returns True.
    :param max_input: Maximum number of Mapper inputs, None (default) then
        unlimited.
    :param files: Extra files (other than the script) (iterator).  NOTE:
        Hadoop copies the files into working directory
    :param cmdenvs: Extra cmdenv parameters (iterator)
    :param pipe: If true (default) then call user code through a pipe to
        isolate it and stop bugs when printing to stdout.  See project docs.
    :param python_cmd: The python command to use.  The default is "python".
        Can be used to override the system default python, e.g.
        python_cmd = "python2.6"
    :param remove_tempdir: If True (default), then rmtree the temporary dir,
        else print its location.  Useful if you need to see temporary files
        or how input files are copied.
    :param identity_mapper: If True, use an identity mapper, regardless of
        what is in the script.
    :param num_reducers: If 0, don't run the reducer even if one exists, else
        obey what is in the script.
    :rtype: Dictionary with some of the following entries (depending on
        options)
    :returns: freeze_cmds: Freeze command(s) ran
    :returns: frozen_tar_path: HDFS path to frozen file
    :returns: hadoop_cmds: Hadoopy command(s) ran
    :returns: process: subprocess.Popen object
    :returns: output: Iterator of (key, value) pairs
    :raises: subprocess.CalledProcessError: Hadoop error.
    :raises: OSError: Hadoop streaming not found.
    :raises: TypeError: Input types are not correct.
    :raises: ValueError: Script not found
    """
    if (isinstance(files, (str, unicode)) or isinstance(cmdenvs, (str, unicode))
            or ('cmdenvs' in kw and isinstance(kw['cmdenvs'], (str, unicode)))):
        raise TypeError('files and cmdenvs must be iterators of strings and not strings!')
    logging.info('Local[%s]' % script_path)
    script_info = hadoopy._runner._parse_info(script_path, python_cmd)
    if (isinstance(in_name, (str, unicode)) or
            (in_name and isinstance(in_name, (list, tuple)) and
             isinstance(in_name[0], (str, unicode)))):
        in_kvs = hadoopy.readtb(in_name)
    else:
        in_kvs = in_name
    if 'reduce' in script_info['tasks'] and num_reducers != 0:
        if identity_mapper:
            kvs = in_kvs
        else:
            kvs = list(LocalTask(script_path, 'map', files, max_input, pipe,
                                 python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs, poll))
        if 'combine' in script_info['tasks']:
            kvs = hadoopy.Test.sort_kv(kvs)
            kvs = list(LocalTask(script_path, 'combine', files, max_input, pipe,
                                 python_cmd, remove_tempdir).run_task(kvs, cmdenvs))
        kvs = hadoopy.Test.sort_kv(kvs)
        kvs = LocalTask(script_path, 'reduce', files, max_input, pipe,
                        python_cmd, remove_tempdir).run_task(kvs, cmdenvs)
    else:
        if identity_mapper:
            kvs = in_kvs
        else:
            kvs = LocalTask(script_path, 'map', files, max_input, pipe,
                            python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs, poll)
    out = {}
    if out_name is not None:
        hadoopy.writetb(out_name, kvs)
        out['output'] = hadoopy.readtb(out_name)
    else:
        out['output'] = kvs
    return out
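
# Example (a minimal sketch): debugging a job locally without touching HDFS.
# 'wc.py' and the input pairs are hypothetical, not part of this module.
#
#   result = launch_local([('doc1', 'a b a'), ('doc2', 'b c')], None, 'wc.py')
#   for key, value in result['output']:
#       print key, value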