#+build linux
package io_uring

import "core:math"
import "core:os"
import "core:sync"
import "core:sys/linux"
import "core:sys/unix"

DEFAULT_THREAD_IDLE_MS :: 1000
DEFAULT_ENTRIES :: 32
MAX_ENTRIES :: 4096

IO_Uring_Error :: enum {
	None,
	Entries_Zero,
	Entries_Not_Power_Of_Two,
	Entries_Too_Large,
	Params_Outside_Accessible_Address_Space,
	Arguments_Invalid,
	Process_Fd_Quota_Exceeded,
	System_Fd_Quota_Exceeded,
	System_Resources,
	Permission_Denied,
	System_Outdated,
	Submission_Queue_Full,
	File_Descriptor_Invalid,
	Completion_Queue_Overcommitted,
	Submission_Queue_Entry_Invalid,
	Buffer_Invalid,
	Ring_Shutting_Down,
	Opcode_Not_Supported,
	Signal_Interrupt,
	Unexpected,
}

IO_Uring :: struct {
	fd:       os.Handle,
	sq:       Submission_Queue,
	cq:       Completion_Queue,
	flags:    u32,
	features: u32,
}

// Sets up an IO_Uring with default parameters; `entries` must be a power of 2 between 1 and 4096.
io_uring_make :: proc(
	params: ^io_uring_params,
	entries: u32 = DEFAULT_ENTRIES,
	flags: u32 = 0,
) -> (
	ring: IO_Uring,
	err: IO_Uring_Error,
) {
	params.flags = flags
	params.sq_thread_idle = DEFAULT_THREAD_IDLE_MS
	err = io_uring_init(&ring, entries, params)
	return
}

// Initializes and sets up an io_uring with more control than io_uring_make.
io_uring_init :: proc(ring: ^IO_Uring, entries: u32, params: ^io_uring_params) -> (err: IO_Uring_Error) {
	check_entries(entries) or_return

	res := sys_io_uring_setup(entries, params)
	if res < 0 {
		#partial switch os.Platform_Error(-res) {
		case .EFAULT:
			return .Params_Outside_Accessible_Address_Space
		// The resv array contains non-zero data, p.flags contains an unsupported flag,
		// entries are out of bounds, IORING_SETUP_SQ_AFF was specified without IORING_SETUP_SQPOLL,
		// or IORING_SETUP_CQSIZE was specified but linux.io_uring_params.cq_entries was invalid:
		case .EINVAL:
			return .Arguments_Invalid
		case .EMFILE:
			return .Process_Fd_Quota_Exceeded
		case .ENFILE:
			return .System_Fd_Quota_Exceeded
		case .ENOMEM:
			return .System_Resources
		// IORING_SETUP_SQPOLL was specified but the effective user ID lacks sufficient privileges,
		// or a container seccomp policy prohibits io_uring syscalls:
		case .EPERM:
			return .Permission_Denied
		case .ENOSYS:
			return .System_Outdated
		case:
			return .Unexpected
		}
	}

	fd := os.Handle(res)

	// The single mmap feature must be supported; 128-byte SQEs and 32-byte CQEs are not supported.
	assert((params.features & IORING_FEAT_SINGLE_MMAP) != 0)
	assert((params.flags & IORING_SETUP_CQE32) == 0)
	assert((params.flags & IORING_SETUP_SQE128) == 0)

	sq, ok := submission_queue_make(fd, params)
	if !ok do return .System_Resources

	ring.fd = fd
	ring.sq = sq
	ring.cq = completion_queue_make(fd, params, &sq)
	ring.flags = params.flags
	ring.features = params.features
	return
}

// Checks that `entries` conforms to the kernel's rules.
@(private)
check_entries :: proc(entries: u32) -> (err: IO_Uring_Error) {
	switch {
	case entries >= MAX_ENTRIES:
		err = .Entries_Too_Large
	case entries == 0:
		err = .Entries_Zero
	case !math.is_power_of_two(int(entries)):
		err = .Entries_Not_Power_Of_Two
	case:
		err = .None
	}
	return
}

io_uring_destroy :: proc(ring: ^IO_Uring) {
	assert(ring.fd >= 0)
	submission_queue_destroy(&ring.sq)
	os.close(ring.fd)
	ring.fd = -1
}
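/*
Example (a minimal sketch, not part of the package API): typical setup and teardown,
assuming a kernel new enough to support io_uring. Error handling is abbreviated.

	params: io_uring_params
	ring, err := io_uring_make(&params)
	if err != .None {
		return // could not set up the ring (e.g. .System_Outdated on old kernels)
	}
	defer io_uring_destroy(&ring)
*/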
// Returns a pointer to a vacant submission queue entry, or an error if the submission queue is full.
get_sqe :: proc(ring: ^IO_Uring) -> (sqe: ^io_uring_sqe, err: IO_Uring_Error) {
	sq := &ring.sq
	head: u32 = sync.atomic_load_explicit(sq.head, .Acquire)
	next := sq.sqe_tail + 1
	if int(next - head) > len(sq.sqes) {
		err = .Submission_Queue_Full
		return
	}
	sqe = &sq.sqes[sq.sqe_tail & sq.mask]
	sqe^ = {}
	sq.sqe_tail = next
	return
}

// Submits the submission queue entries acquired via get_sqe().
// Returns the number of entries submitted.
// Optionally waits for a number of events by setting wait_nr.
submit :: proc(ring: ^IO_Uring, wait_nr: u32 = 0) -> (n_submitted: u32, err: IO_Uring_Error) {
	n_submitted = flush_sq(ring)
	flags: u32 = 0
	if sq_ring_needs_enter(ring, &flags) || wait_nr > 0 {
		if wait_nr > 0 || ring.flags & IORING_SETUP_IOPOLL != 0 {
			flags |= IORING_ENTER_GETEVENTS
		}
		n_submitted, err = enter(ring, n_submitted, wait_nr, flags)
	}
	return
}

// Tells the kernel that submission queue entries were submitted and/or that we want to wait for their completion queue entries.
// Returns the number of submission queue entries that were submitted.
enter :: proc(
	ring: ^IO_Uring,
	n_to_submit: u32,
	min_complete: u32,
	flags: u32,
) -> (
	n_submitted: u32,
	err: IO_Uring_Error,
) {
	assert(ring.fd >= 0)
	ns := sys_io_uring_enter(u32(ring.fd), n_to_submit, min_complete, flags, nil)
	if ns < 0 {
		#partial switch os.Platform_Error(-ns) {
		case .NONE:
			err = .None
		case .EAGAIN:
			// The kernel was unable to allocate memory or ran out of resources for the request. (try again)
			err = .System_Resources
		case .EBADF:
			// The SQE `fd` is invalid, or `IOSQE_FIXED_FILE` was set but no files were registered.
			err = .File_Descriptor_Invalid
		// case os.EBUSY: // TODO: why is this not in os_linux
		// 	// Attempted to overcommit the number of requests it can have pending. Should wait for some completions and try again.
		// 	err = .Completion_Queue_Overcommitted
		case .EINVAL:
			// The SQE is invalid, or valid but the ring was setup with `IORING_SETUP_IOPOLL`.
			err = .Submission_Queue_Entry_Invalid
		case .EFAULT:
			// The buffer is outside the process' accessible address space, or `IORING_OP_READ_FIXED`
			// or `IORING_OP_WRITE_FIXED` was specified but no buffers were registered, or the range
			// described by `addr` and `len` is not within the buffer registered at `buf_index`.
			err = .Buffer_Invalid
		case .ENXIO:
			err = .Ring_Shutting_Down
		case .EOPNOTSUPP:
			// The kernel believes the `fd` doesn't refer to an `io_uring`, or the opcode isn't supported by this kernel (more likely).
			err = .Opcode_Not_Supported
		case .EINTR:
			// The op was interrupted by the delivery of a signal before it could complete.
			// This can happen while waiting for events with `IORING_ENTER_GETEVENTS`.
			err = .Signal_Interrupt
		case:
			err = .Unexpected
		}
		return
	}
	n_submitted = u32(ns)
	return
}

// Syncs internal state with the kernel ring state on the submission queue side.
// Returns the number of pending events in the submission queue.
// The rationale is to determine whether an enter call is needed.
flush_sq :: proc(ring: ^IO_Uring) -> (n_pending: u32) {
	sq := &ring.sq
	to_submit := sq.sqe_tail - sq.sqe_head
	if to_submit != 0 {
		tail := sq.tail^
		i: u32 = 0
		for ; i < to_submit; i += 1 {
			sq.array[tail & sq.mask] = sq.sqe_head & sq.mask
			tail += 1
			sq.sqe_head += 1
		}
		sync.atomic_store_explicit(sq.tail, tail, .Release)
	}
	n_pending = sq_ready(ring)
	return
}
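/*
Example (sketch): acquire an SQE, fill it via one of the helpers below, then submit.
Assumes `ring` was set up with io_uring_make; the user_data value 0x1 is arbitrary.

	_, qerr := nop(&ring, 0x1)
	if qerr == .Submission_Queue_Full {
		// submit what is pending first, then retry
	}
	n_submitted, serr := submit(&ring, 1) // submit and wait for one completion
*/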
// Returns true if we are not using an SQ thread (thus nobody submits but us),
// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
// For the latter case, we set the SQ thread wakeup flag.
// Matches the implementation of sq_ring_needs_enter() in liburing.
sq_ring_needs_enter :: proc(ring: ^IO_Uring, flags: ^u32) -> bool {
	assert(flags^ == 0)
	if ring.flags & IORING_SETUP_SQPOLL == 0 do return true
	if sync.atomic_load_explicit(ring.sq.flags, .Relaxed) & IORING_SQ_NEED_WAKEUP != 0 {
		flags^ |= IORING_ENTER_SQ_WAKEUP
		return true
	}
	return false
}

// Returns the number of submission queue entries in the submission queue.
sq_ready :: proc(ring: ^IO_Uring) -> u32 {
	// Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
	// see https://github.com/axboe/liburing/issues/92.
	return ring.sq.sqe_tail - sync.atomic_load_explicit(ring.sq.head, .Acquire)
}

// Returns the number of completion queue entries in the completion queue (yet to be consumed).
cq_ready :: proc(ring: ^IO_Uring) -> (n_ready: u32) {
	return sync.atomic_load_explicit(ring.cq.tail, .Acquire) - ring.cq.head^
}

// Copies as many CQEs as are ready and can fit into the destination `cqes` slice.
// If none are available, enters the kernel to wait for at most `wait_nr` CQEs.
// Returns the number of CQEs copied, advancing the CQ ring.
// Provides all the wait/peek methods found in liburing, but with batching and a single method.
copy_cqes :: proc(ring: ^IO_Uring, cqes: []io_uring_cqe, wait_nr: u32) -> (n_copied: u32, err: IO_Uring_Error) {
	n_copied = copy_cqes_ready(ring, cqes)
	if n_copied > 0 do return
	if wait_nr > 0 || cq_ring_needs_flush(ring) {
		_ = enter(ring, 0, wait_nr, IORING_ENTER_GETEVENTS) or_return
		n_copied = copy_cqes_ready(ring, cqes)
	}
	return
}

copy_cqes_ready :: proc(ring: ^IO_Uring, cqes: []io_uring_cqe) -> (n_copied: u32) {
	n_ready := cq_ready(ring)
	n_copied = min(u32(len(cqes)), n_ready)
	head := ring.cq.head^
	tail := head + n_copied
	i := 0
	for head != tail {
		cqes[i] = ring.cq.cqes[head & ring.cq.mask]
		head += 1
		i += 1
	}
	cq_advance(ring, n_copied)
	return
}

cq_ring_needs_flush :: proc(ring: ^IO_Uring) -> bool {
	return sync.atomic_load_explicit(ring.sq.flags, .Relaxed) & IORING_SQ_CQ_OVERFLOW != 0
}

// Only for advanced use cases that implement custom completion queue methods.
// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
// Must be called exactly once after a zero-copy CQE has been processed by your application.
// Not idempotent; calling it more than once will result in other CQEs being lost.
// Matches the implementation of cqe_seen() in liburing.
cqe_seen :: proc(ring: ^IO_Uring) {
	cq_advance(ring, 1)
}

// Only for advanced use cases that implement custom completion queue methods.
// Matches the implementation of cq_advance() in liburing.
cq_advance :: proc(ring: ^IO_Uring, count: u32) {
	if count == 0 do return
	sync.atomic_store_explicit(ring.cq.head, ring.cq.head^ + count, .Release)
}

// Queues (but does not submit) an SQE to perform an `fsync(2)`.
// Returns a pointer to the SQE so that you can further modify it for advanced use cases.
fsync :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	fd: os.Handle,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = .FSYNC
	sqe.rw_flags = i32(flags)
	sqe.fd = i32(fd)
	sqe.user_data = user_data
	return
}
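/*
Example (sketch): drain completions in batches with copy_cqes.
Passing wait_nr = 0 makes this a non-blocking peek.

	cqes: [16]io_uring_cqe
	n, cerr := copy_cqes(&ring, cqes[:], 0)
	for i in 0 ..< n {
		cqe := cqes[i]
		// cqe.user_data identifies the originating SQE; cqe.res is the syscall-style result
	}
*/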
// Queues (but does not submit) an SQE to perform a no-op.
// Returns a pointer to the SQE so that you can further modify it for advanced use cases.
// A no-op is more useful than it may appear at first glance.
// For example, you could call `drain_previous_sqes()` on the returned SQE, using the no-op to
// know when the ring is idle before acting on a kill signal.
nop :: proc(ring: ^IO_Uring, user_data: u64) -> (sqe: ^io_uring_sqe, err: IO_Uring_Error) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = .NOP
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform a `read(2)`.
read :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	fd: os.Handle,
	buf: []u8,
	offset: u64,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = .READ
	sqe.fd = i32(fd)
	sqe.addr = cast(u64)uintptr(raw_data(buf))
	sqe.len = u32(len(buf))
	sqe.off = offset
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform a `write(2)`.
write :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	fd: os.Handle,
	buf: []u8,
	offset: u64,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = .WRITE
	sqe.fd = i32(fd)
	sqe.addr = cast(u64)uintptr(raw_data(buf))
	sqe.len = u32(len(buf))
	sqe.off = offset
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
// `addr` and `addr_len` are optional.
accept :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	sockfd: os.Socket,
	addr: ^os.SOCKADDR = nil,
	addr_len: ^os.socklen_t = nil,
	flags: u32 = 0,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.ACCEPT
	sqe.fd = i32(sockfd)
	sqe.addr = cast(u64)uintptr(addr)
	sqe.off = cast(u64)uintptr(addr_len)
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
connect :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	sockfd: os.Socket,
	addr: ^os.SOCKADDR,
	addr_len: os.socklen_t,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.CONNECT
	sqe.fd = i32(sockfd)
	sqe.addr = cast(u64)uintptr(addr)
	sqe.off = cast(u64)addr_len
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform a `recv(2)`.
recv :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	sockfd: os.Socket,
	buf: []byte,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.RECV
	sqe.fd = i32(sockfd)
	sqe.addr = cast(u64)uintptr(raw_data(buf))
	sqe.len = u32(len(buf))
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform a `send(2)`.
send :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	sockfd: os.Socket,
	buf: []byte,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.SEND
	sqe.fd = i32(sockfd)
	sqe.addr = cast(u64)uintptr(raw_data(buf))
	sqe.len = u32(len(buf))
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to perform an `openat(2)`.
openat :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	fd: os.Handle,
	path: cstring,
	mode: u32,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.OPENAT
	sqe.fd = i32(fd)
	sqe.addr = cast(u64)transmute(uintptr)path
	sqe.len = mode
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}
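/*
Example (sketch): queue a read of the first 4096 bytes of an already-open file.
`fd` is any open os.Handle; 0x2 is an arbitrary user_data tag. The buffer must stay
alive until the corresponding completion has been reaped.

	buf: [4096]u8
	_, rerr := read(&ring, 0x2, fd, buf[:], 0)
*/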
// Queues (but does not submit) an SQE to perform a `close(2)`.
close :: proc(ring: ^IO_Uring, user_data: u64, fd: os.Handle) -> (sqe: ^io_uring_sqe, err: IO_Uring_Error) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.CLOSE
	sqe.fd = i32(fd)
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to register a timeout operation.
// Returns a pointer to the SQE.
//
// The timeout will complete when either the timeout expires, or after the specified number of
// events complete (if `count` is greater than `0`).
//
// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
//
// The completion event result will be `-ETIME` if the timeout completed through expiration,
// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
// timeout was removed before it expired.
//
// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
timeout :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	ts: ^linux.Time_Spec,
	count: u32,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.TIMEOUT
	sqe.fd = -1
	sqe.addr = cast(u64)uintptr(ts)
	sqe.len = 1
	sqe.off = u64(count)
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}

// Queues (but does not submit) an SQE to remove an existing timeout operation.
// Returns a pointer to the SQE.
//
// The timeout is identified by its `user_data`.
//
// The completion event result will be `0` if the timeout was found and cancelled successfully,
// `-EBUSY` if the timeout was found but expiration was already in progress, or
// `-ENOENT` if the timeout was not found.
timeout_remove :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	timeout_user_data: u64,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.TIMEOUT_REMOVE
	sqe.fd = -1
	sqe.addr = timeout_user_data
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}
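/*
Example (sketch): a relative timeout that completes after the given duration or once
8 other events have completed, whichever happens first. Filling in linux.Time_Spec is
left out here; set the desired seconds/nanoseconds before submitting.

	ts: linux.Time_Spec // set the relative delay here
	_, terr := timeout(&ring, 0x3, &ts, 8, 0)
*/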
// Queues (but does not submit) an SQE to add a link timeout operation.
// Returns a pointer to the SQE.
//
// You need to set linux.IOSQE_IO_LINK in the flags of the target operation
// and then call this method right after the target operation.
// See https://lwn.net/Articles/803932/ for details.
//
// If the dependent request finishes before the linked timeout, the timeout
// is canceled. If the timeout finishes before the dependent request, the
// dependent request will be canceled.
//
// The completion event result of the link_timeout will be
// `-ETIME` if the timeout finishes before the dependent request
// (in this case, the completion event result of the dependent request will
// be `-ECANCELED`), or
// `-EALREADY` if the dependent request finishes before the linked timeout.
link_timeout :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	ts: ^os.Unix_File_Time,
	flags: u32,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.LINK_TIMEOUT
	sqe.fd = -1
	sqe.addr = cast(u64)uintptr(ts)
	sqe.len = 1
	sqe.rw_flags = i32(flags)
	sqe.user_data = user_data
	return
}

poll_add :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	fd: os.Handle,
	events: linux.Fd_Poll_Events,
	flags: IORing_Poll_Flags,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.POLL_ADD
	sqe.fd = i32(fd)
	sqe.poll_events = transmute(u16)events
	sqe.len = transmute(u32)flags
	sqe.user_data = user_data
	return
}

poll_remove :: proc(
	ring: ^IO_Uring,
	user_data: u64,
	fd: os.Handle,
	events: linux.Fd_Poll_Events,
) -> (
	sqe: ^io_uring_sqe,
	err: IO_Uring_Error,
) {
	sqe = get_sqe(ring) or_return
	sqe.opcode = IORING_OP.POLL_REMOVE
	sqe.fd = i32(fd)
	sqe.poll_events = transmute(u16)events
	sqe.user_data = user_data
	return
}
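/*
Example (sketch): register interest in readability on a socket with poll_add.
Assumes `sock` is an os.Socket and that `.IN` is the relevant member of
linux.Fd_Poll_Events; 0x4 is an arbitrary user_data tag.

	_, perr := poll_add(&ring, 0x4, os.Handle(sock), {.IN}, {})
*/

Submission_Queue :: struct {
	head:      ^u32,
	tail:      ^u32,
	mask:      u32,
	flags:     ^u32,
	dropped:   ^u32,
	array:     []u32,
	sqes:      []io_uring_sqe,
	mmap:      []u8,
	mmap_sqes: []u8,

	// We use `sqe_head` and `sqe_tail` in the same way as liburing:
	// We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
	// We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
	// This allows us to amortize the cost of the atomic store to `tail` across multiple SQEs.
	sqe_head:  u32,
	sqe_tail:  u32,
}

submission_queue_make :: proc(fd: os.Handle, params: ^io_uring_params) -> (sq: Submission_Queue, ok: bool) {
	assert(fd >= 0)
	// The single mmap feature is required (SQ and CQ rings share one mapping).
	assert((params.features & IORING_FEAT_SINGLE_MMAP) != 0)

	sq_size := params.sq_off.array + params.sq_entries * size_of(u32)
	cq_size := params.cq_off.cqes + params.cq_entries * size_of(io_uring_cqe)
	size := max(sq_size, cq_size)

	mmap_result := unix.sys_mmap(
		nil,
		uint(size),
		unix.PROT_READ | unix.PROT_WRITE,
		unix.MAP_SHARED, /* | unix.MAP_POPULATE */
		int(fd),
		IORING_OFF_SQ_RING,
	)
	if mmap_result < 0 do return
	defer if !ok do unix.sys_munmap(rawptr(uintptr(mmap_result)), uint(size))

	mmap := transmute([^]u8)uintptr(mmap_result)

	size_sqes := params.sq_entries * size_of(io_uring_sqe)
	mmap_sqes_result := unix.sys_mmap(
		nil,
		uint(size_sqes),
		unix.PROT_READ | unix.PROT_WRITE,
		unix.MAP_SHARED, /* | unix.MAP_POPULATE */
		int(fd),
		IORING_OFF_SQES,
	)
	if mmap_sqes_result < 0 do return

	array := cast([^]u32)&mmap[params.sq_off.array]
	sqes := cast([^]io_uring_sqe)uintptr(mmap_sqes_result)
	mmap_sqes := cast([^]u8)uintptr(mmap_sqes_result)

	sq.head = cast(^u32)&mmap[params.sq_off.head]
	sq.tail = cast(^u32)&mmap[params.sq_off.tail]
	sq.mask = (cast(^u32)&mmap[params.sq_off.ring_mask])^
	sq.flags = cast(^u32)&mmap[params.sq_off.flags]
	sq.dropped = cast(^u32)&mmap[params.sq_off.dropped]
	sq.array = array[:params.sq_entries]
	sq.sqes = sqes[:params.sq_entries]
	sq.mmap = mmap[:size]
	sq.mmap_sqes = mmap_sqes[:size_sqes]
	ok = true
	return
}

submission_queue_destroy :: proc(sq: ^Submission_Queue) {
	unix.sys_munmap(raw_data(sq.mmap), uint(len(sq.mmap)))
	unix.sys_munmap(raw_data(sq.mmap_sqes), uint(len(sq.mmap_sqes)))
}

Completion_Queue :: struct {
	head:     ^u32,
	tail:     ^u32,
	mask:     u32,
	overflow: ^u32,
	cqes:     []io_uring_cqe,
}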
completion_queue_make :: proc(fd: os.Handle, params: ^io_uring_params, sq: ^Submission_Queue) -> Completion_Queue {
	assert(fd >= 0)
	// The single mmap feature is required: the completion queue lives in the same mapping as the submission queue.
	assert((params.features & IORING_FEAT_SINGLE_MMAP) != 0)

	mmap := sq.mmap
	cqes := cast([^]io_uring_cqe)&mmap[params.cq_off.cqes]
	return Completion_Queue{
		head     = cast(^u32)&mmap[params.cq_off.head],
		tail     = cast(^u32)&mmap[params.cq_off.tail],
		mask     = (cast(^u32)&mmap[params.cq_off.ring_mask])^,
		overflow = cast(^u32)&mmap[params.cq_off.overflow],
		cqes     = cqes[:params.cq_entries],
	}
}
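/*
Example (sketch): the full lifecycle in one place, from setup to reaping a completion.
Assumes `fd` is an open os.Handle; error handling is abbreviated.

	params: io_uring_params
	ring, err := io_uring_make(&params)
	if err != .None do return
	defer io_uring_destroy(&ring)

	buf: [512]u8
	_, _ = read(&ring, 0x1, fd, buf[:], 0)
	_, _ = submit(&ring)

	cqes: [1]io_uring_cqe
	n, cerr := copy_cqes(&ring, cqes[:], 1) // wait for at least one completion
*/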