runner
runner ¶
File containing functionality for running the various steps in the workflow.
dumpconfig ¶
dumpconfig(config: ImmutableConfig) -> None
Dump the configuration used for a run to a timestamped JSON file, which is useful for debugging config issues after the fact.
Source code in src/mapyde/runner.py
def dumpconfig(config: ImmutableConfig) -> None:
    """
    Dump the configuration to a timestamped JSON file, useful for debugging
    config issues after the fact.

    The file is written as ``configs/config_<YYYYmmddHHMMSS>.json`` underneath
    ``utils.output_path(config)``. The ``configs`` directory is created first
    if it does not exist yet.

    Args:
        config: The configuration to serialize; passed as-is to ``json.dump``.
    """
    # Create exactly the directory we are about to write into, rather than a
    # separately computed path that merely happens to coincide with it.
    configs_dir = utils.output_path(config).joinpath("configs")
    configs_dir.mkdir(parents=True, exist_ok=True)

    # Zero-padded fields keep the stamp unambiguous and sortable; concatenating
    # the raw integers would map e.g. Jan 11 and Nov 1 onto the same digits.
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")

    with configs_dir.joinpath(f"config_{stamp}.json").open(
        "w",
        encoding="utf-8",
    ) as outfile:
        json.dump(config, outfile, ensure_ascii=False, indent=4)
mounts ¶
mounts(config: ImmutableConfig) -> list[tuple[PathOrStr, PathOrStr]]
Define the mount points shared by all containers.
Source code in src/mapyde/runner.py
def mounts(config: ImmutableConfig) -> list[tuple[PathOrStr, PathOrStr]]:
    """
    Build the list of mount pairs used by every container.

    Each entry pairs a resolved path taken from ``config["base"]`` (or the
    output path from ``utils.output_path``) with the fixed path it is mounted as.
    """
    base_keys_to_targets = (
        ("cards_path", "/cards"),
        ("scripts_path", "/scripts"),
        ("likelihoods_path", "/likelihoods"),
    )
    pairs = [
        (str(Path(config["base"][key]).resolve()), target)
        for key, target in base_keys_to_targets
    ]
    # The output area is always mounted last, as /data.
    pairs.append((str(utils.output_path(config)), "/data"))
    return pairs
run_ana ¶
run_ana(config: ImmutableConfig) -> tuple[bytes, bytes]
Run analysis.
Source code in src/mapyde/runner.py
def _mgpy_log_path(config: ImmutableConfig) -> Path:
    """Return the path of ``docker_mgpy.log`` for the output area currently set in ``config``."""
    return utils.output_path(config).joinpath(
        config["base"]["logs"], "docker_mgpy.log"
    )


def _logged_value(
    config: ImmutableConfig,
    marker: str,
    field: int,
    default: float,
    *,
    first: bool = False,
) -> float:
    """
    Scan ``docker_mgpy.log`` for lines containing ``marker`` and return the
    whitespace-separated token at position ``field`` as a float.

    The last matching line wins unless ``first`` is true, in which case the
    scan stops at the first match. ``default`` is returned when nothing matches.
    """
    value = default
    with _mgpy_log_path(config).open(encoding="utf-8") as fpointer:
        for line in fpointer:
            if marker in line:
                value = float(line.split()[field])
                if first:
                    break
    return value


def _apply_matching_efficiency(config: ImmutableConfig, xsec: float) -> float:
    """
    Scale ``xsec`` by (events after merging) / (generated events) as reported
    in ``docker_mgpy.log``.

    NOTE(review): the factor is applied once per matching log line, exactly as
    before; the original comment said "take the last instance" -- confirm that
    the log only ever contains a single such line.
    """
    with _mgpy_log_path(config).open(encoding="utf-8") as fpointer:
        for line in fpointer:
            if "Nb of events after merging" in line:
                xsec *= float(line.split()[6]) / config["madgraph"]["nevents"]
    return xsec


def run_ana(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Run analysis.

    The cross section handed to the analysis script is determined as follows:

    * ``analysis.XSoverride`` if it is positive; otherwise
    * if ``madgraph.run_without_decays`` is set, the last ``Cross-section :``
      value in the log of the ``_nodecays`` run; otherwise
    * if madspin was not skipped and a positive ``analysis.branchingratio`` is
      configured, the first ``Cross-section :`` value in the log (i.e. the one
      from before madspin runs); otherwise
    * the last ``Matched cross-section :`` (when ``xqcut > 0``) or
      ``Cross-section :`` value in the log.

    In the second and third cases the value is additionally corrected for the
    matching efficiency when ``xqcut > 0``. A default of 1000.0 is kept when
    no matching line is found. Finally the value is multiplied by
    ``analysis.branchingratio`` (if present) and ``analysis.kfactor`` (if positive).

    Returns:
        The ``(stdout, stderr)`` pair returned by the container call.
    """
    xsec = 1000.0

    if config["analysis"]["XSoverride"] > 0:
        xsec = config["analysis"]["XSoverride"]
    elif (
        "run_without_decays" in config["madgraph"]
        and config["madgraph"]["run_without_decays"]
    ):
        # modify config to access XS from run without decays
        origcard = config["madgraph"]["proc"]["card"]
        origout = config["base"]["output"]
        config["madgraph"]["proc"]["card"] = origcard + "_nodecays"
        config["base"]["output"] = origout + "_nodecays"
        try:
            xsec = _logged_value(config, "Cross-section :", 2, xsec)
        finally:
            # Always change the config options back, even if the log is
            # missing or malformed, so the caller's config is never left
            # pointing at the "_nodecays" area.
            config["madgraph"]["proc"]["card"] = origcard
            config["base"]["output"] = origout

        # if we're doing MLM matching and not trusting the final XS output by Pythia, then
        # fix the XS from before decays to account for matching efficiency
        if config["madgraph"]["run"]["options"]["xqcut"] > 0:
            xsec = _apply_matching_efficiency(config, xsec)
    elif (
        config["madspin"]["skip"] is False
        and "branchingratio" in config["analysis"]
        and config["analysis"]["branchingratio"] > 0
    ):
        # we've run madspin AND set a non-zero BR in the configuration, so we're going
        # to take the cross section from before madspin runs.
        xsec = _logged_value(config, "Cross-section :", 2, xsec, first=True)

        # if we're doing MLM matching and not trusting the final XS output by Pythia, then
        # fix the XS from before decays to account for matching efficiency
        if config["madgraph"]["run"]["options"]["xqcut"] > 0:
            xsec = _apply_matching_efficiency(config, xsec)
    else:
        # With matching enabled the matched cross section is the relevant one.
        if config["madgraph"]["run"]["options"]["xqcut"] > 0:
            marker, field = "Matched cross-section :", 3
        else:
            marker, field = "Cross-section :", 2
        xsec = _logged_value(config, marker, field, xsec)

    if "branchingratio" in config["analysis"]:
        xsec *= config["analysis"]["branchingratio"]

    if config["analysis"]["kfactor"] > 0:
        xsec *= config["analysis"]["kfactor"]

    image = f"ghcr.io/scipp-atlas/mapyde/{config['delphes']['version']}"
    command = bytes(
        f"""mkdir -p {Path(config['analysis']['output']).parent} && \
/scripts/{config['analysis']['script']} --input {Path('/data').joinpath(config['delphes']['output'])} \
--output {config['analysis']['output']} \
--lumi {config['analysis']['lumi']} \
--XS {xsec} && \
rsync -rav . /data/""",
        "utf-8",
    )

    with Container(
        image=image,
        name=f"{config['base']['output']}__hists",
        engine=config["base"].get("engine", "docker"),
        mounts=mounts(config),
        stdout=sys.stdout,
        output_path=utils.output_path(config),
        logs_path=config["base"]["logs"],
    ) as container:
        stdout, stderr = container.call(command)

    return stdout, stderr
run_delphes ¶
run_delphes(config: ImmutableConfig) -> tuple[bytes, bytes]
Run delphes.
Source code in src/mapyde/runner.py
def run_delphes(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Run delphes.

    Builds a shell command that copies the ``*hepmc.gz`` file found under
    ``/data/madgraph``, unpacks it, runs ``DelphesHepMC2`` with the configured
    card and output file, and rsyncs the working directory (minus the
    unpacked ``hepmc`` file) to ``/data``. The command is executed in a container.

    Returns:
        The ``(stdout, stderr)`` pair returned by the container call.
    """
    # ./test/wrapper_delphes.py config_file
    shell_script = f"""pwd && ls -lavh && ls -lavh /data && \
find /data/madgraph -name "*hepmc.gz" && \
cp $(find /data/madgraph -name "*hepmc.gz") hepmc.gz && \
gunzip -f hepmc.gz && \
cp /cards/delphes/{config['delphes']['card']} . && \
/bin/ls -ltrh --color && \
mkdir -p {Path(config['delphes']['output']).parent} && \
set -x && \
/usr/local/share/delphes/delphes/DelphesHepMC2 {config['delphes']['card']} {Path(config['delphes']['output'])} hepmc && \
set +x && \
rsync -rav --exclude hepmc . /data/"""

    container_options = {
        "image": f"ghcr.io/scipp-atlas/mapyde/{config['delphes']['version']}",
        "name": f"{config['base']['output']}__delphes",
        "engine": config["base"].get("engine", "docker"),
        "mounts": mounts(config),
        "stdout": sys.stdout,
        "output_path": utils.output_path(config),
        "logs_path": config["base"]["logs"],
    }

    with Container(**container_options) as box:
        out, err = box.call(shell_script.encode("utf-8"))

    return out, err
run_madgraph ¶
run_madgraph(config: ImmutableConfig) -> tuple[bytes, bytes]
Run madgraph.
Source code in src/mapyde/runner.py
def _mg5_sync_command(config: ImmutableConfig) -> bytes:
    """Return the command that runs ``mg5_aMC`` and rsyncs the whole ``PROC_madgraph`` area to ``/data/madgraph``."""
    return bytes(
        f"mg5_aMC /data/{config['madgraph']['output']} && rsync -a PROC_madgraph /data/madgraph\n",
        "utf-8",
    )


def _call_mgpy_container(
    config: ImmutableConfig, command: bytes
) -> tuple[bytes, bytes]:
    """Execute ``command`` in the madgraph container named ``<base.output>__mgpy`` and return its ``(stdout, stderr)``."""
    image = f"ghcr.io/scipp-atlas/mapyde/{config['madgraph']['version']}"
    with Container(
        image=image,
        name=f"{config['base']['output']}__mgpy",
        engine=config["base"].get("engine", "docker"),
        mounts=mounts(config),
        stdout=sys.stdout,
        output_path=utils.output_path(config),
        logs_path=config["base"]["logs"],
    ) as container:
        stdout, stderr = container.call(command)

    return stdout, stderr


def run_madgraph(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Run madgraph.

    When ``madgraph.run_without_decays`` is set, madgraph is first run with the
    ``<card>_nodecays`` proc card into the ``<output>_nodecays`` area with
    pythia skipped; the output of that run is discarded here. Afterwards (and
    in all other cases) madgraph is run with the configuration as given.

    When ``madgraph.keep_output`` is truthy, only ``unweighted_events.lhe.gz``
    and ``tag_1_pythia8_events.hepmc.gz`` of ``run_01`` are copied to
    ``/data/madgraph`` instead of the whole ``PROC_madgraph`` directory.

    Returns:
        The ``(stdout, stderr)`` pair returned by the final container call.
    """
    # in some cases we'll need to run MG once to get a XS, e.g. without decays, and then run again with the "real" proc card.
    if config["madgraph"].get("run_without_decays", False):
        # modify config to run without decays and store in a separate area
        origcard = config["madgraph"]["proc"]["card"]
        origout = config["base"]["output"]
        origpythia = config["pythia"]["skip"]

        config["madgraph"]["proc"]["card"] = origcard + "_nodecays"
        config["base"]["output"] = origout + "_nodecays"
        config["pythia"]["skip"] = True

        try:
            madgraph.generate_mg5config(config)
            _call_mgpy_container(config, _mg5_sync_command(config))
        finally:
            # Always change the config options back, even if the auxiliary
            # run fails, so the caller's config is never left in the
            # "_nodecays" state.
            config["madgraph"]["proc"]["card"] = origcard
            config["base"]["output"] = origout
            config["pythia"]["skip"] = origpythia

    madgraph.generate_mg5config(config)

    command = _mg5_sync_command(config)
    if config["madgraph"].get("keep_output", False):
        command = bytes(
            f"mg5_aMC /data/{config['madgraph']['output']} && \
mkdir -p /data/madgraph && \
rsync -a PROC_madgraph/Events/run_01/unweighted_events.lhe.gz /data/madgraph/ && \
rsync -a PROC_madgraph/Events/run_01/tag_1_pythia8_events.hepmc.gz /data/madgraph/ \n",
            "utf-8",
        )

    return _call_mgpy_container(config, command)
run_pyhf ¶
run_pyhf(config: ImmutableConfig) -> tuple[bytes, bytes, MutableConfig]
Run statistical inference via pyhf.
Source code in src/mapyde/runner.py
def run_pyhf(
    config: ImmutableConfig,
) -> tuple[bytes, bytes, MutableConfig]:
    """
    Run statistical inference via pyhf.

    The configured pyhf script is executed in a container with ``/data`` as
    working directory. Afterwards ``<script stem>_results.json`` is loaded
    from ``<base.path>/<base.output>``.

    Returns:
        ``(stdout, stderr, results)`` where ``results`` is the parsed JSON file.
    """
    assert config

    script = Path("/scripts").joinpath(config["pyhf"]["script"])
    shell_line = f"""python3.8 {script} -b /likelihoods/{config['pyhf']['likelihood']} -s {config['sa2json']['output']} -n {config['base']['output']} {config['pyhf']['gpu-options']} {config['pyhf']['other-options']}"""

    dumpconfig(config)

    # Pass the GPU flags to the container engine unless "-c" appears in the
    # pyhf gpu-options.
    extra_engine_args = (
        None if "-c" in config["pyhf"]["gpu-options"] else ["--gpus", "all"]
    )

    container_options = {
        "image": f"ghcr.io/scipp-atlas/mapyde/{config['pyhf']['image']}",
        "name": f"{config['base']['output']}__{script.stem}",
        "engine": config["base"].get("engine", "docker"),
        "mounts": mounts(config),
        "stdout": sys.stdout,
        "output_path": utils.output_path(config),
        "logs_path": config["base"]["logs"],
        "cwd": "/data",
        "additional_options": extra_engine_args,
    }
    with Container(**container_options) as box:
        out, err = box.call(shell_line.encode("utf-8"))

    results_file = (
        Path(config["base"]["path"])
        / config["base"]["output"]
        / f"{script.stem}_results.json"
    )
    results = json.loads(results_file.read_text(encoding="utf-8"))

    return out, err, results
run_root2hdf5 ¶
run_root2hdf5(config: ImmutableConfig) -> tuple[bytes, bytes]
Transform ROOT file to hdf5 format
Source code in src/mapyde/runner.py
def run_root2hdf5(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Transform ROOT file to hdf5 format.

    Runs ``/scripts/root2hdf5.py`` on ``<root2hdf5.input>:<root2hdf5.treename>``
    in the ``pyplotting`` container, with ``/data`` as working directory.

    Returns:
        The ``(stdout, stderr)`` pair returned by the container call.
    """
    assert config

    shell_line = f"""python3 /scripts/root2hdf5.py {config['root2hdf5']['input']}:{config['root2hdf5']['treename']} """

    container_options = {
        "image": "ghcr.io/scipp-atlas/mapyde/pyplotting:latest",
        "name": f"{config['base']['output']}__root2hdf5",
        "engine": config["base"].get("engine", "docker"),
        "mounts": mounts(config),
        "stdout": sys.stdout,
        "output_path": utils.output_path(config),
        "logs_path": config["base"]["logs"],
        "cwd": "/data",
    }
    with Container(**container_options) as box:
        out, err = box.call(shell_line.encode("utf-8"))

    return out, err
run_sa2json ¶
run_sa2json(config: ImmutableConfig) -> tuple[bytes, bytes]
Convert SA ROOT file to HiFa JSON.
Source code in src/mapyde/runner.py
def run_sa2json(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Convert SA ROOT file to HiFa JSON.

    Each whitespace-separated entry of ``sa2json.inputs`` is passed to
    ``SAtoJSON.py`` as its own ``-i`` argument. When SimpleAnalysis was
    configured with an input whose name contains ``hepmc``, a ``--scale``
    option of ``analysis.kfactor / madgraph.nevents`` is appended.

    Returns:
        The ``(stdout, stderr)`` pair returned by the container call.
    """
    assert config

    inputstr = "".join(f" -i {i} " for i in config["sa2json"]["inputs"].split())

    scalefactorstring = ""
    # "input" is optional in the simpleanalysis section (run_simpleanalysis
    # checks for its presence too), so a missing key must not raise here.
    if "hepmc" in config["simpleanalysis"].get("input", ""):
        # scale weights up by the kfactor and down by number of generated events
        # NOTE(review): the original comment mentioned "kfactor*br", but no
        # branching ratio enters this factor -- confirm which is intended.
        scalefactor = config["analysis"]["kfactor"] / config["madgraph"]["nevents"]
        scalefactorstring = f"--scale {scalefactor}"

    image = f"ghcr.io/scipp-atlas/mapyde/{config['sa2json']['image']}"
    command = bytes(
        f"""python /scripts/SAtoJSON.py {inputstr} -o {config['sa2json']['output']} -n {config['base']['output']} -b /likelihoods/{config['pyhf']['likelihood']} -l {config['analysis']['lumi']} {config['sa2json']['options']} {scalefactorstring}""",
        "utf-8",
    )

    with Container(
        image=image,
        name=f"{config['base']['output']}__SA2json",
        engine=config["base"].get("engine", "docker"),
        mounts=mounts(config),
        stdout=sys.stdout,
        output_path=utils.output_path(config),
        logs_path=config["base"]["logs"],
        cwd="/data",
    ) as container:
        stdout, stderr = container.call(command)

    return stdout, stderr
run_sherpa ¶
run_sherpa(config: ImmutableConfig) -> tuple[bytes, bytes]
Run sherpa.
Source code in src/mapyde/runner.py
def run_sherpa(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Run sherpa.

    Creates ``<base.path>/<base.output>`` if necessary, then runs Sherpa via
    ``mpirun`` in the ``sherpamc/sherpa:2.2.7`` container with the configured
    proc card, core count and number of events, and copies the resulting
    ``sherpa`` directory to ``/data``.

    Returns:
        The ``(stdout, stderr)`` pair returned by the container call.
    """
    host_dir = (Path(config["base"]["path"]) / config["base"]["output"]).resolve()
    host_dir.mkdir(parents=True, exist_ok=True)

    shell_script = f"""/bin/bash -c "mkdir sherpa && \
cd sherpa && \
cp -p /cards/sherpa/{config['sherpa']['proc']} . && \
ls -ltrh && \
cat {config['sherpa']['proc']} && \
mpirun -n {config['sherpa']['cores']} Sherpa -f {config['sherpa']['proc']} -e {config['sherpa']['nevents']} && \
mv sherpa.hepmc.hepmc2g sherpa.hepmc.gz && \
cd ../ && \
cp -a sherpa/ /data/" """

    container_options = {
        "image": "sherpamc/sherpa:2.2.7",
        "name": f"{config['base']['output']}__sherpa",
        "engine": config["base"].get("engine", "docker"),
        "mounts": mounts(config),
        "stdout": sys.stdout,
        "output_path": utils.output_path(config),
        "logs_path": config["base"]["logs"],
    }
    with Container(**container_options) as box:
        out, err = box.call(shell_script.encode("utf-8"))

    return out, err
run_simpleanalysis ¶
run_simpleanalysis(config: ImmutableConfig) -> tuple[bytes, bytes]
Run SimpleAnalysis.
Source code in src/mapyde/runner.py
def run_simpleanalysis(config: ImmutableConfig) -> tuple[bytes, bytes]:
    """
    Run SimpleAnalysis.

    By default SimpleAnalysis is run on ``analysis.output``. If
    ``simpleanalysis.input`` is present and contains ``hepmc``, the
    ``*hepmc.gz`` file under ``/data/madgraph`` is unpacked and SimpleAnalysis
    is run on ``simpleanalysis.input`` instead. In both cases the resulting
    ``.root`` and ``.txt`` files are renamed with ``simpleanalysis.outputtag``
    and the temporary ``tmp_SA`` directory is removed.

    Returns:
        The ``(stdout, stderr)`` pair returned by the container call.
    """
    # Default: analyse the output of the analysis step.
    shell_script = f"""mkdir -p tmp_SA && cd tmp_SA && \
/opt/SimpleAnalysis/ci/entrypoint.sh simpleAnalysis -a {config['simpleanalysis']['name']} ../{config['analysis']['output']} -n && \
mv {config['simpleanalysis']['name']}.root ../{config['simpleanalysis']['name']}{config['simpleanalysis']['outputtag']}.root && \
mv {config['simpleanalysis']['name']}.txt ../{config['simpleanalysis']['name']}{config['simpleanalysis']['outputtag']}.txt && \
cd ../ && rm -rf tmp_SA"""

    reads_hepmc = (
        "input" in config["simpleanalysis"]
        and "hepmc" in config["simpleanalysis"]["input"]
    )
    if reads_hepmc:
        # Alternative: analyse the unpacked hepmc file directly.
        shell_script = f"""mkdir -p tmp_SA && cd tmp_SA && \
find /data -name "*hepmc.gz" && \
cp $(find /data/madgraph -name "*hepmc.gz") hepmc.gz && \
gunzip -f hepmc.gz && \
/opt/SimpleAnalysis/ci/entrypoint.sh simpleAnalysis -a {config['simpleanalysis']['name']} {config['simpleanalysis']['input']} -n && \
mv {config['simpleanalysis']['name']}.root ../{config['simpleanalysis']['name']}{config['simpleanalysis']['outputtag']}.root && \
mv {config['simpleanalysis']['name']}.txt ../{config['simpleanalysis']['name']}{config['simpleanalysis']['outputtag']}.txt && \
rm hepmc && \
cd ../ && rm -rf tmp_SA"""

    container_options = {
        "image": "gitlab-registry.cern.ch/atlas-sa/simple-analysis:master",
        "name": f"{config['base']['output']}__simpleanalysis",
        "engine": config["base"].get("engine", "docker"),
        "mounts": mounts(config),
        "stdout": sys.stdout,
        "cwd": "/data",
        "output_path": utils.output_path(config),
        "logs_path": config["base"]["logs"],
    }
    with Container(**container_options) as box:
        out, err = box.call(shell_script.encode("utf-8"))

    return out, err
Last update: June 15, 2023