Skip to content
Snippets Groups Projects
Commit 6cae7b2b authored by tuhe's avatar tuhe
Browse files

Working on the dashboard

parent ff6e0ba0
No related branches found
No related tags found
No related merge requests found
import os
import time
import threading
from queue import Queue, Empty

from colorama import Fore
class WorkerThread(threading.Thread):
    """ Background thread that drains (std_type, message) tuples from a queue
    and appends the concatenated text to the 'stdout' list stored in a small
    key/value database (``db``).

    Messages tagged 'stderr' are wrapped in colorama red before being stored;
    everything else is stored verbatim. Ask the thread to stop by calling its
    join() method; join() drains the queue one final time so no output is lost.
    """
    def __init__(self, dir_q, db=None):
        super().__init__()
        self.dir_q = dir_q  # queue of (std_type, message) tuples
        self.db = db        # database exposing .get(key) / .set(key, value)
        self.stoprequest = threading.Event()

    def empty_queue(self):
        """Drain all currently queued messages and append them to db['stdout']."""
        ss = ""
        try:
            while True:
                # Short blocking get: raises Empty as soon as the queue drains.
                std_type, m = self.dir_q.get(True, 0.05)
                ss += f"{Fore.RED}{m}{Fore.WHITE}" if std_type == 'stderr' else m
        except Empty:
            pass  # queue drained; fall through and flush what we collected
        finally:
            if len(ss) > 0:
                cq = self.db.get('stdout')
                # Stored as (sequence_number, text) so consumers can order chunks.
                self.db.set('stdout', cq + [(len(cq), ss)])

    def run(self):
        # Poll until asked to stop. empty_queue() itself blocks briefly on the
        # queue, so very few CPU cycles are wasted while idle.
        while not self.stoprequest.is_set():
            self.empty_queue()
            time.sleep(0.1)

    def join(self, timeout=None):
        """Signal the thread to stop, wait for it, then do a final drain."""
        self.stoprequest.set()
        super().join(timeout)
        self.empty_queue()
class DummyPipe:
    """ File-like object that mirrors writes to a real stream while also
    pushing every message onto a queue as (type, message) tuples.

    Used to tee sys.stdout / sys.stderr: the console still sees the output
    (unless muted) and a consumer thread can read it from ``queue``.
    """
    def __init__(self, type, std_out_or_err, queue, mute=False):
        self.type = type            # tag attached to each queued message, e.g. 'stdout'
        self.std_ = std_out_or_err  # the real underlying stream
        self.queue = queue
        # Bug fix: this previously hard-coded ``self.mute = False``, silently
        # ignoring the ``mute`` argument passed by the caller.
        self.mute = mute

    def write(self, message):
        """Forward ``message`` to the real stream (unless muted) and queue it."""
        if not self.mute:
            self.std_.write(message)
        # Queued even when muted, so the message is still captured.
        self.queue.put((self.type, message))

    def flush(self):
        self.std_.flush()
class StdCapturing():
    """Capture stdout/stderr by routing them through DummyPipe objects into a
    shared queue, which a background WorkerThread drains into ``db``."""

    def __init__(self, stdout, stderr, db=None, mute=False):
        import queue
        import threading
        # Reset the captured-output list in the database.
        db.set('stdout', [])
        self.queu = queue.Queue()
        # Replacement streams: each mirrors to the real stream and also queues.
        self.dummy_stdout = DummyPipe('stdout', stdout, self.queu, mute=mute)
        self.dummy_stderr = DummyPipe('stderr', stderr, self.queu, mute=mute)
        self.recordings = []
        self.recording = False
        # Background consumer that moves queued text into the database.
        self.thread = WorkerThread(self.queu, db=db)
        self.thread.start()

    def close(self):
        """Stop the worker thread; any error is printed rather than raised."""
        try:
            self.thread.join()
        except Exception as e:
            print(e)
class ArtifactMapper:
    """Collects test artifacts (stack traces, stdout/stderr, outcomes) into a
    pupdb database. All recording methods are currently unimplemented stubs."""
    def __init__(self):
        # NOTE(review): an empty path is handed to Pupdb -- presumably a real
        # artifact .json path should be set here; confirm against callers.
        self.artifact_output_json = ''
        from threading import Lock
        self.lock = Lock()
        import pupdb
        self.db = pupdb.Pupdb(self.artifact_output_json)
        # NOTE(review): the value read here is discarded; probably intended to
        # be stored on self (e.g. a run counter) -- confirm intent.
        run = self.db.get("run_number", 0)
    def add_stack_trace(self, e):
        """Record an exception/stack trace in the artifact db (stub)."""
        # Add an error.
        pass
    def print_stdout(self, msg, timestamp):
        """Record a stdout message with its timestamp (stub)."""
        pass
    def print_stderr(self, msg, timestamp):
        """Record a stderr message with its timestamp (stub)."""
        pass
    def restart_test(self):
        """Reset per-test state before a re-run (stub)."""
        pass
    def register_outcome(self, did_fail=False, did_pass=False, did_error=False):
        """Record the final outcome of a test (stub)."""
        pass
\ No newline at end of file
......@@ -12,11 +12,10 @@ import urllib.parse
import requests
import ast
import numpy
from unitgrade.runners import UTextResult
from unitgrade.utils import gprint, Capturing2, Capturing
colorama.init(autoreset=True) # auto resets your settings after every output
colorama.init(autoreset=True) # auto resets your settings after every output
numpy.seterr(all='raise')
def setup_dir_by_class(C, base_dir):
......@@ -52,6 +51,12 @@ class Report:
def _file(self):
return inspect.getfile(type(self))
def _artifact_file(self):
    """ File for the artifacts DB (thread safe). This file is optional. Note that it is a pupdb database file.
    Note the file is shared between all sub-questions. """
    return os.path.join(os.path.dirname(self._file()), "unitgrade_data/main_config_"+ os.path.basename(self._file()[:-3]) + ".json")
def _is_run_in_grade_mode(self):
    """ True if this report is being run as part of a grade run, i.e. from a
    generated ``*_grade.py`` script (detected purely from the file name). """
    return self._file().endswith("_grade.py")  # Not sure I love this convention.
......@@ -107,13 +112,15 @@ class Report:
if hasattr(q, '_save_cache'):
q()._save_cache()
print("q is", q())
# q()._cache_put('time', q.time) # = q.time
report_cache[q.__qualname__] = q._cache2
else:
report_cache[q.__qualname__] = {'no cache see _setup_answers in framework.py': True}
if with_coverage:
for q, _ in self.questions:
q._with_coverage = False
# report_cache is saved on a per-question basis.
# it could also contain additional information such as runtime metadata etc. This may not be appropriate to store with the invidivual questions(?).
# In this case, the function should be re-defined.
return report_cache
def set_payload(self, payloads, strict=False):
......@@ -226,6 +233,13 @@ class Report:
f.write(f"{t}")
def get_hints(ss):
""" Extract all blocks of the forms:
Hints:
bla-bla.
and returns the content unaltered.
"""
if ss == None:
return None
try:
......@@ -240,6 +254,8 @@ def get_hints(ss):
ss = ss[ss.lower().find(h) + len(h) + 1:]
ss = "\n".join([l for l in ss.split("\n") if not l.strip().startswith(":")])
ss = textwrap.dedent(ss).strip()
# if ss.startswith('*'):
# ss = ss[1:].strip()
return ss
except Exception as e:
print("bad hints", ss, e)
......@@ -261,8 +277,6 @@ class UTestCase(unittest.TestCase):
if hasattr(self, '_stdout') and self._stdout is not None:
file = self._stdout
else:
# self._stdout = sys.stdout
# sys._stdout = io.StringIO()
file = sys.stdout
return Capturing2(stdout=file)
......@@ -275,13 +289,133 @@ class UTestCase(unittest.TestCase):
return title
return cls.__qualname__
def run(self, result):
    """ Run this test case while mirroring stdout/stderr into a pupdb artifact
    database so an external dashboard can follow the test live.

    db keys written: 'run_id' (random id for this run), 'state'
    ('running' -> 'pass'/'fail'), 'coverage_files_changed', and on failure
    'wz_stacktrace' (an HTML traceback rendered by werkzeug).
    """
    from unitgrade.artifacts import StdCapturing
    from unittest.case import TestCase
    from pupdb.core import PupDB
    db = PupDB(self._artifact_file())
    db.set('run_id', np.random.randint(1000*1000))
    db.set("state", "running")
    db.set('coverage_files_changed', None)

    # Keep the real streams so they can be restored in the finally-block.
    _stdout = sys.stdout
    _stderr = sys.stderr
    std_capture = StdCapturing(stdout=sys.stdout, stderr=sys.stderr, db=db, mute=False)
    try:
        # Run this unittest and record all of the output.
        # This is probably where we should hijack the stdout output and save it -- after all, this is where the test is actually run.
        sys.stderr = std_capture.dummy_stderr
        sys.stdout = std_capture.dummy_stdout
        result_ = TestCase.run(self, result)
        from werkzeug.debug.tbtools import DebugTraceback, _process_traceback
        # _feedErrorsToResult stashed the raw (test, err) pairs on the
        # instance; collect the non-None error tuples.
        actual_errors = []
        for test, err in self._error_fed_during_run:
            if err is None:
                continue
            else:
                import traceback
                actual_errors.append(err)
        if len(actual_errors) > 0:
            # Only the first recorded error is rendered for the dashboard.
            ex, exi, tb = actual_errors[0]
            exi.__traceback__ = tb  # attach traceback so werkzeug can render it
            dbt = DebugTraceback(exi)
            sys.stderr.write(dbt.render_traceback_text())
            html = dbt.render_traceback_html(include_title="hello world")
            db.set('wz_stacktrace', html)
            db.set('state', 'fail')
        else:
            print("> Set state of test to:", "pass")
            db.set('state', 'pass')
    except Exception as e:
        print("-----------------.///////////////////////////////////////////////////////////////")
        import traceback
        traceback.print_exc()
        raise e
    finally:
        # Always restore the real streams and stop the capture thread.
        sys.stdout = _stdout
        sys.stderr = _stderr
        std_capture.close()
    # NOTE(review): this import appears unused at this point -- confirm before removing.
    from werkzeug.debug.tbtools import HEADER
    return result_
@classmethod
def reset(cls):
print("Warning, I am not sure UTestCase.reset() is needed anymore and it seems very hacky.")
raise Exception("reset called in test case. This method is deprecated.")
cls._outcome = None
cls._cache = None
cls._cache2 = None
def before_setup_called(cls):
    """Hook invoked just before class setup runs (wired in via the patched
    setUpClass installed by __new__)."""
    # NOTE(review): debug print left in -- confirm whether it should be removed.
    print("hi")
    pass
setUpClass_not_overwritten = False
@classmethod
def setUpClass(cls) -> None:
cls.setUpClass_not_overwritten = True
pass
@classmethod
def __new__(cls, *args, **kwargs):
old_setup = cls.setUpClass
def new_setup():
cls.before_setup_called()
try:
old_setup()
except Exception as e:
raise e
finally:
pass
cls.setUpClass = new_setup
return super().__new__(cls)
def _callSetUp(self):
if self._with_coverage:
......@@ -300,26 +434,16 @@ class UTestCase(unittest.TestCase):
from pathlib import Path
from snipper import snipper_main
try:
# print("Stoppping coverage...")
self.cov.stop()
# print("Coverage was stopped")
# self.cov.html_report()
# print("Success!")
except Exception as e:
print("Something went wrong while tearing down coverage test")
print(e)
data = self.cov.get_data()
base, _, _ = self._report._import_base_relative()
# print("Measured coverage files", data.measured_files)
for file in data.measured_files():
# print(file)
file = os.path.normpath(file)
root = Path(base)
child = Path(file)
# print("root", root, "child", child)
# print(child, "is in parent?", root in child.parents)
# print(child.parents)
if root in child.parents:
# print("Reading file", child)
with open(child, 'r') as f:
......@@ -331,31 +455,14 @@ class UTestCase(unittest.TestCase):
if len(lines) != len(lines2):
for k in range(len(lines)):
print(k, ">", lines[k], "::::::::", lines2[k])
# print("-" * 100)
# print("\n".join(lines))
# print("-"*100)
# print("\n".join(lines2))
# print("-" * 100)
print("Snipper failure; line lenghts do not agree. Exiting..")
print(child, "len(lines) == len(lines2)", len(lines), len(lines2))
import sys
sys.exit()
assert len(lines) == len(lines2)
# print("In file ", file, "context by lineno", data.contexts_by_lineno(file))
for ll in data.contexts_by_lineno(file):
# For empty files (e.g. __init__) there is a potential bug where coverage will return the file but lines2 will be = [].
# print("loop B: ll is", ll)
l = ll-1
# print(l)
# l1 = (lines[l] + " "*1000)[:80]
# l2 = (lines2[l] + " "*1000)[:80]
# print("l is", l, l1, " " + l2, "file", file)
# print("Checking if statement: ")
# print(l, lines2)
# print(">> ", lines2[l])
# print(">", lines2[l].strip(), garb)
if l < len(lines2) and lines2[l].strip() == garb:
# print("Got a hit at l", l)
rel = os.path.relpath(child, root)
......@@ -394,7 +501,7 @@ class UTestCase(unittest.TestCase):
self._cache_put((self.cache_id(), 'title'), value)
def _get_outcome(self):
if not (self.__class__, '_outcome') or self.__class__._outcome is None:
if not hasattr(self.__class__, '_outcome') or self.__class__._outcome is None:
self.__class__._outcome = {}
return self.__class__._outcome
......@@ -425,7 +532,7 @@ class UTestCase(unittest.TestCase):
# Find the report class this class is defined within.
if skip_remote_check:
return
import inspect
# import inspect
# file = inspect.getfile(self.__class__)
import importlib, inspect
......@@ -449,11 +556,6 @@ class UTestCase(unittest.TestCase):
report = found_reports[0]
report()._check_remote_versions()
# self._get_report_class()
# def _get_report_class(self):
# pass
def _ensure_cache_exists(self):
if not hasattr(self.__class__, '_cache') or self.__class__._cache == None:
......@@ -484,8 +586,7 @@ class UTestCase(unittest.TestCase):
key = (self.cache_id(), 'assert')
if not self._cache_contains(key):
print("Warning, framework missing", key)
self.__class__._cache[
key] = {} # A new dict. We manually insert it because we have to use that the dict is mutable.
self.__class__._cache[key] = {} # A new dict. We manually insert it because we have to use that the dict is mutable.
cache = self._cache_get(key)
id = self._assert_cache_index
_expected = cache.get(id, f"Key {id} not found in cache; framework files missing. Please run deploy()")
......@@ -580,6 +681,11 @@ class UTestCase(unittest.TestCase):
def _cache_file(self):
return os.path.dirname(inspect.getabsfile(type(self))) + "/unitgrade_data/" + self.__class__.__name__ + ".pkl"
def _artifact_file(self):
    """ File for the artifacts DB (thread safe). This file is optional. Note that it is a pupdb database file.
    Note the file is shared between all sub-questions. """
    # NOTE(review): the path below includes cache_id(), which makes the file
    # look per-question rather than shared -- confirm the docstring claim.
    return os.path.join(os.path.dirname(self._cache_file()), '-'.join(self.cache_id()) + ".json")
def _save_cache(self):
# get the class name (i.e. what to save to).
cfile = self._cache_file()
......@@ -607,8 +713,55 @@ class UTestCase(unittest.TestCase):
else:
print("Warning! data file not found", cfile)
def _get_coverage_files(self):
    """Return the cached coverage record for this test, or None if absent."""
    key = (self.cache_id(), 'coverage')
    return self._cache_get(key, None)
def _get_hints(self):
"""
This code is run when the test is set up to generate the hints and store them in an artifact file. It may be beneficial to simple compute them beforehand
and store them in the local unitgrade pickle file. This code is therefore expected to superceede the alterative code later.
"""
hints = []
# print("Getting hint")
key = (self.cache_id(), 'coverage')
if self._cache_contains(key):
CC = self._cache_get(key)
# cl, m = self.cache_id()
# print("Getting hint using", CC)
# Insert newline to get better formatting.
# gprint(
# f"\n> An error occured during the test: {cl}.{m}. The following files/methods has code in them you are supposed to edit and may therefore be the cause of the problem:")
for file in CC:
rec = CC[file]
# gprint(f"> * {file}")
for l in rec:
_, comments = CC[file][l]
hint = get_hints(comments)
if hint != None:
hints.append((hint, file, l))
doc = self._testMethodDoc
# print("doc", doc)
if doc is not None:
hint = get_hints(self._testMethodDoc)
if hint is not None:
hints = [(hint, None, self.cache_id()[1])] + hints
return hints
def _feedErrorsToResult(self, result, errors):
""" Use this to show hints on test failure. """
""" Use this to show hints on test failure.
It feeds error to the result -- so if there are errors, they will crop up here
"""
self._error_fed_during_run = errors.copy() # import to copy the error list.
# result._test._error_fed_during_run = errors.copy()
if not isinstance(result, UTextResult):
er = [e for e, v in errors if v != None]
# print("Errors are", errors)
......@@ -653,8 +806,9 @@ class UTestCase(unittest.TestCase):
except Exception as e:
print("Bad stuff in hints. ")
print(hints)
# result._last_errors = errors
super()._feedErrorsToResult(result, errors)
b = 234
def startTestRun(self):
super().startTestRun()
......
......@@ -21,6 +21,8 @@ msum = lambda x: sum(x)
mfloor = lambda x: np.floor(x)
class Logger(object):
def __init__(self, buffer, write_to_stdout=True):
# assert False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment