Internal API Reference

Note

Internal APIs are subject to rapid change even across minor releases. This page exists to help users modify and extend the library.

Constants

mitogen.core.CHUNK_SIZE = 131072

Default size for calls to Side.read() or Side.write(), and the size of buffers configured by mitogen.parent.create_socketpair(). This value has many performance implications; 128KiB seems to be a sweet spot.

  • When set low, large messages cause many Broker IO loop iterations, burning CPU and reducing throughput.

  • When set high, excessive RAM is reserved by the OS for socket buffers (2x per child), and an identically sized temporary userspace buffer, which must be zeroed, is allocated on each read. Above a certain size, allocating and freeing that buffer may also require two system calls.

Care must be taken to ensure the underlying kernel object and receiving program support the desired size. For example,

  • Most UNIXes have TTYs with fixed 2KiB-4KiB buffers, making them unsuitable for efficient IO.

  • Different UNIXes have varying presets for pipes, which may not be configurable. On recent Linux the default pipe buffer size is 64KiB, but under memory pressure may be as low as 4KiB for unprivileged processes.

  • When communication is via an intermediary process, its internal buffers affect the rate at which OS buffers drain. For example, OpenSSH uses 64KiB reads.

An ideal Message has a size that is a multiple of CHUNK_SIZE inclusive of headers, to avoid wasting IO loop iterations writing small trailer chunks.
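The cost of a small trailer chunk can be made concrete with a little arithmetic. The following is a minimal sketch, assuming each Broker iteration writes at most CHUNK_SIZE bytes (a real write may be shorter if the OS buffer fills); the helper names io_iterations() and trailer_size() are hypothetical.

```python
CHUNK_SIZE = 131072  # mirrors mitogen.core.CHUNK_SIZE (128KiB)


def io_iterations(message_size, chunk_size=CHUNK_SIZE):
    """Minimum number of chunk-sized writes needed for a message (ceiling division)."""
    return -(-message_size // chunk_size)


def trailer_size(message_size, chunk_size=CHUNK_SIZE):
    """Bytes carried by the final, partially filled chunk (0 if exactly aligned)."""
    return message_size % chunk_size


# Exactly two chunks: 2 iterations, no trailer. One byte more costs a whole
# extra iteration just to write a 1-byte trailer.
print(io_iterations(2 * CHUNK_SIZE), trailer_size(2 * CHUNK_SIZE))          # 2 0
print(io_iterations(2 * CHUNK_SIZE + 1), trailer_size(2 * CHUNK_SIZE + 1))  # 3 1
```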

Pollers

class mitogen.core.Poller

A poller manages OS file descriptors the user is waiting to become available for IO. The poll() method blocks the calling thread until one or more become ready. The default implementation is based on select.poll().

Each descriptor has an associated data element, which is unique for each readiness type, and defaults to being the same as the file descriptor. The poll() method yields the data associated with a descriptor, rather than the descriptor itself, allowing concise loops like:

p = Poller()
p.start_receive(conn.fd, data=conn.on_read)
p.start_transmit(conn.fd, data=conn.on_write)

for callback in p.poll():
    callback()  # invoke appropriate bound instance method

Pollers may be modified while poll() is yielding results. Removals are processed immediately, causing pending events for the descriptor to be discarded.

The close() method must be called when a poller is discarded to avoid a resource leak.

Pollers may only be used by one thread at a time.

close()

Close any underlying OS resource used by the poller.

poll(timeout=None)

Block the calling thread until one or more FDs are ready for IO.

Parameters

timeout (float) – If not None, seconds to wait without an event before returning an empty iterable.

Returns

Iterable of data elements associated with ready FDs.

property readers

Return a list of (fd, data) tuples for every FD registered for receive readiness.

start_receive(fd, data=None)

Cause poll() to yield data when fd is readable.

start_transmit(fd, data=None)

Cause poll() to yield data when fd is writeable.

stop_receive(fd)

Stop yielding readability events for fd.

Redundant calls to stop_receive() are silently ignored; this may change in future.

stop_transmit(fd)

Stop yielding writeability events for fd.

Redundant calls to stop_transmit() are silently ignored; this may change in future.

property writers

Return a list of (fd, data) tuples for every FD registered for transmit readiness.
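The interface above, in particular the mapping from (descriptor, readiness type) to a data element, can be illustrated with a short sketch. SelectPoller is a hypothetical name; for brevity it is built on select.select() rather than select.poll(), and it is not the library's implementation.

```python
import select
import socket


class SelectPoller:
    """Hypothetical minimal poller mapping each (fd, readiness) pair to a data element."""

    def __init__(self):
        self._rfds = {}  # fd -> data yielded when readable
        self._wfds = {}  # fd -> data yielded when writeable

    @property
    def readers(self):
        return list(self._rfds.items())

    @property
    def writers(self):
        return list(self._wfds.items())

    def start_receive(self, fd, data=None):
        self._rfds[fd] = fd if data is None else data  # data defaults to the fd

    def start_transmit(self, fd, data=None):
        self._wfds[fd] = fd if data is None else data

    def stop_receive(self, fd):
        self._rfds.pop(fd, None)  # redundant calls are ignored

    def stop_transmit(self, fd):
        self._wfds.pop(fd, None)

    def close(self):
        pass  # select() holds no kernel object; epoll/kqueue pollers would release one

    def poll(self, timeout=None):
        rfds, wfds, _ = select.select(list(self._rfds), list(self._wfds), [], timeout)
        for fd in rfds:
            if fd in self._rfds:  # skip fds removed while we were yielding
                yield self._rfds[fd]
        for fd in wfds:
            if fd in self._wfds:
                yield self._wfds[fd]


a, b = socket.socketpair()
p = SelectPoller()
p.start_receive(b.fileno(), data='b is readable')
p.start_transmit(a.fileno(), data='a is writeable')
a.send(b'x')
print(sorted(p.poll(timeout=1.0)))  # ['a is writeable', 'b is readable']
p.close()
a.close()
b.close()
```

Because poll() yields the registered data rather than raw descriptors, registering bound methods as data gives the callback-style loop shown earlier.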

class mitogen.parent.KqueuePoller

Poller based on the FreeBSD/Darwin kqueue(2) interface.

class mitogen.parent.EpollPoller

Poller based on the Linux epoll(2) interface.

class mitogen.parent.PollPoller

Poller based on the POSIX poll(2) interface. It is not available on some versions of OS X; otherwise it is the preferred poller for small FD counts, as there is no setup/teardown/configuration system call overhead.

Latch

class mitogen.core.Latch

A latch is a Queue.Queue-like object that supports mutation and waiting from multiple threads. Unlike Queue.Queue, however, waiting threads always remain interruptible, so CTRL+C always succeeds, and waits where a timeout is set experience no wake-up latency. These properties cannot be achieved together using the built-in threading primitives available in Python 2.x.

Latches implement queues using the UNIX self-pipe trick and a per-thread socket.socketpair() that is lazily created the first time any latch attempts to sleep on a thread, and dynamically associated with the waiting Latch only for the duration of the wait.

See Waking Sleeping Threads for further discussion.
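The sleep/wake mechanism can be sketched in plain Python. This is a minimal illustration, not the library's Latch: MiniLatch is a hypothetical name, it allocates a fresh socketpair per wait instead of caching one per thread, raises the builtin TimeoutError (Python 3) instead of mitogen.core.TimeoutError, and omits close(). Each waiter blocks in select() on its own socket, so it stays interruptible and wakes as soon as put() writes a byte.

```python
import collections
import select
import socket
import threading


class MiniLatch:
    """Hypothetical simplified latch: waiters sleep in select() on a socketpair."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queue = collections.deque()
        self._sleeping = []  # one (rsock, wsock) pair per sleeping thread, FIFO
        self._waking = 0     # queued items already promised to woken sleepers

    def put(self, obj=None):
        with self._lock:
            self._queue.append(obj)
            if self._sleeping:
                _, wsock = self._sleeping.pop(0)
                self._waking += 1
                wsock.send(b'\x00')  # wake the first sleeper

    def get(self, timeout=None):
        with self._lock:
            # Fast path: take an item that no woken sleeper has been promised.
            if len(self._queue) > self._waking:
                return self._queue.popleft()
            entry = socket.socketpair()
            self._sleeping.append(entry)
        rsock, wsock = entry
        missing = object()
        item = missing
        try:
            select.select([rsock], [], [], timeout)  # CTRL+C can interrupt this
        finally:
            with self._lock:
                if entry in self._sleeping:
                    self._sleeping.remove(entry)  # never woken: timeout or interrupt
                else:
                    self._waking -= 1             # put() woke us: claim our item
                    item = self._queue.popleft()
            rsock.close()
            wsock.close()
        if item is missing:
            raise TimeoutError('MiniLatch.get() timed out')
        return item
```

The _waking counter reserves one queued item for each woken sleeper, so a thread taking the fast path cannot steal an item between the wake-up and the sleeper reacquiring the lock. A production latch must also decide what happens to an item claimed by a waiter that is interrupted after being woken; this sketch drops it.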

close()

Mark the latch as closed, and cause every sleeping thread to be woken, with mitogen.core.LatchError raised in each thread.

empty()

Return size() == 0.

Deprecated since version 0.2.8: Use size() instead.

Raises

LatchError – The latch has already been marked closed.

get(timeout=None, block=True)

Return the next enqueued object, or sleep waiting for one.

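Parameters
  • timeout (float) – If not None, specifies a timeout in seconds.

  • block (bool) – If False, immediately raise mitogen.core.TimeoutError if the latch is empty.

Raises
  • mitogen.core.LatchError – close() has been called, and the object is no longer valid.

  • mitogen.core.TimeoutError – Timeout was reached.

Returns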

The de-queued object.

notify = None

If not None, a function invoked as notify(latch) after a successful call to put(). The function is invoked on the put() caller’s thread, which may be the Broker thread, therefore it must not block. Used by mitogen.select.Select to efficiently implement waiting on multiple event sources.

poller_class

alias of mitogen.parent.PollPoller

put(obj=None)

Enqueue an object, waking the first thread waiting for a result, if one exists.

Parameters

obj – Object to enqueue. Defaults to None as a convenience when using Latch only for synchronization.

Raises

mitogen.core.LatchError – close() has been called, and the object is no longer valid.

size()

Return the number of items currently buffered.

As with Queue.Queue, 0 may be returned even though a subsequent call to get() will succeed, since a message may be posted at any moment between size() and get().

As with Queue.Queue, >0 may be returned even though a subsequent call to get() will block, since another waiting thread may be woken at any moment between size() and get().

Raises

LatchError – The latch has already been marked closed.

Logging

See also mitogen.core.IoLoggerProtocol.

class mitogen.core.LogHandler(context)

A logging.Handler subclass that arranges for FORWARD_LOG messages to be sent to a parent context in response to logging messages generated by the current context. This is installed by default in child contexts during bootstrap, so that logging events can be viewed and managed centrally in the master process.

The handler is initially corked after construction, such that it buffers messages until uncork() is called. This allows logging to be installed prior to communication with the target being available, and avoids any possible race where early log messages might be dropped.

Parameters

context (mitogen.core.Context) – The context to send log messages towards. At present this is always the master process.

emit(rec)

Send a FORWARD_LOG message towards the target context.

uncork()

#305: during startup LogHandler may be installed before it is possible to route messages; therefore messages are buffered until uncork() is called by ExternalContext.
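The corking behaviour can be sketched with the standard logging module alone. CorkedHandler and its sink argument are hypothetical; the real LogHandler sends FORWARD_LOG messages rather than calling a function.

```python
import logging


class CorkedHandler(logging.Handler):
    """Hypothetical handler that buffers records until uncork() is called."""

    def __init__(self, sink):
        super().__init__()
        self._sink = sink      # callable that actually delivers a formatted message
        self._buffer = []
        self._corked = True

    def uncork(self):
        with self.lock:  # the same lock handle() holds around emit(), preserving order
            self._corked = False
            for record in self._buffer:
                self._sink(self.format(record))
            self._buffer = []

    def emit(self, record):
        if self._corked:
            self._buffer.append(record)
        else:
            self._sink(self.format(record))


delivered = []
handler = CorkedHandler(delivered.append)
log = logging.getLogger('corked.demo')
log.propagate = False
log.setLevel(logging.INFO)
log.addHandler(handler)

log.info('early')   # buffered: nothing delivered yet
handler.uncork()    # 'early' is delivered now
log.info('late')    # delivered immediately
```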

class mitogen.master.LogForwarder(router)

Install a mitogen.core.FORWARD_LOG handler that delivers forwarded log events into the local logging framework. This is used by the master’s Router.

The forwarded logging.LogRecord objects are delivered to loggers under mitogen.ctx.* corresponding to their mitogen.core.Context.name, with the message prefixed with the logger name used in the child. The records include some extra attributes:

  • mitogen_message: Unicode original message without the logger name prepended.

  • mitogen_context: mitogen.parent.Context reference to the source context.

  • mitogen_name: Original logger name.

Parameters

router (mitogen.master.Router) – Router to install the handler on.
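Because forwarded records are ordinary logging.LogRecord objects, the extra attributes can be used in a master-side format string. A sketch, assuming a child context named 'web1' whose logger 'myapp' logged 'hello'; the record is emulated with logging.makeLogRecord() rather than produced by a real LogForwarder, and the exact prefix format in msg is an assumption.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(
    '%(name)s [%(mitogen_name)s] %(mitogen_message)s'))

ctx_logger = logging.getLogger('mitogen.ctx')  # parent of every mitogen.ctx.* logger
ctx_logger.addHandler(handler)
ctx_logger.propagate = False  # keep the demo output away from the root logger

# Emulated forwarded record with hypothetical values.
record = logging.makeLogRecord({
    'name': 'mitogen.ctx.web1',
    'levelno': logging.INFO,
    'levelname': 'INFO',
    'msg': 'myapp: hello',  # assumed prefix format; see mitogen_message for the original
    'mitogen_name': 'myapp',
    'mitogen_message': 'hello',
})
logging.getLogger(record.name).handle(record)
print(stream.getvalue(), end='')  # mitogen.ctx.web1 [myapp] hello
```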

class mitogen.core.PidfulStreamHandler(stream=None)

A logging.StreamHandler subclass used when Router.enable_debug() has been called, or the debug parameter was specified during context construction. Verifies the process ID has not changed on each call to emit(), reopening the associated log file when a change is detected.

This ensures logging to the per-process output files happens correctly even when uncooperative third party components call os.fork().

emit(record)

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

open_pid = None

PID that last opened the log file.

template = '/tmp/mitogen.%s.%s.log'

Output path template.
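The PID check can be sketched as a logging.StreamHandler subclass. PidAwareFileHandler is hypothetical, and filling the template with (pid, timestamp) is an assumption of this sketch; closing the file on shutdown is left out for brevity.

```python
import logging
import os
import tempfile
import time


class PidAwareFileHandler(logging.StreamHandler):
    """Hypothetical sketch: reopen the log file whenever os.getpid() changes."""

    def __init__(self, template):
        super().__init__()
        self.template = template  # filled with (pid, timestamp) in this sketch
        self.open_pid = None      # PID that last opened the log file

    def _reopen(self):
        path = self.template % (os.getpid(), time.strftime('%Y%m%d_%H%M%S'))
        self.stream = open(path, 'w', 1)  # line buffered text file
        self.open_pid = os.getpid()

    def emit(self, record):
        if self.open_pid != os.getpid():
            self._reopen()  # first call, or we are now running in a forked child
        super().emit(record)


log_dir = tempfile.mkdtemp()
pid_handler = PidAwareFileHandler(os.path.join(log_dir, 'demo.%s.%s.log'))
demo_log = logging.getLogger('pidful.demo')
demo_log.propagate = False
demo_log.setLevel(logging.INFO)
demo_log.addHandler(pid_handler)
demo_log.info('hello')  # opens demo.<pid>.<timestamp>.log on first use
```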

Stream, Side & Protocol

class mitogen.core.Stream

A Stream is one readable and optionally one writeable file descriptor (represented by Side) aggregated alongside an associated Protocol that knows how to respond to IO readiness events for those descriptors.

Streams are registered with Broker, and callbacks are invoked on the broker thread in response to IO activity. When registered using Broker.start_receive() or Broker._start_transmit(), the broker may call any of on_receive(), on_transmit(), on_shutdown() or on_disconnect().

It is expected that the Protocol associated with a stream will change over its life. For example during connection setup, the initial protocol may be mitogen.parent.BootstrapProtocol that knows how to enter SSH and sudo passwords and transmit the mitogen.core source to the target, before handing off to MitogenProtocol when the target process is initialized.

Streams connecting to children are in turn aggregated by mitogen.parent.Connection, which contains additional logic for managing any child process, and a reference to any separate stderr Stream connected to that process.

accept(rfp, wfp)

Attach a pair of file objects to receive_side and transmit_side, after wrapping them in Side instances. Side will call set_nonblock() and set_cloexec() on the underlying file descriptors during construction.

The same file object may be used for both sides. The default on_disconnect() handles the possibility that only one descriptor may need to be closed.

Parameters
  • rfp (file) – The file object to receive from.

  • wfp (file) – The file object to transmit to.

conn = None

In parents, the mitogen.parent.Connection instance.

name = 'default'

The stream name. This is used in the __repr__() output in any log messages; it may be any descriptive string.

on_disconnect(broker)

Invoked by Broker to force disconnect the stream during shutdown, invoked by the default on_shutdown() implementation, and usually invoked by any subclass on_receive() implementation in response to a 0-byte read.

The base implementation fires a disconnect event, then closes receive_side and transmit_side after unregistering the stream from the broker.

on_receive(broker)

Invoked by Broker when the stream’s receive_side has been marked readable using Broker.start_receive() and the broker has detected the associated file descriptor is ready for reading.

Subclasses must implement this if they are registered using Broker.start_receive(), and the method must invoke on_disconnect() if reading produces an empty string.

The default implementation reads Protocol.read_size bytes and passes the resulting bytestring to Protocol.on_receive(). If the bytestring is 0 bytes, invokes on_disconnect() instead.
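The default read-and-dispatch behaviour can be sketched without a Broker. PipeStream and RecordingProtocol are hypothetical stand-ins, and the broker argument is omitted.

```python
import os


class RecordingProtocol:
    """Hypothetical protocol that just records what it receives."""
    read_size = 131072

    def __init__(self):
        self.received = []

    def on_receive(self, buf):
        self.received.append(buf)


class PipeStream:
    """Hypothetical stream reading from a single pipe descriptor."""

    def __init__(self, fd, protocol):
        self.fd = fd
        self.protocol = protocol
        self.disconnected = False

    def on_receive(self):
        buf = os.read(self.fd, self.protocol.read_size)
        if not buf:
            self.on_disconnect()  # 0-byte read: the writer has gone away
        else:
            self.protocol.on_receive(buf)

    def on_disconnect(self):
        self.disconnected = True
        os.close(self.fd)
```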

on_shutdown(broker)

Invoked by Broker.shutdown() to allow the stream time to gracefully shutdown.

The default implementation emits a shutdown signal before invoking on_disconnect().

on_transmit(broker)

Invoked by Broker when the stream’s transmit_side has been marked writeable using Broker._start_transmit() and the broker has detected the associated file descriptor is ready for writing.

Subclasses must implement this if they are ever registered with Broker._start_transmit().

The default implementation invokes Protocol.on_transmit().

protocol = None

A Protocol representing the protocol active on the stream.

receive_side = None

A Side representing the stream’s receive file descriptor.

set_protocol(protocol)

Bind a Protocol to this stream, by updating Protocol.stream to refer to this stream, and updating this stream’s Stream.protocol to refer to the protocol. Any prior protocol’s Protocol.stream is set to None.
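The two-way binding can be sketched with minimal hypothetical classes (MiniStream, MiniProtocol):

```python
class MiniProtocol:
    stream = None  # the stream this protocol is bound to, or None


class MiniStream:
    protocol = None

    def set_protocol(self, protocol):
        if self.protocol is not None:
            self.protocol.stream = None  # detach the previous protocol
        self.protocol = protocol
        protocol.stream = self
```

Clearing the old protocol's back-reference ensures a protocol that has been replaced, for example a bootstrap protocol after hand-off, cannot keep writing to the stream.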

transmit_side = None

A Side representing the stream’s transmit file descriptor.

class mitogen.core.BufferedWriter(broker, protocol)

Implement buffered output while avoiding quadratic string operations. This is currently constructed by each protocol, in future it may become fixed for each stream instead.

on_transmit(broker)

Respond to stream writeability by retrying previously buffered write() calls.

write(s)

Transmit s immediately, falling back to enqueuing it and marking the stream writeable if no OS buffer space is available.
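The idea of avoiding quadratic string operations can be sketched by keeping pending output as a deque of chunks, so appending never re-copies the whole buffer. ChunkBuffer, write_fn and want_writeable are hypothetical; want_writeable stands in for marking the stream writeable via Broker._start_transmit().

```python
import collections


class ChunkBuffer:
    """Hypothetical sketch of buffered output stored as a deque of chunks."""

    def __init__(self, write_fn):
        self._write_fn = write_fn  # returns the number of bytes accepted, like os.write()
        self._buf = collections.deque()
        self._len = 0
        self.want_writeable = False

    def write(self, s):
        if not self._len:
            n = self._write_fn(s)  # nothing queued: try to send immediately
            s = s[n:]
        if s:
            self._buf.append(s)    # keep ordering behind anything already queued
            self._len += len(s)
            self.want_writeable = True

    def on_transmit(self):
        if self._buf:
            chunk = self._buf.popleft()
            n = self._write_fn(chunk)
            self._len -= n
            if n < len(chunk):
                self._buf.appendleft(chunk[n:])  # only this chunk's tail is copied
        self.want_writeable = bool(self._buf)
```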

class mitogen.core.Side(stream, fp, cloexec=True, keep_alive=True, blocking=False)

Represent one side of a Stream. This allows unidirectional (e.g. pipe) and bidirectional (e.g. socket) streams to operate identically.

Sides are also responsible for tracking the open/closed state of the underlying FD, preventing erroneous duplicate calls to os.close() due to duplicate Stream.on_disconnect() calls, which would otherwise risk silently succeeding by closing an unrelated descriptor. For this reason, it is crucial that only one file object exists per unique descriptor.

Parameters
  • stream (mitogen.core.Stream) – The stream this side is associated with.

  • fp (object) – The file or socket object managing the underlying file descriptor. Any object may be used that supports fileno() and close() methods.

  • cloexec (bool) – If True, the descriptor has its fcntl.FD_CLOEXEC flag enabled using fcntl.fcntl().

  • keep_alive (bool) – If True, the continued existence of this side will extend the shutdown grace period until it has been unregistered from the broker.

  • blocking (bool) – If False, the descriptor has its os.O_NONBLOCK flag enabled using fcntl.fcntl().
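The cloexec and blocking parameters correspond to standard fcntl() calls. A sketch using a hypothetical helper, configure_fd(); note that since Python 3.4 (PEP 446) new descriptors are already non-inheritable, so FD_CLOEXEC is often set before this runs.

```python
import fcntl
import os


def configure_fd(fd, cloexec=True, blocking=False):
    """Hypothetical helper applying the flags described by Side's parameters."""
    if cloexec:
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    if not blocking:
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
```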

close()

Call file.close() on fp if it is not None, then set it to None.

fd = None

Integer file descriptor to perform IO on, or None if close() has been called. This is saved separately from the file object, since file.fileno() cannot be called on it after it has been closed.

keep_alive = None

If True, causes presence of this side in Broker’s active reader set to defer shutdown until the side is disconnected.

read(n=131072)

Read up to n bytes from the file descriptor, wrapping the underlying os.read() call with io_op() to trap common disconnection conditions.

read() always behaves as if it is reading from a regular UNIX file; socket, pipe, and TTY disconnection errors are masked and result in a 0-sized read like a regular file.

Returns

Bytes read, or the empty string to indicate disconnection was detected.

stream = None

The Stream for which this is a read or write side.

write(s)

Write as much of the bytes from s as possible to the file descriptor, wrapping the underlying os.write() call with io_op() to trap common disconnection conditions.

Returns

Number of bytes written, or None if disconnection was detected.
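The "behave like a regular file" rule for read() and write() can be sketched with two hypothetical helpers. The errno set below is an illustrative assumption, not necessarily the library's list; it also assumes CPython's default of ignoring SIGPIPE, so writing to a broken pipe raises OSError.

```python
import errno
import os

# Assumption: illustrative set of errnos treated as "peer disconnected".
DISCONNECT_ERRNOS = {errno.EIO, errno.ECONNRESET, errno.EPIPE}


def file_like_read(fd, n=131072):
    """Return bytes read, or b'' on EOF *or* on a disconnection error."""
    try:
        return os.read(fd, n)
    except OSError as e:
        if e.errno in DISCONNECT_ERRNOS:
            return b''  # e.g. EIO from a TTY master whose slave side closed
        raise


def file_like_write(fd, s):
    """Return the number of bytes written, or None if the peer has disconnected."""
    try:
        return os.write(fd, s)
    except OSError as e:
        if e.errno in DISCONNECT_ERRNOS:
            return None
        raise
```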

class mitogen.core.Protocol

Implement the program behaviour associated with activity on a Stream. The protocol in use may vary over a stream’s life, for example to allow mitogen.parent.BootstrapProtocol to initialize the connected child before handing it off to MitogenProtocol. A stream’s active protocol is tracked in the Stream.protocol attribute, and modified via Stream.set_protocol().

Protocols do not handle IO; they are entirely reliant on the interface provided by Stream and Side, allowing the underlying IO implementation to be replaced without modifying behavioural logic.

read_size = 131072

The size of the read buffer used by Stream when this is the active protocol for the stream.

stream = None

The Stream this protocol is currently bound to, or None.

stream_class

alias of Stream

class mitogen.parent.BootstrapProtocol(broker)

Respond to stdout of a child during bootstrap. Wait for EC0_MARKER to be written by the first stage to indicate it can receive the bootstrap, then await EC1_MARKER to indicate success, and MitogenProtocol can be enabled.

EC0_MARKER = b'MITO000'

Sentinel value emitted by the first stage to indicate it is ready to receive the compressed bootstrap. For mitogen.ssh this must have a length of at least max(len('password'), len('debug1:')).

class mitogen.core.DelimitedProtocol

Provide a Protocol.on_receive() implementation for protocols that are delimited by a fixed string, like text based protocols. Each message is passed to on_line_received() as it arrives, with incomplete messages passed to on_partial_line_received().

When emulating user input it is often necessary to respond to incomplete lines, such as when a “Password: ” prompt is sent. on_partial_line_received() may be called repeatedly with an increasingly complete message. When a complete message is finally received, on_line_received() will be called once for it before the buffer is discarded.

If on_line_received() returns False, remaining data is passed unprocessed to the stream’s current protocol’s on_receive(). This allows switching from line-oriented to binary while the input buffer contains both kinds of data.

delimiter = b'\n'

The delimiter. Defaults to newline.

on_line_received(line)

Receive a line from the stream.

Parameters

line (bytes) – The encoded line, excluding the delimiter.

Returns

False to indicate this invocation modified the stream’s active protocol, and any remaining buffered data should be passed to the new protocol’s on_receive() method.

Any other return value is ignored.

on_partial_line_received(line)

Receive a trailing unterminated partial line from the stream.

Parameters

line (bytes) – The encoded partial line.
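The framing rules described for DelimitedProtocol, including partial lines and the hand-off on a False return, can be sketched as follows. LineSplitter and PromptWatcher are hypothetical names, and on_receive() here returns leftover bytes instead of passing them to a new protocol.

```python
class LineSplitter:
    """Hypothetical sketch of delimiter-based framing with partial-line callbacks."""
    delimiter = b'\n'

    def __init__(self):
        self._trailer = b''

    def on_receive(self, buf):
        """Feed bytes; return unprocessed bytes if a handler requested a hand-off."""
        buf = self._trailer + buf
        start = 0
        while True:
            idx = buf.find(self.delimiter, start)
            if idx == -1:
                break
            line = buf[start:idx]
            start = idx + len(self.delimiter)
            if self.on_line_received(line) is False:
                self._trailer = b''
                return buf[start:]  # these bytes belong to the new protocol
        self._trailer = buf[start:]
        if self._trailer:
            self.on_partial_line_received(self._trailer)
        return None

    def on_line_received(self, line):
        pass

    def on_partial_line_received(self, line):
        pass


class PromptWatcher(LineSplitter):
    """Example subclass: records lines, switches to binary after a b'SWITCH' line."""

    def __init__(self):
        super().__init__()
        self.lines, self.partials = [], []

    def on_line_received(self, line):
        self.lines.append(line)
        if line == b'SWITCH':
            return False

    def on_partial_line_received(self, line):
        self.partials.append(line)
```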

class mitogen.parent.LogProtocol(**kwargs)

For “hybrid TTY/socketpair” mode, after connection setup a spare TTY master FD exists that cannot be closed, and to which SSH or sudo may continue writing log messages.

The descriptor cannot be closed since the UNIX TTY layer sends SIGHUP to processes whose controlling TTY is the slave whose master side was closed. LogProtocol takes over this FD and creates log messages for anything written to it.

on_line_received(line)

Read a line, decode it as UTF-8, and log it.

class mitogen.core.IoLoggerProtocol(name)

Attached to one end of a socket pair whose other end overwrites one of the standard stdout or stderr file descriptors in a child context. Received data is split up into lines, decoded as UTF-8 and logged to the logging package as either the stdout or stderr logger.

Logging in child contexts is in turn forwarded to the master process using LogHandler.

classmethod build_stream(name, dest_fd)

Even though the file descriptor dest_fd will hold the opposite end of the socket open, we must keep a separate dup() of it (i.e. wsock) in case some code decides to overwrite dest_fd later, which would prevent on_shutdown() from calling shutdown() on it.

on_line_received(line)

Decode the received line as UTF-8 and pass it to the logging framework.

on_shutdown(broker)

Shut down the write end of the socket, preventing any further writes to it by this process or by any subprocess that inherited it. This allows any remaining kernel-buffered data to be drained during graceful shutdown without the buffer continuously refilling due to some out-of-control child process.
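Why shutdown() is used rather than close() can be demonstrated with the standard socket module. A sketch, assuming a UNIX-like OS and CPython's default of ignoring SIGPIPE: closing one descriptor leaves dup()ed or inherited copies usable, whereas shutdown(SHUT_WR) acts on the socket itself, so every copy loses write access while already-written data still drains.

```python
import os
import socket

parent_end, child_end = socket.socketpair()
inherited_fd = os.dup(child_end.fileno())  # stands in for a copy held by a subprocess

child_end.sendall(b'last words\n')
child_end.shutdown(socket.SHUT_WR)  # applies to the socket, so to every descriptor for it

try:
    os.write(inherited_fd, b'more\n')
except OSError as e:
    print('write via duplicate failed with errno', e.errno)  # EPIPE

print(parent_end.recv(1024))  # data written before shutdown still drains
print(parent_end.recv(1024))  # b'' (end of file) once the buffer is empty

for s in (parent_end, child_end):
    s.close()
os.close(inherited_fd)
```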

class mitogen.core.MitogenProtocol(router, remote_id, auth_id=None, local_id=None, parent_ids=None)

Protocol implementing mitogen’s stream protocol.

auth_id = None

If not None, Router stamps this into Message.auth_id of every message received on this stream.

egress_ids = None

Routing records the dst_id of every message arriving from this stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.

is_privileged = False

If not False, indicates the stream has auth_id set and its value is the same as mitogen.context_id or appears in mitogen.parent_ids.

on_message = None

Invoked as on_message(stream, msg) for each message received from the peer.

on_receive(broker, buf)

Handle the next complete message on the stream. Raise StreamError on failure.

on_shutdown(broker)

Disable Protocol immediate disconnect behaviour.

on_transmit(broker)

Transmit buffered messages.

pending_bytes()

Return the number of bytes queued for transmission on this stream. This can be used to limit the amount of data buffered in RAM by an otherwise unlimited consumer.

For an accurate result, this method should be called from the Broker thread, for example by using Broker.defer_sync().

send(msg)

Send data to handle, and tell the broker we have output. May be called from any thread.

class mitogen.parent.MitogenProtocol(router, remote_id, auth_id=None, local_id=None, parent_ids=None)

Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child during graceful shutdown.

on_shutdown(broker)

Respond to the broker’s request for the stream to shut down by sending SHUTDOWN to the child.

class mitogen.core.Waker(broker)

Protocol implementing the UNIX self-pipe trick. Used to wake Broker when another thread needs to modify its state, by enqueuing a function call to run on the Broker thread.

defer(func, *args, **kwargs)

Arrange for func() to execute on the broker thread. This function returns immediately without waiting for the result of func(). Use defer_sync() to block until a result is available.

Raises

mitogen.core.Error – defer() was called after Broker has begun shutdown.

property keep_alive

Prevent immediate Broker shutdown while deferred functions remain.

on_receive(broker, buf)

Drain the pipe and fire callbacks. Since _deferred is synchronized, defer() and on_receive() can conspire to ensure only one byte needs to be pending regardless of queue length.
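The "only one byte needs to be pending" property can be sketched with a pipe and a lock. MiniWaker and its wake_writes counter are hypothetical; the counter exists only to make the property observable.

```python
import os
import threading


class MiniWaker:
    """Hypothetical self-pipe waker: defer() queues calls, on_receive() runs them."""

    def __init__(self):
        self.rfd, self.wfd = os.pipe()
        os.set_blocking(self.rfd, False)
        self._lock = threading.Lock()
        self._deferred = []
        self.wake_writes = 0  # number of wake-up bytes written so far

    def defer(self, func, *args, **kwargs):
        with self._lock:
            should_wake = not self._deferred  # queue was empty: the loop may be asleep
            self._deferred.append((func, args, kwargs))
            if should_wake:
                os.write(self.wfd, b'\x00')
                self.wake_writes += 1

    def on_receive(self):
        # Called by the IO loop once rfd is readable.
        with self._lock:
            try:
                os.read(self.rfd, 128)  # drain the single wake-up byte
            except BlockingIOError:
                pass
            deferred, self._deferred = self._deferred, []
        for func, args, kwargs in deferred:
            func(*args, **kwargs)  # run outside the lock so callbacks may defer() again
```

Because the byte is written only when the queue goes from empty to non-empty, and the drain and queue swap happen under the same lock, a byte is pending exactly when the queue is non-empty.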

Connection & Options

class mitogen.fork.Options(old_router, max_message_size, on_fork=None, debug=False, profiling=False, unidirectional=False, on_start=None, name=None)

importer = None

Reference to the importer, if any, recovered from the parent.

on_fork = None

User-supplied function for cleaning up child process state.

class mitogen.fork.Connection(options, router)

class mitogen.parent.Options(max_message_size, name=None, remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, unidirectional=False, old_router=None)

connect_deadline = None

Derived from connect_timeout; absolute floating point UNIX timestamp after which the connection attempt should be abandoned.

connect_timeout = 30.0

Maximum time to wait for a connection attempt.

debug = False

True to cause context to write verbose /tmp/mitogen.<pid>.log.

max_message_size = None

Passed via Router wrapper methods, must eventually be passed to ExternalContext.main().

profiling = False

True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.

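python_path = '/opt/build/repo/venv/bin/python'

The path to the remote Python interpreter. This defaults to the local interpreter's sys.executable, so the value shown above is merely the path on the machine that built this documentation.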

remote_name = None

Remote name.

unidirectional = False

True if unidirectional routing is enabled in the new child.

class mitogen.parent.Connection(options, router)

Manage the lifetime of a set of Streams connecting to a remote Python interpreter, including bootstrap, disconnection, and external tool integration.

Base for streams capable of starting children.

child_is_immediate_subprocess = True

If True, indicates the child should not be killed during graceful detachment, as it is the actual process implementing the child context. In all other cases, the subprocess is SSH, sudo, or a similar tool that should be reminded to quit during disconnection.

static create_child(args, merge_stdio=False, stderr_pipe=False, escalates_privilege=False, preexec_fn=None)

Function with the semantics of create_child() used to create the child process.

create_child_args = {}

Dictionary of extra kwargs passed to create_child.

detached = False

True if the remote has indicated that it intends to detach, and should not be killed on disconnect.

diag_protocol_class

The protocol attached to stderr of the child.

alias of LogProtocol

eof_error_hint = None

Extra text appended to EofError if that exception is raised on a failed connection attempt. May be used in subclasses to hint at common problems with a particular connection method.

exception = None

On failure, the exception object that should be propagated back to the user.

get_python_argv()

Return the initial argument vector elements necessary to invoke Python, by returning a 1-element list containing python_path if it is a string, or simply returning it if it is already a list.

This allows emulation of existing tools where the Python invocation may be set to e.g. ['/usr/bin/env', 'python'].
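The rule above amounts to a few lines. A standalone sketch with a hypothetical function name; returning a copy of a list, rather than the list itself, is a choice of this sketch so callers can append arguments safely.

```python
def get_python_argv(python_path):
    """Hypothetical standalone version of the python_path rule described above."""
    if isinstance(python_path, str):
        return [python_path]
    return list(python_path)  # copy so callers may extend it without side effects


print(get_python_argv('python3'))                  # ['python3']
print(get_python_argv(['/usr/bin/env', 'python']))  # ['/usr/bin/env', 'python']
```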

name_prefix = 'local'

Prefix given to default names generated by connect().

on_stderr_disconnect()

Inverse of on_stdio_disconnect().

on_stdio_disconnect()

Handle stdio stream disconnection by failing the Connection if the stderr stream has already been closed. Otherwise, wait for it to close (or timeout), to allow buffered diagnostic logs to be consumed.

It is normal that when a subprocess aborts, stdio has nothing buffered when it is closed, thus signalling readability, causing an empty read (interpreted as indicating disconnection) on the next loop iteration, even if its stderr pipe has lots of diagnostic logs still buffered in the kernel. Therefore we must wait for both pipes to indicate they are empty before triggering connection failure.

options = None

Options

proc = None

Process

stderr_stream = None

If proc.stderr is set, referencing either a plain pipe or the controlling TTY, this references the corresponding LogProtocol’s stream, allowing it to be disconnected when this stream is disconnected.

stdio_stream = None

mitogen.core.Stream with sides connected to stdin/stdout.

stream_protocol_class

The protocol attached to stdio of the child.

alias of BootstrapProtocol

class mitogen.ssh.Options(hostname, username=None, ssh_path=None, port=None, check_host_keys='enforce', password=None, identity_file=None, compression=True, ssh_args=None, keepalive_enabled=True, keepalive_count=3, keepalive_interval=15, identities_only=True, ssh_debug_level=None, **kwargs)

python_path = 'python'

Default to whatever is available as ‘python’ on the remote machine, overriding sys.executable use.

ssh_debug_level = 0

Number of -v invocations to pass on command line.

ssh_path = 'ssh'

The path to the SSH binary.

class mitogen.ssh.Connection(options, router)

create_child(**kwargs)

Avoid PTY use when possible to avoid a scaling limitation.

diag_protocol_class

alias of SetupProtocol

class mitogen.sudo.Options(username=None, sudo_path=None, password=None, preserve_env=None, set_home=None, sudo_args=None, login=None, selinux_role=None, selinux_type=None, **kwargs)

class mitogen.sudo.Connection(options, router)

static create_child(args, escalates_privilege=False)

Like tty_create_child(), except attach stdin/stdout to a socketpair like create_child(), but leave stderr and the controlling TTY attached to a TTY.

This permits high throughput communication with programs that are reached via some program that requires a TTY for password input, like many configurations of sudo. The UNIX TTY layer tends to have tiny (no more than 14KiB) buffers, forcing many IO loop iterations when transferring bulk data, causing significant performance loss.

Parameters
  • escalates_privilege (bool) – If True, the target program may escalate privileges, causing SELinux to disconnect AF_UNIX sockets, so avoid those.

  • args (list) – Program argument vector.

Returns

Process instance.

diag_protocol_class

alias of SetupProtocol

Import Mechanism

class mitogen.core.Importer(router, context, core_src, whitelist=(), blacklist=())

Import protocol implementation that fetches modules from the parent process.

Parameters

context – Context to communicate via.

class mitogen.master.ModuleResponder(router)

add_source_override(fullname, path, source, is_pkg)

See ModuleFinder.add_source_override().

bad_load_module_count = None

Number of negative LOAD_MODULE messages sent.

get_module_count = None

Number of GET_MODULE messages received.

get_module_secs = None

Total time spent in uncached GET_MODULE.

good_load_module_count = None

Number of successful LOAD_MODULE messages sent.

good_load_module_size = None

Total bytes in successful LOAD_MODULE payloads.

minify_secs = None

Total time spent minifying modules.

neutralize_main(path, src)

Given the source for the __main__ module, try to find where it begins conditional execution based on an if __name__ == '__main__' guard, and remove any code after that point.
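One way to locate such a guard is with the ast module. This is a sketch of the idea only; the library's own approach may differ. strip_main_guard() is hypothetical, works on str rather than bytes, requires Python 3.8+ (ast.Constant), and only recognises the __name__ == '__main__' spelling.

```python
import ast


def _is_main_guard(node):
    """True for a top-level `if __name__ == '__main__':` statement."""
    test = getattr(node, 'test', None)
    return (
        isinstance(node, ast.If)
        and isinstance(test, ast.Compare)
        and isinstance(test.left, ast.Name) and test.left.id == '__name__'
        and len(test.ops) == 1 and isinstance(test.ops[0], ast.Eq)
        and len(test.comparators) == 1
        and isinstance(test.comparators[0], ast.Constant)
        and test.comparators[0].value == '__main__'
    )


def strip_main_guard(src):
    """Drop everything from the first top-level main guard onwards."""
    for node in ast.parse(src).body:
        if _is_main_guard(node):
            lines = src.splitlines(keepends=True)
            return ''.join(lines[:node.lineno - 1])
    return src
```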

class mitogen.parent.ModuleForwarder(router, parent_context, importer)

Respond to mitogen.core.GET_MODULE requests in a child by forwarding the request to our parent context, or satisfying the request from our local Importer cache.

Module Finders

class mitogen.master.ModuleFinder

Given the name of a loaded module, make a best-effort attempt at finding related modules likely needed by a child context requesting the original module.

add_source_override(fullname, path, source, is_pkg)

Explicitly install a source cache entry, preventing usual lookup methods from being used.

Beware: the value of path is critical when is_pkg is True, since it directs where submodules are searched for.

Parameters
  • fullname (str) – Name of the module to override.

  • path (str) – Module’s path as it will appear in the cache.

  • source (bytes) – Module source code as a bytestring.

  • is_pkg (bool) – True if the module is a package.

find_related(fullname)

Return a list of non-stdlib modules that are imported directly or indirectly by fullname, plus their parents.

This method is like find_related_imports(), but also recursively searches any modules which are imported by fullname.

Parameters

fullname (str) – Fully qualified name of an already imported module for which source code can be retrieved.

find_related_imports(fullname)

Return a list of non-stdlib modules that are directly imported by fullname, plus their parents.

The list is determined by retrieving the source code of fullname, compiling it, and examining all IMPORT_NAME ops.

Parameters

fullname (str) – Fully qualified name of an already imported module for which source code can be retrieved.
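Scanning compiled bytecode for IMPORT_NAME can be sketched with the dis module. The helpers imported_names() and with_parents() are hypothetical; unlike the method described above, this sketch does not filter out stdlib modules, and relative imports would additionally need their level operand (compare resolve_relpath()).

```python
import dis
import types


def imported_names(source, filename='<sketch>'):
    """Collect IMPORT_NAME targets from compiled source, including nested code."""
    names = set()
    pending = [compile(source, filename, 'exec')]
    while pending:
        code = pending.pop()
        for instr in dis.get_instructions(code):
            if instr.opname == 'IMPORT_NAME' and instr.argval:
                names.add(instr.argval)
        # Function and class bodies are separate code objects stored in co_consts.
        pending.extend(c for c in code.co_consts if isinstance(c, types.CodeType))
    return names


def with_parents(names):
    """Add each dotted name's parent packages, e.g. 'a.b.c' -> 'a', 'a.b', 'a.b.c'."""
    result = set()
    for name in names:
        parts = name.split('.')
        for i in range(1, len(parts) + 1):
            result.add('.'.join(parts[:i]))
    return result
```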

get_module_source(fullname)

Given the name of a loaded module fullname, attempt to find its source code.

Returns

Tuple of (module path, source text, is package?), or None if the source cannot be found.

resolve_relpath(fullname, level)

Given an ImportFrom AST node, guess the prefix that should be tacked on to an alias name to produce a canonical name. fullname is the name of the module in which the ImportFrom appears.
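The prefix computation for relative imports can be sketched as a pure function. resolve_relative_prefix() and its is_pkg flag are hypothetical; a package is treated as its __init__ module so that one leading dot refers to the package itself.

```python
def resolve_relative_prefix(fullname, level, is_pkg=False):
    """Hypothetical helper: prefix to prepend to a relative import's module name.

    fullname is the module containing the `from ... import ...`; level is the
    number of leading dots.
    """
    if level == 0:
        return ''  # absolute import: nothing to prepend
    bits = fullname.split('.')
    if is_pkg:
        bits.append('__init__')  # treat a package as its __init__ module
    if len(bits) <= level:
        return ''  # beyond the top-level package; real Python raises ImportError
    return '.'.join(bits[:-level]) + '.'


# In a/b/c.py, `from . import x` names a.b.x; in a/b/__init__.py it also names a.b.x.
print(resolve_relative_prefix('a.b.c', 1))               # a.b.
print(resolve_relative_prefix('a.b', 1, is_pkg=True))   # a.b.
```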

class mitogen.master.FinderMethod

Interface to a method for locating a Python module or package given its name according to the running Python interpreter. You’d think this was a simple task, right? Naive young fellow, welcome to the real world.

find(fullname)

Accept a canonical module name as would be found in sys.modules and return a (path, source, is_pkg) tuple, where:

  • path: Unicode string containing path to source file.

  • source: Bytestring containing source file’s content.

  • is_pkg: True if fullname is a package.

Returns

None if not found, or tuple as described above.

class mitogen.master.DefectivePython3xMainMethod

Recent versions of Python 3.x introduced an incomplete notion of importer specs, and in doing so created permanent asymmetry in the pkgutil interface handling for the __main__ module. Therefore we must handle __main__ specially.

find(fullname)

Find __main__ using its __file__ attribute.

class mitogen.master.PkgutilMethod

Attempt to fetch source code via pkgutil. In an ideal world, this would be the only required implementation of get_module().

find(fullname)

Find fullname using pkgutil.find_loader().

class mitogen.master.SysModulesMethod

Attempt to fetch source code via sys.modules. This was originally added specifically to support __main__, but it may catch a few more cases.

find(fullname)

Find fullname using its __file__ attribute.
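A lookup in the style of SysModulesMethod can be sketched as below, returning the (path, source, is_pkg) tuple described for FinderMethod.find(). find_via_sys_modules is hypothetical and deliberately ignores anything not loaded from a plain .py file.

```python
import os
import sys


def find_via_sys_modules(fullname):
    """Return (path, source, is_pkg) for an already loaded module, or None."""
    module = sys.modules.get(fullname)
    path = getattr(module, "__file__", None)
    if not path:
        return None  # not loaded, or a built-in module without a file
    if not path.endswith(".py") or not os.path.exists(path):
        return None  # extension module, sourceless or zipped module, etc.
    with open(path, "rb") as fp:
        source = fp.read()  # bytestring, as FinderMethod.find() expects
    # Packages expose __path__; plain modules do not.
    is_pkg = hasattr(module, "__path__")
    return path, source, is_pkg
```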

class mitogen.master.ParentEnumerationMethod

Attempt to fetch source code by examining the module’s (hopefully less insane) parent package, and if no insane parents exist, simply use sys.path to search for it from scratch on the filesystem using the normal Python lookup mechanism.

This is required for older versions of ansible.compat.six and plumbum.colors, and for Ansible 2.8's ansible.module_utils.distro and its submodule ansible.module_utils.distro._distro.

When some package dynamically replaces itself in sys.modules, but only conditionally according to some program logic, it is possible that children may attempt to load modules and subpackages from it that can no longer be resolved by examining a (corrupted) parent.

For cases like ansible.module_utils.distro, this must handle cases where a package transmuted itself into a totally unrelated module during import and vice versa, where sys.modules is replaced with junk that makes it impossible to discover the loaded module using the in-memory module object or any parent package’s __path__, since they have all been overwritten. Some men just want to watch the world burn.

find(fullname)

See implementation for a description of how this works.
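The "search from scratch" fallback can be approximated with importlib.machinery.PathFinder, which consults sys.path (or a package's search locations) without touching sys.modules. This is a hedged sketch, not the actual ParentEnumerationMethod logic; find_from_scratch is a hypothetical name.

```python
from importlib.machinery import PathFinder


def find_from_scratch(fullname):
    """Locate fullname on disk via sys.path, without consulting sys.modules.

    Returns (path, source, is_pkg), or None if not found as a .py file.
    """
    parts = fullname.split(".")
    search_path = None  # None tells PathFinder to search sys.path
    spec = None
    for i in range(len(parts)):
        if i > 0 and search_path is None:
            return None  # the previous component was a module, not a package
        spec = PathFinder.find_spec(".".join(parts[: i + 1]), search_path)
        if spec is None:
            return None
        locations = spec.submodule_search_locations
        # A package's search locations are where its children live.
        search_path = list(locations) if locations is not None else None
    if spec.origin is None or not spec.origin.endswith(".py"):
        return None  # namespace package, extension module, etc.
    with open(spec.origin, "rb") as fp:
        source = fp.read()
    return spec.origin, source, spec.submodule_search_locations is not None
```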

Routing Management

class mitogen.parent.RouteMonitor(router, parent=None)

Generate and respond to mitogen.core.ADD_ROUTE and mitogen.core.DEL_ROUTE messages sent to the local context by maintaining a table of available routes, and propagating messages towards parents and siblings as appropriate.

RouteMonitor is responsible for generating routing messages for directly attached children. It learns of new children via notice_stream() called by Router, and subscribes to their disconnect event to learn when they disappear.

In children, constructing this class overwrites the stub mitogen.core.DEL_ROUTE handler installed by mitogen.core.ExternalContext, which is expected behaviour when a child is being upgraded in preparation to become a parent of children of its own.

By virtue of only being active while responding to messages from a handler, RouteMonitor lives entirely on the broker thread, so its data requires no locking.

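Parameters
  • router (mitogen.core.Router) – Router to install handlers on.

  • parent (mitogen.core.Context) – None in the master process, or a reference to the parent context towards which route updates should be propagated.

get_routes(stream)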

Return the set of context IDs reachable on a stream.

Parameters

stream (mitogen.core.Stream) –

Returns

set([int])

notice_stream(stream)

When this parent is responsible for a new directly connected child stream, we’re also responsible for broadcasting mitogen.core.DEL_ROUTE upstream when that child disconnects.
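The bookkeeping behind get_routes() and the disconnect handling in notice_stream() amounts to a mapping from streams to sets of context IDs. The toy RouteTable below is hypothetical: it omits message generation and propagation, and uses plain hashable values in place of real streams.

```python
class RouteTable:
    """Toy routing table: which context IDs are reachable via which stream."""

    def __init__(self):
        self._routes = {}  # stream -> set of context IDs

    def add_route(self, stream, context_id):
        # Record that context_id is reachable through stream (cf. ADD_ROUTE).
        self._routes.setdefault(stream, set()).add(context_id)

    def del_route(self, stream, context_id):
        # Forget a single route (cf. DEL_ROUTE); unknown routes are ignored.
        self._routes.get(stream, set()).discard(context_id)

    def get_routes(self, stream):
        # Return a copy so callers cannot mutate the table.
        return set(self._routes.get(stream, ()))

    def on_disconnect(self, stream):
        """Drop every route via stream; return the IDs to announce as lost."""
        return self._routes.pop(stream, set())
```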

Timer Management

class mitogen.parent.TimerList

Efficiently manage a list of cancellable future events relative to wall clock time. An instance of this class is installed as mitogen.master.Broker.timers by default, and as mitogen.core.Broker.timers in children after a call to mitogen.parent.upgrade_router().

You can use TimerList to cause the broker to wake at arbitrary future moments, useful for implementing timeouts and polling in an asynchronous context.

TimerList methods can only be called from an asynchronous context, for example via mitogen.core.Broker.defer().

The broker automatically adjusts its sleep delay according to the installed timer list, and arranges for timers to expire via automatic calls to expire(). The main user interface to TimerList is schedule().

expire()

Invoke callbacks for any events in the past.

get_timeout()

Return the floating point seconds until the next event is due.

Returns

Floating point delay, or 0.0, or None if no events are scheduled.

schedule(when, func)

Schedule a future event.

Parameters
  • when (float) – UNIX time in seconds when event should occur.

  • func (callable) – Callable to invoke on expiry.

Returns

A Timer instance, exposing Timer.cancel(), which may be used to cancel the future invocation.

class mitogen.parent.Timer(when, func)

Represents a future event.

active = True

Set to False if cancel() has been called, or immediately prior to being executed by TimerList.expire().

cancel()

Cancel this event. If it has not yet executed, it will not execute during any subsequent TimerList.expire() call.
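The scheduling behaviour described for TimerList and Timer can be sketched with a heap ordered by deadline. This is a hypothetical reimplementation, not Mitogen's code: cancelled timers are discarded lazily, and the now parameters exist only to make the sketch testable without a real clock.

```python
import heapq
import itertools
import time


class Timer:
    def __init__(self, when, func):
        self.when = when
        self.func = func
        self.active = True

    def cancel(self):
        self.active = False


class TimerList:
    def __init__(self):
        self._heap = []
        # Tie-breaker so timers with equal deadlines never compare directly.
        self._seq = itertools.count()

    def schedule(self, when, func):
        timer = Timer(when, func)
        heapq.heappush(self._heap, (when, next(self._seq), timer))
        return timer

    def _discard_cancelled(self):
        # Cancelled timers are removed lazily, once they reach the heap top.
        while self._heap and not self._heap[0][2].active:
            heapq.heappop(self._heap)

    def get_timeout(self, now=None):
        self._discard_cancelled()
        if not self._heap:
            return None
        now = time.time() if now is None else now
        return max(0.0, self._heap[0][0] - now)

    def expire(self, now=None):
        now = time.time() if now is None else now
        while self._heap and self._heap[0][0] <= now:
            _, _, timer = heapq.heappop(self._heap)
            if timer.active:
                # Mark inactive immediately before running the callback.
                timer.active = False
                timer.func()
```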

Context ID Allocation

class mitogen.master.IdAllocator(router)

Allocate IDs for new contexts constructed locally, and blocks of IDs for children to allocate their own IDs using mitogen.parent.ChildIdAllocator without risk of conflict, and without necessitating network round-trips for each new context.

This class responds to mitogen.core.ALLOCATE_ID messages received from children by replying with fresh block ID allocations.

The master’s IdAllocator instance can be accessed via mitogen.master.Router.id_allocator.

BLOCK_SIZE = 1000

Block allocations are made in groups of 1000 by default.

allocate()

Allocate a context ID by directly incrementing an internal counter.

Returns

The new context ID.

allocate_block()

Allocate a block of IDs for use in a child context.

This function is safe to call from any thread.

Returns

Tuple of the form (id, end_id) where id is the first usable ID and end_id is one past the last usable ID, i.e. the block is the half-open range [id, end_id).

class mitogen.parent.ChildIdAllocator(router)

Allocate new context IDs from a block of unique context IDs allocated by the master process.

allocate()

Allocate an ID, requesting a fresh block from the master if the existing block is exhausted.

Returns

The new context ID.

Warning

This method is not safe to call from the Broker thread, as it may block on IO of its own.
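The split between master-side block allocation and child-side consumption can be sketched as follows. BlockIdAllocator, ChildBlockAllocator and request_block are hypothetical names; request_block stands in for the ALLOCATE_ID round-trip to the master, and this sketch treats each block as the half-open range [start, end).

```python
import threading


class BlockIdAllocator:
    """Master side: hands out single IDs and whole blocks from one counter."""

    BLOCK_SIZE = 1000

    def __init__(self, first_id=1):
        self._next_id = first_id
        self._lock = threading.Lock()  # safe to call from any thread

    def allocate(self):
        with self._lock:
            id_ = self._next_id
            self._next_id += 1
            return id_

    def allocate_block(self):
        # Half-open range [start, end): end itself is not usable.
        with self._lock:
            start = self._next_id
            self._next_id += self.BLOCK_SIZE
            return start, self._next_id


class ChildBlockAllocator:
    """Child side: consume a block, asking for a new one when exhausted."""

    def __init__(self, request_block):
        self._request_block = request_block
        self._ids = iter(())
        self._lock = threading.Lock()

    def allocate(self):
        with self._lock:
            for id_ in self._ids:
                return id_
            # Block exhausted: this is where a real child would block on IO.
            start, end = self._request_block()
            self._ids = iter(range(start + 1, end))
            return start
```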

Child Implementation

class mitogen.core.ExternalContext(config)

External context implementation.

This class contains the main program implementation for new children. It is responsible for setting up everything about the process environment, import hooks, standard IO redirection, logging, configuring a Router and Broker, and finally arranging for Dispatcher to take over the main thread after initialization is complete.

broker

The mitogen.core.Broker instance.

context

The mitogen.core.Context instance.

channel

The mitogen.core.Channel over which CALL_FUNCTION requests are received.

importer

The mitogen.core.Importer instance.

stdout_log

The IoLogger connected to sys.stdout.

stderr_log

The IoLogger connected to sys.stderr.

class mitogen.core.Dispatcher(econtext)

Implementation of the CALL_FUNCTION handle for a child context. Listens on the child’s main thread for messages sent by mitogen.parent.CallChain and dispatches the function calls they describe.

If a mitogen.parent.CallChain sending a message is in pipelined mode, any exception that occurs is recorded, and causes all subsequent calls with the same chain_id to fail with the same exception.
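The pipelined failure rule can be sketched as a table of recorded exceptions keyed by chain_id. ChainDispatcher is hypothetical, and treating chain_id=None as an unchained call is an assumption of the sketch.

```python
class ChainDispatcher:
    """Toy dispatcher: an error in a pipelined chain poisons later calls."""

    def __init__(self):
        self._chain_errors = {}  # chain_id -> first exception raised

    def dispatch(self, chain_id, func, *args, **kwargs):
        # A chain that already failed replays its recorded exception
        # instead of running any further calls.
        if chain_id is not None and chain_id in self._chain_errors:
            raise self._chain_errors[chain_id]
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if chain_id is not None:
                self._chain_errors[chain_id] = e
            raise
```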

Process Management

class mitogen.parent.Reaper(broker, proc, kill, wait_on_shutdown)

Asynchronous logic for reaping Process objects. This is necessary to prevent uncontrolled buildup of zombie processes in long-lived parents that will eventually reach an OS limit, preventing creation of new threads and processes, and to log the exit status of the child in the case of an error.

To avoid modifying process-global state such as with signal.set_wakeup_fd() or installing a signal.SIGCHLD handler that might interfere with the user’s ability to use those facilities, Reaper polls for exit with backoff using timers installed on an associated Broker.

Parameters
  • broker (mitogen.core.Broker) – The Broker on which to install timers

  • proc (mitogen.parent.Process) – The process to reap.

  • kill (bool) – If True, send SIGTERM and SIGKILL to the process.

  • wait_on_shutdown (bool) – If True, delay Broker shutdown if the child has not yet exited. If False, simply forget the child.

reap()

Reap the child process during disconnection.
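The exit polling can be sketched synchronously with os.waitpid() and os.WNOHANG. wait_with_backoff and its delay values are assumptions; per the description above, Reaper schedules each retry as a Broker timer instead of sleeping, so the broker thread never blocks.

```python
import os
import time


def wait_with_backoff(pid, timeout=5.0, initial_delay=0.01, max_delay=1.0):
    """Poll pid with WNOHANG, sleeping with exponential backoff between tries.

    Returns the exit status in subprocess.Popen.returncode style (negative
    signal number if killed by a signal), or None if timeout elapsed.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        reaped_pid, status = os.waitpid(pid, os.WNOHANG)
        if reaped_pid == pid:
            if os.WIFSIGNALED(status):
                return -os.WTERMSIG(status)
            return os.WEXITSTATUS(status)
        if time.monotonic() >= deadline:
            return None  # still running; a real reaper would keep trying
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
```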

class mitogen.parent.Process(pid, stdin, stdout, stderr=None)

Process objects provide a uniform interface to the subprocess and mitogen.fork modules. This class is extended by PopenProcess and mitogen.fork.Process.

Parameters
  • pid (int) – The process ID.

  • stdin (file) – File object attached to standard input.

  • stdout (file) – File object attached to standard output.

  • stderr (file) – File object attached to standard error, or None.

name = None

Name of the process used in logs. Set to the stream/context name by Connection.

pid = None

The process ID.

poll()

Fetch the child process exit status, or None if it is still running. This should be overridden by subclasses.

Returns

Exit status in the style of the subprocess.Popen.returncode attribute, i.e. with signals represented by a negative integer.

stderr = None

File object attached to standard error.

stdin = None

File object attached to standard input.

stdout = None

File object attached to standard output.

class mitogen.parent.PopenProcess(proc, stdin, stdout, stderr=None)

Process subclass wrapping a subprocess.Popen object.

Parameters

proc (subprocess.Popen) – The subprocess.

poll()

Fetch the child process exit status, or None if it is still running. This should be overridden by subclasses.

Returns

Exit status in the style of the subprocess.Popen.returncode attribute, i.e. with signals represented by a negative integer.

proc = None

The subprocess.

class mitogen.fork.Process(pid, stdin, stdout, stderr=None)

poll()

Fetch the child process exit status, or None if it is still running. This should be overridden by subclasses.

Returns

Exit status in the style of the subprocess.Popen.returncode attribute, i.e. with signals represented by a negative integer.

Helper Functions

Subprocess Functions

mitogen.parent.create_child(args, merge_stdio=False, stderr_pipe=False, escalates_privilege=False, preexec_fn=None)

Create a child process whose stdin/stdout is connected to a socket.

Parameters
  • args (list) – Program argument vector.

  • merge_stdio (bool) – If True, arrange for stderr to be connected to the stdout socketpair, rather than inherited from the parent process. This may be necessary to ensure that no TTY is connected to any stdio handle, for instance when using LXC.

  • stderr_pipe (bool) – If True and merge_stdio is False, arrange for stderr to be connected to a separate pipe, to allow any ongoing debug logs generated by e.g. SSH to be output as the session progresses, without interfering with stdout.

  • escalates_privilege (bool) – If True, the target program may escalate privileges, causing SELinux to disconnect AF_UNIX sockets, so avoid those.

  • preexec_fn (function) – If not None, a function to run within the post-fork child before executing the target program.

Returns

Process instance.
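A stripped-down version of the socketpair arrangement, using only the standard library. spawn_on_socketpair is hypothetical and ignores merge_stdio, stderr_pipe, escalates_privilege and preexec_fn:

```python
import socket
import subprocess


def spawn_on_socketpair(args):
    """Start args with stdin and stdout both attached to one socket end.

    Returns (proc, parent_sock); the parent talks to the child over parent_sock.
    """
    parent_sock, child_sock = socket.socketpair()
    try:
        proc = subprocess.Popen(args, stdin=child_sock, stdout=child_sock)
    finally:
        # The child holds its own copy; close ours so EOF propagates correctly.
        child_sock.close()
    return proc, parent_sock
```

Because one bidirectional socket serves as both stdin and stdout, the child consumes a single descriptor pair instead of two pipes.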

mitogen.parent.hybrid_tty_create_child(args, escalates_privilege=False)

Like tty_create_child(), except attach stdin/stdout to a socketpair like create_child(), but leave stderr and the controlling TTY attached to a TTY.

This permits high-throughput communication with programs that are reached via some program that requires a TTY for password input, like many configurations of sudo. The UNIX TTY layer tends to have tiny (no more than 14KiB) buffers, forcing many IO loop iterations when transferring bulk data and causing significant performance loss.

Parameters
  • escalates_privilege (bool) – If True, the target program may escalate privileges, causing SELinux to disconnect AF_UNIX sockets, so avoid those.

  • args (list) – Program argument vector.

Returns

Process instance.

mitogen.parent.tty_create_child(args)

Create a new child process whose stdin/stdout/stderr are connected to the slave end of a pseudo-terminal, and return a Process instance whose IO is performed via the master end. The child is created such that the pseudo-terminal becomes its controlling TTY, ensuring that opening /dev/tty returns a new file descriptor open on the slave end.

Parameters

args (list) – Program argument vector.

Returns

Process instance.

Helpers

mitogen.core.has_parent_authority(msg, _stream=None)

Policy function for use with Receiver and Router.add_handler() that requires incoming messages to originate from a parent context, or on a Stream whose auth_id has been set to that of a parent context or the current context.

mitogen.core.io_op(func, *args)

Wrap func(*args) that may raise select.error, IOError, or OSError, trapping UNIX error codes relating to disconnection and retry events in various subsystems:

  • When a signal is delivered to the process on Python 2, system call retry is signalled through errno.EINTR. The invocation is automatically restarted.

  • When performing IO against a TTY, disconnection of the remote end is signalled by errno.EIO.

  • When performing IO against a socket, disconnection of the remote end is signalled by errno.ECONNRESET.

  • When performing IO against a pipe, disconnection of the remote end is signalled by errno.EPIPE.

Returns

Tuple of (return_value, disconnect_reason), where return_value is the return value of func(*args), and disconnect_reason is an exception instance when disconnection was detected, otherwise None.
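A simplified io_op-style wrapper might look like this. io_op_sketch is hypothetical; on Python 3, select.error and IOError are aliases of OSError, and since Python 3.5 (PEP 475) most system calls are retried on EINTR automatically, so the retry branch matters mainly on Python 2.

```python
import errno

# Error codes that signal the remote end of a TTY, socket or pipe went away.
DISCONNECT_ERRNOS = (errno.EIO, errno.ECONNRESET, errno.EPIPE)


def io_op_sketch(func, *args):
    """Return (result, None) on success, or (None, exc) on disconnection."""
    while True:
        try:
            return func(*args), None
        except OSError as e:
            if e.errno == errno.EINTR:
                continue  # interrupted by a signal: retry the call
            if e.errno in DISCONNECT_ERRNOS:
                return None, e
            raise  # any other error is a genuine failure
```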

mitogen.core.pipe()

Create a UNIX pipe pair using os.pipe(), wrapping the returned descriptors in Python file objects in order to manage their lifetime and ensure they are closed when their last reference is discarded and they have not been closed explicitly.

mitogen.core.set_block(fd)

Inverse of set_nonblock(), i.e. cause fd to block the thread when the underlying kernel buffer is exhausted.

mitogen.core.set_cloexec(fd)

Set the file descriptor fd to automatically close on os.execve(). This has no effect on file descriptors inherited across os.fork(); they must be explicitly closed through some other means, such as mitogen.fork.on_fork().

mitogen.core.set_nonblock(fd)

Set the file descriptor fd to non-blocking mode. For most underlying file types, this causes os.read() or os.write() to raise OSError with errno.EAGAIN rather than block the thread when the underlying kernel buffer is exhausted.
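The three descriptor helpers above can be sketched with fcntl. These are illustrative reimplementations under the documented names, not necessarily Mitogen's code; on Python 3.5+ os.set_blocking() covers the O_NONBLOCK case as well.

```python
import fcntl
import os


def set_nonblock(fd):
    # Add O_NONBLOCK to the file status flags.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def set_block(fd):
    # Clear O_NONBLOCK, restoring blocking reads and writes.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)


def set_cloexec(fd):
    # FD_CLOEXEC lives in the descriptor flags (F_GETFD), not the status flags.
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
```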

mitogen.core.to_text(o)

Coerce o to Unicode by decoding it from UTF-8 if it is an instance of bytes, otherwise pass it to the str constructor. The returned object is always a plain str; any subclass is removed.
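A Python 3 only sketch of the coercion rule (Python 2 handling is omitted, and this is not presented as the library's code):

```python
def to_text(o):
    """Decode bytes as UTF-8; otherwise convert with str()."""
    if isinstance(o, bytes):
        return o.decode("utf-8")
    # For a str subclass that does not override __str__, str() returns
    # a plain str copy, dropping the subclass.
    return str(o)
```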

mitogen.parent.create_socketpair(size=None)

Create a socket.socketpair() for use as a child’s UNIX stdio channels. As socketpairs are bidirectional, they are economical on file descriptor usage as one descriptor can be used for stdin and stdout. As they are sockets their buffers are tunable, allowing large buffers to improve file transfer throughput and reduce IO loop iterations.
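Buffer tuning of the kind described can be sketched with setsockopt(). make_sized_socketpair is hypothetical, and this document does not state which socket options create_socketpair() actually sets; the default size here mirrors mitogen.core.CHUNK_SIZE.

```python
import socket

CHUNK_SIZE = 131072  # 128KiB, matching mitogen.core.CHUNK_SIZE


def make_sized_socketpair(size=CHUNK_SIZE):
    """Return a UNIX socketpair with send/receive buffers sized to size."""
    parent, child = socket.socketpair()
    for sock in (parent, child):
        # The kernel may round, double or cap these values; Linux caps the
        # request at net.core.wmem_max / rmem_max and then doubles it.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
    return parent, child
```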

mitogen.master.get_child_modules(path, fullname)

Return the suffixes of submodules directly nested beneath the package directory at path.

Parameters
  • path (str) – Path to the module’s source code on disk, or some PEP-302-recognized equivalent. Usually this is the module’s __file__ attribute, but is specified explicitly to avoid loading the module.

  • fullname (str) – Name of the package we’re trying to get child modules for

Returns

List of submodule name suffixes.
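For packages stored as plain directories, the listing can be approximated with pkgutil.iter_modules(). child_module_suffixes is hypothetical, takes only path, and does not attempt the other PEP-302 cases mentioned above.

```python
import os
import pkgutil


def child_module_suffixes(path):
    """List names of modules and subpackages directly inside path's directory."""
    directory = os.path.dirname(path)  # e.g. .../json/__init__.py -> .../json
    return [name for _, name, _ in pkgutil.iter_modules([directory])]
```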

mitogen.minify.minimize_source(source)

Remove comments and docstrings from Python source, preserving line numbers and syntax of empty blocks.

Parameters

source (str) – The source to minimize.

Returns

The minimized source.

Return type

str
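The comment half of this can be approximated with the tokenize module. strip_comments is a hypothetical, partial stand-in: it removes # comments but leaves docstrings alone, so it does not need the empty-block handling that minimize_source() provides.

```python
import io
import tokenize


def strip_comments(source):
    """Remove # comments from source while keeping every line in place."""
    tokens = tokenize.generate_tokens(io.StringIO(source).readline)
    kept = [tok for tok in tokens if tok.type != tokenize.COMMENT]
    # untokenize() honours each token's (row, col) position, so dropping
    # comments leaves blank or shortened lines instead of shifting code.
    return tokenize.untokenize(kept)
```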

Signals

Mitogen contains a simplistic signal mechanism to decouple its components. When a signal is fired by an instance of a class, functions registered to receive it are called back.

Warning

As signals execute on the Broker thread, and without exception handling, they are generally unsafe for consumption by user code, as any bugs could trigger crashes and hangs for which the broker is unable to forward logs, or ensure the buggy context always shuts down on disconnect.

Functions

mitogen.core.listen(obj, name, func)

Arrange for func() to be invoked when signal name is fired on obj.

mitogen.core.unlisten(obj, name, func)

Remove func() from the list of functions invoked when signal name is fired by obj.

Raises

ValueError – func() was not on the list.

mitogen.core.fire(obj, name, *args, **kwargs)

Arrange for func(*args, **kwargs) to be invoked for every function registered for signal name on obj.
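The three functions can be sketched as a per-object registry of callbacks. This is a hypothetical reimplementation; storing the registry in a _signals attribute on the instance is an assumption of the sketch.

```python
def listen(obj, name, func):
    # Per-object registry: {signal name: [callbacks]}, stored on the instance.
    signals = vars(obj).setdefault("_signals", {})
    signals.setdefault(name, []).append(func)


def unlisten(obj, name, func):
    # list.remove() raises ValueError if func was never registered.
    vars(obj).get("_signals", {}).get(name, []).remove(func)


def fire(obj, name, *args, **kwargs):
    # Iterate over a copy so callbacks may unlisten themselves safely.
    for func in list(vars(obj).get("_signals", {}).get(name, ())):
        func(*args, **kwargs)
```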

List

These signals are used internally by Mitogen.

Each entry gives the class, the signal name, and a description.

  • mitogen.core.Stream, disconnect – Fired on the Broker thread when disconnection is detected.

  • mitogen.core.Stream, shutdown – Fired on the Broker thread when broker shutdown begins.

  • mitogen.core.Context, disconnect – Fired on the Broker thread during shutdown (???)

  • mitogen.parent.Process, exit – Fired when mitogen.parent.Reaper detects the subprocess has fully exited.

  • mitogen.core.Broker, shutdown – Fired after Broker.shutdown() is called, but before the shutdown event fires. This can be used to trigger any behaviour that relies on the process remaining intact, as processing of shutdown races with any parent sending the child a signal because it is not shutting down in a reasonable time.

  • mitogen.core.Broker, shutdown – Fired after Broker.shutdown() is called.

  • mitogen.core.Broker, exit – Fired immediately prior to the broker thread exit.