USB Gateware: Part 3 - Control Transfers

This series of tutorials walks through the process of implementing a complete USB device with Cynthion and LUNA.

The goal of this tutorial is to define a control interface for the device we created in Part 1 that will allow it to receive and respond to control requests from a host.

Prerequisites

Data Transfer between a Host and Device

USB is a host-centric bus: all transfers are initiated by the host, irrespective of the direction of data transfer.

For data transfers to the device, the host issues an OUT token to notify the device of an incoming data transfer. When data has to be transferred from the device, the host issues an IN token to notify the device that it should send some data to the host.

The USB 2.0 specification defines four endpoint or transfer types:

  • Control Transfers: Typically used for command and status operations, control transfers are the only transfer type with a defined USB format.

  • Bulk Transfers: Bulk transfers are best suited for large amounts of data delivered in bursts such as file transfers to/from a storage device or the captured packet data from Cynthion to the control host.

  • Interrupt Transfers: The name is a bit of a misnomer, as the host has to poll the device continuously to check whether an interrupt has occurred, but the principle is the same. They are commonly used for peripherals that generate input events, such as a keyboard or mouse.

  • Isochronous Transfers: Finally, isochronous transfers occur continuously with a fixed periodicity. They suit time-sensitive information such as video or audio streams, but they do not offer any guarantees on delivery. If a packet or frame is dropped, it is up to the host driver to decide how best to handle it.
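As a small illustration of how these four types appear on the wire, the sketch below decodes the transfer type from the two low bits of an endpoint descriptor's bmAttributes field, as laid out in the USB 2.0 specification. The `TransferType` enum and `transfer_type` helper are names made up for this example; they are not part of LUNA.

```python
from enum import IntEnum

class TransferType(IntEnum):
    """Transfer type codes held in bits 1..0 of bmAttributes (illustrative enum)."""
    CONTROL     = 0b00
    ISOCHRONOUS = 0b01
    BULK        = 0b10
    INTERRUPT   = 0b11

def transfer_type(bm_attributes: int) -> TransferType:
    """Return the transfer type encoded in an endpoint descriptor's bmAttributes."""
    return TransferType(bm_attributes & 0b11)

print(transfer_type(0x02).name)  # a bulk endpoint
print(transfer_type(0x03).name)  # an interrupt endpoint
```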

All LUNA devices come with a default implementation for two endpoints: an OUT control endpoint and an IN control endpoint. The host uses these endpoints to enumerate the device, but they can also be extended to support other class or custom vendor requests.

We’ll start by extending our control endpoints to support two vendor requests: One to set the state of the Cynthion FPGA LEDs and another to get the state of the Cynthion USER BUTTON.

Extend Default Control Endpoints

To implement vendor requests, begin by adding a VendorRequestHandler to our device’s control endpoint:

gateware-usb-device.py
from amaranth                                    import *
from luna.usb2                                   import USBDevice
from usb_protocol.emitters                       import DeviceDescriptorCollection

from luna.gateware.usb.request.windows           import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard  import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10  import RegistryTypes

from luna.gateware.stream.generator              import StreamSerializer
from luna.gateware.usb.request.control           import ControlRequestHandler
from luna.gateware.usb.request.interface         import SetupPacket
from luna.gateware.usb.usb2.request              import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer             import USBInStreamInterface
from usb_protocol.types                          import USBRequestType

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS   = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # shortcuts
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # get a reference to the FPGA LEDs and USER button
        fpga_leds   = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # create a streamserializer for transmitting IN data back to the host
        serializer = StreamSerializer(
            domain           = "usb",
            stream_type      = USBInStreamInterface,
            data_length      = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        return m

class GatewareUSBDevice(Elaboratable):

    ...

    def elaborate(self, platform):
        m = Module()

        # configure cynthion's clocks and reset signals
        m.submodules.car = platform.clock_domain_generator()

        # request the physical interface for cynthion's TARGET C port
        ulpi = platform.request("target_phy")

        # create the USB device
        m.submodules.usb = usb = USBDevice(bus=ulpi)

        # create our standard descriptors and add them to the device's control endpoint
        descriptors = self.create_standard_descriptors()
        control_endpoint = usb.add_standard_control_endpoint(
            descriptors,
            avoid_blockram=True,
        )

        # add microsoft os 1.0 descriptors and request handler
        descriptors.add_descriptor(get_string_descriptor("MSFT100\xee"), index=0xee)
        msft_descriptors = MicrosoftOS10DescriptorCollection()
        with msft_descriptors.ExtendedCompatIDDescriptor() as c:
            with c.Function() as f:
                f.bFirstInterfaceNumber = 0
                f.compatibleID          = 'WINUSB'
        with msft_descriptors.ExtendedPropertiesDescriptor() as d:
            with d.Property() as p:
                p.dwPropertyDataType = RegistryTypes.REG_SZ
                p.PropertyName       = "DeviceInterfaceGUID"
                p.PropertyData       = "{88bae032-5a81-49f0-bc3d-a4ff138216d6}"
        msft_handler = MicrosoftOS10RequestHandler(msft_descriptors, request_code=0xee)
        control_endpoint.add_request_handler(msft_handler)

        # add our vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m

Vendor requests are unique to a device and are identified by the 8-bit bRequest field of the control transfer setup packet. Here we’ve defined two IDs: one for setting the LED states and one for getting the button state.
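To make the role of bRequest concrete, here is a minimal host-side sketch that packs the eight bytes of a setup packet for each of our two requests, following the field layout in the USB 2.0 specification (bmRequestType, bRequest, wValue, wIndex, wLength, little-endian). The `build_setup_packet` helper and the bit-field constants are defined locally for illustration; normally the host's USB stack builds this packet for you.

```python
import struct

# bmRequestType bit fields: direction (bit 7), type (bits 6..5), recipient (bits 4..0)
DIR_OUT, DIR_IN  = 0 << 7, 1 << 7
TYPE_VENDOR      = 2 << 5
RECIPIENT_DEVICE = 0

VENDOR_SET_FPGA_LEDS   = 0x01
VENDOR_GET_USER_BUTTON = 0x02

def build_setup_packet(request_type, request, value=0, index=0, length=0):
    """Pack the five setup packet fields into their 8-byte little-endian form."""
    return struct.pack("<BBHHH", request_type, request, value, index, length)

# OUT request carrying one data byte with the LED state
set_leds = build_setup_packet(
    DIR_OUT | TYPE_VENDOR | RECIPIENT_DEVICE, VENDOR_SET_FPGA_LEDS, length=1)

# IN request asking for one data byte with the button state
get_button = build_setup_packet(
    DIR_IN | TYPE_VENDOR | RECIPIENT_DEVICE, VENDOR_GET_USER_BUTTON, length=1)

print(set_leds.hex(" "))    # 40 01 00 00 00 00 01 00
print(get_button.hex(" "))  # c0 02 00 00 00 00 01 00
```

The second byte of each packet is the bRequest value our handler will dispatch on.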

So far our VendorRequestHandler contains references to Cynthion’s FPGA LEDs and USER BUTTON, as well as a StreamSerializer we’ll be using to send data back to the host when it asks for the USER BUTTON status.

Implement Vendor Request Handlers

Let’s implement that functionality below:

gateware-usb-device.py
class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS   = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # Shortcuts.
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # Grab a reference to the FPGA LEDs and USER button.
        fpga_leds   = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # Create a StreamSerializer for sending IN data back to the host
        serializer = StreamSerializer(
            domain           = "usb",
            stream_type      = USBInStreamInterface,
            data_length      = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        # we've received a setup packet containing a vendor request.
        with m.If(setup.type == USBRequestType.VENDOR):
            # use a state machine to sequence our request handling
            with m.FSM(domain="usb"):
                with m.State("IDLE"):
                    with m.If(setup.received):
                        with m.Switch(setup.request):
                            with m.Case(self.VENDOR_SET_FPGA_LEDS):
                                m.next = "HANDLE_SET_FPGA_LEDS"
                            with m.Case(self.VENDOR_GET_USER_BUTTON):
                                m.next = "HANDLE_GET_USER_BUTTON"

                with m.State("HANDLE_SET_FPGA_LEDS"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # if we have an active data byte, set the FPGA LEDs to the payload
                    with m.If(interface.rx.valid & interface.rx.next):
                        m.d.usb += fpga_leds.eq(interface.rx.payload[0:6])

                    # once the receive is complete, respond with an ACK
                    with m.If(interface.rx_ready_for_response):
                        m.d.comb += interface.handshakes_out.ack.eq(1)

                    # finally, once we reach the status stage, send a ZLP
                    with m.If(interface.status_requested):
                        m.d.comb += self.send_zlp()
                        m.next = "IDLE"

                with m.State("HANDLE_GET_USER_BUTTON"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # write the state of the user button into a local data register
                    data = Signal(8)
                    m.d.comb += data[0].eq(user_button)

                    # transmit our data using a built-in handler function that
                    # automatically advances the FSM back to the 'IDLE' state on
                    # completion
                    self.handle_simple_data_request(m, serializer, data)

        return m

When handling a control request in LUNA, the first thing we look at is the setup.type field of the setup packet interface. We could check for other types such as USBRequestType.CLASS or USBRequestType.STANDARD if we wanted to implement handlers for them but, in this case, we’re only interested in vendor requests.
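The request type lives in the bmRequestType byte of the setup packet, alongside the direction and the recipient. The following sketch decodes that byte on the host side to show which bits the type check corresponds to. The `RequestType` enum here is a local stand-in with the values from the USB 2.0 specification, and `decode_request_type` is a hypothetical helper written for this example.

```python
from enum import IntEnum

class RequestType(IntEnum):
    """Request type codes from bits 6..5 of bmRequestType (local stand-in)."""
    STANDARD = 0
    CLASS    = 1
    VENDOR   = 2
    RESERVED = 3

def decode_request_type(bm_request_type: int):
    """Split bmRequestType into (is_in, request_type, recipient)."""
    is_in        = bool((bm_request_type >> 7) & 0b1)         # bit 7: 1 = device-to-host
    request_type = RequestType((bm_request_type >> 5) & 0b11)  # bits 6..5
    recipient    = bm_request_type & 0b11111                   # bits 4..0, 0 = device
    return is_in, request_type, recipient

# 0x40 is a vendor OUT request to the device, 0xC0 a vendor IN request,
# and 0x80 a standard IN request such as GET_DESCRIPTOR.
for value in (0x40, 0xC0, 0x80):
    is_in, request_type, recipient = decode_request_type(value)
    print(f"{value:#04x}: {'IN' if is_in else 'OUT'} {request_type.name} recipient={recipient}")
```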

We sequence the request handling with an Amaranth finite state machine that starts in the IDLE state. Each handler state takes ownership of the interface by asserting interface.claim, which avoids conflicts with the standard request handler or any other registered request handlers.

While in IDLE we wait for the setup.received signal to go high and signal the arrival of a new control request. We then parse the setup.request field to identify the next state to advance our FSM to. (We could also use the other setup packet fields such as wValue and wIndex for dispatch or as arguments, but for now we’re just interested in bRequest.)

We then implement two handlers. The first, HANDLE_SET_FPGA_LEDS, reads the data sent with our OUT control request in order to set the state of the FPGA LEDs.

The second, HANDLE_GET_USER_BUTTON, uses one of LUNA’s built-in helper functions to respond to our IN control request with data containing the state of the USER button.
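If it helps to see the sequencing without the hardware details, the following is a rough behavioural model of the state machine in plain Python. It is not gateware and not part of LUNA: the `VendorRequestModel` class and its method names are invented for this sketch, and it collapses the handshaking of each transfer stage into a single method call.

```python
class VendorRequestModel:
    """Plain-Python model of the handler's states (illustrative only)."""
    VENDOR_SET_FPGA_LEDS   = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def __init__(self):
        self.state       = "IDLE"
        self.leds        = 0   # models the six FPGA LEDs
        self.user_button = 0   # models the USER button input

    def setup_received(self, request):
        """A vendor setup packet arrived: dispatch on bRequest while idle."""
        if self.state != "IDLE":
            return
        if request == self.VENDOR_SET_FPGA_LEDS:
            self.state = "HANDLE_SET_FPGA_LEDS"
        elif request == self.VENDOR_GET_USER_BUTTON:
            self.state = "HANDLE_GET_USER_BUTTON"
        # any other request code leaves the model in IDLE

    def out_data(self, byte):
        """Data stage of an OUT request: latch the low six bits into the LEDs."""
        if self.state == "HANDLE_SET_FPGA_LEDS":
            self.leds = byte & 0x3F

    def in_data(self):
        """Data stage of an IN request: one byte holding the button state."""
        if self.state == "HANDLE_GET_USER_BUTTON":
            return bytes([self.user_button & 1])
        return b""

    def status_stage(self):
        """Status stage: the request is complete, so return to IDLE."""
        self.state = "IDLE"

model = VendorRequestModel()
model.setup_received(VendorRequestModel.VENDOR_SET_FPGA_LEDS)
model.out_data(0b10101010)
model.status_stage()
print(model.state, format(model.leds, "06b"))  # IDLE 101010
```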

Test Control Endpoints

First, remember to build and upload the device gateware to your Cynthion with:

python ./gateware-usb-device.py

Then, open your test-gateware-usb-device.py script from the previous tutorials and add the following code to it:

test-gateware-usb-device.py
import usb1
import time

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

VENDOR_SET_FPGA_LEDS   = 0x01
VENDOR_GET_USER_BUTTON = 0x02

# - list available usb devices ------------------------------------------------

def list_available_usb_devices(context):
    for device in context.getDeviceList():
        try:
            manufacturer = device.getManufacturer()
            product = device.getProduct()
            print(f"{device}:  {manufacturer} - {product}")
        except Exception as e:
            print(f"{device}: {e}")


# - wrappers for control requests ---------------------------------------------

def set_fpga_leds(device_handle, led_state):
    response = device_handle.controlWrite(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE,
        request      = VENDOR_SET_FPGA_LEDS,
        index        = 0,
        value        = 0,
        data         = [led_state],
        timeout      = 1000,
    )

def get_user_button(device_handle):
    # a control read is a device-to-host (IN) transfer
    response = device_handle.controlRead(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_IN,
        request      = VENDOR_GET_USER_BUTTON,
        index        = 0,
        value        = 0,
        length       = 1,
        timeout      = 1000,
    )
    return response[0]


# - test control endpoints ----------------------------------------------------

def test_control_endpoints(device_handle):
    led_counter = 0
    last_button_state = False

    while True:
        # led counter
        set_fpga_leds(device_handle, led_counter)
        led_counter = (led_counter + 1) % 256

        # reset led counter when the USER button is pressed
        button_state = get_user_button(device_handle)
        if button_state:
            led_counter = 0

        # print button state when it changes
        if button_state != last_button_state:
            print(f"USER button is: {'ON' if button_state else 'OFF' }")
            last_button_state = button_state

        # slow the loop down so we can see the counter change
        time.sleep(0.1)


# - main ----------------------------------------------------------------------

if __name__ == "__main__":
    with usb1.USBContext() as context:
        # list available devices
        list_available_usb_devices(context)

        # get a device handle to our simple usb device
        device_handle = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)
        if device_handle is None:
            raise Exception("Device not found.")

        # claim the device's interface
        device_handle.claimInterface(0)

        # pass the device handle to our control endpoint test
        test_control_endpoints(device_handle)

Run the file with:

python ./test-gateware-usb-device.py

If all goes well, you should see the FPGA LEDs on Cynthion counting in binary. If you press and release the USER button, you should see the count reset to zero and the following text in the terminal:

USER button is: ON
USER button is: OFF
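One detail worth noting: the test script counts from 0 to 255, but the gateware only copies the low six bits of the payload to the six FPGA LEDs, so the visible pattern wraps every 64 steps. The sketch below shows the bit pattern you can expect for a few counter values. The `led_pattern` helper is hypothetical, and which physical LED corresponds to which bit depends on the board's pin mapping.

```python
NUM_LEDS = 6

def led_pattern(counter: int) -> str:
    """Return the six-bit pattern the LEDs show for a given counter value."""
    visible = counter & ((1 << NUM_LEDS) - 1)   # same bits as payload[0:6]
    return format(visible, f"0{NUM_LEDS}b")

for counter in (0, 1, 2, 63, 64, 255):
    print(f"{counter:3d} -> {led_pattern(counter)}")
```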

Job done!

In the next part of the tutorial we’ll finish up by adding IN and OUT Bulk endpoints to our device.

Exercises

  1. Add a vendor request to retrieve the current state of the FPGA LEDs.

  2. Add a vendor request that will disconnect and then re-connect your device to the USB bus.

More information

Source Code

gateware-usb-device-03.py
#!/usr/bin/env python3
#
# This file is part of Cynthion.
#
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause

from amaranth                                    import *
from luna.usb2                                   import USBDevice
from usb_protocol.emitters                       import DeviceDescriptorCollection

from luna.gateware.usb.request.windows           import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard  import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10  import RegistryTypes

from luna.gateware.stream.generator              import StreamSerializer
from luna.gateware.usb.request.control           import ControlRequestHandler
from luna.gateware.usb.request.interface         import SetupPacket
from luna.gateware.usb.usb2.request              import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer             import USBInStreamInterface
from usb_protocol.types                          import USBRequestType

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS   = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # shortcuts
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # get a reference to the FPGA LEDs and USER button
        fpga_leds   = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # create a streamserializer for transmitting IN data back to the host
        serializer = StreamSerializer(
            domain           = "usb",
            stream_type      = USBInStreamInterface,
            data_length      = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        # we've received a setup packet containing a vendor request.
        with m.If(setup.type == USBRequestType.VENDOR):
            # use a state machine to sequence our request handling
            with m.FSM(domain="usb"):
                with m.State("IDLE"):
                    with m.If(setup.received):
                        with m.Switch(setup.request):
                            with m.Case(self.VENDOR_SET_FPGA_LEDS):
                                m.next = "HANDLE_SET_FPGA_LEDS"
                            with m.Case(self.VENDOR_GET_USER_BUTTON):
                                m.next = "HANDLE_GET_USER_BUTTON"

                with m.State("HANDLE_SET_FPGA_LEDS"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # if we have an active data byte, set the FPGA LEDs to the payload
                    with m.If(interface.rx.valid & interface.rx.next):
                        m.d.usb += fpga_leds.eq(interface.rx.payload[0:6])

                    # once the receive is complete, respond with an ACK
                    with m.If(interface.rx_ready_for_response):
                        m.d.comb += interface.handshakes_out.ack.eq(1)

                    # finally, once we reach the status stage, send a ZLP
                    with m.If(interface.status_requested):
                        m.d.comb += self.send_zlp()
                        m.next = "IDLE"

                with m.State("HANDLE_GET_USER_BUTTON"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # write the state of the user button into a local data register
                    data = Signal(8)
                    m.d.comb += data[0].eq(user_button)

                    # transmit our data using a built-in handler function that
                    # automatically advances the FSM back to the 'IDLE' state on
                    # completion
                    self.handle_simple_data_request(m, serializer, data)

        return m


class GatewareUSBDevice(Elaboratable):
    """ A simple USB device that can communicate with the host via vendor requests. """

    def create_standard_descriptors(self):
        """ Create the USB descriptors for the device. """

        descriptors = DeviceDescriptorCollection()

        # all USB devices have a single device descriptor
        with descriptors.DeviceDescriptor() as d:
            d.idVendor           = VENDOR_ID
            d.idProduct          = PRODUCT_ID
            d.iManufacturer      = "Cynthion Project"
            d.iProduct           = "Gateware USB Device"

            d.bNumConfigurations = 1

        # and at least one configuration descriptor
        with descriptors.ConfigurationDescriptor() as c:

            # with at least one interface descriptor
            with c.InterfaceDescriptor() as i:
                i.bInterfaceNumber = 0

                # interfaces also need endpoints to do anything useful
                # but we'll add those later!

        return descriptors

    def elaborate(self, platform):
        m = Module()

        # configure cynthion's clocks and reset signals
        m.submodules.car = platform.clock_domain_generator()

        # request the physical interface for cynthion's TARGET C port
        ulpi = platform.request("target_phy")

        # create the USB device
        m.submodules.usb = usb = USBDevice(bus=ulpi)

        # create our standard descriptors and add them to the device's control endpoint
        descriptors = self.create_standard_descriptors()
        control_endpoint = usb.add_standard_control_endpoint(
            descriptors,
            # the blockram descriptor handler lacks support for
            # non-contiguous string descriptor indices, which is
            # required for the Microsoft OS string descriptor at 0xEE
            avoid_blockram=True,
        )

        # add microsoft os 1.0 descriptors and request handler
        descriptors.add_descriptor(get_string_descriptor("MSFT100\xee"), index=0xee)
        msft_descriptors = MicrosoftOS10DescriptorCollection()
        with msft_descriptors.ExtendedCompatIDDescriptor() as c:
            with c.Function() as f:
                f.bFirstInterfaceNumber = 0
                f.compatibleID          = 'WINUSB'
        with msft_descriptors.ExtendedPropertiesDescriptor() as d:
            with d.Property() as p:
                p.dwPropertyDataType = RegistryTypes.REG_SZ
                p.PropertyName       = "DeviceInterfaceGUID"
                p.PropertyData       = "{88bae032-5a81-49f0-bc3d-a4ff138216d6}"
        msft_handler = MicrosoftOS10RequestHandler(msft_descriptors, request_code=0xee)
        control_endpoint.add_request_handler(msft_handler)

        # add the vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m


if __name__ == "__main__":
    from luna import top_level_cli
    top_level_cli(GatewareUSBDevice)

test-gateware-usb-device-03.py
import usb1
import time

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

VENDOR_SET_FPGA_LEDS   = 0x01
VENDOR_GET_USER_BUTTON = 0x02

# - list available usb devices ------------------------------------------------

def list_available_usb_devices(context):
    for device in context.getDeviceList():
        try:
            manufacturer = device.getManufacturer()
            product = device.getProduct()
            print(f"{device}:  {manufacturer} - {product}")
        except Exception as e:
            print(f"{device}: {e}")


# - wrappers for control requests ---------------------------------------------

def set_fpga_leds(device_handle, led_state):
    response = device_handle.controlWrite(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE,
        request      = VENDOR_SET_FPGA_LEDS,
        index        = 0,
        value        = 0,
        data         = [led_state],
        timeout      = 1000,
    )

def get_user_button(device_handle):
    # a control read is a device-to-host (IN) transfer
    response = device_handle.controlRead(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_IN,
        request      = VENDOR_GET_USER_BUTTON,
        index        = 0,
        value        = 0,
        length       = 1,
        timeout      = 1000,
    )
    return response[0]


# - test control endpoints ----------------------------------------------------

def test_control_endpoints(device_handle):
    led_counter = 0
    last_button_state = False

    while True:
        # led counter
        set_fpga_leds(device_handle, led_counter)
        led_counter = (led_counter + 1) % 256

        # reset led counter when the USER button is pressed
        button_state = get_user_button(device_handle)
        if button_state:
            led_counter = 0

        # print button state when it changes
        if button_state != last_button_state:
            print(f"USER button is: {'ON' if button_state else 'OFF' }")
            last_button_state = button_state

        # slow the loop down so we can see the counter change
        time.sleep(0.1)


# - main ----------------------------------------------------------------------

if __name__ == "__main__":
    with usb1.USBContext() as context:
        # list available devices
        list_available_usb_devices(context)

        # get a device handle to our simple usb device
        device_handle = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)
        if device_handle is None:
            raise Exception("Device not found.")

        # claim the device's interface
        device_handle.claimInterface(0)

        # pass the device handle to our control endpoint test
        test_control_endpoints(device_handle)