Additional Examples
Suite Example
In this section we construct a component of a test suite which will obtain testing data from MARS, perform some “test” on it, and then clean up after itself. This demonstrates a number of characteristics of object-oriented suite design:
Functionality that is configurable on a data description.
Functionality that is encapsulated in re-usable subcomponents
Delegation or inheritance to fine-tune behaviour within an existing framework
Firstly we create a helper class that can understand MARS requests, and output them in a useful format.
[2]:
class MarsRequest:
separator = ",\n "
def __init__(self, verb, request_dict):
self._verb = verb
self._request_dict = request_dict
def __str__(self):
return (
self._verb +
self.separator +
self.separator.join("{}={}".format(k, self._resolve(v)) for k, v in self._request_dict.items())
)
@staticmethod
def _resolve(v):
'''Convert values into something understood by MARS'''
if isinstance(v, bool):
return "on" if v else "off"
if isinstance(v, list):
return '/'.join(MarsRequest._resolve(vv) for vv in v)
if isinstance(v, str) and ('/' in v or '$' in v):
return '"{}"'.format(v)
return str(v)
These requests are useful in the context of a MarsTask. This makes use of the MarsRequest object defined above to do something in the current working directory. It also creates a label for monitoring in ecflow and a timers file for diagnostics according to the environment variables understood by MARS.
[3]:
mars_task_script = """
req=$(mktemp req.XXXX)
cat > $req <<@
{{ REQUEST }}
@
mars $req
rm $req
"""
class MarsTask(pf.Task):
verb = None
def __init__(self, request_dict, **kwargs):
# Construct a MarsRequest object from the dictionary supplied
assert self.verb is not None
request = MarsRequest(self.verb, request_dict)
name = kwargs.get('name', "{}_data".format(self.verb))
super().__init__(name,
labels={'info': ''},
script=pf.TemplateScript(mars_task_script, REQUEST=request),
**kwargs)
self.script.define_environment_variable('MARS_ECFLOW_LABEL', self.info)
self.script.define_environment_variable('MARS_TIMERS_FILE', "{}.timers".format(name))
class ArchiveTask(MarsTask):
verb = 'archive'
class RetrieveTask(MarsTask):
verb = 'retrieve'
There are two major object-oriented approaches to making encapsulated: inheritance and delegation.
Suite Objects using Inheritance
In this first example we are going to choose to use inheritance, although this is a fairly arbitrary choice. Which is desirable depends very much on context. We are also going to avoid using the ArchiveTask defined above just to avoid having to put lots of safety-related code into these examples.
We wish to define a standard test pattern. This will:
Create a temporary (scratch) directory within the scratch space configured for the given host
Retrieve testing data, which is specified by the derived class
Run a test, which is defined by the derived class
Clean up after ourselves
[4]:
class Cleanup(pf.Task):
def __init__(self, path, name='cleanup', **kwargs):
assert path != "/"
super().__init__(name, script='rm -rf "{}"'.format(path), **kwargs)
class TestBase(pf.AnchorFamily):
"""This class is an interface"""
def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)
# Generate a unique working directory
self._workdir = os.path.join(self.host.scratch_directory,
self.suite.name, self.fullname.replace('/', '_'))
# Ensure that the data gets put somewhere
self._data_filename = 'retrieved.grib'
request = self.request_dict().copy()
request['target'] = self._data_filename
with self:
(
RetrieveTask(request, workdir=self._workdir)
>>
self.build_test()
>>
Cleanup(self._workdir)
)
def request_dict(self):
raise NotImplementedError("abstract base property")
def build_test(self):
raise NotImplementedError("abstract base method")
Classes should be derived from this abstract base test class, implementing the request_dict property and build_test methods. These derived classes can be further derived, or set up according to configuration passed in from outside.
[5]:
class GribLsTest(TestBase):
def __init__(self, date, param, **kwargs):
self._date = date
self._param = param
name = kwargs.pop('name', 'grib_ls')
super().__init__(name, **kwargs)
def request_dict(self):
return {
'class': 'od',
'expver': '0001',
'stream': 'oper',
'date': self._date,
'time': [0, 12],
'step': 0,
'type': 'an',
'levtype': 'ml',
'levelist': 1,
'param': self._param,
}
def build_test(self):
return pf.Task('grib_ls', workdir=self._workdir, script='grib_ls -m {}'.format(self._data_filename))
class LsTest(TestBase):
def __init__(self, **kwargs):
super().__init__('ls', **kwargs)
def request_dict(self):
return {
'class': 'od',
'expver': '0001',
'stream': 'oper',
'date': -1,
'time': [0, 12],
'step': 0,
'type': 'an',
'levtype': 'ml',
'levelist': 1,
'param': 't',
}
def build_test(self):
with pf.Family('test_family') as f:
pf.Task('ls', workdir=self._workdir, script='ls -l {}'.format(self._data_filename))
return f
These tests can be combined inside a suite.
[6]:
with CourseSuite('inheritance_example') as s:
with pf.Family('tests'):
(
GribLsTest(datetime.date.today() - datetime.timedelta(days=2), 't')
>>
GribLsTest(datetime.date.today() - datetime.timedelta(days=1), 'z', name='grib_ls_2')
>>
LsTest()
)
s
[6]:
suite inheritance_example defstatus suspended edit ECF_FILES '/path/to/scratch/files/inheritance_example' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" family tests family grib_ls edit ECF_FILES '/path/to/scratch/files/inheritance_example/tests/grib_ls' edit ECF_INCLUDE '/path/to/scratch/files/inheritance_example/tests/grib_ls' task retrieve_data label info "" task grib_ls trigger retrieve_data eq complete task cleanup trigger grib_ls eq complete endfamily family grib_ls_2 trigger grib_ls eq complete edit ECF_FILES '/path/to/scratch/files/inheritance_example/tests/grib_ls_2' edit ECF_INCLUDE '/path/to/scratch/files/inheritance_example/tests/grib_ls_2' task retrieve_data label info "" task grib_ls trigger retrieve_data eq complete task cleanup trigger grib_ls eq complete endfamily family ls trigger grib_ls_2 eq complete edit ECF_FILES '/path/to/scratch/files/inheritance_example/tests/ls' edit ECF_INCLUDE '/path/to/scratch/files/inheritance_example/tests/ls' task retrieve_data label info "" family test_family trigger retrieve_data eq complete task ls endfamily task cleanup trigger test_family eq complete endfamily endfamily endsuite
Suite Objects using Delegation
Alternatively, we can take the approach of delegation such that decisions about the data request to use and the test to construct are delegated to a configuration object that is injected from the controlling scope. If we do this then the resultant Test class is now a concrete class (and we no longer need to derive from it), changing the structure of the suite somewhat.
In this case, we build our Test class to delegate the construction to a config object whose type is unknown.
[7]:
class DelegatingTest(pf.AnchorFamily):
def __init__(self, config, **kwargs):
name = config.name
super().__init__(name, **kwargs)
# Generate a unique working directory
workdir = os.path.join(self.host.scratch_directory,
self.suite.name, self.fullname.replace('/', '_'))
# Ensure that the data gets put somewhere
data_filename = 'retrieved.grib'
request = config.request_dict.copy()
request['target'] = data_filename
with self:
(
RetrieveTask(request, workdir=workdir)
>>
config.build_test(workdir, data_filename)
>>
Cleanup(workdir)
)
We can now create config classes that provide this functionality. They do not have to be built in the same way, or related to each other in any way other than that they provide the given functionality.
[8]:
class LsConfig:
name = 'ls'
request_dict = {
'class': 'od',
'expver': '0001',
'stream': 'oper',
'date': -1,
'time': [0, 12],
'step': 0,
'type': 'an',
'levtype': 'ml',
'levelist': 1,
'param': 't',
}
@staticmethod
def build_test(workdir, data_filename):
with pf.Family('test_family') as f:
return pf.Task('ls', workdir=workdir, script='ls -l {}'.format(data_filename))
class GribLsConfig:
def __init__(self, date, param, name='grib_ls'):
self.name = name
self._date = date
self._param = param
@property
def request_dict(self):
return {
'class': 'od',
'expver': '0001',
'stream': 'oper',
'date': self._date,
'time': [0, 12],
'step': 0,
'type': 'an',
'levtype': 'ml',
'levelist': 1,
'param': self._param,
}
def build_test(self, workdir, data_filename):
return pf.Task('grib_ls', workdir=workdir, script='grib_ls -m {}'.format(data_filename))
We can then construct a combined configuration object.
[9]:
class CombinedConfig:
def __init__(self):
self.tests = [
GribLsConfig(datetime.date.today() - datetime.timedelta(days=2), 't'),
GribLsConfig(datetime.date.today() - datetime.timedelta(days=1), 'z', name='grib_ls_2'),
LsConfig # n.b. here we just used a raw class.
]
And we then configure the suite with the config object.
[10]:
class DelegatedSuite(CourseSuite):
def __init__(self, config):
super().__init__('delegated_example')
with self:
pf.sequence(DelegatingTest(test_cfg) for test_cfg in config.tests)
s = DelegatedSuite(CombinedConfig())
s
[10]:
suite delegated_example defstatus suspended edit ECF_FILES '/path/to/scratch/files/delegated_example' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" family grib_ls edit ECF_FILES '/path/to/scratch/files/delegated_example/grib_ls' edit ECF_INCLUDE '/path/to/scratch/files/delegated_example/grib_ls' task retrieve_data label info "" task grib_ls trigger retrieve_data eq complete task cleanup trigger grib_ls eq complete endfamily family grib_ls_2 trigger grib_ls eq complete edit ECF_FILES '/path/to/scratch/files/delegated_example/grib_ls_2' edit ECF_INCLUDE '/path/to/scratch/files/delegated_example/grib_ls_2' task retrieve_data label info "" task grib_ls trigger retrieve_data eq complete task cleanup trigger grib_ls eq complete endfamily family ls trigger grib_ls_2 eq complete edit ECF_FILES '/path/to/scratch/files/delegated_example/ls' edit ECF_INCLUDE '/path/to/scratch/files/delegated_example/ls' task retrieve_data label info "" family test_family task ls trigger ../retrieve_data eq complete endfamily task cleanup trigger test_family/ls eq complete endfamily endsuite
Conditional Suite Structure
One of the goals of building an Object-Oriented suite is avoiding tangled, procedural complexity in constructing suites. Making a suite configurable, and multi-purpose requires conditionality in how the suite is constructed.
The most obvious way to do this is to put conditional expressions, namely if statements, into the suite structure. This works, but leads to a long-term increase in the complexity of the suite. But worse, it puts the configuration- and system-dependent logic about how a suite should be built into the structure of the suite rather than with the configuration where it belongs.
This example shows delegation of conditional behaviour to a configuration, such that the configuration can use arbitrary logic and complexity (in this case just a lookup) to determine which subsections of a suite get built.
[11]:
class Config:
def __init__(self, **tests):
# Default tests that should be built. Otherwise assume not
self.enabled_tests = {
'test3': True
}
self.enabled_tests.update(tests)
def build_test(self, cls, name, *args, **kwargs):
if self.enabled_tests.get(name, False):
return cls(name, *args, **kwargs)
class ATest(pf.Task):
def __init__(self, name, val):
super().__init__(name, script="echo test={} : val={}".format(name, val))
class TestingSuite(CourseSuite):
def __init__(self, name, config, **kwargs):
super().__init__(name, **kwargs)
with self:
config.build_test(ATest, 'test1', 1234)
config.build_test(ATest, 'test2', 4321)
config.build_test(ATest, 'test3', 6666)
config.build_test(ATest, 'test4', 7777)
TestingSuite('default_tests', Config())
[11]:
suite default_tests defstatus suspended edit ECF_FILES '/path/to/scratch/files/default_tests' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" task test3 endsuite
[12]:
TestingSuite('add_test4', Config(test4=True))
[12]:
suite add_test4 defstatus suspended edit ECF_FILES '/path/to/scratch/files/add_test4' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" task test3 task test4 endsuite
[13]:
TestingSuite('override_default_test', Config(test1=True, test2=True, test3=False))
[13]:
suite override_default_test defstatus suspended edit ECF_FILES '/path/to/scratch/files/override_default_test' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" task test1 task test2 endsuite
Structural Delegation
This first example demonstrates delegating a structural decision to a configuration object. We wish to loop over two different axes - one an integer axis, and the other a string based one. The configuration objects decide how this should be done, and the order of the looping.
Further configuration objects can be derived from Config1 and Config2 to update the values, while leaving the structures the same.
Once the suite has delegated construction of the looping structure to the config, the construction of the tasks within the looping structure can be continued in the normal way.
[14]:
class ConfigBase:
suite_name = None
min_integer = 1
max_integer = 5
strings = ['a', 'b', 'c', 'd', 'e']
def build_nested_loops(self, **kwargs):
raise NotImplementedError
class Config1(ConfigBase):
suite_name = 'config_string_integer'
def build_nested_loops(self, **kwargs):
with pf.Family('string_looper'):
pf.RepeatEnumerated('REPEAT_STRING', self.strings)
with pf.Family('integer_looper', **kwargs) as inner:
pf.RepeatInteger('REPEAT_INTEGER', self.min_integer, self.max_integer)
return inner
class Config2(ConfigBase):
suite_name = 'config_integer_string'
def build_nested_loops(self, **kwargs):
with pf.Family('integer_looper'):
pf.RepeatInteger('REPEAT_INTEGER', self.min_integer, self.max_integer)
with pf.Family('string_looper', **kwargs) as inner:
pf.RepeatEnumerated('REPEAT_STRING', self.strings)
return inner
class NestedLoopingSuite(CourseSuite):
def __init__(self, config):
super().__init__(config.suite_name)
with self:
with config.build_nested_loops(labels={'info': ''}) as f:
(
LabelSetter((f.info, '$REPEAT_INTEGER : $REPEAT_STRING'))
>>
WaitSeconds(2)
)
s1 = NestedLoopingSuite(Config1())
s2 = NestedLoopingSuite(Config2())
s1
[14]:
suite config_string_integer defstatus suspended edit ECF_FILES '/path/to/scratch/files/config_string_integer' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" family string_looper repeat enumerated REPEAT_STRING "a" "b" "c" "d" "e" family integer_looper repeat integer REPEAT_INTEGER 1 5 label info "" task set_labels task wait_2 trigger set_labels eq complete endfamily endfamily endsuite
[15]:
s1.deploy_suite(pf.Notebook)
[15]:
File: /path/to/scratch/files/config_string_integer/set_labels.ecf
#!/bin/bash echo "Running on: $(hostname)" || true set -uex export ECF_PORT=%ECF_PORT% # The server port number export ECF_HOST=%ECF_HOST% # The host name where the server is running export ECF_NAME=%ECF_NAME% # The name of this current task export ECF_PASS=%ECF_PASS% # A unique password export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task export REPEAT_INTEGER="%REPEAT_INTEGER%" export REPEAT_STRING="%REPEAT_STRING%" echo "Current working directory: $(pwd)" %nopp ecflow_client --alter=change label info "$REPEAT_INTEGER : $REPEAT_STRING" /config_string_integer/string_looper/integer_looper %end
File: /path/to/scratch/files/config_string_integer/wait_2.ecf
#!/bin/bash echo "Running on: $(hostname)" || true set -uex export ECF_PORT=%ECF_PORT% # The server port number export ECF_HOST=%ECF_HOST% # The host name where the server is running export ECF_NAME=%ECF_NAME% # The name of this current task export ECF_PASS=%ECF_PASS% # A unique password export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task echo "Current working directory: $(pwd)" %nopp sleep 2 %end
[16]:
s2
[16]:
suite config_integer_string defstatus suspended edit ECF_FILES '/path/to/scratch/files/config_integer_string' edit ECF_HOME '/path/to/scratch/out' edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &' edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%' edit ECF_STATUS_CMD 'true' edit ECF_OUT '%ECF_HOME%' label exec_host "localhost" family integer_looper repeat integer REPEAT_INTEGER 1 5 family string_looper repeat enumerated REPEAT_STRING "a" "b" "c" "d" "e" label info "" task set_labels task wait_2 trigger set_labels eq complete endfamily endfamily endsuite
[17]:
s2.deploy_suite(pf.Notebook)
[17]:
File: /path/to/scratch/files/config_integer_string/set_labels.ecf
#!/bin/bash echo "Running on: $(hostname)" || true set -uex export ECF_PORT=%ECF_PORT% # The server port number export ECF_HOST=%ECF_HOST% # The host name where the server is running export ECF_NAME=%ECF_NAME% # The name of this current task export ECF_PASS=%ECF_PASS% # A unique password export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task export REPEAT_INTEGER="%REPEAT_INTEGER%" export REPEAT_STRING="%REPEAT_STRING%" echo "Current working directory: $(pwd)" %nopp ecflow_client --alter=change label info "$REPEAT_INTEGER : $REPEAT_STRING" /config_integer_string/integer_looper/string_looper %end
File: /path/to/scratch/files/config_integer_string/wait_2.ecf
#!/bin/bash echo "Running on: $(hostname)" || true set -uex export ECF_PORT=%ECF_PORT% # The server port number export ECF_HOST=%ECF_HOST% # The host name where the server is running export ECF_NAME=%ECF_NAME% # The name of this current task export ECF_PASS=%ECF_PASS% # A unique password export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task echo "Current working directory: $(pwd)" %nopp sleep 2 %end