Flow Control
Triggers
Triggers are used to declare dependencies between two tasks. For instance, the second task might need data created by the first task.
When ecFlow tries to start a task, it first evaluates the trigger expression. If the expression evaluates to true, the task is started; otherwise, the task stays queued.
Triggers can be between tasks, or between families, or a mixture. There are two rules regarding triggers:
A family is complete when all its tasks are complete
A task will be started if its triggers and the triggers of all its parent families evaluate to true
A node can only have one trigger expression, but very complex expressions can be built. Keep in mind that the triggers of the parent nodes are also implicit triggers.
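As a rough illustration of the second rule, the sketch below models nodes in plain Python and walks up the parent chain. The `ToyNode` class and the `may_start` function are hypothetical names invented for this example; they are not part of pyflow or ecFlow, and the real server evaluates the expressions itself.

```python
class ToyNode:
    """Hypothetical stand-in for a task or family; not the pyflow class."""

    def __init__(self, name, trigger=None, parent=None):
        self.name = name
        # A zero-argument callable returning a bool; None means "no trigger".
        self.trigger = trigger
        self.parent = parent


def may_start(node):
    """Return True if the node's trigger and all ancestor triggers hold."""
    current = node
    while current is not None:
        if current.trigger is not None and not current.trigger():
            return False
        current = current.parent
    return True


# Assumed scenario: family f1 waits on an upstream condition, task t1 has no trigger.
upstream = {"complete": False}
f1 = ToyNode("f1", trigger=lambda: upstream["complete"])
t1 = ToyNode("t1", parent=f1)

before = may_start(t1)   # the family trigger is false, so t1 is held back
upstream["complete"] = True
after = may_start(t1)    # every trigger on the path is now true
print(before, after)     # prints: False True
```

Even though t1 has no trigger of its own, it is held until the trigger of its parent family becomes true, which is why parent triggers act as implicit triggers.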
Sometimes triggers can be used to prevent too many jobs from running at the same time. In this case, the use of a limit may be a better solution.
Triggers can be very complex, and pyflow supports all kinds of conditions. In addition, they can also reference node attributes like event, meter, variable, repeat, limits and generated variables.
[2]:
with pf.Suite('test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        t2 = pf.Task('t2')
        t1 >> t2
s
[2]:
suite test
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  family f1
    edit SLEEP '20'
    task t1
    task t2
      trigger t1 eq complete
  endfamily
endsuite
Embedded Triggers
Trigger expressions can be embedded within scripts using the ecflow_client --wait child command. While the expression is not true, the job will hold.
Where possible you should give preference to triggers on the definitions, since these are checked on creation, whereas embedded triggers are checked at run time.
[3]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        pf.Task('t1')
        pf.Task('t2', script='ecflow_client --wait="t1 == complete"')
s
[3]:
suite test
  edit ECF_FILES '/test'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family f1
    edit SLEEP '20'
    task t1
    task t2
  endfamily
endsuite
[4]:
s.deploy_suite(target=pf.Notebook)
[4]:
File: /test/t1.ecf
#!/bin/bash
echo "Running on: $(hostname)" || true
set -uex
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%ECF_HOST% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
echo "Current working directory: $(pwd)"
%nopp
%end

File: /test/t2.ecf
#!/bin/bash
echo "Running on: $(hostname)" || true
set -uex
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%ECF_HOST% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
echo "Current working directory: $(pwd)"
%nopp
ecflow_client --wait="t1 == complete"
%end
Events
Sometimes waiting for the completion of a task is not good enough. If a task produces several results, another task may be able to start as soon as the first results are ready. For that, pyflow introduces the concept of events. An event is a message that a task reports to the ecFlow server while it is running.
Events have names and a task can set several of them.
[5]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        with pf.Task('t2', script=[
            'echo "I will now sleep for %SLEEP% seconds"',
            'sleep %SLEEP%',
            'ecflow_client --event=a # Set the first event',
            'sleep %SLEEP% # Sleep a bit more',
            'ecflow_client --event=b # Set the second event',
            'sleep %SLEEP% # A last nap...',
        ]) as t2:
            pf.Event('a')
            pf.Event('b')
        t3 = pf.Task('t3')
        t4 = pf.Task('t4')
        t2.triggers = t1
        t3.triggers = t2.a
        t4.triggers = t2.b
s
[5]:
suite test
  edit ECF_FILES '/test'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family f1
    edit SLEEP '20'
    task t1
    task t2
      trigger t1 eq complete
      event a
      event b
    task t3
      trigger t2:a
    task t4
      trigger t2:b
  endfamily
endsuite
[6]:
s.deploy_suite(target=pf.Notebook)
[6]:
File: /test/t1.ecf
#!/bin/bash
echo "Running on: $(hostname)" || true
set -uex
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%ECF_HOST% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
echo "Current working directory: $(pwd)"
%nopp
%end

File: /test/t2.ecf
#!/bin/bash
echo "Running on: $(hostname)" || true
set -uex
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%ECF_HOST% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
echo "Current working directory: $(pwd)"
%nopp
echo "I will now sleep for %SLEEP% seconds"
sleep %SLEEP%
ecflow_client --event=a # Set the first event
sleep %SLEEP% # Sleep a bit more
ecflow_client --event=b # Set the second event
sleep %SLEEP% # A last nap...
%end

File: /test/t3.ecf
#!/bin/bash
echo "Running on: $(hostname)" || true
set -uex
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%ECF_HOST% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
echo "Current working directory: $(pwd)"
%nopp
%end

File: /test/t4.ecf
#!/bin/bash
echo "Running on: $(hostname)" || true
set -uex
export ECF_PORT=%ECF_PORT% # The server port number
export ECF_HOST=%ECF_HOST% # The host name where the server is running
export ECF_NAME=%ECF_NAME% # The name of this current task
export ECF_PASS=%ECF_PASS% # A unique password
export ECF_TRYNO=%ECF_TRYNO% # Current try number of the task
echo "Current working directory: $(pwd)"
%nopp
%end
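To make the timing in the event example concrete, the following plain-Python sketch replays the order in which t2 sets its events and records which waiting tasks become eligible at each step. It is a toy model that assumes each task waits on exactly one event; `eligible_tasks` is a hypothetical helper, not a pyflow or ecFlow function.

```python
def eligible_tasks(events_set, waiting):
    """Return the sorted names of tasks whose awaited event has been set.

    `waiting` maps a task name to the name of the event it waits for.
    """
    return sorted(task for task, event in waiting.items() if event in events_set)


# t3 waits for event a and t4 waits for event b, as in the suite above.
waiting = {"t3": "a", "t4": "b"}

events_set = set()
timeline = []
for event in ["a", "b"]:  # t2 sets a, then b, while it is still running
    events_set.add(event)
    timeline.append((event, eligible_tasks(events_set, waiting)))

print(timeline)  # prints: [('a', ['t3']), ('b', ['t3', 't4'])]
```

In this model t3 becomes eligible after the first event, well before t2 itself has finished.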
Complete
Sometimes a task should not be run when a certain condition is met. The condition can be signalled by an event. For example, event t2.b might indicate that task t2 did not manage to produce the expected result, so we do not need to run task t4.
In this case, you can use the complete attribute. This has a similar usage to the trigger attribute but sets a task complete rather than running it.
When the ecFlow server tries to start a task, it evaluates both the trigger and the complete expressions. If the complete expression is true, the task sets itself complete instead of running.
Note
Complete expression evaluation takes precedence over the trigger.
Completes can be between tasks, between families, or both. A complete can be used in conjunction with a trigger.
[7]:
with pf.Suite('test', host=pf.LocalHost(), files='/test') as s:
    with pf.Family('f1'):
        pf.Variable('SLEEP', 20)
        t1 = pf.Task('t1')
        with pf.Task('t2', script=[
            'echo "I will now sleep for %SLEEP% seconds"',
            'sleep %SLEEP%',
            'ecflow_client --event=a # Set the first event',
            'sleep %SLEEP% # Sleep a bit more',
            'ecflow_client --event=b # Set the second event',
            'sleep %SLEEP% # A last nap...',
        ]) as t2:
            pf.Event('a')
            pf.Event('b')
        t3 = pf.Task('t3')
        t4 = pf.Task('t4')
        t2.triggers = t1
        t3.triggers = t2.a
        t4.completes = t2.b
        t4.triggers = t2
s
[7]:
suite test
  edit ECF_FILES '/test'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family f1
    edit SLEEP '20'
    task t1
    task t2
      trigger t1 eq complete
      event a
      event b
    task t3
      trigger t2:a
    task t4
      complete t2:b
      trigger t2 eq complete
  endfamily
endsuite
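The precedence of complete over trigger can be summarised as a small decision function. This is only a sketch of the behaviour described in this section; the function name `next_action` and its return strings are hypothetical, and it is not how the ecFlow server is implemented.

```python
def next_action(trigger_true, complete_true):
    """Decide what happens to a queued task, checking complete before trigger."""
    if complete_true:
        return "set complete"
    if trigger_true:
        return "start"
    return "stay queued"


# Task t4 from the suite above: complete t2:b, trigger t2 eq complete.
outcomes = {
    # t2 is still running and has not set event b
    "t2 running, b unset": next_action(trigger_true=False, complete_true=False),
    # t2 sets event b while running: t4 is completed without being run
    "t2 running, b set": next_action(trigger_true=False, complete_true=True),
    # t2 finishes without ever setting b: t4 runs normally
    "t2 complete, b unset": next_action(trigger_true=True, complete_true=False),
    # both expressions are true: complete wins
    "t2 complete, b set": next_action(trigger_true=True, complete_true=True),
}

for situation, action in outcomes.items():
    print(situation, "->", action)
```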
Expressions in Triggers and Completes
ecFlow has a rich language (and associated behaviour) for expressions that trigger dependencies and conditional behaviour in suites. These expressions are ultimately strings that are parsed by the ecFlow server and evaluated to control the suite.
Within pyflow, all of the components that make up ecFlow expressions are already present as objects in the script. This means we can generate type-safe, validated expressions by using the existing objects directly. These can then be assigned to the triggers or completes attributes of any appropriate node.
Trigger expressions should be written so that they mirror the natural logic and arithmetic of the problem.
[8]:
with pf.Suite('s') as s:
    with pf.Family('repeat1') as repeat1:
        pf.RepeatDate('YMD', datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))
    with pf.Family('repeat2') as repeat2:
        pf.RepeatDate('YMD', datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))
    repeat2.triggers = (repeat1 == pf.state.complete) | (repeat1.YMD > repeat2.YMD)
    pf.Task('t3').completes = (repeat2.YMD > '20190616')
s
[8]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  family repeat1
    repeat date YMD 20190101 20191231 1
  endfamily
  family repeat2
    trigger (repeat1 eq complete) eq complete or repeat1:YMD gt repeat2:YMD
    repeat date YMD 20190101 20191231 1
  endfamily
  task t3
    complete repeat2:YMD gt 20190616
endsuite
Shortcut properties
A number of shortcut properties exist to construct standard expression components. The following sets of examples are equivalent.
[9]:
t = MyTask('a_task')
exprn = (t == pf.state.aborted)
exprn = (t == pf.state.complete)
exprn = (t == pf.state.unknown)
exprn = (t == pf.state.queued)
exprn = (t == pf.state.submitted)
exprn = (t == pf.state.active)

t = MyTask('a_task')
exprn = t.aborted
exprn = t.complete
exprn = t.unknown
exprn = t.queued
exprn = t.submitted
exprn = t.active
Combined Expressions
Expressions can be combined with logical operators, both unary and binary.
[10]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t3 = MyTask('t3')
    t1.triggers = t2.complete & t3.aborted
s
[10]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    trigger t2 eq complete and t3 eq aborted
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t2
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t3
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
endsuite
[11]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t3 = MyTask('t3')
    t1.triggers = t2.complete
    t1.triggers |= t3.aborted
s
[11]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    trigger t2 eq complete or t3 eq aborted
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t2
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t3
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
endsuite
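One way to picture how the `&` and `|` operators can yield trigger strings like the ones above is Python operator overloading. The classes below are a simplified, hypothetical sketch and not pyflow's actual implementation; only the `eq`, `and` and `or` keywords are taken from the generated definitions shown in this document.

```python
class ToyExpr:
    """Hypothetical expression object that carries its text form."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        # `a & b` builds a new expression instead of evaluating anything
        return ToyExpr("{} and {}".format(self.text, other.text))

    def __or__(self, other):
        return ToyExpr("{} or {}".format(self.text, other.text))

    def __str__(self):
        return self.text


class ToyTask:
    """Hypothetical task exposing state shortcuts as expression objects."""

    def __init__(self, name):
        self.name = name

    @property
    def complete(self):
        return ToyExpr("{} eq complete".format(self.name))

    @property
    def aborted(self):
        return ToyExpr("{} eq aborted".format(self.name))


t2 = ToyTask("t2")
t3 = ToyTask("t3")

both = t2.complete & t3.aborted
either = t2.complete
either |= t3.aborted  # falls back to __or__, so this rebinds `either`

print(both)    # prints: t2 eq complete and t3 eq aborted
print(either)  # prints: t2 eq complete or t3 eq aborted
```

This sketch does no parenthesisation, so mixing `&` and `|` in one expression would lose the intended grouping; a real implementation has to handle precedence.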
Shortcut Dependencies
The most common trigger expression is a simple dependency: Task A runs only after Task B has completed. We provide a special operator (>>) to simplify this case.
The following are equivalent approaches.
[12]:
with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t2.triggers = t1.complete

with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t2.triggers = (t1 == pf.state.complete)

with pf.Suite('s') as s:
    t1 = MyTask('t1')
    t2 = MyTask('t2')
    t1 >> t2

with pf.Suite("s") as s:
    (
        MyTask('t1')
        >>
        MyTask('t2')
    )
s
[12]:
suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
  task t2
    trigger t1 eq complete
    edit HALF '0'
    edit LIMIT '0'
    label counter_label "count to 0"
endsuite
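The `>>` shortcut can be pictured as a `__rshift__` method that attaches a "left side complete" trigger to the right-hand task. The `ChainTask` class below is a hypothetical sketch, not pyflow's implementation; in particular, returning the right-hand operand so that chains can be written is a design choice of this example, and the source does not show whether pyflow behaves the same way.

```python
class ChainTask:
    """Hypothetical task that records its trigger as a plain string."""

    def __init__(self, name):
        self.name = name
        self.trigger = None

    def __rshift__(self, other):
        # `self >> other` means: other runs only once self is complete
        other.trigger = "{} eq complete".format(self.name)
        # Returning the right-hand side lets a >> b >> c read left to right
        return other


a, b, c = ChainTask("a"), ChainTask("b"), ChainTask("c")
a >> b >> c

print(a.trigger)  # prints: None
print(b.trigger)  # prints: a eq complete
print(c.trigger)  # prints: b eq complete
```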
Looping Constructs
pyflow supports ecFlow looping constructs, and ensures that they are initialised in a type-safe manner. The values of these looping constructs can be accessed from scripts in the same manner as normal ecFlow variables.
[13]:
class LabelSetter(pf.Task):
    def __init__(self, *args, **kwargs):
        """
        Accepts a sequence of label-value tuples
        """
        script = [
            pf.TemplateScript(
                'ecflow_client --alter=change label {{ LABEL.name }} "{{ VALUE }}" {{ LABEL.parent.fullname }}',
                LABEL=label, VALUE=value
            ) for label, value in args
        ]
        name = kwargs.pop('name', 'set_labels')
        super().__init__(name, script=script, **kwargs)


class WaitSeconds(pf.Task):
    def __init__(self, seconds, **kwargs):
        name = kwargs.pop('name', 'wait_{}'.format(seconds))
        super().__init__(name, script='sleep {}'.format(seconds), **kwargs)


with CourseSuite('looping_constructs') as s:
    with pf.Family('date_family'):
        pf.RepeatDate('REPEAT_DATE',
                      datetime.date(year=2019, month=1, day=1),
                      datetime.date(year=2019, month=12, day=31))
        with pf.Family('hour_family', labels={'date_time': ''}) as f:
            pf.RepeatInteger('REPEAT_HOUR', 1, 24)
            (
                LabelSetter((f.date_time, '$REPEAT_DATE hour $REPEAT_HOUR'))
                >>
                WaitSeconds(2)
            )
s
[13]:
suite looping_constructs
  defstatus suspended
  edit ECF_FILES '/path/to/scratch/files/looping_constructs'
  edit ECF_HOME '/path/to/scratch/out'
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "localhost"
  family date_family
    repeat date REPEAT_DATE 20190101 20191231 1
    family hour_family
      repeat integer REPEAT_HOUR 1 24
      label date_time ""
      task set_labels
      task wait_2
        trigger set_labels eq complete
    endfamily
  endfamily
endsuite
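The nested repeats in this suite behave roughly like two nested loops: the inner hour repeat is assumed to run through all of its values for each value of the outer date repeat. The plain-Python sketch below enumerates that sequence; the helper names are hypothetical, and the real iteration is driven by the ecFlow server rather than by Python code.

```python
import datetime


def date_values(start, end):
    """Yield YYYYMMDD strings from start to end inclusive, one day at a time."""
    day = start
    while day <= end:
        yield day.strftime("%Y%m%d")
        day += datetime.timedelta(days=1)


def iterations(start, end, first_hour, last_hour):
    """Yield (date, hour) pairs in the order the nested repeats would visit them."""
    for ymd in date_values(start, end):
        for hour in range(first_hour, last_hour + 1):
            yield ymd, hour


steps = list(iterations(datetime.date(2019, 1, 1),
                        datetime.date(2019, 12, 31), 1, 24))

print(steps[0])    # prints: ('20190101', 1)
print(steps[24])   # prints: ('20190102', 1)
print(len(steps))  # prints: 8760

# The text the set_labels task would write on the first pass, under this model
first_label = "{} hour {}".format(*steps[0])
print(first_label)  # prints: 20190101 hour 1
```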
External ecFlow Dependencies
pyflow builds its dependency trees using Python objects. This means that if we wish to connect to external suites that are not built from the same repository, we must build shadow objects that map to the nodes we wish to connect to.
A full range of these Extern* objects exists, and they may be used in the normal way.
[ ]:
with pf.Suite('s') as s:
    etask = pf.ExternTask('/a/b/c/d')
    efamily = pf.ExternFamily('/f/g/h/i')
    eymd = pf.ExternRepeat('/a/b/c/d:YMD')
    eevent = pf.ExternEvent('/e/f/g/h:ev')
    emeter = pf.ExternMeter('/g/h/i/j:mt')
    t1 = pf.Task('t1')
    t1.triggers = etask & efamily
s

suite s
  edit ECF_JOB_CMD 'bash -c 'export ECF_PORT=%ECF_PORT%; export ECF_HOST=%ECF_HOST%; export ECF_NAME=%ECF_NAME%; export ECF_PASS=%ECF_PASS%; export ECF_TRYNO=%ECF_TRYNO%; export PATH=/usr/local/apps/ecflow/%ECF_VERSION%/bin:$PATH; ecflow_client --init="$$" && %ECF_JOB% && ecflow_client --complete || ecflow_client --abort ' 1> %ECF_JOBOUT% 2>&1 &'
  edit ECF_KILL_CMD 'pkill -15 -P %ECF_RID%'
  edit ECF_STATUS_CMD 'true'
  edit ECF_OUT '%ECF_HOME%'
  label exec_host "default"
  task t1
    trigger /a/b/c/d eq complete and /f/g/h/i eq complete
endsuite