from concurrent import futures
from functools import partial
import logging
import typing as ty
import os
import sys
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import strax
# `strax.exporter()` hands back the `export` decorator applied to the public
# classes below, together with the list that is bound to this module's
# `__all__` (presumably `export` appends decorated names to it -- confirm in strax).
export, __all__ = strax.exporter()

# npshmex is an optional dependency. When it is importable, its
# ProcessPoolExecutor is kept as `SHMExecutor` and `strax.Chunk` is registered
# with it through the chunk's "data" attribute.
try:
    import npshmex

    SHMExecutor = npshmex.ProcessPoolExecutor
    npshmex.register_array_wrapper(strax.Chunk, "data")
except ImportError:
    # This is allowed to fail, it only crashes if allow_shm = True
    # (ThreadedMailboxProcessor raises a RuntimeError when shm transfer is
    # requested while SHMExecutor is None).
    SHMExecutor = None
@export
class ProcessorComponents(ty.NamedTuple):
    """Specification to assemble a processor.

    A plain record handed to a processor; field order is part of the
    interface because instances can be built positionally.
    """

    # Plugins to run, keyed by the data type they are registered under.
    plugins: ty.Dict[str, strax.Plugin]
    # Callables, keyed by data type, that are invoked with an ``executor``
    # keyword and return the source a mailbox reads loaded data from.
    loaders: ty.Dict[str, ty.Callable]
    # Required for inline ParallelSource plugin.
    loader_plugins: ty.Dict[str, strax.Plugin]
    # Savers per data type; a key may map to an empty list.
    savers: ty.Dict[str, ty.List[strax.Saver]]
    # Requested data types. Variable length: the processor currently only
    # iterates over the first entry.
    targets: ty.Tuple[str, ...]
class MailboxDict(dict):
    """Dictionary that creates a ``strax.Mailbox`` for any key it does not hold yet.

    :param lazy: forwarded as ``lazy`` to every mailbox created on demand.

    All other positional and keyword arguments are passed on to ``dict``.
    """

    def __init__(self, *args, lazy=False, **kwargs):
        self.lazy = lazy
        super().__init__(*args, **kwargs)

    def __missing__(self, key):
        # Called by dict on a failed lookup: build the mailbox, remember it
        # under the requested key and hand it back to the caller.
        mailbox = strax.Mailbox(name=key + "_mailbox", lazy=self.lazy)
        self[key] = mailbox
        return mailbox
@export
class ThreadedMailboxProcessor:
    """Processor that connects loaders, plugins and savers through mailboxes.

    ``__init__`` wires every loader and plugin as a sender, and every saver
    as a reader, of the mailbox belonging to its data type. ``iter`` starts
    the mailboxes and yields the results of the first target.
    """

    mailboxes: ty.Dict[str, strax.Mailbox]

    def __init__(
        self,
        components: ProcessorComponents,
        allow_rechunk=True,
        allow_shm=False,
        allow_multiprocess=False,
        allow_lazy=True,
        max_workers=None,
        max_messages=4,
        timeout=60,
        is_superrun=False,
    ):
        """Build the executors and mailboxes needed to process ``components``.

        :param components: loaders, plugins, savers and targets to wire up.
        :param allow_rechunk: allow savers to rechunk; combined with the
            plugin's own ``can_rechunk`` (or with ``is_superrun`` for data
            types not built by any plugin here).
        :param allow_shm: use the npshmex executor as process pool.
        :param allow_multiprocess: allow a process pool for plugins whose
            ``parallel`` attribute equals ``"process"``. Forced to False on
            Windows.
        :param allow_lazy: create lazy mailboxes. Only honoured when no
            executors are used (``max_workers`` is None or 1).
        :param max_workers: pool size for the executors; None or 1 disables
            executors altogether.
        :param max_messages: default ``max_messages`` of every mailbox; a
            plugin's own non-None ``max_messages`` takes precedence for the
            mailbox of the data type it is registered under.
        :param timeout: value assigned to every mailbox's ``timeout``
            (presumably seconds -- confirm in strax.Mailbox).
        :param is_superrun: together with ``allow_rechunk``, enables
            rechunking for savers of data types that no plugin here builds.
        :raises RuntimeError: if ``allow_shm`` is requested for a
            multiprocess run while npshmex is not installed.
        :raises ValueError: if a multi-output plugin class is registered
            with different instances for its provided data types.
        """
        self.log = logging.getLogger(self.__class__.__name__)
        self.components = components

        self.log.debug("Processor components are: " + str(components))

        if allow_multiprocess and os.name == "nt":
            print("You're on Windows! Multiprocessing disabled, here be dragons.")
            allow_multiprocess = False

        if max_workers in [None, 1]:
            # Disable the executors: work in one process.
            # Each plugin works completely in its own thread.
            self.process_executor = self.thread_executor = None
            lazy = allow_lazy
        else:
            lazy = False
            # Use executors for parallelization of computations.
            self.thread_executor = futures.ThreadPoolExecutor(max_workers=max_workers)

            mp_plugins = {d: p for d, p in components.plugins.items() if p.parallel == "process"}
            if allow_multiprocess and mp_plugins:
                _proc_ex = ProcessPoolExecutor
                if allow_shm:
                    if SHMExecutor is None:
                        raise RuntimeError(
                            "You must install npshmex to enable shm transfer of numpy arrays."
                        )
                    _proc_ex = SHMExecutor
                self.process_executor = _proc_ex(max_workers=max_workers)

                # Combine as many plugins / savers as possible in one process.
                # The starting point is the process-parallel plugin with the
                # fewest dependencies (the first one in case of a tie).
                # TODO: more intelligent start determination, multiple starts
                start_from = list(mp_plugins.keys())[
                    int(np.argmin([len(p.depends_on) for p in mp_plugins.values()]))
                ]
                components = strax.ParallelSourcePlugin.inline_plugins(
                    components, start_from, log=self.log
                )
                self.components = components
                self.log.debug("Altered components for multiprocessing: " + str(components))
            else:
                # No separate process pool: share the thread pool.
                self.process_executor = self.thread_executor  # type: ignore

        # Figure out which outputs
        #  - we should exclude from the flow control in lazy mode,
        #    because they are produced but not required.
        #  - we should discard (produced but neither required nor saved)
        produced = set(components.loaders)
        required = set(components.targets)
        # Do not just take keys from savers, perhaps some keys
        # have no savers under them (see #444)
        saved = {k for k, v in components.savers.items() if v}
        for p in components.plugins.values():
            produced.update(p.provides)
            required.update(p.depends_on)
        to_flow_freely = produced - required
        to_discard = to_flow_freely - saved
        self.log.debug(
            f"to_flow_freely {to_flow_freely}, "
            f"to_discard {to_discard}, "
            f"produced {produced}, "
            f"required {required}, "
            f"saved {saved}"
        )

        # Mailboxes are created on first access while wiring things up.
        self.mailboxes = MailboxDict(lazy=lazy)

        for d, loader in components.loaders.items():
            assert d not in components.plugins
            # If parallelizing, use threads for loading:
            # the decompressor releases the gil, and we have a lot
            # of data transfer to do
            self.mailboxes[d].add_sender(loader(executor=self.thread_executor), name=f"load:{d}")

        multi_output_seen: ty.List[strax.Plugin] = []
        for d, p in components.plugins.items():
            if p in multi_output_seen:
                # Already wired up under another of its data types.
                continue
            if p.__class__ in [mp_seen.__class__ for mp_seen in multi_output_seen]:
                raise ValueError(
                    "A multi-output plugin is registered with different "
                    "instances for its provided data_types!"
                )

            # Pick the executor matching the plugin's `parallel` setting;
            # None means the plugin gets no executor.
            executor = None
            if p.parallel == "process":
                executor = self.process_executor
            elif p.parallel:
                executor = self.thread_executor  # type: ignore

            if p.multi_output:
                multi_output_seen.append(p)

                # Create temp mailbox that receives multi-output dicts
                # and sends them forth to other mailboxes
                mname = p.__class__.__name__ + "_divide_outputs"
                self.mailboxes[mname].add_sender(
                    p.iter(
                        iters={dep: self.mailboxes[dep].subscribe() for dep in p.depends_on},
                        executor=executor,
                    ),
                    name=f"divide_outputs:{d}",
                )

                # If we have a plugin with double dependency both outputs
                # of a multi-output plugin are required. Hence flow-freely
                # is empty and needs to be updated here:
                provided_data_types = set(p.provides)
                reader_data_types = set(strax.to_str_tuple(d))
                double_dependency = provided_data_types - reader_data_types
                to_flow_freely |= double_dependency
                self.log.debug(f"Updating flow freely for {mname} to be {to_flow_freely}")

                self.mailboxes[mname].add_reader(
                    partial(
                        strax.divide_outputs,
                        lazy=lazy,
                        # make sure to subscribe the outputs of the mp_plugins
                        mailboxes={k: self.mailboxes[k] for k in p.provides},
                        flow_freely=to_flow_freely,
                        outputs=p.provides,
                    )
                )

            else:
                self.mailboxes[d].add_sender(
                    p.iter(
                        iters={dep: self.mailboxes[dep].subscribe() for dep in p.depends_on},
                        executor=executor,
                    ),
                    name=f"build:{d}",
                )

        # Map every provided data type to the plugin that builds it.
        dtypes_built = {d: p for p in components.plugins.values() for d in p.provides}
        for d, savers in components.savers.items():
            for s_i, saver in enumerate(savers):
                if d in dtypes_built:
                    can_drive = not lazy
                    rechunk = dtypes_built[d].can_rechunk(d) and allow_rechunk
                else:
                    # This is storage conversion mode
                    # TODO: Don't know how to get this info, for now,
                    # be conservative and don't rechunk
                    can_drive = True
                    rechunk = is_superrun and allow_rechunk

                self.mailboxes[d].add_reader(
                    partial(
                        saver.save_from,
                        rechunk=rechunk,
                        # If parallelizing, use threads for saving:
                        # the compressor releases the gil,
                        # and we have a lot of data transfer to do
                        executor=self.thread_executor,
                    ),
                    can_drive=can_drive,
                    name=f"save_{s_i}:{d}",
                )

        # For multi-output plugins, an output may be neither saved nor
        # required, and thus has to be discarded.
        # This should happen rarely in production (when you actually
        # care about the data, you will be saving it)
        def discarder(source):
            for _ in source:
                pass

        for d in to_discard:
            self.mailboxes[d].add_reader(discarder, name=f"discard_{d}")

        # Set to preferred number of maximum messages
        # TODO: may not work if plugins are inlined??
        for d, m in self.mailboxes.items():
            m.max_messages = max_messages
            m.timeout = timeout
            if d in components.plugins:
                max_m = components.plugins[d].max_messages
                if max_m is not None:
                    m.max_messages = max_m

        # Remove defaultdict-like behaviour; all mailboxes should
        # have been made by now. See #444
        self.mailboxes = dict(self.mailboxes)

        self.log.debug(
            f"Created the following mailboxes: {self.mailboxes} with the "
            f"following threads: {[(d, m._threads) for d, m in self.mailboxes.items()]}"
        )

    def iter(self):
        """Start all mailboxes and yield the results of the first target.

        When the target generator finishes or fails, the mailboxes are
        cleaned up and the executors are shut down. An exception caught
        while yielding (including ``GeneratorExit`` when the caller closes
        this generator early) first kills the mailboxes and is re-raised
        after cleanup. If no exception was caught, the first exception
        recorded by a saver (``got_exception``) is raised instead.
        """
        target = self.components.targets[0]
        final_generator = self.mailboxes[target].subscribe()

        self.log.debug("Starting threads")
        for m in self.mailboxes.values():
            self.log.debug(f"start {m}")
            m.start()

        self.log.debug(f"Yielding {target}")
        traceback, exc, reason = None, None, None

        try:
            yield from final_generator

        # GeneratorExit results from exception in caller
        # (on garbage collection, .close() is called, see PEP342)
        except (Exception, GeneratorExit) as e:
            self.log.fatal(f"Target Mailbox ({target}) killed, exception {type(e)}, message {e}")
            if isinstance(e, strax.MailboxKilled):
                # The first argument carries (type, exception, traceback)
                # of the failure that killed the mailbox.
                _, exc, traceback = reason = e.args[0]
            else:
                exc = e
                reason = (e.__class__, e, sys.exc_info()[2])
                traceback = reason[2]

            # We will reraise it in just a moment...

        if exc is not None:
            if isinstance(exc, GeneratorExit):
                print("Main generator exited irregularly?!")
                # `reason` is an immutable tuple here, so item assignment
                # would raise TypeError and skip all cleanup below. Build a
                # new tuple with the explanation in the third slot instead.
                reason = (
                    reason[0],
                    reason[1],
                    "Hm, interesting. Most likely an exception was thrown "
                    "outside strax, but we did not handle it properly.",
                )
            # Kill the mailboxes
            # NOTE(review): `m` is a mailbox object while `target` is a data
            # type name, so this comparison looks like it is always true and
            # the target mailbox is killed as well -- confirm the intent
            # before changing it.
            for m in self.mailboxes.values():
                if m != target:
                    self.log.debug(f"Killing {m}")
                    m.kill(upstream=True, reason=reason)

        self.log.debug("Closing threads")
        for m in self.mailboxes.values():
            m.cleanup()
        self.log.debug("Closing threads completed")

        self.log.debug("Closing executors")
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True)

        # The process executor may be the thread executor itself; do not
        # shut the same pool down twice.
        if self.process_executor not in [None, self.thread_executor]:
            self.process_executor.shutdown(wait=True)

        self.log.debug("Closing executors completed")

        if exc is not None:
            # Reraise exception. This is outside the except block
            # to avoid the 'during handling of this exception, another
            # exception occurred' stuff from confusing the traceback
            # which is printed for the user
            self.log.debug("Reraising exception")
            raise exc.with_traceback(traceback)

        # Check the savers for any exception that occurred during saving
        # These are thrown back to the mailbox, but if that has already closed
        # it doesn't trigger a crash...
        # TODO: add savers inlined by parallelsourceplugin
        # TODO: need to look at plugins too if we ever implement true
        # multi-target mode
        for k, saver_list in self.components.savers.items():
            for s in saver_list:
                if s.got_exception:
                    self.log.fatal(f"Caught error while saving {k}!")
                    raise s.got_exception

        self.log.debug("Processing finished")