Tasks and Parallel Computing

Tasks

Core.TaskType

Task(func)

Create a Task (i.e. coroutine) to execute the given function (which must be callable with no arguments). The task exits when this function returns.

Example

julia> a() = det(rand(1000, 1000));

julia> b = Task(a);

In this example, b is a runnable Task that hasn't started yet.

source

Base.current_taskFunction

current_task()

Get the currently running Task.

source

Base.istaskdoneFunction

istaskdone(t::Task) -> Bool

Determine whether a task has exited.

julia> a2() = det(rand(1000, 1000));

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true

source

Base.istaskstartedFunction

istaskstarted(t::Task) -> Bool

Determine whether a task has started executing.

julia> a3() = det(rand(1000, 1000));

julia> b = Task(a3);

julia> istaskstarted(b)
false

source

Base.yieldFunction

yield()

Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.

source
yield(t::Task, arg = nothing)

A fast, unfair-scheduling version of schedule(t, arg); yield(), which immediately yields to t before calling the scheduler.

source

Base.yieldtoFunction

yieldto(t::Task, arg = nothing)

Switch to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, arg is returned from the task's last call to yieldto. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged.

source

Base.task_local_storageMethod

task_local_storage(key)

Look up the value of a key in the current task's task-local storage.

source

Base.task_local_storageMethod

task_local_storage(key, value)

Assign a value to a key in the current task's task-local storage.

source

Base.task_local_storageMethod

task_local_storage(body, key, value)

Call the function body with a modified task-local storage, in which value is assigned to key; the previous value of key, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping.
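
For instance, a minimal sketch of dynamic scoping using an illustrative :depth key:

task_local_storage(:depth, 0)         # set :depth in the current task
task_local_storage(:depth)            # -> 0
task_local_storage(:depth, 1) do
    task_local_storage(:depth)        # -> 1 inside the block
end
task_local_storage(:depth)            # -> 0, the previous value is restored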

source

Base.ConditionType

Condition()

Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel type does this, and so can be used for level-triggered events.
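
A minimal sketch of the wait/notify handshake (task and variable names are illustrative):

cond = Condition()
waiter = @schedule begin
    val = wait(cond)        # suspends this task until notify is called
    println("woken with ", val)
end
yield()                     # let the waiter start and block on cond
notify(cond, 42)            # wakes the waiter, which prints "woken with 42"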

source

Base.notifyFunction

notify(condition, val=nothing; all=true, error=false)

Wake up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken, otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.

Returns the count of tasks woken up. Returns 0 if no tasks are waiting on condition.

source

Base.scheduleFunction

schedule(t::Task, [val]; error=false)

Add a Task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.

If a second argument val is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.

julia> a5() = det(rand(1000, 1000));

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true

source

Base.@scheduleMacro

@schedule

Wrap an expression in a Task and add it to the local machine's scheduler queue. Similar to @async except that an enclosing @sync does NOT wait for tasks started with an @schedule.

source

Base.@taskMacro

@task

Wrap an expression in a Task without executing it, and return the Task. This only creates a task, and does not run it.

julia> a1() = det(rand(1000, 1000));

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true

source

Base.sleepFunction

sleep(seconds)

Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond, i.e. an input of 0.001.

source

Base.ChannelType

Channel{T}(sz::Int)

Constructs a Channel with an internal buffer that can hold a maximum of sz objects of type T. put! calls on a full channel block until an object is removed with take!.

Channel(0) constructs an unbuffered channel. put! blocks until a matching take! is called, and vice versa.

Other constructors:

  • Channel(Inf): equivalent to Channel{Any}(typemax(Int))

  • Channel(sz): equivalent to Channel{Any}(sz)
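
As a brief sketch, a buffered channel holds values until they are taken (the values here are illustrative):

ch = Channel{Int}(2)        # buffer holds at most 2 items
put!(ch, 1); put!(ch, 2)    # a third put! would block until a take!
take!(ch)                   # -> 1 (values are taken in FIFO order)
close(ch)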

source

Base.put!Method

put!(c::Channel, v)

Appends an item v to the channel c. Blocks if the channel is full.

For unbuffered channels, blocks until a take! is performed by a different task.

source

Base.take!Method

take!(c::Channel)

Removes and returns a value from a Channel. Blocks until data is available.

For unbuffered channels, blocks until a put! is performed by a different task.

source

Base.isreadyMethod

isready(c::Channel)

Determine whether a Channel has a value stored to it. Returns immediately, does not block.

For unbuffered channels, returns true if there are tasks waiting on a put!.

source

Base.fetchMethod

fetch(c::Channel)

Waits for and gets the first available item from the channel. Does not remove the item. fetch is unsupported on an unbuffered (0-size) channel.

source

Base.closeMethod

close(c::Channel)

Closes a channel. An exception is thrown by:

  • put! on a closed channel.

  • take! and fetch on an empty, closed channel.

source

Base.bindMethod

bind(chnl::Channel, task::Task)

Associates the lifetime of chnl with a task. Channel chnl is automatically closed when the task terminates. Any uncaught exception in the task is propagated to all waiters on chnl.

The chnl object can be explicitly closed independent of task termination. Terminating tasks have no effect on already closed Channel objects.

When a channel is bound to multiple tasks, the first task to terminate will close the channel. When multiple channels are bound to the same task, termination of the task will close all of the bound channels.

julia> c = Channel(0);

julia> task = @schedule foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false

julia> c = Channel(0);

julia> task = @schedule (put!(c,1);error("foo"));

julia> bind(c,task);

julia> take!(c)
1

julia> put!(c,1);
ERROR: foo
Stacktrace:
 [1] check_channel_state(::Channel{Any}) at ./channels.jl:131
 [2] put!(::Channel{Any}, ::Int64) at ./channels.jl:261

source

Base.asyncmapFunction

asyncmap(f, c...; ntasks=0, batch_size=nothing)

Uses multiple concurrent tasks to map f over a collection (or multiple equal length collections). For multiple collection arguments, f is applied elementwise.

ntasks specifies the number of tasks to run concurrently. Depending on the length of the collections, if ntasks is unspecified, up to 100 tasks will be used for concurrent mapping.

ntasks can also be specified as a zero-arg function. In this case, the number of tasks to run in parallel is checked before processing every element, and a new task is started if the value of ntasks_func() is greater than the current number of tasks.

If batch_size is specified, the collection is processed in batch mode. f must then be a function that accepts a Vector of argument tuples and returns a vector of results. The input vector will have a length of batch_size or less.

The following examples highlight execution in different tasks by returning the object_id of the tasks in which the mapping function is executed.

First, with ntasks undefined, each element is processed in a different task.

julia> tskoid() = object_id(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

With ntasks=2 all elements are processed in 2 tasks.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

With batch_size defined, the mapping function needs to be changed to accept an array of argument tuples and return an array of results. map is used in the modified mapping function to achieve this.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"

Note

Currently, all tasks in Julia are executed in a single OS thread co-operatively. Consequently, asyncmap is beneficial only when the mapping function involves I/O - disk, network, remote worker invocation, etc.

source

Base.asyncmap!Function

asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Like asyncmap(), but stores output in results rather than returning a collection.

source

General Parallel Computing Support

Base.Distributed.addprocsFunction

addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

Launches worker processes via the specified cluster manager.

For example, Beowulf clusters are supported via a custom cluster manager implemented in the package ClusterManagers.jl.

The number of seconds a newly launched worker waits for connection establishment from the master can be specified via variable JULIA_WORKER_TIMEOUT in the worker process's environment. Relevant only when using TCP/IP as transport.

source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers

Add processes on remote machines via SSH. Requires julia to be installed in the same location on each node, or to be available via a shared file system.

machines is a vector of machine specifications. Workers are started for each specification.

A machine specification is either a string machine_spec or a tuple - (machine_spec, count).

machine_spec is a string of the form [user@]host[:port] [bind_addr[:port]]. user defaults to current user, port to the standard ssh port. If [bind_addr[:port]] is specified, other workers will connect to this worker at the specified bind_addr and port.

count is the number of workers to be launched on the specified host. If specified as :auto it will launch as many workers as the number of cores on the specific host.

Keyword arguments:

  • tunnel: if true then SSH tunneling will be used to connect to the worker from the master process. Default is false.

  • sshflags: specifies additional ssh options, e.g. sshflags=`-i /home/foo/bar.pem`

  • max_parallel: specifies the maximum number of workers connected to in parallel at a host. Defaults to 10.

  • dir: specifies the working directory on the workers. Defaults to the host's current directory (as found by pwd())

  • enable_threaded_blas: if true then BLAS will run on multiple threads in added processes. Default is false.

  • exename: name of the julia executable. Defaults to "$JULIA_HOME/julia" or "$JULIA_HOME/julia-debug" as the case may be.

  • exeflags: additional flags passed to the worker processes.

  • topology: Specifies how the workers connect to each other. Sending a message between unconnected workers results in an error.

    • topology=:all_to_all: All processes are connected to each other. The default.

    • topology=:master_slave: Only the driver process, i.e. pid 1 connects to the workers. The workers do not connect to each other.

    • topology=:custom: The launch method of the cluster manager specifies the connection topology via fields ident and connect_idents in WorkerConfig. A worker with a cluster manager identity ident will connect to all workers specified in connect_idents.

Environment variables:

If the master process fails to establish a connection with a newly launched worker within 60.0 seconds, the worker treats it as a fatal situation and terminates. This timeout can be controlled via environment variable JULIA_WORKER_TIMEOUT. The value of JULIA_WORKER_TIMEOUT on the master process specifies the number of seconds a newly launched worker waits for connection establishment.

source
addprocs(; kwargs...) -> List of process identifiers

Equivalent to addprocs(Sys.CPU_CORES; kwargs...)

Note that workers do not run a .juliarc.jl startup script, nor do they synchronize their global state (such as global variables, new method definitions, and loaded modules) with any of the other running processes.

source
addprocs(np::Integer; restrict=true, kwargs...) -> List of process identifiers

Launches workers using the in-built LocalManager which only launches workers on the local host. This can be used to take advantage of multiple cores. addprocs(4) will add 4 processes on the local machine. If restrict is true, binding is restricted to 127.0.0.1. Keyword args dir, exename, exeflags, topology, and enable_threaded_blas have the same effect as documented for addprocs(machines).

source

Base.Distributed.nprocsFunction

nprocs()

Get the number of available processes.

source

Base.Distributed.nworkersFunction

nworkers()

Get the number of available worker processes. This is one less than nprocs(). Equal to nprocs() if nprocs() == 1.

source

Base.Distributed.procsMethod

procs()

Returns a list of all process identifiers.

source

Base.Distributed.procsMethod

procs(pid::Integer)

Returns a list of all process identifiers on the same physical node. Specifically, all workers bound to the same IP address as pid are returned.

source

Base.Distributed.workersFunction

workers()

Returns a list of all worker process identifiers.

source

Base.Distributed.rmprocsFunction

rmprocs(pids...; waitfor=typemax(Int))

Removes the specified workers. Note that only process 1 can add or remove workers.

Argument waitfor specifies how long to wait for the workers to shut down:

  • If unspecified, rmprocs will wait until all requested pids are removed.

  • An ErrorException is raised if all workers cannot be terminated before the requested waitfor seconds.

  • With a waitfor value of 0, the call returns immediately with the workers scheduled for removal in a different task. The scheduled Task object is returned. The user should call wait on the task before invoking any other parallel calls.

source

Base.Distributed.interruptFunction

interrupt(pids::Integer...)

Interrupt the current executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.

source
interrupt(pids::AbstractVector=workers())

Interrupt the current executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.

source

Base.Distributed.myidFunction

myid()

Get the id of the current process.

source

Base.Distributed.pmapFunction

pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

Transform collection c by applying f to each element using available workers and tasks.

For multiple collection arguments, apply f elementwise.

Note that f must be made available to all worker processes; see Code Availability and Loading Packages for details.

If a worker pool is not specified, all available workers, i.e., the default worker pool is used.

By default, pmap distributes the computation over all specified workers. To use only the local process and distribute over tasks, specify distributed=false. This is equivalent to using asyncmap. For example, pmap(f, c; distributed=false) is equivalent to asyncmap(f, c; ntasks=()->nworkers()).

pmap can also use a mix of processes and tasks via the batch_size argument. For batch sizes greater than 1, the collection is processed in multiple batches, each of length batch_size or less. A batch is sent as a single request to a free worker, where a local asyncmap processes elements from the batch using multiple concurrent tasks.

Any error stops pmap from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument on_error which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller.

Consider the following two examples. The first one returns the exception object inline, the second a 0 in place of any exception:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Errors can also be handled by retrying failed computations. Keyword arguments retry_delays and retry_check are passed through to retry as keyword arguments delays and check respectively. If batching is specified, and an entire batch fails, all items in the batch are retried.

Note that if both on_error and retry_delays are specified, the on_error hook is called before retrying. If on_error does not throw (or rethrow) an exception, the element will not be retried.

Example: On errors, retry f on an element a maximum of 3 times without any delay between retries.

pmap(f, c; retry_delays = zeros(3))

Example: Retry f only if the exception is not of type InexactError, with exponentially increasing delays up to 3 times. Return NaN in place of all InexactError occurrences.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow(e)), retry_delays = ExponentialBackOff(n = 3))

source

Base.Distributed.RemoteExceptionType

RemoteException(captured)

Exceptions on remote computations are captured and rethrown locally. A RemoteException wraps the pid of the worker and a captured exception. A CapturedException captures the remote exception and a serializable form of the call stack when the exception was raised.

source

Base.Distributed.FutureType

Future(pid::Integer=myid())

Create a Future on process pid. The default pid is the current process.

source

Base.Distributed.RemoteChannelMethod

RemoteChannel(pid::Integer=myid())

Make a reference to a Channel{Any}(1) on process pid. The default pid is the current process.

source

Base.Distributed.RemoteChannelMethod

RemoteChannel(f::Function, pid::Integer=myid())

Create references to remote channels of a specific size and type. f() is a function that when executed on pid must return an implementation of an AbstractChannel.

For example, RemoteChannel(()->Channel{Int}(10), pid) will return a reference to a channel of type Int and size 10 on pid.

The default pid is the current process.

source

Base.waitFunction

wait([x])

Block the current task until some event occurs, depending on the type of the argument:

  • RemoteChannel : Wait for a value to become available on the specified remote channel.

  • Future : Wait for a value to become available for the specified future.

  • Channel: Wait for a value to be appended to the channel.

  • Condition: Wait for notify on a condition.

  • Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.

  • Task: Wait for a Task to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called wait).

  • RawFD: Wait for changes on a file descriptor (see poll_fd for keyword arguments and return code)

If no argument is passed, the task blocks for an undefined period. A task can only be restarted by an explicit call to schedule or yieldto.

Often wait is called within a while loop to ensure a waited-for condition is met before proceeding.

source

Base.fetchMethod

fetch(x)

Waits and fetches a value from x depending on the type of x:

  • Future: Wait for and get the value of a Future. The fetched value is cached locally. Further calls to fetch on the same reference return the cached value. If the remote value is an exception, throws a RemoteException which captures the remote exception and backtrace.

  • RemoteChannel: Wait for and get the value of a remote reference. Exceptions raised are same as for a Future .

Does not remove the item fetched.

source

Base.Distributed.remotecallMethod

remotecall(f, id::Integer, args...; kwargs...) -> Future

Call a function f asynchronously on the given arguments on the specified process. Returns a Future. Keyword arguments, if any, are passed through to f.
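
For instance, a brief sketch, assuming a worker has been added:

addprocs(1)                               # launch a local worker
r = remotecall(rand, workers()[1], 2, 2)  # returns a Future immediately
fetch(r)                                  # waits for and returns the 2×2 result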

source

Base.Distributed.remotecall_waitMethod

remotecall_wait(f, id::Integer, args...; kwargs...)

Perform a faster wait(remotecall(...)) in one message on the Worker specified by worker id id. Keyword arguments, if any, are passed through to f.

See also wait and remotecall.

source

Base.Distributed.remotecall_fetchMethod

remotecall_fetch(f, id::Integer, args...; kwargs...)

Perform fetch(remotecall(...)) in one message. Keyword arguments, if any, are passed through to f. Any remote exceptions are captured in a RemoteException and thrown.

See also fetch and remotecall.

source

Base.Distributed.remote_doMethod

remote_do(f, id::Integer, args...; kwargs...) -> nothing

Executes f on worker id asynchronously. Unlike remotecall, it does not store the result of computation, nor is there a way to wait for its completion.

A successful invocation indicates that the request has been accepted for execution on the remote node.

While consecutive remotecalls to the same worker are serialized in the order they are invoked, the order of executions on the remote worker is undetermined. For example, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) will serialize the call to f1, followed by f2 and f3 in that order. However, it is not guaranteed that f1 is executed before f3 on worker 2.

Any exceptions thrown by f are printed to STDERR on the remote worker.

Keyword arguments, if any, are passed through to f.

source

Base.put!Method

put!(rr::RemoteChannel, args...)

Store a set of values to the RemoteChannel. If the channel is full, blocks until space is available. Returns its first argument.

source

Base.put!Method

put!(rr::Future, v)

Store a value to a Future rr. Futures are write-once remote references. A put! on an already set Future throws an Exception. All asynchronous remote calls return Futures and set the value to the return value of the call upon completion.

source

Base.take!Method

take!(rr::RemoteChannel, args...)

Fetch value(s) from a RemoteChannel rr, removing the value(s) in the process.

source

Base.isreadyMethod

isready(rr::RemoteChannel, args...)

Determine whether a RemoteChannel has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. However, it can be safely used on a Future since they are assigned only once.

source

Base.isreadyMethod

isready(rr::Future)

Determine whether a Future has a value stored to it.

If the argument Future is owned by a different node, this call will block to wait for the answer. It is recommended to wait for rr in a separate task instead or to use a local Channel as a proxy:

c = Channel(1)
@async put!(c, remotecall_fetch(long_computation, p))
isready(c)  # will not block

source

Base.Distributed.WorkerPoolType

WorkerPool(workers::Vector{Int})

Create a WorkerPool from a vector of worker ids.
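
A short sketch, assuming worker processes with pids 2 and 3 exist:

wp = WorkerPool([2, 3])
remotecall_fetch(myid, wp)   # runs myid() on a free worker from the pool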

source

Base.Distributed.CachingPoolType

CachingPool(workers::Vector{Int})

An implementation of an AbstractWorkerPool. remote, remotecall_fetch, pmap (and other remote calls which execute functions remotely) benefit from caching the serialized/deserialized functions on the worker nodes, especially closures (which may capture large amounts of data).

The remote cache is maintained for the lifetime of the returned CachingPool object. To clear the cache earlier, use clear!(pool).

For global variables, only the bindings are captured in a closure, not the data. let blocks can be used to capture global data.

For example:

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(wp, i -> sum(foo) + i, 1:100);
end

The above would transfer foo only once to each worker.

source

Base.Distributed.default_worker_poolFunction

default_worker_pool()

WorkerPool containing idle workers() - used by remote(f) and pmap (by default).

source

Base.Distributed.clear!Method

clear!(pool::CachingPool) -> pool

Removes all cached functions from all participating workers.

source

Base.Distributed.remoteFunction

remote([::AbstractWorkerPool], f) -> Function

Returns an anonymous function that executes function f on an available worker using remotecall_fetch.

source

Base.Distributed.remotecallMethod

remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool variant of remotecall(f, pid, ....). Waits for and takes a free worker from pool and performs a remotecall on it.

source

Base.Distributed.remotecall_waitMethod

remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool variant of remotecall_wait(f, pid, ....). Waits for and takes a free worker from pool and performs a remotecall_wait on it.

source

Base.Distributed.remotecall_fetchMethod

remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool variant of remotecall_fetch(f, pid, ....). Waits for and takes a free worker from pool and performs a remotecall_fetch on it.

source

Base.Distributed.remote_doMethod

remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

WorkerPool variant of remote_do(f, pid, ....). Waits for and takes a free worker from pool and performs a remote_do on it.

source

Base.timedwaitFunction

timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)

Waits until testcb returns true or for secs seconds, whichever is earlier. testcb is polled every pollint seconds.
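
For example, a sketch polling an illustrative flag:

done = Ref(false)
@schedule (sleep(0.2); done[] = true)
timedwait(() -> done[], 1.0)    # returns :ok once the flag is set
timedwait(() -> false, 0.3)     # returns :timed_out after 0.3 seconds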

source

Base.Distributed.@spawnMacro

@spawn

Creates a closure around an expression and runs it on an automatically-chosen process, returning a Future to the result.
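
For example, a minimal sketch (assumes workers have been added with addprocs):

r = @spawn sum(rand(1000))   # runs on an automatically-chosen process
fetch(r)                     # waits for and returns the sum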

source

Base.Distributed.@spawnatMacro

@spawnat

Accepts two arguments, p and an expression. A closure is created around the expression and run asynchronously on process p. Returns a Future to the result.

source

Base.Distributed.@fetchMacro

@fetch

Equivalent to fetch(@spawn expr). See fetch and @spawn.

source

Base.Distributed.@fetchfromMacro

@fetchfrom

Equivalent to fetch(@spawnat p expr). See fetch and @spawnat.

source

Base.@asyncMacro

@async

Like @schedule, @async wraps an expression in a Task and adds it to the local machine's scheduler queue. Additionally it adds the task to the set of items that the nearest enclosing @sync waits for.

source

Base.@syncMacro

@sync

Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and @parallel are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.

source

Base.Distributed.@parallelMacro

@parallel

A parallel for loop of the form:

@parallel [reducer] for var = range
    body
end

The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, @parallel performs local reductions on each worker with a final reduction on the calling process.

Note that without a reducer function, @parallel executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with @sync, like:

@sync @parallel for var = range
    body
end
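
For example, a sketch counting heads in simulated coin tosses, combined with a (+) reducer:

nheads = @parallel (+) for i = 1:200000
    Int(rand(Bool))
end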

source

Base.Distributed.@everywhereMacro

@everywhere expr

Execute an expression under Main everywhere. Equivalent to calling eval(Main, expr) on all processes. Errors on any of the processes are collected into a CompositeException and thrown. For example:

@everywhere bar=1

will define Main.bar on all processes.

Unlike @spawn and @spawnat, @everywhere does not capture any local variables. Prefixing @everywhere with @eval allows us to broadcast local variables using interpolation:

foo = 1
@eval @everywhere bar=$foo

The expression is evaluated under Main irrespective of where @everywhere is called from. For example:

module FooBar
    foo() = @everywhere bar()=myid()
end
FooBar.foo()

will result in Main.bar being defined on all processes and not FooBar.bar.

source

Base.Distributed.clear!Method

clear!(syms, pids=workers(); mod=Main)

Clears global bindings in modules by initializing them to nothing. syms should be of type Symbol or a collection of Symbols. pids and mod identify the processes and the module in which global variables are to be reinitialized. Only those names found to be defined under mod are cleared.

An exception is raised if a global constant is requested to be cleared.

source

Base.Distributed.remoteref_idFunction

Base.remoteref_id(r::AbstractRemoteRef) -> RRID

Futures and RemoteChannels are identified by fields:

  • where - refers to the node where the underlying object/storage referred to by the reference actually exists.

  • whence - refers to the node the remote reference was created from. Note that this is different from the node where the underlying object referred to actually exists. For example calling RemoteChannel(2) from the master process would result in a where value of 2 and a whence value of 1.

  • id is unique across all references created from the worker specified by whence.

Taken together, whence and id uniquely identify a reference across all workers.

Base.remoteref_id is a low-level API which returns a Base.RRID object that wraps whence and id values of a remote reference.

source

Base.Distributed.channel_from_idFunction

Base.channel_from_id(id) -> c

A low-level API which returns the backing AbstractChannel for an id returned by remoteref_id. The call is valid only on the node where the backing channel exists.

source

Base.Distributed.worker_id_from_socketFunction

Base.worker_id_from_socket(s) -> pid

A low-level API which, given an IO connection or a Worker, returns the pid of the worker it is connected to. This is useful when writing custom serialize methods for a type, which optimizes the data written out depending on the receiving process id.

source
Base.Distributed.cluster_cookieFunction

Base.cluster_cookie() -> cookie

Returns the cluster cookie.

source
Base.cluster_cookie(cookie) -> cookie

Sets the passed cookie as the cluster cookie, then returns it.

source

Shared Arrays

Base.SharedArrayType

SharedArray{T}(dims::NTuple; init=false, pids=Int[])
SharedArray{T,N}(...)

Construct a SharedArray of a bits type T and size dims across the processes specified by pids - all of which have to be on the same host. If N is specified by calling SharedArray{T,N}(dims), then N must match the length of dims.

If pids is left unspecified, the shared array will be mapped across all processes on the current host, including the master. But, localindexes and indexpids will only refer to worker processes. This facilitates work distribution code to use workers for actual computation with the master process acting as a driver.

If an init function of the type initfn(S::SharedArray) is specified, it is called on all the participating workers.

The shared array is valid as long as a reference to the SharedArray object exists on the node which created the mapping.

SharedArray{T}(filename::AbstractString, dims::NTuple, [offset=0]; mode=nothing, init=false, pids=Int[])
SharedArray{T,N}(...)

Construct a SharedArray backed by the file filename, with element type T (must be a bits type) and size dims, across the processes specified by pids - all of which have to be on the same host. This file is mmapped into the host memory, with the following consequences:

  • The array data must be represented in binary format (e.g., an ASCII format like CSV cannot be supported)

  • Any changes you make to the array values (e.g., A[3] = 0) will also change the values on disk

If pids is left unspecified, the shared array will be mapped across all processes on the current host, including the master. But, localindexes and indexpids will only refer to worker processes. This facilitates work distribution code to use workers for actual computation with the master process acting as a driver.

mode must be one of "r", "r+", "w+", or "a+", and defaults to "r+" if the file specified by filename already exists, or "w+" if not. If an init function of the type initfn(S::SharedArray) is specified, it is called on all the participating workers. You cannot specify an init function if the file is not writable.

offset allows you to skip the specified number of bytes at the beginning of the file.
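
A brief sketch of the in-memory constructor (assumes several local worker processes):

addprocs(3)
S = SharedArray{Int}((3, 4), init = S -> S[localindexes(S)] = myid())
S[3, 2] = 7    # the write is visible to all mapped processes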

source

Base.Distributed.procsMethod

procs(S::SharedArray)

Get the vector of processes mapping the shared array.

source

Base.sdataFunction

sdata(S::SharedArray)

Returns the actual Array object backing S.

source

Base.indexpidsFunction

indexpids(S::SharedArray)

Returns the current worker's index in the list of workers mapping the SharedArray (i.e. in the same list returned by procs(S)), or 0 if the SharedArray is not mapped locally.

source

Base.localindexesFunction

localindexes(S::SharedArray)

Returns a range describing the "default" indexes to be handled by the current process. This range should be interpreted in the sense of linear indexing, i.e., as a sub-range of 1:length(S). In multi-process contexts, returns an empty range in the parent process (or any process for which indexpids returns 0).

It's worth emphasizing that localindexes exists purely as a convenience, and you can partition work on the array among workers any way you wish. For a SharedArray, all indexes should be equally fast for each worker process.

source

Multi-Threading

This experimental interface supports Julia's multi-threading capabilities. Types and functions described here might (and likely will) change in the future.

Base.Threads.threadidFunction

Threads.threadid()

Get the ID number of the current thread of execution. The master thread has ID 1.

source

Base.Threads.nthreadsFunction

Threads.nthreads()

Get the number of threads available to the Julia process. This is the inclusive upper bound on threadid().

source

Base.Threads.@threadsMacro

Threads.@threads

A macro to parallelize a for-loop to run with multiple threads. This spawns nthreads() threads, splits the iteration space amongst them, and iterates in parallel. A barrier is placed at the end of the loop which waits for all the threads to finish execution, and the loop returns.
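
For example, a sketch that records which thread handled each iteration:

a = zeros(Int, 10)
Threads.@threads for i = 1:10
    a[i] = Threads.threadid()   # each chunk is filled by a different thread
end
a   # e.g. [1,1,1,2,2,2,3,3,4,4] when run with JULIA_NUM_THREADS=4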

source

Base.Threads.AtomicType

Threads.Atomic{T}

Holds a reference to an object of type T, ensuring that it is only accessed atomically, i.e. in a thread-safe manner.

Only certain "simple" types can be used atomically, namely the primitive integer and floating-point types. These are Int8...Int128, UInt8...UInt128, and Float16...Float64.

New atomic objects can be created from a non-atomic value; if none is specified, the atomic object is initialized with zero.

Atomic objects can be accessed using the [] notation:

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

Atomic operations use an atomic_ prefix, such as atomic_add!, atomic_xchg!, etc.

source

Base.Threads.atomic_cas!Function

Threads.atomic_cas!{T}(x::Atomic{T}, cmp::T, newval::T)

Atomically compare-and-set x

Atomically compares the value in x with cmp. If equal, write newval to x. Otherwise, leaves x unmodified. Returns the old value in x. By comparing the returned value to cmp (via ===) one knows whether x was modified and now holds the new value newval.

For further details, see LLVM's cmpxchg instruction.

This function can be used to implement transactional semantics. Before the transaction, one records the value in x. After the transaction, the new value is stored only if x has not been modified in the meantime.

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)

source

Base.Threads.atomic_xchg!Function

Threads.atomic_xchg!{T}(x::Atomic{T}, newval::T)

Atomically exchange the value in x

Atomically exchanges the value in x with newval. Returns the old value.

For further details, see LLVM's atomicrmw xchg instruction.

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2

source

Base.Threads.atomic_add!Function

Threads.atomic_add!{T}(x::Atomic{T}, val::T)

Atomically add val to x

Performs x[] += val atomically. Returns the old value.

For further details, see LLVM's atomicrmw add instruction.

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5

source

Base.Threads.atomic_sub!Function

Threads.atomic_sub!{T}(x::Atomic{T}, val::T)

Atomically subtract val from x

Performs x[] -= val atomically. Returns the old value.

For further details, see LLVM's atomicrmw sub instruction.

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1

source

Base.Threads.atomic_and!Function

Threads.atomic_and!{T}(x::Atomic{T}, val::T)

Atomically bitwise-and x with val

Performs x[] &= val atomically. Returns the old value.

For further details, see LLVM's atomicrmw and instruction.

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2

source

Base.Threads.atomic_nand!Function

Threads.atomic_nand!{T}(x::Atomic{T}, val::T)

Atomically bitwise-nand (not-and) x with val

Performs x[] = ~(x[] & val) atomically. Returns the old value.

For further details, see LLVM's atomicrmw nand instruction.

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3

source

Base.Threads.atomic_or!Function

Threads.atomic_or!{T}(x::Atomic{T}, val::T)

Atomically bitwise-or x with val

Performs x[] |= val atomically. Returns the old value.

For further details, see LLVM's atomicrmw or instruction.

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7

source

Base.Threads.atomic_xor!Function

Threads.atomic_xor!{T}(x::Atomic{T}, val::T)

Atomically bitwise-xor (exclusive-or) x with val

Performs x[] ⊻= val atomically. Returns the old value.

For further details, see LLVM's atomicrmw xor instruction.

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2

source

Base.Threads.atomic_max!Function

Threads.atomic_max!{T}(x::Atomic{T}, val::T)

Atomically store the maximum of x and val in x

Performs x[] = max(x[], val) atomically. Returns the old value.

For further details, see LLVM's atomicrmw max instruction.

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7

source

Base.Threads.atomic_min!Function

Threads.atomic_min!{T}(x::Atomic{T}, val::T)

Atomically store the minimum of x and val in x

Performs x[] = min(x[], val) atomically. Returns the old value.

For further details, see LLVM's atomicrmw min instruction.

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5

source

Base.Threads.atomic_fenceFunction

Threads.atomic_fence()

Insert a sequential-consistency memory fence

Inserts a memory fence with sequentially-consistent ordering semantics. There are algorithms where this is needed, i.e. where an acquire/release ordering is insufficient.

This is likely a very expensive operation. Given that all other atomic operations in Julia already have acquire/release semantics, explicit fences should not be necessary in most cases.

For further details, see LLVM's fence instruction.

source

ccall using a threadpool (Experimental)

Base.@threadcallMacro

@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

The @threadcall macro is called in the same way as ccall but does the work in a different thread. This is useful when you want to call a blocking C function without causing the main julia thread to become blocked. Concurrency is limited by the size of the libuv thread pool, which defaults to 4 threads but can be increased by setting the UV_THREADPOOL_SIZE environment variable and restarting the julia process.

Note that the called function should never call back into Julia.
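
For example, a sketch on a POSIX system (usleep takes microseconds):

# sleep for one second in a libuv thread without blocking Julia's scheduler
@threadcall(:usleep, Cint, (Cuint,), 1_000_000)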

source

Synchronization Primitives

Base.Threads.AbstractLockType

AbstractLock

Abstract supertype describing types that implement the thread-safe synchronization primitives: lock, trylock, unlock, and islocked

source

Base.lockFunction

lock(the_lock)

Acquires the lock when it becomes available. If the lock is already locked by a different task/thread, it waits for it to become available.

Each lock must be matched by an unlock.
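
The standard pattern pairs lock with unlock in a try/finally block:

l = ReentrantLock()
lock(l)
try
    # critical section: only one task runs this at a time
finally
    unlock(l)    # always released, even if the body throws
end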

source

Base.unlockFunction

unlock(the_lock)

Releases ownership of the lock.

If this is a recursive lock which has been acquired before, it just decrements an internal counter and returns immediately.

source

Base.trylockFunction

trylock(the_lock) -> Success (Boolean)

Acquires the lock if it is available, returning true if successful. If the lock is already locked by a different task/thread, returns false.

Each successful trylock must be matched by an unlock.

source

Base.islockedFunction

islocked(the_lock) -> Status (Boolean)

Check whether the lock is held by any task/thread. This should not be used for synchronization (see instead trylock).

source

Base.ReentrantLockType

ReentrantLock()

Creates a reentrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required. Each lock must be matched with an unlock.

This lock is NOT threadsafe. See Threads.Mutex for a threadsafe lock.

source

Base.Threads.MutexType

Mutex()

These are standard system mutexes for locking critical sections of logic.

On Windows, this is a critical section object; on pthreads, it is a pthread_mutex_t.

See also SpinLock for a lighter-weight lock.

source

Base.Threads.SpinLockType

SpinLock()

Creates a non-reentrant lock. Recursive use will result in a deadlock. Each lock must be matched with an unlock.

Test-and-test-and-set spin locks are quickest up to about 30ish contending threads. If you have more contention than that, perhaps a lock is the wrong way to synchronize.

See also RecursiveSpinLock for a version that permits recursion.

See also Mutex for a more efficient version on one core or if the lock may be held for a considerable length of time.

source

Base.Threads.RecursiveSpinLockType

RecursiveSpinLock()

Creates a reentrant lock. The same thread can acquire the lock as many times as required. Each lock must be matched with an unlock.

See also SpinLock for a slightly faster version.

See also Mutex for a more efficient version on one core or if the lock may be held for a considerable length of time.

source

Base.SemaphoreType

Semaphore(sem_size)

Creates a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.

This construct is NOT threadsafe.

source

Base.acquireFunction

acquire(s::Semaphore)

Wait for one of the sem_size permits to be available, blocking until one can be acquired.

source

Base.releaseFunction

release(s::Semaphore)

Return one permit to the pool, possibly allowing another task to acquire it and resume execution.
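
A sketch limiting concurrency with a two-permit semaphore (the workload is illustrative):

s = Base.Semaphore(2)
@sync for i in 1:4
    @async begin
        Base.acquire(s)      # blocks while 2 permits are already in use
        try
            sleep(0.1)       # at most 2 tasks run this section concurrently
        finally
            Base.release(s)  # return the permit
        end
    end
end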

source

Cluster Manager Interface

This interface provides a mechanism to launch and manage Julia workers on different cluster environments. There are two types of managers present in Base: LocalManager, for launching additional workers on the same host, and SSHManager, for launching on remote hosts via ssh. TCP/IP sockets are used to connect and transport messages between processes. It is possible for Cluster Managers to provide a different transport.

Base.Distributed.launchFunction

launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Implemented by cluster managers. For every Julia worker launched by this function, it should append a WorkerConfig entry to launched and notify launch_ntfy. The function MUST exit once all workers requested by manager have been launched. params is a dictionary of all keyword arguments addprocs was called with.

source

Base.Distributed.manageFunction

manage(manager::ClusterManager, id::Integer, config::WorkerConfig, op::Symbol)

Implemented by cluster managers. It is called on the master process, during a worker's lifetime, with appropriate op values:

  • with :register/:deregister when a worker is added / removed from the Julia worker pool.

  • with :interrupt when interrupt(workers) is called. The ClusterManager should signal the appropriate worker with an interrupt signal.

  • with :finalize for cleanup purposes.

source

Base.killMethod

kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Implemented by cluster managers. It is called on the master process, by rmprocs. It should cause the remote worker specified by pid to exit. kill(manager::ClusterManager.....) executes a remote exit() on pid.

source

Base.Distributed.init_workerFunction

init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Called by cluster managers implementing custom transports. It initializes a newly launched process as a worker. Command line argument --worker has the effect of initializing a process as a worker using TCP/IP sockets for transport. cookie is a cluster_cookie.

source

Base.connectMethod

connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implemented by cluster managers using custom transports. It should establish a logical connection to the worker with id pid, specified by config, and return a pair of IO objects. Messages from pid to the current process will be read off instrm, while messages to be sent to pid will be written to outstrm. The custom transport implementation must ensure that messages are delivered and received completely and in order. connect(manager::ClusterManager.....) sets up TCP/IP socket connections between workers.

source

Base.Distributed.process_messagesFunction

Base.process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Called by cluster managers using custom transports. It should be called when the custom transport implementation receives the first message from a remote worker. The custom transport must manage a logical connection to the remote worker and provide two IO objects, one for incoming messages and the other for messages addressed to the remote worker. If incoming is true, the remote peer initiated the connection. Whichever of the pair initiates the connection sends the cluster cookie and its Julia version number to perform the authentication handshake.

See also cluster_cookie.

source

© 2009–2016 Jeff Bezanson, Stefan Karpinski, Viral B. Shah, and other contributors
Licensed under the MIT License.
https://docs.julialang.org/en/release-0.6/stdlib/parallel/