#+build linux
package io_uring
import "core:math"
import "core:os"
import "core:sync"
import "core:sys/linux"
import "core:sys/unix"
DEFAULT_THREAD_IDLE_MS :: 1000
DEFAULT_ENTRIES :: 32
MAX_ENTRIES :: 4096
IO_Uring_Error :: enum {
None,
Entries_Zero,
Entries_Not_Power_Of_Two,
Entries_Too_Large,
Params_Outside_Accessible_Address_Space,
Arguments_Invalid,
Process_Fd_Quota_Exceeded,
System_Fd_Quota_Exceeded,
System_Resources,
Permission_Denied,
System_Outdated,
Submission_Queue_Full,
File_Descriptor_Invalid,
Completion_Queue_Overcommitted,
Submission_Queue_Entry_Invalid,
Buffer_Invalid,
Ring_Shutting_Down,
Opcode_Not_Supported,
Signal_Interrupt,
Unexpected,
}
IO_Uring :: struct {
fd: os.Handle,
sq: Submission_Queue,
cq: Completion_Queue,
flags: u32,
features: u32,
}
// Sets up an IO_Uring with default parameters; `entries` must be a power of two between 1 and 4096.
io_uring_make :: proc(
params: ^io_uring_params,
entries: u32 = DEFAULT_ENTRIES,
flags: u32 = 0,
) -> (
ring: IO_Uring,
err: IO_Uring_Error,
) {
params.flags = flags
params.sq_thread_idle = DEFAULT_THREAD_IDLE_MS
err = io_uring_init(&ring, entries, params)
return
}
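// A minimal usage sketch (not part of the API): create a ring with the defaults and tear it
// down again. This assumes a zero-initialized `io_uring_params` is an acceptable starting
// point, which `io_uring_make` then fills in.
//
//    params: io_uring_params
//    ring, err := io_uring_make(&params)
//    if err != .None { /* setup failed */ }
//    defer io_uring_destroy(&ring)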
// Initializes and sets up an io_uring, with more control than `io_uring_make`.
io_uring_init :: proc(ring: ^IO_Uring, entries: u32, params: ^io_uring_params) -> (err: IO_Uring_Error) {
check_entries(entries) or_return
res := sys_io_uring_setup(entries, params)
if res < 0 {
#partial switch os.Platform_Error(-res) {
case .EFAULT:
return .Params_Outside_Accessible_Address_Space
// The resv array contains non-zero data, p.flags contains an unsupported flag,
// entries out of bounds, IORING_SETUP_SQ_AFF was specified without IORING_SETUP_SQPOLL,
// or IORING_SETUP_CQSIZE was specified but linux.io_uring_params.cq_entries was invalid:
case .EINVAL:
return .Arguments_Invalid
case .EMFILE:
return .Process_Fd_Quota_Exceeded
case .ENFILE:
return .System_Fd_Quota_Exceeded
case .ENOMEM:
return .System_Resources
// IORING_SETUP_SQPOLL was specified but effective user ID lacks sufficient privileges,
// or a container seccomp policy prohibits io_uring syscalls:
case .EPERM:
return .Permission_Denied
case .ENOSYS:
return .System_Outdated
case:
return .Unexpected
}
}
fd := os.Handle(res)
// IORING_FEAT_SINGLE_MMAP is required; the CQE32 and SQE128 layouts are not supported.
assert((params.features & IORING_FEAT_SINGLE_MMAP) != 0)
assert((params.flags & IORING_SETUP_CQE32) == 0)
assert((params.flags & IORING_SETUP_SQE128) == 0)
sq, ok := submission_queue_make(fd, params)
if !ok {
os.close(fd)
return .System_Resources
}
ring.fd = fd
ring.sq = sq
ring.cq = completion_queue_make(fd, params, &sq)
ring.flags = params.flags
ring.features = params.features
return
}
// Checks if the entries conform to the kernel rules.
@(private)
check_entries :: proc(entries: u32) -> (err: IO_Uring_Error) {
switch {
case entries > MAX_ENTRIES:
err = .Entries_Too_Large
case entries == 0:
err = .Entries_Zero
case !math.is_power_of_two(int(entries)):
err = .Entries_Not_Power_Of_Two
case:
err = .None
}
return
}
io_uring_destroy :: proc(ring: ^IO_Uring) {
assert(ring.fd >= 0)
submission_queue_destroy(&ring.sq)
os.close(ring.fd)
ring.fd = -1
}
// Returns a pointer to a vacant submission queue entry, or an error if the submission queue is full.
get_sqe :: proc(ring: ^IO_Uring) -> (sqe: ^io_uring_sqe, err: IO_Uring_Error) {
sq := &ring.sq
head: u32 = sync.atomic_load_explicit(sq.head, .Acquire)
next := sq.sqe_tail + 1
if int(next - head) > len(sq.sqes) {
err = .Submission_Queue_Full
return
}
sqe = &sq.sqes[sq.sqe_tail & sq.mask]
sqe^ = {}
sq.sqe_tail = next
return
}
// Submits the submission queue entries acquired via get_sqe().
// Returns the number of entries submitted.
// Optionally wait for a number of events by setting wait_nr.
submit :: proc(ring: ^IO_Uring, wait_nr: u32 = 0) -> (n_submitted: u32, err: IO_Uring_Error) {
n_submitted = flush_sq(ring)
flags: u32 = 0
if sq_ring_needs_enter(ring, &flags) || wait_nr > 0 {
if wait_nr > 0 || ring.flags & IORING_SETUP_IOPOLL != 0 {
flags |= IORING_ENTER_GETEVENTS
}
n_submitted, err = enter(ring, n_submitted, wait_nr, flags)
}
return
}
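// A minimal usage sketch (not part of the API): reserve an SQE with `get_sqe`, fill it in,
// and hand it to the kernel with `submit`.
//
//    sqe, gerr := get_sqe(&ring)
//    if gerr != .None { /* SQ full: submit pending entries, then retry */ }
//    sqe.opcode = .NOP
//    sqe.user_data = 0x42
//    n_submitted, serr := submit(&ring)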
// Tells the kernel that submission queue entries were submitted and/or we want to wait for their completion queue entries.
// Returns the number of submission queue entries that were submitted.
enter :: proc(
ring: ^IO_Uring,
n_to_submit: u32,
min_complete: u32,
flags: u32,
) -> (
n_submitted: u32,
err: IO_Uring_Error,
) {
assert(ring.fd >= 0)
ns := sys_io_uring_enter(u32(ring.fd), n_to_submit, min_complete, flags, nil)
if ns < 0 {
#partial switch os.Platform_Error(-ns) {
case .NONE:
err = .None
case .EAGAIN:
// The kernel was unable to allocate memory or ran out of resources for the request. (try again)
err = .System_Resources
case .EBADF:
// The SQE `fd` is invalid, or `IOSQE_FIXED_FILE` was set but no files were registered
err = .File_Descriptor_Invalid
// case os.EBUSY: // TODO: why is this not in os_linux
// // Attempted to overcommit the number of requests it can have pending. Should wait for some completions and try again.
// err = .Completion_Queue_Overcommitted
case .EINVAL:
// The SQE is invalid, or valid but the ring was setup with `IORING_SETUP_IOPOLL`
err = .Submission_Queue_Entry_Invalid
case .EFAULT:
// The buffer is outside the process' accessible address space, or `IORING_OP_READ_FIXED`
// or `IORING_OP_WRITE_FIXED` was specified but no buffers were registered, or the range
// described by `addr` and `len` is not within the buffer registered at `buf_index`
err = .Buffer_Invalid
case .ENXIO:
err = .Ring_Shutting_Down
case .EOPNOTSUPP:
// The kernel believes the `fd` doesn't refer to an `io_uring`, or the opcode isn't supported by this kernel (more likely)
err = .Opcode_Not_Supported
case .EINTR:
// The op was interrupted by the delivery of a signal before it could complete. This can happen while waiting for events with `IORING_ENTER_GETEVENTS`
err = .Signal_Interrupt
case:
err = .Unexpected
}
return
}
n_submitted = u32(ns)
return
}
// Syncs internal state with the kernel's ring state on the submission queue side.
// Returns the number of pending entries in the submission queue,
// which the caller uses to decide whether an enter call is needed.
flush_sq :: proc(ring: ^IO_Uring) -> (n_pending: u32) {
sq := &ring.sq
to_submit := sq.sqe_tail - sq.sqe_head
if to_submit != 0 {
tail := sq.tail^
i: u32 = 0
for ; i < to_submit; i += 1 {
sq.array[tail & sq.mask] = sq.sqe_head & sq.mask
tail += 1
sq.sqe_head += 1
}
sync.atomic_store_explicit(sq.tail, tail, .Release)
}
n_pending = sq_ready(ring)
return
}
// Returns true if we are not using an SQ thread (thus nobody submits but us),
// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
// For the latter case, we set the SQ thread wakeup flag.
// Matches the implementation of sq_ring_needs_enter() in liburing.
sq_ring_needs_enter :: proc(ring: ^IO_Uring, flags: ^u32) -> bool {
assert(flags^ == 0)
if ring.flags & IORING_SETUP_SQPOLL == 0 do return true
if sync.atomic_load_explicit(ring.sq.flags, .Relaxed) & IORING_SQ_NEED_WAKEUP != 0 {
flags^ |= IORING_ENTER_SQ_WAKEUP
return true
}
return false
}
// Returns the number of submission queue entries that have not yet been consumed by the kernel.
sq_ready :: proc(ring: ^IO_Uring) -> u32 {
// Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
// see https://github.com/axboe/liburing/issues/92.
return ring.sq.sqe_tail - sync.atomic_load_explicit(ring.sq.head, .Acquire)
}
// Returns the number of completion queue entries that are ready to be consumed.
cq_ready :: proc(ring: ^IO_Uring) -> (n_ready: u32) {
return sync.atomic_load_explicit(ring.cq.tail, .Acquire) - ring.cq.head^
}
// Copies as many CQEs as are ready, and that can fit into the destination `cqes` slice.
// If none are available, enters into the kernel to wait for at most `wait_nr` CQEs.
// Returns the number of CQEs copied, advancing the CQ ring.
// Provides all the wait/peek methods found in liburing, but with batching and a single method.
copy_cqes :: proc(ring: ^IO_Uring, cqes: []io_uring_cqe, wait_nr: u32) -> (n_copied: u32, err: IO_Uring_Error) {
n_copied = copy_cqes_ready(ring, cqes)
if n_copied > 0 do return
if wait_nr > 0 || cq_ring_needs_flush(ring) {
_ = enter(ring, 0, wait_nr, IORING_ENTER_GETEVENTS) or_return
n_copied = copy_cqes_ready(ring, cqes)
}
return
}
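// A minimal usage sketch (not part of the API): drain completions after a submit, waiting for
// at least one. It assumes the usual `user_data` and `res` fields on `io_uring_cqe`.
//
//    cqes: [16]io_uring_cqe
//    n, err := copy_cqes(&ring, cqes[:], 1)
//    for cqe in cqes[:n] {
//        // cqe.user_data identifies the originating SQE, cqe.res is the result or -errno
//    }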
copy_cqes_ready :: proc(ring: ^IO_Uring, cqes: []io_uring_cqe) -> (n_copied: u32) {
n_ready := cq_ready(ring)
n_copied = min(u32(len(cqes)), n_ready)
head := ring.cq.head^
tail := head + n_copied
i := 0
for head != tail {
cqes[i] = ring.cq.cqes[head & ring.cq.mask]
head += 1
i += 1
}
cq_advance(ring, n_copied)
return
}
cq_ring_needs_flush :: proc(ring: ^IO_Uring) -> bool {
return sync.atomic_load_explicit(ring.sq.flags, .Relaxed) & IORING_SQ_CQ_OVERFLOW != 0
}
// For advanced use cases only, where the application implements its own completion queue handling.
// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
// Must be called exactly once after a zero-copy CQE has been processed by your application.
// Not idempotent: calling it more than once will cause other CQEs to be lost.
// Matches the implementation of cqe_seen() in liburing.
cqe_seen :: proc(ring: ^IO_Uring) {
cq_advance(ring, 1)
}
// For advanced use cases only, where the application implements its own completion queue handling.
// Matches the implementation of cq_advance() in liburing.
cq_advance :: proc(ring: ^IO_Uring, count: u32) {
if count == 0 do return
sync.atomic_store_explicit(ring.cq.head, ring.cq.head^ + count, .Release)
}
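// A zero-copy sketch for the advanced path described above (not part of the API): peek the
// next ready CQE in place and release it with `cqe_seen`.
//
//    if cq_ready(&ring) > 0 {
//        cqe := &ring.cq.cqes[ring.cq.head^ & ring.cq.mask]
//        // process `cqe` without copying it out of the ring
//        cqe_seen(&ring)
//    }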
// Queues (but does not submit) an SQE to perform an `fsync(2)`.
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
fsync :: proc(
ring: ^IO_Uring,
user_data: u64,
fd: os.Handle,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = .FSYNC
sqe.rw_flags = i32(flags)
sqe.fd = i32(fd)
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to perform a no-op.
// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
// A no-op is more useful than it may appear at first glance.
// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
// know when the ring is idle before acting on a kill signal.
nop :: proc(ring: ^IO_Uring, user_data: u64) -> (sqe: ^io_uring_sqe, err: IO_Uring_Error) {
sqe = get_sqe(ring) or_return
sqe.opcode = .NOP
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to perform a `read(2)`.
read :: proc(
ring: ^IO_Uring,
user_data: u64,
fd: os.Handle,
buf: []u8,
offset: u64,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = .READ
sqe.fd = i32(fd)
sqe.addr = cast(u64)uintptr(raw_data(buf))
sqe.len = u32(len(buf))
sqe.off = offset
sqe.user_data = user_data
return
}
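// A minimal usage sketch (not part of the API): queue a read of the first 4 KiB of `file_fd`
// (a hypothetical, already-open handle) at offset 0. After submit()/copy_cqes(), the matching
// CQE's result is the byte count or a negative errno.
//
//    buf: [4096]u8
//    _, err := read(&ring, 1, file_fd, buf[:], 0)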
// Queues (but does not submit) an SQE to perform a `write(2)`.
write :: proc(
ring: ^IO_Uring,
user_data: u64,
fd: os.Handle,
buf: []u8,
offset: u64,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = .WRITE
sqe.fd = i32(fd)
sqe.addr = cast(u64)uintptr(raw_data(buf))
sqe.len = u32(len(buf))
sqe.off = offset
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
// `addr` and `addr_len` are optional.
accept :: proc(
ring: ^IO_Uring,
user_data: u64,
sockfd: os.Socket,
addr: ^os.SOCKADDR = nil,
addr_len: ^os.socklen_t = nil,
flags: u32 = 0,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.ACCEPT
sqe.fd = i32(sockfd)
sqe.addr = cast(u64)uintptr(addr)
sqe.off = cast(u64)uintptr(addr_len)
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
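// A minimal usage sketch (not part of the API): queue an accept on a hypothetical listening
// socket `listen_fd`; the completion result is the new client fd or a negative errno.
//
//    client_addr: os.SOCKADDR
//    client_len := os.socklen_t(size_of(client_addr))
//    _, err := accept(&ring, 7, listen_fd, &client_addr, &client_len)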
// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
connect :: proc(
ring: ^IO_Uring,
user_data: u64,
sockfd: os.Socket,
addr: ^os.SOCKADDR,
addr_len: os.socklen_t,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.CONNECT
sqe.fd = i32(sockfd)
sqe.addr = cast(u64)uintptr(addr)
sqe.off = cast(u64)addr_len
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to perform a `recv(2)`.
recv :: proc(
ring: ^IO_Uring,
user_data: u64,
sockfd: os.Socket,
buf: []byte,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.RECV
sqe.fd = i32(sockfd)
sqe.addr = cast(u64)uintptr(raw_data(buf))
sqe.len = u32(len(buf))
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to perform a `send(2)`.
send :: proc(
ring: ^IO_Uring,
user_data: u64,
sockfd: os.Socket,
buf: []byte,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.SEND
sqe.fd = i32(sockfd)
sqe.addr = cast(u64)uintptr(raw_data(buf))
sqe.len = u32(len(buf))
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
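// A minimal usage sketch (not part of the API): echo on a hypothetical connected socket
// `conn` by queueing a recv, then sending back the `n` bytes reported by its completion.
//
//    rx_buf: [1024]u8
//    _, err := recv(&ring, 10, conn, rx_buf[:], 0)
//    // ... once the recv CQE arrives with result n > 0 ...
//    _, err = send(&ring, 11, conn, rx_buf[:n], 0)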
// Queues (but does not submit) an SQE to perform an `openat(2)`.
openat :: proc(
ring: ^IO_Uring,
user_data: u64,
fd: os.Handle,
path: cstring,
mode: u32,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.OPENAT
sqe.fd = i32(fd)
sqe.addr = cast(u64)transmute(uintptr)path
sqe.len = mode
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to perform a `close(2)`.
close :: proc(ring: ^IO_Uring, user_data: u64, fd: os.Handle) -> (sqe: ^io_uring_sqe, err: IO_Uring_Error) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.CLOSE
sqe.fd = i32(fd)
sqe.user_data = user_data
return
}
// Queues (but does not submit) an SQE to register a timeout operation.
// Returns a pointer to the SQE.
//
// The timeout will complete when either the timeout expires, or after the specified number of
// events complete (if `count` is greater than `0`).
//
// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
//
// The completion event result will be `-ETIME` if the timeout completed through expiration,
// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
// timeout was removed before it expired.
//
// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
timeout :: proc(
ring: ^IO_Uring,
user_data: u64,
ts: ^linux.Time_Spec,
count: u32,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.TIMEOUT
sqe.fd = -1
sqe.addr = cast(u64)uintptr(ts)
sqe.len = 1
sqe.off = u64(count)
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
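// A minimal usage sketch (not part of the API): a relative one-second timeout that also
// completes early once one other completion arrives (`count = 1`). The `Time_Spec` field
// names are assumed from core:sys/linux.
//
//    ts := linux.Time_Spec{time_sec = 1, time_nsec = 0}
//    _, err := timeout(&ring, 0xbeef, &ts, 1, 0)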
// Queues (but does not submit) an SQE to remove an existing timeout operation.
// Returns a pointer to the SQE.
//
// The timeout is identified by its `user_data`.
//
// The completion event result will be `0` if the timeout was found and cancelled successfully,
// `-EBUSY` if the timeout was found but expiration was already in progress, or
// `-ENOENT` if the timeout was not found.
timeout_remove :: proc(
ring: ^IO_Uring,
user_data: u64,
timeout_user_data: u64,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.TIMEOUT_REMOVE
sqe.fd = -1
sqe.addr = timeout_user_data
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
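// A minimal usage sketch (not part of the API): cancel the timeout queued above by the
// `user_data` it was registered with (0xbeef).
//
//    _, err := timeout_remove(&ring, 0xdead, 0xbeef, 0)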
// Queues (but does not submit) an SQE to add a link timeout operation.
// Returns a pointer to the SQE.
//
// You need to set `linux.IOSQE_IO_LINK` in the `flags` of the target operation's SQE
// and then call this procedure immediately after queueing the target operation.
// See https://lwn.net/Articles/803932/ for detail.
//
// If the dependent request finishes before the linked timeout, the timeout
// is canceled. If the timeout finishes before the dependent request, the
// dependent request will be canceled.
//
// The completion event result of the link_timeout will be
// `-ETIME` if the timeout finishes before the dependent request
// (in this case, the completion event result of the dependent request will
// be `-ECANCELED`), or
// `-EALREADY` if the dependent request finishes before the linked timeout.
link_timeout :: proc(
ring: ^IO_Uring,
user_data: u64,
ts: ^os.Unix_File_Time,
flags: u32,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.LINK_TIMEOUT
sqe.fd = -1
sqe.addr = cast(u64)uintptr(ts)
sqe.len = 1
sqe.rw_flags = i32(flags)
sqe.user_data = user_data
return
}
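// A minimal usage sketch (not part of the API): cap a read with a linked timeout. `fd` and
// `buf` are hypothetical, and the `Unix_File_Time` field order (seconds, then nanoseconds)
// is an assumption.
//
//    rsqe, _ := read(&ring, 1, fd, buf[:], 0)
//    // set the IO link flag on rsqe (see the note above), then:
//    ts := os.Unix_File_Time{1, 0}   // assumed layout: 1 second, 0 nanoseconds
//    _, _ = link_timeout(&ring, 2, &ts, 0)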
poll_add :: proc(
ring: ^IO_Uring,
user_data: u64,
fd: os.Handle,
events: linux.Fd_Poll_Events,
flags: IORing_Poll_Flags,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.POLL_ADD
sqe.fd = i32(fd)
sqe.poll_events = transmute(u16)events
sqe.len = transmute(u32)flags
sqe.user_data = user_data
return
}
poll_remove :: proc(
ring: ^IO_Uring,
user_data: u64,
fd: os.Handle,
events: linux.Fd_Poll_Events,
) -> (
sqe: ^io_uring_sqe,
err: IO_Uring_Error,
) {
sqe = get_sqe(ring) or_return
sqe.opcode = IORING_OP.POLL_REMOVE
sqe.fd = i32(fd)
sqe.poll_events = transmute(u16)events
sqe.user_data = user_data
return
}
Submission_Queue :: struct {
head: ^u32,
tail: ^u32,
mask: u32,
flags: ^u32,
dropped: ^u32,
array: []u32,
sqes: []io_uring_sqe,
mmap: []u8,
mmap_sqes: []u8,
// We use `sqe_head` and `sqe_tail` in the same way as liburing:
// We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
// We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
// This allows us to amortize the cost of the atomic store to `tail` across multiple SQEs.
sqe_head: u32,
sqe_tail: u32,
}
submission_queue_make :: proc(fd: os.Handle, params: ^io_uring_params) -> (sq: Submission_Queue, ok: bool) {
assert(fd >= 0)
// The single-mmap feature is required.
assert((params.features & IORING_FEAT_SINGLE_MMAP) != 0)
sq_size := params.sq_off.array + params.sq_entries * size_of(u32)
cq_size := params.cq_off.cqes + params.cq_entries * size_of(io_uring_cqe)
size := max(sq_size, cq_size)
mmap_result := unix.sys_mmap(
nil,
uint(size),
unix.PROT_READ | unix.PROT_WRITE,
unix.MAP_SHARED,
/* | unix.MAP_POPULATE */
int(fd),
IORING_OFF_SQ_RING,
)
if mmap_result < 0 do return
defer if !ok do unix.sys_munmap(rawptr(uintptr(mmap_result)), uint(size))
mmap := transmute([^]u8)uintptr(mmap_result)
size_sqes := params.sq_entries * size_of(io_uring_sqe)
mmap_sqes_result := unix.sys_mmap(
nil,
uint(size_sqes),
unix.PROT_READ | unix.PROT_WRITE,
unix.MAP_SHARED,
/* | unix.MAP_POPULATE */
int(fd),
IORING_OFF_SQES,
)
if mmap_sqes_result < 0 do return
array := cast([^]u32)&mmap[params.sq_off.array]
sqes := cast([^]io_uring_sqe)uintptr(mmap_sqes_result)
mmap_sqes := cast([^]u8)uintptr(mmap_sqes_result)
sq.head = cast(^u32)&mmap[params.sq_off.head]
sq.tail = cast(^u32)&mmap[params.sq_off.tail]
sq.mask = (cast(^u32)&mmap[params.sq_off.ring_mask])^
sq.flags = cast(^u32)&mmap[params.sq_off.flags]
sq.dropped = cast(^u32)&mmap[params.sq_off.dropped]
sq.array = array[:params.sq_entries]
sq.sqes = sqes[:params.sq_entries]
sq.mmap = mmap[:size]
sq.mmap_sqes = mmap_sqes[:size_sqes]
ok = true
return
}
submission_queue_destroy :: proc(sq: ^Submission_Queue) {
unix.sys_munmap(raw_data(sq.mmap), uint(len(sq.mmap)))
unix.sys_munmap(raw_data(sq.mmap_sqes), uint(len(sq.mmap_sqes)))
}
Completion_Queue :: struct {
head: ^u32,
tail: ^u32,
mask: u32,
overflow: ^u32,
cqes: []io_uring_cqe,
}
completion_queue_make :: proc(fd: os.Handle, params: ^io_uring_params, sq: ^Submission_Queue) -> Completion_Queue {
assert(fd >= 0)
// The single-mmap feature is required.
assert((params.features & IORING_FEAT_SINGLE_MMAP) != 0)
mmap := sq.mmap
cqes := cast([^]io_uring_cqe)&mmap[params.cq_off.cqes]
return(
{
head = cast(^u32)&mmap[params.cq_off.head],
tail = cast(^u32)&mmap[params.cq_off.tail],
mask = (cast(^u32)&mmap[params.cq_off.ring_mask])^,
overflow = cast(^u32)&mmap[params.cq_off.overflow],
cqes = cqes[:params.cq_entries],
} \
)
}