Pipeline Workflow
KM3Pipe is a lightweight framework which gives you a loose structure and workflow for data analysis. It has a simple yet powerful module system which allows you to organise and reuse code.
The main structure is a Pipeline, which is meant to hold everything together. The building blocks are simply called Modules and are either basic Python functions or instances of the class Module.
To set up a workflow, you first create a pipeline and attach the modules to it. To fire up the analysis chain, you call .drain() on your pipeline and let the flow go.
The following script shows the module system of KM3Pipe. There is a Pump, which in this case is a dummy data generator. The other Modules modify the data and pass it on to the next module in the pipeline.

    #!/usr/bin/env python
    __author__ = 'tamasgal'

    from km3pipe.core import Pipeline, Module, Pump


    class DummyPump(Pump):
        """A pump demonstration with a dummy list as data."""

        def configure(self):
            self.data = [{'nr': 1}, {'nr': 2}]
            self.blobs = self.blob_generator()

        def process(self, blob):
            return next(self.blobs)

        def blob_generator(self):
            """Create a blob generator."""
            for blob in self.data:
                yield blob


    class Foo(Module):
        """A dummy module with optional and required parameters"""

        def configure(self):
            self.foo = self.get('foo', default='default_foo')  # optional
            self.bar = self.get('bar', default=23)  # optional
            self.baz = self.require('baz')  # required
            self.i = 0

        def process(self, blob):
            print("This is the current blob: " + str(blob))
            self.i += 1
            blob['foo_entry'] = self.foo
            return blob

        def finish(self):
            print("My process() method was called {} times.".format(self.i))


    def moo(blob):
        """A simple function to attach"""
        blob['moo_entry'] = 42
        return blob


    class PrintBlob(Module):
        def process(self, blob):
            print(blob)
            return blob


    pipe = Pipeline()
    pipe.attach(DummyPump, 'the_pump')
    pipe.attach(Foo, bar='dummybar', baz=69)
    pipe.attach(moo)
    pipe.attach(PrintBlob)
    pipe.drain()

Running the script prints the following:

    Pipeline and module initialisation took 0.000s (CPU 0.000s).
    This is the current blob: {'nr': 1}
    {'nr': 1, 'foo_entry': 'default_foo', 'moo_entry': 42}
    This is the current blob: {'nr': 2}
    {'nr': 2, 'foo_entry': 'default_foo', 'moo_entry': 42}
    My process() method was called 2 times.
    ============================================================
    2 cycles drained in 0.000553s (CPU 0.000525s). Memory peak: 154.08 MB
    wall mean: 0.000058s medi: 0.000058s min: 0.000055s max: 0.000062s std: 0.000004s
    CPU mean: 0.000059s medi: 0.000059s min: 0.000056s max: 0.000062s std: 0.000003s

Modules
A module is a configurable building block which can be attached to a pipeline. It has a process() method, which is called in every pipeline cycle with the current data (“blob”). This piece of data can be analysed, manipulated and finally returned to hand it over to the next module in the pipeline.
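The handover between modules can be pictured with a small stand-in that does not use KM3Pipe itself. This is only a sketch of the idea, not KM3Pipe's actual implementation: the `run_cycle` helper and the two example modules (`add_energy`, `Counter`) are hypothetical names introduced for illustration.

```python
def add_energy(blob):
    """Hypothetical function-style module: adds an entry to the blob."""
    blob['energy'] = 10 * blob['nr']
    return blob


class Counter:
    """Hypothetical class-style module which counts the blobs it has seen."""

    def __init__(self):
        self.count = 0

    def process(self, blob):
        self.count += 1
        blob['count'] = self.count
        return blob


def run_cycle(modules, blob):
    """Pass one blob through all modules, in the order they were attached."""
    for module in modules:
        # Class-style modules expose process(); plain functions are called directly.
        step = module.process if hasattr(module, 'process') else module
        blob = step(blob)
    return blob


counter = Counter()
modules = [add_energy, counter]
first = run_cycle(modules, {'nr': 1})
second = run_cycle(modules, {'nr': 2})
print(first)
print(second)
```

Each module receives whatever the previous one returned, so a module that forgets to return the blob would break the chain in this sketch.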
Instance variables can be initialised within the configure() method. User-defined parameters are accessible via the get() or require() method. get() returns the passed value, or the given default (None unless specified otherwise) if the parameter is not defined, while require() is meant for parameters which must be provided. This allows an easy way to define default values, as seen in the example below.

    class Foo(Module):
        """A dummy module with optional and required parameters"""

        def configure(self):
            self.foo = self.get('foo', default='default_foo')  # optional
            self.bar = self.get('bar', default=23)  # optional
            self.baz = self.require('baz')  # required
            self.i = 0

        def process(self, blob):
            print("This is the current blob: " + str(blob))
            self.i += 1
            blob['foo_entry'] = self.foo
            return blob

        def finish(self):
            print("My process() method was called {} times.".format(self.i))

To override the default parameters, set the desired values when attaching the module to the pipeline. Always pass the class itself rather than an instance, since the attach() method of the pipeline takes care of the initialisation:

    pipe.attach(Foo, bar='dummybar', baz=69)

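How optional and required parameters might be resolved can be sketched without KM3Pipe. The `ParameterHolder` class below is a hypothetical stand-in, and raising a `TypeError` for a missing required parameter is an assumption of this sketch rather than documented KM3Pipe behaviour.

```python
class ParameterHolder:
    """Hypothetical stand-in storing the keyword arguments given at attach time."""

    def __init__(self, **parameters):
        self.parameters = parameters

    def get(self, name, default=None):
        """Return the passed value, or `default` if the parameter was not set."""
        value = self.parameters.get(name)
        return default if value is None else value

    def require(self, name):
        """Return the passed value, or complain if it is missing (assumed behaviour)."""
        value = self.get(name)
        if value is None:
            raise TypeError("The parameter '{}' is required.".format(name))
        return value


# Mirrors pipe.attach(Foo, bar='dummybar', baz=69)
holder = ParameterHolder(bar='dummybar', baz=69)
foo = holder.get('foo', default='default_foo')  # not passed: the default is used
bar = holder.get('bar', default=23)             # passed: overrides the default
baz = holder.require('baz')                     # passed: returned as is
print(foo, bar, baz)
```

This matches the output of the full script above, where `foo_entry` ends up as `'default_foo'` because `foo` was not set when attaching `Foo`.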
Pumps / Sinks
The pump and sink are special types of Module and are usually the first and last ones to be attached to a pipeline. They are responsible for reading data from and writing data to files or socket streams.
Pump and Sink inherit from the Module class. The __init__() method should be used to set up the file or socket handler, and the finish() method has to close them. The actual data is passed via the process() method. A data chunk is internally called Blob and usually represents an event.
To end the data pumping, the pump has to raise a StopIteration exception. One elegant way to implement this in Python is using a generator.
The following example shows a very basic pump, which simply initialises a list of dictionaries and passes one blob after another to the next module in the pipeline on each process() call.

    class DummyPump(Pump):
        """A pump demonstration with a dummy list as data."""

        def configure(self):
            self.data = [{'nr': 1}, {'nr': 2}]
            self.blobs = self.blob_generator()

        def process(self, blob):
            return next(self.blobs)

        def blob_generator(self):
            """Create a blob generator."""
            for blob in self.data:
                yield blob

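The role of `StopIteration` can be illustrated with plain Python. The `drain` function below is a hypothetical stand-in for what draining a pipeline does conceptually, not KM3Pipe's actual code: it keeps asking the pump for blobs until the exhausted generator raises `StopIteration`.

```python
def blob_generator(data):
    """Yield one blob after another; calling next() after the last one raises StopIteration."""
    for blob in data:
        yield blob


def moo(blob):
    """A simple function-style module."""
    blob['moo_entry'] = 42
    return blob


def drain(blobs, modules):
    """Hypothetical drain loop: run cycles until the pump signals the end."""
    cycles = 0
    results = []
    while True:
        try:
            blob = next(blobs)  # the "pump" step
        except StopIteration:
            break  # no more data: stop pumping
        for module in modules:
            blob = module(blob)
        results.append(blob)
        cycles += 1
    return cycles, results


cycles, results = drain(blob_generator([{'nr': 1}, {'nr': 2}]), [moo])
print("{} cycles drained".format(cycles))
```

Using a generator means the pump does not need any bookkeeping of its own: the exception is raised automatically once the data runs out.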
Logging and Printing
Every module inheriting from the Module class has a fancy logger and a printer available to produce output which is unique to that module (the colour code is generated from a hash of the module name).
Inside any method of the module, use self.log to access the logger, which comes with the usual functions like self.log.debug(), self.log.info(), self.log.warning(), self.log.error() or self.log.critical().
The self.print function can be used to print messages which are colour coded with the same colours used for the logger.
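The idea of deriving a stable colour from the module name can be sketched with the standard library. This is an illustration under assumptions, not KM3Pipe's implementation: the helper names, the use of SHA-256 and the ANSI 256-colour escape codes are all choices made for this example.

```python
import hashlib
import logging


def colour_code(name):
    """Map a name to one of the 256 ANSI colours (hypothetical scheme)."""
    digest = hashlib.sha256(name.encode('utf-8')).digest()
    return digest[0]  # first byte of the hash: an integer from 0 to 255


def coloured(name, message):
    """Prefix a message with the name, wrapped in ANSI colour escape codes."""
    return "\033[38;5;{}m{}\033[0m: {}".format(colour_code(name), name, message)


class Foo:
    """Hypothetical module-like class with a logger and a colour-coded printer."""

    def __init__(self):
        self.name = self.__class__.__name__
        self.log = logging.getLogger(self.name)

    def print(self, message):
        print(coloured(self.name, message))

    def process(self, blob):
        self.log.debug("Processing %s", blob)
        self.print("Got a blob with {} entries".format(len(blob)))
        return blob


logging.basicConfig(level=logging.DEBUG)
foo = Foo()
blob = foo.process({'nr': 1})
```

Because the colour depends only on the name, the same module gets the same colour in every run, which makes it easier to tell the output of different modules apart.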
Configuring the Pipeline using Configuration Files
The pipeline and all the attached modules can be configured by a TOML formatted file located in the current working directory (where the initial script is invoked to launch the pipeline). The default filename is pipeline.toml, but a different filename can be chosen when creating the Pipeline instance using Pipeline(configfile='your_desired_filename.toml').
Here is an example of the file:

    [StatusBar]
    every = 1000

    [AModule]
    some_parameter = "foo"
    other_parameter = 23
    a_list = [1, 2, 3, 23]

    [BModule]
    bar = 42
