USB Gateware: Part 3 - Control Transfers
This series of tutorials walks through the process of implementing a complete USB device with Cynthion and LUNA:
USB Gateware: Part 3 - Control Transfers (This tutorial)
The goal of this tutorial is to define a control interface for the device we created in Part 1 that will allow it to receive and respond to control requests from a host.
Prerequisites
Complete the USB Gateware: Part 1 - Enumeration tutorial.
Complete the USB Gateware: Part 2 - WCID Descriptors tutorial. (Optional, required for Windows support)
Data Transfer between a Host and Device
USB is a host-centric bus: all transfers are initiated by the host, irrespective of the direction of data transfer.
For data transfers to the device, the host issues an OUT token to notify the device of an incoming data transfer. When data has to be transferred from the device, the host issues an IN token to notify the device that it should send some data to the host.
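To make the token terminology concrete, here is a small Python sketch of how token packet identifiers (PIDs) are encoded. The 4-bit PID values are the ones defined by the USB 2.0 specification; the helper name `pid_byte` is made up for this illustration and is not part of LUNA.

```python
# 4-bit packet identifiers (PIDs) for the USB 2.0 token packets discussed above.
TOKEN_PIDS = {
    "OUT":   0b0001,  # host is about to send data to the device
    "IN":    0b1001,  # host is asking the device to send data
    "SETUP": 0b1101,  # host is starting a control transfer
}

def pid_byte(pid):
    """Return the PID byte as transmitted (hypothetical helper).

    The low nibble carries the 4-bit PID and the high nibble carries its
    bitwise complement, which lets the receiver detect a corrupted PID.
    """
    return ((~pid & 0xF) << 4) | pid

for name, pid in TOKEN_PIDS.items():
    print(f"{name:5s} PID=0b{pid:04b} byte=0x{pid_byte(pid):02x}")
```

In every case it is the host that sends the token; the device only reacts to it.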
The USB 2.0 specification defines four endpoint or transfer types:
Control Transfers: Typically used for command and status operations, control transfers are the only transfer type with a defined USB format.
Bulk Transfers: Bulk transfers are best suited for large amounts of data delivered in bursts such as file transfers to/from a storage device or the captured packet data from Cynthion to the control host.
Interrupt Transfers: Interrupt transfers are a bit of a misnomer as the host needs to continuously poll the device to check if an interrupt has occurred but the principle is the same. Commonly used for peripherals that generate input events such as a keyboard or mouse.
Isochronous Transfers: Finally, isochronous transfers occur continuously with a fixed periodicity. Suited for time-sensitive information such as video or audio streams, they do not offer any guarantees on delivery. If a packet or frame is dropped, it’s up to the host driver to decide how best to handle it.
By default, all LUNA devices implement two endpoints: an OUT control endpoint and an IN control endpoint. These endpoints are used by the host to enumerate the device, but they can also be extended to support various other class or custom vendor requests.
We’ll start by extending our control endpoints to support two vendor requests: One to set the state of the Cynthion FPGA LEDs and another to get the state of the Cynthion USER BUTTON.
Extend Default Control Endpoints
To implement vendor requests, begin by adding a VendorRequestHandler to our device’s control endpoint:
from amaranth import *
from luna.usb2 import USBDevice
from usb_protocol.emitters import DeviceDescriptorCollection

from luna.gateware.usb.request.windows import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10 import RegistryTypes

from luna.gateware.stream.generator import StreamSerializer
from luna.gateware.usb.request.control import ControlRequestHandler
from luna.gateware.usb.request.interface import SetupPacket
from luna.gateware.usb.usb2.request import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer import USBInStreamInterface
from usb_protocol.types import USBRequestType

VENDOR_ID = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # shortcuts
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # get a reference to the FPGA LEDs and USER button
        fpga_leds = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # create a streamserializer for transmitting IN data back to the host
        serializer = StreamSerializer(
            domain = "usb",
            stream_type = USBInStreamInterface,
            data_length = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        return m

class GatewareUSBDevice(Elaboratable):

    ...

    def elaborate(self, platform):
        m = Module()

        # configure cynthion's clocks and reset signals
        m.submodules.car = platform.clock_domain_generator()

        # request the physical interface for cynthion's TARGET C port
        ulpi = platform.request("target_phy")

        # create the USB device
        m.submodules.usb = usb = USBDevice(bus=ulpi)

        # create our standard descriptors and add them to the device's control endpoint
        descriptors = self.create_standard_descriptors()
        control_endpoint = usb.add_standard_control_endpoint(
            descriptors,
            avoid_blockram=True,
        )

        # add microsoft os 1.0 descriptors and request handler
        descriptors.add_descriptor(get_string_descriptor("MSFT100\xee"), index=0xee)
        msft_descriptors = MicrosoftOS10DescriptorCollection()
        with msft_descriptors.ExtendedCompatIDDescriptor() as c:
            with c.Function() as f:
                f.bFirstInterfaceNumber = 0
                f.compatibleID = 'WINUSB'
        with msft_descriptors.ExtendedPropertiesDescriptor() as d:
            with d.Property() as p:
                p.dwPropertyDataType = RegistryTypes.REG_SZ
                p.PropertyName = "DeviceInterfaceGUID"
                p.PropertyData = "{88bae032-5a81-49f0-bc3d-a4ff138216d6}"
        msft_handler = MicrosoftOS10RequestHandler(msft_descriptors, request_code=0xee)
        control_endpoint.add_request_handler(msft_handler)

        # add our vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m
Vendor requests are unique to a device and are identified by the 8-bit bRequest field of the control transfer setup packet. Here we’ve defined two IDs, corresponding to setting the LED states and getting the button state.
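For reference, the setup packet that carries bRequest is always eight bytes long, with its multi-byte fields in little-endian order. The following host-side sketch unpacks one with Python's `struct` module. The names `SetupFields` and `parse_setup_packet` are made up for this example; in gateware, LUNA exposes the same fields through `self.interface.setup`.

```python
import struct
from collections import namedtuple

# Field names follow the USB 2.0 specification; the tuple type itself is
# a hypothetical helper for this illustration.
SetupFields = namedtuple(
    "SetupFields", "bmRequestType bRequest wValue wIndex wLength"
)

def parse_setup_packet(raw):
    """Unpack the 8 bytes of a setup packet (little-endian fields)."""
    if len(raw) != 8:
        raise ValueError("a setup packet is exactly 8 bytes long")
    return SetupFields(*struct.unpack("<BBHHH", raw))

# A vendor request sent to the device with bRequest = 0x01
# (VENDOR_SET_FPGA_LEDS) and a one-byte data stage.
packet = bytes([0x40, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00])
fields = parse_setup_packet(packet)
print(fields)
```

The second byte is the request ID our handler dispatches on, and wLength announces how many bytes the data stage will carry.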
So far our VendorRequestHandler contains references to Cynthion’s FPGA LEDs and USER BUTTON, as well as a StreamSerializer we’ll be using to send data back to the host when it asks for the USER BUTTON status.
Implement Vendor Request Handlers
Let’s implement that functionality below:
class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # Shortcuts.
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # Grab a reference to the FPGA LEDs and USER button.
        fpga_leds = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # Create a StreamSerializer for sending IN data back to the host
        serializer = StreamSerializer(
            domain = "usb",
            stream_type = USBInStreamInterface,
            data_length = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        # we've received a setup packet containing a vendor request.
        with m.If(setup.type == USBRequestType.VENDOR):
            # use a state machine to sequence our request handling
            with m.FSM(domain="usb"):
                with m.State("IDLE"):
                    with m.If(setup.received):
                        with m.Switch(setup.request):
                            with m.Case(self.VENDOR_SET_FPGA_LEDS):
                                m.next = "HANDLE_SET_FPGA_LEDS"
                            with m.Case(self.VENDOR_GET_USER_BUTTON):
                                m.next = "HANDLE_GET_USER_BUTTON"

                with m.State("HANDLE_SET_FPGA_LEDS"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # if we have an active data byte, set the FPGA LEDs to the payload
                    with m.If(interface.rx.valid & interface.rx.next):
                        m.d.usb += fpga_leds.eq(interface.rx.payload[0:6])

                    # once the receive is complete, respond with an ACK
                    with m.If(interface.rx_ready_for_response):
                        m.d.comb += interface.handshakes_out.ack.eq(1)

                    # finally, once we reach the status stage, send a ZLP
                    with m.If(interface.status_requested):
                        m.d.comb += self.send_zlp()
                        m.next = "IDLE"

                with m.State("HANDLE_GET_USER_BUTTON"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # write the state of the user button into a local data register
                    data = Signal(8)
                    m.d.comb += data[0].eq(user_button)

                    # transmit our data using a built-in handler function that
                    # automatically advances the FSM back to the 'IDLE' state on
                    # completion
                    self.handle_simple_data_request(m, serializer, data)

        return m
When handling a control request in LUNA, the first thing we look at is the setup.type field of the setup packet interface. We could check for other types such as USBRequestType.CLASS or USBRequestType.STANDARD if we wanted to implement handlers for them but, in this case, we’re only interested in vendor requests.
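The request type that setup.type reports is a two-bit field inside the bmRequestType byte of the setup packet, alongside a direction bit and a recipient field. A minimal host-side sketch of that decoding is shown below. The bit layout follows the USB 2.0 specification, while the function and table names are made up for this example.

```python
# Bit layout of bmRequestType (USB 2.0 specification):
#   bit 7     direction (0 = host-to-device, 1 = device-to-host)
#   bits 6..5 request type
#   bits 4..0 recipient
REQUEST_TYPES = ("STANDARD", "CLASS", "VENDOR", "RESERVED")
RECIPIENTS = ("DEVICE", "INTERFACE", "ENDPOINT", "OTHER")

def decode_request_type(bm_request_type):
    """Split a bmRequestType byte into (direction, type, recipient)."""
    direction = "IN" if bm_request_type & 0x80 else "OUT"
    request_type = REQUEST_TYPES[(bm_request_type >> 5) & 0b11]
    recipient_code = bm_request_type & 0x1F
    # recipient codes above 3 are reserved by the specification
    recipient = RECIPIENTS[recipient_code] if recipient_code < 4 else "RESERVED"
    return direction, request_type, recipient

print(decode_request_type(0x40))  # a vendor request carrying data to the device
print(decode_request_type(0xC0))  # a vendor request reading data from the device
```

Both of our vendor requests share the VENDOR type and differ only in the direction bit.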
We sequence the actual request handling with an Amaranth Finite State Machine, starting in the IDLE state. Inside each handler state we take ownership of the interface by asserting interface.claim, in order to avoid conflicting with the standard or other registered request handlers.
While in IDLE we wait for the setup.received signal to go high and signal the arrival of a new control request. We then parse the setup.request field to identify the next state to advance our FSM to. (We could also use the other setup packet fields such as wValue and wIndex for dispatch or as arguments but for now we’re just interested in bRequest.)
We then implement two handlers. The first, HANDLE_SET_FPGA_LEDS, reads the data sent with our OUT control request in order to set the state of the FPGA LEDs.
The second, HANDLE_GET_USER_BUTTON, uses one of LUNA’s built-in helper functions to respond to our IN control request with data containing the state of the USER button.
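As a way to reason about the dispatch logic without building gateware, here is a plain Python model of the IDLE state. It is only a software sketch of the behaviour described above, not LUNA code: the function name `next_state_from_idle` and the `VENDOR` constant (the value of the vendor request type in the USB 2.0 specification) are assumptions made for this illustration.

```python
VENDOR = 2  # request type value for vendor requests (bits 6..5 of bmRequestType)

VENDOR_SET_FPGA_LEDS = 0x01
VENDOR_GET_USER_BUTTON = 0x02

# maps bRequest values to the handler state that services them
DISPATCH = {
    VENDOR_SET_FPGA_LEDS: "HANDLE_SET_FPGA_LEDS",
    VENDOR_GET_USER_BUTTON: "HANDLE_GET_USER_BUTTON",
}

def next_state_from_idle(request_type, received, request):
    """Return the state the handler would move to from IDLE.

    The state only changes when a setup packet has just been received,
    it is a vendor request, and its bRequest matches a known ID.
    """
    if request_type != VENDOR or not received:
        return "IDLE"
    return DISPATCH.get(request, "IDLE")

print(next_state_from_idle(VENDOR, True, 0x01))
print(next_state_from_idle(VENDOR, True, 0x02))
print(next_state_from_idle(VENDOR, True, 0x7F))  # unknown ID: stay in IDLE
```

Unknown request IDs leave the model in IDLE, mirroring the Switch statement in the handler, which has no default case.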
Test Control Endpoints
First, remember to build and upload the device gateware to your Cynthion with:
python ./gateware-usb-device.py
Then, open your test-gateware-usb-device.py script from the previous tutorials and add the following code to it:
import usb1
import time

VENDOR_ID = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

VENDOR_SET_FPGA_LEDS = 0x01
VENDOR_GET_USER_BUTTON = 0x02

# - list available usb devices ------------------------------------------------

def list_available_usb_devices(context):
    for device in context.getDeviceList():
        try:
            manufacturer = device.getManufacturer()
            product = device.getProduct()
            print(f"{device}: {manufacturer} - {product}")
        except Exception as e:
            print(f"{device}: {e}")


# - wrappers for control requests ---------------------------------------------

def set_fpga_leds(device_handle, led_state):
    # host-to-device (OUT) vendor request with a one-byte data stage
    device_handle.controlWrite(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE,
        request = VENDOR_SET_FPGA_LEDS,
        index = 0,
        value = 0,
        data = [led_state],
        timeout = 1000,
    )

def get_user_button(device_handle):
    # device-to-host (IN) vendor request that reads back a single byte
    response = device_handle.controlRead(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_IN,
        request = VENDOR_GET_USER_BUTTON,
        index = 0,
        value = 0,
        length = 1,
        timeout = 1000,
    )
    return response[0]


# - test control endpoints ----------------------------------------------------

def test_control_endpoints(device_handle):
    led_counter = 0
    last_button_state = False

    while True:
        # led counter
        set_fpga_leds(device_handle, led_counter)
        led_counter = (led_counter + 1) % 256

        # reset led counter when the USER button is pressed
        button_state = get_user_button(device_handle)
        if button_state:
            led_counter = 0

        # print button state when it changes
        if button_state != last_button_state:
            print(f"USER button is: {'ON' if button_state else 'OFF' }")
            last_button_state = button_state

        # slow the loop down so we can see the counter change
        time.sleep(0.1)


# - main ----------------------------------------------------------------------

if __name__ == "__main__":
    with usb1.USBContext() as context:
        # list available devices
        list_available_usb_devices(context)

        # get a device handle to our simple usb device
        device_handle = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)
        if device_handle is None:
            raise Exception("Device not found.")

        # claim the device's interface
        device_handle.claimInterface(0)

        # pass the device handle to our control endpoint test
        test_control_endpoints(device_handle)
Run the file with:
python ./test-gateware-usb-device.py
And, if all goes well you should see the FPGA LEDs on Cynthion counting in binary. If you press and release the USER button you should see the count reset back to zero and the following text in the terminal.
USER button is: ON
USER button is: OFF
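If you want to check the LED pattern against what the host is sending, the sketch below prints the pattern for a given counter value. It assumes only what the gateware above does, namely that the handler copies bits 0 to 5 of the payload to the six FPGA LEDs. The helper name `led_pattern` is hypothetical, and the physical ordering of the LEDs on the board is not covered here.

```python
def led_pattern(value, led_count=6):
    """Return the LED states for a counter value as a string.

    '*' is a lit LED and '.' is an unlit one; bit 0 is the rightmost
    character. Only the low `led_count` bits are visible.
    """
    visible = value & ((1 << led_count) - 1)
    bits = format(visible, f"0{led_count}b")
    return bits.replace("0", ".").replace("1", "*")

for counter in (0, 1, 5, 63, 64):
    print(f"{counter:3d} -> {led_pattern(counter)}")
```

Because the script counts to 255 but only six bits reach the LEDs, the visible pattern repeats every 64 counts.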
Job done!
In the next part of the tutorial we’ll finish up by adding IN and OUT Bulk endpoints to our device.
Exercises
Add a vendor request to retrieve the current state of the FPGA LEDs.
Add a vendor request that will disconnect and then re-connect your device to the USB bus.
More information
Beyond Logic’s USB in a NutShell.
Source Code
#!/usr/bin/env python3
#
# This file is part of Cynthion.
#
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause

from amaranth import *
from luna.usb2 import USBDevice
from usb_protocol.emitters import DeviceDescriptorCollection

from luna.gateware.usb.request.windows import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10 import RegistryTypes

from luna.gateware.stream.generator import StreamSerializer
from luna.gateware.usb.request.control import ControlRequestHandler
from luna.gateware.usb.request.interface import SetupPacket
from luna.gateware.usb.usb2.request import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer import USBInStreamInterface
from usb_protocol.types import USBRequestType

VENDOR_ID = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # shortcuts
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # get a reference to the FPGA LEDs and USER button
        fpga_leds = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # create a streamserializer for transmitting IN data back to the host
        serializer = StreamSerializer(
            domain = "usb",
            stream_type = USBInStreamInterface,
            data_length = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        # we've received a setup packet containing a vendor request.
        with m.If(setup.type == USBRequestType.VENDOR):
            # use a state machine to sequence our request handling
            with m.FSM(domain="usb"):
                with m.State("IDLE"):
                    with m.If(setup.received):
                        with m.Switch(setup.request):
                            with m.Case(self.VENDOR_SET_FPGA_LEDS):
                                m.next = "HANDLE_SET_FPGA_LEDS"
                            with m.Case(self.VENDOR_GET_USER_BUTTON):
                                m.next = "HANDLE_GET_USER_BUTTON"

                with m.State("HANDLE_SET_FPGA_LEDS"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # if we have an active data byte, set the FPGA LEDs to the payload
                    with m.If(interface.rx.valid & interface.rx.next):
                        m.d.usb += fpga_leds.eq(interface.rx.payload[0:6])

                    # once the receive is complete, respond with an ACK
                    with m.If(interface.rx_ready_for_response):
                        m.d.comb += interface.handshakes_out.ack.eq(1)

                    # finally, once we reach the status stage, send a ZLP
                    with m.If(interface.status_requested):
                        m.d.comb += self.send_zlp()
                        m.next = "IDLE"

                with m.State("HANDLE_GET_USER_BUTTON"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # write the state of the user button into a local data register
                    data = Signal(8)
                    m.d.comb += data[0].eq(user_button)

                    # transmit our data using a built-in handler function that
                    # automatically advances the FSM back to the 'IDLE' state on
                    # completion
                    self.handle_simple_data_request(m, serializer, data)

        return m


class GatewareUSBDevice(Elaboratable):
    """ A simple USB device that can communicate with the host via vendor requests. """

    def create_standard_descriptors(self):
        """ Create the USB descriptors for the device. """

        descriptors = DeviceDescriptorCollection()

        # all USB devices have a single device descriptor
        with descriptors.DeviceDescriptor() as d:
            d.idVendor = VENDOR_ID
            d.idProduct = PRODUCT_ID
            d.iManufacturer = "Cynthion Project"
            d.iProduct = "Gateware USB Device"

            d.bNumConfigurations = 1

        # and at least one configuration descriptor
        with descriptors.ConfigurationDescriptor() as c:

            # with at least one interface descriptor
            with c.InterfaceDescriptor() as i:
                i.bInterfaceNumber = 0

                # interfaces also need endpoints to do anything useful
                # but we'll add those later!

        return descriptors

    def elaborate(self, platform):
        m = Module()

        # configure cynthion's clocks and reset signals
        m.submodules.car = platform.clock_domain_generator()

        # request the physical interface for cynthion's TARGET C port
        ulpi = platform.request("target_phy")

        # create the USB device
        m.submodules.usb = usb = USBDevice(bus=ulpi)

        # create our standard descriptors and add them to the device's control endpoint
        descriptors = self.create_standard_descriptors()
        control_endpoint = usb.add_standard_control_endpoint(
            descriptors,
            # the blockram descriptor handler lacks support for
            # non-contiguous string descriptor indices, which is
            # required for the Microsoft OS string descriptor at 0xEE
            avoid_blockram=True,
        )

        # add microsoft os 1.0 descriptors and request handler
        descriptors.add_descriptor(get_string_descriptor("MSFT100\xee"), index=0xee)
        msft_descriptors = MicrosoftOS10DescriptorCollection()
        with msft_descriptors.ExtendedCompatIDDescriptor() as c:
            with c.Function() as f:
                f.bFirstInterfaceNumber = 0
                f.compatibleID = 'WINUSB'
        with msft_descriptors.ExtendedPropertiesDescriptor() as d:
            with d.Property() as p:
                p.dwPropertyDataType = RegistryTypes.REG_SZ
                p.PropertyName = "DeviceInterfaceGUID"
                p.PropertyData = "{88bae032-5a81-49f0-bc3d-a4ff138216d6}"
        msft_handler = MicrosoftOS10RequestHandler(msft_descriptors, request_code=0xee)
        control_endpoint.add_request_handler(msft_handler)

        # add the vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m


if __name__ == "__main__":
    from luna import top_level_cli
    top_level_cli(GatewareUSBDevice)