USB Gateware: Part 4 - Bulk Transfers
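This series of tutorials walks through the process of implementing a complete USB device with Cynthion and LUNA:
USB Gateware: Part 1 - Enumeration
USB Gateware: Part 2 - WCID Descriptors
USB Gateware: Part 3 - Control Transfers
USB Gateware: Part 4 - Bulk Transfers (This tutorial)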
The goal of this tutorial is to define Bulk Endpoints for the device we created in Part 3 that will allow us to efficiently perform larger data transfers than those allowed by Control Transfers.
Prerequisites
Complete the USB Gateware: Part 1 - Enumeration tutorial.
Complete the USB Gateware: Part 2 - WCID Descriptors tutorial. (Optional, required for Windows support)
Complete the USB Gateware: Part 3 - Control Transfers tutorial.
Add Bulk Endpoints
While Control transfers are well suited to command and status operations, they are not the best way to exchange large quantities of data. Control transfers have high per-packet protocol overhead and can only transfer packets of 8 bytes on low speed (1.5 Mbps) devices and up to 64 bytes on full (12 Mbps) and high (480 Mbps) speed devices.
Bulk transfers, on the other hand, support a packet size of up to 512 bytes on high speed devices and avoid the setup and status stages that every Control transfer requires.
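To get a feel for the difference, the sketch below counts how many data packets are needed to move a payload at the maximum packet sizes quoted above. It is plain arithmetic that ignores all protocol overhead (setup, status and handshake packets), and `packets_needed` is an illustrative helper rather than anything provided by LUNA.

```python
import math

# Maximum data packet sizes in bytes, as quoted in the text above.
CONTROL_MAX_PACKET = 64   # control transfers, full and high speed
BULK_MAX_PACKET = 512     # bulk transfers, high speed

def packets_needed(payload_bytes, max_packet_size):
    """Return the number of data packets needed to carry payload_bytes."""
    return math.ceil(payload_bytes / max_packet_size)

payload = 4096
print(packets_needed(payload, CONTROL_MAX_PACKET))  # 64
print(packets_needed(payload, BULK_MAX_PACKET))     # 8
```

Fewer, larger packets mean less time spent on per-packet framing and handshakes, which is one reason bulk endpoints are the usual choice for moving larger payloads.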
In the first section we’ll begin by updating our device’s descriptors so it can inform the host that it has bulk endpoints available.
Update Device Descriptors
Open gateware-usb-device.py and add the new imports, the MAX_PACKET_SIZE constant and the two endpoint descriptors shown below:
from amaranth import *
from luna.usb2 import USBDevice
from usb_protocol.emitters import DeviceDescriptorCollection

from luna.gateware.usb.request.windows import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10 import RegistryTypes

from luna.gateware.stream.generator import StreamSerializer
from luna.gateware.usb.request.control import ControlRequestHandler
from luna.gateware.usb.request.interface import SetupPacket
from luna.gateware.usb.usb2.request import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer import USBInStreamInterface
from usb_protocol.types import USBRequestType

from luna.usb2 import (
    USBStreamInEndpoint,
    USBStreamOutEndpoint,
)
from usb_protocol.types import (
    USBDirection,
    USBTransferType,
)

VENDOR_ID = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

MAX_PACKET_SIZE = 512

class VendorRequestHandler(ControlRequestHandler):
    ...

class GatewareUSBDevice(Elaboratable):
    def create_standard_descriptors(self):
        descriptors = DeviceDescriptorCollection()

        with descriptors.DeviceDescriptor() as d:
            d.idVendor = VENDOR_ID
            d.idProduct = PRODUCT_ID
            d.iManufacturer = "Cynthion Project"
            d.iProduct = "Gateware USB Device"
            d.bNumConfigurations = 1

        with descriptors.ConfigurationDescriptor() as c:
            with c.InterfaceDescriptor() as i:
                i.bInterfaceNumber = 0
                # EP 0x01 OUT - receives bulk data from the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.OUT.to_endpoint_address(0x01)
                    e.bmAttributes = USBTransferType.BULK
                    e.wMaxPacketSize = MAX_PACKET_SIZE
                # EP 0x82 IN - transmits bulk data to the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.IN.to_endpoint_address(0x02)
                    e.bmAttributes = USBTransferType.BULK
                    e.wMaxPacketSize = MAX_PACKET_SIZE

        return descriptors

    def elaborate(self, platform):
        ...
This adds two endpoint descriptors to our default interface, each of type USBTransferType.BULK and with a MAX_PACKET_SIZE of 512. Where the endpoints differ is in their endpoint address. USB endpoint descriptors encode their direction in an 8-bit endpoint address: the lowest four bits (bits 0 to 3) encode the endpoint number, the next three bits (bits 4 to 6) are reserved and set to zero, and the most significant bit (bit 7) encodes the direction; 0 for OUT and 1 for IN.
This means that an OUT endpoint number of 0x01 encodes to an endpoint address of 0x01 while an IN endpoint number of 0x02 encodes to the address 0x82. (0b0000_0010 + 0b1000_0000 = 0b1000_0010 = 0x82)
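The encoding described above can be sketched in a few lines of plain Python. The helper names `encode_endpoint_address` and `decode_endpoint_address` are made up for this illustration; in the gateware itself the descriptor code relies on `USBDirection.to_endpoint_address` instead.

```python
DIRECTION_IN_BIT = 0x80      # bit 7 set means IN (device to host)
ENDPOINT_NUMBER_MASK = 0x0F  # bits 0 to 3 hold the endpoint number

def encode_endpoint_address(number, is_in):
    """Combine an endpoint number and a direction into an endpoint address."""
    if not 0 <= number <= ENDPOINT_NUMBER_MASK:
        raise ValueError("endpoint number must fit in four bits")
    return number | (DIRECTION_IN_BIT if is_in else 0)

def decode_endpoint_address(address):
    """Split an endpoint address into (endpoint number, is_in)."""
    return address & ENDPOINT_NUMBER_MASK, bool(address & DIRECTION_IN_BIT)

print(hex(encode_endpoint_address(0x01, is_in=False)))  # 0x1
print(hex(encode_endpoint_address(0x02, is_in=True)))   # 0x82
print(decode_endpoint_address(0x82))                    # (2, True)
```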
Add USB Stream Endpoints
Once our endpoint descriptors have been added to our device configuration we will need some gateware that will be able to respond to USB requests from the host and allow us to receive and transmit data.
LUNA provides the USBStreamOutEndpoint and USBStreamInEndpoint components which conform to the Amaranth Data streams interface. Simply put, streams provide a uniform mechanism for unidirectional exchange of arbitrary data between gateware components.
...

class GatewareUSBDevice(Elaboratable):
    def create_standard_descriptors(self):
        ...

    def elaborate(self, platform):
        ...

        # add the vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # create and add stream endpoints for our device's Bulk IN & OUT endpoints
        ep_out = USBStreamOutEndpoint(
            endpoint_number=0x01, # (EP 0x01)
            max_packet_size=MAX_PACKET_SIZE,
        )
        usb.add_endpoint(ep_out)
        ep_in = USBStreamInEndpoint(
            endpoint_number=0x02, # (EP 0x82)
            max_packet_size=MAX_PACKET_SIZE
        )
        usb.add_endpoint(ep_in)

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m
We now have two streaming endpoints that are able to receive data from, and transmit data to, any other module that supports the Amaranth Data streams interface.
However, before we can stream any data across these endpoints we first need to come up with a USB Function for each of our endpoints. In other words, what does our device actually _do_?
This could be any data source and/or sink but for the purposes of this tutorial let’s create a simple loopback function that will accept a bulk OUT request from the host and then return the request payload when the host makes a bulk IN request.
Define Endpoint Functions
One straightforward implementation for our device’s endpoint functions is a FIFO (First In First Out) queue with enough space to hold the 512 bytes of a single maximum-size bulk packet.
Using the OUT endpoint we can then transmit a stream of data from the host to Cynthion and write it into the FIFO. Then, when the host issues a request to the IN endpoint, we can stream the previously queued data back to the host.
We’re only working in a single clock-domain so we can use a SyncFIFO from the Amaranth standard library for our queue:
from amaranth import *
from amaranth.lib.fifo import SyncFIFO
from luna.usb2 import USBDevice
from usb_protocol.emitters import DeviceDescriptorCollection
...

class VendorRequestHandler(ControlRequestHandler):
    ...

class GatewareUSBDevice(Elaboratable):
    ...

    def elaborate(self, platform):
        ...

        # create and add stream endpoints for our device's Bulk IN & OUT endpoints
        ep_out = USBStreamOutEndpoint(
            endpoint_number=0x01, # (EP 0x01)
            max_packet_size=MAX_PACKET_SIZE,
        )
        usb.add_endpoint(ep_out)
        ep_in = USBStreamInEndpoint(
            endpoint_number=0x02, # (EP 0x82)
            max_packet_size=MAX_PACKET_SIZE
        )
        usb.add_endpoint(ep_in)

        # create a FIFO queue we'll connect to the stream interfaces of our
        # IN & OUT endpoints
        m.submodules.fifo = fifo = DomainRenamer("usb")(
            SyncFIFO(width=8, depth=MAX_PACKET_SIZE)
        )

        # connect our Bulk OUT endpoint's stream interface to the FIFO's write port
        stream_out = ep_out.stream
        m.d.comb += fifo.w_data.eq(stream_out.payload)
        m.d.comb += fifo.w_en.eq(stream_out.valid)
        m.d.comb += stream_out.ready.eq(fifo.w_rdy)

        # connect our Bulk IN endpoint's stream interface to the FIFO's read port
        stream_in = ep_in.stream
        m.d.comb += stream_in.payload.eq(fifo.r_data)
        m.d.comb += stream_in.valid.eq(fifo.r_rdy)
        m.d.comb += fifo.r_en.eq(stream_in.ready)

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m
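To build some intuition for how the ready flags throttle each side of the loopback, here is a rough software model of the queue. `LoopbackFifoModel` is a hypothetical class written for this illustration only; it loosely mirrors the `w_rdy` and `r_rdy` signals used above and is not Amaranth code or a cycle-accurate simulation.

```python
from collections import deque

class LoopbackFifoModel:
    """Byte queue with ready flags, loosely modelled on a hardware FIFO."""

    def __init__(self, depth):
        self.depth = depth
        self._queue = deque()

    @property
    def w_rdy(self):
        # Writable while there is at least one free slot.
        return len(self._queue) < self.depth

    @property
    def r_rdy(self):
        # Readable while at least one byte is queued.
        return len(self._queue) > 0

    def write(self, byte):
        """Store a byte if there is room; return True if it was accepted."""
        if not self.w_rdy:
            return False
        self._queue.append(byte)
        return True

    def read(self):
        """Return the oldest queued byte, or None if the queue is empty."""
        if not self.r_rdy:
            return None
        return self._queue.popleft()

fifo = LoopbackFifoModel(depth=4)

# The OUT side offers five bytes, but only four fit.
accepted = [fifo.write(b) for b in [10, 20, 30, 40, 50]]
print(accepted)  # [True, True, True, True, False]

# The IN side drains the queue in the order the bytes arrived.
drained = []
while fifo.r_rdy:
    drained.append(fifo.read())
print(drained)   # [10, 20, 30, 40]
```

In the gateware the same back-pressure is expressed by wiring `stream_out.ready` to `fifo.w_rdy` and `stream_in.valid` to `fifo.r_rdy`, so neither endpoint moves a byte unless the FIFO can take or supply it.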
Note
Something to take note of is the use of an Amaranth DomainRenamer component to wrap SyncFIFO in the following lines:
m.submodules.fifo = fifo = DomainRenamer("usb")(
    SyncFIFO(width=8, depth=MAX_PACKET_SIZE)
)
Any moderately complex FPGA hardware & gateware design will usually consist of multiple clock-domains running at different frequencies. Cynthion, for example, has three clock domains:
sync - the default clock domain, running at 120 MHz.
usb - the clock domain for USB components and gateware, running at 60 MHz.
fast - a fast clock domain used for the HyperRAM, running at 240 MHz.
Because our designs so far have all been interfacing with Cynthion’s USB components we’ve only needed to use the usb clock domain. However, reusable Amaranth components such as SyncFIFO are usually implemented using the default sync domain. We therefore need to be able to rename its clock domain to match the domain used in our design. This is what DomainRenamer does.
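As a side calculation, the clock frequency of the usb domain and the FIFO width chosen above give an upper bound on how fast bytes can pass through the queue. The sketch assumes the FIFO moves at most one 8-bit entry per clock cycle; `peak_bits_per_second` is an illustrative helper, and the result is a theoretical ceiling rather than a measured figure.

```python
USB_CLOCK_HZ = 60_000_000  # "usb" clock domain frequency quoted above
FIFO_WIDTH_BITS = 8        # width=8 in the SyncFIFO instantiation

def peak_bits_per_second(clock_hz, width_bits):
    """Upper bound on throughput when one entry moves per clock cycle."""
    return clock_hz * width_bits

peak = peak_bits_per_second(USB_CLOCK_HZ, FIFO_WIDTH_BITS)
print(peak / 1e6)  # 480.0
```

Under that assumption the byte-wide FIFO in the 60 MHz domain keeps pace with the 480 Mbps signalling rate of a high speed link, so there is no obvious need to place it in a faster clock domain.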
And that’s it, we’ve defined our endpoint functions! Let’s try it out.
Test Bulk Endpoints
Open up test-gateware-usb-device.py and add the following code to it:
import usb1
import time
import random

VENDOR_ID = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

VENDOR_SET_FPGA_LEDS = 0x01
VENDOR_GET_USER_BUTTON = 0x02

MAX_PACKET_SIZE = 512

# - list available usb devices ------------------------------------------------

def list_available_usb_devices(context):
    for device in context.getDeviceList():
        try:
            manufacturer = device.getManufacturer()
            product = device.getProduct()
            print(f"{device}: {manufacturer} - {product}")
        except Exception as e:
            print(f"{device}: {e}")


# - wrappers for control requests ---------------------------------------------

def set_fpga_leds(device_handle, led_state):
    response = device_handle.controlWrite(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE,
        request = VENDOR_SET_FPGA_LEDS,
        index = 0,
        value = 0,
        data = [led_state],
        timeout = 1000,
    )

def get_user_button(device_handle):
    # a control read moves data from the device to the host, so the
    # direction bit is IN
    response = device_handle.controlRead(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_IN,
        request = VENDOR_GET_USER_BUTTON,
        index = 0,
        value = 0,
        length = 1,
        timeout = 1000,
    )
    return response[0]


# - test control endpoints ----------------------------------------------------

def test_control_endpoints(device_handle):
    led_counter = 0
    last_button_state = False

    while True:
        # led counter
        set_fpga_leds(device_handle, led_counter)
        led_counter = (led_counter + 1) % 256

        # reset led counter when the USER button is pressed
        button_state = get_user_button(device_handle)
        if button_state:
            led_counter = 0

        # print button state when it changes
        if button_state != last_button_state:
            print(f"USER button is: {'ON' if button_state else 'OFF' }")
            last_button_state = button_state

        # slow the loop down so we can see the counter change
        time.sleep(0.1)


# - wrappers for bulk requests ------------------------------------------------

def bulk_out_transfer(device_handle, data):
    response = device_handle.bulkWrite(
        endpoint = 0x01,
        data = data,
        timeout = 1000,
    )
    return response

def bulk_in_transfer(device_handle, length):
    response = device_handle.bulkRead(
        endpoint = 0x02,
        length = length,
        timeout = 1000,
    )
    return response


# - test bulk endpoints -------------------------------------------------------

def test_bulk_endpoints(device_handle):
    # bulk_out - write a list of random numbers to memory
    data = list([random.randint(0, 255) for _ in range(MAX_PACKET_SIZE)])
    response = bulk_out_transfer(device_handle, data)
    print(f"OUT endpoint transmitted {response} bytes: {data[0:4]} ... {data[-4:]}")

    # bulk_in - retrieve the contents of our memory
    response = list(bulk_in_transfer(device_handle, MAX_PACKET_SIZE))
    print(f"IN endpoint received {len(response)} bytes: {response[0:4]} ... {response[-4:]}")

    # check that the stored data matches the sent data
    assert(data == list(response))


# - main ----------------------------------------------------------------------

if __name__ == "__main__":
    with usb1.USBContext() as context:
        # list available devices
        list_available_usb_devices(context)

        # get a device handle to our simple usb device
        device_handle = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)
        if device_handle is None:
            raise Exception("Device not found.")

        # claim the device's interface
        device_handle.claimInterface(0)

        # pass the device handle to our bulk endpoint test
        test_bulk_endpoints(device_handle)

        # pass the device handle to our control endpoint test
        test_control_endpoints(device_handle)
Run the file with:
python ./test-gateware-usb-device.py
Assuming everything is going to plan you should see two matching sets of random numbers:
OUT endpoint transmitted 512 bytes: [252, 107, 106, 56] ... [109, 175, 112, 126]
IN endpoint received 512 bytes: [252, 107, 106, 56] ... [109, 175, 112, 126]
Congratulations, if you made it this far then you’ve just finished building your first complete USB Gateware Device with custom vendor request control and bulk data transfer!
Exercises
Create a benchmark to test the speed of your device when doing Bulk IN and OUT transfers.
Move the device endpoint to aux_phy and attempt to capture the packets exchanged between a device plugged into a host via the target_phy port.
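As a possible starting point for the benchmark exercise, the sketch below times repeated calls to a transfer function and reports the achieved rate. `measure_throughput` and `fake_transfer` are hypothetical names introduced here; the stand-in transfer only counts bytes so that the example runs without any hardware attached.

```python
import time

def measure_throughput(transfer, total_bytes, chunk_size=512):
    """Call transfer(chunk) until at least total_bytes have been moved.

    transfer must return the number of bytes it actually moved.
    Returns a tuple of (bytes_moved, bytes_per_second).
    """
    chunk = bytes(chunk_size)  # zero-filled payload of one chunk
    moved = 0
    start = time.perf_counter()
    while moved < total_bytes:
        count = transfer(chunk)
        if count <= 0:
            raise RuntimeError("transfer made no progress")
        moved += count
    elapsed = time.perf_counter() - start
    rate = moved / elapsed if elapsed > 0 else float("inf")
    return moved, rate

# Stand-in for a real USB transfer so the sketch runs without a device.
def fake_transfer(chunk):
    return len(chunk)

moved, rate = measure_throughput(fake_transfer, total_bytes=4096)
print(f"moved {moved} bytes at {rate:.0f} bytes/s")
```

With a real device, `transfer` could wrap `bulk_out_transfer` from the test script followed by a matching `bulk_in_transfer`, since the loopback FIFO presumably has to be drained before it can accept more data.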
More information
Beyond Logic’s USB in a NutShell.
Source Code
#!/usr/bin/env python3
#
# This file is part of Cynthion.
#
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause

from amaranth import *
from amaranth.lib.fifo import SyncFIFO
from luna.usb2 import USBDevice
from usb_protocol.emitters import DeviceDescriptorCollection

from luna.gateware.usb.request.windows import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10 import RegistryTypes

from luna.gateware.stream.generator import StreamSerializer
from luna.gateware.usb.request.control import ControlRequestHandler
from luna.gateware.usb.request.interface import SetupPacket
from luna.gateware.usb.usb2.request import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer import USBInStreamInterface
from usb_protocol.types import USBRequestType

from luna.usb2 import USBStreamInEndpoint, USBStreamOutEndpoint
from usb_protocol.types import USBDirection, USBTransferType

VENDOR_ID = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

MAX_PACKET_SIZE = 512

class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # shortcuts
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # get a reference to the FPGA LEDs and USER button
        fpga_leds = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # create a streamserializer for transmitting IN data back to the host
        serializer = StreamSerializer(
            domain = "usb",
            stream_type = USBInStreamInterface,
            data_length = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        # we've received a setup packet containing a vendor request.
        with m.If(setup.type == USBRequestType.VENDOR):
            # use a state machine to sequence our request handling
            with m.FSM(domain="usb"):
                with m.State("IDLE"):
                    with m.If(setup.received):
                        with m.Switch(setup.request):
                            with m.Case(self.VENDOR_SET_FPGA_LEDS):
                                m.next = "HANDLE_SET_FPGA_LEDS"
                            with m.Case(self.VENDOR_GET_USER_BUTTON):
                                m.next = "HANDLE_GET_USER_BUTTON"

                with m.State("HANDLE_SET_FPGA_LEDS"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # if we have an active data byte, set the FPGA LEDs to the payload
                    with m.If(interface.rx.valid & interface.rx.next):
                        m.d.usb += fpga_leds.eq(interface.rx.payload[0:6])

                    # once the receive is complete, respond with an ACK
                    with m.If(interface.rx_ready_for_response):
                        m.d.comb += interface.handshakes_out.ack.eq(1)

                    # finally, once we reach the status stage, send a ZLP
                    with m.If(interface.status_requested):
                        m.d.comb += self.send_zlp()
                        m.next = "IDLE"

                with m.State("HANDLE_GET_USER_BUTTON"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # write the state of the user button into a local data register
                    data = Signal(8)
                    m.d.comb += data[0].eq(user_button)

                    # transmit our data using a built-in handler function that
                    # automatically advances the FSM back to the 'IDLE' state on
                    # completion
                    self.handle_simple_data_request(m, serializer, data)

        return m


class GatewareUSBDevice(Elaboratable):
    """ A simple USB device that can communicate with the host via vendor and bulk requests. """

    def create_standard_descriptors(self):
        """ Create the USB descriptors for the device. """

        descriptors = DeviceDescriptorCollection()

        # all USB devices have a single device descriptor
        with descriptors.DeviceDescriptor() as d:
            d.idVendor = VENDOR_ID
            d.idProduct = PRODUCT_ID
            d.iManufacturer = "Cynthion Project"
            d.iProduct = "Gateware USB Device"

            d.bNumConfigurations = 1

        # and at least one configuration descriptor
        with descriptors.ConfigurationDescriptor() as c:

            # with at least one interface descriptor
            with c.InterfaceDescriptor() as i:
                i.bInterfaceNumber = 0

                # an endpoint for receiving bulk data from the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.OUT.to_endpoint_address(0x01) # EP 0x01 OUT
                    e.bmAttributes = USBTransferType.BULK
                    e.wMaxPacketSize = MAX_PACKET_SIZE

                # and an endpoint for transmitting bulk data to the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.IN.to_endpoint_address(0x02) # EP 0x82 IN
                    e.bmAttributes = USBTransferType.BULK
                    e.wMaxPacketSize = MAX_PACKET_SIZE

        return descriptors

    def elaborate(self, platform):
        m = Module()

        # configure cynthion's clocks and reset signals
        m.submodules.car = platform.clock_domain_generator()

        # request the physical interface for cynthion's TARGET C port
        ulpi = platform.request("target_phy")

        # create the USB device
        m.submodules.usb = usb = USBDevice(bus=ulpi)

        # create our standard descriptors and add them to the device's control endpoint
        descriptors = self.create_standard_descriptors()
        control_endpoint = usb.add_standard_control_endpoint(
            descriptors,
            # the blockram descriptor handler lacks support for
            # non-contiguous string descriptor indices, which is
            # required for the Microsoft OS string descriptor at 0xEE
            avoid_blockram=True,
        )

        # add microsoft os 1.0 descriptors and request handler
        descriptors.add_descriptor(get_string_descriptor("MSFT100\xee"), index=0xee)
        msft_descriptors = MicrosoftOS10DescriptorCollection()
        with msft_descriptors.ExtendedCompatIDDescriptor() as c:
            with c.Function() as f:
                f.bFirstInterfaceNumber = 0
                f.compatibleID = 'WINUSB'
        with msft_descriptors.ExtendedPropertiesDescriptor() as d:
            with d.Property() as p:
                p.dwPropertyDataType = RegistryTypes.REG_SZ
                p.PropertyName = "DeviceInterfaceGUID"
                p.PropertyData = "{88bae032-5a81-49f0-bc3d-a4ff138216d6}"
        msft_handler = MicrosoftOS10RequestHandler(msft_descriptors, request_code=0xee)
        control_endpoint.add_request_handler(msft_handler)

        # add the vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # create and add stream endpoints for our device's Bulk IN & OUT endpoints
        ep_out = USBStreamOutEndpoint(
            endpoint_number=0x01, # (EP 0x01)
            max_packet_size=MAX_PACKET_SIZE,
        )
        usb.add_endpoint(ep_out)
        ep_in = USBStreamInEndpoint(
            endpoint_number=0x02, # (EP 0x82)
            max_packet_size=MAX_PACKET_SIZE
        )
        usb.add_endpoint(ep_in)

        # create a FIFO queue we'll connect to the stream interfaces of our
        # IN & OUT endpoints
        m.submodules.fifo = fifo = DomainRenamer("usb")(
            SyncFIFO(width=8, depth=MAX_PACKET_SIZE)
        )

        # connect our Bulk OUT endpoint's stream interface to the FIFO's write port
        stream_out = ep_out.stream
        m.d.comb += fifo.w_data.eq(stream_out.payload)
        m.d.comb += fifo.w_en.eq(stream_out.valid)
        m.d.comb += stream_out.ready.eq(fifo.w_rdy)

        # connect our Bulk IN endpoint's stream interface to the FIFO's read port
        stream_in = ep_in.stream
        m.d.comb += stream_in.payload.eq(fifo.r_data)
        m.d.comb += stream_in.valid.eq(fifo.r_rdy)
        m.d.comb += fifo.r_en.eq(stream_in.ready)

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m


if __name__ == "__main__":
    from luna import top_level_cli
    top_level_cli(GatewareUSBDevice)