Skip to content

Commit 9f11788

Browse files
authored
Merge pull request #459 from lsst/tickets/SP-2149
tickets/SP-2149: Improved logging and regularization of archive bucket in prenight sim archive code
2 parents ef9e3cf + a0221dc commit 9f11788

File tree

4 files changed

+107
-11
lines changed

4 files changed

+107
-11
lines changed

batch/run_prenight_sims.sh

+3-1
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,12 @@ SCHEDULER_CONFIG_SCRIPT=$(scheduler_config_at_time latiss)
6161
# so we do not accidentally run one from the adjusted PATH below.
6262
PRENIGHT_SIM=$(which prenight_sim)
6363

64+
export SIM_ARCHIVE_LOG_FILE=${WORK_DIR}/sim_archive_log.txt
65+
export PRENIGHT_LOG_FILE=${WORK_DIR}/prenight_log.txt
6466
export PYTHONPATH=${PACKAGE_DIR}:${PYTHONPATH}
6567
export PATH=${PACKAGE_DIR}/bin:${PATH}
6668
printenv > env.out
6769
date --iso=s
68-
time ${PRENIGHT_SIM} --scheduler auxtel.pickle.xz --opsim None --repo=${TS_CONFIG_OCS_REPO} --script ${SCHEDULER_CONFIG_SCRIPT} --config_version ${TS_CONFIG_OCS_VERSION}
70+
time ${PRENIGHT_SIM} --scheduler auxtel.pickle.xz --opsim None --repo=${TS_CONFIG_OCS_REPO} --script ${SCHEDULER_CONFIG_SCRIPT} --config_version ${TS_CONFIG_OCS_VERSION} > ${WORK_DIR}/prenight_sim.out 2>&1
6971
date --iso=s
7072
echo "******* END of run_prenight_sims.sh *********"

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ version_file = "rubin_sim/version.py"
9595
addopts = "--ignore-glob=*/version.py --ignore-glob=*data_dir/*"
9696

9797
[tool.mypy]
98-
disallow_untyped_defs = "False"
98+
disallow_untyped_defs = "True"
9999
ignore_missing_imports = "True"
100100
exclude = "version.py"
101101

rubin_sim/sim_archive/prenight.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import os
1616
import pickle
1717
import typing
18+
import warnings
1819
from datetime import datetime
1920
from functools import partial
2021
from tempfile import TemporaryFile
@@ -45,7 +46,7 @@
4546
except ModuleNotFoundError:
4647
get_baseline = partial(warn, "Cannot find default baseline because rubin_sim is not installed.")
4748

48-
DEFAULT_ARCHIVE_URI = "s3://rubin-scheduler-prenight/opsim/"
49+
DEFAULT_ARCHIVE_URI = "s3://rubin:rubin-scheduler-prenight/opsim/"
4950

5051

5152
def _run_sim(
@@ -344,6 +345,31 @@ def prenight_sim_cli(cli_args: list = []) -> None:
344345
parser.add_argument("--scheduler", type=str, default=None, help="pickle file of the scheduler to run.")
345346
parser.add_argument("--config_version", type=str, default=None, help="Version of ts_config_ocs used.")
346347

348+
# Configure logging
349+
stream_handler = logging.StreamHandler()
350+
stream_handler.setLevel(logging.INFO)
351+
log_handlers = [stream_handler]
352+
353+
log_file = os.environ.get("PRENIGHT_LOG_FILE", None)
354+
if log_file is not None:
355+
file_handler = logging.FileHandler(log_file, mode="w")
356+
file_handler.setLevel(logging.DEBUG)
357+
log_handlers.append(file_handler)
358+
359+
logging.basicConfig(
360+
level=logging.DEBUG,
361+
format="%(asctime)s: %(message)s",
362+
datefmt="%Y-%m-%dT%H:%M:%S%z",
363+
handlers=log_handlers,
364+
)
365+
366+
# FIXME
367+
warnings.filterwarnings(
368+
"ignore",
369+
category=UserWarning,
370+
message="IntRounded being used with a potentially too-small scale factor.",
371+
)
372+
347373
# Only pass a default if we have an opsim
348374
baseline = get_baseline()
349375
if baseline is not None:

rubin_sim/sim_archive/sim_archive.py

+76-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66
"check_opsim_archive_resource",
77
"read_archived_sim_metadata",
88
"make_sim_archive_cli",
9+
"compile_sim_metadata",
10+
"read_sim_metadata_from_hdf",
11+
"verify_compiled_sim_metadata",
912
"drive_sim",
13+
"compile_sim_archive_metadata_cli",
14+
"find_latest_prenight_sim_for_nights",
15+
"fetch_latest_prenight_sim_for_nights",
16+
"fetch_obsloctap_visits",
1017
]
1118

1219
import argparse
@@ -243,7 +250,7 @@ def convert_mjd_to_dayobs(mjd):
243250
return data_path
244251

245252

246-
def transfer_archive_dir(archive_dir, archive_base_uri="s3://rubin-scheduler-prenight/opsim/"):
253+
def transfer_archive_dir(archive_dir, archive_base_uri="s3://rubin:rubin-scheduler-prenight/opsim/"):
247254
"""Transfer the contents of an archive directory to an resource.
248255
249256
Parameters
@@ -253,22 +260,25 @@ def transfer_archive_dir(archive_dir, archive_base_uri="s3://rubin-scheduler-pre
253260
transferred.
254261
archive_base_uri : `str`, optional
255262
The base URI where the archive files will be transferred to.
256-
Default is "s3://rubin-scheduler-prenight/opsim/".
263+
Default is "s3://rubin:rubin-scheduler-prenight/opsim/".
257264
258265
Returns
259266
-------
260267
resource_rpath : `ResourcePath`
261268
The destination resource.
262269
"""
263270

271+
LOGGER.debug(f"Beginning copy of {archive_dir} to {archive_base_uri}.")
264272
metadata_fname = Path(archive_dir).joinpath("sim_metadata.yaml")
265273
with open(metadata_fname, "r") as metadata_io:
266274
sim_metadata = yaml.safe_load(metadata_io)
275+
LOGGER.debug(f"Completed read of {archive_dir}.")
267276

268277
insert_date = datetime.datetime.utcnow().date().isoformat()
269278
insert_date_rpath = ResourcePath(archive_base_uri).join(insert_date, forceDirectory=True)
270279
if not insert_date_rpath.exists():
271280
insert_date_rpath.mkdir()
281+
LOGGER.debug(f"Created {insert_date_rpath}.")
272282

273283
# Number the sims in the insert date dir by
274284
# looking for all the integer directories, and choosing the next one.
@@ -285,6 +295,7 @@ def transfer_archive_dir(archive_dir, archive_base_uri="s3://rubin-scheduler-pre
285295
new_id = max(found_ids) + 1 if len(found_ids) > 0 else 1
286296
resource_rpath = insert_date_rpath.join(f"{new_id}", forceDirectory=True)
287297
resource_rpath.mkdir()
298+
LOGGER.debug(f"Created {resource_rpath}.")
288299

289300
# Include the metadata file itself.
290301
sim_metadata["files"]["metadata"] = {"name": "sim_metadata.yaml"}
@@ -293,6 +304,7 @@ def transfer_archive_dir(archive_dir, archive_base_uri="s3://rubin-scheduler-pre
293304
source_fname = Path(archive_dir).joinpath(file_info["name"])
294305
with open(source_fname, "rb") as source_io:
295306
content = source_io.read()
307+
LOGGER.debug(f"Read {source_fname}.")
296308

297309
destination_rpath = resource_rpath.join(file_info["name"])
298310
destination_rpath.write(content)
@@ -315,17 +327,24 @@ def check_opsim_archive_resource(archive_uri):
315327
validity: `dict`
316328
A dictionary of files checked, and their validity.
317329
"""
330+
LOGGER.debug(f"Starting to check file hashes in opsim sim archive {archive_uri}.")
318331
metadata_path = ResourcePath(archive_uri).join("sim_metadata.yaml")
319332
with metadata_path.open(mode="r") as metadata_io:
320333
sim_metadata = yaml.safe_load(metadata_io)
334+
LOGGER.debug(f"Read sim metadata from {metadata_path}.")
321335

322336
results = {}
323337

324338
for file_info in sim_metadata["files"].values():
325339
resource_path = ResourcePath(archive_uri).join(file_info["name"])
340+
LOGGER.info(f"Reading {resource_path}.")
326341
content = resource_path.read()
327342

328343
results[file_info["name"]] = file_info["md5"] == hashlib.md5(content).hexdigest()
344+
if results[file_info["name"]]:
345+
LOGGER.debug(f"{resource_path} checked and found to match recorded md5.")
346+
else:
347+
LOGGER.debug(f"{resource_path} has an md5 that differs from the recorded md5!")
329348

330349
return results
331350

@@ -386,11 +405,15 @@ def read_archived_sim_metadata(
386405
"""
387406
latest_mjd = int(Time.now().mjd if latest is None else Time(latest).mjd)
388407
earliest_mjd = int(latest_mjd - (num_nights - 1))
408+
LOGGER.debug(
409+
f"Looking for simulation metadata with MJD between {earliest_mjd} and {latest_mjd} in {base_uri}."
410+
)
389411

390412
compilation = {}
391413
compiled_uris_by_date = {}
392414
max_compiled_date = "1900-01-01"
393415
if compilation_resource is not None:
416+
LOGGER.debug(f"Reading metadata cache {compilation_resource}.")
394417
try:
395418
compilation.update(read_sim_metadata_from_hdf(compilation_resource))
396419
for uri in compilation:
@@ -399,6 +422,7 @@ def read_archived_sim_metadata(
399422
compiled_uris_by_date[iso_date] = []
400423
compiled_uris_by_date[iso_date].append(uri)
401424
max_compiled_date = max(max_compiled_date, iso_date)
425+
LOGGER.debug(f"Maximum simulation execution date in metadata cache: {max_compiled_date}")
402426
except FileNotFoundError:
403427
LOGGER.warning(f"No metadata cache {compilation_resource}, not using cache.")
404428
pass
@@ -418,17 +442,25 @@ def read_archived_sim_metadata(
418442
):
419443
for found_file in found_files:
420444
found_resource = ResourcePath(base_dir).join(found_file)
445+
LOGGER.debug(f"Found {found_resource}")
421446
sim_uri = str(found_resource.dirname())
422447
if sim_uri in compilation:
448+
LOGGER.debug(f"Not reading {found_resource}, already in the read compilation.")
423449
these_metadata = compilation[sim_uri]
424450
else:
451+
LOGGER.debug(f"Reading {found_resource} (absent from compilation).")
425452
these_metadata = yaml.safe_load(found_resource.read().decode("utf-8"))
426453
these_metadata["label"] = _build_archived_sim_label(
427454
base_uri, found_resource, these_metadata
428455
)
456+
LOGGER.debug(f"Read successfully: {found_resource}")
429457
if iso_date < max_compiled_date:
430-
print(f"Simulation at {sim_uri} expected but not found in compilation.")
458+
LOGGER.error(
459+
f"Simulation at {sim_uri} expected but not found in compilation."
460+
)
431461
all_metadata[sim_uri] = these_metadata
462+
else:
463+
LOGGER.debug(f"No simulations found with generation date of {iso_date}")
432464
else:
433465
if iso_date in compiled_uris_by_date:
434466
for sim_uri in compiled_uris_by_date[iso_date]:
@@ -438,7 +470,18 @@ def read_archived_sim_metadata(
438470
if iso_date in compiled_uris_by_date:
439471
for sim_uri in compiled_uris_by_date[iso_date]:
440472
if sim_uri not in all_metadata:
441-
print(f"Simulation at {sim_uri} in compiled metadata but not archive.")
473+
message = f"Simulation at {sim_uri} in compiled metadata but not archive."
474+
print(message)
475+
LOGGER.error(message)
476+
else:
477+
LOGGER.debug(
478+
f"Date {iso_date} not expected to be in the metadata compilation, not checking for it."
479+
)
480+
481+
if len(all_metadata) == 0:
482+
earliest_iso = Time(earliest_mjd, format="mjd").iso[:10]
483+
latest_iso = Time(latest_mjd, format="mjd").iso[:10]
484+
LOGGER.info(f"No simulations run between {earliest_iso} through {latest_iso} found in {base_uri}")
442485

443486
return all_metadata
444487

@@ -488,7 +531,7 @@ def make_sim_archive_cli(*args):
488531
parser.add_argument(
489532
"--archive_base_uri",
490533
type=str,
491-
default="s3://rubin-scheduler-prenight/opsim/",
534+
default="s3://rubin:rubin-scheduler-prenight/opsim/",
492535
help="Base URI for the archive",
493536
)
494537
parser.add_argument("--tags", type=str, default=[], nargs="*", help="The tags on the simulation.")
@@ -529,8 +572,10 @@ def make_sim_archive_cli(*args):
529572
label=arg_values.label,
530573
capture_env=arg_values.current_env,
531574
)
575+
LOGGER.info(f"Created simulation archived directory: {data_path.name}")
532576

533577
sim_archive_uri = transfer_archive_dir(data_path.name, arg_values.archive_base_uri)
578+
LOGGER.info(f"Transferred {data_path} to {sim_archive_uri}")
534579

535580
return sim_archive_uri
536581

@@ -558,6 +603,7 @@ def compile_sim_metadata(
558603
compilation_fname : `ResourcePath`
559604
The resource to which the hdf5 file was written.
560605
"""
606+
LOGGER.debug("Starting compile_sim_metadata.")
561607

562608
if append:
563609
sim_metadata = read_archived_sim_metadata(
@@ -637,6 +683,7 @@ def read_sim_metadata_from_hdf(compilation_resource: str | ResourcePath) -> dict
637683

638684
with compilation_resource.as_local() as local_compilation_resource:
639685
compilation_fname: str = local_compilation_resource.ospath
686+
LOGGER.debug(f"{compilation_resource} copied to {compilation_fname}.")
640687
sim_df = pd.read_hdf(compilation_fname, "simulations")
641688
file_df = pd.read_hdf(compilation_fname, "files")
642689
sim_runner_kwargs_df = pd.read_hdf(compilation_fname, "kwargs")
@@ -868,14 +915,17 @@ def drive_sim(
868915
in_files["notebook"] = notebook
869916

870917
with TemporaryDirectory() as local_data_dir:
918+
LOGGER.debug(f"Using temporary directory {local_data_dir}.")
871919
# We want to store the state of the scheduler at the start of
872920
# the sim, so we need to save it now before we run the simulation.
873921
scheduler_path = Path(local_data_dir).joinpath("scheduler.pickle.xz")
874922
with lzma.open(scheduler_path, "wb", format=lzma.FORMAT_XZ) as pio:
875923
pickle.dump(scheduler, pio)
876924
in_files["scheduler"] = scheduler_path.as_posix()
877925

926+
LOGGER.debug("About to call sim_runner.")
878927
sim_results = sim_runner(observatory, scheduler, **kwargs)
928+
LOGGER.debug("sim_runner complete.")
879929

880930
observations = sim_results[2]
881931
reward_df = sim_results[3] if scheduler.keep_rewards else None
@@ -924,6 +974,14 @@ def compile_sim_archive_metadata_cli(*args):
924974
+ "but add new simulations with dates after the last current entry.",
925975
)
926976

977+
log_file = os.environ.get("SIM_ARCHIVE_LOG_FILE", None)
978+
if log_file is not None:
979+
logging.basicConfig(
980+
filename=log_file, format="%(asctime)s: %(message)s", datefmt="%Y-%m-%dT%H:%M:%S%z"
981+
)
982+
else:
983+
logging.basicConfig(level=logging.INFO)
984+
927985
arg_values = parser.parse_args() if len(args) == 0 else parser.parse_args(args)
928986
archive_uri = arg_values.archive_base_uri
929987
compilation_uri = arg_values.compilation_uri
@@ -983,6 +1041,7 @@ def find_latest_prenight_sim_for_nights(
9831041
sim_metadata = read_archived_sim_metadata(
9841042
archive_uri, num_nights=max_simulation_age, compilation_resource=compilation_uri
9851043
)
1044+
LOGGER.debug(f"Total simulations in the last {max_simulation_age} days: {len(sim_metadata)}.")
9861045

9871046
best_sim = None
9881047
for uri, sim in sim_metadata.items():
@@ -1010,6 +1069,9 @@ def find_latest_prenight_sim_for_nights(
10101069
.join(f"{best_sim['date_index']}", forceDirectory=True)
10111070
.join(best_sim["files"]["observations"]["name"])
10121071
)
1072+
LOGGER.info(f"Most recent simulation meeting requested criteria is {best_sim['uri']}.")
1073+
else:
1074+
LOGGER.debug("No simulations met the requested criteria.")
10131075

10141076
return best_sim
10151077

@@ -1022,7 +1084,7 @@ def fetch_latest_prenight_sim_for_nights(
10221084
archive_uri: str = "s3://rubin:rubin-scheduler-prenight/opsim/",
10231085
compilation_uri: str = "s3://rubin:rubin-scheduler-prenight/opsim/compiled_metadata_cache.h5",
10241086
**kwargs,
1025-
) -> pd.DataFrame:
1087+
) -> pd.DataFrame | None:
10261088
"""Fetches visit parameters from the latest archived pre-night simulation
10271089
with requested tags for a specified day of observing.
10281090
@@ -1059,9 +1121,15 @@ def fetch_latest_prenight_sim_for_nights(
10591121
sim_metadata = find_latest_prenight_sim_for_nights(
10601122
first_day_obs, last_day_obs, tags, max_simulation_age, archive_uri, compilation_uri
10611123
)
1062-
visits = get_sim_data(sim_metadata["opsim_rp"], **kwargs)
1124+
if sim_metadata is None:
1125+
LOGGER.info("No simulations meet requested criteria.")
1126+
result = None
1127+
else:
1128+
visits = get_sim_data(sim_metadata["opsim_rp"], **kwargs)
1129+
LOGGER.debug(f"Loaded {len(visits)} from {sim_metadata['opsim_rp']}")
1130+
result = pd.DataFrame(visits)
10631131

1064-
return pd.DataFrame(visits)
1132+
return result
10651133

10661134

10671135
def fetch_obsloctap_visits(day_obs: str | None = None, nights: int = 2) -> pd.DataFrame:

0 commit comments

Comments
 (0)