summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorLaurent Pinchart <laurent.pinchart@ideasonboard.com>2017-08-12 02:23:14 +0300
committerLaurent Pinchart <laurent.pinchart@ideasonboard.com>2017-08-16 01:00:27 +0300
commit5493e9c64964f8273453d539b89c742f36d5f279 (patch)
tree95b4b967ae6c770ec9382a853344b1ec0eb27898 /tests
Initial import
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/Makefile8
-rwxr-xr-xtests/kms-test-allplanes.py83
-rwxr-xr-xtests/kms-test-connectors.py28
-rwxr-xr-xtests/kms-test-modes.py65
-rwxr-xr-xtests/kms-test-modeset.py58
-rwxr-xr-xtests/kms-test-pageflip.py118
-rwxr-xr-xtests/kms-test-planeposition.py105
-rwxr-xr-xtests/kmstest.py327
8 files changed, 792 insertions, 0 deletions
diff --git a/tests/Makefile b/tests/Makefile
new file mode 100644
index 0000000..6586b29
--- /dev/null
+++ b/tests/Makefile
@@ -0,0 +1,8 @@
+SCRIPTS=$(wildcard *.sh)
+
+all:
+
+clean:
+
+install:
+ cp $(SCRIPTS) $(INSTALL_DIR)/
diff --git a/tests/kms-test-allplanes.py b/tests/kms-test-allplanes.py
new file mode 100755
index 0000000..9e6a18b
--- /dev/null
+++ b/tests/kms-test-allplanes.py
@@ -0,0 +1,83 @@
+#!/usr/bin/python3
+
+import kmstest
+import pykms
+
+class AllPlanesTest(kmstest.KMSTest):
+ """Test composition with all planes enabled on all CRTCs."""
+
+ def handle_page_flip(self, frame, time):
+ self.logger.log("Page flip complete")
+
+ def main(self):
+ # Create the connectors to CRTCs map
+ connectors = {}
+ for connector in self.card.connectors:
+ # Skip disconnected connectors
+ if not connector.connected():
+ continue
+
+ # Add the connector to the map
+ for crtc in connector.get_possible_crtcs():
+ if crtc not in connectors:
+ connectors[crtc] = connector
+
+ for crtc in self.card.crtcs:
+ self.start("composition on CRTC %u" % crtc.id)
+
+ # Get the connector and default mode
+ try:
+ connector = connectors[crtc];
+ mode = connector.get_default_mode()
+ except KeyError:
+ self.skip("no connector or mode available")
+ continue
+
+ # List planes available for the CRTC
+ planes = []
+ for plane in self.card.planes:
+ if plane.supports_crtc(crtc) and plane != crtc.primary_plane:
+ planes.append(plane)
+
+ if len(planes) == 0:
+ self.skip("no plane available for CRTC")
+ continue
+
+ self.logger.log("Testing connector %s, CRTC %u, mode %s with %u planes" % \
+ (connector.fullname, crtc.id, mode.name, len(planes)))
+
+ # Create a frame buffer
+ fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24")
+ pykms.draw_test_pattern(fb)
+
+ # Set the mode with a primary plane
+ ret = self.atomic_crtc_mode_set(crtc, connector, mode, fb)
+ if ret < 0:
+ self.fail("atomic mode set failed with %d" % ret)
+ continue
+
+ self.run(3)
+
+ # Add all other planes one by one
+ offset = 100
+ for plane in planes:
+ source = kmstest.Rect(0, 0, fb.width, fb.height)
+ destination = kmstest.Rect(offset, offset, fb.width, fb.height)
+ ret = self.atomic_plane_set(plane, crtc, source, destination, fb)
+ if ret < 0:
+ self.fail("atomic plane set failed with %d" % ret)
+ break
+
+ self.logger.log("Adding plane %u" % plane.id)
+ self.run(1)
+
+ if self.flips == 0:
+ self.fail("No page flip registered")
+ break
+
+ offset += 50
+
+ else:
+ self.success()
+
+AllPlanesTest().execute()
diff --git a/tests/kms-test-connectors.py b/tests/kms-test-connectors.py
new file mode 100755
index 0000000..25fc5dc
--- /dev/null
+++ b/tests/kms-test-connectors.py
@@ -0,0 +1,28 @@
+#!/usr/bin/python3
+
+import kmstest
+import pykms
+
+class ConnectorsTest(kmstest.KMSTest):
+ """Perform sanity checks on all connectors."""
+
+ def main(self):
+ for connector in self.card.connectors:
+ self.start("connector %s" % connector.fullname)
+
+ # Every connector should have at least one suitable CRTC
+ crtcs = connector.get_possible_crtcs()
+ if len(crtcs) == 0:
+ self.fail("no possible CRTC")
+ continue
+
+ # Connected connectors should have at least one mode
+ if connector.connected():
+ modes = connector.get_modes()
+ if len(modes) == 0:
+ self.fail("no mode available")
+ continue
+
+ self.success()
+
+ConnectorsTest().execute()
diff --git a/tests/kms-test-modes.py b/tests/kms-test-modes.py
new file mode 100755
index 0000000..31128a7
--- /dev/null
+++ b/tests/kms-test-modes.py
@@ -0,0 +1,65 @@
+#!/usr/bin/python3
+
+import kmstest
+import pykms
+
+class ModesTest(kmstest.KMSTest):
+ """Test all available modes on all available connectors."""
+
+ def handle_page_flip(self, frame, time):
+ self.logger.log("Page flip complete")
+
+ def test_mode(self, connector, crtc, mode):
+ self.logger.log("Testing connector %s on CRTC %u with mode %s" % \
+ (connector.fullname, crtc.id, mode.name))
+
+ # Create a frame buffer
+ fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24")
+ pykms.draw_test_pattern(fb)
+
+ # Perform the mode set
+ ret = self.atomic_crtc_mode_set(crtc, connector, mode, fb)
+ if ret < 0:
+ raise RuntimeError("atomic mode set failed with %d" % ret)
+
+ self.logger.log("Atomic mode set complete")
+ self.run(4)
+
+ if self.flips == 0:
+ raise RuntimeError("Page flip not registered")
+
+ def main(self):
+ for connector in self.card.connectors:
+ self.start("modes on connector %s" % connector.fullname)
+
+ # Skip disconnected connectors
+ if not connector.connected():
+ self.skip("unconnected connector")
+ continue
+
+ # Find a CRTC suitable for the connector
+ crtc = connector.get_current_crtc()
+ if not crtc:
+ crtcs = connector.get_possible_crtcs()
+ if len(crtcs) == 0:
+ pass
+
+ crtc = crtcs[0]
+
+ # Test all available modes
+ modes = connector.get_modes()
+ if len(modes) == 0:
+ self.skip("no mode available")
+ continue
+
+ for i in range(len(modes)):
+ try:
+ self.progress(i+1, len(modes))
+ self.test_mode(connector, crtc, modes[i])
+ except RuntimeError as e:
+ self.fail(e.message)
+ break
+ else:
+ self.success()
+
+ModesTest().execute()
diff --git a/tests/kms-test-modeset.py b/tests/kms-test-modeset.py
new file mode 100755
index 0000000..e3551f1
--- /dev/null
+++ b/tests/kms-test-modeset.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python3
+
+import kmstest
+import pykms
+
+class ModeSetTest(kmstest.KMSTest):
+ """Test mode setting on all connectors in sequence with the default mode."""
+
+ def handle_page_flip(self, frame, time):
+ self.logger.log("Page flip complete")
+
+ def main(self):
+ for connector in self.card.connectors:
+ self.start("atomic mode set on connector %s" % connector.fullname)
+
+ # Skip disconnected connectors
+ if not connector.connected():
+ self.skip("unconnected connector")
+ continue
+
+ # Find a CRTC suitable for the connector
+ crtc = connector.get_current_crtc()
+ if not crtc:
+ crtcs = connector.get_possible_crtcs()
+ if len(crtcs) == 0:
+ pass
+
+ crtc = crtcs[0]
+
+ # Get the default mode for the connector
+ try:
+ mode = connector.get_default_mode()
+ except ValueError:
+ self.skip("no mode available")
+ continue
+
+ self.logger.log("Testing connector %s on CRTC %u with mode %s" % \
+ (connector.fullname, crtc.id, mode.name))
+
+ # Create a frame buffer
+ fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24")
+ pykms.draw_test_pattern(fb)
+
+ # Perform a mode set
+ ret = self.atomic_crtc_mode_set(crtc, connector, mode, fb)
+ if ret < 0:
+ self.fail("atomic mode set failed with %d" % ret)
+ continue
+
+ self.logger.log("Atomic mode set complete")
+ self.run(5)
+
+ if self.flips == 0:
+ self.fail("Page flip not registered")
+ else:
+ self.success()
+
+ModeSetTest().execute()
diff --git a/tests/kms-test-pageflip.py b/tests/kms-test-pageflip.py
new file mode 100755
index 0000000..2ab9b87
--- /dev/null
+++ b/tests/kms-test-pageflip.py
@@ -0,0 +1,118 @@
+#!/usr/bin/python3
+
+import kmstest
+import pykms
+
+class PageFlipTest(kmstest.KMSTest):
+ """Test page flipping on all connectors in sequence with the default mode."""
+
+ BAR_WIDTH = 20
+ BAR_SPEED = 8
+
+ def handle_page_flip(self, frame, time):
+ if self.flips == 1:
+ self.logger.log("first page flip frame %u time %f" % (frame, time))
+ self.frame_start = frame
+ self.time_start = time
+
+ if self.stop_requested:
+ self.logger.log("last page flip frame %u time %f" % (frame, time))
+ self.frame_end = frame
+ self.time_end = time
+ self.loop.stop()
+ self.stop_requested = False
+ return
+
+ fb = self.fbs[self.front_buf]
+ self.front_buf = self.front_buf ^ 1
+
+ old_xpos = (self.bar_xpos - self.BAR_SPEED) % (fb.width - self.BAR_WIDTH);
+ new_xpos = (self.bar_xpos + self.BAR_SPEED) % (fb.width - self.BAR_WIDTH);
+ self.bar_xpos = new_xpos
+
+ pykms.draw_color_bar(fb, old_xpos, new_xpos, self.BAR_WIDTH)
+
+ source = kmstest.Rect(0, 0, fb.width, fb.height)
+ destination = kmstest.Rect(0, 0, fb.width, fb.height)
+ self.atomic_plane_set(self.plane, self.crtc, source, destination, fb)
+
+ def stop_page_flip(self):
+ self.stop_requested = True
+
+ def main(self):
+ for connector in self.card.connectors:
+ self.start("page flip on connector %s" % connector.fullname)
+
+ # Skip disconnected connectors
+ if not connector.connected():
+ self.skip("unconnected connector")
+ continue
+
+ # Find a CRTC suitable for the connector
+ crtc = connector.get_current_crtc()
+ if not crtc:
+ crtcs = connector.get_possible_crtcs()
+ if len(crtcs) == 0:
+ pass
+
+ crtc = crtcs[0]
+
+ self.crtc = crtc
+
+ # Find a plane suitable for the CRTC
+ for plane in self.card.planes:
+ if plane.supports_crtc(crtc):
+ self.plane = plane
+ break
+ else:
+ self.skip("no plane available for CRTC %u" % crtc.id)
+ continue
+
+ # Get the default mode for the connector
+ try:
+ mode = connector.get_default_mode()
+ except ValueError:
+ self.skip("no mode available")
+ continue
+
+ self.logger.log("Testing connector %s, CRTC %u, plane %u, mode %s" % \
+ (connector.fullname, crtc.id, self.plane.id, mode.name))
+
+ # Create two frame buffers
+ self.fbs = []
+ for i in range(2):
+ self.fbs.append(pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24"))
+
+ # Set the mode and perform the initial page flip
+ ret = self.atomic_crtc_mode_set(crtc, connector, mode, self.fbs[0])
+ if ret < 0:
+ self.fail("atomic mode set failed with %d" % ret)
+ continue
+
+ # Flip pages for 10s
+ self.bar_xpos = 0
+ self.front_buf = 0
+ self.frame_start = 0
+ self.frame_end = 0
+ self.time_start = 0
+ self.time_end = 0
+ self.stop_requested = False
+
+ self.loop.add_timer(10, self.stop_page_flip)
+ self.run(11)
+
+ if not self.flips:
+ self.fail("No page flip registered")
+ continue
+
+ if self.stop_requested:
+ self.fail("Last page flip not registered")
+ continue
+
+ frames = self.frame_end - self.frame_start + 1
+ interval = self.time_end - self.time_start
+ self.logger.log("Frame rate: %f (%u/%u frames in %f s)" % \
+ (frames / interval, self.flips, frames, interval))
+ self.success()
+
+PageFlipTest().execute()
diff --git a/tests/kms-test-planeposition.py b/tests/kms-test-planeposition.py
new file mode 100755
index 0000000..2a9935d
--- /dev/null
+++ b/tests/kms-test-planeposition.py
@@ -0,0 +1,105 @@
+#!/usr/bin/python3
+
+import kmstest
+import pykms
+import time
+
+class PlanePositionTest(kmstest.KMSTest):
+ """Test boundaries of plane positioning."""
+
+ def main(self):
+ self.start("plane positioning boundaries")
+
+ # Find a CRTC with a connected connector and at least two planes
+ for connector in self.card.connectors:
+ if not connector.connected():
+ self.skip("unconnected connector")
+ continue
+
+ try:
+ mode = connector.get_default_mode()
+ except ValueError:
+ continue
+
+ crtcs = connector.get_possible_crtcs()
+ for crtc in crtcs:
+ planes = []
+ for plane in self.card.planes:
+ if plane.supports_crtc(crtc):
+ planes.append(plane)
+
+ if len(planes) > 1:
+ break
+ else:
+ crtc = None
+
+ if crtc:
+ break
+
+ else:
+ self.skip("no CRTC available with connector and at least two planes")
+ return
+
+ self.logger.log("Testing connector %s, CRTC %u, mode %s with %u planes" % \
+ (connector.fullname, crtc.id, mode.name, len(planes)))
+
+ # Create a frame buffer
+ fb = pykms.DumbFramebuffer(self.card, mode.hdisplay, mode.vdisplay, "XR24")
+ pykms.draw_test_pattern(fb)
+
+ # Set the mode with no plane, wait 5s for the monitor to wake up
+ ret = self.atomic_crtc_mode_set(crtc, connector, mode, sync=True)
+ if ret < 0:
+ self.fail("atomic mode set failed with %d" % ret)
+ return
+
+ self.logger.log("Initial atomic mode set completed")
+ time.sleep(5)
+
+ # Add the first plane to cover half of the CRTC
+ source = kmstest.Rect(0, 0, fb.width // 2, fb.height)
+ destination = kmstest.Rect(0, 0, fb.width // 2, fb.height)
+ ret = self.atomic_plane_set(planes[0], crtc, source, destination, fb, sync=True)
+ if ret < 0:
+ self.fail("atomic plane set for first plane failed with %d" % ret)
+ return
+
+ self.logger.log("Root plane enabled")
+ time.sleep(3)
+
+ # Add the second plane and move it around to cross all CRTC boundaries
+ offsets = ((50, 50), (150, 50), (50, 150), (-50, 50), (50, -50))
+ for offset in offsets:
+ width = fb.width - 100
+ height = fb.height - 100
+ source = kmstest.Rect(0, 0, width, height)
+ destination = kmstest.Rect(offset[0], offset[1], width, height)
+
+ ret = self.atomic_plane_set(planes[1], crtc, source, destination, fb, sync=True)
+ if ret < 0:
+ self.fail("atomic plane set with offset %d,%d" % offset)
+ return
+
+ self.logger.log("Moved overlay plane to %d,%d" % offset)
+ time.sleep(3)
+
+ # Try to move the plane completely off-screen. The device is expected to
+ # reject this.
+ offsets = ((mode.hdisplay, 50), (50, mode.vdisplay),
+ (-mode.hdisplay, 50), (50, -mode.vdisplay))
+ for offset in offsets:
+ width = fb.width - 100
+ height = fb.height - 100
+ source = kmstest.Rect(0, 0, width, height)
+ destination = kmstest.Rect(offset[0], offset[1], width, height)
+
+ ret = self.atomic_plane_set(planes[1], crtc, source, destination, fb, sync=True)
+ if ret >= 0:
+ self.fail("atomic plane set with invalid offset %d,%d accepted" % offset)
+ return
+
+ self.logger.log("Failed to Move overlay plane to %d,%d as expected" % offset)
+
+ self.success()
+
+PlanePositionTest().execute()
diff --git a/tests/kmstest.py b/tests/kmstest.py
new file mode 100755
index 0000000..cb0d9a7
--- /dev/null
+++ b/tests/kmstest.py
@@ -0,0 +1,327 @@
+#!/usr/bin/python3
+
+import errno
+import fcntl
+import os
+import pykms
+import selectors
+import sys
+import time
+
+class Timer(object):
+ def __init__(self, timeout, callback):
+ self.timeout = time.clock_gettime(time.CLOCK_MONOTONIC) + timeout
+ self.callback = callback
+
+
+class EventLoop(selectors.DefaultSelector):
+ def __init__(self):
+ super().__init__()
+ self.__timers = []
+
+ def add_timer(self, timeout, callback):
+ self.__timers.append(Timer(timeout, callback))
+ self.__timers.sort(key=lambda timer: timer.timeout)
+
+ def fire_timers(self):
+ clk = time.clock_gettime(time.CLOCK_MONOTONIC)
+ while len(self.__timers) > 0:
+ timer = self.__timers[0]
+ if timer.timeout > clk:
+ break
+
+ del self.__timers[0]
+ timer.callback()
+
+ def next_timeout(self):
+ clk = time.clock_gettime(time.CLOCK_MONOTONIC)
+ if len(self.__timers) == 0 or self.__timers[0].timeout < clk:
+ return None
+
+ return self.__timers[0].timeout - clk
+
+ def run(self, duration=0):
+ if duration:
+ self.add_timer(duration, self.stop)
+
+ timeout = self.next_timeout()
+
+ self._stop = False
+ while not self._stop:
+ for key, events in self.select(timeout):
+ key.data(key.fileobj, events)
+ self.fire_timers()
+
+ self.__timers = []
+
+ def stop(self):
+ self._stop = True
+
+
+class KernelLogMessage(object):
+ def __init__(self, msg):
+ pos = msg.find(";")
+ header = msg[:pos]
+ msg = msg[pos+1:]
+
+ facility, sequence, timestamp, *other = header.split(",")
+ self.facility = int(facility)
+ self.sequence = int(sequence)
+ self.timestamp = int(timestamp) / 1000000.
+
+ msg = msg.split("\n")
+ self.msg = msg[0]
+ self.tags = {}
+
+ try:
+ tags = msg[1:-1]
+ for tag in tags:
+ tag = tag.strip().split("=")
+ self.tags[tag[0]] = tag[1]
+ except:
+ pass
+
+
+class KernelLogReader(object):
+ def __init__(self):
+ self.kmsg = os.open("/dev/kmsg", 0)
+ flags = fcntl.fcntl(self.kmsg, fcntl.F_GETFL)
+ fcntl.fcntl(self.kmsg, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+ os.lseek(self.kmsg, 0, os.SEEK_END)
+
+ def __del__(self):
+ os.close(self.kmsg)
+
+ def read(self):
+ msgs = []
+ while True:
+ try:
+ msg = os.read(self.kmsg, 8191)
+ msg = msg.decode("utf-8")
+ except OSError as e:
+ if e.errno == errno.EAGAIN:
+ break
+ else:
+ raise e
+ msgs.append(KernelLogMessage(msg))
+
+ return msgs
+
+
+class Logger(object):
+ def __init__(self, name):
+ self.logfile = open("%s.log" % name, "w")
+ self._kmsg = KernelLogReader()
+
+ def __del__(self):
+ self.close()
+
+ def close(self):
+ if self.logfile:
+ self.logfile.close()
+ self.logfile = None
+
+ def event(self):
+ kmsgs = self._kmsg.read()
+ for msg in kmsgs:
+ self.logfile.write("K [%6f] %s\n" % (msg.timestamp, msg.msg))
+ self.logfile.flush()
+
+ @property
+ def fd(self):
+ return self._kmsg.kmsg
+
+ def flush(self):
+ self.logfile.flush()
+ os.fsync(self.logfile)
+
+ def log(self, msg):
+ # Start by processing the kernel log as there might not be any event
+ # loop running.
+ self.event()
+
+ now = time.clock_gettime(time.CLOCK_MONOTONIC)
+ self.logfile.write("U [%6f] %s\n" % (now, msg))
+ self.logfile.flush()
+
+
+class Rect(object):
+ def __init__(self, left, top, width, height):
+ self.left = left
+ self.top = top
+ self.width = width
+ self.height = height
+
+
+class KMSTest(object):
+ def __init__(self, use_default_key_handler=False):
+ if not getattr(self, 'main', None):
+ raise RuntimeError('Test class must implement main method')
+
+ self.card = pykms.Card()
+ if not self.card.has_atomic:
+ raise RuntimeError("Device doesn't support the atomic API")
+
+ logname = self.__class__.__name__
+ self.logger = Logger(logname)
+
+ self.loop = EventLoop()
+ self.loop.register(self.logger.fd, selectors.EVENT_READ, self.__read_logger)
+ self.loop.register(self.card.fd, selectors.EVENT_READ, self.__read_event)
+ if use_default_key_handler:
+ self.loop.register(sys.stdin, selectors.EVENT_READ, self.__read_key)
+
+ def __del__(self):
+ self.logger.close()
+
+ def __format_props(self, props):
+ return {k: v & ((1 << 64) - 1) for k, v in props.items()}
+
+ def atomic_crtc_disable(self, crtc, sync=True):
+ req = pykms.AtomicReq(self.card)
+ req.add(crtc, 'ACTIVE', False)
+ if sync:
+ return req.commit_sync(True)
+ else:
+ return req.commit(self, True)
+
+ def atomic_crtc_mode_set(self, crtc, connector, mode, fb=None, sync=False):
+ """Perform a mode set on the given connector and CRTC. The framebuffer,
+ if present, will be output on the primary plane. Otherwise no plane is
+ configured for the CRTC."""
+
+ # Mode blobs are reference-counted, make sure the blob stays valid until
+ # the commit completes.
+ mode_blob = mode.to_blob(self.card)
+
+ req = pykms.AtomicReq(self.card)
+ req.add(connector, 'CRTC_ID', crtc.id)
+ req.add(crtc, { 'ACTIVE': 1, 'MODE_ID': mode_blob.id })
+ if fb:
+ req.add(crtc.primary_plane, {
+ 'FB_ID': fb.id,
+ 'CRTC_ID': crtc.id,
+ 'SRC_X': 0,
+ 'SRC_Y': 0,
+ 'SRC_W': int(fb.width * 65536),
+ 'SRC_H': int(fb.height * 65536),
+ 'CRTC_X': 0,
+ 'CRTC_Y': 0,
+ 'CRTC_W': fb.width,
+ 'CRTC_H': fb.height,
+ })
+ if sync:
+ return req.commit_sync(True)
+ else:
+ return req.commit(self, True)
+
+ def atomic_plane_set(self, plane, crtc, source, destination, fb, sync=False):
+ req = pykms.AtomicReq(self.card)
+ req.add(plane, self.__format_props({
+ 'FB_ID': fb.id,
+ 'CRTC_ID': crtc.id,
+ 'SRC_X': int(source.left * 65536),
+ 'SRC_Y': int(source.top * 65536),
+ 'SRC_W': int(source.width * 65536),
+ 'SRC_H': int(source.height * 65536),
+ 'CRTC_X': destination.left,
+ 'CRTC_Y': destination.top,
+ 'CRTC_W': destination.width,
+ 'CRTC_H': destination.height,
+ }))
+ if sync:
+ return req.commit_sync()
+ else:
+ return req.commit(self)
+
+ def atomic_planes_disable(self, sync=True):
+ req = pykms.AtomicReq(self.card)
+ for plane in self.card.planes:
+ req.add(plane, { "FB_ID": 0, 'CRTC_ID': 0 })
+
+ if sync:
+ return req.commit_sync()
+ else:
+ return req.commit(self)
+
+ def __handle_page_flip(self, frame, time):
+ self.flips += 1
+ try:
+ # The handle_page_flip() method is optional, ignore attribute errors
+ self.handle_page_flip(frame, time)
+ except AttributeError:
+ pass
+
+ def __read_event(self, fileobj, events):
+ for event in self.card.read_events():
+ if event.type == pykms.DrmEventType.FLIP_COMPLETE:
+ self.__handle_page_flip(event.seq, event.time)
+
+ def __read_logger(self, fileobj, events):
+ self.logger.event()
+
+ def __read_key(self, fileobj, events):
+ sys.stdin.readline()
+ self.loop.stop()
+
+ def execute(self):
+ """Execute the test by running the main function."""
+ self.main()
+
+ def flush_events(self):
+ """Discard all pending DRM events."""
+
+ # Temporarily switch to non-blocking I/O to read events, as there might
+ # be no event pending.
+ flags = fcntl.fcntl(self.card.fd, fcntl.F_GETFL)
+ fcntl.fcntl(self.card.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ # read_events() is a generator so we have to go through all events
+ # explicitly. Ignore -EAGAIN errors, they're expected in non-blocking
+ # I/O mode.
+ try:
+ for event in self.card.read_events():
+ pass
+ except OSError as e:
+ if e.errno != errno.EAGAIN:
+ raise e
+
+ fcntl.fcntl(self.card.fd, fcntl.F_SETFL, flags)
+
+ def run(self, duration):
+ """Run the event loop for the given duration (in seconds)."""
+ self.flips = 0
+ self.loop.run(duration)
+
+ def start(self, name):
+ """Start a test."""
+ self.test_name = name
+ self.logger.log("Testing %s" % name)
+ sys.stdout.write("Testing %s: " % name)
+ sys.stdout.flush()
+
+ def progress(self, current, maximum):
+ sys.stdout.write("\rTesting %s: %u/%u" % (self.test_name, current, maximum))
+ sys.stdout.flush()
+
+ def fail(self, reason):
+ """Complete a test with failure."""
+ self.logger.log("Test failed. Reason: %s" % reason)
+ self.logger.flush()
+ sys.stdout.write("\rTesting %s: FAIL\n" % self.test_name)
+ sys.stdout.flush()
+
+ def skip(self, reason):
+ """Complete a test with skip."""
+ self.logger.log("Test skipped. Reason: %s" % reason)
+ self.logger.flush()
+ sys.stdout.write("SKIP\n")
+ sys.stdout.flush()
+
+ def success(self):
+ """Complete a test with success."""
+ self.logger.log("Test completed successfully")
+ self.logger.flush()
+ sys.stdout.write("\rTesting %s: SUCCESS\n" % self.test_name)
+ sys.stdout.flush()
+