API Reference

Package Layout

mitogen Package

On the Mitogen master, this is imported from mitogen/__init__.py as would be expected. On the slave, it is built dynamically during startup.

mitogen.__version__ = (0, 2, 9)

Library version as a tuple.

mitogen.is_master = True

This is False in slave contexts. It was previously used to prevent re-execution of __main__ in single-file programs, but that now happens automatically.

mitogen.context_id = 0

This is 0 in a master, otherwise it is the master-assigned ID unique to the slave context used for message routing.

mitogen.parent_id = None

This is None in a master, otherwise it is the master-assigned ID unique to the slave’s parent context.

mitogen.parent_ids = []

This is an empty list in a master, otherwise it is a list of parent context IDs ordered from most direct to least direct.
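
As a rough illustration of how context_id, parent_id and parent_ids relate, the sketch below derives all three from a chain of context IDs running from the master down to the current context. The helper name describe_identity and the example IDs are hypothetical; this is not how Mitogen populates these attributes.

```python
def describe_identity(chain):
    """Given context IDs ordered from the master (first) to the current
    context (last), return (context_id, parent_id, parent_ids)."""
    context_id = chain[-1]
    # Parents are ordered from most direct to least direct.
    parent_ids = list(reversed(chain[:-1]))
    parent_id = parent_ids[0] if parent_ids else None
    return context_id, parent_id, parent_ids


# Master (0) -> context 1 -> context 5.
print(describe_identity([0, 1, 5]))
# The master itself has no parents.
print(describe_identity([0]))
```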

mitogen.main(log_level='INFO', profiling=False)

Convenience decorator primarily useful for writing discardable test scripts.

In the master process, when func is defined in the __main__ module, arranges for func(router) to be invoked immediately, with mitogen.master.Router construction and destruction handled just as in mitogen.utils.run_with_router(). In slaves, this function does nothing.

Parameters

Example:

import mitogen
import requests

def get_url(url):
    return requests.get(url).text

@mitogen.main()
def main(router):
    z = router.ssh(hostname='k3')
    print(z.call(get_url, 'https://example.org/'))

mitogen.core

This module implements most package functionality, but remains separate from non-essential code in order to reduce its size, since it also serves as the bootstrap implementation sent to every new slave context.

@mitogen.core.takes_econtext

Decorator that marks a function or class method to automatically receive a kwarg named econtext, referencing the mitogen.core.ExternalContext active in the context in which the function is being invoked. The decorator is only meaningful when the function is invoked via CALL_FUNCTION.

When the function is invoked directly, econtext must still be passed to it explicitly.

@mitogen.core.takes_router

Decorator that marks a function or class method to automatically receive a kwarg named router, referencing the mitogen.core.Router active in the context in which the function is being invoked. The decorator is only meaningful when the function is invoked via CALL_FUNCTION.

When the function is invoked directly, router must still be passed to it explicitly.
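
The marker-and-inject pattern behind both decorators can be sketched in plain Python. Everything here is a stand-in: takes_router_sketch, the wants_router attribute and dispatch() are hypothetical names that play the roles of the decorator and of the CALL_FUNCTION path, and do not reflect Mitogen's internals.

```python
def takes_router_sketch(func):
    """Mark func so a dispatcher knows to inject a 'router' kwarg."""
    func.wants_router = True  # hypothetical marker attribute
    return func


def dispatch(func, router, *args, **kwargs):
    """Stand-in for the remote call path: inject router only if requested."""
    if getattr(func, "wants_router", False):
        kwargs["router"] = router
    return func(*args, **kwargs)


@takes_router_sketch
def whoami(prefix, router=None):
    return "%s:%s" % (prefix, router)


# Through the dispatcher, the kwarg is supplied automatically.
via_dispatch = dispatch(whoami, "fake-router", "ctx")
# A direct call must pass it explicitly.
direct = whoami("ctx", router="fake-router")
```

The decorator only records intent; the injection happens in whatever code performs the call, which is why a direct call receives nothing unless the argument is passed by hand.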

mitogen.master

This module implements functionality required by master processes, such as starting new contexts via SSH. Its size is also restricted, since it must be sent to any context that will be used to establish additional child contexts.

mitogen.parent

This module defines functionality common to master and parent processes. It is sent to any child context that is due to become a parent, due to recursive connection.

mitogen.fakessh


mitogen.fakessh is a stream implementation that starts a subprocess with its environment modified such that PATH searches for ssh return a Mitogen implementation of SSH. When invoked, this implementation arranges for the command line supplied by the caller to be executed in a remote context, reusing the parent context’s (possibly proxied) connection to that remote context.

This allows tools like rsync and scp to transparently reuse the connections and tunnels already established by the host program to connect to a target machine, without wasteful redundant SSH connection setup, 3-way handshakes, or firewall hopping configurations, and enables these tools to be used in impossible scenarios, such as over sudo with requiretty enabled.

The fake ssh command source is written to a temporary file on disk, and consists of a copy of the mitogen.core source code (just like any other child context), with a line appended to cause it to connect back to the host process over an FD it inherits. As there is no reliance on an existing filesystem file, it is possible for child contexts to use fakessh.

As a consequence of connecting back through an inherited FD, only one SSH invocation is possible. This is fine for tools like rsync, and the restriction is expected to be lifted in future.
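
The PATH substitution at the heart of this approach can be demonstrated with the standard library alone. The sketch below writes a placeholder executable named ssh into a temporary directory and places that directory first on PATH. The helper name make_path_shim and the shell script body are assumptions for illustration; the real fakessh writes a copy of the mitogen.core source instead.

```python
import os
import shutil
import stat
import tempfile


def make_path_shim(command_name, script_body):
    """Create a temp dir holding an executable named command_name and
    return (shim_dir, env), where env's PATH searches that dir first."""
    shim_dir = tempfile.mkdtemp(prefix="shim-")
    shim_path = os.path.join(shim_dir, command_name)
    with open(shim_path, "w") as fp:
        fp.write(script_body)
    # Mark the file executable so PATH lookups accept it.
    os.chmod(shim_path, os.stat(shim_path).st_mode | stat.S_IXUSR)
    env = dict(os.environ)
    env["PATH"] = shim_dir + os.pathsep + env.get("PATH", "")
    return shim_dir, env


shim_dir, env = make_path_shim("ssh", "#!/bin/sh\necho fake ssh \"$@\"\n")
# Any PATH search for "ssh" using this environment now finds the shim.
resolved = shutil.which("ssh", path=env["PATH"])
print(resolved)
shutil.rmtree(shim_dir)
```

Passing env to subprocess.Popen would make a tool such as rsync pick up the shim whenever it executes ssh.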

Sequence:

  1. fakessh Context and Stream created by parent context. The stream’s buffer has a _fakessh_main() CALL_FUNCTION enqueued.

  2. Target program (rsync/scp/sftp) invoked, which internally executes ssh from PATH.

  3. mitogen.core bootstrap begins, recovers the stream FD inherited via the target program, establishes itself as the fakessh context.

  4. _fakessh_main() CALL_FUNCTION is read by fakessh context,

    1. sets up IoPump for stdio, registers stdin_handle for local context.

    2. Enqueues CALL_FUNCTION for _start_slave() invoked in target context,

      1. the program from the ssh command line is started

      2. sets up IoPump for ssh command line process’s stdio pipes

      3. returns (control_handle, stdin_handle) to _fakessh_main()

  5. _fakessh_main() receives control/stdin handles from _start_slave(),

    1. registers remote’s stdin_handle with local IoPump.

    2. sends ("start", local_stdin_handle) to remote’s control_handle

    3. registers local IoPump with mitogen.core.Broker.

    4. loops waiting for local stdout closed && remote stdout closed

  6. _start_slave() control channel receives ("start", stdin_handle),

    1. registers remote’s stdin_handle with local IoPump

    2. registers local IoPump with mitogen.core.Broker.

    3. loops waiting for local stdout closed && remote stdout closed

mitogen.fakessh.run(dest, router, args, deadline=None, econtext=None)

Run the command specified by args with PATH arranged so that any attempt by the command to execute a remote program via SSH is redirected to Mitogen, which executes that program in the context dest instead.

Parameters
  • dest (mitogen.core.Context) – The destination context to execute the SSH command line in.

  • router (mitogen.core.Router) –

  • args (list[str]) – Argument vector: the command line arguments for the local program, e.g. ['rsync', '/tmp', 'remote:/tmp']

Returns

Exit status of the child process.

Message Class

class mitogen.core.Message(**kwargs)

Messages are the fundamental unit of communication, comprising fields from the Stream Protocol header, an optional reference to the receiving mitogen.core.Router for ingress messages, and helper methods for deserialization and generating replies.

auth_id = None

Context ID under whose authority the message is acting. See Source Verification.

data = b''

Raw message data bytes.

classmethod dead(reason=None, **kwargs)

Syntax helper to construct a dead message.

dst_id = None

Integer target context ID. Router delivers messages locally when their dst_id matches mitogen.context_id, otherwise they are routed up or downstream.

handle = None

Integer target handle in the destination context. This is one of the Standard Handles, or a dynamically generated handle used to receive a one-time reply, such as the return value of a function call.

property is_dead

True if reply_to is set to the magic value IS_DEAD, indicating the sender considers the channel dead. Dead messages can be raised in a variety of circumstances, see IS_DEAD for more information.

classmethod pickled(obj, **kwargs)

Construct a pickled message, setting data to the serialization of obj, and setting remaining fields using kwargs.

Returns

The new message.

receiver = None

The Receiver over which the message was last received. Part of the mitogen.select.Select interface. Defaults to None.

reply(msg, router=None, **kwargs)

Compose a reply to this message and send it using the message’s router attribute, or the router parameter if that attribute is None.

Parameters
  • msg – Either a Message, or an object to be serialized in order to construct a new message.

  • router – Optional router to use if the message’s router attribute is None.

  • kwargs – Optional keyword parameters overriding message fields in the reply.

reply_to = None

Integer target handle to direct any reply to this message. Used to receive a one-time reply, such as the return value of a function call. IS_DEAD has a special meaning when it appears in this field.

router = None

The Router responsible for routing the message. This is None for locally originated messages.

src_id = None

Integer source context ID. Used as the target of replies if any are generated.
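
The way src_id, dst_id, handle and reply_to combine when a reply is addressed can be shown with a toy message type. ToyMessage and make_reply() are hypothetical names, the numeric IDs are arbitrary, and the real Message class carries more state than this sketch.

```python
from dataclasses import dataclass


@dataclass
class ToyMessage:
    """Minimal stand-in carrying only the addressing fields described above."""
    src_id: int = None
    dst_id: int = None
    handle: int = None
    reply_to: int = None
    data: bytes = b""

    def make_reply(self, data):
        # A reply travels back to the sender's context (src_id) and is
        # addressed to the handle the sender nominated in reply_to.
        return ToyMessage(src_id=self.dst_id, dst_id=self.src_id,
                          handle=self.reply_to, data=data)


request = ToyMessage(src_id=0, dst_id=7, handle=101, reply_to=1000, data=b"ping")
response = request.make_reply(b"pong")
print(response)
```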

unpickle(throw=True, throw_dead=True)

Unpickle data, optionally raising any exceptions present.

Parameters

throw_dead (bool) – If True, raise ChannelError when the message is dead; otherwise handling dead messages is the caller’s responsibility.

Raises
  • CallError – The serialized data contained a CallError exception.

  • ChannelError – The is_dead field was set.

Router Class

class mitogen.core.Router(broker)

Route messages between contexts, and invoke local handlers for messages addressed to this context. Router.route() straddles the Broker thread and user threads, and is safe to call from anywhere.

Note: This is the somewhat limited core version of the Router class used by child contexts. The master subclass is documented below this one.

add_handler(fn, handle=None, persist=True, policy=None, respondent=None, overwrite=False)

Invoke fn(msg) on the Broker thread for each Message sent to handle from this context. Unregister after one invocation if persist is False. If handle is None, a new handle is allocated and returned.

Parameters
  • handle (int) – If not None, an explicit handle to register, usually one of the mitogen.core.* constants. If unspecified, a new unused handle will be allocated.

  • persist (bool) – If False, the handler will be unregistered after a single message has been received.

  • respondent (mitogen.core.Context) –

    Context that messages to this handle are expected to be sent from. If specified, arranges for a dead message to be delivered to fn when disconnection of the context is detected.

    In future respondent will likely also be used to prevent other contexts from sending messages to the handle.

  • policy (function) –

    Function invoked as policy(msg, stream) where msg is a mitogen.core.Message about to be delivered, and stream is the mitogen.core.Stream on which it was received. The function must return True, otherwise an error is logged and delivery is refused.

    Two built-in policy functions exist:

    • has_parent_authority(): requires the message arrived from a parent context, or a context acting with a parent context’s authority (auth_id).

    • mitogen.parent.is_immediate_child(): requires the message arrived from an immediately connected child, for use in messaging patterns where either something becomes buggy or insecure by permitting indirect upstream communication.

    In case of refusal, and the message’s reply_to field is nonzero, a mitogen.core.CallError is delivered to the sender indicating refusal occurred.

  • overwrite (bool) – If True, allow existing handles to be silently overwritten.

Returns

handle, or if handle was None, the newly allocated handle.

Raises

Error – Attempt to register handle that was already registered.
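
The handle, persist and overwrite semantics can be modelled in a few lines. ToyRouter below is a hypothetical stand-in that ignores threading, policy and respondent entirely; its starting handle value and its use of ValueError are assumptions of this sketch, not Mitogen behaviour.

```python
import itertools


class ToyRouter:
    """Toy model of handle registration; not Mitogen's Router."""

    def __init__(self, first_handle=1000):
        self._handles = {}  # handle -> (persist, fn)
        self._next_handle = itertools.count(first_handle)

    def add_handler(self, fn, handle=None, persist=True, overwrite=False):
        if handle is None:
            handle = next(self._next_handle)  # allocate a fresh handle
        if handle in self._handles and not overwrite:
            raise ValueError("handle %r already registered" % (handle,))
        self._handles[handle] = (persist, fn)
        return handle

    def deliver(self, handle, msg):
        persist, fn = self._handles[handle]
        if not persist:
            del self._handles[handle]  # one-shot handlers fire once
        fn(msg)


router = ToyRouter()
seen = []
one_shot = router.add_handler(seen.append, persist=False)
router.deliver(one_shot, "reply")
print(seen)
```

A one-shot handler of this kind is the natural fit for receiving a single reply, such as the return value of a function call.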

context_by_id(context_id, via_id=None, create=True, name=None)

Return or construct a Context given its ID. An internal mapping from ID to the canonical Context representing that ID is maintained, so that Signals can be raised.

This may be called from any thread, lookup and construction are atomic.

Parameters
  • context_id (int) – The context ID to look up.

  • via_id (int) – If the Context does not already exist, set its Context.via to the Context matching this ID.

  • create (bool) – If the Context does not already exist, create it.

  • name (str) – If the Context does not already exist, set its name.

Returns

Context, or None if create is False and no Context previously existed.
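
An atomic get-or-create lookup of this shape is commonly built around a single lock. The sketch below is one way to do it, with hypothetical ToyContext and ToyRegistry classes; in particular, resolving via_id with a plain dictionary lookup is a simplification made here.

```python
import threading


class ToyContext:
    def __init__(self, context_id, name=None):
        self.context_id = context_id
        self.name = name
        self.via = None


class ToyRegistry:
    """Sketch of an atomic ID -> canonical object mapping."""

    def __init__(self):
        self._lock = threading.Lock()
        self._by_id = {}

    def context_by_id(self, context_id, via_id=None, create=True, name=None):
        with self._lock:  # lookup and construction happen under one lock
            context = self._by_id.get(context_id)
            if context is None and create:
                context = ToyContext(context_id, name)
                if via_id is not None:
                    # Simplification: via is only looked up, never created.
                    context.via = self._by_id.get(via_id)
                self._by_id[context_id] = context
            return context


registry = ToyRegistry()
parent = registry.context_by_id(1, name="parent")
child = registry.context_by_id(2, via_id=1, name="child")
```

Because every caller receives the same object for a given ID, state attached to that object is visible to all of them.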

context_class

The mitogen.core.Context subclass to use when constructing new Context objects in myself() and context_by_id(). Permits Router subclasses to extend the Context interface, as done in mitogen.parent.Router.

alias of Context

del_handler(handle)

Remove the handler registered for handle.

Raises

KeyError – The handle wasn’t registered.

myself()

Return a Context referring to the current process. Since Context is serializable, this is convenient to use in remote function call parameter lists.

register(context, stream)

Register a newly constructed context and its associated stream, and add the stream’s receive side to the I/O multiplexer. This method remains public while the design has not yet settled.

route(msg)

Arrange for the Message msg to be delivered to its destination using any relevant downstream context, or if none is found, by forwarding the message upstream towards the master context. If msg is destined for the local context, it is dispatched using the handles registered with add_handler().

This may be called from any thread.

stream_by_id(dst_id)

Return the Stream that should be used to communicate with dst_id. If a specific route for dst_id is not known, a reference to the parent context’s stream is returned. If the parent is disconnected, or when running in the master context, return None instead.

This can be used from any thread, but its output is only meaningful from the context of the Broker thread, as disconnection or replacement could happen in parallel on the broker thread at any moment.
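
The fallback order described for stream_by_id() reduces to a short lookup. In this sketch pick_stream is a hypothetical helper, and streams are represented by plain strings purely for illustration.

```python
def pick_stream(routes, parent_stream, dst_id):
    """Return the stream for dst_id: a specific route if one is known,
    else the parent's stream, else None (master or disconnected parent)."""
    stream = routes.get(dst_id)
    if stream is not None:
        return stream
    return parent_stream


routes = {5: "stream-to-5", 6: "stream-to-5"}  # context 6 is reached through 5
known = pick_stream(routes, "stream-to-parent", 6)
unknown = pick_stream(routes, "stream-to-parent", 42)
in_master = pick_stream(routes, None, 42)
print(known, unknown, in_master)
```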

unidirectional = False

When True, permit children to only communicate with the current context or a parent of the current context. Routing between siblings or children of parents is prohibited, ensuring no communication is possible between intentionally partitioned networks, such as when a program simultaneously manipulates hosts spread across a corporate and a production network, or production networks that are otherwise air-gapped.

Sending a prohibited message causes an error to be logged and a dead message to be sent in reply to the errant message, if that message has reply_to set.

The value of unidirectional becomes the default for the local() unidirectional parameter.
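
One reading of the unidirectional rule is that a forwarded message is acceptable only when the parent sits on at least one side of the hop. The predicate below encodes that reading; is_permitted and its arguments are hypothetical, and the actual check performed by Mitogen may differ in detail.

```python
def is_permitted(unidirectional, from_parent, to_parent):
    """Return True if a forwarded message may be routed.

    from_parent: the message arrived on the stream connected to our parent.
    to_parent: the message will leave on the stream connected to our parent.
    """
    if not unidirectional:
        return True
    # With the restriction on, at least one end of the hop must be the
    # parent side; child-to-child forwarding is refused.
    return from_parent or to_parent


print(is_permitted(True, from_parent=False, to_parent=False))
print(is_permitted(True, from_parent=True, to_parent=False))
```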

class mitogen.parent.Router(broker)

add_route(target_id, stream)

Arrange for messages whose dst_id is target_id to be forwarded on a directly connected Stream. Safe to call from any thread.

This is called automatically by RouteMonitor in response to mitogen.core.ADD_ROUTE messages, but remains public while the design has not yet settled, and situations may arise where routing is not fully automatic.

Parameters
  • target_id (int) – Target context ID to add a route for.

  • stream (mitogen.core.Stream) – Stream over which messages to the target should be routed.

context_class

alias of Context

del_route(target_id)

Delete any route that exists for target_id. It is not an error to delete a route that does not currently exist. Safe to call from any thread.

This is called automatically by RouteMonitor in response to mitogen.core.DEL_ROUTE messages, but remains public while the design has not yet settled, and situations may arise where routing is not fully automatic.

Parameters

target_id (int) – Target context ID to delete route for.

disconnect(context)

Disconnect a context and forget its stream, assuming the context is directly connected.

get_streams()

Return an atomic snapshot of all streams in existence at time of call. This is safe to call from any thread.

class mitogen.master.Router(broker=None)

Extend mitogen.core.Router with functionality useful to masters, and child contexts who later become masters. Currently when this class is required, the target context’s router is upgraded at runtime.

Note

You may construct as many routers as desired, and use the same broker for multiple routers, however usually only one broker and router need exist. Multiple routers may be useful when dealing with separate trust domains, for example, manipulating infrastructure belonging to separate customers or projects.

Parameters
  • broker (mitogen.master.Broker) – Broker to use. If not specified, a private Broker is created.

  • max_message_size (int) –

    Override the maximum message size this router is willing to receive or transmit. Any value set here is automatically inherited by any children created by the router.

    This has a liberal default of 128 MiB, but may be set much lower. Beware that setting it below 64 KiB may cause unexpected failures, as parents and children can no longer route large Python modules that may be required by your application.

broker_class

alias of Broker

enable_debug()

Cause this context and any descendant child contexts to write debug logs to /tmp/mitogen.<pid>.log.

get_stats()

Return performance data for the module responder.

Returns

Dict containing keys:

profiling = False

When True, cause the broker thread and any subsequent broker and main threads existing in any child to write /tmp/mitogen.stats.<pid>.<thread_name>.log containing a cProfile dump on graceful exit. Must be set prior to construction of any Broker, e.g. via:

mitogen.master.Router.profiling = True

Connection Methods

Router.buildah(container=None, buildah_path=None, username=None, **kwargs)

Construct a context on the local machine over a buildah invocation. Accepts all parameters accepted by local(), in addition to:

Parameters
  • container (str) – The name of the Buildah container to connect to.

  • buildah_path (str) – Filename or complete path to the buildah binary. PATH will be searched if given as a filename. Defaults to buildah.

  • username (str) – Username to use, defaults to unset.

Router.fork(on_fork=None, on_start=None, debug=False, profiling=False, via=None)

Construct a context on the local machine by forking the current process. The forked child receives a new identity, sets up a new broker and router, and responds to function calls identically to children created using other methods.

The use of this method is strongly discouraged. It requires Python 2.6 or newer, as older Pythons made no effort to reset threading state upon fork.

For long-lived processes, local() is always better as it guarantees a pristine interpreter state that inherited little from the parent. Forking should only be used in performance-sensitive scenarios where short-lived children must be spawned to isolate potentially buggy code, and only after accounting for all the bad things possible as a result of, at a minimum:

  • Files open in the parent remaining open in the child, causing the lifetime of the underlying object to be extended indefinitely.

    • From the perspective of external components, this is observable in the form of pipes and sockets that are never closed, which may break anything relying on closure to signal protocol termination.

    • Descriptors that reference temporary files will not have their disk space reclaimed until the child exits.

  • Third party package state, such as urllib3’s HTTP connection pool, attempting to write to file descriptors shared with the parent, causing random failures in both parent and child.

  • UNIX signal handlers installed in the parent process remaining active in the child, despite associated resources, such as service threads, child processes, resource usage counters or process timers becoming absent or reset in the child.

  • Library code that makes assumptions about the process ID remaining unchanged, for example to implement inter-process locking, or to generate file names.

  • Anonymous MAP_PRIVATE memory mappings whose storage requirement doubles as either parent or child dirties their pages.

  • File-backed memory mappings that cannot have their space freed on disk due to the mapping living on in the child.

  • Difficult to diagnose memory usage and latency spikes due to object graphs becoming unreferenced in either parent or child, causing immediate copy-on-write to large portions of the process heap.

  • Locks held in the parent causing random deadlocks in the child, such as when another thread emits a log entry via the logging package concurrent to another thread calling fork(), or when a C extension module calls the C library allocator, or when a thread is using the C library DNS resolver, for example via socket.gethostbyname().

  • Objects existing in Thread-Local Storage of every non-fork() thread becoming permanently inaccessible, and never having their object destructors called, including TLS usage by native extension code, triggering many new variants of all the issues above.

  • Pseudo-Random Number Generator state that is easily observable by network peers to be duplicate, violating requirements of cryptographic protocols through one-time state reuse. In the worst case, children continually reuse the same state due to repeatedly forking from a static parent.

fork() cleans up Mitogen-internal objects, in addition to locks held by the logging package, reseeds random.random(), and the OpenSSL PRNG via ssl.RAND_add(), but only if the ssl module is already loaded. You must arrange for your program’s state, including any third party packages in use, to be cleaned up by specifying an on_fork function.
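
The PRNG hazard can be reproduced without forking by copying generator state, which is effectively what fork() does to process memory. This is a simulation under that assumption: no child process is created, and the reseed step merely stands in for the kind of work an on_fork function might perform.

```python
import random


def take(rng, n=3):
    """Draw n values from rng."""
    return [rng.random() for _ in range(n)]


parent_rng = random.Random(1234)
parent_rng.random()  # the parent has consumed some state before "forking"

# fork() copies memory, so the child starts with an identical PRNG state.
child_rng = random.Random()
child_rng.setstate(parent_rng.getstate())
duplicated = take(parent_rng) == take(child_rng)

# Reseeding in the child from OS entropy makes the two streams diverge.
child_rng.seed()
diverged = take(parent_rng) != take(child_rng)
print(duplicated, diverged)
```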

The associated stream implementation is mitogen.fork.Stream.

Parameters
  • on_fork (function) – Function invoked as on_fork() from within the child process. This permits supplying a program-specific cleanup function to break locks and close file descriptors belonging to the parent from within the child.

  • on_start (function) – Invoked as on_start(econtext) from within the child process after it has been set up, but before the function dispatch loop starts. This permits supplying a custom child main function that inherits rich data structures that cannot normally be passed via a serialization.

  • via (mitogen.core.Context) – Same as the via parameter for local().

  • debug (bool) – Same as the debug parameter for local().

  • profiling (bool) – Same as the profiling parameter for local().

Router.local(remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None)

Construct a context on the local machine as a subprocess of the current process. The associated stream implementation is mitogen.master.Stream.

Parameters
  • remote_name (str) –

    The argv[0] suffix for the new process. If remote_name is test, the new process argv[0] will be mitogen:test.

    If unspecified, defaults to <username>@<hostname>:<pid>.

    This variable cannot contain slash characters, as the resulting argv[0] must be presented in such a way as to allow Python to determine its installation prefix. This is required to support virtualenv.

  • python_path (str|list) –

    String or list path to the Python interpreter to use for bootstrap. Defaults to sys.executable for local connections, and python for remote connections.

    It is possible to pass a list to invoke Python wrapped using another tool, such as ["/usr/bin/env", "python"].

  • debug (bool) – If True, arrange for debug logging (enable_debug()) to be enabled in the new context. Automatically True when enable_debug() has been called, but may be used selectively otherwise.

  • unidirectional (bool) – If True, arrange for the child’s router to be constructed with unidirectional routing enabled. Automatically True when it was enabled for this router, but may still be explicitly set to False.

  • connect_timeout (float) – Fractional seconds to wait for the subprocess to indicate it is healthy. Defaults to 30 seconds.

  • profiling (bool) – If True, arrange for profiling (profiling) to be enabled in the new context. Automatically True when profiling is True, but may be used selectively otherwise.

  • via (mitogen.core.Context) –

    If not None, arrange for construction to occur via RPCs made to the context via, and for ADD_ROUTE messages to be generated as appropriate.

    # SSH to the remote machine.
    remote_machine = router.ssh(hostname='mybox.com')
    
    # Use the SSH connection to create a sudo connection.
    remote_root = router.sudo(username='root', via=remote_machine)
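
The remote_name rules above (the mitogen: prefix, the default value, and the ban on slashes) can be captured in a small helper. make_argv0 is a hypothetical function written for illustration, and raising ValueError is a choice of this sketch rather than documented behaviour.

```python
import getpass
import os
import socket


def make_argv0(remote_name=None):
    """Build a 'mitogen:<remote_name>' style argv[0] as described above."""
    if remote_name is None:
        # Default described above: <username>@<hostname>:<pid>
        remote_name = "%s@%s:%d" % (
            getpass.getuser(), socket.gethostname(), os.getpid())
    if "/" in remote_name:
        raise ValueError("remote_name may not contain slash characters")
    return "mitogen:" + remote_name


print(make_argv0("test"))
```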
    

Router.doas(username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, **kwargs)

Construct a context on the local machine over a doas invocation. The doas process is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • username (str) – Username to use, defaults to root.

  • password (str) – The account password to use if requested.

  • doas_path (str) – Filename or complete path to the doas binary. PATH will be searched if given as a filename. Defaults to doas.

  • password_prompt (bytes) – A string that indicates doas is requesting a password. Defaults to Password:.

  • incorrect_prompts (list) – List of bytestrings indicating the password is incorrect. Defaults to (b"doas: authentication failed").

Raises

mitogen.doas.PasswordError – A password was requested but none was provided, the supplied password was incorrect, or the target account did not exist.
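
How password_prompt and incorrect_prompts might be used to interpret output read from the pseudo-terminal is sketched below. classify_output is a hypothetical helper, and the case-insensitive substring matching is an assumption of this sketch rather than a description of the doas stream implementation.

```python
def classify_output(buf, password_prompt=b"password:",
                    incorrect_prompts=(b"doas: authentication failed",)):
    """Classify a chunk of child output as 'incorrect', 'prompt' or 'other'.
    Matching is case-insensitive in this sketch."""
    lowered = buf.lower()
    if any(p.lower() in lowered for p in incorrect_prompts):
        return "incorrect"
    if password_prompt.lower() in lowered:
        return "prompt"
    return "other"


print(classify_output(b"Password: "))
print(classify_output(b"doas: Authentication failed\n"))
```

A caller would type the password on seeing "prompt", and raise an error on seeing "incorrect" or on a prompt for which no password was supplied.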

Router.docker(container=None, image=None, docker_path=None, **kwargs)

Construct a context on the local machine within an existing or temporary new Docker container using the docker program. One of container or image must be specified.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • container (str) – Existing container to connect to. Defaults to None.

  • username (str) – Username within the container to setuid() to. Defaults to None, which Docker interprets as root.

  • image (str) – Image tag to use to construct a temporary container. Defaults to None.

  • docker_path (str) – Filename or complete path to the Docker binary. PATH will be searched if given as a filename. Defaults to docker.
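
The requirement that one of container or image be given maps naturally onto the docker exec and docker run subcommands. The helper below is a hypothetical sketch of how such a command line could be assembled using real Docker CLI flags; it is not the argument vector Mitogen builds, and it omits the command to run inside the container.

```python
def docker_argv(container=None, image=None, username=None,
                docker_path="docker"):
    """Assemble the leading part of a docker command line."""
    if not (container or image):
        raise ValueError("one of container or image must be specified")
    argv = [docker_path]
    if container:
        # Attach to an existing container.
        argv += ["exec", "--interactive"]
    else:
        # Start a temporary container that is removed on exit.
        argv += ["run", "--rm", "--interactive"]
    if username:
        argv += ["--user", username]
    argv.append(container or image)
    return argv


print(docker_argv(container="web1", username="app"))
print(docker_argv(image="debian:stable"))
```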

Router.jail(container, jexec_path=None, **kwargs)

Construct a context on the local machine within a FreeBSD jail using the jexec program.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • container (str) – Existing container to connect to. Defaults to None.

  • username (str) – Username within the container to setuid() to. Defaults to None, which jexec interprets as root.

  • jexec_path (str) – Filename or complete path to the jexec binary. PATH will be searched if given as a filename. Defaults to /usr/sbin/jexec.

Router.kubectl(pod, kubectl_path=None, kubectl_args=None, **kwargs)

Construct a context in a container via the Kubernetes kubectl program.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • pod (str) – Kubernetes pod to connect to.

  • kubectl_path (str) – Filename or complete path to the kubectl binary. PATH will be searched if given as a filename. Defaults to kubectl.

  • kubectl_args (list) – Additional arguments to pass to the kubectl command.

Router.lxc(container, lxc_attach_path=None, **kwargs)

Construct a context on the local machine within an LXC classic container using the lxc-attach program.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • container (str) – Existing container to connect to. Defaults to None.

  • lxc_attach_path (str) – Filename or complete path to the lxc-attach binary. PATH will be searched if given as a filename. Defaults to lxc-attach.

Router.lxd(container, lxc_path=None, **kwargs)

Construct a context on the local machine within an LXD container using the lxc program.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • container (str) – Existing container to connect to. Defaults to None.

  • lxc_path (str) – Filename or complete path to the lxc binary. PATH will be searched if given as a filename. Defaults to lxc.

Router.setns(container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, **kwargs)

Construct a context in the style of local(), but change the active Linux process namespaces via calls to setns(2) before executing Python.

The namespaces to use, and the active root file system are taken from the root PID of a running Docker, LXC, LXD, or systemd-nspawn container.

The setns method depends on the built-in ctypes module, and thus does not support Python 2.4.

A program is required only to find the root PID, after which management of the child Python interpreter is handled directly.

Parameters
  • container (str) – Container to connect to.

  • kind (str) – One of docker, lxc, lxd or machinectl.

  • username (str) – Username within the container to setuid() to. Defaults to root.

  • docker_path (str) – Filename or complete path to the Docker binary. PATH will be searched if given as a filename. Defaults to docker.

  • lxc_path (str) – Filename or complete path to the LXD lxc binary. PATH will be searched if given as a filename. Defaults to lxc.

  • lxc_info_path (str) – Filename or complete path to the LXC lxc-info binary. PATH will be searched if given as a filename. Defaults to lxc-info.

  • machinectl_path (str) – Filename or complete path to the machinectl binary. PATH will be searched if given as a filename. Defaults to machinectl.

Router.su(username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, **kwargs)

Construct a context on the local machine over a su invocation. The su process is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • username (str) – Username to pass to su, defaults to root.

  • password (str) – The account password to use if requested.

  • su_path (str) – Filename or complete path to the su binary. PATH will be searched if given as a filename. Defaults to su.

  • password_prompt (bytes) – The string that indicates su is requesting a password. Defaults to Password:.

  • incorrect_prompts (str) – Strings that signal the password is incorrect. Defaults to ("su: sorry", "su: authentication failure").

Raises

mitogen.su.PasswordError – A password was requested but none was provided, the supplied password was incorrect, or (on BSD) the target account did not exist.

Router.sudo(username=None, sudo_path=None, password=None, **kwargs)

Construct a context on the local machine over a sudo invocation. The sudo process is started in a newly allocated pseudo-terminal, and supports typing interactive passwords.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • username (str) – Username to pass to sudo as the -u parameter, defaults to root.

  • sudo_path (str) – Filename or complete path to the sudo binary. PATH will be searched if given as a filename. Defaults to sudo.

  • password (str) – The password to use if/when sudo requests it. Depending on the sudo configuration, this is either the current account password or the target account password. mitogen.sudo.PasswordError will be raised if sudo requests a password but none is provided.

  • set_home (bool) – If True, request sudo set the HOME environment variable to match the target UNIX account.

  • preserve_env (bool) – If True, request sudo to preserve the environment of the parent process.

  • selinux_type (str) – If not None, the SELinux security context to use.

  • selinux_role (str) – If not None, the SELinux role to use.

  • sudo_args (list) – Arguments in the style of sys.argv that would normally be passed to sudo. The arguments are parsed in-process to set equivalent parameters. Re-parsing ensures unsupported options cause mitogen.core.StreamError to be raised, and that attributes of the stream match the actual behaviour of sudo.
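
Re-parsing sudo_args in-process, so that unsupported options are rejected rather than silently passed through, might look like the following. This sketch uses argparse and covers only a handful of real sudo flags. parse_sudo_args is a hypothetical name, and ValueError stands in for the mitogen.core.StreamError mentioned above.

```python
import argparse


def parse_sudo_args(sudo_args):
    """Parse a small subset of sudo flags; reject anything unrecognised."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("-u", "--user", dest="username")
    parser.add_argument("-H", "--set-home", dest="set_home",
                        action="store_true")
    parser.add_argument("-E", "--preserve-env", dest="preserve_env",
                        action="store_true")
    parser.add_argument("-r", "--role", dest="selinux_role")
    parser.add_argument("-t", "--type", dest="selinux_type")
    opts, extra = parser.parse_known_args(sudo_args)
    if extra:
        # Anything we do not understand is an error, not a pass-through.
        raise ValueError("unsupported sudo arguments: %r" % (extra,))
    return opts


opts = parse_sudo_args(["-u", "deploy", "-H"])
print(opts.username, opts.set_home, opts.preserve_env)
```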

Router.ssh(hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, **kwargs)

Construct a remote context over an OpenSSH ssh invocation.

The ssh process is started in a newly allocated pseudo-terminal to support typing interactive passwords and responding to prompts, if a password is specified, or check_host_keys=accept. In other scenarios, BatchMode is enabled and no PTY is allocated. For many-target configurations, both options should be avoided as most systems have a conservative limit on the number of pseudo-terminals that may exist.

Accepts all parameters accepted by local(), in addition to:

Parameters
  • username (str) – The SSH username; default is unspecified, which causes SSH to pick the username to use.

  • ssh_path (str) – Absolute or relative path to ssh. Defaults to ssh.

  • ssh_args (list) – Additional arguments to pass to the SSH command.

  • port (int) – Port number to connect to; default is unspecified, which causes SSH to pick the port number.

  • check_host_keys (str) –

    Specifies the SSH host key checking mode. Defaults to enforce.

    • ignore: no host key checking is performed. Connections never fail due to an unknown or changed host key.

    • accept: known host keys are checked to ensure they match; new host keys are automatically accepted and verified in future connections.

    • enforce: known host keys are checked to ensure they match, unknown hosts cause a connection failure.

  • password (str) – Password to type if/when ssh requests it. If not specified and a password is requested, mitogen.ssh.PasswordError is raised.

  • identity_file (str) –

    Path to an SSH private key file to use for authentication. Default is unspecified, which causes SSH to pick the identity file.

    When this option is specified, only identity_file will be used by the SSH client to perform authentication; agent authentication is automatically disabled, as is reading the default private key from ~/.ssh/id_rsa, or ~/.ssh/id_dsa.

  • identities_only (bool) – If True and a password or explicit identity file is specified, instruct the SSH client to disable any authentication identities inherited from the surrounding environment, such as those loaded in any running ssh-agent, or default key files present in ~/.ssh. This ensures authentication attempts only occur using the supplied password or SSH key.

  • compression (bool) – If True, enable ssh compression support. Compression has a minimal effect on the size of modules transmitted, as they are already compressed, however it has a large effect on every remaining message in the otherwise uncompressed stream protocol, such as function call arguments and return values.

  • ssh_debug_level (int) – Optional integer 0..3 indicating the SSH client debug level.

Raises
  • mitogen.ssh.PasswordError – A password was requested but none was specified, or the specified password was incorrect.

  • mitogen.ssh.HostKeyError – When check_host_keys is set to accept, indicates a previously recorded key no longer matches the remote machine. When set to enforce, indicates the same, and additionally that no previously recorded key exists for the remote machine.

Context Class

class mitogen.core.Context(router, context_id, name=None)

Represent a remote context regardless of the underlying connection method. Context objects are simple facades that emit messages through an associated router, and have Signals raised against them in response to various events relating to the context.

Note: This is the somewhat limited core version, used by child contexts. The master subclass is documented below this one.

Contexts maintain no internal state and are thread-safe.

Prefer Router.context_by_id() over constructing context objects explicitly, as that method is deduplicating, and returns the only context instance Signals will be raised on.

Parameters
  • router (mitogen.core.Router) – Router to emit messages through.

  • context_id (int) – Context ID.

  • name (str) – Context name.

send(msg)

Arrange for msg to be delivered to this context. dst_id is set to the target context ID.

Parameters

msg (Message) – Message.

send_async(msg, persist=False)

Arrange for msg to be delivered to this context, with replies directed to a newly constructed receiver. dst_id is set to the target context ID, and reply_to is set to the newly constructed receiver’s handle.

Parameters
  • persist (bool) – If False, the handler will be unregistered after a single message has been received.

  • msg (mitogen.core.Message) – The message.

Returns

Receiver configured to receive any replies sent to the message’s reply_to handle.

send_await(msg, deadline=None)

Like send_async(), but expect a single reply (persist=False) delivered within deadline seconds.

Parameters
Returns

Deserialized reply.

Raises

TimeoutError – No message was received and deadline passed.

class mitogen.parent.Context(*args, **kwargs)

Extend mitogen.core.Context with functionality useful to masters, and child contexts who later become parents. Currently when this class is required, the target context’s router is upgraded at runtime.

call(fn, *args, **kwargs)

See CallChain.call().

call_async(fn, *args, **kwargs)

See CallChain.call_async().

call_chain_class

A CallChain instance constructed by default, with pipelining disabled. call(), call_async() and call_no_reply() use this instance.

alias of CallChain

call_no_reply(fn, *args, **kwargs)

See CallChain.call_no_reply().

shutdown(wait=False)

Arrange for the context to receive a SHUTDOWN message, triggering graceful shutdown.

Due to a lack of support for timers, no attempt is made yet to force terminate a hung context using this method. This will be fixed shortly.

Parameters

wait (bool) – If True, block the calling thread until the context has completely terminated.

Returns

If wait is False, returns a mitogen.core.Latch whose get() method returns None when shutdown completes. The timeout parameter may be used to implement graceful timeouts.

class mitogen.parent.CallChain(context, pipelined=False)

Deliver mitogen.core.CALL_FUNCTION messages to a target context, optionally threading related calls so an exception in an earlier call cancels subsequent calls.

Parameters

call(), call_no_reply() and call_async() normally issue calls and produce responses with no memory of prior exceptions. If a call made with call_no_reply() fails, the exception is logged to the target context’s logging framework.

Pipelining

When pipelining is enabled, if an exception occurs during a call, subsequent calls made by the same CallChain fail with the same exception, including those already in-flight on the network, and no further calls execute until reset() is invoked.

No exception is logged for calls made with call_no_reply(), instead the exception is saved and reported as the result of subsequent call() or call_async() calls.
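The failure behaviour described above can be modelled locally. The sketch below is a toy stand-in, not Mitogen code: ToyChain and fail are hypothetical names, calls run in-process rather than in a remote context, and the saved exception is re-raised directly rather than being delivered as a mitogen.core.CallError.

```python
class ToyChain:
    """Local model of a pipelined call chain; no networking involved."""

    def __init__(self):
        self.saved = None  # first exception seen since the last reset()

    def call_no_reply(self, fn, *args):
        # Skipped once an earlier call has failed. A failure here is
        # saved for later rather than raised or logged.
        if self.saved is None:
            try:
                fn(*args)
            except Exception as exc:
                self.saved = exc

    def call(self, fn, *args):
        # Reports the saved exception instead of running fn.
        if self.saved is not None:
            raise self.saved
        try:
            return fn(*args)
        except Exception as exc:
            self.saved = exc
            raise

    def reset(self):
        self.saved = None


def fail():
    raise ValueError('boom')


ran = []
chain = ToyChain()
chain.call_no_reply(fail)            # fails; the exception is saved
chain.call_no_reply(ran.append, 1)   # never runs
try:
    chain.call(ran.append, 2)        # never runs; reports the saved error
except ValueError as exc:
    reported = str(exc)
chain.reset()
chain.call(ran.append, 3)            # runs normally after reset()
```

Only the call issued after reset() takes effect, which mirrors the rule that no further calls execute until the chain is reset.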

Sequences of asynchronous calls can be made without wasting network round-trips to discover if prior calls succeed, and chains originating from multiple unrelated source contexts may overlap concurrently at a target context without interference.

In this example, 4 calls complete in one round-trip:

chain = mitogen.parent.CallChain(context, pipelined=True)
chain.call_no_reply(os.mkdir, '/tmp/foo')

# If previous mkdir() failed, this never runs:
chain.call_no_reply(os.mkdir, '/tmp/foo/bar')

# If either mkdir() failed, this never runs, and the exception is
# asynchronously delivered to the receiver.
recv = chain.call_async(subprocess.check_output, '/tmp/foo')

# If anything so far failed, this never runs, and raises the exception.
chain.call(do_something)

# If this code was executed, the exception would also be raised.
if recv.get().unpickle() == 'baz':
    pass

When pipelining is enabled, reset() must be invoked to ensure any exception is discarded, otherwise unbounded memory usage is possible in long-running programs. The context manager protocol is supported to ensure reset() is always invoked:

with mitogen.parent.CallChain(context, pipelined=True) as chain:
    chain.call_no_reply(...)
    chain.call_no_reply(...)
    chain.call_no_reply(...)
    chain.call(...)

# chain.reset() automatically invoked.

call(fn, *args, **kwargs)

Like call_async(), but block until the return value is available. Equivalent to:

call_async(fn, *args, **kwargs).get().unpickle()

Returns

The function’s return value.

Raises

mitogen.core.CallError – An exception was raised in the remote context during execution.

call_async(fn, *args, **kwargs)

Arrange for fn(*args, **kwargs) to be invoked on the context’s main thread.

Parameters
  • fn

    A free function in module scope or a class method of a class directly reachable from module scope:

    # mymodule.py
    
    def my_func():
        '''A free function reachable as mymodule.my_func'''
    
    class MyClass:
        @classmethod
        def my_classmethod(cls):
            '''Reachable as mymodule.MyClass.my_classmethod'''
    
        def my_instancemethod(self):
            '''Unreachable: requires a class instance!'''
    
        class MyEmbeddedClass:
            @classmethod
            def my_classmethod(cls):
                '''Not directly reachable from module scope!'''
    

  • args (tuple) – Function arguments, if any. See RPC Serialization Rules for permitted types.

  • kwargs (dict) – Function keyword arguments, if any. See RPC Serialization Rules for permitted types.

Returns

mitogen.core.Receiver configured to receive the result of the invocation:

recv = context.call_async(subprocess.check_output, ['ls', '/tmp/'])
try:
    # Prints output once it is received.
    msg = recv.get()
    print(msg.unpickle())
except mitogen.core.CallError as e:
    print('Call failed:', str(e))

Asynchronous calls may be dispatched in parallel to multiple contexts and consumed as they complete using mitogen.select.Select.

call_no_reply(fn, *args, **kwargs)

Like call_async(), but do not wait for a return value, and inform the target context no reply is expected. If the call fails and pipelining is disabled, the exception will be logged to the target context’s logging framework.

reset()

Instruct the target to forget any related exception.

Receiver Class

class mitogen.core.Receiver(router, handle=None, persist=True, respondent=None, policy=None, overwrite=False)

Receivers maintain a thread-safe queue of messages sent to a handle of this context from another context.

Parameters
  • router (mitogen.core.Router) – Router to register the handler on.

  • handle (int) – If not None, an explicit handle to register, otherwise an unused handle is chosen.

  • persist (bool) – If False, unregister the handler after one message is received. Single-message receivers are intended for RPC-like transactions, such as in the case of mitogen.parent.Context.call_async().

  • respondent (mitogen.core.Context) – Context this receiver is receiving from. If not None, arranges for the receiver to receive a dead message if messages can no longer be routed to the context due to disconnection, and ignores messages that did not originate from the respondent context.

close()

Unregister the receiver’s handle from its associated router, and cause ChannelError to be raised in any thread waiting in get() on this receiver.

empty()

Return size() == 0.

Deprecated since version 0.2.8: Use size() instead.

Raises

LatchError – The latch has already been marked closed.

get(timeout=None, block=True, throw_dead=True)

Sleep waiting for a message to arrive on this receiver.

Parameters

timeout (float) – If not None, specifies a timeout in seconds.

Raises
Returns

Message that was received.

handle = None

The handle.

notify = None

If not None, a function invoked as notify(receiver) after a message has been received. The function is invoked on the Broker thread, so it must not block. Used by mitogen.select.Select to efficiently implement waiting on multiple event sources.

size()

Return the number of items currently buffered.

As with Queue.Queue, 0 may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between size() and get().

As with Queue.Queue, >0 may be returned even though a subsequent call to get() will block, since another waiting thread may be woken at any moment between size() and get().

Raises

LatchError – The underlying latch has already been marked closed.

to_sender()

Return a Sender configured to deliver messages to this receiver. As senders are serializable, this makes it convenient to pass (context_id, handle) pairs around:

def deliver_monthly_report(sender):
    for line in open('monthly_report.txt'):
        sender.send(line)
    sender.close()

@mitogen.main()
def main(router):
    remote = router.ssh(hostname='mainframe')
    recv = mitogen.core.Receiver(router)
    remote.call(deliver_monthly_report, recv.to_sender())
    for msg in recv:
        print(msg.unpickle())

Sender Class

class mitogen.core.Sender(context, dst_handle)

Senders are used to send pickled messages to a handle in another context; a sender is the inverse of mitogen.core.Receiver.

Senders may be serialized, making them convenient to wire up data flows. See mitogen.core.Receiver.to_sender() for more information.

Parameters
  • context (mitogen.core.Context) – Context to send messages to.

  • dst_handle (int) – Destination handle to send messages to.

close()

Send a dead message to the remote, causing ChannelError() to be raised in any waiting thread.

send(data)

Send data to the remote end.

Select Class

class mitogen.select.Event

Represents one selected event.

data = None

The mitogen.core.Message delivered to a receiver, or the object posted to a latch.

source = None

The first Receiver or Latch the event traversed.

class mitogen.select.Select(receivers=(), oneshot=True)

Support scatter/gather asynchronous calls and waiting on multiple receivers, channels, latches, and sub-selects.

If oneshot is True, then remove each receiver as it yields a result; since __iter__() terminates once the final receiver is removed, this makes it convenient to respond to calls made in parallel:

total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]

for msg in mitogen.select.Select(recvs):
    print('Got %s from %s' % (msg, msg.receiver))
    total += msg.unpickle()

# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))

Select may drive a long-running scheduler:

with mitogen.select.Select(oneshot=False) as select:
    while running():
        for msg in select:
            process_result(msg.receiver.context, msg.unpickle())
        for context, workfunc in get_new_work():
            select.add(context.call_async(workfunc))

Select may be nested:

subselects = [
    mitogen.select.Select(get_some_work()),
    mitogen.select.Select(get_some_work()),
    mitogen.select.Select([
        mitogen.select.Select(get_some_work()),
        mitogen.select.Select(get_some_work())
    ])
]

for msg in mitogen.select.Select(subselects):
    print(msg.unpickle())

Select may be used to mix inter-thread and inter-process IO:

latch = mitogen.core.Latch()
start_thread(latch)
recv = remote_host.call_async(os.getuid)

sel = mitogen.select.Select([latch, recv])
event = sel.get_event()
if event.source is latch:
    # woken by a local thread
    pass
else:
    # woken by function call result
    pass

add(recv)

Add a mitogen.core.Receiver, Select or mitogen.core.Latch to the select.

Raises

mitogen.select.Error – An attempt was made to add a Select of which this select is indirectly a member.

classmethod all(receivers)

Take an iterable of receivers and retrieve a Message from each, returning the result of calling Message.unpickle() on each in turn. Results are returned in the order they arrived.

This is sugar for handling batch Context.call_async invocations:

print('Total disk usage: %.02fMiB' % (sum(
    mitogen.select.Select.all(
        context.call_async(get_disk_usage)
        for context in contexts
    )
) / 1048576.0,))

However, unlike in a naive comprehension such as:

recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)

Result processing happens in the order results arrive, rather than the order requests were issued, so all() should always be faster.
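The difference between issue order and arrival order can be seen without any remote contexts. In this sketch, standard-library threads and a queue.Queue stand in for contexts and the select; gather_in_arrival_order and the delays are assumptions chosen for illustration, not part of Mitogen.

```python
import queue
import threading
import time


def gather_in_arrival_order(jobs):
    """Run each (delay, value) job in a thread; return values as they finish."""
    results = queue.Queue()

    def worker(delay, value):
        time.sleep(delay)  # stands in for a remote call of varying duration
        results.put(value)

    threads = [threading.Thread(target=worker, args=job) for job in jobs]
    for thread in threads:
        thread.start()
    # One get() per job: each returns whichever result arrived next,
    # so a slow first job never delays handling of a fast later one.
    values = [results.get() for _ in jobs]
    for thread in threads:
        thread.join()
    return values


# Issued slow, fast, medium; consumed in completion order.
jobs = [(0.4, 'slow'), (0.0, 'fast'), (0.2, 'medium')]
arrived = gather_in_arrival_order(jobs)
```

With the naive comprehension, the consumer would sit in the first get() for the full duration of the slowest job before touching any other result.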

close()

Remove the select’s notifier function from each registered receiver, mark the associated latch as closed, and cause any thread currently sleeping in get() to be woken with mitogen.core.LatchError.

This is necessary to prevent memory leaks in long-running receivers. It is called automatically when the Python with statement is used.

empty()

Return size() == 0.

Deprecated since version 0.2.8: Use size() instead.

get(timeout=None, block=True)

Call get_event(timeout, block) returning Event.data of the first available event.

get_event(timeout=None, block=True)

Fetch the next available Event from any source, or raise mitogen.core.TimeoutError if no value is available within timeout seconds.

On success, the message’s receiver attribute is set to the receiver.

Parameters
Returns

Event.

Raises

iter_data()

Yield Event.data until no receivers remain in the select, either because oneshot is True, or each receiver was explicitly removed via remove().

__iter__() is an alias for iter_data(), allowing loops like:

for msg in Select([recv1, recv2]):
    print(msg.unpickle())

iter_events()

Yield Event instances until no receivers remain in the select.

remove(recv)

Remove an object from the select. Note that if the receiver has notified prior to remove(), it will still be returned by a subsequent get(). This may change in a future version.

size()

Return the number of items currently buffered.

As with Queue.Queue, 0 may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between size() and get().

As with Queue.Queue, >0 may be returned even though a subsequent call to get() will block, since another waiting thread may be woken at any moment between size() and get().

Channel Class

class mitogen.core.Channel(router, context, dst_handle, handle=None)

A channel inherits from mitogen.core.Sender and mitogen.core.Receiver to provide bidirectional functionality.

Deprecated since version 0.2.0: This class is incomplete and obsolete, it will be removed in Mitogen 0.3.

Channels were an early attempt at syntax sugar. It is always easier to pass around unidirectional pairs of senders/receivers, even though the syntax is baroque:

# Wire up a ping/pong counting loop between 2 subprocesses.

from __future__ import print_function
import mitogen.core
import mitogen.select


@mitogen.core.takes_router
def ping_pong(control_sender, router):
    with mitogen.core.Receiver(router) as recv:
        # Tell caller how to communicate with us.
        control_sender.send(recv.to_sender())

        # Wait for caller to tell us how to talk back:
        data_sender = recv.get().unpickle()

        n = 0
        while (n + 1) < 30:
            n = recv.get().unpickle()
            print('the number is currently', n)
            data_sender.send(n + 1)


@mitogen.main()
def main(router):
    # Create a receiver for control messages.
    with mitogen.core.Receiver(router) as recv:
        # Start ping_pong() in child 1 and fetch its sender.
        c1 = router.local()
        c1_call = c1.call_async(ping_pong, recv.to_sender())
        c1_sender = recv.get().unpickle()

        # Start ping_pong() in child 2 and fetch its sender.
        c2 = router.local()
        c2_call = c2.call_async(ping_pong, recv.to_sender())
        c2_sender = recv.get().unpickle()

        # Tell the children about each others' senders.
        c1_sender.send(c2_sender)
        c2_sender.send(c1_sender)

    # Start the loop.
    c1_sender.send(0)

    # Wait for both functions to return.
    mitogen.select.Select.all([c1_call, c2_call])

Since all handles aren’t known until after both ends are constructed, for both ends to communicate through a channel, it is necessary for one end to retrieve the handle allocated to the other and reconfigure its own channel to match. Currently this is a manual task.

close()

Send a dead message to the remote, causing ChannelError() to be raised in any waiting thread.

Broker Class

class mitogen.core.Broker(poller_class=None, activate_compat=True)

Responsible for handling I/O multiplexing in a private thread.

Note: This somewhat limited core version is used by children. The master subclass is documented below.

defer = None

Arrange for func(*args, **kwargs) to be executed on the broker thread, or immediately if the current thread is the broker thread. Safe to call from any thread.

defer_sync(func)

Arrange for func() to execute on the Broker thread, blocking the current thread until a result or exception is available.

Returns

Return value of func().

join()

Wait for the broker to stop, expected to be called after shutdown().

keep_alive()

Return True if any reader’s Side.keep_alive attribute is True, or any Context is still registered that is not the master. Used to delay shutdown while some important work is in progress (e.g. log draining).

poller_class

alias of Poller

shutdown()

Request broker gracefully disconnect streams and stop. Safe to call from any thread.

shutdown_timeout = 3.0

Grace period, in seconds, during which streams may shut down gracefully before they are force-disconnected during shutdown().

start_receive(stream)

Mark the receive_side on stream as ready for reading. Safe to call from any thread. When the associated file descriptor becomes ready for reading, BasicStream.on_receive() will be called.

stop_receive(stream)

Mark the receive_side on stream as not ready for reading. Safe to call from any thread.

class mitogen.master.Broker(install_watcher=True)

Note

You may construct as many brokers as desired, and use the same broker for multiple routers, however usually only one broker need exist. Multiple brokers may be useful when dealing with sets of children with differing lifetimes. For example, a subscription service where non-payment results in termination for one customer.

Parameters

install_watcher (bool) –

If True, an additional thread is started to monitor the lifetime of the main thread, triggering shutdown() automatically in case the user forgets to call it, or their code crashes.

You should not rely on this functionality in your program; it is only intended as a fail-safe and to simplify the API for new users. In particular, alternative Python implementations may not be able to support watching the main thread.

poller_class

alias of mitogen.parent.EpollPoller

shutdown()

Request broker gracefully disconnect streams and stop. Safe to call from any thread.

Fork Safety

class mitogen.os_fork.Corker(brokers=(), pools=())

Arrange for mitogen.core.Broker and optionally mitogen.service.Pool to be temporarily “corked” while fork operations may occur.

In a mixed threading/forking environment, it is critical no threads are active at the moment of fork, as they could hold mutexes whose state is unrecoverably snapshotted in the locked state in the fork child, causing deadlocks at random future moments.

To ensure a target thread has all locks dropped, it is made to write a large string to a socket with a small buffer that has os.O_NONBLOCK disabled. CPython will drop the GIL and enter the write() system call, where it will block until the socket buffer is drained, or the write side is closed.

mitogen.core.Poller is used to ensure the thread really has blocked outside any Python locks, by checking if the socket buffer has started to fill.
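The blocking-write technique can be demonstrated in isolation. The following is a rough standard-library sketch, not the Corker implementation: select.select() stands in for mitogen.core.Poller, and PAYLOAD, blocked_writer and the buffer size are assumptions chosen so the write cannot complete until the reader drains the socket.

```python
import select
import socket
import threading

# Far larger than the socket buffer, so the write must block.
PAYLOAD = b'x' * (4 * 1024 * 1024)


def blocked_writer(wsock):
    # On a blocking socket, sendall() releases the GIL and sleeps in the
    # kernel once the buffer is full.
    wsock.sendall(PAYLOAD)


rsock, wsock = socket.socketpair()
wsock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4096)
thread = threading.Thread(target=blocked_writer, args=(wsock,))
thread.start()

# "Cork": once the read side becomes readable, the buffer has started to
# fill, so the target thread has entered the write.
readable, _, _ = select.select([rsock], [], [], 5.0)
corked = bool(readable) and thread.is_alive()

# "Uncork": drain the buffer so the thread can finish its write and resume.
received = 0
while received < len(PAYLOAD):
    received += len(rsock.recv(65536))
thread.join()
rsock.close()
wsock.close()
```

A fork performed while corked is True would snapshot the writer thread parked inside a system call rather than partway through Python code that may hold a lock.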

Since this necessarily involves posting a message to every existent thread and verifying acknowledgement, it will never be a fast operation.

This does not yet handle the case of corking being initiated from within a thread that is also a cork target.

Parameters
  • brokers – Sequence of mitogen.core.Broker instances to cork.

  • pools – Sequence of mitogen.service.Pool instances to cork.

cork()

Arrange for any associated brokers and pools to be paused with no locks held. This will not return until each thread acknowledges it has ceased execution.

uncork()

Arrange for paused threads to resume operation.

Utility Functions

mitogen.core.now()

A reference to time.time() on Python 2, or time.monotonic() on Python 3.3 and later. time.monotonic() is preferred when available so that timers are not affected by system clock changes.
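One way such a reference could be selected is shown below. This is a minimal sketch using only the standard library; the module-level name now mirrors the documented function, but the selection logic is an assumption rather than the library's source.

```python
import time

# Prefer the monotonic clock where the time module provides it,
# falling back to wall-clock time elsewhere.
now = getattr(time, 'monotonic', time.time)

# Typical use: measure an interval that a clock adjustment cannot distort.
start = now()
time.sleep(0.01)
elapsed = now() - start
```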

A random assortment of utility functions useful on masters and children.

mitogen.utils.cast(obj)

Many tools love to subclass built-in types in order to implement useful functionality, such as annotating the safety of a Unicode string, or adding additional methods to a dict. However, cPickle loves to preserve those subtypes during serialization, resulting in CallError during call in the target when it tries to deserialize the data.

This function walks the object graph obj, producing a copy with any custom sub-types removed. The functionality is not default since the resulting walk may be computationally expensive given a large enough graph.

See RPC Serialization Rules for a list of supported types.

Parameters

obj – Object to undecorate.

Returns

Undecorated object.
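The kind of walk described above might look like the following. This is an illustrative sketch rather than the library's implementation: strip_subtypes, Safe and Bag are hypothetical names, and only a handful of built-in types are handled.

```python
def strip_subtypes(obj):
    """Return a copy of obj with subclasses of built-in types removed."""
    if obj is None or isinstance(obj, bool):
        return obj
    if isinstance(obj, dict):
        return dict(
            (strip_subtypes(key), strip_subtypes(value))
            for key, value in obj.items()
        )
    if isinstance(obj, (list, tuple)):
        plain = [strip_subtypes(item) for item in obj]
        return tuple(plain) if isinstance(obj, tuple) else plain
    for base in (int, float, str, bytes):
        if isinstance(obj, base):
            # Calling the base type yields an instance of exactly that type.
            return base(obj)
    raise TypeError('cannot strip subtypes from %r' % (type(obj),))


class Safe(str):
    """Example of a tool-specific str subclass."""


class Bag(dict):
    """Example of a dict subclass with extra behaviour."""


decorated = Bag({Safe('k'): [Safe('v'), (1, 2.5)]})
plain = strip_subtypes(decorated)
```

Every container and leaf is visited and copied, which is why the walk can become expensive for large object graphs.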

mitogen.utils.setup_gil()

Set extremely long GIL release interval to let threads naturally progress through CPU-heavy sequences without forcing the wake of another thread that may contend trying to run the same CPU-heavy code. For the new-style Ansible work, this drops runtime ~33% and involuntary context switches by >80%, essentially making threads cooperatively scheduled.

mitogen.utils.disable_site_packages()

Remove all entries mentioning site-packages or Extras from sys.path. Used primarily for testing on OS X within a virtualenv, where OS X bundles some ancient version of the six module.

mitogen.utils.log_to_file(path=None, io=False, level='INFO')

Install a new logging.Handler writing application logs to the filesystem. Useful when debugging slave IO problems.

Parameters to this function may be overridden at runtime using environment variables. See Logging Environment Variables.

Parameters
  • path (str) – If not None, a filesystem path to write logs to. Otherwise, logs are written to sys.stderr.

  • io (bool) – If True, include extremely verbose IO logs in the output. Useful for debugging hangs, less useful for debugging application code.

  • level (str) – Name of the logging package constant that is the minimum level to log at. Useful levels are DEBUG, INFO, WARNING, and ERROR.
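The path and level parameters map naturally onto the standard logging package. The sketch below approximates that mapping using only the standard library; install_log_handler is a hypothetical name, the format string is an assumption, and the io parameter and environment variable overrides are not modelled.

```python
import logging
import os
import sys
import tempfile


def install_log_handler(path=None, level='INFO'):
    """Attach a handler to the root logger; stderr unless path is given."""
    if path is not None:
        handler = logging.FileHandler(path)
    else:
        handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter('%(levelname)s %(name)s: %(message)s'))
    root = logging.getLogger()
    root.addHandler(handler)
    # level names a constant in the logging package, e.g. logging.DEBUG.
    root.setLevel(getattr(logging, level))
    return handler


log_path = os.path.join(tempfile.mkdtemp(), 'app.log')
handler = install_log_handler(log_path, level='WARNING')
log = logging.getLogger('demo')
log.info('dropped: below the minimum level')
log.warning('kept')

# Detach and close the handler before reading the file back.
logging.getLogger().removeHandler(handler)
handler.close()
with open(log_path) as fp:
    logged = fp.read()
```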

mitogen.utils.run_with_router(func, *args, **kwargs)

Arrange for func(router, *args, **kwargs) to run with a temporary mitogen.master.Router, ensuring the Router and Broker are correctly shut down during normal or exceptional return.

Returns

func’s return value.
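The guarantee described above is the familiar try/finally shape. In the sketch below, StubBroker and StubRouter are hypothetical stand-ins for the real broker and router classes, and run_with_stub_router is an assumed name, so the example runs without creating any contexts.

```python
class StubBroker:
    """Stand-in for a broker; records whether it was shut down."""

    def __init__(self):
        self.stopped = False

    def shutdown(self):
        self.stopped = True

    def join(self):
        pass


class StubRouter:
    """Stand-in for a router bound to a broker."""

    def __init__(self, broker):
        self.broker = broker


def run_with_stub_router(func, *args, **kwargs):
    broker = StubBroker()
    router = StubRouter(broker)
    try:
        return func(router, *args, **kwargs)
    finally:
        # Runs on normal and exceptional return alike.
        broker.shutdown()
        broker.join()


seen = []


def do_stuff(router, arg):
    seen.append(router)
    return arg * 2


# The router is supplied as the first argument; callers pass only the rest.
result = run_with_stub_router(do_stuff, 21)
```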

@mitogen.utils.with_router

Decorator version of run_with_router(). Example:

@with_router
def do_stuff(router, arg):
    pass

# The router argument is supplied by the decorator.
do_stuff(123)

Exceptions

class mitogen.core.Error(fmt=None, *args)

Base for all exceptions raised by Mitogen.

Parameters
  • fmt (str) – Exception text, or format string if args is non-empty.

  • args (tuple) – Format string arguments.
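The fmt and args convention can be illustrated with a small exception class. SketchError is a hypothetical name and this is only a sketch of the documented behaviour: formatting is applied only when args is non-empty, so a literal percent sign in a plain message is safe.

```python
class SketchError(Exception):
    """Formats its message only when format arguments are supplied."""

    def __init__(self, fmt=None, *args):
        if args:
            fmt = fmt % args
        Exception.__init__(self, fmt)


# No args: the text is used verbatim, percent sign included.
plain = SketchError('disk is 100% full')
# With args: fmt is treated as a format string.
formatted = SketchError('%s exited with status %d', 'ssh', 255)
```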

class mitogen.core.CallError(fmt=None, *args)

Serializable Error subclass raised when Context.call() fails. A copy of the traceback from the external context is appended to the exception message.

class mitogen.core.ChannelError(fmt=None, *args)

Raised when a channel dies or has been closed.

class mitogen.core.LatchError(fmt=None, *args)

Raised when an attempt is made to use a mitogen.core.Latch that has been marked closed.

class mitogen.core.StreamError(fmt=None, *args)

Raised when a stream cannot be established.

class mitogen.core.TimeoutError(fmt=None, *args)

Raised when a timeout occurs on a stream.

class mitogen.parent.EofError(fmt=None, *args)

Raised by Connection when an empty read is detected from the remote process before bootstrap completes.

class mitogen.parent.CancelledError(fmt=None, *args)

Raised by Connection when mitogen.core.Broker.shutdown() is called before bootstrap completes.