270 lines
6.1 KiB
Odin
270 lines
6.1 KiB
Odin
package nbio
|
|
|
|
import "core:container/queue"
|
|
import "core:net"
|
|
import "core:os"
|
|
import "core:time"
|
|
|
|
import kqueue "_kqueue"
|
|
|
|
_init :: proc(io: ^IO, allocator := context.allocator) -> (err: os.Errno) {
|
|
qerr: kqueue.Queue_Error
|
|
io.kq, qerr = kqueue.kqueue()
|
|
if qerr != .None do return kq_err_to_os_err(qerr)
|
|
|
|
pool_init(&io.completion_pool, allocator = allocator)
|
|
|
|
io.timeouts = make([dynamic]^Completion, allocator)
|
|
io.io_pending = make([dynamic]^Completion, allocator)
|
|
|
|
queue.init(&io.completed, allocator = allocator)
|
|
|
|
io.allocator = allocator
|
|
return
|
|
}
|
|
|
|
_num_waiting :: #force_inline proc(io: ^IO) -> int {
|
|
return io.completion_pool.num_waiting
|
|
}
|
|
|
|
_destroy :: proc(io: ^IO) {
|
|
context.allocator = io.allocator
|
|
|
|
delete(io.timeouts)
|
|
delete(io.io_pending)
|
|
|
|
queue.destroy(&io.completed)
|
|
|
|
os.close(io.kq)
|
|
|
|
pool_destroy(&io.completion_pool)
|
|
}
|
|
|
|
_tick :: proc(io: ^IO) -> os.Errno {
|
|
return flush(io)
|
|
}
|
|
|
|
_listen :: proc(socket: net.TCP_Socket, backlog := 1000) -> net.Network_Error {
|
|
errno := os.listen(os.Socket(socket), backlog)
|
|
if errno != nil {
|
|
return net.Listen_Error(errno.(os.Platform_Error))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
_accept :: proc(io: ^IO, socket: net.TCP_Socket, user: rawptr, callback: On_Accept) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Accept{
|
|
callback = callback,
|
|
sock = socket,
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
// Wraps os.close using the kqueue.
|
|
_close :: proc(io: ^IO, fd: Closable, user: rawptr, callback: On_Close) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
|
|
completion.operation = Op_Close{
|
|
callback = callback,
|
|
}
|
|
op := &completion.operation.(Op_Close)
|
|
|
|
switch h in fd {
|
|
case net.TCP_Socket: op.handle = os.Handle(h)
|
|
case net.UDP_Socket: op.handle = os.Handle(h)
|
|
case net.Socket: op.handle = os.Handle(h)
|
|
case os.Handle: op.handle = h
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
// TODO: maybe call this dial?
|
|
_connect :: proc(io: ^IO, endpoint: net.Endpoint, user: rawptr, callback: On_Connect) -> (^Completion, net.Network_Error) {
|
|
if endpoint.port == 0 {
|
|
return nil, net.Dial_Error.Port_Required
|
|
}
|
|
|
|
family := net.family_from_endpoint(endpoint)
|
|
sock, err := net.create_socket(family, .TCP)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = _prepare_socket(sock); err != nil {
|
|
close(io, net.any_socket_to_socket(sock))
|
|
return nil, err
|
|
}
|
|
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Connect {
|
|
callback = callback,
|
|
socket = sock.(net.TCP_Socket),
|
|
sockaddr = _endpoint_to_sockaddr(endpoint),
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion, nil
|
|
}
|
|
|
|
_read :: proc(
|
|
io: ^IO,
|
|
fd: os.Handle,
|
|
offset: Maybe(int),
|
|
buf: []byte,
|
|
user: rawptr,
|
|
callback: On_Read,
|
|
all := false,
|
|
) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Read {
|
|
callback = callback,
|
|
fd = fd,
|
|
buf = buf,
|
|
offset = offset.? or_else -1,
|
|
all = all,
|
|
len = len(buf),
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
_recv :: proc(io: ^IO, socket: net.Any_Socket, buf: []byte, user: rawptr, callback: On_Recv, all := false) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Recv {
|
|
callback = callback,
|
|
socket = socket,
|
|
buf = buf,
|
|
all = all,
|
|
len = len(buf),
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
_send :: proc(
|
|
io: ^IO,
|
|
socket: net.Any_Socket,
|
|
buf: []byte,
|
|
user: rawptr,
|
|
callback: On_Sent,
|
|
endpoint: Maybe(net.Endpoint) = nil,
|
|
all := false,
|
|
) -> ^Completion {
|
|
if _, ok := socket.(net.UDP_Socket); ok {
|
|
assert(endpoint != nil)
|
|
}
|
|
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Send {
|
|
callback = callback,
|
|
socket = socket,
|
|
buf = buf,
|
|
endpoint = endpoint,
|
|
all = all,
|
|
len = len(buf),
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
_write :: proc(
|
|
io: ^IO,
|
|
fd: os.Handle,
|
|
offset: Maybe(int),
|
|
buf: []byte,
|
|
user: rawptr,
|
|
callback: On_Write,
|
|
all := false,
|
|
) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Write {
|
|
callback = callback,
|
|
fd = fd,
|
|
buf = buf,
|
|
offset = offset.? or_else -1,
|
|
all = all,
|
|
len = len(buf),
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
// Runs the callback after the timeout, using the kqueue.
|
|
_timeout :: proc(io: ^IO, dur: time.Duration, user: rawptr, callback: On_Timeout) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Timeout {
|
|
callback = callback,
|
|
expires = time.time_add(time.now(), dur),
|
|
}
|
|
|
|
append(&io.timeouts, completion)
|
|
return completion
|
|
}
|
|
|
|
_next_tick :: proc(io: ^IO, user: rawptr, callback: On_Next_Tick) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Next_Tick {
|
|
callback = callback,
|
|
}
|
|
|
|
queue.push_back(&io.completed, completion)
|
|
return completion
|
|
}
|
|
|
|
_poll :: proc(io: ^IO, fd: os.Handle, event: Poll_Event, multi: bool, user: rawptr, callback: On_Poll) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
|
|
completion.ctx = context
|
|
completion.user_data = user
|
|
completion.operation = Op_Poll{
|
|
callback = callback,
|
|
fd = fd,
|
|
event = event,
|
|
multi = multi,
|
|
}
|
|
|
|
append(&io.io_pending, completion)
|
|
return completion
|
|
}
|
|
|
|
_poll_remove :: proc(io: ^IO, fd: os.Handle, event: Poll_Event) -> ^Completion {
|
|
completion := pool_get(&io.completion_pool)
|
|
|
|
completion.ctx = context
|
|
completion.operation = Op_Poll_Remove{
|
|
fd = fd,
|
|
event = event,
|
|
}
|
|
|
|
append(&io.io_pending, completion)
|
|
return completion
|
|
}
|