Skip to content

Commit 6df32ae

Browse files
authored
Merge pull request #38 from douglasjacobsen/mirror_spack
Update mirror logic, and mirror software
2 parents e1f86c8 + bc656e9 commit 6df32ae

File tree

10 files changed: 276 additions and 110 deletions (+276 / -110 lines changed)

lib/ramble/ramble/application.py

Lines changed: 33 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ def __init__(self, file_path):
5656
self._setup_phases = []
5757
self._analyze_phases = []
5858
self._archive_phases = ['archive_experiments']
59+
self._mirror_phases = ['mirror_inputs']
5960

6061
self._file_path = file_path
6162

@@ -345,9 +346,19 @@ def _add_expand_vars(self, expander):
345346

346347
expander.set_var('spack_setup', '', 'experiment')
347348

348-
def all_inputs_and_fetchers(self):
349-
expected_files = {}
350-
for workload_name, workload in self.workloads.items():
349+
def _inputs_and_fetchers(self, workload=None):
350+
"""Extract all inputs for a given workload
351+
352+
Take a workload name and extract all inputs for the workload.
353+
If the workload is set to None, extract all inputs for all workloads.
354+
"""
355+
356+
workload_names = [workload] if workload else self.workloads.keys()
357+
358+
inputs = {}
359+
for workload_name in workload_names:
360+
workload = self.workloads[workload_name]
361+
351362
for input_file in workload['inputs']:
352363
input_conf = self.inputs[input_file]
353364

@@ -361,49 +372,40 @@ def all_inputs_and_fetchers(self):
361372

362373
namespace = f'{self.name}.{workload_name}'
363374

364-
expected_files[file_name] = {'fetcher': fetcher,
365-
'namespace': namespace,
366-
'extension': fetcher.extension,
367-
'input_name': input_file}
368-
369-
return expected_files
375+
inputs[file_name] = {'fetcher': fetcher,
376+
'namespace': namespace,
377+
'target_dir': input_conf['target_dir'],
378+
'extension': fetcher.extension,
379+
'input_name': input_file}
380+
return inputs
370381

371-
def mirror_inputs(self, mirror_root, mirror, mirror_stats):
372-
required_inputs = self.all_inputs_and_fetchers()
373-
for input_name, conf in required_inputs.items():
382+
def _mirror_inputs(self, workspace, expander):
383+
for input_file, input_conf in self._inputs_and_fetchers(expander.workload_name).items():
374384
mirror_paths = ramble.mirror.mirror_archive_paths(
375-
conf['fetcher'], os.path.join(self.name, input_name))
376-
stage = ramble.stage.InputStage(conf['fetcher'], name=conf['namespace'],
377-
path=mirror_root,
378-
mirror_paths=mirror_paths, lock=False)
385+
input_conf['fetcher'], os.path.join(self.name, input_file))
386+
fetch_dir = os.path.join(workspace._input_mirror_path, self.name)
387+
fs.mkdirp(fetch_dir)
388+
stage = ramble.stage.InputStage(input_conf['fetcher'], name=input_conf['namespace'],
389+
path=fetch_dir, mirror_paths=mirror_paths, lock=False)
379390

380-
stage.cache_mirror(mirror, mirror_stats)
391+
stage.cache_mirror(workspace._input_mirror_cache, workspace._input_mirror_stats)
381392

382393
def _get_inputs(self, workspace, expander):
383394
workload_namespace = '%s.%s' % (expander.application_name,
384395
expander.workload_name)
385396

386-
workload = self.workloads[expander.workload_name]
387-
388-
for input_file in workload['inputs']:
389-
input_conf = self.inputs[input_file]
390-
input_url = input_conf['url']
391-
397+
for input_file, input_conf in self._inputs_and_fetchers(expander.workload_name).items():
392398
if not workspace.dry_run:
393-
fetcher = ramble.fetch_strategy.URLFetchStrategy(**input_conf)
394-
395-
file_name = '.'.join(os.path.basename(input_url).split('.')[0:-1])
396-
397399
mirror_paths = ramble.mirror.mirror_archive_paths(
398-
fetcher, os.path.join(self.name, file_name))
400+
input_conf['fetcher'], os.path.join(self.name, input_file))
399401

400-
with ramble.stage.InputStage(fetcher, name=workload_namespace,
402+
with ramble.stage.InputStage(input_conf['fetcher'], name=workload_namespace,
401403
path=expander.application_input_dir,
402404
mirror_paths=mirror_paths) \
403405
as stage:
404406
stage.set_subdir(expander.expand_var(input_conf['target_dir']))
405407
stage.fetch()
406-
if input_conf['sha256']:
408+
if input_conf['fetcher'].digest:
407409
stage.check()
408410
stage.cache_local()
409411
if input_conf['expand']:
@@ -412,7 +414,7 @@ def _get_inputs(self, workspace, expander):
412414
except spack.util.executable.ProcessError:
413415
pass
414416
else:
415-
tty.msg('DRY-RUN: Would download %s' % input_url)
417+
tty.msg('DRY-RUN: Would download %s' % input_conf['fetcher'].url)
416418

417419
def _make_experiments(self, workspace, expander):
418420
experiment_run_dir = expander.experiment_run_dir

lib/ramble/ramble/application_types/spack.py

Lines changed: 121 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,8 @@
66
# option. This file may not be copied, modified, or distributed
77
# except according to those terms.
88

9+
import os
10+
911
import llnl.util.tty as tty
1012

1113
from ramble.application import ApplicationBase, ApplicationError
@@ -44,13 +46,19 @@ class SpackApplication(ApplicationBase):
4446
def __init__(self, file_path):
4547
super().__init__(file_path)
4648
self._setup_phases = [
49+
'create_spack_env',
4750
'install_software',
4851
'get_inputs',
4952
'make_experiments'
5053
]
5154

5255
self._analyze_phases = ['analyze_experiments']
5356
self._archive_phases = ['archive_experiments']
57+
self._mirror_phases = [
58+
'mirror_inputs',
59+
'create_spack_env',
60+
'mirror_software'
61+
]
5462

5563
self.application_class = 'SpackApplication'
5664

@@ -84,39 +92,43 @@ def _add_expand_vars(self, expander):
8492
except ramble.spack_runner.RunnerError as e:
8593
tty.die(e)
8694

87-
def _install_software(self, workspace, expander):
88-
import os
95+
def _extract_specs(self, workspace, expander, spec_name, app_name):
96+
"""Build a list of all specs which the named spec requires
97+
98+
Traverse a spec and all of its dependencies to extract a list
99+
of specs
100+
"""
101+
spec_list = []
102+
spec = workspace.get_named_spec(spec_name, app_name)
103+
if 'dependencies' in spec:
104+
for dep in spec['dependencies']:
105+
spec_list.extend(
106+
self._extract_specs(workspace,
107+
expander,
108+
dep, app_name))
109+
spec['application_name'] = app_name
110+
spec_list.append((spec_name, spec))
111+
return spec_list
112+
113+
def _create_spack_env(self, workspace, expander):
114+
"""Create the spack environment for this experiment
115+
116+
Extract all specs this experiment uses, and write the spack environment
117+
file for it.
118+
"""
89119

90120
# See if we cached this already, and if so return
91121
namespace = expander.spec_namespace
92122
if not namespace:
93123
raise ApplicationError('Ramble spec_namespace is set to None.')
94124

95-
cache_tupl = ('spack', namespace)
125+
cache_tupl = ('spack-env', namespace)
96126
if workspace.check_cache(cache_tupl):
97127
tty.debug('{} already in cache.'.format(cache_tupl))
98128
return
99129
else:
100130
workspace.add_to_cache(cache_tupl)
101131

102-
def extract_specs(workspace, expander, spec_name, app_name):
103-
"""Build a list of all specs the named spec requires
104-
105-
Traverse a spec and all of its dependencies to extract a list
106-
of specs
107-
"""
108-
spec_list = []
109-
spec = workspace.get_named_spec(spec_name, app_name)
110-
if 'dependencies' in spec:
111-
for dep in spec['dependencies']:
112-
spec_list.extend(
113-
extract_specs(workspace,
114-
expander,
115-
dep, app_name))
116-
spec['application_name'] = app_name
117-
spec_list.append((spec_name, spec))
118-
return spec_list
119-
120132
try:
121133
runner = ramble.spack_runner.SpackRunner(dry_run=workspace.dry_run)
122134

@@ -145,8 +157,8 @@ def extract_specs(workspace, expander, spec_name, app_name):
145157
runner.add_spec(workspace.spec_string(mpi_spec,
146158
use_custom_specifier=True))
147159

148-
pkg_specs = extract_specs(workspace, expander, name,
149-
app_context)
160+
pkg_specs = self._extract_specs(workspace, expander, name,
161+
app_context)
150162
for pkg_name, pkg_info in pkg_specs:
151163
if pkg_name not in added_specs:
152164
added_specs[pkg_name] = True
@@ -170,20 +182,44 @@ def extract_specs(workspace, expander, spec_name, app_name):
170182
'defined in ramble.yaml')
171183

172184
runner.concretize()
185+
186+
except ramble.spack_runner.RunnerError as e:
187+
tty.die(e)
188+
189+
def _install_software(self, workspace, expander):
190+
"""Install application's software using spack"""
191+
192+
# See if we cached this already, and if so return
193+
namespace = expander.spec_namespace
194+
if not namespace:
195+
raise ApplicationError('Ramble spec_namespace is set to None.')
196+
197+
cache_tupl = ('spack-install', namespace)
198+
if workspace.check_cache(cache_tupl):
199+
tty.debug('{} already in cache.'.format(cache_tupl))
200+
return
201+
else:
202+
workspace.add_to_cache(cache_tupl)
203+
204+
try:
205+
runner = ramble.spack_runner.SpackRunner(dry_run=workspace.dry_run)
206+
runner.set_env(expander.expand_var('{spack_env}'))
207+
208+
runner.activate()
173209
runner.install()
174210

211+
app_context = expander.expand_var('{spec_name}')
175212
for name, spec_info in \
176213
workspace.all_application_specs(app_context):
177-
if 'mpi' in spec_info and \
178-
spec_info['mpi'] not in mpi_added:
214+
if 'mpi' in spec_info:
179215
mpi_spec = workspace.get_named_spec(spec_info['mpi'],
180216
'mpi_library')
181217
spec_str = workspace.spec_string(mpi_spec)
182218
package_path = runner.get_package_path(spec_str)
183219
expander.set_package_path(name, package_path)
184220

185-
pkg_specs = extract_specs(workspace, expander, name,
186-
app_context)
221+
pkg_specs = self._extract_specs(workspace, expander, name,
222+
app_context)
187223
for pkg_name, pkg_info in pkg_specs:
188224
spec = workspace._build_spec_dict(pkg_info,
189225
app_name=app_context)
@@ -194,3 +230,61 @@ def extract_specs(workspace, expander, spec_name, app_name):
194230

195231
except ramble.spack_runner.RunnerError as e:
196232
tty.die(e)
233+
234+
def _mirror_software(self, workspace, expander):
235+
"""Mirror software source for this experiment using spack"""
236+
import re
237+
238+
# See if we cached this already, and if so return
239+
namespace = expander.spec_namespace
240+
if not namespace:
241+
raise ApplicationError('Ramble spec_namespace is set to None.')
242+
243+
cache_tupl = ('spack-mirror', namespace)
244+
if workspace.check_cache(cache_tupl):
245+
tty.debug('{} already in cache.'.format(cache_tupl))
246+
return
247+
else:
248+
workspace.add_to_cache(cache_tupl)
249+
250+
try:
251+
runner = ramble.spack_runner.SpackRunner(dry_run=workspace.dry_run)
252+
runner.set_env(expander.expand_var('{spack_env}'))
253+
254+
runner.activate()
255+
256+
mirror_output = runner.mirror_environment(workspace._software_mirror_path)
257+
258+
present = 0
259+
added = 0
260+
failed = 0
261+
262+
present_regex = re.compile(r'\s+(?P<num>[0-9]+)\s+already present')
263+
present_match = present_regex.search(mirror_output)
264+
if present_match:
265+
present = int(present_match.group('num'))
266+
267+
added_regex = re.compile(r'\s+(?P<num>[0-9]+)\s+added')
268+
added_match = added_regex.search(mirror_output)
269+
if added_match:
270+
added = int(added_match.group('num'))
271+
272+
failed_regex = re.compile(r'\s+(?P<num>[0-9]+)\s+failed to fetch.')
273+
failed_match = failed_regex.search(mirror_output)
274+
if failed_match:
275+
failed = int(failed_match.group('num'))
276+
277+
added_start = len(workspace._software_mirror_stats.new)
278+
for i in range(added_start, added_start + added):
279+
workspace._software_mirror_stats.new[i] = i
280+
281+
present_start = len(workspace._software_mirror_stats.present)
282+
for i in range(present_start, present_start + present):
283+
workspace._software_mirror_stats.present[i] = i
284+
285+
error_start = len(workspace._software_mirror_stats.errors)
286+
for i in range(error_start, error_start + failed):
287+
workspace._software_mirror_stats.errors.add(i)
288+
289+
except ramble.spack_runner.RunnerError as e:
290+
tty.die(e)

lib/ramble/ramble/cmd/mirror.py

Lines changed: 1 addition & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,6 @@
55
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
66
# option. This file may not be copied, modified, or distributed
77
# except according to those terms.
8-
import sys
9-
108
import llnl.util.tty as tty
119

1210
import ramble.cmd.common.arguments as arguments
@@ -32,11 +30,6 @@ def setup_parser(subparser):
3230
sp = subparser.add_subparsers(
3331
metavar='SUBCOMMAND', dest='mirror_command')
3432

35-
# Create
36-
create_parser = sp.add_parser('create', help=mirror_create.__doc__)
37-
create_parser.add_argument('-d', '--directory', default=None,
38-
help="directory in which to create mirror")
39-
4033
# Destroy
4134
destroy_parser = sp.add_parser('destroy', help=mirror_destroy.__doc__)
4235

@@ -182,36 +175,6 @@ def _read_specs_from_file(filename):
182175
return specs
183176

184177

185-
def mirror_create(args):
186-
"""Create a directory to be used as a ramble mirror, and fill it with
187-
input archives."""
188-
ws = ramble.cmd.require_active_workspace(cmd_name='mirror create')
189-
190-
mirror = ramble.mirror.Mirror(
191-
args.directory or ramble.config.get('config:input_cache'))
192-
193-
directory = url_util.format(mirror.push_url)
194-
195-
existed = web_util.url_exists(directory)
196-
197-
# Actually do the work to create the mirror
198-
present, mirrored, error = ramble.mirror.create(
199-
directory, ws)
200-
p, m, e = len(present), len(mirrored), len(error)
201-
202-
verb = "updated" if existed else "created"
203-
tty.msg(
204-
"Successfully %s mirror in %s" % (verb, directory),
205-
"Archive stats:",
206-
" %-4d already present" % p,
207-
" %-4d added" % m,
208-
" %-4d failed to fetch." % e)
209-
if error:
210-
tty.error("Failed downloads:")
211-
tty.colify(s.cformat("{name}") for s in error)
212-
sys.exit(1)
213-
214-
215178
def mirror_destroy(args):
216179
"""Given a url, recursively delete everything under it."""
217180
mirror_url = None
@@ -226,8 +189,7 @@ def mirror_destroy(args):
226189

227190

228191
def mirror(parser, args):
229-
action = {'create': mirror_create,
230-
'destroy': mirror_destroy,
192+
action = {'destroy': mirror_destroy,
231193
'add': mirror_add,
232194
'remove': mirror_remove,
233195
'rm': mirror_remove,

0 commit comments

Comments
 (0)