Concurrency

Concurrency vs. Parallelism

The definitions of "concurrency" and "parallelism" sometimes get mixed up, but they are not the same.

A concurrent system is one that can be in charge of many tasks, although not necessarily it is executing them at the same time. You can think of yourself being in the kitchen cooking: you chop an onion, put it to fry, and while it's being fried you chop a tomato, but you are not doing all of those things at the same time: you distribute your time between those tasks. Parallelism would be to stir fry onions with one hand while with the other one you chop a tomato.

At the moment of this writing, Crystal has concurrency support but not parallelism: several tasks can be executed, and a bit of time will be spent on each of these, but two code paths are never executed at the same exact time.

A Crystal program executes in a single operating system thread, except the Garbage Collector (GC) which implements a concurrent mark-and-sweep (currently Boehm GC).

Fibers

To achieve concurrency, Crystal has fibers. A fiber is in a way similar to an operating system thread except that it's much more lightweight and its execution is managed internally by the process. So, a program will spawn multiple fibers and Crystal will make sure to execute them when the time is right.

Event loop

For everything I/O related there's an event loop. Some time-consuming operations are delegated to it, and while the event loop waits for that operation to finish the program can continue executing other fibers. A simple example of this is waiting for data to come through a socket.

Channels

Crystal has Channels inspired by CSP. They allow communicating data between fibers without sharing memory and without having to worry about locks, semaphores or other special structures.

Execution of a program

When a program starts, it fires up a main fiber that will execute your top-level code. There, one can spawn many other fibers. The components of a program are:

  • The Runtime Scheduler, in charge of executing all fibers when the time is right.
  • The Event Loop, which is just another fiber, being in charge of async tasks, like for example files, sockets, pipes, signals and timers (like doing a sleep).
  • Channels, to communicate data between fibers. The Runtime Scheduler will coordinate fibers and channels for their communication.
  • Garbage Collector: to clean up "no longer used" memory.

A Fiber

A fiber is an execution unit that is more lightweight than a thread. It's a small object that has an associated stack of 8MB, which is what is usually assigned to an operating system thread.

Fibers, unlike threads, are cooperative. Threads are pre-emptive: the operating system might interrupt a thread at any time and start executing another one. A fiber must explicitly tell the Runtime Scheduler to switch to another fiber. For example if there's I/O to be waited on, a fiber will tell the scheduler "Look, I have to wait for this I/O to be available, you continue executing other fibers and come back to me when that I/O is ready".

The advantage of being cooperative is that a lot of the overhead of doing a context switch (switching between threads) is gone.

A Fiber is much more lightweight than a thread: even though it's assigned 8MB, it starts with a small stack of 4KB.

On a 64-bit machine it lets us spawn millions and millions of fibers. In a 32-bit machine we can only spawn 512 fibers, which is not a lot. But because 32-bit machines are starting to become obsolete, we'll bet on the future and focus more on 64-bit machines.

The Runtime Scheduler

The scheduler has a queue of:

  • Fibers ready to be executed: for example when you spawn a fiber, it's ready to be executed.
  • The event loop: which is another fiber. When there are no other fibers ready to be executed, the event loop checks if there is any async operation that is ready, and then executes the fiber waiting for that operation. The event loop is currently implemented with libevent, which is an abstraction of other event mechanisms like epoll and kqueue.
  • Fibers that voluntarily asked to wait: this is done with Fiber.yield, which means "I can continue executing, but I'll give you some time to execute other fibers if you want".

Communicating data

Because at this moment there's only a single thread executing your code, accessing and modifying a class variable in different fibers will work just fine. However, once multiple threads (parallelism) is introduced in the language, it might break. That's why the recommended mechanism to communicate data is using channels and sending messages between them. Internally, a channel implements all the locking mechanisms to avoid data races, but from the outside you use them as communication primitives, so you (the user) don't have to use locks.

Sample code

Spawning a fiber

To spawn a fiber you use spawn with a block:

spawn do
  # ...
  socket.gets
  # ...
end

spawn do
  # ...
  sleep 5.seconds
  #  ...
end

Here we have two fibers: one reads from a socket and the other does a sleep. When the first fiber reaches the socket.gets line, it gets suspended, the Event Loop is told to continue executing this fiber when there's data in the socket, and the program continues with the second fiber. This fiber wants to sleep for 5 seconds, so the Event Loop is told to continue with this fiber in 5 seconds. If there aren't other fibers to execute, the Event Loop will wait until either of these events happen, without consuming CPU time.

The reason why socket.gets and sleep behave like this is because their implementations talk directly with the Runtime Scheduler and the Event Loop, there's nothing magical about it. In general, the standard library already takes care of doing all of this so you don't have to.

Note, however, that fibers don't get executed right away. For example:

spawn do
  loop do
    puts "Hello!"
  end
end

Running the above code will produce no output and exit immediately.

The reason for this is that a fiber is not executed as soon as it is spawned. So, the main fiber, the one that spawns the above fiber, finishes its execution and the program exits.

One way to solve it is to do a sleep:

spawn do
  loop do
    puts "Hello!"
  end
end

sleep 1.second

This program will now print "Hello!" for one second and then exit. This is because the sleep call will schedule the main fiber to be executed in a second, and then executes another "ready to execute" fiber, which in this case is the one above.

Another way is this:

spawn do
  loop do
    puts "Hello!"
  end
end

Fiber.yield

This time Fiber.yield will tell the scheduler to execute the other fiber. This will print "Hello!" until the standard output blocks (the system call will tell us we have to wait until the output is ready), and then execution continues with the main fiber and the program exits. Here the standard output might never block so the program will continue executing forever.

If we want to execute the spawned fiber for ever, we can use sleep without arguments:

spawn do
  loop do
    puts "Hello!"
  end
end

sleep

Of course the above program can be written without spawn at all, just with a loop. sleep is more useful when spawning more than one fiber.

Spawning a call

You can also spawn by passing a method call instead of a block. To understand why this is useful, let's look at this example:

i = 0
while i < 10
  spawn do
    puts(i)
  end
  i += 1
end

Fiber.yield

The above program prints "10" ten times. The problem is that there's only one variable i that all spawned fibers refer to, and when Fiber.yield is executed its value is 10.

To solve this, we can do this:

i = 0
while i < 10
  proc = ->(x : Int32) do
    spawn do
      puts(x)
    end
  end
  proc.call(i)
  i += 1
end

Fiber.yield

Now it works because we are creating a Proc and we invoke it passing i, so the value gets copied and now the spawned fiber receives a copy.

To avoid all this boilerplate, the standard library provides a spawn macro that accepts a call expression and basically rewrites it to do the above. Using it, we end up with:

i = 0
while i < 10
  spawn puts(i)
  i += 1
end

Fiber.yield

This is mostly useful with local variables that change at iterations. This doesn't happen with block arguments. For example, this works as expected:

10.times do |i|
  spawn do
    puts i
  end
end

Fiber.yield

Spawning a fiber and waiting for it to complete

We can use a channel for this:

channel = Channel(Nil).new

spawn do
  puts "Before send"
  channel.send(nil)
  puts "After send"
end

puts "Before receive"
channel.receive
puts "After receive"

This prints:

Before receive
Before send
After receive

First, the program spawns a fiber but doesn't execute it yet. When we invoke channel.receive, the main fiber blocks and execution continues with the spawned fiber. Then channel.send(nil) is invoked, and so execution continues at channel.receive, which was waiting for a value. Then the main fiber continues executing and finishes, so the program exits without giving the other fiber a chance to print "After send".

In the above example we used nil just to communicate that the fiber ended. We can also use channels to communicate values between fibers:

channel = Channel(Int32).new

spawn do
  puts "Before first send"
  channel.send(1)
  puts "Before second send"
  channel.send(2)
end

puts "Before first receive"
value = channel.receive
puts value # => 1

puts "Before second receive"
value = channel.receive
puts value # => 2

Output:

Before first receive
Before first send
1
Before second receive
Before second send
2

Note that when the program executes a receive, that fiber blocks and execution continues with the other fiber. When send is executed, execution continues with the fiber that was waiting on that channel.

Here we are sending literal values, but the spawned fiber might compute this value by, for example, reading a file, or getting it from a socket. When this fiber will have to wait for I/O, other fibers will be able to continue executing code until I/O is ready, and finally when the value is ready and sent through the channel, the main fiber will receive it. For example:

require "socket"

channel = Channel(String).new

spawn do
  server = TCPServer.new("0.0.0.0", 8080)
  socket = server.accept
  while line = socket.gets
    channel.send(line)
  end
end

spawn do
  while line = gets
    channel.send(line)
  end
end

3.times do
  puts channel.receive
end

The above program spawns two fibers. The first one creates a TCPServer, accepts one connection and reads lines from it, sending them to the channel. There's a second fiber reading lines from standard input. The main fiber reads the first 3 messages sent to the channel, either from the socket or stdin, then the program exits. The gets calls will block the fibers and tell the Event Loop to continue from there if data comes.

Likewise, we can wait for multiple fibers to complete execution, and gather their values:

channel = Channel(Int32).new

10.times do |i|
  spawn do
    channel.send(i * 2)
  end
end

sum = 0
10.times do
  sum += channel.receive
end
puts sum # => 90

You can, of course, use receive inside a spawned fiber:

channel = Channel(Int32).new

spawn do
  puts "Before send"
  channel.send(1)
  puts "After send"
end

spawn do
  puts "Before receive"
  puts channel.receive
  puts "After receive"
end

puts "Before yield"
Fiber.yield
puts "After yield"

Output:

Before yield
Before send
Before receive
1
After receive
After send
After yield

Here channel.send is executed first, but since there's no one waiting for a value (yet), execution continues in other fibers. The second fiber is executed, there's a value on the channel, it's obtained, and execution continues, first with the first fiber, then with the main fiber, because Fiber.yield puts a fiber at the end of the execution queue.

Buffered channels

The above examples use unbuffered channels: when sending a value, if a fiber is waiting on that channel then execution continues on that fiber.

With a buffered channel, invoking send won't switch to another fiber unless the buffer is full:

# A buffered channel of capacity 2
channel = Channel(Int32).new(2)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "After send"
end

3.times do |i|
  puts channel.receive
end

Output:

Before send 1
Before send 2
Before send 3
After send
1
2
3

Note that the first send does not occupy space in the channel. This is because there is a receive invoked prior to the first send whereas the other 2 send invocations take place before their respective receive. The number of send calls do not exceed the bounds of the buffer and so the send fiber runs uninterrupted to completion.

Here's an example where all space in the buffer gets occupied:

# A buffered channel of capacity 1
channel = Channel(Int32).new(1)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "End of send fiber"
end

3.times do |i|
  puts channel.receive
end

Output:

Before send 1
Before send 2
Before send 3
1
2
3

Note that "End of send fiber" does not appear in the output because we receive the 3 send calls which means 3.times runs to completion and in turn unblocks the main fiber which executes to completion.

Here's the same snippet as the one we just saw - with the addition of a Fiber.yield call at the very bottom:

# A buffered channel of capacity 1
channel = Channel(Int32).new(1)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "End of send fiber"
end

3.times do |i|
  puts channel.receive
end

Fiber.yield

Output:

Before send 1
Before send 2
Before send 3
1
2
3
End of send fiber

With the addition of a Fiber.yield call at the end of the snippet we see the "End of send fiber" message in the output which would have otherwise been missed due to the main fiber executing to completion.

To the extent possible under law, the persons who contributed to this workhave waived
all copyright and related or neighboring rights to this workby associating CC0 with it.
https://crystal-lang.org/reference/guides/concurrency.html