From 5493e9c64964f8273453d539b89c742f36d5f279 Mon Sep 17 00:00:00 2001 From: Laurent Pinchart Date: Sat, 12 Aug 2017 02:23:14 +0300 Subject: Initial import Signed-off-by: Laurent Pinchart --- Makefile | 12 ++ README | 45 ++++++ tests/Makefile | 8 + tests/kms-test-allplanes.py | 83 ++++++++++ tests/kms-test-connectors.py | 28 ++++ tests/kms-test-modes.py | 65 ++++++++ tests/kms-test-modeset.py | 58 +++++++ tests/kms-test-pageflip.py | 118 +++++++++++++++ tests/kms-test-planeposition.py | 105 +++++++++++++ tests/kmstest.py | 327 ++++++++++++++++++++++++++++++++++++++++ 10 files changed, 849 insertions(+) create mode 100644 Makefile create mode 100644 README create mode 100644 tests/Makefile create mode 100755 tests/kms-test-allplanes.py create mode 100755 tests/kms-test-connectors.py create mode 100755 tests/kms-test-modes.py create mode 100755 tests/kms-test-modeset.py create mode 100755 tests/kms-test-pageflip.py create mode 100755 tests/kms-test-planeposition.py create mode 100755 tests/kmstest.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3fe6ed9 --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +SUBDIRS=tests + +recursive=all clean install + +all: + +$(recursive): + @target=$@ ; \ + for subdir in $(SUBDIRS); do \ + echo "Making $$target in $$subdir" ; \ + $(MAKE) -C $$subdir $$target; \ + done diff --git a/README b/README new file mode 100644 index 0000000..69a3bc5 --- /dev/null +++ b/README @@ -0,0 +1,45 @@ +du-tests +-------- + +Test suite for the Renesas R-Car DU display unit. + +You can find the latest version of du-tests in the project's git tree at + + git://git.ideasonboard.com/renesas/du-tests.git + http://git.ideasonboard.com/renesas/du-tests.git + + +----------------------- +Building and Installing +----------------------- + +The du-tests suite is written in Python and doesn't have any build time +dependency. To install it, run + + make install INSTALL_DIR=/path/to/target/directory + +This will copy the test scripts to the target directory to be copied or +exported to the host. + + +-------------------- +Runtime Dependencies +-------------------- + +The tests scripts require the following dependencies to be installed on the +target. + +* Python 3 +* kmsxx Python bindings (https://github.com/tomba/kmsxx.git) + +kmsxx hasn't released any stable version yet, it is recommended to use the +latest master branch from the git repository. + + +----------------- +Running the Tests +----------------- + +The test scripts are named kms-test-*.py. They can be run directly from the +test suite root directory. + diff --git a/tests/Makefile b/tests/Makefile new file mode 100644 index 0000000..6586b29 --- /dev/null +++ b/tests/Makefile @@ -0,0 +1,8 @@ +SCRIPTS=$(wildcard *.sh) + +all: + +clean: + +install: + cp $(SCRIPTS) $(INSTALL_DIR)/ diff --git a/tests/kms-test-allplanes.py b/tests/kms-test-allplanes.py new file mode 100755 index 0000000..9e6a18b --- /dev/null +++ b/tests/kms-test-allplanes.py @@ -0,0 +1,83 @@ +#!/usr/bin/python3 + +import kmstest +import pykms + +class AllPlanesTest(kmstest.KMSTest): + """Test composition with all planes enabled on all CRTCs.""" + + def handle_page_flip(self, frame, time): + self.logger.log("Page flip complete") + + def main(self): + # Create the connectors to CRTCs map + connectors = {} + for connector in self.card.connectors: + # Skip disconnected connectors + if not connector.connected(): + continue + + # Add the connector to the map + for crtc in connector.get_possible_crtcs(): + if crtc not in connectors: + connectors[crtc] = connector + + for crtc in self.card.crtcs: + self.start("composition on CRTC %u" % crtc.id) + + # Get the connector and default mode + try: + connector = connectors[crtc]; + mode = connector.get_default_mode() + except KeyError: + self.skip("no connector or mode available") + continue + + # List planes available for the CRTC + planes = [] + for plane in self.card.planes: + if plane.supports_crtc(crtc) and plane != crtc.primary_plane: + planes.append(plane) + + if len(planes) == 0: + self.skip("no plane available for CRTC") + continue + + self.logger.log("Testing connector %s, CRTC %u, mode %s with %u planes" % \ + (connector.fullname, crtc.id, mode.name, len(planes))) + + # Create a frame buffer + fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24") + pykms.draw_test_pattern(fb) + + # Set the mode with a primary plane + ret = self.atomic_crtc_mode_set(crtc, connector, mode, fb) + if ret < 0: + self.fail("atomic mode set failed with %d" % ret) + continue + + self.run(3) + + # Add all other planes one by one + offset = 100 + for plane in planes: + source = kmstest.Rect(0, 0, fb.width, fb.height) + destination = kmstest.Rect(offset, offset, fb.width, fb.height) + ret = self.atomic_plane_set(plane, crtc, source, destination, fb) + if ret < 0: + self.fail("atomic plane set failed with %d" % ret) + break + + self.logger.log("Adding plane %u" % plane.id) + self.run(1) + + if self.flips == 0: + self.fail("No page flip registered") + break + + offset += 50 + + else: + self.success() + +AllPlanesTest().execute() diff --git a/tests/kms-test-connectors.py b/tests/kms-test-connectors.py new file mode 100755 index 0000000..25fc5dc --- /dev/null +++ b/tests/kms-test-connectors.py @@ -0,0 +1,28 @@ +#!/usr/bin/python3 + +import kmstest +import pykms + +class ConnectorsTest(kmstest.KMSTest): + """Perform sanity checks on all connectors.""" + + def main(self): + for connector in self.card.connectors: + self.start("connector %s" % connector.fullname) + + # Every connector should have at least one suitable CRTC + crtcs = connector.get_possible_crtcs() + if len(crtcs) == 0: + self.fail("no possible CRTC") + continue + + # Connected connectors should have at least one mode + if connector.connected(): + modes = connector.get_modes() + if len(modes) == 0: + self.fail("no mode available") + continue + + self.success() + +ConnectorsTest().execute() diff --git a/tests/kms-test-modes.py b/tests/kms-test-modes.py new file mode 100755 index 0000000..31128a7 --- /dev/null +++ b/tests/kms-test-modes.py @@ -0,0 +1,65 @@ +#!/usr/bin/python3 + +import kmstest +import pykms + +class ModesTest(kmstest.KMSTest): + """Test all available modes on all available connectors.""" + + def handle_page_flip(self, frame, time): + self.logger.log("Page flip complete") + + def test_mode(self, connector, crtc, mode): + self.logger.log("Testing connector %s on CRTC %u with mode %s" % \ + (connector.fullname, crtc.id, mode.name)) + + # Create a frame buffer + fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24") + pykms.draw_test_pattern(fb) + + # Perform the mode set + ret = self.atomic_crtc_mode_set(crtc, connector, mode, fb) + if ret < 0: + raise RuntimeError("atomic mode set failed with %d" % ret) + + self.logger.log("Atomic mode set complete") + self.run(4) + + if self.flips == 0: + raise RuntimeError("Page flip not registered") + + def main(self): + for connector in self.card.connectors: + self.start("modes on connector %s" % connector.fullname) + + # Skip disconnected connectors + if not connector.connected(): + self.skip("unconnected connector") + continue + + # Find a CRTC suitable for the connector + crtc = connector.get_current_crtc() + if not crtc: + crtcs = connector.get_possible_crtcs() + if len(crtcs) == 0: + pass + + crtc = crtcs[0] + + # Test all available modes + modes = connector.get_modes() + if len(modes) == 0: + self.skip("no mode available") + continue + + for i in range(len(modes)): + try: + self.progress(i+1, len(modes)) + self.test_mode(connector, crtc, modes[i]) + except RuntimeError as e: + self.fail(e.message) + break + else: + self.success() + +ModesTest().execute() diff --git a/tests/kms-test-modeset.py b/tests/kms-test-modeset.py new file mode 100755 index 0000000..e3551f1 --- /dev/null +++ b/tests/kms-test-modeset.py @@ -0,0 +1,58 @@ +#!/usr/bin/python3 + +import kmstest +import pykms + +class ModeSetTest(kmstest.KMSTest): + """Test mode setting on all connectors in sequence with the default mode.""" + + def handle_page_flip(self, frame, time): + self.logger.log("Page flip complete") + + def main(self): + for connector in self.card.connectors: + self.start("atomic mode set on connector %s" % connector.fullname) + + # Skip disconnected connectors + if not connector.connected(): + self.skip("unconnected connector") + continue + + # Find a CRTC suitable for the connector + crtc = connector.get_current_crtc() + if not crtc: + crtcs = connector.get_possible_crtcs() + if len(crtcs) == 0: + pass + + crtc = crtcs[0] + + # Get the default mode for the connector + try: + mode = connector.get_default_mode() + except ValueError: + self.skip("no mode available") + continue + + self.logger.log("Testing connector %s on CRTC %u with mode %s" % \ + (connector.fullname, crtc.id, mode.name)) + + # Create a frame buffer + fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24") + pykms.draw_test_pattern(fb) + + # Perform a mode set + ret = self.atomic_crtc_mode_set(crtc, connector, mode, fb) + if ret < 0: + self.fail("atomic mode set failed with %d" % ret) + continue + + self.logger.log("Atomic mode set complete") + self.run(5) + + if self.flips == 0: + self.fail("Page flip not registered") + else: + self.success() + +ModeSetTest().execute() diff --git a/tests/kms-test-pageflip.py b/tests/kms-test-pageflip.py new file mode 100755 index 0000000..2ab9b87 --- /dev/null +++ b/tests/kms-test-pageflip.py @@ -0,0 +1,118 @@ +#!/usr/bin/python3 + +import kmstest +import pykms + +class PageFlipTest(kmstest.KMSTest): + """Test page flipping on all connectors in sequence with the default mode.""" + + BAR_WIDTH = 20 + BAR_SPEED = 8 + + def handle_page_flip(self, frame, time): + if self.flips == 1: + self.logger.log("first page flip frame %u time %f" % (frame, time)) + self.frame_start = frame + self.time_start = time + + if self.stop_requested: + self.logger.log("last page flip frame %u time %f" % (frame, time)) + self.frame_end = frame + self.time_end = time + self.loop.stop() + self.stop_requested = False + return + + fb = self.fbs[self.front_buf] + self.front_buf = self.front_buf ^ 1 + + old_xpos = (self.bar_xpos - self.BAR_SPEED) % (fb.width - self.BAR_WIDTH); + new_xpos = (self.bar_xpos + self.BAR_SPEED) % (fb.width - self.BAR_WIDTH); + self.bar_xpos = new_xpos + + pykms.draw_color_bar(fb, old_xpos, new_xpos, self.BAR_WIDTH) + + source = kmstest.Rect(0, 0, fb.width, fb.height) + destination = kmstest.Rect(0, 0, fb.width, fb.height) + self.atomic_plane_set(self.plane, self.crtc, source, destination, fb) + + def stop_page_flip(self): + self.stop_requested = True + + def main(self): + for connector in self.card.connectors: + self.start("page flip on connector %s" % connector.fullname) + + # Skip disconnected connectors + if not connector.connected(): + self.skip("unconnected connector") + continue + + # Find a CRTC suitable for the connector + crtc = connector.get_current_crtc() + if not crtc: + crtcs = connector.get_possible_crtcs() + if len(crtcs) == 0: + pass + + crtc = crtcs[0] + + self.crtc = crtc + + # Find a plane suitable for the CRTC + for plane in self.card.planes: + if plane.supports_crtc(crtc): + self.plane = plane + break + else: + self.skip("no plane available for CRTC %u" % crtc.id) + continue + + # Get the default mode for the connector + try: + mode = connector.get_default_mode() + except ValueError: + self.skip("no mode available") + continue + + self.logger.log("Testing connector %s, CRTC %u, plane %u, mode %s" % \ + (connector.fullname, crtc.id, self.plane.id, mode.name)) + + # Create two frame buffers + self.fbs = [] + for i in range(2): + self.fbs.append(pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24")) + + # Set the mode and perform the initial page flip + ret = self.atomic_crtc_mode_set(crtc, connector, mode, self.fbs[0]) + if ret < 0: + self.fail("atomic mode set failed with %d" % ret) + continue + + # Flip pages for 10s + self.bar_xpos = 0 + self.front_buf = 0 + self.frame_start = 0 + self.frame_end = 0 + self.time_start = 0 + self.time_end = 0 + self.stop_requested = False + + self.loop.add_timer(10, self.stop_page_flip) + self.run(11) + + if not self.flips: + self.fail("No page flip registered") + continue + + if self.stop_requested: + self.fail("Last page flip not registered") + continue + + frames = self.frame_end - self.frame_start + 1 + interval = self.time_end - self.time_start + self.logger.log("Frame rate: %f (%u/%u frames in %f s)" % \ + (frames / interval, self.flips, frames, interval)) + self.success() + +PageFlipTest().execute() diff --git a/tests/kms-test-planeposition.py b/tests/kms-test-planeposition.py new file mode 100755 index 0000000..2a9935d --- /dev/null +++ b/tests/kms-test-planeposition.py @@ -0,0 +1,105 @@ +#!/usr/bin/python3 + +import kmstest +import pykms +import time + +class PlanePositionTest(kmstest.KMSTest): + """Test boundaries of plane positioning.""" + + def main(self): + self.start("plane positioning boundaries") + + # Find a CRTC with a connected connector and at least two planes + for connector in self.card.connectors: + if not connector.connected(): + self.skip("unconnected connector") + continue + + try: + mode = connector.get_default_mode() + except ValueError: + continue + + crtcs = connector.get_possible_crtcs() + for crtc in crtcs: + planes = [] + for plane in self.card.planes: + if plane.supports_crtc(crtc): + planes.append(plane) + + if len(planes) > 1: + break + else: + crtc = None + + if crtc: + break + + else: + self.skip("no CRTC available with connector and at least two planes") + return + + self.logger.log("Testing connector %s, CRTC %u, mode %s with %u planes" % \ + (connector.fullname, crtc.id, mode.name, len(planes))) + + # Create a frame buffer + fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24") + pykms.draw_test_pattern(fb) + + # Set the mode with no plane, wait 5s for the monitor to wake up + ret = self.atomic_crtc_mode_set(crtc, connector, mode, sync=True) + if ret < 0: + self.fail("atomic mode set failed with %d" % ret) + return + + self.logger.log("Initial atomic mode set completed") + time.sleep(5) + + # Add the first plane to cover half of the CRTC + source = kmstest.Rect(0, 0, fb.width // 2, fb.height) + destination = kmstest.Rect(0, 0, fb.width // 2, fb.height) + ret = self.atomic_plane_set(planes[0], crtc, source, destination, fb, sync=True) + if ret < 0: + self.fail("atomic plane set for first plane failed with %d" % ret) + return + + self.logger.log("Root plane enabled") + time.sleep(3) + + # Add the second plane and move it around to cross all CRTC boundaries + offsets = ((50, 50), (150, 50), (50, 150), (-50, 50), (50, -50)) + for offset in offsets: + width = fb.width - 100 + height = fb.height - 100 + source = kmstest.Rect(0, 0, width, height) + destination = kmstest.Rect(offset[0], offset[1], width, height) + + ret = self.atomic_plane_set(planes[1], crtc, source, destination, fb, sync=True) + if ret < 0: + self.fail("atomic plane set with offset %d,%d" % offset) + return + + self.logger.log("Moved overlay plane to %d,%d" % offset) + time.sleep(3) + + # Try to move the plane completely off-screen. The device is expected to + # reject this. + offsets = ((mode.hdisplay, 50), (50, mode.vdisplay), + (-mode.hdisplay, 50), (50, -mode.vdisplay)) + for offset in offsets: + width = fb.width - 100 + height = fb.height - 100 + source = kmstest.Rect(0, 0, width, height) + destination = kmstest.Rect(offset[0], offset[1], width, height) + + ret = self.atomic_plane_set(planes[1], crtc, source, destination, fb, sync=True) + if ret >= 0: + self.fail("atomic plane set with invalid offset %d,%d accepted" % offset) + return + + self.logger.log("Failed to Move overlay plane to %d,%d as expected" % offset) + + self.success() + +PlanePositionTest().execute() diff --git a/tests/kmstest.py b/tests/kmstest.py new file mode 100755 index 0000000..cb0d9a7 --- /dev/null +++ b/tests/kmstest.py @@ -0,0 +1,327 @@ +#!/usr/bin/python3 + +import errno +import fcntl +import os +import pykms +import selectors +import sys +import time + +class Timer(object): + def __init__(self, timeout, callback): + self.timeout = time.clock_gettime(time.CLOCK_MONOTONIC) + timeout + self.callback = callback + + +class EventLoop(selectors.DefaultSelector): + def __init__(self): + super().__init__() + self.__timers = [] + + def add_timer(self, timeout, callback): + self.__timers.append(Timer(timeout, callback)) + self.__timers.sort(key=lambda timer: timer.timeout) + + def fire_timers(self): + clk = time.clock_gettime(time.CLOCK_MONOTONIC) + while len(self.__timers) > 0: + timer = self.__timers[0] + if timer.timeout > clk: + break + + del self.__timers[0] + timer.callback() + + def next_timeout(self): + clk = time.clock_gettime(time.CLOCK_MONOTONIC) + if len(self.__timers) == 0 or self.__timers[0].timeout < clk: + return None + + return self.__timers[0].timeout - clk + + def run(self, duration=0): + if duration: + self.add_timer(duration, self.stop) + + timeout = self.next_timeout() + + self._stop = False + while not self._stop: + for key, events in self.select(timeout): + key.data(key.fileobj, events) + self.fire_timers() + + self.__timers = [] + + def stop(self): + self._stop = True + + +class KernelLogMessage(object): + def __init__(self, msg): + pos = msg.find(";") + header = msg[:pos] + msg = msg[pos+1:] + + facility, sequence, timestamp, *other = header.split(",") + self.facility = int(facility) + self.sequence = int(sequence) + self.timestamp = int(timestamp) / 1000000. + + msg = msg.split("\n") + self.msg = msg[0] + self.tags = {} + + try: + tags = msg[1:-1] + for tag in tags: + tag = tag.strip().split("=") + self.tags[tag[0]] = tag[1] + except: + pass + + +class KernelLogReader(object): + def __init__(self): + self.kmsg = os.open("/dev/kmsg", 0) + flags = fcntl.fcntl(self.kmsg, fcntl.F_GETFL) + fcntl.fcntl(self.kmsg, fcntl.F_SETFL, flags | os.O_NONBLOCK) + os.lseek(self.kmsg, 0, os.SEEK_END) + + def __del__(self): + os.close(self.kmsg) + + def read(self): + msgs = [] + while True: + try: + msg = os.read(self.kmsg, 8191) + msg = msg.decode("utf-8") + except OSError as e: + if e.errno == errno.EAGAIN: + break + else: + raise e + msgs.append(KernelLogMessage(msg)) + + return msgs + + +class Logger(object): + def __init__(self, name): + self.logfile = open("%s.log" % name, "w") + self._kmsg = KernelLogReader() + + def __del__(self): + self.close() + + def close(self): + if self.logfile: + self.logfile.close() + self.logfile = None + + def event(self): + kmsgs = self._kmsg.read() + for msg in kmsgs: + self.logfile.write("K [%6f] %s\n" % (msg.timestamp, msg.msg)) + self.logfile.flush() + + @property + def fd(self): + return self._kmsg.kmsg + + def flush(self): + self.logfile.flush() + os.fsync(self.logfile) + + def log(self, msg): + # Start by processing the kernel log as there might not be any event + # loop running. + self.event() + + now = time.clock_gettime(time.CLOCK_MONOTONIC) + self.logfile.write("U [%6f] %s\n" % (now, msg)) + self.logfile.flush() + + +class Rect(object): + def __init__(self, left, top, width, height): + self.left = left + self.top = top + self.width = width + self.height = height + + +class KMSTest(object): + def __init__(self, use_default_key_handler=False): + if not getattr(self, 'main', None): + raise RuntimeError('Test class must implement main method') + + self.card = pykms.Card() + if not self.card.has_atomic: + raise RuntimeError("Device doesn't support the atomic API") + + logname = self.__class__.__name__ + self.logger = Logger(logname) + + self.loop = EventLoop() + self.loop.register(self.logger.fd, selectors.EVENT_READ, self.__read_logger) + self.loop.register(self.card.fd, selectors.EVENT_READ, self.__read_event) + if use_default_key_handler: + self.loop.register(sys.stdin, selectors.EVENT_READ, self.__read_key) + + def __del__(self): + self.logger.close() + + def __format_props(self, props): + return {k: v & ((1 << 64) - 1) for k, v in props.items()} + + def atomic_crtc_disable(self, crtc, sync=True): + req = pykms.AtomicReq(self.card) + req.add(crtc, 'ACTIVE', False) + if sync: + return req.commit_sync(True) + else: + return req.commit(self, True) + + def atomic_crtc_mode_set(self, crtc, connector, mode, fb=None, sync=False): + """Perform a mode set on the given connector and CRTC. The framebuffer, + if present, will be output on the primary plane. Otherwise no plane is + configured for the CRTC.""" + + # Mode blobs are reference-counted, make sure the blob stays valid until + # the commit completes. + mode_blob = mode.to_blob(self.card) + + req = pykms.AtomicReq(self.card) + req.add(connector, 'CRTC_ID', crtc.id) + req.add(crtc, { 'ACTIVE': 1, 'MODE_ID': mode_blob.id }) + if fb: + req.add(crtc.primary_plane, { + 'FB_ID': fb.id, + 'CRTC_ID': crtc.id, + 'SRC_X': 0, + 'SRC_Y': 0, + 'SRC_W': int(fb.width * 65536), + 'SRC_H': int(fb.height * 65536), + 'CRTC_X': 0, + 'CRTC_Y': 0, + 'CRTC_W': fb.width, + 'CRTC_H': fb.height, + }) + if sync: + return req.commit_sync(True) + else: + return req.commit(self, True) + + def atomic_plane_set(self, plane, crtc, source, destination, fb, sync=False): + req = pykms.AtomicReq(self.card) + req.add(plane, self.__format_props({ + 'FB_ID': fb.id, + 'CRTC_ID': crtc.id, + 'SRC_X': int(source.left * 65536), + 'SRC_Y': int(source.top * 65536), + 'SRC_W': int(source.width * 65536), + 'SRC_H': int(source.height * 65536), + 'CRTC_X': destination.left, + 'CRTC_Y': destination.top, + 'CRTC_W': destination.width, + 'CRTC_H': destination.height, + })) + if sync: + return req.commit_sync() + else: + return req.commit(self) + + def atomic_planes_disable(self, sync=True): + req = pykms.AtomicReq(self.card) + for plane in self.card.planes: + req.add(plane, { "FB_ID": 0, 'CRTC_ID': 0 }) + + if sync: + return req.commit_sync() + else: + return req.commit(self) + + def __handle_page_flip(self, frame, time): + self.flips += 1 + try: + # The handle_page_flip() method is optional, ignore attribute errors + self.handle_page_flip(frame, time) + except AttributeError: + pass + + def __read_event(self, fileobj, events): + for event in self.card.read_events(): + if event.type == pykms.DrmEventType.FLIP_COMPLETE: + self.__handle_page_flip(event.seq, event.time) + + def __read_logger(self, fileobj, events): + self.logger.event() + + def __read_key(self, fileobj, events): + sys.stdin.readline() + self.loop.stop() + + def execute(self): + """Execute the test by running the main function.""" + self.main() + + def flush_events(self): + """Discard all pending DRM events.""" + + # Temporarily switch to non-blocking I/O to read events, as there might + # be no event pending. + flags = fcntl.fcntl(self.card.fd, fcntl.F_GETFL) + fcntl.fcntl(self.card.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + # read_events() is a generator so we have to go through all events + # explicitly. Ignore -EAGAIN errors, they're expected in non-blocking + # I/O mode. + try: + for event in self.card.read_events(): + pass + except OSError as e: + if e.errno != errno.EAGAIN: + raise e + + fcntl.fcntl(self.card.fd, fcntl.F_SETFL, flags) + + def run(self, duration): + """Run the event loop for the given duration (in seconds).""" + self.flips = 0 + self.loop.run(duration) + + def start(self, name): + """Start a test.""" + self.test_name = name + self.logger.log("Testing %s" % name) + sys.stdout.write("Testing %s: " % name) + sys.stdout.flush() + + def progress(self, current, maximum): + sys.stdout.write("\rTesting %s: %u/%u" % (self.test_name, current, maximum)) + sys.stdout.flush() + + def fail(self, reason): + """Complete a test with failure.""" + self.logger.log("Test failed. Reason: %s" % reason) + self.logger.flush() + sys.stdout.write("\rTesting %s: FAIL\n" % self.test_name) + sys.stdout.flush() + + def skip(self, reason): + """Complete a test with skip.""" + self.logger.log("Test skipped. Reason: %s" % reason) + self.logger.flush() + sys.stdout.write("SKIP\n") + sys.stdout.flush() + + def success(self): + """Complete a test with success.""" + self.logger.log("Test completed successfully") + self.logger.flush() + sys.stdout.write("\rTesting %s: SUCCESS\n" % self.test_name) + sys.stdout.flush() + -- cgit v1.2.3