USB Gateware: Part 4 - Bulk Transfers

This series of tutorials walks through the process of implementing a complete USB device with Cynthion and LUNA.

The goal of this tutorial is to define Bulk Endpoints for the device we created in Part 3 that will allow us to efficiently perform larger data transfers than those allowed by Control Transfers.

Prerequisites

Add Bulk Endpoints

While Control transfers are well suited for command and status operations, they are not the best way to exchange large quantities of data. Control transfers have high per-packet protocol overhead and can only transfer packets of 8 bytes on low speed (1.5 Mbps) devices and up to 64 bytes on full (12 Mbps) and high (480 Mbps) speed devices.

On the other hand, Bulk transfers support a packet size of up to 512 bytes on high speed devices and do not require the additional setup and status stages that accompany every Control transfer.
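As a rough illustration of what the larger packet size buys, the following sketch counts how many data packets are needed to move a payload at each maximum packet size. The function name packets_needed and the 4096 byte payload are made up for this example, and the count deliberately ignores token, handshake, setup and status overhead.

```python
import math

def packets_needed(payload_bytes, max_packet_size):
    """Number of data packets required to carry payload_bytes."""
    return math.ceil(payload_bytes / max_packet_size)

payload = 4096  # hypothetical payload size in bytes

cases = [
    ("control, low speed", 8),
    ("control, full/high speed", 64),
    ("bulk, high speed", 512),
]
for name, size in cases:
    print(f"{name:>26}: {packets_needed(payload, size)} packets")
```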

In the first section we’ll begin by updating our device’s descriptors so it can inform the host that it has bulk endpoints available.

Update Device Descriptors

Open gateware-usb-device.py and add the highlighted lines:

gateware-usb-device.py
from amaranth                                    import *
from luna.usb2                                   import USBDevice
from usb_protocol.emitters                       import DeviceDescriptorCollection

from luna.gateware.usb.request.windows           import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard  import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10  import RegistryTypes

from luna.gateware.stream.generator              import StreamSerializer
from luna.gateware.usb.request.control           import ControlRequestHandler
from luna.gateware.usb.request.interface         import SetupPacket
from luna.gateware.usb.usb2.request              import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer             import USBInStreamInterface
from usb_protocol.types                          import USBRequestType

from luna.usb2                                   import (
    USBStreamInEndpoint,
    USBStreamOutEndpoint,
)
from usb_protocol.types                          import (
    USBDirection,
    USBTransferType,
)

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

MAX_PACKET_SIZE = 512

class VendorRequestHandler(ControlRequestHandler):
    ...

class GatewareUSBDevice(Elaboratable):
    def create_standard_descriptors(self):
        descriptors = DeviceDescriptorCollection()

        with descriptors.DeviceDescriptor() as d:
            d.idVendor           = VENDOR_ID
            d.idProduct          = PRODUCT_ID
            d.iManufacturer      = "Cynthion Project"
            d.iProduct           = "Gateware USB Device"
            d.bNumConfigurations = 1

        with descriptors.ConfigurationDescriptor() as c:
            with c.InterfaceDescriptor() as i:
                i.bInterfaceNumber = 0
                # EP 0x01 OUT - receives bulk data from the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.OUT.to_endpoint_address(0x01)
                    e.bmAttributes     = USBTransferType.BULK
                    e.wMaxPacketSize   = MAX_PACKET_SIZE
                # EP 0x82 IN  - transmits bulk data to the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.IN.to_endpoint_address(0x02)
                    e.bmAttributes     = USBTransferType.BULK
                    e.wMaxPacketSize   = MAX_PACKET_SIZE

        return descriptors

    def elaborate(self, platform):
        ...

This adds two endpoint descriptors to our default interface, each of type USBTransferType.BULK and with a MAX_PACKET_SIZE of 512. Where the endpoints differ is in their endpoint address. USB endpoint descriptors encode their direction in an 8 bit endpoint address. The lowest four bits (bits 0 to 3) encode the endpoint number, the next three bits (bits 4 to 6) are reserved and set to zero, and the most significant bit (bit 7) encodes the direction: 0 for OUT and 1 for IN.

This means that an OUT endpoint number of 0x01 encodes to an endpoint address of 0x01 while an IN endpoint number of 0x02 encodes to the address 0x82. (0b0000_0010 + 0b1000_0000 = 0b1000_0010 = 0x82)
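The encoding can be reproduced with a couple of bitwise operations. The sketch below is plain Python written for illustration: endpoint_address and decode_endpoint_address are hypothetical helpers, not the to_endpoint_address method used in the descriptors above.

```python
IN_DIRECTION_BIT = 0x80  # bit 7 set means IN (device to host)

def endpoint_address(number, is_in):
    """Combine a 4-bit endpoint number and a direction into an address byte."""
    if not 0 <= number <= 0x0F:
        raise ValueError("endpoint number must fit in four bits")
    return number | (IN_DIRECTION_BIT if is_in else 0)

def decode_endpoint_address(address):
    """Split an address byte into (endpoint number, 'IN' or 'OUT')."""
    direction = "IN" if address & IN_DIRECTION_BIT else "OUT"
    return address & 0x0F, direction

print(hex(endpoint_address(0x01, is_in=False)))  # 0x1
print(hex(endpoint_address(0x02, is_in=True)))   # 0x82
print(decode_endpoint_address(0x82))             # (2, 'IN')
```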

Add USB Stream Endpoints

Once our endpoint descriptors have been added to our device configuration we will need some gateware that will be able to respond to USB requests from the host and allow us to receive and transmit data.

LUNA provides the USBStreamOutEndpoint and USBStreamInEndpoint components which conform to the Amaranth Data streams interface. Simply put, streams provide a uniform mechanism for unidirectional exchange of arbitrary data between gateware components.

gateware-usb-device.py
...

class GatewareUSBDevice(Elaboratable):
    def create_standard_descriptors(self):
        ...

    def elaborate(self, platform):
        ...

        # add the vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # create and add stream endpoints for our device's Bulk IN & OUT endpoints
        ep_out = USBStreamOutEndpoint(
            endpoint_number=0x01,  # (EP 0x01)
            max_packet_size=MAX_PACKET_SIZE,
        )
        usb.add_endpoint(ep_out)
        ep_in = USBStreamInEndpoint(
            endpoint_number=0x02,  # (EP 0x82)
            max_packet_size=MAX_PACKET_SIZE
        )
        usb.add_endpoint(ep_in)

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m

We now have two streaming endpoints that are able to receive and transmit data between any other module that supports the Amaranth Data streams interface.
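The handshake behind a stream interface can be modelled in a few lines of plain Python. This is only a behavioural sketch, not LUNA or Amaranth code: the function simulate_stream and its arguments are invented here, and it assumes the usual rule that one byte is transferred on every clock cycle where the producer's valid signal and the consumer's ready signal are both high.

```python
def simulate_stream(source_bytes, ready_pattern):
    """Return the bytes accepted by a sink that is ready only on some cycles.

    source_bytes:  data the producer wants to send, one byte per transfer
    ready_pattern: one boolean per clock cycle, True when the sink is ready
    """
    pending = list(source_bytes)
    received = []
    for ready in ready_pattern:
        valid = bool(pending)      # producer asserts valid while it has data
        if valid and ready:        # a transfer happens only when both are high
            received.append(pending.pop(0))
    return received

# the sink stalls on the second and third cycles, so the data arrives
# later but is still complete and in order
print(simulate_stream([0x10, 0x20, 0x30], [True, False, False, True, True]))
```

The point of the model is that neither side needs to know anything about the other beyond these two signals, which is what lets an endpoint be wired to any compatible component.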

However, before we can stream any data across these endpoints we first need to come up with a USB Function for each of our endpoints. In other words, what does our device actually _do_?

This could be any data source and/or sink but for the purposes of this tutorial let’s create a simple loopback function that will accept a bulk OUT request from the host and then return the request payload when the host makes a bulk IN request.

Define Endpoint Functions

A simple implementation for our device’s endpoint functions could be a FIFO (First In First Out) queue with enough space to hold the 512 bytes of a single maximum-size bulk packet.

Using the OUT endpoint we could then transmit a stream of data from the host to Cynthion and write it into the FIFO. Then, when we transmit a request from the host to the IN endpoint we can stream the previously queued data back to the host.
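Before building this in gateware it can help to picture the behaviour in software. The class below is a hypothetical stand-in written for this explanation (LoopbackFifo is not part of LUNA or Amaranth): it stores bytes up to a fixed depth and hands them back in the order they arrived.

```python
from collections import deque

class LoopbackFifo:
    """Software stand-in for a fixed-depth byte FIFO."""

    def __init__(self, depth):
        self.depth = depth
        self.queue = deque()

    def write(self, data):
        """Accept bytes until the FIFO is full; return how many were stored."""
        accepted = 0
        for byte in data:
            if len(self.queue) >= self.depth:
                break              # full: no more data can be accepted
            self.queue.append(byte)
            accepted += 1
        return accepted

    def read(self, length):
        """Return up to `length` bytes in the order they were written."""
        count = min(length, len(self.queue))
        return [self.queue.popleft() for _ in range(count)]

fifo = LoopbackFifo(depth=512)
fifo.write(range(4))       # plays the role of a bulk OUT transfer
print(fifo.read(4))        # plays the role of a bulk IN transfer
```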

We’re only working in a single clock-domain so we can use a SyncFIFO from the Amaranth standard library for our queue:

gateware-usb-device.py
from amaranth                                    import *
from amaranth.lib.fifo                           import SyncFIFO
from luna.usb2                                   import USBDevice
from usb_protocol.emitters                       import DeviceDescriptorCollection
...

class VendorRequestHandler(ControlRequestHandler):
    ...

class GatewareUSBDevice(Elaboratable):
    ...

    def elaborate(self, platform):
        ...

        # create and add stream endpoints for our device's Bulk IN & OUT endpoints
        ep_out = USBStreamOutEndpoint(
            endpoint_number=0x01,  # (EP 0x01)
            max_packet_size=MAX_PACKET_SIZE,
        )
        usb.add_endpoint(ep_out)
        ep_in = USBStreamInEndpoint(
            endpoint_number=0x02,  # (EP 0x82)
            max_packet_size=MAX_PACKET_SIZE
        )
        usb.add_endpoint(ep_in)

        # create a FIFO queue we'll connect to the stream interfaces of our
        # IN & OUT endpoints
        m.submodules.fifo = fifo = DomainRenamer("usb")(
            SyncFIFO(width=8, depth=MAX_PACKET_SIZE)
        )

        # connect our Bulk OUT endpoint's stream interface to the FIFO's write port
        stream_out = ep_out.stream
        m.d.comb += fifo.w_data.eq(stream_out.payload)
        m.d.comb += fifo.w_en.eq(stream_out.valid)
        m.d.comb += stream_out.ready.eq(fifo.w_rdy)

        # connect our Bulk IN endpoint's stream interface to the FIFO's read port
        stream_in  = ep_in.stream
        m.d.comb += stream_in.payload.eq(fifo.r_data)
        m.d.comb += stream_in.valid.eq(fifo.r_rdy)
        m.d.comb += fifo.r_en.eq(stream_in.ready)

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m

Note

Something to take note of is the use of an Amaranth DomainRenamer component to wrap SyncFIFO in the following lines:

m.submodules.fifo = fifo = DomainRenamer("usb")(
    SyncFIFO(width=8, depth=MAX_PACKET_SIZE)
)

Any moderately complex FPGA hardware & gateware design will usually consist of multiple clock-domains running at different frequencies. Cynthion, for example, has three clock domains:

  • sync - the default clock domain, running at 120 MHz.

  • usb - the clock domain for USB components and gateware, running at 60 MHz.

  • fast - a fast clock domain used for the HyperRAM, running at 240 MHz.

Because our designs so far have all been interfacing with Cynthion’s USB components we’ve only needed to use the usb clock domain. However, reusable Amaranth components such as SyncFIFO are usually implemented using the default sync domain. We therefore need to be able to rename its clock domain to match the domain used in our design. This is what DomainRenamer does.
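A quick back-of-the-envelope check relates the usb domain's clock frequency to USB's high speed bit rate. It assumes that an 8 bit wide stream, like the ones connected to our FIFO, moves at most one byte per clock cycle; the helper name is made up for the illustration.

```python
def peak_bits_per_second(clock_hz, width_bits=8):
    """Upper bound on throughput when one word moves per clock cycle."""
    return clock_hz * width_bits

usb_domain_hz = 60_000_000  # frequency of the usb clock domain listed above

print(peak_bits_per_second(usb_domain_hz) / 1e6, "Mbit/s")  # 480.0 Mbit/s
```

Under that assumption a byte-wide data path clocked in the usb domain can just keep pace with the 480 Mbit/s high speed signalling rate.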

And that’s it, we’ve defined our endpoint functions! Let’s try it out.

Test Bulk Endpoints

Open up test-gateware-usb-device.py and add the following code to it:

test-gateware-usb-device.py
import usb1
import time
import random

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

VENDOR_SET_FPGA_LEDS   = 0x01
VENDOR_GET_USER_BUTTON = 0x02

MAX_PACKET_SIZE = 512

# - list available usb devices ------------------------------------------------

def list_available_usb_devices(context):
    for device in context.getDeviceList():
        try:
            manufacturer = device.getManufacturer()
            product = device.getProduct()
            print(f"{device}:  {manufacturer} - {product}")
        except Exception as e:
            print(f"{device}: {e}")


# - wrappers for control requests ---------------------------------------------

def set_fpga_leds(device_handle, led_state):
    response = device_handle.controlWrite(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE,
        request      = VENDOR_SET_FPGA_LEDS,
        index        = 0,
        value        = 0,
        data         = [led_state],
        timeout      = 1000,
    )

def get_user_button(device_handle):
    response = device_handle.controlRead(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_OUT,
        request      = VENDOR_GET_USER_BUTTON,
        index        = 0,
        value        = 0,
        length       = 1,
        timeout      = 1000,
    )
    return response[0]


# - test control endpoints ----------------------------------------------------

def test_control_endpoints(device_handle):
    led_counter = 0
    last_button_state = False

    while True:
        # led counter
        set_fpga_leds(device_handle, led_counter)
        led_counter = (led_counter + 1) % 256

        # reset led counter when the USER button is pressed
        button_state = get_user_button(device_handle)
        if button_state:
            led_counter = 0

        # print button state when it changes
        if button_state != last_button_state:
            print(f"USER button is: {'ON' if button_state else 'OFF' }")
            last_button_state = button_state

        # slow the loop down so we can see the counter change
        time.sleep(0.1)


# - wrappers for bulk requests ------------------------------------------------

def bulk_out_transfer(device_handle, data):
    response = device_handle.bulkWrite(
        endpoint = 0x01,
        data     = data,
        timeout  = 1000,
    )
    return response

def bulk_in_transfer(device_handle, length):
    response = device_handle.bulkRead(
        endpoint = 0x02,
        length   = length,
        timeout  = 1000,
    )
    return response


# - test bulk endpoints -------------------------------------------------------

def test_bulk_endpoints(device_handle):
    # bulk_out - write a list of random numbers to memory
    data = list([random.randint(0, 255) for _ in range(MAX_PACKET_SIZE)])
    response = bulk_out_transfer(device_handle, data)
    print(f"OUT endpoint transmitted {response} bytes: {data[0:4]} ... {data[-4:]}")

    # bulk_in - retrieve the contents of our memory
    response = list(bulk_in_transfer(device_handle, MAX_PACKET_SIZE))
    print(f"IN  endpoint received {len(response)} bytes:    {response[0:4]} ... {response[-4:]}")

    # check that the stored data matches the sent data
    assert(data == list(response))


# - main ----------------------------------------------------------------------

if __name__ == "__main__":
    with usb1.USBContext() as context:
        # list available devices
        list_available_usb_devices(context)

        # get a device handle to our simple usb device
        device_handle = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)
        if device_handle is None:
            raise Exception("Device not found.")

        # claim the device's interface
        device_handle.claimInterface(0)

        # pass the device handle to our bulk endpoint test
        test_bulk_endpoints(device_handle)

        # pass the device handle to our control endpoint test
        test_control_endpoints(device_handle)

Run the file with:

python ./test-gateware-usb-device.py

Assuming everything is going to plan you should see two matching sets of random numbers:

OUT endpoint transmitted 512 bytes: [252, 107, 106, 56] ... [109, 175, 112, 126]
IN  endpoint received 512 bytes:    [252, 107, 106, 56] ... [109, 175, 112, 126]
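The test sends exactly MAX_PACKET_SIZE bytes, which is also the depth of the loopback FIFO. To loop back a larger buffer, one option is to split it into pieces that fit and alternate writes and reads. The sketch below shows only the splitting step: chunked is a hypothetical helper that is not part of the tutorial's source, and the commented-out loop assumes the bulk_out_transfer and bulk_in_transfer wrappers from the test script and an attached device.

```python
MAX_PACKET_SIZE = 512

def chunked(data, size=MAX_PACKET_SIZE):
    """Yield successive slices of `data`, each at most `size` items long."""
    for offset in range(0, len(data), size):
        yield data[offset:offset + size]

data = list(range(256)) * 5      # 1280 bytes of example data
print([len(chunk) for chunk in chunked(data)])  # [512, 512, 256]

# with a device attached, each chunk could then be looped back in turn:
# for chunk in chunked(data):
#     bulk_out_transfer(device_handle, chunk)
#     assert list(bulk_in_transfer(device_handle, len(chunk))) == chunk
```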

Congratulations, if you made it this far then you’ve just finished building your first complete USB Gateware Device with custom vendor request control and bulk data transfer!

Exercises

  1. Create a benchmark to test the speed of your device when doing Bulk IN and OUT transfers.

  2. Move the device endpoint to aux_phy and attempt to capture the packets exchanged between a device plugged into a host via the target_phy port.
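As a possible starting point for the first exercise, here is a small timing harness. It is a sketch under stated assumptions rather than a finished benchmark: measure_throughput and fake_transfer are invented names, and the transfer argument is expected to be a callable you supply (for example a wrapper around a bulk transfer) that moves up to the requested number of bytes and returns how many were actually moved.

```python
import time

def measure_throughput(transfer, total_bytes, chunk_size=512):
    """Call transfer(n) until total_bytes have moved; return bytes per second."""
    moved = 0
    start = time.perf_counter()
    while moved < total_bytes:
        count = transfer(min(chunk_size, total_bytes - moved))
        if count <= 0:
            raise RuntimeError("transfer made no progress")
        moved += count
    elapsed = time.perf_counter() - start
    return moved / elapsed if elapsed > 0 else float("inf")

# stand-in for a real bulk transfer so the sketch runs without hardware
def fake_transfer(length):
    return length

rate = measure_throughput(fake_transfer, total_bytes=1_000_000)
print(f"{rate / 1e6:.1f} MB/s (fake transfer, not a real measurement)")
```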

More information

Source Code

gateware-usb-device-04.py
#!/usr/bin/env python3
#
# This file is part of Cynthion.
#
# Copyright (c) 2024 Great Scott Gadgets <info@greatscottgadgets.com>
# SPDX-License-Identifier: BSD-3-Clause

from amaranth                                    import *
from amaranth.lib.fifo                           import SyncFIFO
from luna.usb2                                   import USBDevice
from usb_protocol.emitters                       import DeviceDescriptorCollection

from luna.gateware.usb.request.windows           import (
    MicrosoftOS10DescriptorCollection,
    MicrosoftOS10RequestHandler,
)
from usb_protocol.emitters.descriptors.standard  import get_string_descriptor
from usb_protocol.types.descriptors.microsoft10  import RegistryTypes

from luna.gateware.stream.generator              import StreamSerializer
from luna.gateware.usb.request.control           import ControlRequestHandler
from luna.gateware.usb.request.interface         import SetupPacket
from luna.gateware.usb.usb2.request              import RequestHandlerInterface
from luna.gateware.usb.usb2.transfer             import USBInStreamInterface
from usb_protocol.types                          import USBRequestType

from luna.usb2                                   import USBStreamInEndpoint, USBStreamOutEndpoint
from usb_protocol.types                          import USBDirection, USBTransferType

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

MAX_PACKET_SIZE = 512

class VendorRequestHandler(ControlRequestHandler):
    VENDOR_SET_FPGA_LEDS   = 0x01
    VENDOR_GET_USER_BUTTON = 0x02

    def elaborate(self, platform):
        m = Module()

        # shortcuts
        interface: RequestHandlerInterface = self.interface
        setup: SetupPacket = self.interface.setup

        # get a reference to the FPGA LEDs and USER button
        fpga_leds   = Cat(platform.request("led", i).o for i in range(6))
        user_button = platform.request("button_user").i

        # create a streamserializer for transmitting IN data back to the host
        serializer = StreamSerializer(
            domain           = "usb",
            stream_type      = USBInStreamInterface,
            data_length      = 1,
            max_length_width = 1,
        )
        m.submodules += serializer

        # we've received a setup packet containing a vendor request.
        with m.If(setup.type == USBRequestType.VENDOR):
            # use a state machine to sequence our request handling
            with m.FSM(domain="usb"):
                with m.State("IDLE"):
                    with m.If(setup.received):
                        with m.Switch(setup.request):
                            with m.Case(self.VENDOR_SET_FPGA_LEDS):
                                m.next = "HANDLE_SET_FPGA_LEDS"
                            with m.Case(self.VENDOR_GET_USER_BUTTON):
                                m.next = "HANDLE_GET_USER_BUTTON"

                with m.State("HANDLE_SET_FPGA_LEDS"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # if we have an active data byte, set the FPGA LEDs to the payload
                    with m.If(interface.rx.valid & interface.rx.next):
                        m.d.usb += fpga_leds.eq(interface.rx.payload[0:6])

                    # once the receive is complete, respond with an ACK
                    with m.If(interface.rx_ready_for_response):
                        m.d.comb += interface.handshakes_out.ack.eq(1)

                    # finally, once we reach the status stage, send a ZLP
                    with m.If(interface.status_requested):
                        m.d.comb += self.send_zlp()
                        m.next = "IDLE"

                with m.State("HANDLE_GET_USER_BUTTON"):
                    # take ownership of the interface
                    m.d.comb += interface.claim.eq(1)

                    # write the state of the user button into a local data register
                    data = Signal(8)
                    m.d.comb += data[0].eq(user_button)

                    # transmit our data using a built-in handler function that
                    # automatically advances the FSM back to the 'IDLE' state on
                    # completion
                    self.handle_simple_data_request(m, serializer, data)

        return m


class GatewareUSBDevice(Elaboratable):
    """ A simple USB device that can communicate with the host via vendor and bulk requests. """

    def create_standard_descriptors(self):
        """ Create the USB descriptors for the device. """

        descriptors = DeviceDescriptorCollection()

        # all USB devices have a single device descriptor
        with descriptors.DeviceDescriptor() as d:
            d.idVendor           = VENDOR_ID
            d.idProduct          = PRODUCT_ID
            d.iManufacturer      = "Cynthion Project"
            d.iProduct           = "Gateware USB Device"

            d.bNumConfigurations = 1

        # and at least one configuration descriptor
        with descriptors.ConfigurationDescriptor() as c:

            # with at least one interface descriptor
            with c.InterfaceDescriptor() as i:
                i.bInterfaceNumber = 0

                # an endpoint for receiving bulk data from the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.OUT.to_endpoint_address(0x01) # EP 0x01 OUT
                    e.bmAttributes     = USBTransferType.BULK
                    e.wMaxPacketSize   = MAX_PACKET_SIZE

                # and an endpoint for transmitting bulk data to the host
                with i.EndpointDescriptor() as e:
                    e.bEndpointAddress = USBDirection.IN.to_endpoint_address(0x02)  # EP 0x82 IN
                    e.bmAttributes     = USBTransferType.BULK
                    e.wMaxPacketSize   = MAX_PACKET_SIZE

        return descriptors

    def elaborate(self, platform):
        m = Module()

        # configure cynthion's clocks and reset signals
        m.submodules.car = platform.clock_domain_generator()

        # request the physical interface for cynthion's TARGET C port
        ulpi = platform.request("target_phy")

        # create the USB device
        m.submodules.usb = usb = USBDevice(bus=ulpi)

        # create our standard descriptors and add them to the device's control endpoint
        descriptors = self.create_standard_descriptors()
        control_endpoint = usb.add_standard_control_endpoint(
            descriptors,
            # the blockram descriptor handler lacks support for
            # non-contiguous string descriptor indices, which is
            # required for the Microsoft OS string descriptor at 0xEE
            avoid_blockram=True,
        )

        # add microsoft os 1.0 descriptors and request handler
        descriptors.add_descriptor(get_string_descriptor("MSFT100\xee"), index=0xee)
        msft_descriptors = MicrosoftOS10DescriptorCollection()
        with msft_descriptors.ExtendedCompatIDDescriptor() as c:
            with c.Function() as f:
                f.bFirstInterfaceNumber = 0
                f.compatibleID          = 'WINUSB'
        with msft_descriptors.ExtendedPropertiesDescriptor() as d:
            with d.Property() as p:
                p.dwPropertyDataType = RegistryTypes.REG_SZ
                p.PropertyName       = "DeviceInterfaceGUID"
                p.PropertyData       = "{88bae032-5a81-49f0-bc3d-a4ff138216d6}"
        msft_handler = MicrosoftOS10RequestHandler(msft_descriptors, request_code=0xee)
        control_endpoint.add_request_handler(msft_handler)

        # add the vendor request handler
        control_endpoint.add_request_handler(VendorRequestHandler())

        # create and add stream endpoints for our device's Bulk IN & OUT endpoints
        ep_out = USBStreamOutEndpoint(
            endpoint_number=0x01,  # (EP 0x01)
            max_packet_size=MAX_PACKET_SIZE,
        )
        usb.add_endpoint(ep_out)
        ep_in = USBStreamInEndpoint(
            endpoint_number=0x02,  # (EP 0x82)
            max_packet_size=MAX_PACKET_SIZE
        )
        usb.add_endpoint(ep_in)

        # create a FIFO queue we'll connect to the stream interfaces of our
        # IN & OUT endpoints
        m.submodules.fifo = fifo = DomainRenamer("usb")(
            SyncFIFO(width=8, depth=MAX_PACKET_SIZE)
        )

        # connect our Bulk OUT endpoint's stream interface to the FIFO's write port
        stream_out = ep_out.stream
        m.d.comb += fifo.w_data.eq(stream_out.payload)
        m.d.comb += fifo.w_en.eq(stream_out.valid)
        m.d.comb += stream_out.ready.eq(fifo.w_rdy)

        # connect our Bulk IN endpoint's stream interface to the FIFO's read port
        stream_in  = ep_in.stream
        m.d.comb += stream_in.payload.eq(fifo.r_data)
        m.d.comb += stream_in.valid.eq(fifo.r_rdy)
        m.d.comb += fifo.r_en.eq(stream_in.ready)

        # configure the device to connect by default when plugged into a host
        m.d.comb += usb.connect.eq(1)

        return m


if __name__ == "__main__":
    from luna import top_level_cli
    top_level_cli(GatewareUSBDevice)

test-gateware-usb-device-04.py
import usb1
import time
import random

VENDOR_ID  = 0x1209 # https://pid.codes/1209/
PRODUCT_ID = 0x0001

VENDOR_SET_FPGA_LEDS   = 0x01
VENDOR_GET_USER_BUTTON = 0x02

MAX_PACKET_SIZE = 512

# - list available usb devices ------------------------------------------------

def list_available_usb_devices(context):
    for device in context.getDeviceList():
        try:
            manufacturer = device.getManufacturer()
            product = device.getProduct()
            print(f"{device}:  {manufacturer} - {product}")
        except Exception as e:
            print(f"{device}: {e}")


# - wrappers for control requests ---------------------------------------------

def set_fpga_leds(device_handle, led_state):
    response = device_handle.controlWrite(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE,
        request      = VENDOR_SET_FPGA_LEDS,
        index        = 0,
        value        = 0,
        data         = [led_state],
        timeout      = 1000,
    )

def get_user_button(device_handle):
    response = device_handle.controlRead(
        request_type = usb1.TYPE_VENDOR | usb1.RECIPIENT_DEVICE | usb1.ENDPOINT_OUT,
        request      = VENDOR_GET_USER_BUTTON,
        index        = 0,
        value        = 0,
        length       = 1,
        timeout      = 1000,
    )
    return response[0]


# - test control endpoints ----------------------------------------------------

def test_control_endpoints(device_handle):
    led_counter = 0
    last_button_state = False

    while True:
        # led counter
        set_fpga_leds(device_handle, led_counter)
        led_counter = (led_counter + 1) % 256

        # reset led counter when the USER button is pressed
        button_state = get_user_button(device_handle)
        if button_state:
            led_counter = 0

        # print button state when it changes
        if button_state != last_button_state:
            print(f"USER button is: {'ON' if button_state else 'OFF' }")
            last_button_state = button_state

        # slow the loop down so we can see the counter change
        time.sleep(0.1)


# - wrappers for bulk requests ------------------------------------------------

def bulk_out_transfer(device_handle, data):
    response = device_handle.bulkWrite(
        endpoint = 0x01,
        data     = data,
        timeout  = 1000,
    )
    return response

def bulk_in_transfer(device_handle, length):
    response = device_handle.bulkRead(
        endpoint = 0x02,
        length   = length,
        timeout  = 1000,
    )
    return response


# - test bulk endpoints -------------------------------------------------------

def test_bulk_endpoints(device_handle):
    # bulk_out - write a list of random numbers to memory
    data = list([random.randint(0, 255) for _ in range(MAX_PACKET_SIZE)])
    response = bulk_out_transfer(device_handle, data)
    print(f"OUT endpoint transmitted {response} bytes: {data[0:4]} ... {data[-4:]}")

    # bulk_in - retrieve the contents of our memory
    response = list(bulk_in_transfer(device_handle, MAX_PACKET_SIZE))
    print(f"IN  endpoint received {len(response)} bytes:    {response[0:4]} ... {response[-4:]}")

    # check that the stored data matches the sent data
    assert(data == list(response))


# - main ----------------------------------------------------------------------

if __name__ == "__main__":
    with usb1.USBContext() as context:
        # list available devices
        list_available_usb_devices(context)

        # get a device handle to our simple usb device
        device_handle = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)
        if device_handle is None:
            raise Exception("Device not found.")

        # claim the device's interface
        device_handle.claimInterface(0)

        # pass the device handle to our bulk endpoint test
        test_bulk_endpoints(device_handle)

        # pass the device handle to our control endpoint test
        test_control_endpoints(device_handle)