# -*- coding: utf-8 -*-
import abc
import csv
import io
import json
import logging
import os
import threading
import time
from collections import OrderedDict
import click
import fasteners
import psutil
import six
from colorama import Fore
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from freckles.defaults import (
FRECKLES_RUN_LOG_FILE_PATH,
FRECKLES_RUN_LOG_FILE_LOCK,
FRECKLES_LAST_RUN_FILE_PATH,
)
from frutils.exceptions import FrklException
run_log_lock = threading.Lock()
log = logging.getLogger("freckles")
[docs]def convert_log_file_row(row):
data = {}
data["uuid"] = row[0]
data["run_alias"] = row[1]
data["frecklet_name"] = row[2]
data["adapter"] = row[3]
data["env_dir"] = row[4]
data["state"] = row[5]
data["timestamp"] = float(row[6])
data["pid"] = int(row[7])
data["proc_name"] = row[8].strip()
return data
[docs]def freckles_run_process_exists(run_data):
if isinstance(run_data, int):
pid = run_data
else:
pid = run_data["pid"]
if pid < 0:
return False
running = psutil.pid_exists(pid)
if not running:
return False
proc = psutil.Process(pid)
if proc.name().strip() != run_data["proc_name"]:
return False
return True
# This should never be called manually, only once using the 'atexit' register method in freckles.py
# @fasteners.interprocess_locked(path=FRECKLES_RUN_LOG_FILE_LOCK)
[docs]def clean_runs_log_file():
if not os.path.exists(FRECKLES_RUN_LOG_FILE_PATH):
return
try:
with fasteners.InterProcessLock(FRECKLES_RUN_LOG_FILE_LOCK):
with run_log_lock:
with io.open(FRECKLES_RUN_LOG_FILE_PATH, "r", encoding="utf-8") as f:
lines = f.readlines()
result = []
for line in lines:
if not line.strip():
continue
data = convert_log_file_row(line.split(","))
if data["state"] != "started":
continue
# if data["pid"] == os.getpid():
# continue
if not freckles_run_process_exists(data):
continue
result.append(line)
with io.open(FRECKLES_RUN_LOG_FILE_PATH, "w", encoding="utf-8") as f:
f.write("".join(result))
except (Exception) as e:
print("CLEANUP FAILED")
log.debug("Could not clean up runs log file: {}".format(e))
# @fasteners.interprocess_locked(path=FRECKLES_RUN_LOG_FILE_LOCK)
[docs]def write_runs_log(properties, adapter_name, state):
try:
# currently, keeping logs is not supported
# the cleanup function above needs to be adjusted for that
keep_logs = False
with fasteners.InterProcessLock(FRECKLES_RUN_LOG_FILE_LOCK):
with run_log_lock:
if state == "started":
pid = os.getpid()
proc_name = psutil.Process(pid).name()
else:
pid = -1
proc_name = "-"
if keep_logs:
row = [
properties["uuid"],
properties["run_metadata"].get(
"run_alias", properties["frecklet_name"]
),
properties["frecklet_name"],
adapter_name,
properties["env_dir"],
state,
time.time(),
pid,
proc_name,
]
with io.open(
FRECKLES_LAST_RUN_FILE_PATH, "w", encoding="utf-8", buffering=1
) as f:
writer = csv.writer(f)
writer.writerow(row)
with io.open(
FRECKLES_RUN_LOG_FILE_PATH, "a", encoding="utf-8", buffering=1
) as f:
writer = csv.writer(f)
writer.writerow(row)
else:
if state == "started":
row = [
properties["uuid"],
properties["run_metadata"].get(
"run_alias", properties["frecklet_name"]
),
properties["frecklet_name"],
adapter_name,
properties["env_dir"],
state,
time.time(),
pid,
proc_name,
]
with io.open(
FRECKLES_RUN_LOG_FILE_PATH,
"a",
encoding="utf-8",
buffering=1,
) as f:
writer = csv.writer(f)
writer.writerow(row)
with io.open(
FRECKLES_LAST_RUN_FILE_PATH,
"w",
encoding="utf-8",
buffering=1,
) as f:
writer = csv.writer(f)
writer.writerow(row)
else:
with open(FRECKLES_RUN_LOG_FILE_PATH, "r") as inp, open(
FRECKLES_RUN_LOG_FILE_PATH + ".tmp", "w", buffering=1
) as out:
writer = csv.writer(out)
for row in csv.reader(inp):
if not row:
continue
if row[0] != properties["uuid"]:
writer.writerow(row)
os.rename(
FRECKLES_RUN_LOG_FILE_PATH + ".tmp",
FRECKLES_RUN_LOG_FILE_PATH,
)
except (Exception) as e:
print("COULD NOT WRITE JOBS LOG: {}".format(e))
log.debug("Could not write run log file: {}".format(e))
[docs]def get_current_runs():
if not os.path.exists(FRECKLES_RUN_LOG_FILE_PATH):
return {}
content = OrderedDict()
with io.open(FRECKLES_RUN_LOG_FILE_PATH, "r", encoding="utf-8") as f:
for row in csv.reader(f):
if not row:
continue
data = convert_log_file_row(row)
if data["state"] == "started":
content[data["uuid"]] = data
else:
if data["uuid"] in content:
content.pop(data["uuid"])
result = OrderedDict()
for uuid, data in content.items():
if not freckles_run_process_exists(data):
continue
result[uuid] = data
return result
[docs]def get_last_run():
if not os.path.exists(FRECKLES_LAST_RUN_FILE_PATH):
return None
with io.open(FRECKLES_LAST_RUN_FILE_PATH, "r", encoding="utf-8") as f:
for row in csv.reader(f):
if not row:
continue
data = convert_log_file_row(row)
break
if not os.path.exists(data["env_dir"]):
return None
return data
[docs]class RunWatchManager(object):
def __init__(self, *run_watchers):
self._run_watchers = run_watchers
self._lock = threading.Lock()
self._current_runs = None
self._runs_event_handler = FrecklesRunsListFileHandler(
callback=self.update_current_runs
)
self._runs_observer = None
self._aliases = {}
self._unique_index = {}
[docs] def start(self):
current_runs = get_current_runs()
self.update_current_runs(current_runs)
self._runs_observer = watch_runs(self._runs_event_handler)
[docs] def stop(self):
for watcher in self._run_watchers:
watcher.stop()
self._runs_observer.stop()
[docs] def join_runs_watch(self):
self._runs_observer.join()
[docs] @fasteners.locked
def update_current_runs(self, current_runs):
if self._current_runs:
old_current = self._current_runs
else:
old_current = {}
self._current_runs = current_runs
for uuid, r in self._current_runs.items():
if uuid in old_current.keys():
continue
index = 0
while index in self._unique_index.values():
index = index + 1
alias = r["run_alias"]
if alias.startswith("__dyn_"):
alias = "no_name"
elif os.path.sep in alias:
alias = "...{}{}".format(os.path.sep, os.path.basename(alias))
if alias not in self._aliases.keys():
self._aliases[alias] = 1
alias_new = alias
else:
current = self._aliases[alias]
current = current + 1
alias_new = "{}_{}".format(alias, current)
self._aliases[alias] = current
self.run_started(uuid=uuid, alias=alias_new, run_data=r, index=index)
old_current[uuid] = r
self._unique_index[uuid] = index
remove = []
for uuid, run_data in old_current.items():
if uuid not in self._current_runs.keys():
remove.append(uuid)
continue
if not freckles_run_process_exists(run_data):
remove.append(uuid)
for r in remove:
self.run_finished(r)
self._unique_index.pop(r)
if not self._current_runs:
self._aliases = {}
self._unique_index = {}
[docs] def run_started(self, uuid, alias, run_data, index):
for watcher in self._run_watchers:
watcher.run_started(uuid=uuid, alias=alias, run_data=run_data, index=index)
[docs] def run_finished(self, uuid):
for watcher in self._run_watchers:
watcher.run_finished(uuid=uuid)
[docs]class FrecklesLogFileHander(FileSystemEventHandler):
def __init__(
self,
run_alias,
watch_path=None,
created_callback=None,
callback=None,
finished_callback=None,
adapter_log=None,
index=0,
):
if adapter_log and watch_path:
raise FrklException(
msg="Can only watch either the adapter log, or a specific path."
)
if watch_path is None:
watch_path = "run_log.json"
if run_alias.startswith("__dyn_"):
run_alias = "_no_name_"
self._alias = run_alias
self._index = index
self._watch_path = watch_path
self._created_callback = created_callback
self._callback = callback
self._finished_callback = finished_callback
self._last_file_pos = 0
self._adapter_log = adapter_log
if not self._adapter_log:
self._log_file = os.path.join(self._env_dir, self._watch_path)
else:
if self._adapter == "nsbl":
self._log_file = os.path.join(
self._env_dir, "nsbl/logs/ansible_run_log"
)
else:
raise FrklException(msg="Watching logs for adapter '{}' not supported.")
self._watch_dir = os.path.dirname(self._log_file)
[docs] def on_created(self, event):
if not self._created_callback:
return
if event.src_path != self._log_file:
return
self._created_callback(event.src_path)
[docs] def on_modified(self, event):
if not self._callback:
return
if event.src_path != self._log_file:
return
if not os.path.exists(event.src_path):
return []
with io.open(event.src_path, "r", encoding="utf-8") as f:
f.seek(self._last_file_pos)
data = f.readlines()
self._last_file_pos = f.tell()
if not self._adapter_log:
result = []
for line in data:
d = json.loads(line)
result.append(d["value"])
else:
result = data
return self._callback(result)
[docs] def on_deleted(self, event):
if not self._finished_callback:
return
if event.src_path != self._log_file:
return
self._finished_callback()
[docs]@six.add_metaclass(abc.ABCMeta)
class FrecklesRunWatcher(object):
[docs] @abc.abstractmethod
def run_started(self, uuid, alias, run_data, index):
pass
[docs] @abc.abstractmethod
def run_finished(self, uuid):
pass
[docs] @abc.abstractmethod
def stop(self):
pass
[docs]class FrecklesCurrentRunsWatcher(FrecklesRunWatcher):
def __init__(self):
self._current_runs = None
@property
def current_runs(self):
if self._current_runs is not None:
return self._current_runs
self._current_runs = get_current_runs()
return self._current_runs
[docs] def run_started(self, uuid, alias, run_data, index):
click.echo("run started: {}".format(run_data["run_alias"]))
self.current_runs[uuid] = run_data
click.echo("\ncurrent runs:")
for r in self.current_runs.values():
click.echo(" - {}".format(r["run_alias"]))
click.echo()
[docs] def run_finished(self, uuid):
click.echo("run finished: {}".format(self.current_runs[uuid]["run_alias"]))
self.current_runs.pop(uuid)
if not self.current_runs:
click.echo("\ncurrent runs: none")
else:
click.echo("\ncurrent runs:")
for r in self.current_runs.values():
click.echo(" - {}".format(r["run_alias"]))
click.echo()
[docs]class FrecklesRunsLogTerminalOutput(FrecklesRunWatcher):
def __init__(self, watch_path=None, adapter_log=False):
if adapter_log and watch_path:
raise FrklException(
msg="Can only watch either the adapter log, or a specific path."
)
self._watch_path = watch_path
self._adapter_log = adapter_log
self._log_file_printers = {}
[docs] def run_started(self, uuid, alias, run_data, index):
fw = FrecklesRunLogTerminalOutput(
alias,
run_data,
watch_path=self._watch_path,
adapter_log=self._adapter_log,
index=index,
)
self._log_file_printers[uuid] = fw
[docs] def run_finished(self, uuid):
fw = self._log_file_printers[uuid]
fw.finished()
self._log_file_printers.pop(uuid)
[docs] def stop(self):
for uuid in self._log_file_printers.keys():
fw = self._log_file_printers[uuid]
fw.finished(print_status=False)
self._log_file_printers = {}
[docs]def print_task_detail(run_detail, alias=None, color=None):
if color is None:
color = ""
reset = ""
else:
reset = Fore.RESET
if alias:
alias = "{}: ".format(alias)
else:
alias = ""
level = run_detail["level"]
msg = run_detail["msg"]
finished = run_detail["finished"]
success = run_detail.get("success", None)
skipped = run_detail.get("skipped", None)
# changed = d.get("changed", None)
# messages = d["messages"]
error_messages = run_detail["error_messages"]
padding = " " * level
if not finished:
click.echo("{}{}{}- {} -> started{}".format(color, alias, padding, msg, reset))
else:
if success:
if skipped:
status = "skipped"
else:
status = "ok"
else:
status = "failed"
click.echo("{}{}{}- {}: {}{}".format(color, alias, padding, msg, status, reset))
if not success:
if alias:
alias = alias[0:-1]
click.echo("{}{} -> {}{}".format(color, alias, error_messages, reset))
[docs]class FrecklesRunLogTerminalOutput(FrecklesLogFileHander):
COLORS = [
Fore.GREEN,
Fore.CYAN,
Fore.BLUE,
Fore.LIGHTRED_EX,
Fore.MAGENTA,
Fore.YELLOW,
Fore.RED,
]
def __init__(self, run_alias, run_data, watch_path=None, adapter_log=None, index=0):
self._run_data = run_data
self._uuid = self._run_data["uuid"]
self._adapter = self._run_data["adapter"]
self._env_dir = self._run_data["env_dir"]
self._started = self._run_data["timestamp"]
super(FrecklesRunLogTerminalOutput, self).__init__(
run_alias=run_alias,
watch_path=watch_path,
callback=self.updated,
finished_callback=self.finished,
adapter_log=adapter_log,
index=index,
)
self._observer = watch_log_file(self._watch_dir, self)
self._finished = False
[docs] def updated(self, data):
if not self._adapter_log:
self.updated_log(data)
else:
self.updated_adapter(data)
[docs] def updated_adapter(self, data):
if self._index < len(FrecklesRunLogTerminalOutput.COLORS):
color_index = self._index
else:
color_index = self._index % len(FrecklesRunLogTerminalOutput.COLORS)
color = FrecklesRunLogTerminalOutput.COLORS[color_index]
for line in data:
click.echo(
"{}{}: {}{}".format(color, self._alias, line, Fore.RESET), nl=False
)
[docs] def updated_log(self, data):
if self._index < len(FrecklesRunLogTerminalOutput.COLORS):
color_index = self._index
else:
color_index = self._index % len(FrecklesRunLogTerminalOutput.COLORS)
color = FrecklesRunLogTerminalOutput.COLORS[color_index]
for d in data:
print_task_detail(d, alias=self._alias, color=color)
[docs] def finished(self, print_status=True):
if not self._finished:
self._finished = True
if print_status:
click.echo(
"{}: finished (some log output might not have been printed if 'keep_run_folder' is set to false)".format(
self._alias
)
)
self._observer.stop()
[docs]class FrecklesRunsListFileHandler(FileSystemEventHandler):
def __init__(self, callback):
self._callback = callback
[docs] def on_any_event(self, event):
if event.src_path == FRECKLES_RUN_LOG_FILE_PATH or (
hasattr(event, "dest_path")
and event.dest_path == FRECKLES_RUN_LOG_FILE_PATH
):
content = get_current_runs()
self._callback(content)
[docs]def watch_runs(event_handler):
observer = Observer()
observer.schedule(
event_handler, os.path.dirname(FRECKLES_RUN_LOG_FILE_PATH), recursive=False
)
observer.start()
return observer
[docs]def watch_log_file(env_dir, event_handler):
observer = Observer()
observer.schedule(event_handler, env_dir, recursive=False)
observer.start()
return observer