summaryrefslogtreecommitdiff
path: root/tests/kms-test-routing.py
blob: 68aff9c11cff03be51ca86f779a0160cfa618221 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0-or-later
# SPDX-FileCopyrightText: 2019 Renesas Electronics Corporation

import kmstest
import pykms
import time

class Pipeline:
    """State of one display pipeline, built around a single CRTC.

    Only the CRTC is supplied at construction time. The connector, plane and
    mode blob attributes start out as None and are meant to be assigned by
    the code using the pipeline.
    """

    def __init__(self, crtc):
        self.crtc = crtc
        # Nothing has been routed or allocated for this CRTC yet.
        self.connector = self.plane = self.mode_blob = None


class RoutingTest(kmstest.KMSTest):
    """Test output routing.

    A connector that can be driven by several CRTCs is moved from one CRTC to
    the next. For every position of that shared connector the other CRTCs are
    routed to the remaining connectors and the whole configuration is applied
    with a single atomic commit.
    """

    def main(self):
        """Run the routing test.

        The test is skipped when no connector can be shared between CRTCs, or
        when the selected CRTCs cannot each be given a plane and a connector.
        It fails as soon as an atomic commit returns a negative value. The
        CRTCs used by the test are disabled on every exit path taken after the
        test has started.
        """
        # Create the reverse map from CRTC to possible connectors and calculate
        # the largest resolution.
        self.crtc_to_connectors = {}
        max_hdisplay = 0
        max_vdisplay = 0

        for connector in self.output_connectors():
            mode = connector.get_default_mode()
            max_hdisplay = max(mode.hdisplay, max_hdisplay)
            max_vdisplay = max(mode.vdisplay, max_vdisplay)

            for crtc in connector.get_possible_crtcs():
                self.crtc_to_connectors.setdefault(crtc, []).append(connector)

        shared_connector, pipes = self._find_shared_connector()
        if not shared_connector:
            self.skip('No suitable connector')
            return

        # Allocate planes for each CRTC.
        starved_pipe = self._allocate_planes(pipes)
        if starved_pipe is not None:
            self.skip(f'No free plane for CRTC {starved_pipe.crtc.id}')
            return

        # Compute the routing for every position of the shared connector
        # before touching the hardware, so that an impossible routing results
        # in a clean skip instead of an exception halfway through the test.
        routings = []
        for master_pipe in pipes:
            connectors = self.allocate_connectors(pipes, master_pipe, shared_connector)
            if any(pipe.crtc not in connectors for pipe in pipes):
                self.skip('Not enough connectors to route all CRTCs')
                return
            routings.append(connectors)

        # Create a framebuffer big enough for all connectors.
        fb = pykms.DumbFramebuffer(self.card, max_hdisplay, max_vdisplay, 'XR24')
        pykms.draw_test_pattern(fb)

        self.start(f'Moving connector {shared_connector.fullname} '
                   f'between CRTCs {[pipe.crtc.id for pipe in pipes]}')

        self.logger.log(f'Highest display resolution: {max_hdisplay}x{max_vdisplay}')

        try:
            for connectors in routings:
                ret = self._apply_routing(pipes, connectors, fb)
                if ret < 0:
                    self.fail(f'atomic commit failed with {ret}')
                    return

                # Keep the configuration active for a while before moving on
                # (presumably to allow visual inspection - TODO confirm).
                time.sleep(5)

            self.success()
        finally:
            # Disable the CRTCs on failure too, so that pipelines enabled by
            # an earlier iteration are not left running.
            for pipe in pipes:
                self.atomic_crtc_disable(pipe.crtc)

    def _find_shared_connector(self):
        """Find a connector that can be moved between CRTCs.

        The connector must be routable to at least two CRTCs that have at
        least two output routes each.

        Returns a (connector, pipes) tuple where pipes holds one Pipeline per
        qualifying CRTC of that connector, or (None, []) if no connector
        qualifies.
        """
        for connector in self.output_connectors():
            pipes = [Pipeline(crtc)
                     for crtc in connector.get_possible_crtcs()
                     if len(self.crtc_to_connectors[crtc]) >= 2]

            if len(pipes) >= 2:
                return connector, pipes

        return None, []

    def _allocate_planes(self, pipes):
        """Assign a distinct plane to every pipeline.

        Pipelines are served starting with the one that has the fewest planes
        left to choose from. The selected plane is stored in pipe.plane.

        Returns None on success, or the first pipeline for which no unused
        plane is left.
        """
        pool = [(pipe, list(pipe.crtc.possible_planes)) for pipe in pipes]
        while pool:
            # Sorting in descending order puts the most constrained pipeline
            # at the end of the list.
            pool.sort(key=lambda elem: len(elem[1]), reverse=True)
            pipe, planes = pool.pop()
            if not planes:
                return pipe

            pipe.plane = planes[0]

            # Remove the selected plane from all elements left in the pool.
            pool = [(other, [p for p in candidates if p != pipe.plane])
                    for other, candidates in pool]

        return None

    def _apply_routing(self, pipes, connectors, fb):
        """Build and commit the atomic request for one routing.

        pipes is the list of pipelines to configure, connectors maps each CRTC
        to the connector it shall drive, and fb is the framebuffer shown on
        every plane. Each pipeline's connector and mode_blob attributes are
        updated to the new configuration.

        Returns the value returned by the atomic commit.
        """
        req = kmstest.AtomicRequest(self)
        routed = connectors.values()
        route = []

        for pipe in pipes:
            # Detach the connector this CRTC drove so far when the new routing
            # doesn't use it at all. Connectors that stay in use get a new
            # CRTC_ID below.
            if pipe.connector and pipe.connector not in routed:
                req.add(pipe.connector, 'CRTC_ID', 0)

            pipe.connector = connectors[pipe.crtc]
            mode = pipe.connector.get_default_mode()
            pipe.mode_blob = mode.to_blob(self.card)

            req.add(pipe.connector, 'CRTC_ID', pipe.crtc.id)
            req.add(pipe.crtc, {'ACTIVE': 1, 'MODE_ID': pipe.mode_blob.id})
            req.add(pipe.plane, {
                        'FB_ID': fb.id,
                        'CRTC_ID': pipe.crtc.id,
                        'SRC_X': 0,
                        'SRC_Y': 0,
                        # The KMS SRC_* plane properties are expressed in
                        # 16.16 fixed point.
                        'SRC_W': int(mode.hdisplay * 65536),
                        'SRC_H': int(mode.vdisplay * 65536),
                        'CRTC_X': 0,
                        'CRTC_Y': 0,
                        'CRTC_W': mode.hdisplay,
                        'CRTC_H': mode.vdisplay,
                    })

            route.append(f'CRTC {pipe.crtc.id} to connector {pipe.connector.fullname}')

        self.logger.log('Routing ' + ', '.join(route))

        return req.commit_sync(True)

    def allocate_connectors(self, pipes, master_pipe, shared_connector):
        """Allocate one connector for each CRTC.

        A pool of available connectors is created for each CRTC, and
        connectors are allocated starting with the CRTC that has the least
        number of options. The master CRTC is always given the shared
        connector.

        pipes is the list of pipelines to route, master_pipe the pipeline that
        must receive shared_connector.

        Returns a dictionary mapping each CRTC to its connector. A CRTC for
        which no connector is left is absent from the dictionary.
        """
        pool = [
            (pipe.crtc,
             [shared_connector] if pipe is master_pipe
             else list(self.crtc_to_connectors[pipe.crtc]))
            for pipe in pipes
        ]

        allocated = {}
        while pool:
            # Sorting in descending order puts the most constrained CRTC at
            # the end of the list.
            pool.sort(key=lambda elem: len(elem[1]), reverse=True)
            crtc, candidates = pool.pop()
            if not candidates:
                # Every connector this CRTC could use is already taken.
                continue

            connector = candidates[0]
            allocated[crtc] = connector

            # Remove the selected connector from all elements in the pool
            pool = [(other, [c for c in options if c != connector])
                    for other, options in pool]

        return allocated


# Script entry point: instantiate the test and call its execute() method.
# execute() is not defined in this file; presumably it is provided by
# kmstest.KMSTest and ends up invoking main() - TODO confirm.
RoutingTest().execute()