Flow Control

Triggers

Triggers are used to declare dependencies between two tasks. For instance, the second task might need data created by the first task.

When ecFlow tries to start a task, it first evaluates the trigger expression. If the expression evaluates to true, the task is started; otherwise, the task stays queued.

Triggers can be defined between tasks, between families, or a mixture of both. Two rules govern how they behave:

  • A family is complete when all its tasks are complete

  • A task will be started if its triggers and the triggers of all its parent families evaluate to true
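The two rules can be illustrated with a small toy model. This is a hypothetical sketch, not pyflow or ecFlow code: the Node class, its trigger callable and the state strings are assumptions made purely for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass(eq=False)
class Node:
    """Hypothetical toy node (suite, family or task); not a pyflow class."""
    name: str
    trigger: Callable[[], bool] = lambda: True  # no trigger: always satisfied
    state: str = "queued"
    parent: Optional["Node"] = field(default=None, repr=False)
    children: List["Node"] = field(default_factory=list, repr=False)

    def add(self, child: "Node") -> "Node":
        child.parent = self
        self.children.append(child)
        return child

    def is_complete(self) -> bool:
        # Rule 1: a family is complete when all of its children are complete.
        if self.children:
            return all(child.is_complete() for child in self.children)
        return self.state == "complete"

    def can_start(self) -> bool:
        # Rule 2: a task starts only if its own trigger and the triggers
        # of all of its parent families evaluate to true.
        node: Optional["Node"] = self
        while node is not None:
            if not node.trigger():
                return False
            node = node.parent
        return True


# Mirror of the t1 >> t2 example: t2 waits for t1 to complete.
suite = Node("test")
f1 = suite.add(Node("f1"))
t1 = f1.add(Node("t1"))
t2 = f1.add(Node("t2", trigger=lambda: t1.state == "complete"))
```

Setting a trigger on f1 in this model (for example f1.trigger = lambda: False) blocks t1 and t2 as well, which is the "implicit trigger" effect of parent nodes.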

A node can have only one trigger expression, but that expression can be very complex. Keep in mind that the triggers of parent nodes also act as implicit triggers.

Sometimes triggers can be used to prevent too many jobs from running at the same time. In this case, the use of a limit may be a better solution.

Triggers can be very complex, and pyflow supports all kinds of conditions. They can also reference node attributes such as events, meters, variables, repeats, limits and generated variables.

[2]:
with pf.Suite('test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        t2 = pf.Task('t2')

    t1 >> t2

s
[2]:
suite test
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  family f1
    edit SLEEP '20'
    task t1
    task t2
      trigger t1 eq complete
  endfamily
endsuite

Embedded Triggers

Trigger expressions can also be embedded within scripts using the ecflow_client --wait child command. While the expression is not true, the job waits at that point.

Where possible, give preference to triggers in the suite definition, since these are checked when the definition is created, whereas embedded triggers are only checked at run time.

[3]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        pf.Task('t1')
        pf.Task('t2', script='ecflow_client --wait="t1 == complete"')

s
[3]:
suite test
  edit ECF_FILES '/test'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family f1
    edit SLEEP '20'
    task t1
    task t2
  endfamily
endsuite
[4]:
s.deploy_suite(target=pf.Notebook)
[4]:

File: /test/t1.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp


%end

File: /test/t2.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp

ecflow_client --wait="t1 == complete"

%end
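With an embedded trigger, the waiting happens inside the job itself: t2 has already been submitted and is running while it waits for t1. As a hypothetical illustration only (it does not call ecflow_client, and the condition callable, poll interval and timeout are assumptions), such a blocking wait can be pictured as a polling loop:

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool],
               poll_seconds: float = 0.01,
               timeout: float = 1.0) -> bool:
    """Block until condition() is true; give up after `timeout` seconds.

    Returns True if the condition became true, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            return False
        # The job sits here, still running, until the condition holds.
        time.sleep(poll_seconds)
    return True


# Hypothetical stand-in for "t1 == complete": becomes true on the third check.
checks = {"count": 0}


def t1_is_complete() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


print(wait_until(t1_is_complete))  # True, after two polls
```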

Events

Sometimes waiting for a task to complete is not good enough. If a task produces several results, another task may be able to start as soon as the first results are ready. For this, pyflow exposes the ecFlow concept of events. An event is a message that a task reports to the ecFlow server while it is running.

Events have names and a task can set several of them.

[5]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        with pf.Task('t2', script=[
            'echo "I will now sleep for %SLEEP% seconds"',
            'sleep %SLEEP%',
            'ecflow_client --event=a    # Set the first event',
            'sleep %SLEEP%              # Sleep a bit more',
            'ecflow_client --event=b    # Set the second event',
            'sleep %SLEEP%              # A last nap...',
        ]) as t2:
            pf.Event('a')
            pf.Event('b')
        t3 = pf.Task('t3')
        t4 = pf.Task('t4')

    t2.triggers = t1
    t3.triggers = t2.a
    t4.triggers = t2.b

s
[5]:
suite test
  edit ECF_FILES '/test'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family f1
    edit SLEEP '20'
    task t1
    task t2
      trigger t1 eq complete
      event a
      event b
    task t3
      trigger t2:a
    task t4
      trigger t2:b
  endfamily
endsuite
[6]:
s.deploy_suite(target=pf.Notebook)
[6]:

File: /test/t1.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp


%end

File: /test/t2.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp

echo "I will now sleep for %SLEEP% seconds"
sleep %SLEEP%
ecflow_client --event=a    # Set the first event
sleep %SLEEP%              # Sleep a bit more
ecflow_client --event=b    # Set the second event
sleep %SLEEP%              # A last nap...

%end

File: /test/t3.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp


%end

File: /test/t4.ecf


#!/bin/bash

echo "Running on: $(hostname)" || true
set -uex


export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task

echo "Current working directory: $(pwd)"

%nopp


%end
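The effect of the event triggers t2:a and t2:b can be sketched with a hypothetical in-memory record of which events each task has set. This does not talk to an ecFlow server; set_event merely stands in for the job calling ecflow_client --event, and event_trigger is an illustrative helper, not a pyflow function.

```python
from typing import Dict, Set

# Hypothetical record of the events each task has set so far.
events_set: Dict[str, Set[str]] = {"t2": set()}


def set_event(task: str, event: str) -> None:
    """Stand-in for the job running `ecflow_client --event=<event>`."""
    events_set[task].add(event)


def event_trigger(expression: str) -> bool:
    """Evaluate a trigger of the form 'task:event', e.g. 't2:a'."""
    task, event = expression.split(":")
    return event in events_set[task]


# t3 (trigger t2:a) can start as soon as t2 sets event a,
# while t4 (trigger t2:b) keeps waiting for event b.
set_event("t2", "a")
print(event_trigger("t2:a"), event_trigger("t2:b"))  # True False
```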

Complete

Sometimes a task should not be run when a certain condition is met. The condition can be signalled by an event. For example, event t2.b might indicate that task t2 did not manage to produce the expected result, so we do not need to run task t4.

In this case, you can use the complete attribute. It is used in the same way as the trigger attribute, but sets the task complete instead of running it.

When the ecFlow server tries to start a task, it evaluates both the trigger and the complete expressions. If the complete expression is true, the task is set complete without being run.

Note

Complete expression evaluation takes precedence over the trigger.

Complete expressions can reference tasks, families, or both, and can be used in conjunction with a trigger on the same node, as task t4 shows below.

[7]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        with pf.Task('t2', script=[
            'echo "I will now sleep for %SLEEP% seconds"',
            'sleep %SLEEP%',
            'ecflow_client --event=a    # Set the first event',
            'sleep %SLEEP%              # Sleep a bit more',
            'ecflow_client --event=b    # Set the second event',
            'sleep %SLEEP%              # A last nap...',
        ]) as t2:
            pf.Event('a')
            pf.Event('b')
        t3 = pf.Task('t3')
        t4 = pf.Task('t4')

    t2.triggers = t1
    t3.triggers = t2.a
    t4.completes = t2.b
    t4.triggers = t2

s
[7]:
suite test
  edit ECF_FILES '/test'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family f1
    edit SLEEP '20'
    task t1
    task t2
      trigger t1 eq complete
      event a
      event b
    task t3
      trigger t2:a
    task t4
      complete t2:b
      trigger t2 eq complete
  endfamily
endsuite
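The precedence rule can be summarised in a few lines. This is a hypothetical sketch of the decision for a single queued task, with the trigger and complete expressions modelled as plain callables; it is not ecFlow's scheduler, and next_action is an illustrative name.

```python
from typing import Callable, Optional

Condition = Optional[Callable[[], bool]]


def next_action(trigger: Condition, complete: Condition) -> str:
    """Return what happens to a queued task in this toy model."""
    # The complete expression is checked first: if it holds, the task is
    # set complete without running, whatever the trigger says.
    if complete is not None and complete():
        return "set complete"
    # Otherwise the trigger decides (no trigger means "always true").
    if trigger is None or trigger():
        return "submit"
    return "stay queued"


# t4 from the example above: trigger t2 eq complete, complete t2:b.
t2_complete, t2_event_b = True, True
print(next_action(lambda: t2_complete, lambda: t2_event_b))  # set complete
```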

Expressions in Triggers and Completes

ecFlow has a rich language (and associated behaviour) for the expressions that drive dependencies and conditional behaviour in suites. These expressions are ultimately strings that are parsed by the ecFlow server and evaluated to control the suite.

Within pyflow, all of the components that make up ecFlow expressions are already present as objects in the script. This means we can generate type-safe, validated expressions by using the existing objects directly. These can then be assigned to the triggers or completes attributes of any appropriate node.

Trigger expressions are written with ordinary Python operators, so they read like the condition they express: comparisons such as == and >, combined with | (or) and & (and).

[8]:
import datetime

with pf.Suite('s') as s:

    with pf.Family('repeat1') as repeat1:
        pf.RepeatDate('YMD', datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))

    with pf.Family('repeat2') as repeat2:
        pf.RepeatDate('YMD', datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))

    repeat2.triggers = (repeat1 == pf.state.complete) | (repeat1.YMD > repeat2.YMD)

    pf.Task('t3').completes = (repeat2.YMD > '20190616')

s
[8]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  family repeat1
    repeat date YMD 20190101 20191231 1
  endfamily
  family repeat2
    trigger (repeat1 eq complete) eq complete or repeat1:YMD gt repeat2:YMD
    repeat date YMD 20190101 20191231 1
  endfamily
  task t3
    complete repeat2:YMD gt 20190616
endsuite
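How Python operators can turn into ecFlow expression strings is easiest to see with a stripped-down model. The Expr and Attr classes below are hypothetical and much simpler than pyflow's own objects; they only show how overloading >, < and | can render text such as the "repeat1:YMD gt repeat2:YMD" part of the trigger above.

```python
from typing import Union


class Expr:
    """Hypothetical expression object that renders to ecFlow syntax."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __or__(self, other: "Expr") -> "Expr":
        return Expr(f"{self.text} or {other.text}")

    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"{self.text} and {other.text}")

    def __str__(self) -> str:
        return self.text


class Attr:
    """Hypothetical reference to a node attribute, such as a repeat."""

    def __init__(self, path: str) -> None:
        self.path = path

    def _compare(self, op: str, other: Union["Attr", int, str]) -> Expr:
        # Another attribute renders as its path; anything else as its text.
        rhs = other.path if isinstance(other, Attr) else str(other)
        return Expr(f"{self.path} {op} {rhs}")

    def __gt__(self, other: Union["Attr", int, str]) -> Expr:
        return self._compare("gt", other)

    def __lt__(self, other: Union["Attr", int, str]) -> Expr:
        return self._compare("lt", other)


ymd1, ymd2 = Attr("repeat1:YMD"), Attr("repeat2:YMD")
print(ymd1 > ymd2)        # repeat1:YMD gt repeat2:YMD
print(ymd2 > "20190616")  # repeat2:YMD gt 20190616
```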

Shortcut properties

A number of shortcut properties exist to construct standard expression components. The following sets of examples are equivalent.

[9]:
t = MyTask('a_task')

# Explicit comparisons against pf.state values...
exprn = (t == pf.state.aborted)
exprn = (t == pf.state.complete)
exprn = (t == pf.state.unknown)
exprn = (t == pf.state.queued)
exprn = (t == pf.state.submitted)
exprn = (t == pf.state.active)

# ...and the equivalent shortcut properties
t = MyTask('a_task')
exprn = t.aborted
exprn = t.complete
exprn = t.unknown
exprn = t.queued
exprn = t.submitted
exprn = t.active
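One way such shortcuts can work is to generate one property per state that simply performs the explicit comparison. The State enum and Node class below are a hypothetical sketch, not pyflow's pf.state or task classes; in this sketch, == returns a rendered string rather than a bool.

```python
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for pf.state."""
    unknown = "unknown"
    queued = "queued"
    submitted = "submitted"
    active = "active"
    complete = "complete"
    aborted = "aborted"


class Node:
    """Hypothetical node whose == builds an expression string."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, other):
        if isinstance(other, State):
            return f"{self.name} eq {other.value}"
        return NotImplemented

    __hash__ = object.__hash__  # keep nodes hashable despite the custom __eq__


# Generate one shortcut property per state, so that
# node.complete gives the same result as (node == State.complete).
for _state in State:
    setattr(Node, _state.name, property(lambda self, s=_state: self == s))

t = Node("a_task")
print(t.complete)  # a_task eq complete
```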

Combined Expressions

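Expressions can be combined with logical operators, both unary and binary. The examples below use & (and) and | (or), including the in-place form |= to extend an existing trigger.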

[10]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t3 = MyTask('t3')

    t1.triggers = t2.complete & t3.aborted

s
[10]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    trigger t2 eq complete and t3 eq aborted
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t2
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t3
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
endsuite
[11]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t3 = MyTask('t3')

    t1.triggers = t2.complete
    t1.triggers |= t3.aborted

s
[11]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    trigger t2 eq complete or t3 eq aborted
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t2
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t3
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
endsuite
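Because & and | are Python's bitwise operators, they bind more tightly than comparison operators such as ==. Any comparison combined with them therefore needs its own parentheses, as in the (repeat1 == pf.state.complete) | (repeat1.YMD > repeat2.YMD) example earlier. The standard-library ast module shows how Python parses each form; plain integers are used here instead of pyflow objects.

```python
import ast

# Without parentheses, `&` is applied first and the two `==` form a chain:
#   a == 1 & b == 2   parses as   a == (1 & b) == 2
bare = ast.parse("a == 1 & b == 2", mode="eval").body
grouped = ast.parse("(a == 1) & (b == 2)", mode="eval").body

print(type(bare).__name__)     # Compare (a chained comparison)
print(type(grouped).__name__)  # BinOp (the intended "and" of two comparisons)

a, b = 1, 2
print(a == 1 & b == 2)      # False: evaluates 1 == (1 & 2) == 2, i.e. 1 == 0 == 2
print((a == 1) & (b == 2))  # True
```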

Shortcut Dependencies

The most common trigger expresses a simple dependency: task B runs only after task A has completed. pyflow provides the >> operator to simplify this; t1 >> t2 makes t2 wait for t1 to complete.

The following are equivalent approaches.

[12]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t2.triggers = t1.complete

with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t2.triggers = (t1 == pf.state.complete)

with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t1 >> t2

with pf.Suite("s") as s:
    (
        MyTask('t1')
        >>
        MyTask('t2')
    )

s
[12]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t2
    trigger t1 eq complete
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
endsuite
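The >> operator can be modelled with Python's __rshift__ hook. In this hypothetical sketch (not pyflow's implementation), a >> b adds "a eq complete" to b's trigger and returns b, which is what lets a chain such as a >> b >> c link each step to the next. Combining a new dependency with an existing trigger using "and" is an assumption of this sketch.

```python
from typing import Optional


class Task:
    """Hypothetical stand-in for a task node; not the pyflow class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.triggers: Optional[str] = None  # rendered trigger, if any

    def __rshift__(self, other: "Task") -> "Task":
        # `self >> other`: other must wait for self to complete.
        condition = f"{self.name} eq complete"
        if other.triggers is None:
            other.triggers = condition
        else:
            # Assumption of this sketch: extra dependencies are AND-ed in.
            other.triggers = f"{other.triggers} and {condition}"
        # Returning the right-hand task makes a >> b >> c chain left to right.
        return other


t1, t2, t3 = Task("t1"), Task("t2"), Task("t3")
t1 >> t2 >> t3
print(t2.triggers)  # t1 eq complete
print(t3.triggers)  # t2 eq complete
```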

Looping Constructs

pyflow supports ecFlow looping constructs, and ensures that they are initialised in a type-safe manner. The values of these looping constructs can be accessed from scripts in the same manner as normal ecFlow variables.

[13]:
import datetime


class LabelSetter(pf.Task):

    def __init__(self, *args, **kwargs):
        """
        Accepts a sequence of label-value tuples
        """
        script = [
            pf.TemplateScript(
                'ecflow_client --alter=change label {{ LABEL.name }} "{{ VALUE }}" {{ LABEL.parent.fullname }}',
                LABEL=label, VALUE=value
            ) for label, value in args
        ]

        name = kwargs.pop('name', 'set_labels')
        super().__init__(name, script=script, **kwargs)


class WaitSeconds(pf.Task):
    def __init__(self, seconds, **kwargs):
        name = kwargs.pop('name', 'wait_{}'.format(seconds))
        super().__init__(name, script='sleep {}'.format(seconds), **kwargs)


with CourseSuite('looping_constructs') as s:

    with pf.Family('date_family'):
        pf.RepeatDate('REPEAT_DATE',
                      datetime.date(year=2019, month=1, day=1),
                      datetime.date(year=2019, month=12, day=31))

        with pf.Family('hour_family', labels={'date_time': ''}) as f:
            pf.RepeatInteger('REPEAT_HOUR', 1, 24)
            (
                LabelSetter((f.date_time, '$REPEAT_DATE hour $REPEAT_HOUR'))
                >>
                WaitSeconds(2)
            )

s
[13]:
suite looping_constructs
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/looping_constructs'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family date_family
    repeat date REPEAT_DATE 20190101 20191231 1
    family hour_family
      repeat integer REPEAT_HOUR 1 24
      label date_time ""
      task set_labels
      task wait_2
        trigger set_labels eq complete
    endfamily
  endfamily
endsuite
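To get a feel for how much work the nested repeats above describe, the following sketch enumerates their values in plain Python. It assumes that repeat date and repeat integer both include their end value and advance by a step of 1 (the trailing 1 on the repeat date line above); it is not how ecFlow itself advances repeats, and the helper names are hypothetical.

```python
import datetime
from typing import List


def repeat_date_values(start: datetime.date, end: datetime.date,
                       step: int = 1) -> List[str]:
    """YYYYMMDD values of a date repeat, end date included (assumed)."""
    values = []
    day = start
    while day <= end:
        values.append(day.strftime("%Y%m%d"))
        day += datetime.timedelta(days=step)
    return values


def repeat_integer_values(start: int, end: int, step: int = 1) -> List[int]:
    """Values of an integer repeat, end value included (assumed)."""
    return list(range(start, end + 1, step))


dates = repeat_date_values(datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))
hours = repeat_integer_values(1, 24)

print(dates[0], dates[-1], len(dates))  # 20190101 20191231 365
print(len(hours))                       # 24
# Each task in hour_family runs once per (date, hour) pair:
print(len(dates) * len(hours))          # 8760
```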

External ecFlow Dependencies

pyflow builds its dependency trees from Python objects. This means that if we wish to connect to external suites that are not built from the same repository, we must build shadow objects that map to the nodes we wish to connect to.

A full range of these Extern* objects exists, and they can be used in expressions in the normal way.

[ ]:
with pf.Suite('s') as s:

    etask = pf.ExternTask('/a/b/c/d')
    efamily = pf.ExternFamily('/f/g/h/i')

    eymd = pf.ExternRepeat('/a/b/c/d:YMD')
    eevent = pf.ExternEvent('/e/f/g/h:ev')
    emeter = pf.ExternMeter('/g/h/i/j:mt')

    t1 = pf.Task('t1')
    t1.triggers = etask & efamily

s
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    trigger /a/b/c/d eq complete and /f/g/h/i eq complete
endsuite
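A shadow object only needs to know the path of the node it stands for. The following hypothetical sketch (not pyflow's Extern classes) splits a path such as /a/b/c/d:YMD into node and attribute, and renders the same kind of trigger as the output above.

```python
class Extern:
    """Hypothetical shadow of a node or attribute in another suite."""

    def __init__(self, path: str) -> None:
        self.path = path
        # '/a/b/c/d:YMD' names attribute 'YMD' of node '/a/b/c/d';
        # a plain node path leaves the attribute empty.
        self.node, _, self.attribute = path.partition(":")

    @property
    def complete(self) -> str:
        return f"{self.node} eq complete"


etask = Extern("/a/b/c/d")
efamily = Extern("/f/g/h/i")
eymd = Extern("/a/b/c/d:YMD")

print(f"{etask.complete} and {efamily.complete}")
# /a/b/c/d eq complete and /f/g/h/i eq complete
print(eymd.node, eymd.attribute)  # /a/b/c/d YMD
```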