564 lines
13 KiB
Odin
564 lines
13 KiB
Odin
#+private
|
|
package nbio
|
|
|
|
import "base:runtime"
|
|
|
|
import "core:container/queue"
|
|
import "core:mem"
|
|
import "core:net"
|
|
import "core:os"
|
|
import "core:time"
|
|
|
|
import kqueue "_kqueue"
|
|
|
|
// Capacity of the stack-allocated kevent buffer used by `flush`; it bounds both the
// number of changes submitted and the number of events received per `kevent` call.
MAX_EVENTS :: 256
|
|
|
|
// State of one kqueue-backed event loop.
_IO :: struct {
	// Kernel queue handle that `flush` passes to `kqueue.kevent`.
	kq:              os.Handle,
	// Running balance kept by `flush`: changes submitted minus events received.
	io_inflight:     int,
	// Pool that the `do_*` handlers hand finished completions back to.
	completion_pool: Pool(Completion),
	// Timeout completions; scanned by `flush_timeouts` on every flush.
	timeouts:        [dynamic]^Completion,
	// Completions whose handler will be dispatched by `flush`.
	completed:       queue.Queue(^Completion),
	// Completions waiting to be turned into kqueue change events by `flush_io`.
	io_pending:      [dynamic]^Completion,
	// NOTE(review): not referenced in the visible procedures; presumably the
	// allocator backing the containers above - confirm against the init code.
	allocator:       mem.Allocator,
}
|
|
|
|
// Backend-specific part of a completion.
_Completion :: struct {
	// The queued operation; `flush` switches on its variant to pick the `do_*` handler.
	operation: Operation,
	// Context that `flush` installs (`context = completed.ctx`) before running the handler.
	ctx:       runtime.Context,
}
|
|
|
|
// Accept one connection on a listening socket (handled by `do_accept`).
Op_Accept :: struct {
	// Called with the user data, the accepted client, its source endpoint and an error.
	callback: On_Accept,
	// Socket passed to `net.accept_tcp`.
	sock:     net.TCP_Socket,
}
|
|
|
|
// Close a handle (handled by `do_close`).
Op_Close :: struct {
	// Called with the user data and whether `os.close` reported no error.
	callback: On_Close,
	// Handle passed to `os.close`.
	handle:   os.Handle,
}
|
|
|
|
// Connect a TCP socket (handled by `do_connect`).
Op_Connect :: struct {
	// Called with the user data, the connected socket and a dial error.
	callback:  On_Connect,
	// Socket that is being connected.
	socket:    net.TCP_Socket,
	// Destination address handed to `os.connect`.
	sockaddr:  os.SOCKADDR_STORAGE_LH,
	// Set once `do_connect` has run; later runs only query `SO_ERROR`.
	initiated: bool,
}
|
|
|
|
// Receive from a TCP or UDP socket (handled by `do_recv`).
Op_Recv :: struct {
	// Called with the user data, total bytes received, the UDP sender (if any) and an error.
	callback:      On_Recv,
	// Socket to receive from.
	socket:        net.Any_Socket,
	// Destination; `do_recv` advances it past received bytes when `all` is set.
	buf:           []byte,
	// Keep receiving until `received` reaches `len`.
	all:           bool,
	// `received` is the running total; `len` is the total wanted when `all` is set.
	received, len: int,
}
|
|
|
|
// Send on a TCP or UDP socket (handled by `do_send`).
Op_Send :: struct {
	// Called with the user data, total bytes sent and an error.
	callback:  On_Sent,
	// Socket to send on.
	socket:    net.Any_Socket,
	// Remaining payload; `do_send` advances it past sent bytes when `all` is set.
	buf:       []byte,
	// Destination for UDP sends; `do_send` asserts it is set for UDP sockets.
	endpoint:  Maybe(net.Endpoint),
	// Keep sending until `sent` reaches `len`.
	all:       bool,
	// `len` is the total wanted when `all` is set; `sent` is the running total.
	len, sent: int,
}
|
|
|
|
// Read from a file handle (handled by `do_read`).
Op_Read :: struct {
	// Called with the user data, total bytes read and an error.
	callback:  On_Read,
	// Handle to read from.
	fd:        os.Handle,
	// Destination; `do_read` advances it past read bytes when `all` is set.
	buf:       []byte,
	// When >= 0 the read is positional (`os.read_at`), otherwise `os.read` is used.
	offset:    int,
	// Keep reading until `read` reaches `len`.
	all:       bool,
	// `read` is the running total; `len` is the total wanted when `all` is set.
	read, len: int,
}
|
|
|
|
// Write to a file handle (handled by `do_write`).
Op_Write :: struct {
	// Called with the user data, total bytes written and an error.
	callback:     On_Write,
	// Handle to write to.
	fd:           os.Handle,
	// Remaining payload; `do_write` advances it past written bytes when `all` is set.
	buf:          []byte,
	// When >= 0 the write is positional (`os.write_at`), otherwise `os.write` is used.
	offset:       int,
	// Keep writing until `written` reaches `len`.
	all:          bool,
	// `written` is the running total; `len` is the total wanted when `all` is set.
	written, len: int,
}
|
|
|
|
// Fire a callback once a point in time has passed (see `flush_timeouts`).
Op_Timeout :: struct {
	// Called with the user data once the timeout has expired.
	callback: On_Timeout,
	// Moment after which the timeout counts as expired.
	expires:  time.Time,
}
|
|
|
|
// Run a callback from the dispatch loop of `flush` (handled by `do_next_tick`).
Op_Next_Tick :: struct {
	// Called with the user data.
	callback: On_Next_Tick,
}
|
|
|
|
// Watch a handle for readiness (registered by `flush_io`, dispatched by `do_poll`).
Op_Poll :: struct {
	// Called with the user data and the event that was polled for.
	callback: On_Poll,
	// Handle to watch.
	fd:       os.Handle,
	// `.Read` maps to EVFILT_READ, `.Write` to EVFILT_WRITE.
	event:    Poll_Event,
	// When false the registration is one-shot and the completion is released after
	// the first callback; when true it stays registered.
	multi:    bool,
}
|
|
|
|
// Remove a poll registration (submitted by `flush_io` with EV_DELETE).
Op_Poll_Remove :: struct {
	// Handle whose registration is removed.
	fd:    os.Handle,
	// Selects the filter to remove: `.Read` -> EVFILT_READ, `.Write` -> EVFILT_WRITE.
	event: Poll_Event,
}
|
|
|
|
// Runs one iteration of the event loop:
//  1. moves expired timeouts to the completed queue (`flush_timeouts`),
//  2. turns pending completions into kqueue change events (`flush_io`),
//  3. submits the changes and collects ready events with a single `kevent` call,
//  4. dispatches every completion that was queued when dispatching started.
//
// Returns the translated kevent error, or `os.ERROR_NONE`.
flush :: proc(io: ^IO) -> os.Errno {
	events: [MAX_EVENTS]kqueue.KEvent

	min_timeout := flush_timeouts(io)
	num_changes := flush_io(io, events[:])

	// Only talk to the kernel when there is something to submit, or when there
	// is nothing to dispatch yet.
	nothing_completed := queue.len(io.completed) == 0
	if num_changes > 0 || nothing_completed {
		// Nothing to submit, nothing to dispatch and nothing in flight: done.
		if num_changes == 0 && nothing_completed && io.io_inflight == 0 {
			return os.ERROR_NONE
		}

		// Wait for at most 10ms, or less when a timeout expires sooner.
		max_wait_ns := i64(time.Millisecond * 10)
		wait_ns := min_timeout.? or_else max_wait_ns

		ts: kqueue.Time_Spec
		ts.nsec = min(wait_ns, max_wait_ns)

		// The same buffer carries the change list in and the event list out.
		num_events, err := kqueue.kevent(io.kq, events[:num_changes], events[:], &ts)
		if err != .None {
			return ev_err_to_os_err(err)
		}

		// PERF: this is ordered and O(N), can this be made unordered?
		remove_range(&io.io_pending, 0, num_changes)

		io.io_inflight += num_changes - num_events

		if num_events > 0 {
			queue.reserve(&io.completed, num_events)
			for i in 0 ..< num_events {
				// `udata` carries the completion that registered the event (set in `flush_io`).
				queue.push_back(&io.completed, cast(^Completion)events[i].udata)
			}
		}
	}

	// Snapshot the length: callbacks may push new completions onto the queue, and
	// dispatching those in this same pass could loop forever.
	to_dispatch := queue.len(io.completed)
	for remaining := to_dispatch; remaining > 0; remaining -= 1 {
		completed := queue.pop_front(&io.completed)
		context = completed.ctx

		switch &op in completed.operation {
		case Op_Accept:
			do_accept(io, completed, &op)
		case Op_Close:
			do_close(io, completed, &op)
		case Op_Connect:
			do_connect(io, completed, &op)
		case Op_Read:
			do_read(io, completed, &op)
		case Op_Recv:
			do_recv(io, completed, &op)
		case Op_Send:
			do_send(io, completed, &op)
		case Op_Write:
			do_write(io, completed, &op)
		case Op_Timeout:
			do_timeout(io, completed, &op)
		case Op_Next_Tick:
			do_next_tick(io, completed, &op)
		case Op_Poll:
			do_poll(io, completed, &op)
		case Op_Poll_Remove:
			do_poll_remove(io, completed, &op)
		case:
			unreachable()
		}
	}

	return os.ERROR_NONE
}
|
|
|
|
// Fills `events` with one kqueue change event per pending completion, in queue order,
// and returns how many were written: the smaller of `len(events)` and the number of
// pending completions. The pending list itself is not modified here.
//
// Panics when a timeout, close or next-tick operation is found in the pending list;
// those are never submitted to kqueue.
flush_io :: proc(io: ^IO, events: []kqueue.KEvent) -> int {
	count := min(len(events), len(io.io_pending))

	for i in 0 ..< count {
		event := &events[i]
		completion := io.io_pending[i]

		// Defaults shared by all plain I/O operations: a one-shot registration that
		// carries the completion back through `udata`. The poll operations below
		// replace the flags.
		event.udata = completion
		event.flags = kqueue.EV_ADD | kqueue.EV_ENABLE | kqueue.EV_ONESHOT

		switch op in completion.operation {
		case Op_Accept:
			event.ident = uintptr(op.sock)
			event.filter = kqueue.EVFILT_READ

		case Op_Connect:
			event.ident = uintptr(op.socket)
			event.filter = kqueue.EVFILT_WRITE

		case Op_Read:
			event.ident = uintptr(op.fd)
			event.filter = kqueue.EVFILT_READ

		case Op_Write:
			event.ident = uintptr(op.fd)
			event.filter = kqueue.EVFILT_WRITE

		case Op_Recv:
			event.ident = uintptr(os.Socket(net.any_socket_to_socket(op.socket)))
			event.filter = kqueue.EVFILT_READ

		case Op_Send:
			event.ident = uintptr(os.Socket(net.any_socket_to_socket(op.socket)))
			event.filter = kqueue.EVFILT_WRITE

		case Op_Poll:
			event.ident = uintptr(op.fd)
			switch op.event {
			case .Read:
				event.filter = kqueue.EVFILT_READ
			case .Write:
				event.filter = kqueue.EVFILT_WRITE
			case:
				unreachable()
			}

			// Multi-shot polls stay registered; single polls are one-shot.
			event.flags = kqueue.EV_ADD | kqueue.EV_ENABLE
			if !op.multi {
				event.flags |= kqueue.EV_ONESHOT
			}

		case Op_Poll_Remove:
			event.ident = uintptr(op.fd)
			switch op.event {
			case .Read:
				event.filter = kqueue.EVFILT_READ
			case .Write:
				event.filter = kqueue.EVFILT_WRITE
			case:
				unreachable()
			}

			event.flags = kqueue.EV_DELETE | kqueue.EV_DISABLE | kqueue.EV_ONESHOT

		case Op_Timeout, Op_Close, Op_Next_Tick:
			panic("invalid completion operation queued")
		}
	}

	return count
}
|
|
|
|
// Moves every expired timeout from `io.timeouts` to the completed queue and returns
// the number of nanoseconds until the nearest timeout that has not expired yet, or
// nil when no such timeout remains.
//
// Panics when a completion in `io.timeouts` does not hold an `Op_Timeout`.
flush_timeouts :: proc(io: ^IO) -> (min_timeout: Maybe(i64)) {
	if len(io.timeouts) == 0 do return

	// PERF: is there a faster way to compare time? Or time since program start and compare that?
	// The current time is sampled and converted once for the whole scan.
	now_ns := time.to_unix_nanoseconds(time.now())

	// Walk backwards so removing index `i` never shifts an entry that is still to be visited.
	for i := len(io.timeouts) - 1; i >= 0; i -= 1 {
		completion := io.timeouts[i]

		timeout, ok := &completion.operation.(Op_Timeout)
		if !ok do panic("non-timeout operation found in the timeouts queue")

		expires_ns := time.to_unix_nanoseconds(timeout.expires)
		if now_ns >= expires_ns {
			ordered_remove(&io.timeouts, i)
			queue.push_back(&io.completed, completion)
			continue
		}

		// Track the smallest remaining duration seen so far.
		remaining_ns := expires_ns - now_ns
		if nearest, has_nearest := min_timeout.(i64); !has_nearest || remaining_ns < nearest {
			min_timeout = remaining_ns
		}
	}

	return
}
|
|
|
|
// Tries to accept a connection on `op.sock`.
//
// When the accept would block, the completion is put back on the pending list so
// `flush_io` registers it with kqueue again. Otherwise the callback is invoked exactly
// once (with the client and its source endpoint on success, or with empty values and
// the error on failure) and the completion is returned to the pool.
do_accept :: proc(io: ^IO, completion: ^Completion, op: ^Op_Accept) {
	client, source, err := net.accept_tcp(op.sock)
	if err == net.Accept_Error.Would_Block {
		append(&io.io_pending, completion)
		return
	}

	if err == nil {
		err = _prepare_socket(client)
		if err != nil {
			// The socket was accepted but could not be prepared; don't leak it.
			net.close(client)
		}
	}
	// NOTE: when the accept itself failed there is no client socket, so nothing is
	// closed; closing the unset value could close an unrelated descriptor.

	if err != nil {
		op.callback(completion.user_data, {}, {}, err)
	} else {
		op.callback(completion.user_data, client, source, nil)
	}

	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Closes `op.handle`, reports to the callback whether that succeeded, and returns the
// completion to the pool.
do_close :: proc(io: ^IO, completion: ^Completion, op: ^Op_Close) {
	defer pool_put(&io.completion_pool, completion)

	close_err := os.close(op.handle)
	closed := close_err == os.ERROR_NONE

	op.callback(completion.user_data, closed)
}
|
|
|
|
// Drives a non-blocking connect.
//
// First run: calls `os.connect`; when it reports EINPROGRESS the completion is put on
// the pending list so kqueue signals when the socket becomes writable.
// Later runs: only fetch the outcome of the earlier connect through `SO_ERROR`.
//
// On failure the socket is closed and the callback receives the dial error; on success
// the callback receives the socket. Either way the completion is returned to the pool.
do_connect :: proc(io: ^IO, completion: ^Completion, op: ^Op_Connect) {
	was_initiated := op.initiated
	// Record the attempt right away instead of at scope exit, so the completion is
	// never written to after it has been handed back to the pool below.
	op.initiated = true

	err: os.Errno
	if was_initiated {
		// We have already called os.connect, retrieve error number only.
		os.getsockopt(os.Socket(op.socket), os.SOL_SOCKET, os.SO_ERROR, &err, size_of(os.Errno))
	} else {
		err = os.connect(os.Socket(op.socket), (^os.SOCKADDR)(&op.sockaddr), i32(op.sockaddr.len))
		if err == os.EINPROGRESS {
			append(&io.io_pending, completion)
			return
		}
	}

	if err != os.ERROR_NONE {
		net.close(op.socket)
		op.callback(completion.user_data, {}, net.Dial_Error(err.(os.Platform_Error)))
	} else {
		op.callback(completion.user_data, op.socket, nil)
	}

	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Reads from `op.fd` into `op.buf`, positionally when `op.offset >= 0`.
//
// - EWOULDBLOCK: the completion goes back on the pending list to be retried once
//   kqueue reports the handle readable.
// - Any other error: the callback receives the bytes read so far and the error.
// - `op.all`: keeps reading until `op.len` bytes were read in total. A read of zero
//   bytes (end of file) ends the operation early; the callback then receives the short
//   total with `os.ERROR_NONE`.
//
// The completion is returned to the pool after the callback ran.
do_read :: proc(io: ^IO, completion: ^Completion, op: ^Op_Read) {
	read: int
	err: os.Errno
	if op.offset >= 0 {
		read, err = os.read_at(op.fd, op.buf, i64(op.offset))
	} else {
		read, err = os.read(op.fd, op.buf)
	}

	// A failed call may report a negative count next to the error; that must not
	// shrink the running total (it would on every EWOULDBLOCK retry).
	read = max(read, 0)
	op.read += read

	if err != os.ERROR_NONE {
		if err == os.EWOULDBLOCK {
			append(&io.io_pending, completion)
			return
		}

		op.callback(completion.user_data, op.read, err)
		pool_put(&io.completion_pool, completion)
		return
	}

	// Continue only when progress was made: zero bytes without an error means end of
	// file, and retrying would recurse forever.
	if op.all && read > 0 && op.read < op.len {
		op.buf = op.buf[read:]

		// Advance the offset so the next positional read continues where this one ended.
		if op.offset >= 0 {
			op.offset += read
		}

		do_read(io, completion, op)
		return
	}

	op.callback(completion.user_data, op.read, os.ERROR_NONE)
	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Receives from a TCP or UDP socket into `op.buf`.
//
// - Would block (reported as `Timeout` by the net package): the completion goes back
//   on the pending list to be retried once kqueue reports the socket readable.
// - Any other error: the callback receives the bytes received so far and the error.
// - `op.all`: keeps receiving until `op.len` bytes were received in total. A TCP
//   receive of zero bytes (peer closed the stream) ends the operation early; the
//   callback then receives the short total without an error.
//
// For UDP the callback also receives the sender of the last datagram. The completion
// is returned to the pool after the callback ran.
do_recv :: proc(io: ^IO, completion: ^Completion, op: ^Op_Recv) {
	received: int
	err: net.Network_Error
	remote_endpoint: Maybe(net.Endpoint)
	stream_closed := false

	switch sock in op.socket {
	case net.TCP_Socket:
		received, err = net.recv_tcp(sock, op.buf)

		// NOTE: Timeout is the name for EWOULDBLOCK in net package.
		if err == net.TCP_Recv_Error.Timeout {
			append(&io.io_pending, completion)
			return
		}

		// Zero bytes without an error on a stream socket: no more data will arrive.
		stream_closed = err == nil && received == 0

	case net.UDP_Socket:
		received, remote_endpoint, err = net.recv_udp(sock, op.buf)

		// NOTE: Timeout is the name for EWOULDBLOCK in net package.
		if err == net.UDP_Recv_Error.Timeout {
			append(&io.io_pending, completion)
			return
		}
	}

	op.received += received

	if err != nil {
		op.callback(completion.user_data, op.received, remote_endpoint, err)
		pool_put(&io.completion_pool, completion)
		return
	}

	// Retrying on a closed stream would return zero bytes again and recurse forever.
	if op.all && !stream_closed && op.received < op.len {
		op.buf = op.buf[received:]
		do_recv(io, completion, op)
		return
	}

	op.callback(completion.user_data, op.received, remote_endpoint, err)
	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Sends `op.buf` on a TCP socket, or to `op.endpoint` on a UDP socket (the endpoint
// must be set for UDP; the type assertion fails otherwise).
//
// - EWOULDBLOCK: the completion goes back on the pending list to be retried once
//   kqueue reports the socket writable.
// - Any other error: the callback receives the bytes sent so far and the error.
// - `op.all`: keeps sending until `op.len` bytes were sent in total.
//
// The completion is returned to the pool after the callback ran.
do_send :: proc(io: ^IO, completion: ^Completion, op: ^Op_Send) {
	num_sent: u32
	errno: os.Errno
	send_err: net.Network_Error

	switch sock in op.socket {
	case net.TCP_Socket:
		num_sent, errno = os.send(os.Socket(sock), op.buf, 0)
		if errno != nil {
			send_err = net.TCP_Send_Error(errno.(os.Platform_Error))
		}

	case net.UDP_Socket:
		destination := _endpoint_to_sockaddr(op.endpoint.(net.Endpoint))
		num_sent, errno = os.sendto(os.Socket(sock), op.buf, 0, cast(^os.SOCKADDR)&destination, i32(destination.len))
		if errno != nil {
			send_err = net.UDP_Send_Error(errno.(os.Platform_Error))
		}
	}

	op.sent += int(num_sent)

	// Socket not ready yet: wait for kqueue to report it writable and try again.
	if errno == os.EWOULDBLOCK {
		append(&io.io_pending, completion)
		return
	}

	if errno != os.ERROR_NONE {
		op.callback(completion.user_data, op.sent, send_err)
		pool_put(&io.completion_pool, completion)
		return
	}

	// Partial send: continue with the part of the buffer that is still unsent.
	needs_more := op.all && op.sent < op.len
	if needs_more {
		op.buf = op.buf[num_sent:]
		do_send(io, completion, op)
		return
	}

	op.callback(completion.user_data, op.sent, nil)
	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Writes `op.buf` to `op.fd`, positionally when `op.offset >= 0`.
//
// - EWOULDBLOCK: the completion goes back on the pending list to be retried once
//   kqueue reports the handle writable.
// - Any other error: the callback receives the bytes written so far and the error.
// - `op.all`: keeps writing until `op.len` bytes were written in total. A write that
//   makes no progress ends the operation early; the callback then receives the short
//   total with `os.ERROR_NONE`.
//
// The completion is returned to the pool after the callback ran.
do_write :: proc(io: ^IO, completion: ^Completion, op: ^Op_Write) {
	written: int
	err: os.Errno
	if op.offset >= 0 {
		written, err = os.write_at(op.fd, op.buf, i64(op.offset))
	} else {
		written, err = os.write(op.fd, op.buf)
	}

	// A failed call may report a negative count next to the error; that must not
	// shrink the running total (it would on every EWOULDBLOCK retry).
	written = max(written, 0)
	op.written += written

	if err != os.ERROR_NONE {
		if err == os.EWOULDBLOCK {
			append(&io.io_pending, completion)
			return
		}

		op.callback(completion.user_data, op.written, err)
		pool_put(&io.completion_pool, completion)
		return
	}

	// The write did not write the whole buffer, need to write more. Only retry when
	// progress was made, otherwise this would recurse forever.
	if op.all && written > 0 && op.written < op.len {
		op.buf = op.buf[written:]

		// Increase offset so we don't overwrite what we just wrote.
		if op.offset >= 0 {
			op.offset += written
		}

		do_write(io, completion, op)
		return
	}

	op.callback(completion.user_data, op.written, os.ERROR_NONE)
	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Invokes the callback of an expired timeout and returns the completion to the pool.
do_timeout :: proc(io: ^IO, completion: ^Completion, op: ^Op_Timeout) {
	defer pool_put(&io.completion_pool, completion)

	op.callback(completion.user_data)
}
|
|
|
|
// Reports a poll event to the callback. The completion of a single-shot poll is
// returned to the pool afterwards; a multi-shot poll keeps its completion.
do_poll :: proc(io: ^IO, completion: ^Completion, op: ^Op_Poll) {
	op.callback(completion.user_data, op.event)

	if op.multi do return

	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// A poll removal has no callback; only the completion is returned to the pool.
// `op` is unused and only present so the signature matches the other handlers.
do_poll_remove :: proc(io: ^IO, completion: ^Completion, op: ^Op_Poll_Remove) {
	pool_put(&io.completion_pool, completion)
}
|
|
|
|
// Invokes a next-tick callback and returns the completion to the pool.
do_next_tick :: proc(io: ^IO, completion: ^Completion, op: ^Op_Next_Tick) {
	defer pool_put(&io.completion_pool, completion)

	op.callback(completion.user_data)
}
|
|
|
|
// Translates a `kqueue.Queue_Error` into the matching `os.Errno`.
// `.None` and any value not listed map to `os.ERROR_NONE`.
kq_err_to_os_err :: proc(err: kqueue.Queue_Error) -> os.Errno {
	switch err {
	case .Out_Of_Memory:
		return os.ENOMEM
	case .Descriptor_Table_Full:
		return os.EMFILE
	case .File_Table_Full:
		return os.ENFILE
	case .Unknown:
		return os.EFAULT
	case .None:
		return os.ERROR_NONE
	case:
		return os.ERROR_NONE
	}
}
|
|
|
|
// Translates a `kqueue.Event_Error` into the matching `os.Errno`.
// `.None` and any value not listed map to `os.ERROR_NONE`.
ev_err_to_os_err :: proc(err: kqueue.Event_Error) -> os.Errno {
	switch err {
	case .Access_Denied:
		return os.EACCES
	case .Invalid_Event, .Unknown:
		return os.EFAULT
	case .Invalid_Descriptor:
		return os.EBADF
	case .Signal:
		return os.EINTR
	case .Invalid_Timeout_Or_Filter:
		return os.EINVAL
	case .Event_Not_Found:
		return os.ENOENT
	case .Out_Of_Memory:
		return os.ENOMEM
	case .Process_Not_Found:
		return os.ESRCH
	case .None:
		return os.ERROR_NONE
	case:
		return os.ERROR_NONE
	}
}
|
|
|
|
// Counterpart of the private proc with the same name in the net package.
//
// Converts an endpoint into a socket address: IPv4 endpoints fill the storage as an
// `os.sockaddr_in`, IPv6 endpoints as an `os.sockaddr_in6`. The port is stored as a
// big-endian 16-bit value. Fields that are not set here stay zero.
_endpoint_to_sockaddr :: proc(ep: net.Endpoint) -> (sockaddr: os.SOCKADDR_STORAGE_LH) {
	switch addr in ep.address {
	case net.IP4_Address:
		// `sockaddr` starts out zeroed, so filling it field by field is enough.
		v4 := (^os.sockaddr_in)(&sockaddr)
		v4.sin_len = size_of(os.sockaddr_in)
		v4.sin_family = u8(os.AF_INET)
		v4.sin_port = u16be(ep.port)
		v4.sin_addr = transmute(os.in_addr)addr
		return

	case net.IP6_Address:
		v6 := (^os.sockaddr_in6)(&sockaddr)
		v6.sin6_len = size_of(os.sockaddr_in6)
		v6.sin6_family = u8(os.AF_INET6)
		v6.sin6_port = u16be(ep.port)
		v6.sin6_addr = transmute(os.in6_addr)addr
		return
	}

	unreachable()
}
|