#!/usr/bin/python3 # SPDX-License-Identifier: GPL-2.0-or-later # SPDX-FileCopyrightText: 2019 Renesas Electronics Corporation import kmstest import pykms import time class Pipeline(object): def __init__(self, crtc): self.crtc = crtc self.connector = None self.plane = None self.mode_blob = None class RoutingTest(kmstest.KMSTest): """Test output routing.""" def main(self): # Create the reverse map from CRTC to possible connectors and calculate # the largest resolution. self.crtc_to_connectors = {} max_hdisplay = 0 max_vdisplay = 0 for connector in self.output_connectors(): mode = connector.get_default_mode() max_hdisplay = max(mode.hdisplay, max_hdisplay) max_vdisplay = max(mode.vdisplay, max_vdisplay) for crtc in connector.get_possible_crtcs(): if not crtc in self.crtc_to_connectors: self.crtc_to_connectors[crtc] = [] self.crtc_to_connectors[crtc].append(connector) # Find a connector that can be routed to at least two CRTCs that have # at least two output routes each. shared_connector = None for connector in self.output_connectors(): pipes = [] for crtc in connector.get_possible_crtcs(): if len(self.crtc_to_connectors[crtc]) >= 2: pipes.append(Pipeline(crtc)) if len(pipes) >= 2: shared_connector = connector break if not shared_connector: self.skip('No suitable connector') return # Allocate planes for each CRTC. pool = [(pipe, list(pipe.crtc.possible_planes)) for pipe in pipes] while len(pool): pool.sort(key=lambda elem: len(elem[1]), reverse=True) pipe, planes = pool[-1] pipe.plane = planes[0] pool = [(elem[0], [p for p in elem[1] if p != pipe.plane]) for elem in pool[:-1]] # Create a framebuffer big enough for all connectors. fb = pykms.DumbFramebuffer(self.card, max_hdisplay, max_vdisplay, 'XR24') pykms.draw_test_pattern(fb) self.start(f'Moving connector {shared_connector.fullname} ' f'between CRTCs {[pipe.crtc.id for pipe in pipes]}') self.logger.log(f'Highest display resolution: {max_hdisplay}x{max_vdisplay}') for master_pipe in pipes: req = kmstest.AtomicRequest(self) connectors = self.allocate_connectors(pipes, master_pipe, shared_connector) route = [] for pipe in pipes: if pipe.connector and not pipe.connector in connectors.values(): req.add(pipe.connector, 'CRTC_ID', 0) pipe.connector = connectors[pipe.crtc] mode = pipe.connector.get_default_mode() pipe.mode_blob = mode.to_blob(self.card) req.add(pipe.connector, 'CRTC_ID', pipe.crtc.id) req.add(pipe.crtc, {'ACTIVE': 1, 'MODE_ID': pipe.mode_blob.id}) req.add(pipe.plane, { 'FB_ID': fb.id, 'CRTC_ID': pipe.crtc.id, 'SRC_X': 0, 'SRC_Y': 0, 'SRC_W': int(mode.hdisplay * 65536), 'SRC_H': int(mode.vdisplay * 65536), 'CRTC_X': 0, 'CRTC_Y': 0, 'CRTC_W': mode.hdisplay, 'CRTC_H': mode.vdisplay, }) route.append(f'CRTC {pipe.crtc.id} to connector {pipe.connector.fullname}') self.logger.log('Routing ' + ', '.join(route)) ret = req.commit_sync(True) if ret < 0: self.fail(f'atomic commit failed with {ret}') return time.sleep(5) self.success() for pipe in pipes: self.atomic_crtc_disable(pipe.crtc) def allocate_connectors(self, pipes, master_pipe, shared_connector): # Allocate one connector for each CRTC. Create a pool of available # connectors for each CRTC, sorted by the number of connectors, and # allocate started with the CRTC that has the least number of options. # The master CRTC is always given the shared connector. pool = [] for pipe in pipes: if pipe == master_pipe: pool.append((pipe.crtc, [shared_connector])) continue pool.append((pipe.crtc, list(self.crtc_to_connectors[pipe.crtc]))) allocated = {} while len(pool): pool.sort(key=lambda elem: len(elem[1]), reverse=True) crtc, connectors = pool[-1] connector = connectors[0] allocated[crtc] = connector # Remove the selected connector from all elements in the pool pool = [(elem[0], [c for c in elem[1] if c != connector]) for elem in pool[:-1]] return allocated if __name__ == '__main__': RoutingTest().execute()