基于Aiida的串行和并行累加器

基于 AiiDA 实现的整数循环累加器,以下为代码:

from aiida.orm import AbstractCode, Int, List
from aiida.orm import load_code
from aiida.plugins.factories import CalculationFactory, DataFactory
from aiida import load_profile
from aiida.engine import ToContext, calcfunction, WorkChain, while_, append_, run
from aiida_vasp.utils.workchains import prepare_process_inputs, compose_exit_code
from aiida.common.extendeddicts import AttributeDict
import random
import numpy as np

ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add')

class AddTwoIntsWorkChain(WorkChain):
    """Demonstration work chain that sums two ``Int`` nodes via a calculation job."""

    @classmethod
    def define(cls, spec):
        """Declare the input/output ports, the exit code and the outline."""
        super().define(spec)
        # The two addends share the same port definition.
        for addend in ('x', 'y'):
            spec.input(addend, valid_type=Int)
        spec.input('code', valid_type=AbstractCode)
        spec.outline(cls.add, cls.validate_result, cls.result)
        spec.output('result', valid_type=Int)
        spec.exit_code(
            400,
            'ERROR_NEGATIVE_NUMBER',
            message='The result is a negative number.',
        )

    def add(self):
        """Submit ``ArithmeticAddCalculation`` and park its node under ``ctx.addition``."""
        calc_inputs = dict(x=self.inputs.x, y=self.inputs.y, code=self.inputs.code)
        return ToContext(addition=self.submit(ArithmeticAddCalculation, **calc_inputs))

    def validate_result(self):
        """Return ``ERROR_NEGATIVE_NUMBER`` when the computed sum is below zero."""
        total = self.ctx.addition.outputs.sum
        if total.value >= 0:
            return None
        return self.exit_codes.ERROR_NEGATIVE_NUMBER

    def result(self):
        """Expose the calculation's ``sum`` output as this work chain's ``result``."""
        total = self.ctx.addition.outputs.sum
        self.out('result', total)


class AddMultiIntsSerialWorkChain(WorkChain):
    """Sum the ``Int`` nodes of the ``ints`` namespace one after another.

    Each loop iteration launches one ``_next_workchain`` that adds the running
    total (``ctx.add_rlts[-1]``, starting at ``Int(0)``) to the next pending
    integer. When no integers are left, ``finalize`` packs the addends, the
    partial sums and the final sum into the ``final_result`` array output.
    """

    _verbose = False
    _next_workchain = AddTwoIntsWorkChain

    @classmethod
    def define(cls, spec):
        """Declare inputs, exit codes, the looping outline and the output."""
        super().define(spec)
        # ``x`` and ``y`` are excluded because they are filled in per iteration.
        spec.expose_inputs(cls._next_workchain, exclude=['x', 'y'])
        spec.input_namespace('ints',
                             valid_type=DataFactory('core.int'),
                             dynamic=True,
                             help='a dictionary of ints to use')
        # NOTE(review): ``code`` is presumably already exposed from the child
        # work chain above; kept as in the original definition.
        spec.input('code', valid_type=AbstractCode)
        spec.exit_code(0, 'NO_ERROR', message='the sun is shining')
        spec.exit_code(420, 'ERROR_NO_CALLED_WORKCHAIN', message='no called workchain detected')
        spec.exit_code(500, 'ERROR_UNKNOWN', message='unknown error detected in the workchain')
        spec.outline(
            cls.initialize,
            while_(cls.run_next_workchains)(
                cls.init_next_workchain,
                cls.run_next_workchain,
                cls.verify_next_workchain,
                cls.extract_result
            ),
            cls.finalize
        )
        spec.output('final_result', valid_type=DataFactory('core.array'))

    def initialize(self):
        """Prepare the context and read optional settings from the inputs."""
        self._init_context()
        self._init_inputs()

    def _init_context(self):
        """Populate ``self.ctx`` with the loop state."""
        self.ctx.exit_code = self.exit_codes.ERROR_UNKNOWN
        self.ctx.ints = dict(self.inputs.ints)
        # With nothing to add, skip the loop entirely; otherwise the first
        # iteration would fail when taking the first key of an empty dict.
        self.ctx.is_finished = not self.ctx.ints
        self.ctx.iteration = 0
        self.ctx.inputs = AttributeDict()
        # Running totals; the seed value is the first ``x`` addend.
        self.ctx.add_rlts = [Int(0)]

    def _init_inputs(self):
        """Initialize inputs."""
        try:
            self._verbose = self.inputs.verbose.value
        except AttributeError:
            # No ``verbose`` input was supplied: keep the class default.
            pass

    def run_next_workchains(self):
        """Loop condition: keep iterating until the context is marked finished."""
        return not self.ctx.is_finished

    def init_next_workchain(self):
        """Build the inputs for the next addition and consume one pending integer."""
        self.ctx.iteration += 1
        try:
            self.ctx.inputs
        except AttributeError:
            raise ValueError('No input dictionary was defined in self.ctx.inputs')
        self.ctx.inputs.update(self.exposed_inputs(self._next_workchain))
        # Take the first pending key (dict insertion order).
        next_key = next(iter(self.ctx.ints))
        self.ctx.inputs.x = self.ctx.add_rlts[-1]
        self.ctx.inputs.y = self.ctx.ints.pop(next_key)
        self.ctx.inputs = prepare_process_inputs(self.ctx.inputs)

    def run_next_workchain(self):
        """Submit the next addition and append its node to ``ctx.workchains``."""
        inputs = self.ctx.inputs
        running = self.submit(self._next_workchain, **inputs)
        self.report('launching {}<{}> iteration #{}'.format(self._next_workchain.__name__, running.pk, self.ctx.iteration))
        self.to_context(workchains=append_(running))

    def verify_next_workchain(self):
        """Inherit the exit status of the last addition and update the loop flag.

        Returns ``ERROR_NO_CALLED_WORKCHAIN`` when no called work chain is
        recorded in the context, otherwise the exit code stored in
        ``ctx.exit_code``.
        """
        try:
            workchain = self.ctx.workchains[-1]
        except (AttributeError, IndexError):
            # ``ctx.workchains`` may be missing altogether (AttributeError) or
            # empty (IndexError); both mean nothing was launched.
            self.report('There is no {} in the called workchain list.'.format(self._next_workchain.__name__))
            return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN
        next_workchain_exit_status = workchain.exit_status
        next_workchain_exit_message = workchain.exit_message
        if not next_workchain_exit_status:
            self.ctx.exit_code = self.exit_codes.NO_ERROR
        else:
            self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message)
            self.report('The called {}<{}> returned a non-zero exit status. '
                        'The exit status {} is inherited'.format(workchain.__class__.__name__, workchain.pk, self.ctx.exit_code))
        if not self.ctx.ints:
            # Every input integer has been consumed: leave the loop.
            self.ctx.is_finished = True
        return self.ctx.exit_code

    def extract_result(self):
        """Append the latest partial sum to ``ctx.add_rlts``."""
        workchain = self.ctx.workchains[-1]
        self.ctx.add_rlts.append(workchain.outputs.result)

    def finalize(self):
        """Bundle addends, partial sums and the final sum into ``final_result``."""
        list_cls = DataFactory('core.list')
        inputs = list_cls(list=list(dict(self.inputs.ints).values()))
        outputs = list_cls(list=self.ctx.add_rlts)
        f_value = self.ctx.add_rlts[-1]
        final_data = ExtractFinalResult(inputs, f_value, total=outputs)
        self.out('final_result', final_data)


class AddMultiIntsParallelWorkChain(WorkChain):
    """Sum the ``Int`` nodes of the ``ints`` namespace by pairwise reduction.

    In every loop iteration the pending integers are paired up (padding with
    ``Int(0)`` when their count is odd) and one ``_next_workchain`` is
    submitted per pair. The results become the pending integers of the next
    iteration; the loop ends after an iteration that launched exactly one
    work chain.
    """

    _verbose = False
    _next_workchain = AddTwoIntsWorkChain

    @classmethod
    def define(cls, spec):
        """Declare inputs, exit codes, the looping outline and the output."""
        super().define(spec)
        # ``x`` and ``y`` are excluded because they are filled in per pair.
        spec.expose_inputs(cls._next_workchain, exclude=['x', 'y'])
        spec.input_namespace('ints',
                             valid_type=DataFactory('core.int'),
                             dynamic=True,
                             help='a dictionary of ints to use')
        # NOTE(review): ``code`` is presumably already exposed from the child
        # work chain above; kept as in the original definition.
        spec.input('code', valid_type=AbstractCode)
        spec.exit_code(0, 'NO_ERROR', message='the sun is shining')
        spec.exit_code(420, 'ERROR_NO_CALLED_WORKCHAIN', message='no called workchain detected')
        spec.exit_code(500, 'ERROR_UNKNOWN', message='unknown error detected in the workchain')
        spec.outline(
            cls.initialize,
            while_(cls.run_next_workchains)(
                cls.init_and_run_next_workchains,
                cls.verify_next_workchain,
                cls.extract_result
            ),
            cls.finalize
        )
        spec.output('final_result', valid_type=DataFactory('core.array'))

    def initialize(self):
        """Prepare the context and read optional settings from the inputs."""
        self._init_context()
        self._init_inputs()

    def _init_context(self):
        """Populate ``self.ctx`` with the loop state."""
        self.ctx.exit_code = self.exit_codes.ERROR_UNKNOWN
        self.ctx.next_ints = list(dict(self.inputs.ints).values())
        self.ctx.is_finished = False
        self.ctx.iteration = 0
        # Per-iteration snapshots; ``iteration0`` holds the raw inputs.
        self.ctx.add_rlts = {}
        self.ctx.add_rlts['iteration{}'.format(self.ctx.iteration)] = List(list=self.ctx.next_ints)

    def _init_inputs(self):
        """Initialize inputs."""
        try:
            self._verbose = self.inputs.verbose.value
        except AttributeError:
            # No ``verbose`` input was supplied: keep the class default.
            pass

    def run_next_workchains(self):
        """Loop condition: keep iterating until the context is marked finished."""
        return not self.ctx.is_finished

    def init_and_run_next_workchains(self):
        """Pair up the pending integers and submit one addition per pair."""
        self.ctx.iteration += 1
        ls_now_ints = self.ctx.next_ints.copy()
        if len(ls_now_ints) % 2 == 1:
            # Pad with a neutral element so every integer has a partner.
            ls_now_ints.append(Int(0))
        now_run_i = 0
        while ls_now_ints:
            now_run_i += 1
            inputs = AttributeDict()
            inputs.update(self.exposed_inputs(self._next_workchain))
            inputs.x = ls_now_ints.pop()
            inputs.y = ls_now_ints.pop()
            inputs = prepare_process_inputs(inputs)
            running = self.submit(self._next_workchain, **inputs)
            self.report('launching {}<{}> iteration #{}'.format(self._next_workchain.__name__, running.pk, self.ctx.iteration))
            # Dotted key: results are read back via ctx.workchains.get('iteration<N>').
            future_name = 'workchains.iteration{}.index{}'.format(self.ctx.iteration, now_run_i)
            self.to_context(**{future_name: running})

    def verify_next_workchain(self):
        """Check every addition of the current iteration and update the loop flag.

        Returns ``ERROR_NO_CALLED_WORKCHAIN`` when nothing was recorded for
        this iteration, the composed exit code of the first failed child, or
        the exit code stored in ``ctx.exit_code`` otherwise.
        """
        try:
            running_workchains = self.ctx.workchains.get('iteration{}'.format(self.ctx.iteration))
        except AttributeError:
            # ``ctx.workchains`` does not exist: nothing was ever submitted.
            running_workchains = None
        if not running_workchains:
            # The original message read ``self.iteration``, which does not
            # exist; the counter lives on the context.
            self.report('There is no workchains iterations #{} in the called workchain list.'.format(self.ctx.iteration))
            return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN
        for workchain in running_workchains.values():
            next_workchain_exit_status = workchain.exit_status
            next_workchain_exit_message = workchain.exit_message
            if not next_workchain_exit_status:
                self.ctx.exit_code = self.exit_codes.NO_ERROR
            else:
                self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message)
                self.report('The called {}<{}> returned a non-zero exit status. '
                            'The exit status {} is inherited'.format(workchain.__class__.__name__, workchain.pk, self.ctx.exit_code))
                return self.ctx.exit_code
        if len(running_workchains) == 1:
            # A single addition means its result is the grand total.
            self.ctx.is_finished = True
        return self.ctx.exit_code

    def extract_result(self):
        """Collect this iteration's sums as the next iteration's addends."""
        running_workchains = self.ctx.workchains.get('iteration{}'.format(self.ctx.iteration))
        self.ctx.next_ints = [workchain.outputs.result for workchain in running_workchains.values()]
        self.ctx.add_rlts['iteration{}'.format(self.ctx.iteration)] = List(list=self.ctx.next_ints)

    def finalize(self):
        """Bundle addends, per-iteration sums and the final sum into ``final_result``."""
        # Hand data nodes (not raw Python values) to the calcfunction, as the
        # serial work chain in this file does.
        inputs = List(list=list(self.inputs.ints.values()))
        sum_value = self.ctx.next_ints[-1]
        step_add_rlts = self.ctx.add_rlts
        final_data = ExtractFinalResult(inputs, sum_value, **step_add_rlts)
        self.out('final_result', final_data)

@calcfunction
def ExtractFinalResult(inputs, sum_value, **kwargs):
    """Pack the addends, the final sum and any per-step data into one array node.

    :param inputs: sequence of the original addends, stored as ``inputs_nums``.
    :param sum_value: final sum, stored as the one-element array ``add_result``.
    :param kwargs: extra sequences; each is stored as ``step_<keyword>``.
    :return: a new ``core.array`` data node holding all of the arrays above.
    """
    array_node = DataFactory('core.array')()
    array_node.set_array('add_result', np.array([int(sum_value)]))
    array_node.set_array('inputs_nums', np.array(inputs))
    for label, step_values in kwargs.items():
        array_node.set_array('step_' + label, np.array(step_values))
    return array_node

# Load an AiiDA profile before the ORM calls further down (e.g. ``load_code``).
# NOTE(review): called without arguments, so presumably the default profile
# is the one that gets loaded — confirm.
load_profile()

def get_ints(int_count):
    """Build ``int_count`` random ``Int`` nodes keyed ``int_at_<index>``.

    :param int_count: number of entries to generate.
    :return: dict mapping ``'int_at_0'``, ``'int_at_1'``, ... to ``Int`` nodes
        whose values are drawn from ``random.randint(1, 20)``.
    """
    return {
        "int_at_{}".format(index): Int(random.randint(1, 20))
        for index in range(int_count)
    }

# Demo driver: sum 30 random integers with one of the work chains above.
# NOTE: these statements sit at module level, so they also execute whenever
# this file is imported, not only when it is run as a script.
int_count = 30
add_code = load_code('add@tutor')

inputs = AttributeDict()
inputs['ints'] = get_ints(int_count)
inputs['code'] = add_code

# Swap the comment marker between the two calls below to use the serial variant.
# run(AddMultiIntsSerialWorkChain, **inputs)
run(AddMultiIntsParallelWorkChain, **inputs)

Note

1. 启动自定义的workchains

在向 daemon 提交自定义 workchain(submit)时,需要确保 daemon 在需要加载该类时能够找到它,这里有两种方法:

  • 通过插件系统以指定的 entry point 注册类
  • 将该定义类的模块添加到 python 路径中
    对于第二种方法,在非 conda 环境中,使用:

echo "export PYTHONPATH=$PYTHONPATH:<yourdirectory>" >> ~/env/aiida-vasp/bin/activate

对于 conda 环境,使用:

echo "export PYTHONPATH=$PYTHONPATH:<yourdirectory>" >> $CONDA_PREFIX/etc/conda/activate.d/env_vars.sh

其中 <yourdirectory> 包含我们的自定义模块。

2. aiida 中的 Str 类型与 python 中 str 的区别

当我们将 python 中的 str 转成 aiida 中的 Str 类型时,得到的是一个带有 uuid 的数据节点对象,而不再是原来的字符串本身;实际使用时,需要通过 .value 属性来获取实际的内容,例如:

test = 'test'
from aiida.orm import Str
Str(test)

<Str: uuid: 941642f6-38f1-4a11-be16-f45c6389e1da (unstored) value: test>

要获得实际值:

Str(test).value

'test'

3. 提交任务给 daemon 时的默认文件夹

在提交 (submit) 任务给 daemon 时,默认文件夹不是提交作业脚本的文件夹,而是启动 daemon 时所在的文件夹,需要非常注意!

参考资料

  1. aiida basic tutorial
  2. write and extend workflow
  3. aiida source code
  4. Plumpy
  5. aiida-vasp