Service Framework
Warning
This section is incomplete.
Mitogen includes a simple framework for implementing services exposed to other contexts, with some built-in subclasses to capture common designs. This is a work in progress, and new functionality will be added as common usage patterns emerge.
Overview
Service
User-supplied class with explicitly exposed methods.
May be auto-imported/constructed in a child from a parent simply by calling it
Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by default, but may use any naming scheme the configured activator understands.
Children receive refusals if the class is not already activated by a parent.
Has an associated Select instance which may be dynamically loaded with receivers over time; on_message_received() is invoked if any receiver becomes ready.
Invoker
Abstracts mechanism for calling a service method and verifying permissions.
Built-in ‘service.Invoker’: concurrent execution of all methods on the thread pool.
Built-in ‘service.SerializedInvoker’: serialization of all calls on a single thread borrowed from the pool while any request is pending.
Built-in ‘service.DeduplicatingInvoker’: requests are aggregated by distinct (method, kwargs) key, so only one such method call ever executes; its return value is cached and broadcast to all request waiters. Waiters do not block additional pool threads.
Activator
Abstracts mechanism for activating a service and verifying activation permission.
Built-in activator looks for service by fully.qualified.ClassName using Python import mechanism, and only permits parents to trigger activation.
Pool
Manages a fixed-size thread pool, a mapping of service name to Invoker, and an aggregate Select over every active service’s Selects.
Constructed automatically in children in response to the first CALL_SERVICE message sent to them by a parent.
Must be constructed manually in parent context.
Has close() and add() methods.
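The name lookup attributed to the built-in activator above can be approximated with the standard library alone. The sketch below is not Mitogen's code: `resolve_class` is a hypothetical helper, and it shows only the import step, leaving out the check that the request came from a parent.

```python
import importlib


def resolve_class(qualified_name):
    """Import and return the class named by 'pkg.mod.ClassName'.

    Hypothetical helper for illustration only; not part of Mitogen.
    """
    module_name, _, class_name = qualified_name.rpartition('.')
    if not module_name:
        raise ValueError('expected a fully qualified name: %r'
                         % (qualified_name,))
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Resolve a standard library class by its canonical name.
cls = resolve_class('collections.OrderedDict')
```

A real activator would also have to decide who is allowed to trigger the import, which this sketch does not attempt.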
Example
import mitogen
import mitogen.service


class FileService(mitogen.service.Service):
    """
    Simple file server, for demonstration purposes only! Use of this in
    real code would be a security vulnerability as it would permit children
    to read any file from the master's disk.
    """
    @mitogen.service.expose(policy=mitogen.service.AllowAny())
    @mitogen.service.arg_spec(spec={
        'path': str
    })
    def read_file(self, path):
        with open(path, 'rb') as fp:
            return fp.read()


def download_file(source_context, path):
    s = source_context.call_service(
        service_name=FileService,  # may also be string 'pkg.mod.FileService'
        method_name='read_file',
        path=path,
    )
    # read_file() returns bytes, so the output file is opened in binary mode.
    with open(path, 'wb') as fp:
        fp.write(s)


def download_some_files(source_context, paths):
    for path in paths:
        download_file(source_context, path)


@mitogen.main()
def main(router):
    pool = mitogen.service.Pool(router, services=[
        FileService(router),
    ])
    remote = router.ssh(hostname='k3')
    remote.call(download_some_files,
        source_context=router.myself(),
        paths=[
            '/etc/passwd',
            '/etc/hosts',
        ]
    )
    pool.stop()
Reference
class mitogen.service.Policy
Base security policy.
class mitogen.service.AllowParents
class mitogen.service.AllowAny
mitogen.service.arg_spec(spec)
Annotate a method as requiring arguments with a specific type. This only validates required arguments. For optional arguments, write a manual check within the function.
@mitogen.service.arg_spec({'path': str})
def fetch_path(self, path, optional=None):
    ...
- Parameters
spec (dict) – Mapping from argument name to expected type.
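To make the split between required and optional arguments concrete, here is a toy stand-in for the decorator, written with the standard library only. It is not Mitogen's implementation: this `arg_spec` is a local imitation that assumes arguments are passed by keyword, and the error type it raises is an arbitrary choice.

```python
import functools


def arg_spec(spec):
    """Toy imitation of the decorator described above; not Mitogen's code.

    Checks that every name in `spec` was passed as a keyword argument of
    the expected type before calling the wrapped method.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, **kwargs):
            for name, expected in spec.items():
                if name not in kwargs:
                    raise TypeError('missing required argument %r' % (name,))
                if not isinstance(kwargs[name], expected):
                    raise TypeError('argument %r must be %s'
                                    % (name, expected.__name__))
            return func(self, **kwargs)
        return wrapper
    return decorator


class Demo(object):
    @arg_spec({'path': str})
    def fetch_path(self, path, optional=None):
        # Optional arguments are outside the spec, so check them by hand.
        if optional is not None and not isinstance(optional, int):
            raise TypeError('optional must be an int')
        return (path, optional)


demo = Demo()
result = demo.fetch_path(path='/etc/hosts', optional=3)
```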
mitogen.service.expose(policy)
Annotate a method to permit access to contexts matching an authorization policy. The annotation may be specified multiple times. Methods lacking any authorization policy are not accessible.
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def unsafe_operation(self):
    ...
- Parameters
policy (mitogen.service.Policy) – The policy to require.
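The rule that a method without a policy is inaccessible can be modelled in a few lines. Everything below is a toy: the `permits()` method, the `policies` attribute and the `may_call()` helper are invented for illustration, and the sketch assumes that when the annotation is stacked, every attached policy must admit the caller.

```python
class AllowAny(object):
    """Toy policy that admits every caller."""
    def permits(self, caller_is_parent):
        return True


class AllowParents(object):
    """Toy policy that admits only callers that are parents."""
    def permits(self, caller_is_parent):
        return caller_is_parent


def expose(policy):
    """Attach `policy` to a method; may be applied several times."""
    def decorator(func):
        func.policies = getattr(func, 'policies', []) + [policy]
        return func
    return decorator


def may_call(method, caller_is_parent):
    """Assumption: every attached policy must admit the caller."""
    policies = getattr(method, 'policies', [])
    # A method without any policy is never accessible.
    return bool(policies) and all(
        p.permits(caller_is_parent) for p in policies)


class Demo(object):
    @expose(policy=AllowAny())
    def public(self):
        return 'public'

    @expose(policy=AllowParents())
    def restricted(self):
        return 'restricted'

    def hidden(self):
        return 'hidden'


demo = Demo()
```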
mitogen.service.Service(router)
class mitogen.service.Invoker(service)
class mitogen.service.SerializedInvoker(**kwargs)
class mitogen.service.DeduplicatingInvoker(service)
A service that deduplicates and caches expensive responses. Requests are deduplicated according to a customizable key, and the single expensive response is broadcast to all requestors.
A side effect of this class is that processing of the single response is always serialized according to the result of key_from_request().
Only one pool thread is blocked during generation of the response, regardless of the number of requestors.
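The deduplication idea can be sketched independently of Mitogen. In the toy model below, the `Deduplicator` class and the body of `key_from_request()` are assumptions made for illustration, and the code is single-threaded for clarity. The first request for a key is told to do the work, later requests only queue a callback, and the finished result is cached and broadcast.

```python
class Deduplicator(object):
    """Toy model: one computation per key, result broadcast to waiters."""

    def __init__(self):
        self._cache = {}     # key -> finished result
        self._waiters = {}   # key -> callbacks awaiting an in-flight result

    def request(self, key, callback):
        """Return True if the caller must produce the result for `key`."""
        if key in self._cache:
            callback(self._cache[key])
            return False
        if key in self._waiters:
            # Work is already in flight: queue the callback, do not block.
            self._waiters[key].append(callback)
            return False
        self._waiters[key] = [callback]
        return True

    def complete(self, key, result):
        """Cache `result` and deliver it to every queued callback."""
        self._cache[key] = result
        for callback in self._waiters.pop(key):
            callback(result)


def key_from_request(method_name, kwargs):
    # Assumed key shape: method name plus sorted keyword arguments.
    return (method_name, tuple(sorted(kwargs.items())))


dedup = Deduplicator()
received = []
key = key_from_request('read_file', {'path': '/etc/hosts'})
first = dedup.request(key, received.append)    # must do the work
second = dedup.request(key, received.append)   # queued behind the first
dedup.complete(key, b'contents')
third = dedup.request(key, received.append)    # answered from the cache
```

Because waiters are stored as callbacks rather than blocked threads, only the caller that received `True` occupies a worker while the result is produced.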
class mitogen.service.Service(router)
NO_REPLY = <object object>
Sentinel object to suppress reply generation, since returning None will trigger a response message containing the pickled None.
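The reason a dedicated sentinel is needed, rather than None, is easiest to see in a small model. The `dispatch()` function below is hypothetical and only mimics the decision of whether to generate a reply; it is not how Mitogen delivers messages.

```python
NO_REPLY = object()   # unique sentinel, always compared by identity


def dispatch(handler, replies):
    """Call `handler` and record a reply unless it returns NO_REPLY."""
    result = handler()
    if result is NO_REPLY:
        return
    # None is a legitimate return value, so it still produces a reply.
    replies.append(result)


replies = []
dispatch(lambda: None, replies)
dispatch(lambda: NO_REPLY, replies)
dispatch(lambda: 'data', replies)
```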
on_message(event)
Called when a message arrives on any of select’s registered receivers.
- Parameters
event (mitogen.select.Event) –
on_shutdown()
Called by Pool.shutdown() once the last worker thread has exited.
class mitogen.service.Pool(router, services=(), size=1, overwrite=False, recv=None)
Manage a pool of at least one thread that will be used to process messages for a collection of services.
Internally this is implemented by subscribing every Service’s mitogen.core.Receiver using a single mitogen.select.Select, then arranging for every thread to consume messages delivered to that select.
In this way the threads are fairly shared by all available services, and no resources are dedicated to a single idle service.
There is no penalty for exposing large numbers of services; the list of exposed services could even be generated dynamically in response to your program’s configuration or its input data.
- Parameters
router (mitogen.core.Router) – mitogen.core.Router to listen for mitogen.core.CALL_SERVICE messages.
services (list) – Initial list of services to register.
recv (mitogen.core.Receiver) – mitogen.core.CALL_SERVICE receiver to reuse. This is used by get_or_create_pool() to hand off a queue of messages from the Dispatcher stub handler while avoiding a race.
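The sharing of threads between services can be illustrated with a plain queue standing in for the aggregate select. This is a loose analogy under stated assumptions, not the Pool's implementation: `run_pool()` and the handler table are hypothetical, and a `queue.Queue` replaces mitogen.select.Select.

```python
import queue
import threading


def run_pool(handlers, messages, size=2):
    """Process (service_name, payload) messages on `size` shared threads."""
    inbox = queue.Queue()     # single inbox shared by every service
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            item = inbox.get()
            if item is None:  # shutdown marker, one per thread
                return
            name, payload = item
            value = handlers[name](payload)
            with lock:
                results.append((name, value))

    threads = [threading.Thread(target=worker) for _ in range(size)]
    for thread in threads:
        thread.start()
    for message in messages:
        inbox.put(message)
    for _ in threads:
        inbox.put(None)
    for thread in threads:
        thread.join()
    return results


handlers = {'upper': str.upper, 'length': len}
results = run_pool(handlers, [
    ('upper', 'abc'),
    ('length', 'abcd'),
    ('upper', 'x'),
])
```

No thread is tied to a particular handler, so an idle service costs nothing beyond its entry in the table. Completion order depends on scheduling, so callers should not rely on the order of `results`.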
activator_class
alias of Activator
defer(func, *args, **kwargs)
Arrange for func(*args, **kwargs) to be invoked in the context of a service pool thread.
Built-in Services
class mitogen.service.FileService(router)
Streaming file server, used to serve small and huge files alike. Paths must be registered by a trusted context before they will be served to a child.
Transfers are divided among the physical streams that connect external contexts, ensuring each stream never has excessive data buffered in RAM, while still maintaining enough to fully utilize available bandwidth. This is achieved by making an initial bandwidth assumption, enqueueing enough chunks to fill that assumed pipe, then responding to delivery acknowledgements from the receiver by scheduling new chunks.
Transfers proceed one-at-a-time per stream. When multiple contexts exist on a stream (e.g. one is the SSH account, another is a sudo account, and a third is a proxied SSH connection), each request is satisfied in turn before subsequent requests start flowing. This ensures when a stream is contended, priority is given to completing individual transfers rather than potentially aborting many partial transfers, causing the bandwidth to be wasted.
- Theory of operation:
Trusted context (i.e. WorkerProcess) calls register(), making a file available to any untrusted context.
Requestee context creates a mitogen.core.Receiver() to receive chunks, then calls fetch(path, recv.to_sender()), to set up the transfer.
fetch() replies to the call with the file’s metadata, then schedules an initial burst up to the window size limit (1MiB).
Chunks begin to arrive in the requestee, which calls acknowledge() for each 128KiB received.
The acknowledge() call arrives at FileService, which schedules a new chunk to refill the drained window back to the size limit.
When the last chunk has been pumped for a single transfer, Sender.close() is called causing the receive loop in target.py::_get_file() to exit, allowing that code to compare the transferred size with the total file size from the metadata.
If the sizes mismatch, _get_file()’s caller is informed which will discard the result and log/raise an error.
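The windowing steps above can be simulated without any networking. The sketch below is a simplified model, not FileService's code: `simulate_transfer()` is a hypothetical function, acknowledgements are assumed to arrive one chunk at a time in order, and only the 1MiB window and 128KiB chunk sizes are taken from the description.

```python
from collections import deque

WINDOW = 1048576   # 1MiB window, as stated above
CHUNK = 131072     # 128KiB chunks, as stated above


def simulate_transfer(file_size):
    """Return (bytes_received, peak_unacknowledged_bytes) for one transfer."""
    in_flight = deque()   # chunk sizes sent but not yet acknowledged
    unacked = 0
    sent = 0
    received = 0
    peak = 0

    def schedule():
        # Send chunks until the window is full or the file is exhausted.
        nonlocal unacked, sent, peak
        while sent < file_size and unacked + CHUNK <= WINDOW:
            size = min(CHUNK, file_size - sent)
            in_flight.append(size)
            sent += size
            unacked += size
            peak = max(peak, unacked)

    schedule()                       # initial burst
    while in_flight:
        size = in_flight.popleft()   # a chunk arrives at the receiver
        received += size
        unacked -= size              # its acknowledgement reaches the sender
        schedule()                   # refill the drained window
    return received, peak


received, peak = simulate_transfer(5 * WINDOW + 1000)
```

The receiver can compare `received` with the size from the metadata, which is the same check the final steps describe, while `peak` shows that the unacknowledged data never exceeds the window.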
- Shutdown:
process.py calls service.Pool.shutdown(), which arranges for the service pool threads to exit and be joined, guaranteeing no new requests can arrive, before calling Service.on_shutdown() for each registered service.
FileService.on_shutdown() walks every in-progress transfer and calls Sender.close(), causing Receiver loops in the requestees to exit early. The size check fails and any partially downloaded file is discarded.
Control exits _get_file() in every target, and graceful shutdown can proceed normally, without the associated thread needing to be forcefully killed.
acknowledge(size, msg)
Acknowledge bytes received by a transfer target, scheduling new chunks to keep the window full. This should be called for every chunk received by the target.
fetch(path, sender, msg)
Start a transfer for a registered path.
- Parameters
path (str) – File path.
sender (mitogen.core.Sender) – Sender to receive file data.
- Returns
Dict containing the file metadata:
size: File size in bytes.
mode: Integer file mode.
owner: Owner account name on host machine.
group: Owner group name on host machine.
mtime: Floating point modification time.
ctime: Floating point change time.
- Raises
Error – Unregistered path, or Sender did not match requestee context.
classmethod get(context, path, out_fp)
Stream a file from the connection multiplexer process in the controller.
- Parameters
context (mitogen.core.Context) – Reference to the context hosting the FileService that will be used to fetch the file.
path (bytes) – FileService registered name of the input file.
out_path (bytes) – Name of the output path on the local disk.
- Returns
Tuple of (ok, metadata), where ok is True on success, or False if the transfer was interrupted and the output should be discarded. metadata is a dictionary of file metadata as documented in fetch().
on_shutdown()
Respond to shutdown by sending close() to every target, allowing their receive loop to exit and clean up gracefully.
register(path)
Authorize a path for access by children. Repeat calls with the same path have no effect.
- Parameters
path (str) – File path.
register_prefix(path)
Authorize a path and any subpaths for access by children. Repeat calls with the same path have no effect.
- Parameters
path (str) – File path.
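The difference between registering a single path and registering a prefix can be shown with a toy registry. `PathRegistry` and its `is_authorized()` method are hypothetical, and normalizing the requested path before the check is an assumption of this sketch, not a statement about how FileService validates paths.

```python
import posixpath


class PathRegistry(object):
    """Toy model of exact-path and prefix authorization."""

    def __init__(self):
        self._paths = set()
        self._prefixes = set()

    def register(self, path):
        # Sets make repeat calls with the same path a no-op.
        self._paths.add(path)

    def register_prefix(self, path):
        # Store with a trailing slash so '/srv/files' does not match
        # '/srv/filesystem'.
        self._prefixes.add(path.rstrip('/') + '/')

    def is_authorized(self, path):
        # Assumption: collapse '..' segments before checking.
        path = posixpath.normpath(path)
        if path in self._paths:
            return True
        return any(path.startswith(prefix) or path + '/' == prefix
                   for prefix in self._prefixes)


registry = PathRegistry()
registry.register('/etc/hosts')
registry.register('/etc/hosts')          # repeat call, no effect
registry.register_prefix('/srv/files')
```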
window_size_bytes = 1048576
Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which is 5x what SSH can handle on a 2011 era 2.4GHz Core i5.
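The throughput figure follows from dividing the window by the round-trip time, since at most one window of data can be outstanding per round trip. The helper below is a hypothetical name used only to show the arithmetic.

```python
def max_throughput_mib_per_sec(window_bytes, rtt_ms):
    """Upper bound on throughput when one window is sent per round trip."""
    bytes_per_second = window_bytes * 1000 / rtt_ms
    return bytes_per_second / (1024 * 1024)


window_size_bytes = 1048576
throughput = max_throughput_mib_per_sec(window_size_bytes, rtt_ms=10)
```

With the same window, a 100ms round trip would cap throughput at a tenth of that figure, which is why the window size matters more on high-latency links.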
class mitogen.service.PushFileService(**kwargs)
Push-based file service. Files are delivered and cached in RAM, sent recursively from parent to child. A child that requests a file via get() will block until it has been delivered by a parent.
This service will eventually be merged into FileService.
get(path)
Fetch a file from the cache.
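The blocking behaviour of get() can be modelled with a condition variable. The `PushCache` class below is a hypothetical stand-in built on the standard library; its `store()` method and the `timeout` parameter are assumptions added for the sketch and are not part of PushFileService.

```python
import threading


class PushCache(object):
    """Toy cache whose get() blocks until the file has been stored."""

    def __init__(self):
        self._cond = threading.Condition()
        self._files = {}

    def store(self, path, data):
        """Deliver a file and wake every thread waiting in get()."""
        with self._cond:
            self._files[path] = data
            self._cond.notify_all()

    def get(self, path, timeout=None):
        """Return the cached data, waiting for delivery if necessary."""
        with self._cond:
            delivered = self._cond.wait_for(
                lambda: path in self._files, timeout)
            if not delivered:
                raise TimeoutError('no delivery for %r' % (path,))
            return self._files[path]


cache = PushCache()
fetched = []
reader = threading.Thread(
    target=lambda: fetched.append(cache.get('/tmp/mod.py', timeout=5)))
reader.start()                       # may block until the file is stored
cache.store('/tmp/mod.py', b'print("hi")')
reader.join()
```

The sketch behaves the same whether the store happens before or after the reader starts waiting, because the predicate is checked before blocking.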
propagate_paths_and_modules(context, paths, overridden_sources=None, extra_sys_paths=None)
One size fits all method to ensure a target context has been preloaded with a set of small files and Python modules.
overridden_sources: optional dict containing source code to override path’s source code.
extra_sys_paths: loads additional sys paths for use in finding modules; beneficial in situations like loading Ansible Collections because source code dependencies come from different file paths than where the source lives.
propagate_to(context, path, overridden_source=None)
If the optional parameter ‘overridden_source’ is passed, use that instead of the path’s code as source code. This works around some bugs of source modules such as relative imports on unsupported Python versions.