asyncdispatch
This module implements asynchronous IO. This includes a dispatcher, a Future type implementation, and an async macro which allows asynchronous code to be written in a synchronous style with the await keyword.
The dispatcher acts as a kind of event loop. You must call poll on it (or a function which does so for you such as waitFor or runForever) in order to poll for any outstanding events. The underlying implementation is based on epoll on Linux, IO Completion Ports on Windows and select on other operating systems.
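As a minimal sketch of what "calling poll" means in practice, the loop below drives the dispatcher by hand until a timer future created with sleepAsync has completed. The names timer and polls are illustrative only; in most programs waitFor or runForever would run this loop for you.

```nim
import std/asyncdispatch

# sleepAsync returns a Future[void] that completes after the given delay.
let timer = sleepAsync(20)

# Count how often the dispatcher had to be polled by hand (illustrative only).
var polls = 0
while not timer.finished:
  poll()      # processes ready events, waiting up to the default 500 ms timeout
  inc polls
```

The loop is safe here because a timer is pending whenever poll is called; with nothing registered in the dispatcher, poll raises ValueError.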
The poll function will not, on its own, return any events. Instead an appropriate Future object will be completed. A Future is a type which holds a value which is not yet available, but which may be available in the future. You can check whether a future is finished by using the finished function. When a future is finished it means that either the value that it holds is now available or it holds an error instead. The latter situation occurs when the operation to complete a future fails with an exception. You can distinguish between the two situations with the failed function.
Future objects can also store a callback procedure which will be called automatically once the future completes.
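The sketch below shows finished, failed, read and addCallback on a future that is completed by hand. The proc name demo and the variable seen are assumptions made for this example; normally an IO procedure creates and completes the future for you. Callbacks may be deferred until the dispatcher is next polled, so the example polls once if anything is pending.

```nim
import std/asyncdispatch

proc demo(): int =
  # A future created by hand; the string is only a debugging label.
  var fut = newFuture[int]("demo")
  var seen = 0

  # The callback is invoked once the future has completed.
  fut.addCallback(
    proc () =
      seen = fut.read
  )

  doAssert not fut.finished
  fut.complete(42)

  # Give the dispatcher a chance to run a deferred callback.
  if hasPendingOperations():
    poll()

  doAssert fut.finished
  doAssert not fut.failed
  result = seen
```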
Futures therefore can be thought of as an implementation of the proactor pattern. In this pattern you make a request for an action, and once that action is fulfilled a future is completed with the result of that action. Requests can be made by calling the appropriate functions. For example: calling the recv function will create a request for some data to be read from a socket. The future which the recv function returns will then complete once the requested amount of data is read or an exception occurs.
Code to read some data from a socket may look something like this:
var future = socket.recv(100)
future.addCallback(
  proc () =
    echo(future.read)
)
All asynchronous functions returning a Future will not block. They will not however return immediately. An asynchronous function will have code which will be executed before an asynchronous request is made, in most cases this code sets up the request.
In the above example, the recv function returns a brand new Future instance once the request for data to be read from the socket has been made. This Future instance will complete once the requested amount of data is read, in this case 100 bytes. The second statement sets a callback on this future, which will be called once the future completes. All the callback does is write the data stored in the future to stdout. The read function is used for this: if the future completed with an error, read raises that error; otherwise it returns the value of the future.
Asynchronous procedures
Asynchronous procedures remove the pain of working with callbacks. They do this by allowing you to write asynchronous code the same way as you would write synchronous code.
An asynchronous procedure is marked using the {.async.} pragma. When marking a procedure with the {.async.} pragma it must have a Future[T] return type or no return type at all. If you do not specify a return type then Future[void] is assumed.
Inside asynchronous procedures await can be used to call any procedures which return a Future; this includes asynchronous procedures. When a procedure is "awaited", the asynchronous procedure it is awaited in will suspend its execution until the awaited procedure's Future completes, at which point the asynchronous procedure resumes its execution. While an asynchronous procedure is suspended, other asynchronous procedures will be run by the dispatcher.
The await call may be used in many contexts. It can be used on the right hand side of a variable declaration: var data = await socket.recv(100), in which case the variable will be set to the value of the future automatically. It can be used to await a Future object, and it can be used to await a procedure returning a Future[void]: await socket.send("foobar").
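The contexts listed above can be sketched with timers instead of sockets so that the example runs without a network. The procedures delayedDouble and sumOfDoubles are hypothetical names introduced for illustration.

```nim
import std/asyncdispatch

# Hypothetical async procedure: waits briefly, then returns twice its input.
proc delayedDouble(x: int): Future[int] {.async.} =
  await sleepAsync(10)
  return x * 2

proc sumOfDoubles(): Future[int] {.async.} =
  # await on the right-hand side of a declaration
  let a = await delayedDouble(1)
  # start an operation, keep its Future object, and await it later
  let fut = delayedDouble(2)
  let b = await fut
  # await a procedure returning Future[void]
  await sleepAsync(1)
  return a + b

# waitFor polls the dispatcher until the future has completed.
let total = waitFor sumOfDoubles()
```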
If an awaited future completes with an error, then await will re-raise this error. To avoid this, you can use the yield keyword instead of await. The following section shows different ways that you can handle exceptions in async procs.
Handling Exceptions
The most reliable way to handle exceptions is to use yield on a future then check the future's failed property. For example:
var future = sock.recv(100)
yield future
if future.failed:
  # Handle exception
The async procedures also offer limited support for the try statement.
try:
  let data = await sock.recv(100)
  echo("Received ", data)
except:
  # Handle exception
Unfortunately the semantics of the try statement may not always be correct, and occasionally the compilation may fail altogether. As such it is better to use the former style when possible.
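Both styles can be compared side by side in a small, network-free sketch. The procedure failing is a hypothetical stand-in for an IO operation that raises; viaYield and viaTry are illustrative names. The exact message text seen in the try variant is not relied upon, since reading a failed future may append traceback information to it.

```nim
import std/asyncdispatch

# Hypothetical operation that always fails after a short delay.
proc failing(): Future[string] {.async.} =
  await sleepAsync(1)
  raise newException(ValueError, "boom")

# Style 1: yield the future, then inspect it.
proc viaYield(): Future[string] {.async.} =
  var fut = failing()
  yield fut                       # suspends without re-raising
  if fut.failed:
    return "failed: " & fut.readError.msg
  return fut.read

# Style 2: await inside a try statement.
proc viaTry(): Future[string] {.async.} =
  try:
    let data = await failing()    # await re-raises the stored error
    return data
  except ValueError:
    return "caught ValueError"

let yielded = waitFor viaYield()
let tried = waitFor viaTry()
```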
Discarding futures
Futures should never be discarded. This is because they may contain errors. If you do not care for the result of a Future then you should use the asyncCheck procedure instead of the discard keyword. Note however that this does not wait for completion, and you should use waitFor for that purpose.
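A minimal sketch of the difference, assuming a hypothetical background procedure that sets a flag when it is done: asyncCheck starts the work without waiting for it, while waitFor blocks until its future has completed.

```nim
import std/asyncdispatch

# Hypothetical background task that reports completion through a flag.
proc background(done: ref bool) {.async.} =
  await sleepAsync(5)
  done[] = true

proc main(): Future[bool] {.async.} =
  var done: ref bool
  new(done)
  # Fire and forget: the future is not discarded, so an error would
  # still be raised instead of being silently lost.
  asyncCheck background(done)
  doAssert not done[]       # asyncCheck did not wait for completion
  await sleepAsync(50)      # let the dispatcher run the background task
  return done[]

# waitFor blocks the current thread until main has completed.
let waited = waitFor main()
```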
Examples
For examples take a look at the documentation for the modules implementing asynchronous IO. A good place to start is the asyncnet module.
Investigating pending futures
It's possible to get into a situation where an async proc, or more accurately a Future[T] gets stuck and never completes. This can happen for various reasons and can cause serious memory leaks. When this occurs it's hard to identify the procedure that is stuck.
Thankfully there is a mechanism which tracks the count of each pending future. All you need to do to enable it is compile with -d:futureLogging and use the getFuturesInProgress procedure to get the list of pending futures together with the stack traces to the moment of their creation.
You may also find it useful to use this prometheus package which will log the pending futures into prometheus, allowing you to analyse them via a nice graph.
Limitations/Bugs
- The effect system (raises: []) does not work with async procedures.
asyncdispatch module depends on the asyncmacro module to work properly.
Imports
- os, tables, strutils, times, heapqueue, options, asyncstreams, options, math, monotimes, asyncfutures, nativesockets, net, deques, winlean, sets, hashes, macros, strutils, asyncfutures, posix
Types
CompletionData = object
  fd*: AsyncFD
  cb*: owned(proc (fd: AsyncFD; bytesTransferred: DWORD; errcode: OSErrorCode) {.closure, gcsafe.})
  cell*: ForeignCell
PDispatcher = ref object of PDispatcherBase
  ioPort: Handle
  handles*: HashSet[AsyncFD]
CustomRef = ref CustomObj
AsyncFD = distinct int
AsyncEvent = ptr AsyncEventImpl
Callback = proc (fd: AsyncFD): bool {.closure, gcsafe.}
Procs
proc `==`(x: AsyncFD; y: AsyncFD): bool {.borrow.}
proc newDispatcher(): owned PDispatcher {.raises: [], tags: [].}
- Creates a new Dispatcher instance.
proc setGlobalDispatcher(disp: sink PDispatcher) {.raises: [Exception], tags: [RootEffect].}
proc getGlobalDispatcher(): PDispatcher {.raises: [Exception], tags: [RootEffect].}
proc getIoHandler(disp: PDispatcher): Handle {.raises: [], tags: [].}
- Returns the underlying IO Completion Port handle (Windows) or selector (Unix) for the specified dispatcher.
proc register(fd: AsyncFD) {.raises: [Exception, OSError], tags: [RootEffect].}
- Registers fd with the dispatcher.
proc hasPendingOperations(): bool {.raises: [Exception], tags: [RootEffect].}
- Returns true if the global dispatcher has pending operations.
proc newCustom(): CustomRef {.raises: [], tags: [].}
proc recv(socket: AsyncFD; size: int; flags = {SafeDisconn}): owned(Future[string]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Reads up to size bytes from socket. Returned future will complete once all the data requested is read, a part of the data has been read, or the socket has disconnected in which case the future will complete with a value of "".
  Warning: The Peek socket flag is not supported on Windows.
proc recvInto(socket: AsyncFD; buf: pointer; size: int; flags = {SafeDisconn}): owned(Future[int]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Reads up to size bytes from socket into buf, which must at least be of that size. Returned future will complete once all the data requested is read, a part of the data has been read, or the socket has disconnected in which case the future will complete with a value of 0.
  Warning: The Peek socket flag is not supported on Windows.
proc send(socket: AsyncFD; buf: pointer; size: int; flags = {SafeDisconn}): owned(Future[void]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Sends size bytes from buf to socket. The returned future will complete once all data has been sent.
  WARNING: Use it with caution. If buf refers to GC'ed object, you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
proc sendTo(socket: AsyncFD; data: pointer; size: int; saddr: ptr SockAddr; saddrLen: SockLen; flags = {SafeDisconn}): owned(Future[void]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Sends data to specified destination saddr, using socket socket. The returned future will complete once all data has been sent.
proc recvFromInto(socket: AsyncFD; data: pointer; size: int; saddr: ptr SockAddr; saddrLen: ptr SockLen; flags = {SafeDisconn}): owned(Future[int]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Receives a datagram data from socket into buf, which must be at least of size size, address of datagram's sender will be stored into saddr and saddrLen. Returned future will complete once one datagram has been received, and will return size of packet received.
proc acceptAddr(socket: AsyncFD; flags = {SafeDisconn}; inheritable = defined(nimInheritHandles)): owned(Future[tuple[address: string, client: AsyncFD]]) {.raises: [Exception, ValueError, OSError, ValueError, Exception], tags: [RootEffect].}
- Accepts a new connection. Returns a future containing the client socket corresponding to that connection and the remote address of the client. The future will complete when the connection is successfully accepted.
  The resulting client socket is automatically registered to the dispatcher.
  If inheritable is false (the default), the resulting client socket will not be inheritable by child processes.
  The accept call may result in an error if the connecting socket disconnects during the duration of the accept. If the SafeDisconn flag is specified then this error will not be raised and instead accept will be called again.
proc setInheritable(fd: AsyncFD; inheritable: bool): bool {.raises: [], tags: [].}
- Control whether a file handle can be inherited by child processes. Returns true on success.
  This procedure is not guaranteed to be available for all platforms. Test for availability with declared().
proc closeSocket(socket: AsyncFD) {.raises: [Exception], tags: [RootEffect].}
- Closes a socket and ensures that it is unregistered.
proc unregister(fd: AsyncFD) {.raises: [Exception], tags: [RootEffect].}
- Unregisters fd.
proc contains(disp: PDispatcher; fd: AsyncFD): bool {.raises: [], tags: [].}
proc addRead(fd: AsyncFD; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
- Start watching the file descriptor for read availability and then call the callback cb.
  This is not pure mechanism for Windows Completion Ports (IOCP), so if you can avoid it, please do it. Use addRead only if really need it (main usecase is adaptation of unix-like libraries to be asynchronous on Windows).
  If you use this function, you don't need to use asyncdispatch.recv() or asyncdispatch.accept(), because they are using IOCP, please use nativesockets.recv() and nativesockets.accept() instead.
  Be sure your callback cb returns true, if you want to remove watch of read notifications, and false, if you want to continue receiving notifications.
proc addWrite(fd: AsyncFD; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
- Start watching the file descriptor for write availability and then call the callback cb.
  This is not pure mechanism for Windows Completion Ports (IOCP), so if you can avoid it, please do it. Use addWrite only if really need it (main usecase is adaptation of unix-like libraries to be asynchronous on Windows).
  If you use this function, you don't need to use asyncdispatch.send() or asyncdispatch.connect(), because they are using IOCP, please use nativesockets.send() and nativesockets.connect() instead.
  Be sure your callback cb returns true, if you want to remove watch of write notifications, and false, if you want to continue receiving notifications.
proc addTimer(timeout: int; oneshot: bool; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
- Registers callback cb to be called when timer expired.
  Parameters:
  - timeout - timeout value in milliseconds.
  - oneshot
    - true - generate only one timeout event
    - false - generate timeout events periodically
proc addProcess(pid: int; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
- Registers callback cb to be called when process with process ID pid exited.
proc newAsyncEvent(): AsyncEvent {.raises: [OSError], tags: [].}
- Creates a new thread-safe AsyncEvent object.
  New AsyncEvent object is not automatically registered with dispatcher like AsyncSocket.
proc trigger(ev: AsyncEvent) {.raises: [OSError], tags: [].}
- Set event ev to signaled state.
proc unregister(ev: AsyncEvent) {.raises: [Exception, OSError], tags: [RootEffect].}
- Unregisters event ev.
proc close(ev: AsyncEvent) {.raises: [OSError], tags: [].}
- Closes event ev.
proc addEvent(ev: AsyncEvent; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
- Registers callback cb to be called when ev will be signaled
proc drain(timeout = 500) {.raises: [Exception, ValueError, OSError], tags: [TimeEffect, RootEffect].}
- Waits for completion of all events and processes them. Raises ValueError if there are no pending operations. In contrast to poll this processes as many events as are available until the timeout has elapsed.
proc poll(timeout = 500) {.raises: [Exception, ValueError, OSError], tags: [RootEffect, TimeEffect].}
- Waits for completion events and processes them. Raises ValueError if there are no pending operations. This runs the underlying OS epoll or kqueue primitive only once.
proc createAsyncNativeSocket(domain: cint; sockType: cint; protocol: cint; inheritable = defined(nimInheritHandles)): AsyncFD {.raises: [OSError, Exception], tags: [RootEffect].}
proc createAsyncNativeSocket(domain: Domain = Domain.AF_INET; sockType: SockType = SOCK_STREAM; protocol: Protocol = IPPROTO_TCP; inheritable = defined(nimInheritHandles)): AsyncFD {.raises: [OSError, Exception], tags: [RootEffect].}
proc dial(address: string; port: Port; protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) {.raises: [OSError, ValueError, Exception], tags: [RootEffect].}
- Establishes connection to the specified address:port pair via the specified protocol. The procedure iterates through possible resolutions of the address until it succeeds, meaning that it seamlessly works with both IPv4 and IPv6. Returns the async file descriptor, registered in the dispatcher of the current thread, ready to send or receive data.
proc connect(socket: AsyncFD; address: string; port: Port; domain = Domain.AF_INET): owned(Future[void]) {.raises: [OSError, IOError, ValueError, Exception], tags: [RootEffect].}
proc sleepAsync(ms: int | float): owned(Future[void])
- Suspends the execution of the current async procedure for the next ms milliseconds.
proc withTimeout[T](fut: Future[T]; timeout: int): owned(Future[bool])
- Returns a future which will complete once fut completes or after timeout milliseconds has elapsed.
  If fut completes first the returned future will hold true, otherwise, if timeout milliseconds has elapsed first, the returned future will hold false.
proc accept(socket: AsyncFD; flags = {SafeDisconn}; inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) {.raises: [Exception, ValueError, OSError], tags: [RootEffect].}
- Accepts a new connection. Returns a future containing the client socket corresponding to that connection.
  If inheritable is false (the default), the resulting client socket will not be inheritable by child processes.
  The future will complete when the connection is successfully accepted.
proc send(socket: AsyncFD; data: string; flags = {SafeDisconn}): owned(Future[void]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Sends data to socket. The returned future will complete once all data has been sent.
proc readAll(future: FutureStream[string]): owned(Future[string]) {.raises: [Exception, ValueError], tags: [RootEffect].}
- Returns a future that will complete when all the string data from the specified future stream is retrieved.
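The behaviour of sleepAsync and withTimeout described above can be sketched as follows. The procedures slowAnswer and raceBoth are hypothetical names used only for this illustration, and the delays are chosen with wide margins so that the outcome does not depend on timer precision.

```nim
import std/asyncdispatch

# Hypothetical stand-in for a slow IO operation.
proc slowAnswer(ms: int): Future[int] {.async.} =
  await sleepAsync(ms)
  return ms

proc raceBoth(): Future[seq[bool]] {.async.} =
  # The operation needs 10 ms and is given 200 ms: it completes first.
  let quick = slowAnswer(10)
  let quickWon = await withTimeout(quick, 200)
  # The operation needs 300 ms and is given 20 ms: the timeout elapses first.
  let slow = slowAnswer(300)
  let slowWon = await withTimeout(slow, 20)
  return @[quickWon, slowWon]

let outcome = waitFor raceBoth()
```

Note that in this sketch the timed-out future is simply left behind; nothing here cancels the underlying operation.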
proc callSoon(cbproc: proc () {.gcsafe.}) {.gcsafe, raises: [Exception], tags: [RootEffect].}
- Schedule cbproc to be called as soon as possible. The callback is called when control returns to the event loop.
proc runForever() {.raises: [Exception, ValueError, OSError], tags: [RootEffect, TimeEffect].}
- Begins a never ending global dispatcher poll loop.
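As a small sketch of the callSoon behaviour described above, the callback below is only scheduled by callSoon and runs once control returns to the event loop, here through a single poll call. The proc name demoCallSoon and the order list are assumptions made for this example.

```nim
import std/asyncdispatch

proc demoCallSoon(): seq[string] =
  var order: seq[string]

  # Schedule the callback; it is not run at this point.
  callSoon(
    proc () =
      order.add("callback")
  )
  order.add("scheduled")

  # Hand control to the event loop once so the pending callback runs.
  poll()
  result = order
```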
proc waitFor[T](fut: Future[T]): T
- Blocks the current thread until the specified future completes.
proc activeDescriptors(): int {.inline, raises: [], tags: [].}
- Returns the current number of active file descriptors for the current event loop. This is a cheap operation that does not involve a system call.
proc maxDescriptors(): int {.raises: OSError, tags: [].}
- Returns the maximum number of active file descriptors for the current process. This involves a system call. For now maxDescriptors is supported on the following OSes: Windows, Linux, OSX, BSD.
Macros
macro async(prc: untyped): untyped
- Macro which processes async procedures into the appropriate iterators and yield statements.
macro multisync(prc: untyped): untyped
- Macro which processes async procedures into both asynchronous and synchronous procedures.
  The generated async procedures use the async macro, whereas the generated synchronous procedures simply strip off the await calls.
Templates
template await(f: typed): untyped {.used.}
template await[T](f: Future[T]): auto {.used.}
Exports
- Port, SocketFlag, and, addCallback, asyncCheck, or, read, fail, setCallSoonProc, addCallback, clean, clearCallbacks, newFutureVar, mget, Future, failed, $, callback=, complete, callback=, NimAsyncContinueSuffix, FutureBase, all, complete, FutureError, getCallSoonProc, FutureVar, isFutureLoggingEnabled, complete, readError, complete, newFuture, finished, len, callback=, fail, newFutureStream, finished, write, complete, FutureStream, read, failed
© 2006–2021 Andreas Rumpf
Licensed under the MIT License.
https://nim-lang.org/docs/asyncdispatch.html