Skip to content
Snippets Groups Projects
Commit 2422dc07 authored by tuhe's avatar tuhe
Browse files

Working on removing wexpect and pexpect

parent c395db63
Branches
No related tags found
No related merge requests found
Pipeline #19322 passed
Showing
with 484 additions and 93 deletions
Metadata-Version: 2.1
Name: codesnipper
Version: 0.1.18.7
Version: 0.1.18.8
Summary: A lightweight framework for censoring student solutions files and extracting code + output
Home-page: https://lab.compute.dtu.dk/tuhe/snipper
Author: Tue Herlau
......
......@@ -109,7 +109,7 @@ def indent(l):
return l[:len(l) - len(l.lstrip())]
def full_strip(lines, tags=None):
def full_strip(lines : list, tags=("#!s", "#!o", "#!f", "#!b")):
if tags is None:
tags = ["#!s", "#!o", "#!f", "#!b"]
for t in tags:
......
import code
import traceback
import functools
import textwrap
from snipper.legacy import block_process
from snipper.block_parsing import full_strip
import sys
from snipper.block_parsing import block_split, block_join
import os
if os.name == 'nt':
......@@ -10,13 +13,12 @@ if os.name == 'nt':
else:
import pexpect as we
def rsession(analyzer, lines, extra):
l2 = []
dbug = False
# analyzer = we.spawn("python", encoding="utf-8", timeout=20)
# analyzer.expect([">>>"])
if "You can group" in "\n".join(lines): # in "\n".join(lines):
if "b = make_square(5)" in "\n".join(lines): # in "\n".join(lines):
print("\n".join(lines))
print("-"*50)
for k in extra['session_results']:
......@@ -97,13 +99,11 @@ area # This line shows us the value of 'area' #!i=b
before += analyzer.after
break
# print("Before is", before)
abefore = analyzer.before.rstrip()
# Sanitize by removing garbage binary stuff the terminal puts in
abefore = "\n".join([l for l in abefore.splitlines() if not l.startswith('\x1b')] )
dotmode = analyzer.after == "..."
if 'dir(s)' in word:
pass
......@@ -123,55 +123,233 @@ area # This line shows us the value of 'area' #!i=b
def run_i(lines, file, output):
if 'python0A' in str(file):
print(234)
extra = dict(python=None, output=output, evaluated_lines=0, session_results=[])
def block_fun(lines, start_extra, end_extra, art, head="", tail="", output=None, extra=None):
outf = output + ("_" + art if art is not None and len(art) > 0 else "") + ".shell"
lines = full_strip(lines)
s = "\n".join(lines)
s.replace("...", "..") # passive-aggressively truncate ... because of #issues.
lines = textwrap.dedent(s).strip().splitlines()
# an.setecho(True) # TH January 2023: Seems to fix an issue on linux with truncated lines. May cause problems on windows?
return new_run_i(lines, file, output)
# return
#
# if 'python0A' in str(file):
# print(234)
# extra = dict(python=None, output=output, evaluated_lines=0, session_results=[])
# def block_fun(lines, start_extra, end_extra, art, head="", tail="", output=None, extra=None):
# outf = output + ("_" + art if art is not None and len(art) > 0 else "") + ".shell"
# lines = full_strip(lines)
# s = "\n".join(lines)
# s.replace("...", "..") # passive-aggressively truncate ... because of #issues.
# lines = textwrap.dedent(s).strip().splitlines()
# # an.setecho(True) # TH January 2023: Seems to fix an issue on linux with truncated lines. May cause problems on windows?
#
# if extra['python'] is None:
# an = we.spawn(sys.executable, encoding="utf-8", timeout=20)
# try:
# an.setwinsize(400, 400) # set window size to avoid truncated output or input.
# except AttributeError as e:
# print("> Mulble pexpect('pyhon',...) does not support setwinsize on this system (windows?). Ignoring")
#
# an.expect([">>>"])
# extra['python'] = an
#
# # analyzer = extra['python']
# # What does this do?
# # for l in (head[extra['evaluated_lines']:] + ["\n"]):
# # analyzer.sendline(l)
# # analyzer.expect_exact([">>>", "..."])
# alines = rsession(extra['python'], lines, extra) # give it the analyzer
# extra['evaluated_lines'] += len(head) + len(lines)
# lines = alines
# return lines, [outf, lines]
# try:
# a,b,c,_ = block_process(lines, tag="#!i", block_fun=functools.partial(block_fun, output=output, extra=extra))
# if extra['python'] is not None:
# extra['python'].close()
#
# if len(c)>0:
# kvs= { v[0] for v in c}
# for outf in kvs:
# out = "\n".join( ["\n".join(v[1]) for v in c if v[0] == outf] )
# out = out.replace("\r", "")
# # if outf.endswith("python0B_e4.shell"):
# # print(outf)
#
# with open(outf, 'w') as f:
# f.write(out)
#
# except Exception as e:
# print("lines are")
# print("\n".join(lines))
# print("Bad thing in #!i command in file", file)
# raise e
# return lines
if extra['python'] is None:
an = we.spawn(sys.executable, encoding="utf-8", timeout=20)
class FileConsole(code.InteractiveConsole):
"""Emulate python console but use file instead of stdin
See https://tdhock.github.io/blog/2021/python-prompt-commands-output/
"""
def __init__(self, *args, lines=None, **kwargs):
super().__init__(*args, **kwargs)
# self.lines = lines.splitlines()
# self.output = {'a': []}
pass
def raw_input(self, prompt):
while True:
if len(self.blocks) == 0:
raise EOFError
k = next(self.blocks.__iter__())
self.k = k
if len(self.blocks[k]) == 0:
self.output[self.k].append(self.f.getvalue().rstrip())
self.f.truncate(0)
self.f.seek(0)
del self.blocks[k]
else:
line = self.blocks[k].pop(0)
break
# if line == "":
# raise EOFError()
# print(prompt, line.replace("\n", ""))
sval = self.f.getvalue()
if sval != '':
self.output[self.k].append(self.f.getvalue())
self.f.truncate(0)
self.f.seek(0)
# self.f.
o_ = prompt.replace("\n", "") + (" " if prompt != "... " else "") + line +"\n"
# print(o_)
self.output[self.k].append(o_)
# print(line)
return line
def write(self, str):
print(str)
pass
# self.output[self.k].append(str)
def showtraceback(self):
"""Display the exception that just occurred.
We remove the first stack item because it is our own code.
The output is written by self.write(), below.
"""
sys.last_type, sys.last_value, last_tb = ei = sys.exc_info()
sys.last_traceback = last_tb
try:
an.setwinsize(400, 400) # set window size to avoid truncated output or input.
except AttributeError as e:
print("> Mulble pexpect('pyhon',...) does not support setwinsize on this system (windows?). Ignoring")
lines = traceback.format_exception(ei[0], ei[1], last_tb.tb_next)
if sys.excepthook is sys.__excepthook__:
self.write(''.join(lines))
else:
# If someone has set sys.excepthook, we let that take precedence
# over self.write
sys.excepthook(ei[0], ei[1], last_tb)
finally:
last_tb = ei = None
def run_blocks(self, blocks):
from collections import defaultdict
self.blocks = {k: v.splitlines() for k, v in blocks.items()}
self.output = defaultdict(list)
self.k = "startup"
from contextlib import redirect_stdout
import io
self.f = io.StringIO()
eh = sys.excepthook
sys.excepthook = sys.__excepthook__
an.expect([">>>"])
extra['python'] = an
# analyzer = extra['python']
# What does this do?
# for l in (head[extra['evaluated_lines']:] + ["\n"]):
# analyzer.sendline(l)
# analyzer.expect_exact([">>>", "..."])
alines = rsession(extra['python'], lines, extra) # give it the analyzer
extra['evaluated_lines'] += len(head) + len(lines)
lines = alines
return lines, [outf, lines]
try:
a,b,c,_ = block_process(lines, tag="#!i", block_fun=functools.partial(block_fun, output=output, extra=extra))
if extra['python'] is not None:
extra['python'].close()
if len(c)>0:
kvs= { v[0] for v in c}
for outf in kvs:
out = "\n".join( ["\n".join(v[1]) for v in c if v[0] == outf] )
out = out.replace("\r", "")
# if outf.endswith("python0B_e4.shell"):
# print(outf)
with open(outf, 'w') as f:
f.write(out)
with redirect_stdout(self.f):
# print("hello world")
self.interact()
except Exception as e:
print("Snipper encountered a fatal problem. ")
print("I was processing")
print(blocks)
print("And encountered", e)
raise e
finally:
sys.excepthook = eh
return self.output
def new_run_i(lines, file, output):
# Create a database of all output.
# cutouts = []
l0 = lines
cutouts = {}
# id = ""
while True:
b = block_split(lines, tag="#!i")
if b == None:
break
# id = b['arg1']
art = b['name']
outf = output + ("_" + art if art is not None and len(art) > 0 else "") + ".shell"
# print(outf)
id = os.path.basename(outf)
cutouts[id] = {'first': b['first'], 'block': b['block'], 'last': []}
lines = b['last']
#
# continue
# args = {k: v for k, v in b['start_tag_args'].items() if len(k) > 0}
# cutout.append(b['block'])
# b['block'], dn = _block_fun(b['block'], start_extra=b['arg1'], end_extra=b['arg2'], **args, keep=keep)
# # cutout += b['block']
# # method = b['start_tag_args'].get('', 'remove')
# # b['block'], dn = _block_fun(b['block'], start_extra=b['arg1'], end_extra=b['arg1'], **args, keep=keep)
# lines = block_join(b)
# # cutout +_=
# n += dn
# if len(cutouts) > 0:
# cutouts[id]['last'] = lines
if len(cutouts) == 0:
return
import sys
def run_code_blocks(blocks):
# p_ = sys.ps1
try:
sys.ps1 = ">>>"
fc = FileConsole()
out = fc.run_blocks(blocks)
except Exception as e:
print("lines are")
print("\n".join(lines))
print("Bad thing in #!i command in file", file)
raise e
return lines
\ No newline at end of file
finally:
pass
# sys.ps1 = p_
return out
blks = {}
for id, block in cutouts.items():
# b = block['block']
dx = min( [k for k, l in enumerate(block['block']) if len(l.strip()) != 0] )
blks[id +"_pre"] = "\n".join( block['first'] + block['block'][:dx] )
blks[id] = "\n".join(block['block'][dx:])
out = run_code_blocks(blks)
for id in cutouts:
print("-"*5, id, "-"*5)
print("".join(out[id]))
with open(f"{os.path.dirname(output)}/{id}", 'w') as f:
f.write("".join(out[id]))
return l0
#
#
# # import sys
# sys.ps1 = ">>>"
# fc = FileConsole(lines='print("hello world")\nprint("world")\na=23')
# # fc.interact()
# fc = FileConsole()
# out = fc.run_blocks({'a': 'print("hello world")\nprint("world")\na=23\nprint("buy!")',
# 'b': """for i in range(4):
# print("i is", i)
#
# """, 'c': 'th = 31'})
# print(out)
#
#
# return lines
# a = 234
# pass
\ No newline at end of file
__version__ = "0.1.18.7"
__version__ = "0.1.18.8"
......@@ -15,6 +15,7 @@ def sorted_array_to_bst(nums):
node.left = sorted_array_to_bst(nums[:mid_val])
node.right = sorted_array_to_bst(nums[mid_val + 1:]) #!b # Solve this problem
return node #!b Here
print("hello world asdfasd")
#!o=a
......@@ -24,6 +25,7 @@ def preOrder(node):
print(node.val)
preOrder(node.left)#!s=myfile
preOrder(node.right)
for _ in range(10):
print("Hello world")
#!o=a
......
......@@ -12,6 +12,7 @@ def sorted_array_to_bst(nums):
# Solve this problem
# TODO: 2 lines missing.
raise NotImplementedError("Here")
print("hello world asdfasd")
def preOrder(node):
......@@ -21,6 +22,7 @@ def preOrder(node):
print(node.val)
preOrder(node.left)
preOrder(node.right)
for _ in range(10):
print("Hello world")
a = 234
......
>>> for _ in range(10):
... print("hi")
...
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
\ No newline at end of file
......@@ -3,6 +3,5 @@ hello
>>> def myfun(a):
... return a*2
...
>>>
>>> print(myfun(4))
8
\ No newline at end of file
......@@ -5,6 +5,7 @@
node.left = sorted_array_to_bst(nums[:mid_val])
node.right = sorted_array_to_bst(nums[mid_val + 1:])
return node
print("hello world asdfasd")
def preOrder(node):
......
......@@ -5,6 +5,7 @@
node.left = sorted_array_to_bst(nums[:mid_val])
node.right = sorted_array_to_bst(nums[mid_val + 1:])
return node
print("hello world asdfasd")
def preOrder(node):
......
......@@ -5,7 +5,6 @@ class TreeNode(object):
# TODO: 3 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
#!s=a
def sorted_array_to_bst(nums):
# TODO: 4 lines missing.
raise NotImplementedError("Your stuff here")
......@@ -13,9 +12,9 @@ def sorted_array_to_bst(nums):
# Solve this problem
# TODO: 2 lines missing.
raise NotImplementedError("Here")
print("hello world asdfasd")
#!o=a
def preOrder(node):
if not node:
# TODO: 1 lines missing.
......@@ -23,23 +22,19 @@ def preOrder(node):
print(node.val)
preOrder(node.left)
preOrder(node.right)
for _ in range(10):
print("Hello world")
#!o=a
a = 234
result = sorted_array_to_bst([1, 2, 3, 4, 5, 6, 7])
preOrder(result)
#!i=a
for _ in range(10):
print("hi")
#!i=a
#!i=b
print("hello")
def myfun(a):
return a*2
print(myfun(4))
#!i=b
>>> for _ in range(10):
... print("hi")
...
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
\ No newline at end of file
......@@ -3,6 +3,5 @@ hello
>>> def myfun(a):
... return a*2
...
>>>
>>> print(myfun(4))
8
\ No newline at end of file
......@@ -5,9 +5,9 @@
node.left = sorted_array_to_bst(nums[:mid_val])
node.right = sorted_array_to_bst(nums[mid_val + 1:])
return node
print("hello world asdfasd")
#!o=a
def preOrder(node):
if not node:
return
......
......@@ -5,9 +5,9 @@
node.left = sorted_array_to_bst(nums[:mid_val])
node.right = sorted_array_to_bst(nums[mid_val + 1:])
return node
print("hello world asdfasd")
#!o=a
def preOrder(node):
if not node:
return
......
import shutil
from unittest import TestCase
import filecmp
import os.path
......@@ -53,6 +54,7 @@ dir = os.path.dirname(__file__)
class TestPython(TestCase):
def test_demo1(self):
# return
from setup_test_files import setup, setup_keep
setup(dir+"/demo1", dir+"/demo1_tmp")
report = filecmp.dircmp(dir+"/demo1_correct", dir+"/demo1_tmp")
......@@ -60,6 +62,7 @@ class TestPython(TestCase):
self.assertTrue(is_same(dir+"/demo1_correct", dir+"/demo1_tmp"))
def test_demo2(self):
# return
from setup_test_files import setup, setup_keep
setup_keep(dir+"/demo2/framework.py", dir+"/demo2/framework_tmp.txt")
with open(dir+"/demo2/framework_tmp.txt") as f:
......@@ -69,3 +72,202 @@ class TestPython(TestCase):
correct = f.read()
self.assertEqual(tmp, correct)
def snipit(code, dest):
# base = os.path.dirname(__file__)
dest = os.path.abspath(dest)
if not os.path.isdir(d_ := os.path.dirname(dest)):
os.makedirs(d_)
with open(dest, 'w') as f:
f.write(code)
# if os.path.isdir(dest):
# shutil.rmtree(dest)
# os.mkdir(dest)
# os.mkdir(dest + "/output")
from snipper import snip_dir
dest_dir = d_ +"/dest"
odir =d_ + "/output"
if os.path.isdir(odir):
shutil.rmtree(odir)
os.makedirs(odir)
import glob
snap = snip_dir(d_, dest_dir=dest_dir, clean_destination_dir=True, output_dir=odir)
rs = {}
for f in glob.glob(odir + "/*"):
with open(f, 'r') as ff:
rs[os.path.basename(f)] = trim(ff.read())
return rs
def trim(s):
# s.rstrip()
return "\n".join( [l.rstrip() for l in s.rstrip().splitlines()] )
def process_blocks(blocks):
pass
class TestError(TestCase):
def test_error(self):
example = """
#!i
s = (1, 2)
s[0] = 1
#!i
"""
if os.path.isdir("tmp"):
shutil.rmtree("tmp")
rs = snipit(example, "tmp/code.py")
print(rs['code.shell'])
# print(rs['code_a.shell'])
def mcmp(a, b):
if a == b:
self.assertEqual(a,b)
else:
aa = a.splitlines()
bb = b.splitlines()
self.assertEqual(len(aa), len(bb), msg="wrong number of lines")
for a_, b_ in zip(aa, bb):
if a_ != b_:
print("not the same")
print(a_)
print(b_)
self.assertEqual(a_, b_)
self.assertEqual(a,b)
mcmp(rs['code.shell'], """
>>> s = (1, 2)
>>> s[0] = 1
Traceback (most recent call last):
File "<console>", line 1, in <module>
TypeError: 'tuple' object does not support item assignment
""".strip())
class TestInteractiveMode(TestCase):
def test_snip(self):
example = """
#!i
print("i=23")
#!i
#!i=a
for i in range(2):
print(i)
#!i=a
#!i=c
print("hello")
a = 234
#!i=c
"""
if os.path.isdir("tmp"):
shutil.rmtree("tmp")
rs = snipit(example, "tmp/code.py")
# print(rs['code_a.shell'])
self.assertEqual(rs['code.shell'], """>>> print("i=23")
i=23""")
b = trim(""">>> for i in range(2):
... print(i)
...
0
1""")
self.assertEqual(rs['code_a.shell'],""">>> for i in range(2):
... print(i)
...
0
1""")
self.assertEqual(rs['code_c.shell'], """>>> print("hello")
hello
>>> a = 234""")
return
# import code
# ita = code.interact()
import sys
from code import InteractiveInterpreter, InteractiveConsole
# from snipper.snip_dir import
console = InteractiveInterpreter()
source = ""
cc = InteractiveConsole()
def _runblock(console, block):
source_lines = (line.rstrip() for line in block)
# console = InteractiveInterpreter()
source = ""
out = []
try:
while True:
source = next(source_lines)
# Allow the user to ignore specific lines of output.
if not source.endswith("# ignore"):
out.append(f">>> {source}")
more = console.runsource(source)
while more:
next_line = next(source_lines)
out.append(f"... {next_line}")
source += "\n" + next_line
more = console.runsource(source)
except StopIteration:
if more:
print("... ")
out.append("... ")
more = console.runsource(source + "\n")
from snipper.block_parsing import full_strip
# full_strip(example)
ex2 = full_strip(example.splitlines(), ("#!i",) )
# _runblock(console, ex2)
# from contextlib import redirect_stdout
# import io
# f = io.StringIO()
#
# with redirect_stdout(f):
# print("hello world")
# a = f.getvalue()
#
# s = redirect_stdout(f)
# obj = s.__enter__()
# print("hello")
# s.__exit__()
# f.getvalue()
# print("\n".join( out['a'] ) )
for k in out:
# print("a:")
print("".join( out[k] ) )
print("-"*10)
pass
a = 234
try:
while True:
source = next(source_lines)
# Allow the user to ignore specific lines of output.
if not source.endswith("# ignore"):
print(">>>", source)
more = console.runsource(source)
while more:
next_line = next(source_lines)
print("...", next_line)
source += "\n" + next_line
more = console.runsource(source)
except StopIteration:
if more:
print("... ")
more = console.runsource(source + "\n")
#!i
print("i=23")
#!i
#!i=a
for i in range(2):
print(i)
#!i=a
#!i=c
print("hello")
a = 234
#!i=c
\ No newline at end of file
print("i=23")
for i in range(2):
print(i)
print("hello")
a = 234
>>> print("i=23")
i=23
\ No newline at end of file
>>> for i in range(2):
... print(i)
...
0
1
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment