Programming Documentation

This section contains library documentation for Hadoopy.

Job Launchers (Start Hadoopy Jobs)

hadoopy.launch(in_name, out_name, script_path[, partitioner=False, files=(), jobconfs=(), cmdenvs=(), copy_script=True, wait=True, hstreaming=None, name=None, use_typedbytes=True, use_seqoutput=True, use_autoinput=True, add_python=True, config=None, pipe=True, python_cmd="python", num_mappers=None, num_reducers=None, script_dir='', remove_ext=False, **kw])[source]

Run Hadoop given the parameters

Parameters:
  • in_name – Input path (string or list)
  • out_name – Output path
  • script_path – Path to the script (e.g., script.py)
  • partitioner – If True, the partitioner is the value.
  • files – Extra files (other than the script) (iterator). NOTE: Hadoop copies the files into the working directory
  • jobconfs – Extra jobconf parameters (iterator of strings or dict)
  • cmdenvs – Extra cmdenv parameters (iterator of strings or dict)
  • libjars – Extra jars to include with the job (iterator of strings).
  • input_format – Custom input format (if set overrides use_autoinput)
  • output_format – Custom output format (if set overrides use_seqoutput)
  • copy_script – If True, the script is added to the files list.
  • wait – If True, wait until the process has completed (default True). Setting this to False is useful if you want to run multiple jobs concurrently by using the ‘process’ entry in the output.
  • hstreaming – The full hadoop streaming path to call.
  • name – Set the job name to this (default None, job name is the script name)
  • use_typedbytes – If True (default), use typedbytes IO.
  • use_seqoutput – True (default), output sequence file. If False, output is text. If output_format is set, this is not used.
  • use_autoinput – If True (default), sets the input format to auto. If input_format is set, this is not used.
  • remove_output – If True, output directory is removed if it exists. (defaults to False)
  • add_python – If True, call the script as ‘python script_name.py’
  • config – If a string, set the hadoop config path
  • pipe – If True (default), call the user code through a pipe to isolate it and prevent bugs caused by printing to stdout. See the project docs.
  • python_cmd – The python command to use. The default is “python”. Can be used to override the system default python, e.g. python_cmd = “python2.6”
  • num_mappers – The number of mappers to use (i.e., jobconf mapred.map.tasks=num_mappers).
  • num_reducers – The number of reducers to use (i.e., jobconf mapred.reduce.tasks=num_reducers).
  • script_dir – Directory the script is in relative to the working dir; it is prefixed to script_path with a ‘/’ (default ‘’ is the current dir)
  • remove_ext – If True, remove the script extension (default False)
  • check_script – If True, copy the script and .py(c) files to a temporary directory and verify that it can be executed. This catches the majority of errors caused by locally imported files that were not included. (default True)
  • make_executable – If True, ensure that script is executable and has a #! line at the top.
  • required_files – Iterator of files that must be specified (verified against the files argument)
  • required_cmdenvs – Iterator of cmdenvs that must be specified (verified against the cmdenvs argument)
Return type:

Dictionary with some of the following entries (depending on options)

Returns:
  • freeze_cmds: Freeze command(s) run
  • frozen_tar_path: HDFS path to frozen file
  • hadoop_cmds: Hadoopy command(s) run
  • process: subprocess.Popen object
  • output: Iterator of (key, value) pairs

Raises:
  • subprocess.CalledProcessError: Hadoop error.
  • OSError: Hadoop streaming not found.
  • TypeError: Input types are not correct.
  • ValueError: Script not found or check_script failed.
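
A minimal sketch of launching a job with the signature above (the HDFS paths and the script name are hypothetical):

    import hadoopy

    # Hypothetical HDFS paths and script; adjust for your cluster.
    out = hadoopy.launch('/data/input', '/data/output', 'wordcount.py',
                         num_reducers=4)

    # With wait=True (the default) the returned dictionary may include the
    # commands that were run and an iterator over the job's output.
    for key, value in out['output']:
        print(key, value)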

hadoopy.launch_frozen(in_name, out_name, script_path[, frozen_tar_path=None, temp_path='_hadoopy_temp', partitioner=False, wait=True, files=(), jobconfs=(), cmdenvs=(), hstreaming=None, name=None, use_typedbytes=True, use_seqoutput=True, use_autoinput=True, add_python=True, config=None, pipe=True, python_cmd="python", num_mappers=None, num_reducers=None, **kw])[source]

Freezes a script and then launches it.

This function will freeze your python program and place it on HDFS in ‘temp_path’. It will not remove the archive afterwards: frozen archives are typically small, you can easily reuse or debug them, and leaving them avoids any risk involved with removing the file.

Parameters:
  • in_name – Input path (string or list)
  • out_name – Output path
  • script_path – Path to the script (e.g., script.py)
  • frozen_tar_path – If not None, use this path to a previously frozen archive. You can get such a path from the return value of this function; this is particularly helpful in iterative programs.
  • cache – If True (default) then use previously frozen scripts. Cache is stored in memory (not persistent).
  • temp_path – HDFS path that we can use to store temporary files (default to _hadoopy_temp)
  • partitioner – If True, the partitioner is the value.
  • wait – If True, wait until the process has completed (default True). Setting this to False is useful if you want to run multiple jobs concurrently by using the ‘process’ entry in the output.
  • files – Extra files (other than the script) (iterator). NOTE: Hadoop copies the files into the working directory
  • jobconfs – Extra jobconf parameters (iterator)
  • cmdenvs – Extra cmdenv parameters (iterator)
  • hstreaming – The full hadoop streaming path to call.
  • name – Set the job name to this (default None, job name is the script name)
  • use_typedbytes – If True (default), use typedbytes IO.
  • use_seqoutput – True (default), output sequence file. If False, output is text.
  • use_autoinput – If True (default), sets the input format to auto.
  • config – If a string, set the hadoop config path
  • pipe – If True (default), call the user code through a pipe to isolate it and prevent bugs caused by printing to stdout. See the project docs.
  • python_cmd – The python command to use. The default is “python”. Can be used to override the system default python, e.g. python_cmd = “python2.6”
  • num_mappers – The number of mappers to use, i.e. the argument given to ‘numMapTasks’. If None, then do not specify this argument to hadoop streaming.
  • num_reducers – The number of reducers to use, i.e. the argument given to ‘numReduceTasks’. If None, then do not specify this argument to hadoop streaming.
  • check_script – If True, copy the script and .py(c) files to a temporary directory and verify that it can be executed. This catches the majority of errors caused by locally imported files that were not included. The default is False when using launch_frozen, as the freeze process packages local files.
Return type:

Dictionary with some of the following entries (depending on options)

Returns:
  • freeze_cmds: Freeze command(s) run
  • frozen_tar_path: HDFS path to frozen file
  • hadoop_cmds: Hadoopy command(s) run
  • process: subprocess.Popen object
  • output: Iterator of (key, value) pairs

Raises:
  • subprocess.CalledProcessError: Hadoop error.
  • OSError: Hadoop streaming not found.
  • TypeError: Input types are not correct.
  • ValueError: Script not found
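
A sketch of the iterative reuse of frozen_tar_path described above (paths and script name are hypothetical):

    import hadoopy

    # The first launch freezes the script and uploads the archive to HDFS.
    out = hadoopy.launch_frozen('/data/input', '/data/iter_0', 'cluster.py')

    # Later iterations reuse the frozen archive instead of re-freezing.
    frozen = out['frozen_tar_path']
    for i in range(1, 10):
        hadoopy.launch_frozen('/data/iter_%d' % (i - 1), '/data/iter_%d' % i,
                              'cluster.py', frozen_tar_path=frozen)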

hadoopy.launch_local(in_name, out_name, script_path[, max_input=-1, files=(), cmdenvs=(), pipe=True, python_cmd='python', remove_tempdir=True, **kw])[source]

A simple local emulation of hadoop

This does not run hadoop and does not support many advanced features; it is intended for simple debugging. The input/output uses HDFS if an HDFS path is given. This allows small tasks to be run locally (primarily while debugging). A temporary working directory is used and removed.

Supported features

  • Environmental variables
  • Map-only tasks
  • Combiner
  • Files
  • Pipe (see below)
  • Display of stdout/stderr
  • Iterator of KV pairs as input or output (bypassing HDFS)
Parameters:
  • in_name – Input path (string or list of strings) or Iterator of (key, value). If it is an iterator then no input is taken from HDFS.
  • out_name – Output path or None. If None, the output is not placed on HDFS; it is available through the ‘output’ key of the return value.
  • script_path – Path to the script (e.g., script.py)
  • poll – If not None, only attempt to get a kv pair from kvs if calling poll returns True.
  • max_input – Maximum number of mapper inputs; if None (default), unlimited.
  • files – Extra files (other than the script) (iterator). NOTE: Hadoop copies the files into the working directory
  • cmdenvs – Extra cmdenv parameters (iterator)
  • pipe – If True (default), call the user code through a pipe to isolate it and prevent bugs caused by printing to stdout. See the project docs.
  • python_cmd – The python command to use. The default is “python”. Can be used to override the system default python, e.g. python_cmd = “python2.6”
  • remove_tempdir – If True (default), then rmtree the temporary dir, else print its location. Useful if you need to see temporary files or how input files are copied.
  • identity_mapper – If True, use an identity mapper, regardless of what is in the script.
  • num_reducers – If 0, don’t run the reducer even if one exists, else obey what is in the script.
Return type:

Dictionary with some of the following entries (depending on options)

Returns:
  • freeze_cmds: Freeze command(s) run
  • frozen_tar_path: HDFS path to frozen file
  • hadoop_cmds: Hadoopy command(s) run
  • process: subprocess.Popen object
  • output: Iterator of (key, value) pairs

Raises:
  • subprocess.CalledProcessError: Hadoop error.
  • OSError: Hadoop streaming not found.
  • TypeError: Input types are not correct.
  • ValueError: Script not found
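
A sketch of local debugging with an in-memory input iterator and no HDFS output (the script name is hypothetical):

    import hadoopy

    # Feed (key, value) pairs directly and keep the output local by passing
    # out_name=None; results come back through the 'output' entry.
    kvs = [('a', 1), ('b', 2), ('a', 3)]
    out = hadoopy.launch_local(kvs, None, 'wordcount.py')
    results = list(out['output'])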

Task functions (Usable inside Hadoopy jobs)

hadoopy.run(mapper=None, reducer=None, combiner=None, **kw)[source]

Hadoopy entrance function

This is to be called in all Hadoopy jobs. It handles the arguments passed in, calls the provided functions with the input, and stores the output.

TypedBytes are used if the following is True: os.environ[‘stream_map_input’] == ‘typedbytes’

It is highly recommended that TypedBytes be used for all non-trivial tasks. Keep in mind that the semantics of what you can safely emit from your functions are limited when using Text (i.e., no \t or \n). You can use the base64 module to ensure that your output is clean.

If the HADOOPY_CHDIR environmental variable is set, this will immediately change the working directory to the one specified. This is useful if your data is provided in an archive but your program assumes it is in that directory.

As hadoop streaming relies on stdin/stdout/stderr for communication, anything that outputs on them in an unexpected way (especially stdout) will break the pipe on the Java side and can potentially cause data errors. To fix this problem, hadoopy allows file descriptors (integers) to be provided to each task. These will be used instead of stdin/stdout by hadoopy. This is designed to combine with the ‘pipe’ command.

To use the pipe functionality, instead of using ‘your_script.py map’ use ‘your_script.py pipe map’, which will call the script as a subprocess and use the read_fd/write_fd command line arguments for communication. This isolates your script and eliminates the largest source of errors when using hadoop streaming.

The pipe functionality has the following semantics:
  • stdin: Always an empty file
  • stdout: Redirected to stderr (which is visible in the hadoop log)
  • stderr: Kept as stderr
  • read_fd: File descriptor that points to the true stdin
  • write_fd: File descriptor that points to the true stdout

Command Interface
The command line switches added to your script (e.g., script.py) are
python script.py map (read_fd) (write_fd)
Use the provided mapper, optional read_fd/write_fd.
python script.py reduce (read_fd) (write_fd)
Use the provided reducer, optional read_fd/write_fd.
python script.py combine (read_fd) (write_fd)
Use the provided combiner, optional read_fd/write_fd.
python script.py freeze <tar_path> <-Z add_file0 -Z add_file1...>
Freeze the script to a tar file specified by <tar_path>. The extension may be .tar or .tar.gz. All files are placed in the root of the tar. Files specified with -Z will be added to the tar root.
python script.py info
Prints a json object containing ‘tasks’, a list of tasks which can include ‘map’, ‘combine’, and ‘reduce’, and ‘doc’, the documentation provided through the doc argument to the run function. The tasks correspond to the inputs provided to the run function.
Specification of mapper/reducer/combiner
Input Key/Value Types
For TypedBytes/SequenceFileInputFormat, the Key/Value are the decoded TypedBytes
For TextInputFormat, the Key is a byte offset (int) and the Value is a line without the newline (string)

Output Key/Value Types
For TypedBytes, anything Pickle-able can be used
For Text, types are converted to string. Note that neither the key nor the value may contain \t or \n as these are used in the encoding. Output is key\tvalue\n

Expected arguments
mapper(key, value) or mapper.map(key, value)
reducer(key, values) or reducer.reduce(key, values)
combiner(key, values) or combiner.reduce(key, values)

Optional methods
func.configure(): Called before any input is read. Returns None.
func.close(): Called after all input is read. Returns None or an Iterator of (key, value)

Expected return
None or Iterator of (key, value)
Parameters:
  • mapper – Function or class following the above spec
  • reducer – Function or class following the above spec
  • combiner – Function or class following the above spec
  • doc – If specified, on error print this and call sys.exit(1)
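
A minimal job script following the specification above: a word count where the mapper assumes TextInputFormat (the value is a line of text) and the reducer is reused as the combiner.

    import hadoopy

    def mapper(key, value):
        # With TextInputFormat the value is a line of text (see the input
        # key/value types above); the key is the byte offset and is ignored.
        for word in value.split():
            yield word, 1

    def reducer(key, values):
        yield key, sum(values)

    if __name__ == '__main__':
        hadoopy.run(mapper, reducer, combiner=reducer)
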
hadoopy.status(msg[, err=None])[source]

Output a status message that is displayed in the Hadoop web interface

The status message will replace any previous one; if you want to append, you must do so yourself.

Parameters:
  • msg – String representing the status message
  • err – Func that outputs a string, if None then sys.stderr.write is used (default None)
hadoopy.counter(group, counter[, amount=1, err=None])[source]

Output a counter update that is displayed in the Hadoop web interface

Counters are useful for quickly identifying the number of times an error occurred, current progress, or coarse statistics.

Parameters:
  • group – Counter group
  • counter – Counter name
  • amount – Value to add (default 1)
  • err – Func that outputs a string, if None then sys.stderr.write is used (default None)
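
A sketch of reporting status and counters from inside a mapper:

    import hadoopy

    def mapper(key, value):
        hadoopy.status('Processing %s' % key)          # shown in the Hadoop web interface
        if not value:
            hadoopy.counter('input', 'empty_values')   # amount defaults to 1
            return
        yield key, len(value)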

HDFS functions (Usable locally and in Hadoopy jobs)

hadoopy.readtb(paths[, ignore_logs=True, num_procs=10])[source]

Read typedbytes sequence files on HDFS (with optional compression).

By default, ignores files whose names start with an underscore ‘_’ as they are log files. This allows you to cat a directory that may contain a variety of outputs from hadoop (e.g., _SUCCESS, _logs). This works on directories and files. The KV pairs may be interleaved between files (they are read in parallel).

Parameters:
  • paths – HDFS path (str) or paths (iterator)
  • num_procs – Number of reading procs to open (default 10)
  • java_mem_mb – Integer of java heap size in MB (default 256)
  • ignore_logs – If True, ignore all files whose names start with an underscore. Defaults to True.
Returns:

An iterator of (key, value) pairs.

Raises:

IOError: An error occurred reading the directory (e.g., not available).
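
For example, reading back a job’s output directory (the path is hypothetical):

    import hadoopy

    # Files whose names start with '_' (e.g., _SUCCESS, _logs) are skipped.
    for key, value in hadoopy.readtb('/data/output'):
        print(key, value)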

hadoopy.writetb(path, kvs)[source]

Write a typedbytes sequence file to HDFS given an iterator of (key, value) pairs

Parameters:
  • path – HDFS path (string)
  • kvs – Iterator of (key, value)
  • java_mem_mb – Integer of java heap size in MB (default 256)
Raises:

IOError: An error occurred while saving the data.
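
For example, staging an iterator of pairs as job input (the path is hypothetical):

    import hadoopy

    # Any iterator of (key, value) pairs works, including a generator.
    hadoopy.writetb('/data/input',
                    (('doc_%d' % i, 'some text') for i in range(100)))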

hadoopy.abspath(path)[source]

Return the absolute path to a file and canonicalize it

Path is returned without a trailing slash and without redundant slashes. Caches the user’s home directory.

Parameters: path – A string for the path. This should not have any wildcards.
Returns: Absolute path to the file
Raises: IOError – If unsuccessful
hadoopy.ls(path)[source]

List files on HDFS.

Parameters: path – A string (potentially with wildcards).
Return type: A list of strings representing HDFS paths.
Raises: IOError: An error occurred listing the directory (e.g., not available).
hadoopy.get(hdfs_path, local_path)[source]

Get a file from hdfs

Parameters:
  • hdfs_path – Source (str)
  • local_path – Destination (str)
Raises:

IOError: If unsuccessful

hadoopy.put(local_path, hdfs_path)[source]

Put a file on hdfs

Parameters:
  • local_path – Source (str)
  • hdfs_path – Destination (str)
Raises:

IOError: If unsuccessful

hadoopy.rmr(path)[source]

Remove a file if it exists (recursive)

Parameters: path – A string (potentially with wildcards).
Raises: IOError – If unsuccessful
hadoopy.isempty(path)[source]

Check if a path has zero length (also true if it’s a directory)

Parameters: path – A string for the path. This should not have any wildcards.
Returns: True if the path has zero length, False otherwise.
hadoopy.isdir(path)[source]

Check if a path is a directory

Parameters: path – A string for the path. This should not have any wildcards.
Returns: True if the path is a directory, False otherwise.
hadoopy.exists(path)[source]

Check if a file exists.

Parameters: path – A string for the path. This should not have any wildcards.
Returns: True if the path exists, False otherwise.
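
A small sketch combining the path helpers above (all paths are hypothetical):

    import hadoopy

    if hadoopy.exists('/data/output') and not hadoopy.isempty('/data/output'):
        for part in hadoopy.ls('/data/output/part-*'):
            hadoopy.get(part, part.split('/')[-1])   # copy each part file locally
    else:
        hadoopy.rmr('/data/output')                  # clear a stale or empty output dir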