# -*- coding: utf-8 -*-
import io
import logging
import os
import stat
import sys
from jinja2 import Environment, FileSystemLoader
from plumbum import SshMachine, local, ProcessExecutionError
from freckles.defaults import MODULE_FOLDER
from freckles.exceptions import FrecklesConfigException
log = logging.getLogger("freckles")
[docs]class ShellRunner(object):
def __init__(self):
if not hasattr(sys, "frozen"):
template_dir = os.path.join(MODULE_FOLDER, "templates", "shell_adapter")
else:
template_dir = os.path.join(
sys._MEIPASS, "freckles", "templates", "shell_adapter"
)
self.jinja_env = Environment(loader=FileSystemLoader(template_dir))
[docs] def render_environment(self, run_env_dir, tasklist, skip_exodus=True):
# making the run environment user accessible only
os.chmod(run_env_dir, 0o0700)
result = {"run_dir": run_env_dir, "run_dir_name": os.path.basename(run_env_dir)}
working_dir = os.path.join(run_env_dir, "working_dir")
os.mkdir(working_dir)
executables_dir = os.path.join(run_env_dir, "executables")
os.mkdir(executables_dir)
result["working_dir"] = working_dir
result["executables_dir"] = executables_dir
log.debug("Creating shell script from template...")
template = self.jinja_env.get_template("shell_runner.sh.j2")
extra_paths = []
functions = {}
all_exodus_binaries = []
for task in tasklist:
for f_name, f_details in task.get("files", {}).items():
target = os.path.join(executables_dir, f_name)
f_type = f_details["type"]
if f_type == "string_content":
content = f_details["content"]
with io.open(target, "w", encoding="utf-8") as f:
f.write(content)
# make executable
st = os.stat(target)
os.chmod(target, st.st_mode | stat.S_IEXEC)
elif f_type == "exodus-binary":
name = f_details["binary-name"]
if name not in all_exodus_binaries:
all_exodus_binaries.append(name)
else:
raise FrecklesConfigException(
"Unknown external file type: {}".format(f_type)
)
for f_name, f_details in task.get("functions", {}).items():
if f_name in functions.keys():
log.warning("Duplicate function: {}".format(f_name))
functions[f_name] = f_details
if not skip_exodus:
exodus_bundle = os.path.join(run_env_dir, "exodus-binaries", "bundle.sh")
exodus_cmd = local["exodus"]
exodus_args = ["-o", exodus_bundle]
exodus_args.append(all_exodus_binaries)
rc, stdout, stderr = exodus_cmd.run(exodus_args)
result["tasklist"] = tasklist
repl_dict = {
"extra_paths": extra_paths,
"functions": functions,
"tasklist": tasklist,
}
rendered = template.render(repl_dict)
run_script = os.path.join(run_env_dir, "run.sh")
with io.open(run_script, "w", encoding="utf-8") as rs:
rs.write(rendered)
# make run.sh executable
st = os.stat(run_script)
os.chmod(run_script, st.st_mode | stat.S_IEXEC)
result["run_script"] = run_script
return result
[docs] def run(
self,
run_properties,
run_cnf,
# output_callback,
result_callback,
parent_task,
# callback_adapter,
delete_env=False,
):
# run_dir = run_properties["run_dir"]
hostname = run_cnf.get("host")
connection_type = run_cnf.get("connection_type", None)
if connection_type is None:
if hostname in ["localhost", "127.0.0.1"]:
connection_type = "local"
else:
connection_type = "ssh"
if connection_type == "ssh":
remote = True
ssh_key = run_cnf.get("ssh_key")
user = run_cnf.get("user")
host_ip = run_cnf.get("host_ip")
ssh_port = run_cnf.get("ssh_port")
# otherwise we run into problems with Vagrant
if host_ip:
h = host_ip
else:
h = hostname
machine = SshMachine(h, port=ssh_port, user=user, keyfile=ssh_key)
else:
remote = False
machine = local
no_run = run_cnf.get("no_run")
if no_run:
return run_properties
if remote:
raise Exception("Not implemented yet")
# copy execution environment
# td = TaskDetail(
# "uploading execution environment", task_type="upload", task_parent=td
# )
# upload_task = td.add_subtask(
# "uploading execution environment", category="upload"
# )
# machine.upload(run_dir, "/tmp")
# upload_task.finish(success=True, changed=True, skipped=False)
# run_script = os.path.join("/tmp", run_properties["run.sh"])
else:
run_script = run_properties["run_script"]
machine.env["ECHO_TASK_START"] = "true"
machine.env["ECHO_TASK_FINISHED"] = "true"
cmd = machine["bash"]
# rc, stdout, stderr = cmd.run([run_script], retcode=None)
current_task = parent_task
current_task_stdout = None
current_task_stderr = None
pending_finshed = None
try:
popen = cmd.popen(run_script)
current_task_id = -1
log.debug("Reading command output...")
for line in popen.iter_lines():
log.debug(line)
stderr = line[1]
stdout = line[0]
stderr_processed = False
if stdout:
if stdout.startswith("STARTING_TASK["):
if pending_finshed:
stdout_msg = "\n".join(current_task_stdout)
stderr_msg = "\n".join(current_task_stderr)
current_task = current_task.finish(
msg=stdout_msg, error_msg=stderr_msg, **pending_finshed
)
pending_finshed = None
current_task_stdout = []
current_task_stderr = []
index = stdout.index("]")
task_id = int(stdout[14:index])
current_msg = stdout[index + 2 :].strip() # noqa
# print(stdout)
if task_id > current_task_id:
# print("starting: {}".format(msg))
# td = TaskDetail(
# task_name=current_msg,
# task_type="script-command",
# task_parent=current_task,
# task_title=current_msg,
# freckles_task_id=task_id,
# )
current_task = current_task.add_subtask(
task_name=current_msg,
category="script-command",
reference=task_id,
)
# output_callback.task_started(td)
current_task_id = task_id
if stderr:
current_task_stderr.append(stderr)
stderr_processed = True
elif stdout.startswith("FINISHED_TASK["):
index = stdout.index("]")
task_id = int(stdout[14:index])
rc = int(stdout[index + 2 :]) # noqa
if rc == 0:
success = True
skipped = False
changed = True
elif rc == 100:
success = True
skipped = False
changed = False
elif rc == 101:
success = True
skipped = True
changed = False
else:
success = False
skipped = None
changed = None
if stderr:
current_task_stderr.append(stderr)
stderr_processed = True
pending_finshed = dict(
success=success, changed=changed, skipped=skipped
)
else:
if stdout:
if stdout.lower().startswith("error:"):
current_task_stderr.append(stdout[6:])
else:
current_task_stdout.append(stdout)
if stderr and not stderr_processed:
current_task_stderr.append(stderr)
rc = popen._proc.returncode
if pending_finshed:
stdout_msg = "\n".join(current_task_stdout)
stderr_msg = "\n".join(current_task_stderr)
current_task = current_task.finish(
msg=stdout_msg, error_msg=stderr_msg, **pending_finshed
)
run_properties["return_code"] = rc
run_properties["signal_status"] = -1
except (ProcessExecutionError) as e:
# for l in current_task_stdout:
# current_task.add_result_msg(l)
# for l in current_task_stderr:
# current_task.add_result_error(l)
current_task.finish(success=False, msg=stdout, error_msg=str(e))
if remote:
raise Exception("not implemented yet")
# if delete_env:
# td = TaskDetail(
# "deleting execution environment",
# task_type="delete",
# task_parent=current_task,
# )
# task_
# output_callback.task_started(td)
# delete = machine["rm"]
# rc, stdout, stderr = delete.run(
# ["-r", os.path.join("/tmp", run_properties["env_dir_name"])]
# )
# success = rc == 0
# output_callback.register_task_finished(td, success=success)
# machine.close()
return run_properties