Service Framework

Warning

This section is incomplete.

Mitogen includes a simple framework for implementing services exposed to other contexts, with some built-in subclasses to capture common designs. This is a work in progress, and new functionality will be added as common usage patterns emerge.

Overview

Service

  • User-supplied class with explicitly exposed methods.

  • May be auto-imported and constructed in a child from a parent simply by calling it.

  • Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by default, but may use any naming scheme the configured activator understands.

  • Children receive refusals if the class is not already activated by a parent.

  • Has an associated Select instance which may be dynamically loaded with receivers over time; on_message_received() is invoked if any receiver becomes ready.

Invoker

  • Abstracts mechanism for calling a service method and verifying permissions.

  • Built-in ‘service.Invoker’: concurrent execution of all methods on the thread pool.

  • Built-in ‘service.SerializedInvoker’: serialization of all calls on a single thread borrowed from the pool while any request is pending.

  • Built-in ‘service.DeduplicatingInvoker’: requests are aggregated by distinct (method, kwargs) key, only one such method ever executes, return value is cached and broadcast to all request waiters. Waiters do not block additional pool threads.
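The deduplication behaviour described for the last invoker can be illustrated with a small standalone sketch. It does not use Mitogen: the class name `DedupSketch`, its `invoke()` signature and the reply callbacks are all hypothetical, chosen only to show requests being keyed by (method, kwargs), executed once, cached, and broadcast to every waiter without the waiters holding a thread.

```python
import threading


class DedupSketch:
    """Hypothetical illustration only; not mitogen.service.DeduplicatingInvoker."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cache = {}      # key -> cached return value
        self._waiters = {}    # key -> reply callbacks for an in-flight request
        self.executions = 0   # how many times a method body really ran

    def invoke(self, method, kwargs, reply):
        # Requests are aggregated by a hashable (method, kwargs) key.
        key = (method.__name__, tuple(sorted(kwargs.items())))
        with self._lock:
            if key in self._cache:
                cached, result = True, self._cache[key]
            elif key in self._waiters:
                # Already running elsewhere: queue the callback and return
                # immediately, so this caller does not tie up a thread.
                self._waiters[key].append(reply)
                return
            else:
                cached, result = False, None
                self._waiters[key] = [reply]
        if cached:
            reply(result)
            return
        result = method(**kwargs)  # the single expensive execution
        with self._lock:
            self.executions += 1
            self._cache[key] = result
            waiters = self._waiters.pop(key)
        for waiter in waiters:     # broadcast to every waiter
            waiter(result)


def square(n):
    return n * n


invoker = DedupSketch()
replies = []
invoker.invoke(square, {'n': 4}, replies.append)
invoker.invoke(square, {'n': 4}, replies.append)  # served from the cache
invoker.invoke(square, {'n': 5}, replies.append)
```

Error handling is deliberately omitted; a real implementation would also need to deliver exceptions to the waiters.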

Activator

  • Abstracts mechanism for activating a service and verifying activation permission.

  • Built-in activator looks for service by fully.qualified.ClassName using Python import mechanism, and only permits parents to trigger activation.
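As a rough sketch of what lookup by fully.qualified.ClassName through the import mechanism can look like, the following uses only the standard library. The helper names `resolve_class()` and `activate()` and the `requester_is_parent` flag are assumptions made for illustration; they are not part of Mitogen's API.

```python
import importlib


def resolve_class(qualified_name):
    """Import 'pkg.mod.ClassName' and return the class object."""
    module_name, _, class_name = qualified_name.rpartition('.')
    if not module_name:
        raise ValueError('expected a dotted name, got %r' % (qualified_name,))
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def activate(qualified_name, requester_is_parent):
    """Hypothetical permission check: only parents may trigger activation."""
    if not requester_is_parent:
        raise PermissionError('only parents may activate services')
    return resolve_class(qualified_name)


# Resolve a standard library class by its canonical name.
OrderedDict = activate('collections.OrderedDict', requester_is_parent=True)

try:
    activate('collections.OrderedDict', requester_is_parent=False)
    refused = False
except PermissionError:
    refused = True
```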

Pool

  • Manages a fixed-size thread pool, a mapping of service name to Invoker, and an aggregate Select over every activated service’s Select.

  • Constructed automatically in children in response to the first CALL_SERVICE message sent to them by a parent.

  • Must be constructed manually in parent context.

  • Has close() and add() methods.

Example

import mitogen
import mitogen.service


class FileService(mitogen.service.Service):
    """
    Simple file server, for demonstration purposes only! Use of this in
    real code would be a security vulnerability as it would permit children
    to read any file from the master's disk.
    """

    @mitogen.service.expose(policy=mitogen.service.AllowAny())
    @mitogen.service.arg_spec(spec={
        'path': str
    })
    def read_file(self, path):
        with open(path, 'rb') as fp:
            return fp.read()


def download_file(source_context, path):
    s = source_context.call_service(
        service_name=FileService,  # may also be string 'pkg.mod.FileService'
        method_name='read_file',
        path=path,
    )

    with open(path, 'wb') as fp:  # binary mode: read_file() returns bytes
        fp.write(s)


def download_some_files(source_context, paths):
    for path in paths:
        download_file(source_context, path)


@mitogen.main()
def main(router):
    pool = mitogen.service.Pool(router, services=[
        FileService(router),
    ])

    remote = router.ssh(hostname='k3')
    remote.call(download_some_files,
        source_context=router.myself(),
        paths=[
            '/etc/passwd',
            '/etc/hosts',
        ]
    )
    pool.stop()

Reference

class mitogen.service.Policy

Base security policy.

class mitogen.service.AllowParents
class mitogen.service.AllowAny
mitogen.service.arg_spec(spec)

Annotate a method as requiring arguments with a specific type. This only validates required arguments. For optional arguments, write a manual check within the function.

@mitogen.service.arg_spec({
    'path': str
})
def fetch_path(self, path, optional=None):
    ...

Parameters

spec (dict) – Mapping from argument name to expected type.
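To make the split between required and optional arguments concrete, here is a standalone sketch. The `check_args()` decorator is a hypothetical stand-in written for this example, not the implementation of mitogen.service.arg_spec; it validates only the names listed in the spec, and the optional argument is checked by hand inside the method as the text recommends.

```python
import functools


def check_args(spec):
    """Hypothetical stand-in for arg_spec: type-check required kwargs only."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, **kwargs):
            for name, expected in spec.items():
                if name not in kwargs:
                    raise TypeError('missing required argument %r' % (name,))
                if not isinstance(kwargs[name], expected):
                    raise TypeError('%r must be %s' % (name, expected.__name__))
            return func(self, **kwargs)
        return wrapper
    return decorator


class Demo:
    @check_args({'path': str})
    def fetch_path(self, path, optional=None):
        # Optional arguments are not covered by the spec: check manually.
        if optional is not None and not isinstance(optional, int):
            raise TypeError('optional must be an int or None')
        return (path, optional)


def raises_type_error(**kwargs):
    try:
        Demo().fetch_path(**kwargs)
    except TypeError:
        return True
    return False
```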

mitogen.service.expose(policy)

Annotate a method to permit access to contexts matching an authorization policy. The annotation may be specified multiple times. Methods lacking any authorization policy are not accessible.

@mitogen.service.expose(policy=mitogen.service.AllowParents())
def unsafe_operation(self):
    ...

Parameters

policy (mitogen.service.Policy) – The policy to require.
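The access rule can be sketched without Mitogen as follows. Every name here (`allow()`, `any_caller`, `parents_only`, `is_permitted()`) is hypothetical, and the sketch assumes that when the annotation is applied several times, every attached policy must pass; the text above does not state how multiple policies combine.

```python
def allow(policy):
    """Hypothetical stand-in for expose(): attach a policy to a method."""
    def decorator(func):
        func.policies = getattr(func, 'policies', []) + [policy]
        return func
    return decorator


def any_caller(caller_is_parent):
    return True


def parents_only(caller_is_parent):
    return caller_is_parent


def is_permitted(method, caller_is_parent):
    # Methods lacking any policy are never accessible.
    policies = getattr(method, 'policies', [])
    # Assumption: all attached policies must agree.
    return bool(policies) and all(p(caller_is_parent) for p in policies)


class Demo:
    @allow(parents_only)
    def unsafe_operation(self):
        return 'done'

    @allow(any_caller)
    def ping(self):
        return 'pong'

    def internal(self):  # no annotation, so never callable remotely
        return 'hidden'


demo = Demo()
```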

mitogen.service.Service(router)
class mitogen.service.Invoker(service)
class mitogen.service.SerializedInvoker(**kwargs)
class mitogen.service.DeduplicatingInvoker(service)

An invoker that deduplicates and caches expensive responses. Requests are deduplicated according to a customizable key, and the single expensive response is broadcast to all requestors.

A side effect of this class is that processing of the single response is always serialized according to the result of key_from_request().

Only one pool thread is blocked during generation of the response, regardless of the number of requestors.

class mitogen.service.Service(router)
NO_REPLY = <object object>

Sentinel object to suppress reply generation, since returning None will trigger a response message containing the pickled None.

on_message(event)

Called when a message arrives on any of select’s registered receivers.

Parameters

event (mitogen.select.Event) –

on_shutdown()

Called by Pool.shutdown() once the last worker thread has exited.

class mitogen.service.Pool(router, services=(), size=1, overwrite=False, recv=None)

Manage a pool of at least one thread that will be used to process messages for a collection of services.

Internally this is implemented by subscribing every Service’s mitogen.core.Receiver using a single mitogen.select.Select, then arranging for every thread to consume messages delivered to that select.

In this way the threads are fairly shared by all available services, and no resources are dedicated to a single idle service.

There is no penalty for exposing large numbers of services; the list of exposed services could even be generated dynamically in response to your program’s configuration or its input data.

Parameters
  • router (mitogen.core.Router) – Router on which to listen for mitogen.core.CALL_SERVICE messages.

  • services (list) – Initial list of services to register.

  • recv (mitogen.core.Receiver) – mitogen.core.CALL_SERVICE receiver to reuse. This is used by get_or_create_pool() to hand off a queue of messages from the Dispatcher stub handler while avoiding a race.
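The shared-thread design described above can be approximated with the standard library alone. In this sketch a single queue.Queue plays the role of the aggregate Select, and `MiniPool`, `call()` and `stop()` are hypothetical names; it is not how mitogen.service.Pool is implemented, only an illustration of several services being served by the same small set of threads.

```python
import queue
import threading


class MiniPool:
    """Hypothetical illustration: a few threads shared by many services."""

    def __init__(self, services, size=2):
        self._services = dict(services)  # service name -> callable
        self._requests = queue.Queue()   # stands in for the aggregate Select
        self._threads = [
            threading.Thread(target=self._worker, daemon=True)
            for _ in range(size)
        ]
        for thread in self._threads:
            thread.start()

    def _worker(self):
        while True:
            item = self._requests.get()
            if item is None:             # stop marker
                return
            name, arg, reply = item
            # No error handling: a failing service would kill this worker.
            reply.put(self._services[name](arg))

    def call(self, name, arg):
        reply = queue.Queue()
        self._requests.put((name, arg, reply))
        return reply.get()

    def stop(self):
        for _ in self._threads:
            self._requests.put(None)     # one marker per worker
        for thread in self._threads:
            thread.join()

    def alive_threads(self):
        return sum(1 for thread in self._threads if thread.is_alive())


pool = MiniPool({'upper': str.upper, 'length': len}, size=2)
upper_result = pool.call('upper', 'abc')
length_result = pool.call('length', 'abcd')
pool.stop()
```

Because no thread belongs to a particular service, an idle service costs nothing beyond its entry in the mapping.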

activator_class

alias of Activator

defer(func, *args, **kwargs)

Arrange for func(*args, **kwargs) to be invoked in the context of a service pool thread.

Built-in Services

class mitogen.service.FileService(router)

Streaming file server, used to serve small and huge files alike. Paths must be registered by a trusted context before they will be served to a child.

Transfers are divided among the physical streams that connect external contexts, ensuring each stream never has excessive data buffered in RAM, while still maintaining enough to fully utilize available bandwidth. This is achieved by making an initial bandwidth assumption, enqueueing enough chunks to fill that assumed pipe, then responding to delivery acknowledgements from the receiver by scheduling new chunks.

Transfers proceed one-at-a-time per stream. When multiple contexts exist on a stream (e.g. one is the SSH account, another is a sudo account, and a third is a proxied SSH connection), each request is satisfied in turn before subsequent requests start flowing. This ensures that when a stream is contended, priority is given to completing individual transfers rather than potentially aborting many partial transfers and wasting the bandwidth already spent on them.

Theory of operation:
  1. Trusted context (i.e. WorkerProcess) calls register(), making a file available to any untrusted context.

  2. Requestee context creates a mitogen.core.Receiver() to receive chunks, then calls fetch(path, recv.to_sender()) to set up the transfer.

  3. fetch() replies to the call with the file’s metadata, then schedules an initial burst up to the window size limit (1MiB).

  4. Chunks begin to arrive in the requestee, which calls acknowledge() for each 128KiB received.

  5. The acknowledge() call arrives at FileService, which schedules new chunks to refill the drained window back to the size limit.

  6. When the last chunk has been pumped for a single transfer, Sender.close() is called, causing the receive loop in target.py::_get_file() to exit and allowing that code to compare the transferred size with the total file size from the metadata.

  7. If the sizes mismatch, _get_file()’s caller is informed, and it discards the result and logs or raises an error.
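Steps 3 to 6 above can be modelled with a short single-process simulation. This is a simplified sketch under stated assumptions, not FileService's code: there is no network, chunks are represented only by their sizes, and the function name `simulate_transfer()` is hypothetical. The 1MiB window and 128KiB chunk size are the figures quoted in the steps.

```python
from collections import deque

WINDOW_SIZE = 1048576  # 1MiB window limit, from step 3
CHUNK_SIZE = 131072    # 128KiB per chunk, from step 4


def simulate_transfer(file_size):
    """Return (bytes_received, peak_unacknowledged_bytes) for one transfer."""
    in_flight = deque()  # sizes of chunks sent but not yet acknowledged
    sent = 0
    received = 0
    peak = 0

    def pump():
        # Schedule chunks until the window is full or the file is exhausted.
        nonlocal sent, peak
        while sent < file_size and sum(in_flight) + CHUNK_SIZE <= WINDOW_SIZE:
            size = min(CHUNK_SIZE, file_size - sent)
            in_flight.append(size)
            sent += size
            peak = max(peak, sum(in_flight))

    pump()                           # step 3: initial burst
    while in_flight:
        size = in_flight.popleft()   # step 4: a chunk arrives at the receiver
        received += size
        pump()                       # step 5: the acknowledgement refills
    return received, peak


file_size = 3 * 1048576 + 5
received, peak = simulate_transfer(file_size)
transfer_ok = (received == file_size)  # step 6: compare sizes
```

The peak never exceeds the window, which is the property that keeps buffering bounded while the pipe stays full.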

Shutdown:
  1. process.py calls service.Pool.shutdown(), which arranges for the service pool threads to exit and be joined, guaranteeing no new requests can arrive, before calling Service.on_shutdown() for each registered service.

  2. FileService.on_shutdown() walks every in-progress transfer and calls Sender.close(), causing Receiver loops in the requestees to exit early. The size check fails and any partially downloaded file is discarded.

  3. Control exits _get_file() in every target, and graceful shutdown can proceed normally, without the associated thread needing to be forcefully killed.

acknowledge(size, msg)

Acknowledge bytes received by a transfer target, scheduling new chunks to keep the window full. This should be called for every chunk received by the target.

fetch(path, sender, msg)

Start a transfer for a registered path.

Returns

Dict containing the file metadata:

  • size: File size in bytes.

  • mode: Integer file mode.

  • owner: Owner account name on host machine.

  • group: Owner group name on host machine.

  • mtime: Floating point modification time.

  • ctime: Floating point change time.

Raises

Error – Unregistered path, or Sender did not match requestee context.

classmethod get(context, path, out_fp)

Stream a file from the connection multiplexer process in the controller.

Parameters
  • context (mitogen.core.Context) – Reference to the context hosting the FileService that will be used to fetch the file.

  • path (bytes) – FileService registered name of the input file.

  • out_fp (file object) – Open file object on the local disk to which the downloaded data is written.

Returns

Tuple of (ok, metadata), where ok is True on success, or False if the transfer was interrupted and the output should be discarded.

metadata is a dictionary of file metadata as documented in fetch().

on_shutdown()

Respond to shutdown by sending close() to every target, allowing their receive loop to exit and clean up gracefully.

register(path)

Authorize a path for access by children. Repeat calls with the same path have no effect.

Parameters

path (str) – File path.

register_prefix(path)

Authorize a path and any subpaths for access by children. Repeat calls with the same path have no effect.

Parameters

path (str) – File path.

window_size_bytes = 1048576

Burst size. With a 1MiB window and a 10ms RTT, maximum throughput is 100MiB/sec, which is 5x what SSH can handle on a 2011-era 2.4GHz Core i5.
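The throughput figure follows from the usual window-limited bound: at most one window of data can be in flight per round trip. A quick check of the arithmetic, using the 10ms RTT quoted above as the only input besides the window size:

```python
window_size_bytes = 1048576  # 1MiB, the default shown above
rtt_seconds = 0.010          # 10ms round-trip time, as quoted above

# At most one full window can be delivered per round trip.
max_bytes_per_sec = window_size_bytes / rtt_seconds
max_mib_per_sec = max_bytes_per_sec / (1024 * 1024)

print('%.1f MiB/sec' % max_mib_per_sec)
```

Doubling the RTT halves the bound, so a larger window would be needed to sustain the same rate on slower links.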

class mitogen.service.PushFileService(**kwargs)

Push-based file service. Files are delivered and cached in RAM, sent recursively from parent to child. A child that requests a file via get() will block until it has been delivered by a parent.

This service will eventually be merged into FileService.

get(path)

Fetch a file from the cache.

propagate_paths_and_modules(context, paths, overridden_sources=None, extra_sys_paths=None)

One-size-fits-all method to ensure a target context has been preloaded with a set of small files and Python modules.

Parameters
  • overridden_sources (dict) – Optional dict containing source code to override path’s source code.

  • extra_sys_paths – Additional sys paths to use when finding modules; beneficial in situations like loading Ansible Collections, because source code dependencies come from different file paths than where the source lives.

propagate_to(context, path, overridden_source=None)

If the optional parameter ‘overridden_source’ is passed, use that instead of the path’s code as source code. This works around some bugs in source modules, such as relative imports on unsupported Python versions.