#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: 2017-2019 Renesas Electronics Corporation

import collections.abc
import errno
import fcntl
import os
import pykms
import selectors
import sys
import time


class Timer(object):
    """One-shot timer expiring `timeout` seconds after its creation."""

    def __init__(self, timeout, callback):
        # Store the absolute expiry time on the monotonic clock so that timers
        # can be ordered and compared against the current time directly.
        self.timeout = time.clock_gettime(time.CLOCK_MONOTONIC) + timeout
        self.callback = callback


class EventLoop(selectors.DefaultSelector):
    """Selector-based event loop with support for one-shot timers.

    The data registered with each file object must be a callable taking the
    file object and the event mask as arguments.
    """

    def __init__(self):
        super().__init__()
        self.__timers = []
        self._stop = False

    def add_timer(self, timeout, callback):
        """Call `callback` once `timeout` seconds have elapsed."""
        self.__timers.append(Timer(timeout, callback))
        # Keep the list sorted by expiry time, the earliest timer first.
        self.__timers.sort(key=lambda timer: timer.timeout)

    def fire_timers(self):
        """Remove all expired timers and call their callbacks."""
        clk = time.clock_gettime(time.CLOCK_MONOTONIC)
        while self.__timers:
            timer = self.__timers[0]
            if timer.timeout > clk:
                break

            del self.__timers[0]
            timer.callback()

    def next_timeout(self):
        """Return the delay in seconds until the next timer expires.

        Returns None when no timer is pending, and 0 when the next timer has
        already expired, so that the value can be passed to select() directly
        without blocking past an expired timer.
        """
        if not self.__timers:
            return None

        clk = time.clock_gettime(time.CLOCK_MONOTONIC)
        return max(0, self.__timers[0].timeout - clk)

    def run(self, duration=0):
        """Dispatch events and timers until stop() is called.

        If `duration` is non-zero the loop stops by itself after that many
        seconds. All pending timers are dropped when the loop exits.
        """
        if duration:
            self.add_timer(duration, self.stop)

        self._stop = False
        while not self._stop:
            # Recompute the timeout on every iteration: select() returns early
            # when an event is received, and waiting for the initial timeout
            # again would delay the timers by the time already elapsed.
            for key, events in self.select(self.next_timeout()):
                key.data(key.fileobj, events)
            self.fire_timers()

        self.__timers = []

    def stop(self):
        """Request the event loop to exit after the current iteration."""
        self._stop = True


class KernelLogMessage(object):
    """A record read from /dev/kmsg.

    The record is made of a comma-separated header, a ';' separator, the
    message text, and optional 'key=value' lines that are stored in `tags`.
    """

    def __init__(self, msg):
        header, _, body = msg.partition(';')

        facility, sequence, timestamp, *other = header.split(',')
        self.facility = int(facility)
        self.sequence = int(sequence)
        # NOTE(review): /dev/kmsg is documented to report the timestamp in
        # microseconds, which makes this value a number of seconds.
        self.timestamp = int(timestamp) / 1000000.

        lines = body.split('\n')
        self.msg = lines[0]
        self.tags = {}

        # The first element is the message text. The last element is skipped
        # too, presumably it is the empty string that follows the final
        # newline of the record.
        for line in lines[1:-1]:
            key, sep, value = line.strip().partition('=')
            if not sep:
                # Not a key=value pair, ignore this line only.
                continue
            self.tags[key] = value


class KernelLogReader(object):
    """Non-blocking reader for kernel log messages from /dev/kmsg.

    Only the messages logged after the reader has been created are returned,
    as the file is positioned at its end at construction time.
    """

    def __init__(self):
        # Set the attribute first to keep __del__() safe if open() fails.
        self.kmsg = -1
        self.kmsg = os.open('/dev/kmsg', os.O_RDONLY)

        flags = fcntl.fcntl(self.kmsg, fcntl.F_GETFL)
        fcntl.fcntl(self.kmsg, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        os.lseek(self.kmsg, 0, os.SEEK_END)

    def __del__(self):
        if self.kmsg != -1:
            os.close(self.kmsg)
            self.kmsg = -1

    def read(self):
        """Return the list of pending messages as KernelLogMessage objects.

        Raises OSError if reading fails for any reason other than no message
        being available.
        """
        msgs = []
        while True:
            try:
                data = os.read(self.kmsg, 8191)
            except OSError as e:
                # EAGAIN signals that all pending messages have been read.
                if e.errno == errno.EAGAIN:
                    break
                raise

            msgs.append(KernelLogMessage(data.decode('utf-8')))

        return msgs


class Logger(object):
    """Write test messages interleaved with kernel messages to '<name>.log'.

    Kernel messages are prefixed with 'K' and test messages with 'U'.
    """

    def __init__(self, name):
        # Set the attributes first to keep close() safe if construction fails
        # half-way.
        self.logfile = None
        self._kmsg = None

        self.logfile = open(f'{name}.log', 'w')
        self._kmsg = KernelLogReader()

    def __del__(self):
        self.close()

    def close(self):
        """Log the pending kernel messages and close the log file.

        Calling this method multiple times is safe.
        """
        if self.logfile:
            # Capture the last kernel messages.
            if self._kmsg is not None:
                self.event()
            self.logfile.close()
            self.logfile = None

    def event(self):
        """Copy all pending kernel messages to the log file."""
        kmsgs = self._kmsg.read()
        for msg in kmsgs:
            self.logfile.write(f'K [{msg.timestamp:6f}] {msg.msg}\n')
        self.logfile.flush()

    @property
    def fd(self):
        """File descriptor to poll for new kernel messages."""
        return self._kmsg.kmsg

    def flush(self):
        """Flush the log file and synchronize it to storage."""
        self.logfile.flush()
        os.fsync(self.logfile.fileno())

    def log(self, msg):
        """Write a test message to the log file, with a timestamp."""
        # Start by processing the kernel log as there might not be any event
        # loop running.
        self.event()

        now = time.clock_gettime(time.CLOCK_MONOTONIC)
        self.logfile.write(f'U [{now:6f}] {msg}\n')
        self.logfile.flush()


class CRC(object):
    """One entry read from the CRC data file of a CRTC.

    The entry starts with a 10 characters frame number in hexadecimal, or
    with 'XXXXXXXXXX' in which case `frame` is None. It is followed by
    space-separated hexadecimal CRC values, stored in `crcs`.
    """

    def __init__(self, crc):
        if crc.startswith('XXXXXXXXXX'):
            self.frame = None
        else:
            self.frame = int(crc[:10], 16)

        # Skip the frame number and the separator that follows it.
        values = crc[11:].strip('\n\0').split(' ')
        self.crcs = [int(value, 16) for value in values]


class CRCReader(object):
    """Read frame CRCs for a CRTC through the debugfs CRC interface."""

    MAX_CRC_ENTRIES = 10
    # NOTE(review): presumably 10 characters for the frame number, 11
    # characters per CRC value (separator included) and one line terminator.
    MAX_LINE_LEN = 10 + 11 * MAX_CRC_ENTRIES + 1

    def __init__(self, crtc):
        self.pipe = crtc.idx
        self.ctrl = -1
        self.dir = -1
        self.data = -1

        # Hardcode the device minor to 0 as the KMSTest constructor opens the
        # default card object.
        self.dir = os.open(f'/sys/kernel/debug/dri/0/crtc-{self.pipe}/crc', os.O_RDONLY)
        self.ctrl = os.open('control', os.O_WRONLY, dir_fd = self.dir)

    def __del__(self):
        self.stop()
        if self.ctrl != -1:
            os.close(self.ctrl)
        if self.dir != -1:
            os.close(self.dir)

    def start(self, source):
        """Select the CRC `source` and open the CRC data file."""
        os.write(self.ctrl, source.encode('ascii'))
        self.data = os.open('data', os.O_RDONLY, dir_fd = self.dir)

    def stop(self):
        """Close the CRC data file if it is open."""
        if self.data != -1:
            os.close(self.data)
            self.data = -1

    def read(self, num_entries=1):
        """Read up to `num_entries` entries and return them as CRC objects.

        Fewer entries are returned if a read fails with EAGAIN. Any other
        read error is raised as OSError.
        """
        crcs = []
        while len(crcs) < num_entries:
            try:
                line = os.read(self.data, CRCReader.MAX_LINE_LEN)
            except OSError as e:
                if e.errno == errno.EAGAIN:
                    break
                raise

            crcs.append(CRC(line.decode('ascii')))

        return crcs


class Dist(object):
    """A distance along the x and y axes."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        return f'({self.x},{self.y})'


class Point(object):
    """A point identified by its x and y coordinates."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        return f'({self.x},{self.y})'

    def move(self, distance):
        """Translate the point in place by `distance` (an object with x and y)."""
        self.x += distance.x
        self.y += distance.y


class Size(object):
    """A width and height pair."""

    def __init__(self, width, height):
        self.width = width
        self.height = height

    def __repr__(self):
        return f'{self.width}x{self.height}'


class Rect(object):
    """A rectangle described by its top-left corner and its size."""

    def __init__(self, left, top, width, height):
        self.left = left
        self.top = top
        self.width = width
        self.height = height

    def __repr__(self):
        return f'({self.left},{self.top})/{self.width}x{self.height}'

    def isEmpty(self):
        """Check if the rectangle has a zero width or height"""
        return self.width == 0 or self.height == 0


class AtomicRequest(pykms.AtomicReq):
    """pykms.AtomicReq wrapper to track state changes"""

    def __init__(self, test):
        super().__init__(test.card)
        self.__test = test
        # Properties added to the request, as { object id: { name: value } }.
        self.__props = {}

    def __format_props(self, obj, props):
        """Convert property values to the integers expected by pykms.

        Values can be integers, or strings in one of the following forms:

        - 'N%', a percentage of the range of a Range or SignedRange property
        - a numeric string, converted with int()
        - the name of a value of an Enum property

        Returns a new dictionary of 64-bit unsigned integer values. Raises
        RuntimeError if a value can't be converted.
        """
        out = {}
        for k, v in props.items():
            if isinstance(v, str):
                if v.endswith('%'):
                    prop = obj.get_prop(k)
                    if not prop:
                        raise RuntimeError(f'Property {k} not supported by object {obj}')

                    if prop.type not in (pykms.PropertyType.Range, pykms.PropertyType.SignedRange):
                        raise RuntimeError(f'Unsupported property type {prop.type} for value {v}')

                    lower, upper = prop.values
                    v = lower + int((upper - lower) * int(v[:-1]) / 100)
                elif v.isnumeric():
                    v = int(v)
                else:
                    prop = obj.get_prop(k)
                    if not prop:
                        raise RuntimeError(f'Property {k} not supported by object {obj}')

                    if prop.type != pykms.PropertyType.Enum:
                        raise RuntimeError(f'Unsupported property type {prop.type} for value {v}')

                    for value, name in prop.enums.items():
                        if name == v:
                            v = value
                            break
                    else:
                        raise RuntimeError(f'Enum value with name "{v}" not found in property {k}')

            if not isinstance(v, int):
                raise RuntimeError(f'Unsupported value type {type(v)} for property {k}')

            # Convert negative values to a 64-bit unsigned integer as required
            # by the bindings for AtomicRequest::add().
            out[k] = v & ((1 << 64) - 1)

        return out

    def __track_state(self):
        """Merge the properties of this request in the state of the test."""
        # Merge per object instead of replacing the whole dictionary of the
        # object, as properties set by previous requests and not touched by
        # this one (CRTC_ID for instance) must not be forgotten.
        for obj_id, props in self.__props.items():
            self.__test._props.setdefault(obj_id, {}).update(props)

    def add(self, obj, *kwargs):
        """Add properties of `obj` to the request.

        The properties are given either as a single mapping of names to
        values, or as two arguments, a name and a value. See __format_props()
        for the supported values. Raises TypeError for any other arguments.
        """
        if len(kwargs) == 1 and isinstance(kwargs[0], collections.abc.Mapping):
            props = self.__format_props(obj, kwargs[0])
        elif len(kwargs) == 2:
            props = self.__format_props(obj, { kwargs[0]: kwargs[1] })
        else:
            raise TypeError('add() expects a mapping of properties or a name and a value')

        self.__props.setdefault(obj.id, {}).update(props)

        super().add(obj, props)

    def commit(self, data=0, allow_modeset=False):
        """Commit the request, recording the new state if it returns 0."""
        ret = super().commit(data, allow_modeset)
        if ret == 0:
            self.__track_state()
        return ret

    def commit_sync(self, allow_modeset=False):
        """Synchronous version of commit()."""
        ret = super().commit_sync(allow_modeset)
        if ret == 0:
            self.__track_state()
        return ret

    def __repr__(self):
        return repr(self.__props)


class KMSTest(object):
    """Base class for KMS tests.

    Test classes derive from this class and implement the main() method.
    They can optionally implement handle_page_flip(frame, time) to be
    notified of page flip completion.
    """

    def __init__(self, use_default_key_handler=False):
        if not getattr(self, 'main', None):
            raise RuntimeError('Test class must implement main method')

        self.card = pykms.Card()
        if not self.card.has_atomic:
            raise RuntimeError("Device doesn't support the atomic API")

        # Properties set by successful commits, as
        # { object id: { name: value } }.
        self._props = {}
        # Number of page flips completed since the last call to run().
        self.flips = 0

        logname = self.__class__.__name__
        self.logger = Logger(logname)

        self.loop = EventLoop()
        self.loop.register(self.logger.fd, selectors.EVENT_READ, self.__read_logger)
        self.loop.register(self.card.fd, selectors.EVENT_READ, self.__read_event)
        if use_default_key_handler:
            self.loop.register(sys.stdin, selectors.EVENT_READ, self.__read_key)

    def __enter__(self):
        return self

    def __exit__(self, *err):
        self.__cleanup()

    def __del__(self):
        self.__cleanup()

    def __cleanup(self):
        """Release the card, the event loop and the logger."""
        # The constructor may have raised before creating the loop or the
        # logger, in which case there is nothing to close.
        self.card = None

        loop = getattr(self, 'loop', None)
        if loop is not None:
            loop.close()

        logger = getattr(self, 'logger', None)
        if logger is not None:
            logger.close()

    def atomic_crtc_disable(self, crtc, sync=True):
        """Disable the CRTC with the connectors and planes tracked as using it.

        Returns the value returned by the commit.
        """
        req = AtomicRequest(self)
        req.add(crtc, {'ACTIVE': 0, 'MODE_ID': 0})

        for connector in self.card.connectors:
            props = self._props.get(connector.id, {})
            if props.get('CRTC_ID') == crtc.id:
                req.add(connector, 'CRTC_ID', 0)

        for plane in self.card.planes:
            props = self._props.get(plane.id, {})
            if props.get('CRTC_ID') == crtc.id:
                req.add(plane, {'CRTC_ID': 0, 'FB_ID': 0})

        if sync:
            return req.commit_sync(True)
        else:
            return req.commit(0, True)

    def atomic_crtc_mode_set(self, crtc, connector, mode, fb=None, sync=False):
        """Perform a mode set on the given connector and CRTC.

        The framebuffer, if present, will be output on the primary plane.
        Otherwise no plane is configured for the CRTC.

        Returns the value returned by the commit.
        """

        # Mode blobs are reference-counted, make sure the blob stays valid until
        # the commit completes.
        mode_blob = mode.to_blob(self.card)

        req = AtomicRequest(self)
        req.add(connector, 'CRTC_ID', crtc.id)
        req.add(crtc, { 'ACTIVE': 1, 'MODE_ID': mode_blob.id })
        if fb:
            # The SRC_* values are scaled by 65536 (16.16 fixed point).
            req.add(crtc.primary_plane, {
                'FB_ID': fb.id,
                'CRTC_ID': crtc.id,
                'SRC_X': 0,
                'SRC_Y': 0,
                'SRC_W': int(fb.width * 65536),
                'SRC_H': int(fb.height * 65536),
                'CRTC_X': 0,
                'CRTC_Y': 0,
                'CRTC_W': fb.width,
                'CRTC_H': fb.height,
            })

        if sync:
            return req.commit_sync(True)
        else:
            return req.commit(0, True)

    def atomic_plane_set(self, plane, crtc, source, destination, fb, alpha=None, zpos=None, blendmode=None, sync=False):
        """Display the `source` area of `fb` on `plane` at `destination`.

        `source` and `destination` are rectangles with left, top, width and
        height attributes. The alpha, zpos and pixel blend mode properties are
        only set when the corresponding argument is not None.

        Returns the value returned by the commit.
        """
        req = AtomicRequest(self)
        # The SRC_* values are scaled by 65536 (16.16 fixed point).
        req.add(plane, {
            'FB_ID': fb.id,
            'CRTC_ID': crtc.id,
            'SRC_X': int(source.left * 65536),
            'SRC_Y': int(source.top * 65536),
            'SRC_W': int(source.width * 65536),
            'SRC_H': int(source.height * 65536),
            'CRTC_X': destination.left,
            'CRTC_Y': destination.top,
            'CRTC_W': destination.width,
            'CRTC_H': destination.height,
        })
        if alpha is not None:
            req.add(plane, 'alpha', alpha)
        if zpos is not None:
            req.add(plane, 'zpos', zpos)
        if blendmode is not None:
            req.add(plane, 'pixel blend mode', blendmode)

        if sync:
            return req.commit_sync()
        else:
            return req.commit(0)

    def atomic_plane_disable(self, plane, sync=True):
        """Disable `plane`. Returns the value returned by the commit."""
        req = AtomicRequest(self)
        req.add(plane, { 'FB_ID': 0, 'CRTC_ID': 0 })

        if sync:
            return req.commit_sync()
        else:
            return req.commit(0)

    def atomic_planes_disable(self, sync=True):
        """Disable all planes. Returns the value returned by the commit."""
        req = AtomicRequest(self)
        for plane in self.card.planes:
            req.add(plane, { 'FB_ID': 0, 'CRTC_ID': 0 })

        if sync:
            return req.commit_sync()
        else:
            return req.commit(0)

    def output_connectors(self):
        """Iterate over all connectors whose name doesn't start with 'writeback-'."""
        for connector in self.card.connectors:
            if connector.fullname.startswith('writeback-'):
                continue

            yield connector

    def __handle_page_flip(self, frame, time):
        self.flips += 1

        # The handle_page_flip() method is optional. Look it up explicitly
        # instead of catching AttributeError around the call, to avoid hiding
        # attribute errors raised by the handler itself.
        handler = getattr(self, 'handle_page_flip', None)
        if handler is not None:
            handler(frame, time)

    def __read_event(self, fileobj, events):
        for event in self.card.read_events():
            if event.type == pykms.DrmEventType.FLIP_COMPLETE:
                self.__handle_page_flip(event.seq, event.time)

    def __read_logger(self, fileobj, events):
        self.logger.event()

    def __read_key(self, fileobj, events):
        # Consume the line and stop the event loop.
        sys.stdin.readline()
        self.loop.stop()

    def execute(self):
        """Execute the test by running the main function."""
        self.main()

    def flush_events(self):
        """Discard all pending DRM events."""

        # Temporarily switch to non-blocking I/O to read events, as there might
        # be no event pending.
        flags = fcntl.fcntl(self.card.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.card.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        # read_events() is a generator so we have to go through all events
        # explicitly. Ignore -EAGAIN errors, they're expected in non-blocking
        # I/O mode.
        try:
            for event in self.card.read_events():
                pass
        except OSError as e:
            if e.errno != errno.EAGAIN:
                raise
        finally:
            # Restore the original flags even when an error is propagated.
            fcntl.fcntl(self.card.fd, fcntl.F_SETFL, flags)

    def run(self, duration):
        """Run the event loop for the given duration (in seconds)."""
        self.flips = 0
        self.loop.run(duration)

    def start(self, name):
        """Start a test."""
        self.test_name = name
        self.logger.log(f'Testing {name}')
        sys.stdout.write(f'Testing {name}: ')
        sys.stdout.flush()

    def progress(self, current, maximum):
        """Report the progress of the current test on the standard output."""
        sys.stdout.write(f'\rTesting {self.test_name}: {current}/{maximum}')
        sys.stdout.flush()

    def fail(self, reason):
        """Complete a test with failure."""
        self.logger.log(f'Test failed. Reason: {reason}')
        self.logger.flush()
        sys.stdout.write(f'\rTesting {self.test_name}: FAIL\n')
        sys.stdout.flush()

    def skip(self, reason):
        """Complete a test with skip."""
        self.logger.log(f'Test skipped. Reason: {reason}')
        self.logger.flush()
        sys.stdout.write('SKIP\n')
        sys.stdout.flush()

    def success(self):
        """Complete a test with success."""
        self.logger.log('Test completed successfully')
        self.logger.flush()
        sys.stdout.write(f'\rTesting {self.test_name}: SUCCESS\n')
        sys.stdout.flush()


if __name__ == '__main__':
    import importlib
    import inspect

    # Run all the tests found in the kms-test-*.py files of the current
    # directory, in alphabetical order of the file names.
    files = sorted(
        entry.name for entry in os.scandir()
        if entry.is_file() and entry.name.startswith('kms-test-') and entry.name.endswith('.py')
    )

    for file in files:
        print(f'- {file}')
        module = importlib.import_module(file[:-3])

        # Collect all the classes of the module that have a class named
        # KMSTest in their method resolution order.
        tests = []
        for name in dir(module):
            obj = getattr(module, name)
            if not isinstance(obj, type):
                continue

            if any(cls.__name__ == 'KMSTest' for cls in inspect.getmro(obj)):
                tests.append(obj)

        for test_class in tests:
            # Use a context manager to ensure proper cleanup after each test,
            # otherwise state from one test may leak to the other based on when
            # objects end up being deleted.
            with test_class() as test:
                test.execute()