Additional Examples

Suite Example

In this section we construct a component of a test suite that obtains test data from MARS, performs some “test” on it, and then cleans up after itself. This demonstrates several characteristics of object-oriented suite design:

  1. Functionality that is configured from a data description.

  2. Functionality that is encapsulated in re-usable subcomponents.

  3. Delegation or inheritance to fine-tune behaviour within an existing framework.

First, we create a helper class that holds a MARS request and outputs it in a form that MARS understands.

[2]:
class MarsRequest:

    separator = ",\n    "

    def __init__(self, verb, request_dict):
        self._verb = verb
        self._request_dict = request_dict

    def __str__(self):
        return (
            self._verb +
            self.separator +
            self.separator.join("{}={}".format(k, self._resolve(v)) for k, v in self._request_dict.items())
        )

    @staticmethod
    def _resolve(v):
        '''Convert values into something understood by MARS'''
        if isinstance(v, bool):
            return "on" if v else "off"
        if isinstance(v, list):
            return '/'.join(MarsRequest._resolve(vv) for vv in v)
        if isinstance(v, str) and ('/' in v or '$' in v):
            return '"{}"'.format(v)
        return str(v)
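
To see the formatting rules in action without pyflow or MARS, the sketch below copies the `_resolve` and `__str__` logic into two free functions. The names `resolve_value` and `format_request` and the key `some_flag` are hypothetical and exist only for this illustration. Note that the `bool` check has to come first, because `str(True)` would otherwise yield `True` rather than `on`.

```python
def resolve_value(v):
    """Standalone copy of MarsRequest._resolve, for illustration only."""
    if isinstance(v, bool):
        return "on" if v else "off"
    if isinstance(v, list):
        return "/".join(resolve_value(vv) for vv in v)
    if isinstance(v, str) and ("/" in v or "$" in v):
        return '"{}"'.format(v)
    return str(v)


def format_request(verb, request_dict, separator=",\n    "):
    """Mirror of MarsRequest.__str__ built on the helper above."""
    body = separator.join(
        "{}={}".format(k, resolve_value(v)) for k, v in request_dict.items()
    )
    return verb + separator + body


example = format_request("retrieve", {
    "class": "od",
    "time": [0, 12],                # lists are joined with '/'
    "some_flag": False,             # hypothetical key, shows bool handling
    "target": "$SCRATCH/out.grib",  # '$' or '/' in a string triggers quoting
})
print(example)
# retrieve,
#     class=od,
#     time=0/12,
#     some_flag=off,
#     target="$SCRATCH/out.grib"
```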

These requests are useful in the context of a MarsTask. A MarsTask uses the MarsRequest object defined above to write the request to a temporary file in the current working directory and pass it to the mars command. It also creates a label for monitoring in ecflow and a timers file for diagnostics, both of which are communicated through environment variables understood by MARS.

[3]:
mars_task_script = """
req=$(mktemp req.XXXX)
cat > $req <<@
{{ REQUEST }}
@
mars $req
rm $req
"""


class MarsTask(pf.Task):

    verb = None

    def __init__(self, request_dict, **kwargs):

        # Construct a MarsRequest object from the dictionary supplied
        assert self.verb is not None
        request = MarsRequest(self.verb, request_dict)

        name = kwargs.get('name', "{}_data".format(self.verb))

        super().__init__(name,
                         labels={'info': ''},
                         script=pf.TemplateScript(mars_task_script, REQUEST=request),
                         **kwargs)

        self.script.define_environment_variable('MARS_ECFLOW_LABEL', self.info)
        self.script.define_environment_variable('MARS_TIMERS_FILE', "{}.timers".format(name))


class ArchiveTask(MarsTask):
    verb = 'archive'


class RetrieveTask(MarsTask):
    verb = 'retrieve'
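
To make the effect of the template concrete, the sketch below substitutes a request into `mars_task_script` using a plain `str.replace`. This is only a stand-in for whatever substitution `pf.TemplateScript` performs, and the request text is a shortened example rather than a complete MARS request.

```python
mars_task_script = """
req=$(mktemp req.XXXX)
cat > $req <<@
{{ REQUEST }}
@
mars $req
rm $req
"""

# Shortened request text, in the format produced by str(MarsRequest(...))
request_text = "retrieve,\n    class=od,\n    target=retrieved.grib"

# Stand-in for the template substitution step (not the pyflow implementation)
rendered = mars_task_script.replace("{{ REQUEST }}", request_text)
print(rendered)
```

The rendered script writes the request into a temporary file through a here-document delimited by `@`, runs `mars` on that file, and then removes it.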

There are two major object-oriented approaches to encapsulating functionality: inheritance and delegation.

Suite Objects using Inheritance

In this first example we use inheritance, although this is a fairly arbitrary choice. Which approach is preferable depends very much on context. We also avoid using the ArchiveTask defined above, so that these examples do not need lots of safety-related code.

We wish to define a standard test pattern. This will:

  1. Create a temporary (scratch) directory within the scratch space configured for the given host

  2. Retrieve testing data, which is specified by the derived class

  3. Run a test, which is defined by the derived class

  4. Clean up after ourselves

[4]:
class Cleanup(pf.Task):
    def __init__(self, path, name='cleanup', **kwargs):
        assert path != "/"
        super().__init__(name, script='rm -rf "{}"'.format(path), **kwargs)


class TestBase(pf.AnchorFamily):

    """This class is an interface"""

    def __init__(self, name, **kwargs):
        super().__init__(name, **kwargs)

        # Generate a unique working directory
        self._workdir = os.path.join(self.host.scratch_directory,
                                     self.suite.name, self.fullname.replace('/', '_'))

        # Ensure that the data gets put somewhere
        self._data_filename = 'retrieved.grib'
        request = self.request_dict().copy()
        request['target'] = self._data_filename

        with self:
            (
                RetrieveTask(request, workdir=self._workdir)
                >>
                self.build_test()
                >>
                Cleanup(self._workdir)
            )

    def request_dict(self):
        raise NotImplementedError("abstract base method")

    def build_test(self):
        raise NotImplementedError("abstract base method")

Classes should be derived from this abstract base test class, implementing the request_dict and build_test methods. These derived classes can be further derived, or set up according to configuration passed in from outside.

[5]:
class GribLsTest(TestBase):
    def __init__(self, date, param, **kwargs):
        self._date = date
        self._param = param
        name = kwargs.pop('name', 'grib_ls')
        super().__init__(name, **kwargs)

    def request_dict(self):
        return {
            'class': 'od',
            'expver': '0001',
            'stream': 'oper',
            'date': self._date,
            'time': [0, 12],
            'step': 0,
            'type': 'an',
            'levtype': 'ml',
            'levelist': 1,
            'param': self._param,
        }

    def build_test(self):
        return pf.Task('grib_ls', workdir=self._workdir, script='grib_ls -m {}'.format(self._data_filename))


class LsTest(TestBase):
    def __init__(self, **kwargs):
        super().__init__('ls', **kwargs)

    def request_dict(self):
        return {
            'class': 'od',
            'expver': '0001',
            'stream': 'oper',
            'date': -1,
            'time': [0, 12],
            'step': 0,
            'type': 'an',
            'levtype': 'ml',
            'levelist': 1,
            'param': 't',
        }

    def build_test(self):
        with pf.Family('test_family') as f:
            pf.Task('ls', workdir=self._workdir, script='ls -l {}'.format(self._data_filename))
        return f
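
One detail of the derived classes is worth spelling out: GribLsTest assigns `self._date` and `self._param` before calling `super().__init__`, because the base constructor calls `request_dict()` and `build_test()` immediately. The pyflow-free sketch below uses hypothetical classes `Base`, `Good` and `Bad` to show what happens when the order is reversed.

```python
class Base:
    def __init__(self, name):
        self.name = name
        # The base constructor calls the hook straight away, as TestBase does
        self.request = dict(self.request_dict(), target="retrieved.grib")

    def request_dict(self):
        raise NotImplementedError("abstract base method")


class Good(Base):
    def __init__(self, param):
        self._param = param      # set before super().__init__
        super().__init__("good")

    def request_dict(self):
        return {"param": self._param}


class Bad(Base):
    def __init__(self, param):
        super().__init__("bad")  # hook runs here, before _param exists
        self._param = param

    def request_dict(self):
        return {"param": self._param}


print(Good("t").request)  # {'param': 't', 'target': 'retrieved.grib'}

try:
    Bad("t")
except AttributeError as exc:
    print("AttributeError:", exc)
```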

These tests can be combined inside a suite.

[6]:
with CourseSuite('inheritance_example') as s:
    with pf.Family('tests'):
        (
            GribLsTest(datetime.date.today() - datetime.timedelta(days=2), 't')
            >>
            GribLsTest(datetime.date.today() - datetime.timedelta(days=1), 'z', name='grib_ls_2')
            >>
            LsTest()
        )

s
[6]:
suite inheritance_example
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/inheritance_example'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family tests
    family grib_ls
      edit ECF_FILES '/path/to/scratch/files/inheritance_example/tests/grib_ls'
      edit ECF_INCLUDE '/path/to/scratch/files/inheritance_example/tests/grib_ls'
      task retrieve_data
        label info ""
      task grib_ls
        trigger retrieve_data eq complete
      task cleanup
        trigger grib_ls eq complete
    endfamily
    family grib_ls_2
      trigger grib_ls eq complete
      edit ECF_FILES '/path/to/scratch/files/inheritance_example/tests/grib_ls_2'
      edit ECF_INCLUDE '/path/to/scratch/files/inheritance_example/tests/grib_ls_2'
      task retrieve_data
        label info ""
      task grib_ls
        trigger retrieve_data eq complete
      task cleanup
        trigger grib_ls eq complete
    endfamily
    family ls
      trigger grib_ls_2 eq complete
      edit ECF_FILES '/path/to/scratch/files/inheritance_example/tests/ls'
      edit ECF_INCLUDE '/path/to/scratch/files/inheritance_example/tests/ls'
      task retrieve_data
        label info ""
      family test_family
        trigger retrieve_data eq complete
        task ls
      endfamily
      task cleanup
        trigger test_family eq complete
    endfamily
  endfamily
endsuite

Suite Objects using Delegation

Alternatively, we can use delegation: decisions about which data request to use and which test to construct are delegated to a configuration object that is injected from the controlling scope. The resulting Test class is then a concrete class (we no longer need to derive from it), which changes the structure of the suite somewhat.

In this case, we build our Test class to delegate construction to a config object whose type is not known in advance.

[7]:
class DelegatingTest(pf.AnchorFamily):
    def __init__(self, config, **kwargs):

        name = config.name
        super().__init__(name, **kwargs)

        # Generate a unique working directory
        workdir = os.path.join(self.host.scratch_directory,
                               self.suite.name, self.fullname.replace('/', '_'))

        # Ensure that the data gets put somewhere
        data_filename = 'retrieved.grib'
        request = config.request_dict.copy()
        request['target'] = data_filename

        with self:
            (
                RetrieveTask(request, workdir=workdir)
                >>
                config.build_test(workdir, data_filename)
                >>
                Cleanup(workdir)
            )

We can now create config classes that provide this functionality. They need not be built in the same way, nor be related to each other, as long as each provides the attributes and methods that DelegatingTest uses: name, request_dict and build_test.

[8]:
class LsConfig:
    name = 'ls'
    request_dict = {
        'class': 'od',
        'expver': '0001',
        'stream': 'oper',
        'date': -1,
        'time': [0, 12],
        'step': 0,
        'type': 'an',
        'levtype': 'ml',
        'levelist': 1,
        'param': 't',
    }

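    @staticmethod
    def build_test(workdir, data_filename):
        # n.b. the task (not the enclosing family) is returned, so the
        # triggers in the suite output attach to test_family/ls directly.
        with pf.Family('test_family'):
            return pf.Task('ls', workdir=workdir, script='ls -l {}'.format(data_filename))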


class GribLsConfig:

    def __init__(self, date, param, name='grib_ls'):
        self.name = name
        self._date = date
        self._param = param

    @property
    def request_dict(self):
        return {
            'class': 'od',
            'expver': '0001',
            'stream': 'oper',
            'date': self._date,
            'time': [0, 12],
            'step': 0,
            'type': 'an',
            'levtype': 'ml',
            'levelist': 1,
            'param': self._param,
        }

    def build_test(self, workdir, data_filename):
        return pf.Task('grib_ls', workdir=workdir, script='grib_ls -m {}'.format(data_filename))
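
The two config classes expose `request_dict` differently: one as a class-level dictionary, the other as a property on an instance. The sketch below, which uses the hypothetical stand-ins `StaticConfig`, `DynamicConfig` and `build_request` instead of pyflow objects, illustrates why both work with the same accessing code, and why copying the dictionary before adding `target` matters for the class-level case.

```python
class StaticConfig:
    # Class-level dict, shared by every user of the class (like LsConfig)
    request_dict = {"param": "t"}


class DynamicConfig:
    def __init__(self, param):
        self._param = param

    @property
    def request_dict(self):
        # A fresh dict on every access (like GribLsConfig)
        return {"param": self._param}


def build_request(config):
    """Stand-in for the request handling in DelegatingTest.__init__."""
    request = config.request_dict.copy()  # copy before adding 'target'
    request["target"] = "retrieved.grib"
    return request


# A raw class and an instance are both acceptable config objects
for cfg in (StaticConfig, DynamicConfig("z")):
    print(build_request(cfg))

# The shared class-level dict is left untouched thanks to the copy
print(StaticConfig.request_dict)  # {'param': 't'}
```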

We can then construct a combined configuration object.

[9]:
class CombinedConfig:
    def __init__(self):
        self.tests = [
            GribLsConfig(datetime.date.today() - datetime.timedelta(days=2), 't'),
            GribLsConfig(datetime.date.today() - datetime.timedelta(days=1), 'z', name='grib_ls_2'),
            LsConfig,  # n.b. here we just use a raw class rather than an instance
        ]

We then configure the suite with the config object.

[10]:
class DelegatedSuite(CourseSuite):
    def __init__(self, config):
        super().__init__('delegated_example')

        with self:
            pf.sequence(DelegatingTest(test_cfg) for test_cfg in config.tests)


s = DelegatedSuite(CombinedConfig())
s
[10]:
suite delegated_example
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/delegated_example'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family grib_ls
    edit ECF_FILES '/path/to/scratch/files/delegated_example/grib_ls'
    edit ECF_INCLUDE '/path/to/scratch/files/delegated_example/grib_ls'
    task retrieve_data
      label info ""
    task grib_ls
      trigger retrieve_data eq complete
    task cleanup
      trigger grib_ls eq complete
  endfamily
  family grib_ls_2
    trigger grib_ls eq complete
    edit ECF_FILES '/path/to/scratch/files/delegated_example/grib_ls_2'
    edit ECF_INCLUDE '/path/to/scratch/files/delegated_example/grib_ls_2'
    task retrieve_data
      label info ""
    task grib_ls
      trigger retrieve_data eq complete
    task cleanup
      trigger grib_ls eq complete
  endfamily
  family ls
    trigger grib_ls_2 eq complete
    edit ECF_FILES '/path/to/scratch/files/delegated_example/ls'
    edit ECF_INCLUDE '/path/to/scratch/files/delegated_example/ls'
    task retrieve_data
      label info ""
    family test_family
      task ls
        trigger ../retrieve_data eq complete
    endfamily
    task cleanup
      trigger test_family/ls eq complete
  endfamily
endsuite

Conditional Suite Structure

One of the goals of building an object-oriented suite is to avoid tangled, procedural complexity in suite construction. Making a suite configurable and multi-purpose requires conditionality in how the suite is constructed.

The most obvious way to do this is to put conditional expressions, namely if statements, into the suite structure. This works, but leads to a long-term increase in the complexity of the suite. Worse, it puts the configuration- and system-dependent logic about how a suite should be built into the structure of the suite rather than with the configuration, where it belongs.

This example shows delegation of conditional behaviour to a configuration, such that the configuration can use arbitrary logic and complexity (in this case just a lookup) to determine which subsections of a suite get built.

[11]:
class Config:

    def __init__(self, **tests):

        # Tests that are built by default; any test not listed is assumed disabled
        self.enabled_tests = {
            'test3': True
        }
        self.enabled_tests.update(tests)

    def build_test(self, cls, name, *args, **kwargs):
        if self.enabled_tests.get(name, False):
            return cls(name, *args, **kwargs)


class ATest(pf.Task):
    def __init__(self, name, val):
        super().__init__(name, script="echo test={} : val={}".format(name, val))


class TestingSuite(CourseSuite):

    def __init__(self, name, config, **kwargs):
        super().__init__(name, **kwargs)
        with self:
            config.build_test(ATest, 'test1', 1234)
            config.build_test(ATest, 'test2', 4321)
            config.build_test(ATest, 'test3', 6666)
            config.build_test(ATest, 'test4', 7777)


TestingSuite('default_tests', Config())
[11]:
suite default_tests
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/default_tests'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  task test3
endsuite
[12]:
TestingSuite('add_test4', Config(test4=True))
[12]:
suite add_test4
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/add_test4'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  task test3
  task test4
endsuite
[13]:
TestingSuite('override_default_test', Config(test1=True, test2=True, test3=False))
[13]:
suite override_default_test
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/override_default_test'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  task test1
  task test2
endsuite
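
Because the suite only ever calls `config.build_test`, the decision logic can become more elaborate without touching the suite. The sketch below is a pyflow-free illustration: `LookupConfig` is a condensed copy of the Config class above, while `PrefixConfig`, `FakeTask` and `build_all` are hypothetical names introduced here to show a config that enables tests by name prefix in addition to the lookup.

```python
class LookupConfig:
    """Condensed copy of the Config class above, without pyflow."""

    def __init__(self, **tests):
        self.enabled_tests = {"test3": True}
        self.enabled_tests.update(tests)

    def build_test(self, cls, name, *args, **kwargs):
        if self.enabled_tests.get(name, False):
            return cls(name, *args, **kwargs)


class PrefixConfig(LookupConfig):
    """Hypothetical variant: also enable tests whose name starts with a prefix."""

    def __init__(self, prefix, **tests):
        super().__init__(**tests)
        self._prefix = prefix

    def build_test(self, cls, name, *args, **kwargs):
        if name.startswith(self._prefix):
            return cls(name, *args, **kwargs)
        return super().build_test(cls, name, *args, **kwargs)


class FakeTask:
    """Stand-in for ATest that only records its arguments."""

    def __init__(self, name, val):
        self.name, self.val = name, val


def build_all(config):
    """Stand-in for the suite constructor: ask the config for every test."""
    names = ["test1", "test2", "test3", "test4"]
    built = [config.build_test(FakeTask, n, i) for i, n in enumerate(names)]
    return [t.name for t in built if t is not None]


print(build_all(LookupConfig()))            # ['test3']
print(build_all(LookupConfig(test4=True)))  # ['test3', 'test4']
print(build_all(PrefixConfig("test1")))     # ['test1', 'test3']
```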

Structural Delegation

This example demonstrates delegating a structural decision to a configuration object. We wish to loop over two different axes: one an integer axis, and the other a string-based one. The configuration objects decide how this should be done, including the order in which the loops are nested.

Further configuration objects can be derived from Config1 and Config2 to update the values while leaving the structures the same.

Once the suite has delegated construction of the looping structure to the config, the construction of the tasks within the looping structure can be continued in the normal way.
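
The practical difference between the two nestings is the order in which the (integer, string) pairs are visited. The plain-Python sketch below enumerates that order, assuming that the inner repeat runs to completion for each value of the outer repeat and that the integer range includes both end points. It does not use pyflow or ecFlow; the values are those of the base configuration in this section.

```python
strings = ["a", "b", "c", "d", "e"]
integers = range(1, 5 + 1)  # min_integer..max_integer, both inclusive

# String loop outside, integer loop inside
order_string_outer = ["{} : {}".format(i, s) for s in strings for i in integers]

# Integer loop outside, string loop inside
order_integer_outer = ["{} : {}".format(i, s) for i in integers for s in strings]

print(order_string_outer[:6])
# ['1 : a', '2 : a', '3 : a', '4 : a', '5 : a', '1 : b']
print(order_integer_outer[:6])
# ['1 : a', '1 : b', '1 : c', '1 : d', '1 : e', '2 : a']
```

Both orderings visit the same 25 combinations; only the sequence differs.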

[14]:
class ConfigBase:
    suite_name = None
    min_integer = 1
    max_integer = 5
    strings = ['a', 'b', 'c', 'd', 'e']

    def build_nested_loops(self, **kwargs):
        raise NotImplementedError


class Config1(ConfigBase):
    suite_name = 'config_string_integer'
    def build_nested_loops(self, **kwargs):
        with pf.Family('string_looper'):
            pf.RepeatEnumerated('REPEAT_STRING', self.strings)
            with pf.Family('integer_looper', **kwargs) as inner:
                pf.RepeatInteger('REPEAT_INTEGER', self.min_integer, self.max_integer)
        return inner


class Config2(ConfigBase):
    suite_name = 'config_integer_string'
    def build_nested_loops(self, **kwargs):
        with pf.Family('integer_looper'):
            pf.RepeatInteger('REPEAT_INTEGER', self.min_integer, self.max_integer)
            with pf.Family('string_looper', **kwargs) as inner:
                pf.RepeatEnumerated('REPEAT_STRING', self.strings)
        return inner


class NestedLoopingSuite(CourseSuite):
    def __init__(self, config):
        super().__init__(config.suite_name)

        with self:
            with config.build_nested_loops(labels={'info': ''}) as f:
                (
                    LabelSetter((f.info, '$REPEAT_INTEGER : $REPEAT_STRING'))
                    >>
                    WaitSeconds(2)
                )


s1 = NestedLoopingSuite(Config1())
s2 = NestedLoopingSuite(Config2())
s1
[14]:
suite config_string_integer
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/config_string_integer'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family string_looper
    repeat enumerated REPEAT_STRING "a" "b" "c" "d" "e"
    family integer_looper
      repeat integer REPEAT_INTEGER 1 5
      label info ""
      task set_labels
      task wait_2
        trigger set_labels eq complete
    endfamily
  endfamily
endsuite
[15]:
s1.deploy_suite(pf.Notebook)
[15]:

File: /path/to/scratch/files/config_string_integer/set_labels.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

export REPEAT_INTEGER="%REPEAT_INTEGER%"
export REPEAT_STRING="%REPEAT_STRING%"

echo "Current working directory: $(pwd)"

%nopp

ecflow_client --alter=change label info "$REPEAT_INTEGER : $REPEAT_STRING" /config_string_integer/string_looper/integer_looper

%end

File: /path/to/scratch/files/config_string_integer/wait_2.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp

sleep 2

%end

[16]:
s2
[16]:
suite config_integer_string
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/config_integer_string'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family integer_looper
    repeat integer REPEAT_INTEGER 1 5
    family string_looper
      repeat enumerated REPEAT_STRING "a" "b" "c" "d" "e"
      label info ""
      task set_labels
      task wait_2
        trigger set_labels eq complete
    endfamily
  endfamily
endsuite
[17]:
s2.deploy_suite(pf.Notebook)
[17]:

File: /path/to/scratch/files/config_integer_string/set_labels.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

export REPEAT_INTEGER="%REPEAT_INTEGER%"
export REPEAT_STRING="%REPEAT_STRING%"

echo "Current working directory: $(pwd)"

%nopp

ecflow_client --alter=change label info "$REPEAT_INTEGER : $REPEAT_STRING" /config_integer_string/integer_looper/string_looper

%end

File: /path/to/scratch/files/config_integer_string/wait_2.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp

sleep 2

%end