Introduction

Don't block the event loop! Don't poll in a busy loop! Increase throughput! Use async I/O! Concurrency is not parallelism!

You've most likely heard and read claims like these many times before, and maybe, at some point, you've thought you understood everything, only to find yourself confused a moment later, especially when you try to understand how it all works at a fundamental level.

Me too.

So I spent a couple of hundred hours trying to fix that for myself. I wrote this book as a result of that research, and now I invite you to join me as we try to unveil the secrets of async programming.

This book aims to take a look at the why and how of concurrent programming. First we build a good foundation of basic knowledge, before we use that knowledge to investigate how Node.js works by building a Node-inspired runtime.

This book is developed in the open and has its repository here. The book and the accompanying code are MIT licensed, so feel free to clone away and play with it.

I warn you though, we need to venture from philosophical heights where we try to formally define a "task" all the way down to the deep waters where firmware and other strange creatures rule (I believe some of the more wicked creatures there are tasked with naming low level OS syscalls and structures on Windows. However, I have yet to confirm this).

Everything in this book covers these topics for the three major operating systems: Linux, macOS and Windows. We'll also only cover the details of how this works on 64-bit systems.

Who is this book for?

I originally started out wanting to explore the fundamentals and inner workings of Rust's Futures. Reading through RFCs, motivations and discussions I realized that to really understand the why and how of Rust's Futures, I needed a very good understanding of how async code works in general, and the different strategies to handle it.

This book might be interesting if you:

  • Want to take a deep dive into what concurrency is and strategies for dealing with it

  • Are curious about how to make syscalls on three different platforms, and how to do it at three different abstraction levels.

  • Want to know more about how the OS, CPU and hardware handle concurrency.

  • Want to learn the basics of Epoll, Kqueue and IOCP.

  • Think using our research to write a toy Node.js runtime is pretty cool.

  • Want to know more about what the Node event loop really is, and why most diagrams of it on the web are pretty misleading.

  • Already know some Rust but want to learn more.

So, what do you think? Is the answer yes to some of these questions? Well, then join me on this venture as we try to get a better understanding of all these subjects.

We'll only use Rust's standard library. The reason for this is that we really want to know how things work, and Rust's standard library strikes the perfect balance for this task: it provides abstractions, but they're thin enough to let us easily peek under the covers and see what really happens.

Following along

Even though I use mdbook, which has the nice benefit of letting you run the code we write directly in the book, we're working with I/O and cross-platform syscalls in this book, which is not a good fit for the Rust playground.

My best recommendation is to create a project on your local computer and follow along by copying the code over and running it locally.

You can also clone or download the example code from the git repository

Prerequisites

You don't have to be a Rust programmer to follow along. This book has numerous chapters where we explore concepts, and where the code examples are small and easy to understand, but there will be more code towards the end, and you'll get the most out of it by learning the basics first. In that case, The Rust Programming Language is the best place to start.

I do recommend that you read my preceding book, Green threads explained in 200 lines of Rust, since I cover quite a bit about Rust basics, stacks, threads and inline assembly there and will not repeat everything here. However, it's definitely not a must.

You will find everything you need to set up Rust here

Disclaimer

  1. We'll implement a toy version of the Node.js event loop (a bad, but working and conceptually similar event loop)

  2. We'll not primarily focus on code quality and safety. I will focus on understanding the concepts and ideas behind the code. We will have to take many shortcuts to keep this short and concise.

  3. I will, however, do my best to point out hazards and the shortcuts we take, and to highlight obvious places where we could do a better job.

Even though we cover some complex topics we'll have to simplify them significantly to be able to learn anything from them in a small(ish) book. You can probably spend the better part of a career becoming an expert in several of the fields we cover, so forgive me already now for not being able to cover all of them with the precision, thoroughness and respect they deserve.

Credits

Substantial contributions will be credited here.

Contributing

I have no other interest in this than to share knowledge that can be hard to come by and make it easier for the next curious person to understand. If you want to contribute to make this better there are two places to go:

  1. The base repo for this book for all feedback and content changes
  2. The base repo for the code example we use for all improvements to the example code

Everything from fixing spelling mistakes to correcting errors or inaccuracies is greatly appreciated. It will only make this book better for the next person reading it.

Why I wrote this and its companion books

This started as a wish to write an article about Rust's Futures 3.0. The result so far is 3 books about concurrency in general and hopefully, at some point a fourth about Rust's Futures exclusively.

This process has also made me realize why I have vague memories from my childhood of threats being made about stopping the car and letting me off if I didn't stop asking "why?" to everything.

Basically, the questions explored in the chapters that follow are a result of this urge to understand why while reading the RFCs and discussions about Rust's async story.

What's the difference between concurrency and parallelism?

Right off the bat, we'll dive into this subject by defining what concurrency is. Since it is quite easy to confuse "concurrent" with "parallel", we will try to make a clear distinction between the two from the get-go.

Concurrency is about dealing with a lot of things at the same time.

Parallelism is about doing a lot of things at the same time.

We call the concept of progressing multiple tasks at the same time Multitasking. There are two ways to multitask. One is by progressing tasks concurrently, but not at the same time. Another is to progress tasks at the exact same time in parallel.

[Figure: parallel vs. concurrency]

Let's start off with some definitions:

Resource

Something we need to be able to progress a task. Our resources are limited. This could be CPU time or memory.

Task

A set of operations that requires some kind of resource to progress. A task must consist of several sub-operations.

Parallel

Something happening independently at the exact same time.

Concurrent

Tasks that are in progress at the same time, but not necessarily progressing simultaneously.

This is an important distinction. If two tasks are running concurrently, but are not running in parallel, they must be able to stop and resume their progress. We say that a task is interruptible if it allows for this kind of concurrency.

The mental model I use

I firmly believe the main reason we find parallel and concurrent programming hard to reason about stems from how we model events in our everyday life. We tend to define these terms loosely so our intuition is often wrong.

It doesn't help that concurrent is defined in the dictionary as "operating or occurring at the same time", which doesn't give us much to work with when we try to describe how it differs from parallel.

For me, this first clicked when I started to understand why we want to make a distinction between parallel and concurrent in the first place!

The why has everything to do with resource utilization and efficiency.

Efficiency is the (often measurable) ability to avoid wasting materials, energy, efforts, money, and time in doing something or in producing a desired result.

Parallelism

Is increasing the resources we use to solve a task. It has nothing to do with efficiency.

Concurrency

Has everything to do with efficiency and resource utilization. Concurrency can never make one single task go faster. It can only help us utilize our resources better and thereby finish a set of tasks faster.

Let's draw some parallels to process economics

In businesses that manufacture goods, we often talk about LEAN processes, and this is pretty easy to compare to why programmers care so much about what we can achieve if we handle tasks concurrently.

I'll let this 3 minute video explain it for me:

OK, so it's not the newest video on the subject, but it explains a lot in 3 minutes. Most importantly, it shows the gains we try to achieve when applying LEAN techniques: eliminating waiting and non-value-adding tasks.

In programming we could say that we want to avoid blocking and polling (in a busy loop).

Now, would adding more resources (more workers) help in the video above? Yes, but then we use double the resources to produce the same output as one person with an optimal process could do. That's not the best utilization of our resources.

To continue the parallel we started, we could solve the problem of a freezing UI while waiting for an I/O event by spawning a new thread and polling in a loop or blocking there instead of on our main thread. However, that new thread is either consuming resources doing nothing, or worse, using one core to busy-loop while checking if an event is ready. Either way, it's not optimal, especially if you run a server you want to utilize fully.

If you consider the coffee machine as some I/O resource, we would like to start that process, then move on to preparing the next job, or do other work that needs to be done instead of waiting.

But that means there are things happening in parallel here?

Yes, the coffee machine is doing work while the "worker" is doing maintenance and filling water. But this is the crux: Our reference frame is the worker, not the whole system. The guy making coffee is your code.

It's the same when you make a database query. After you've sent the query to the database server, the CPU on the database server will be working on your request while you wait for a response. In practice, it's a way of parallelizing your work.

Concurrency is about working smarter. Parallelism is a way of throwing more resources at the problem.

Concurrency and its relation to I/O

As you might understand from what I've written so far, writing async code mostly makes sense when you need to be smart to make optimal use of your resources.

Now, if you write a program that is working hard to solve a problem, concurrency often doesn't help. This is where parallelism comes into play, since it gives you a way to throw more resources at the problem if you can split it into parts that you can work on in parallel.

I can see two major use cases for concurrency:

  1. When performing I/O and you need to wait for some external event to occur
  2. When you need to divide your attention and prevent one task from waiting too long

The first is the classic I/O example: you have to wait for a network call, a database query or something else to happen before you can progress a task. However, you have many tasks to do so instead of waiting you continue work elsewhere and either check in regularly to see if the task is ready to progress or make sure you are notified when that task is ready to progress.

The second is a common case when you have a UI. Let's pretend you only have one core. How do you prevent the whole UI from becoming unresponsive while performing other CPU-intensive tasks?

Well, you can stop whatever task you're doing every 16ms, and run the "update UI" task, and then resume whatever you were doing afterwards. This way, you will have to stop/resume your task 60 times a second, but you will also have a fully responsive UI which has roughly a 60 Hz refresh rate.
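As a rough sketch of this idea on a single thread (the counting loop and the println! are just stand-ins for real work and a real UI update):

use std::time::{Duration, Instant};

fn update_ui(progress: u64) {
    // stand-in for the real "update UI" task
    println!("progress: {}", progress);
}

fn main() {
    let frame = Duration::from_millis(16);
    let mut last_ui_update = Instant::now();
    let mut work_done: u64 = 0;

    while work_done < 1_000_000 {
        // a small, interruptible chunk of the CPU-intensive task
        work_done += 1;

        // roughly every 16 ms, stop and let the UI task run
        if last_ui_update.elapsed() >= frame {
            update_ui(work_done);
            last_ui_update = Instant::now();
        }
    }
}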

What about threads provided by the OS?

We'll cover threads a bit more when we talk about strategies for handling I/O, but I'll mention them here as well. One challenge when using OS threads to understand concurrency is that they appear to be mapped to cores. That's not necessarily a correct mental model, even though most operating systems will try to map one thread to one core up to the point where the number of threads equals the number of cores.

Once we create more threads than there are cores, the OS will switch between our threads and progress each of them concurrently using the scheduler to give each thread some time to run. And you also have to consider the fact that your program is not the only one running on the system. Other programs might spawn several threads as well which means there will be many more threads than there are cores on the CPU.

Therefore, threads can be a means to perform tasks in parallel, but they can also be a means to achieve concurrency.
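As a small illustration (the thread count and the stand-in work are arbitrary), we can spawn more threads than we have cores and the OS scheduler will still progress all of them:

use std::thread;

fn main() {
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    println!("logical cores: {}", cores);

    // spawn twice as many threads as we have cores
    let handles: Vec<_> = (0..cores * 2)
        .map(|i| {
            thread::spawn(move || {
                // some stand-in work for each task
                let sum: u64 = (0..1_000_000).sum();
                println!("thread {} done (sum = {})", i, sum);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}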

This brings me over to the last part about concurrency. It needs to be defined in some sort of reference frame.

Changing the reference frame

When you write code that is perfectly synchronous from your perspective, stop for a second and consider how that looks from the operating system's perspective.

The Operating System might not run your code from start to end at all. It might stop and resume your process many times. The CPU might get interrupted and handle some inputs while you think it's only focused on your task.

So synchronous execution is only an illusion. But from the perspective of you as a programmer, it's not, and that is the important takeaway:

When we talk about concurrency without providing any other context we are using you as a programmer and your code (your process) as the reference frame. If you start pondering about concurrency without keeping this in the back of your head it will get confusing very fast.

The reason I spend so much time on this is that once you realize this, you'll start to see that some of the things you hear and learn that might seem contradictory really are not. You'll just have to consider the reference frame first.

If this still sounds complicated, I understand. Just sitting and reflecting about concurrency is difficult, but if we try to keep these thoughts in the back of our head when we work with async code I promise it will get less and less confusing.

Async history

In the beginning, computers had one CPU and it executed a set of instructions written by a programmer one by one. No scheduling, no threads, no multitasking. This was how computers worked for a long time. We're talking back when a program looked like a deck of these:

[Image: a deck of punch cards]

Operating systems were being researched very early on, and when personal computing started to grow in the 80s, operating systems like DOS were the standard on most consumer PCs.

These operating systems usually yielded control of the entire CPU to the program currently executing and it was up to the programmer to make things work and implement any kind of multitasking for their program. This worked fine, but as interactive UIs using a mouse and windowed operating systems became the norm, this model simply couldn't work anymore.

Non-Preemptive multitasking

The first method used to keep a UI interactive (and to run background processes) was what we call non-preemptive multitasking.

This kind of multitasking put the responsibility of letting the OS run other tasks, like responding to input from the mouse or running a background task, in the hands of the programmer.

Typically the programmer yielded control to the OS.

Besides off-loading a huge responsibility onto every programmer writing a program for your platform, this was naturally error-prone. A small mistake in a program's code could halt or crash the entire system.

If you remember Windows 95, you also remember the times when a window hung and you could paint the entire screen with it (almost the same way as the end in Solitaire, the card game that came with Windows).

This was reportedly a typical error in the code that was supposed to yield control to the operating system.

Preemptive multitasking

While non-preemptive multitasking sounded like a good idea, it turned out to create serious problems as well. Letting every program and programmer out there be responsible for having a responsive UI in an operating system can ultimately lead to a bad user experience since every bug out there could halt the entire system.

The solution was to place the responsibility of scheduling the CPU resources between the programs that requested it (including the OS itself) in the hands of the OS. The OS can stop the execution of a process, do something else, and switch back.

On a single-core machine, you can visualize this as follows: while running a program you wrote, the OS has to stop your program to update the mouse position before it switches back to your program to continue. This happens so frequently that we don't observe any difference whether the CPU has a lot of work to do or is idle.

The OS is then responsible for scheduling tasks and does this by switching contexts on the CPU. This process can happen many times each second, not only to keep the UI responsive but it can also give some time to other background tasks and IO events.

This is now the prevailing way to design an operating system.

If you want to learn more about this kind of threaded multitasking I recommend reading my previous book about green threads. This is a nice introduction where you'll be able to get the basic knowledge you need about threads, contexts, stacks and scheduling.

Hyperthreading

As CPUs evolved and added more functionality, such as several ALUs (Arithmetic Logic Units) and additional logic units, the CPU manufacturers realized that the entire CPU was never fully utilized. For example, while one operation only required some parts of the CPU, another instruction could be run on the ALU simultaneously. This became the start of hyperthreading.

You might see that your computer today has, for example, 6 physical cores and 12 logical cores. This is exactly where hyperthreading comes in. It "simulates" two cores on the same physical core by using unused parts of the CPU to drive progress on thread "2" while it's running the code on thread "1". It does this by using a number of smart tricks (like the one with the ALU).

Now, with hyperthreading, we could actually offload some work to one thread while keeping the UI interactive by responding to events in the second thread, even though we only had one CPU core, thereby utilizing our hardware better.

You might wonder about the performance of hyperthreading.

It turns out that hyperthreading has been continuously improved since the '90s. Since you're not actually running two CPUs, some operations will need to wait for each other to finish. The performance gain of hyperthreading compared to multithreading on a single core seems to be somewhere close to 30%, but it largely depends on the workload.

Multicore processors

As most know, the clock frequency of processors has been flat for a long time. Processors get faster by improving caches, branch prediction, speculative execution, and the processing pipelines of the processors, but the gains seem to be diminishing.

On the other hand, new processor cores are so small that we can fit many of them on the same chip instead. Now, most CPUs have many cores, and most often each core will also have the ability to perform hyperthreading.

So how synchronous is the code you write, really?

Like many things, this depends on your perspective. From the perspective of your process and the code you write for it, everything will normally happen in the order you write it.

From the OS perspective, it might, or might not, interrupt your code, pause it and run some other code in the meantime before resuming your process.

From the perspective of the CPU, it will mostly execute instructions one at a time*. The CPU doesn't care who wrote the code, though, so when a hardware interrupt happens, it will immediately stop and give control to an interrupt handler. This is how the CPU handles concurrency.

* However, modern CPUs can also do a lot of things in parallel. Most CPUs are pipelined, meaning that the next instruction is loaded while the current is executing. It might have a branch predictor that tries to figure out what instructions to load next.

The processor can also reorder instructions by using "out of order execution" if it believes it makes things faster this way without "asking" or "telling" the programmer or the OS so you might not have any guarantee that A happens before B.

The CPU also offloads some work to separate "coprocessors" like the FPU for floating-point calculations, leaving the main CPU ready to do other tasks, et cetera.

As a high-level overview, it's OK to model the CPU as operating in a synchronous manner, but let's for now just make a mental note that this is a model with some caveats that become especially important when talking about parallelism, synchronization primitives (like mutexes and atomics) and the security of computers and operating systems.

The Operating System

The operating system stands in the centre of everything we do as programmers (well, unless you're writing an Operating System or are in the Embedded realm), so there is no way for us to discuss any kind of fundamentals in programming without talking about operating systems in a bit of detail.

Concurrency from the operating systems' perspective

"Operating systems has been "faking" synchronous execution since the 90s."

This ties into what I talked about in the first chapter when I said that concurrency needs to be talked about within a reference frame, and I explained that the OS might stop and start your process at any time.

What we call synchronous code is in most cases code that appears synchronous to us as programmers. Neither the OS nor the CPU lives in a fully synchronous world.

Operating systems use preemptive multitasking and as long as the operating system you're running is preemptively scheduling processes, you won't have a guarantee that your code runs instruction by instruction without interruption.

The operating system will make sure that all important processes get some time from the CPU to make progress.

This is not as simple when we're talking about modern machines with 4, 6, 8 or 12 physical cores, since you might actually execute code on one of the cores uninterrupted if the system is under very little load. The important part here is that you can't know for sure, and there is no guarantee that your code will be left to run uninterrupted.

Teaming up with the OS

When programming, it's often easy to forget how many moving pieces need to cooperate when we care about efficiency. When you make a web request, you're not asking the CPU or the network card to do something for you, you're asking the operating system to talk to the network card for you.

There is no way for you as a programmer to make your system optimally efficient without playing to the strengths of the operating system. You basically don't have access to the hardware directly.

However, this also means that to understand everything from the ground up, you'll also need to know how your operating system handles these tasks.

To be able to work with the operating system, we'll need to know how we can communicate with it and that's exactly what we're going to go through next.

Communicating with the operating system

In this chapter I want to dive into:

  • What a System Call is
  • Abstraction levels
  • Challenges of writing low-level cross-platform code

Syscall primer

Communication with the operating system is done through System Calls or "syscalls" as we'll call them from now on. This is a public API that the operating system provides and that programs we write in "userland" can use to communicate with the OS.

Most of the time these calls are abstracted away for us as programmers by the language or the runtime we use. A language like Rust makes it trivial to make a syscall, though, as we'll see below.

Now, syscalls are an example of something that is unique to the kernel you're communicating with, but the UNIX family of kernels has many similarities. UNIX systems expose this through libc.

Windows, on the other hand, uses its own API, often referred to as WinAPI, and that can be radically different from how the UNIX based systems operate.

Most often though there is a way to achieve the same things. In terms of functionality, you might not notice a big difference but as we'll see below and especially when we dig into how epoll, kqueue and IOCP work, they can differ a lot in how this functionality is implemented.

Syscall example

To get a bit more familiar with syscalls, we'll implement a very basic one for the three platforms: BSD (macOS), Linux and Windows. We'll also see how this is implemented at three levels of abstraction.

The syscall we'll implement is the one used when we write something to stdout since that is such a common operation and it's interesting to see how it really works.

The lowest level of abstraction

For this to work we need to write some inline assembly. We'll start by focusing on the instructions we write to the CPU.

If you want a more thorough introduction to inline assembly I can refer you to the relevant chapter in my previous book if you haven't read it already.

Now at this level of abstraction, we'll write different code for all three platforms.

On Linux and macOS the syscall we want to invoke is called write. Both systems operate based on the concept of file descriptors and stdout is one of these already present when you start a process.

On Linux a write syscall can look like this
(You can run the example by clicking "play" in the right corner)

#![feature(llvm_asm)]
fn main() {
    let message = String::from("Hello world from interrupt!\n");
    syscall(message);
}

#[cfg(target_os = "linux")]
#[inline(never)]
fn syscall(message: String) {
    let msg_ptr = message.as_ptr();
    let len = message.len();

    unsafe {
        llvm_asm!("
        mov     $$1, %rax   # system call 1 is write on Linux
        mov     $$1, %rdi   # file handle 1 is stdout
        mov     $0, %rsi    # address of string to output
        mov     $1, %rdx    # number of bytes
        syscall             # call kernel, syscall interrupt
    "
        :
        : "r"(msg_ptr), "r"(len)
        : "rax", "rdi", "rsi", "rdx"
        )
    }
}

The number for the write syscall on Linux is 1, so when we write $$1 we're writing the literal value 1 to the rax register.

$$ in inline assembly using the AT&T syntax is how you write a literal value. A single $ means you're referring to a parameter so when we write $0 we're referring to the first parameter called msg_ptr. We also need to clobber the registers we write to so that we let the compiler know that we're modifying them and it can't rely on storing any values in these.

Coincidentally, placing the value 1 into the rdi register means that we're referring to stdout which is the file descriptor we want to write to. This has nothing to do with the fact that the write syscall also has the code 1.

Secondly, we pass in the address of our string buffer and the length of the buffer in the registers rsi and rdx respectively, and call the syscall instruction.

The syscall instruction is a rather new one. On the earlier 32-bit systems in the x86 architecture, you invoked a syscall by issuing a software interrupt, int 0x80. A software interrupt is considered slow at the level we're working at here, so later a separate, faster instruction called syscall was added. Related to this is the vDSO, a memory page the kernel maps into each process' memory, which lets some simple syscalls (such as getting the current time) execute without switching into the kernel at all.

On macOS, the syscall will look something like this:
(since the Rust playground is running Linux, we can't run this example here)

#![feature(llvm_asm)]
fn main() {
    let message = String::from("Hello world from interrupt!\n");
    syscall(message);
}

#[cfg(target_os = "macos")]
fn syscall(message: String) {
    let msg_ptr = message.as_ptr();
    let len = message.len();
    unsafe {
        llvm_asm!(
            "
        mov     $$0x2000004, %rax   # system call 0x2000004 is write on macos
        mov     $$1, %rdi           # file handle 1 is stdout
        mov     $0, %rsi            # address of string to output
        mov     $1, %rdx            # number of bytes
        syscall                     # call kernel, syscall interrupt
    "
        :
        : "r"(msg_ptr), "r"(len)
        : "rax", "rdi", "rsi", "rdx"
        )
    };
}

As you see this is not that different from the one we wrote for Linux, with the exception of the fact that syscall write has the code 0x2000004 instead of 1.

What about Windows?

This is a good opportunity to explain why writing code like we do above is a bad idea.

You see, if you want your code to work for a long time you have to worry about what guarantees the OS gives you. As far as I know, both Linux and macOS give some guarantees that for example $$0x2000004 on macOS will always refer to write (I'm not sure how strong these guarantees are though). Windows gives absolutely zero guarantees when it comes to low-level internals like this.

Windows has changed its internals numerous times and provides no official documentation. The only thing we have are reverse-engineered tables like this. That means that what was write today could be changed to delete the next time you run Windows Update.

The next level of abstraction

The next level of abstraction is to use the API which all three operating systems provide for us.

Already we can see that this abstraction helps us remove some code, since fortunately for us, in this specific example, the syscall is the same on Linux and macOS. That means we only need to special-case Windows, which we do using the #[cfg(not(target_os = "windows"))] conditional compilation flag. For the Windows syscall, we do the opposite.

Using the OS provided API in Linux and macOS

You can run this code directly here in the window. However, since the Rust playground runs on Linux, you'll need to copy the code over to a Windows machine if you want to try out the code for Windows further down.

Our syscall will now look like this
(You can run this code here. It will work for both Linux and macOS)

use std::io;

fn main() {
    let sys_message = String::from("Hello world from syscall!\n");
    syscall(sys_message).unwrap();
}

// See: http://man7.org/linux/man-pages/man2/write.2.html
#[cfg(not(target_os = "windows"))]
#[link(name = "c")]
extern "C" {
    fn write(fd: u32, buf: *const u8, count: usize) -> i32;
}

#[cfg(not(target_os = "windows"))]
fn syscall(message: String) -> io::Result<()> {
    let msg_ptr = message.as_ptr();
    let len = message.len();
    let res = unsafe { write(1, msg_ptr, len) };

    if res == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}

I'll explain what we just did here. I assume that the main method needs no comment.


#[link(name = "c")]

Every Linux installation comes with a version of libc which is a C-library for communicating with the operating system. Having a libc with a consistent API means they can change the underlying implementation without breaking everyone's code. This flag tells the compiler to link to the "c" library on the system we're compiling for.


extern "C" {
    fn write(fd: u32, buf: *const u8, count: usize) -> i32;
}

extern "C" or only extern (C is assumed if nothing is specified) means we're linking to specific functions in the "c" library using the "C" calling convention. As you'll see on Windows we'll need to change this since it uses a different calling convention than the UNIX family.

The function we're linking to needs to have the exact same name, in this case, write. The parameters don't need to have the same name but they must be in the right order and it's good practice to name them the same as in the library you're linking to.

The write function takes a file descriptor which in this case is a handle to stdout. In addition, it expects us to provide a pointer to an array of u8 values and the length of the buffer.


#[cfg(not(target_os = "windows"))]
fn syscall_libc(message: String) {
    let msg_ptr = message.as_ptr();
    let len = message.len();
    unsafe { write(1, msg_ptr, len) };
}

First, we get a pointer to the underlying buffer of our string. This will be a pointer of type *const u8 which matches our buf argument. The len of the buffer corresponds to the count argument.

You might ask how we know that 1 is the file-handle to stdout and where we found that value.

You'll notice this a lot when writing syscalls from Rust. Usually, constants are defined in the C header files, which we can't link to, so we need to look them up. 1 is always the file descriptor for stdout on UNIX systems.

Wrapping the libc functions and providing these constants is exactly what the crate libc provides for us and why you'll see that used instead of writing the type of code we do here.
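For illustration only (we stick to the standard library in this book), the same call using the libc crate would look roughly like this:

use libc::{write, STDOUT_FILENO};

fn main() {
    let message = "Hello world from libc!\n";
    unsafe {
        // libc defines both the function signature and the STDOUT_FILENO constant (1) for us
        write(
            STDOUT_FILENO,
            message.as_ptr() as *const libc::c_void,
            message.len(),
        );
    }
}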

A call to an FFI function is always unsafe so we need to use the unsafe keyword here.

Using the API on Windows

This syscall will look like this on Windows:
(You'll need to copy this code over to a Windows machine to try this out)

use std::io;

fn main() {
    let sys_message = String::from("Hello world from syscall!\n");
    syscall(sys_message).unwrap();
}

#[cfg(target_os = "windows")]
#[link(name = "kernel32")]
extern "stdcall" {
    /// https://docs.microsoft.com/en-us/windows/console/getstdhandle
    fn GetStdHandle(nStdHandle: i32) -> i32;
    /// https://docs.microsoft.com/en-us/windows/console/writeconsole
    fn WriteConsoleW(
        hConsoleOutput: i32,
        lpBuffer: *const u16,
        numberOfCharsToWrite: u32,
        lpNumberOfCharsWritten: *mut u32,
        lpReserved: *const std::ffi::c_void,
    ) -> i32;
}

#[cfg(target_os = "windows")]
fn syscall(message: String) -> io::Result<()> {

    // let's convert our utf-8 to a format windows understands
    let msg: Vec<u16> = message.encode_utf16().collect();
    let msg_ptr = msg.as_ptr();
    let len = msg.len() as u32;

    let mut output: u32 = 0;
        let handle = unsafe { GetStdHandle(-11) };
        if handle  == -1 {
            return Err(io::Error::last_os_error())
        }

        let res = unsafe {
            WriteConsoleW(handle, msg_ptr, len, &mut output, std::ptr::null())
            };
        if res  == 0 {
            return Err(io::Error::last_os_error());
        }

    assert_eq!(output as usize, len);
    Ok(())
}

Now, just by looking at the code above you see it starts to get a bit more complex, but let's spend some time to go through line by line what we do here as well.

#[cfg(target_os = "windows")]
#[link(name = "kernel32")]

The first line is just telling the compiler to only compile this if the target_os is Windows.

The second line is a linker directive, telling the linker we want to link to the library kernel32 (if you ever see an example that links to user32 that will also work).


extern "stdcall" {
    /// https://docs.microsoft.com/en-us/windows/console/getstdhandle
    fn GetStdHandle(nStdHandle: i32) -> i32;
    /// https://docs.microsoft.com/en-us/windows/console/writeconsole
    fn WriteConsoleW(
        hConsoleOutput: i32,
        lpBuffer: *const u16,
        numberOfCharsToWrite: u32,
        lpNumberOfCharsWritten: *mut u32,
        lpReserved: *const std::ffi::c_void,
    ) -> i32;
}

First of all, extern "stdcall", tells the compiler that we won't use the C calling convention but use Windows calling convention called stdcall.

The next part is the functions we want to link to. On Windows, we need to link to two functions to get this to work: GetStdHandle and WriteConsoleW. GetStdHandle retrieves a reference to a standard device like stdout.

WriteConsole comes in two flavours, WriteConsoleW that takes in Unicode text and WriteConsoleA that takes ANSI encoded text.

Now, ANSI-encoded text works fine if you only write English text, but as soon as you write text in other languages, you might need special characters that are not possible to represent in ANSI but are possible in utf-8, and our program will break.

That's why we'll convert our utf-8 encoded text to utf-16 encoded Unicode codepoints that can represent these characters and use the WriteConsoleW function.

#[cfg(target_os = "windows")]
fn syscall(message: String) -> io::Result<()> {

    // let's convert our utf-8 to a format windows understands
    let msg: Vec<u16> = message.encode_utf16().collect();
    let msg_ptr = msg.as_ptr();
    let len = msg.len() as u32;

    let mut output: u32 = 0;
        let handle = unsafe { GetStdHandle(-11) };
        if handle  == -1 {
            return Err(io::Error::last_os_error())
        }

        let res = unsafe {
            WriteConsoleW(handle, msg_ptr, len, &mut output, std::ptr::null())
            };

        if res  == 0 {
            return Err(io::Error::last_os_error());
        }

    assert_eq!(output, len);
    Ok(())
}

The first thing we do is to convert the text to utf-16 encoded text which Windows uses. Fortunately, Rust has a built-in function to convert our utf-8 encoded text to utf-16 code points. encode_utf16 returns an iterator over u16 code points that we can collect to a Vec.

let msg: Vec<u16> = message.encode_utf16().collect();
let msg_ptr = msg.as_ptr();
let len = msg.len() as u32;

Next, we get the pointer to the underlying buffer of our Vec and get the length.

let handle = unsafe { GetStdHandle(-11) };
if handle == -1 {
    return Err(io::Error::last_os_error());
}

The next is a call to GetStdHandle. We pass in the value -11. The values we need to pass in for the different standard devices are actually documented together with the GetStdHandle documentation:

Handle   Value
Stdin    -10
Stdout   -11
StdErr   -12

Now, we're lucky here; it's not that common that we find this information together with the documentation for the function we call, but it's very convenient when we do.

The return codes to expect are also documented thoroughly for all functions, so we handle potential errors here in the same way as we did for the Linux/macOS syscalls.


let res = unsafe {
    WriteConsoleW(handle, msg_ptr, len, &mut output, std::ptr::null())
};

if res == 0 {
    return Err(io::Error::last_os_error());
}

Next up is the call to the WriteConsoleW function. There is nothing too fancy about this.

The highest level of abstraction

This is simple; most standard libraries provide this abstraction for you. In Rust, that would simply be:


println!("Hello world from Stdlib");

Our finished cross-platform syscall

use std::io;

fn main() {
    let sys_message = String::from("Hello world from syscall!\n");
    syscall(sys_message).unwrap();
}

// See: http://man7.org/linux/man-pages/man2/write.2.html
#[cfg(not(target_os = "windows"))]
#[link(name = "c")]
extern "C" {
    fn write(fd: u32, buf: *const u8, count: usize) -> i32;
}

#[cfg(not(target_os = "windows"))]
fn syscall(message: String) -> io::Result<()> {
    let msg_ptr = message.as_ptr();
    let len = message.len();
    let res = unsafe { write(1, msg_ptr, len) };

    if res == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(())
}

#[cfg(target_os = "windows")]
#[link(name = "kernel32")]
extern "stdcall" {
    /// https://docs.microsoft.com/en-us/windows/console/getstdhandle
    fn GetStdHandle(nStdHandle: i32) -> i32;
    /// https://docs.microsoft.com/en-us/windows/console/writeconsole
    fn WriteConsoleW(
        hConsoleOutput: i32,
        lpBuffer: *const u16,
        numberOfCharsToWrite: u32,
        lpNumberOfCharsWritten: *mut u32,
        lpReserved: *const std::ffi::c_void,
    ) -> i32;
}

#[cfg(target_os = "windows")]
fn syscall(message: String) -> io::Result<()> {
    // let's convert our utf-8 to a format windows understands
    let msg: Vec<u16> = message.encode_utf16().collect();
    let msg_ptr = msg.as_ptr();
    let len = msg.len() as u32;

    let mut output: u32 = 0;
    let handle = unsafe { GetStdHandle(-11) };
    if handle == -1 {
        return Err(io::Error::last_os_error());
    }

    let res = unsafe {
        WriteConsoleW(handle, msg_ptr, len, &mut output, std::ptr::null())
    };
    if res == 0 {
        return Err(io::Error::last_os_error());
    }

    assert_eq!(output, len);
    Ok(())
}

About writing cross-platform abstractions

If you isolate the code needed only for Linux and macOS, you'll see that it's not many lines of code to write. But once you want to make a cross-platform variant, the amount of code explodes.

This is a recurring theme when we're curious about how things work on three different platforms: we need some basic understanding of how each of the different operating systems works under the covers.

My experience, in general, is that Linux and macOS have simpler APIs requiring fewer lines of code, and often (but not always) the exact same call works for both systems.

Windows, on the other hand, is more complex, requires you to set up more structures to pass information (instead of using primitives), and often requires way more lines of code. What Windows does have is very good documentation, so even though it's more work, you'll find the official documentation very helpful.

This complexity is why the Rust community (other languages often have something similar) gathers around crates like libc which already have defined most methods and constants you need.

A note about "hidden" complexity

There is a lot of "hidden" complexity when writing cross-platform code at this level. One hurdle is to get something working, which can prove to be quite a challenge. Getting it to work correctly and safely while covering all edge cases is an additional challenge.

Are we 100% sure that all valid utf-8 code points which we use in Rust are valid utf-16 encoded Unicode that Windows will display correctly?

I think so, but being 100% sure about this is not as easy as one might think.
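A quick, and far from exhaustive, sanity check one could write is to round-trip some non-ASCII text through encode_utf16 and back:

fn main() {
    let original = "Hello, 世界! 🦀 Ålesund";
    let utf16: Vec<u16> = original.encode_utf16().collect();
    // decode it back and verify nothing was lost along the way
    let roundtrip = String::from_utf16(&utf16).expect("invalid UTF-16");
    assert_eq!(original, roundtrip);
    println!("round-trip ok");
}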

Does the CPU cooperate with the Operating System?

If you had asked me this question when I first thought I understood how programs work, I would most likely have answered no. We run programs on the CPU, and we can do whatever we want if we know how to do it. Now, first of all, I wouldn't have thought this through, but unless you learn how this works from the bottom up, it's not easy to know for sure.

What started to make me think I was very wrong was some code looking like this:

#![feature(llvm_asm)]
fn main() {
    let t = 100;
    let t_ptr: *const usize = &t;
    let x = dereference(t_ptr);

    println!("{}", x);
}

fn dereference(ptr: *const usize) -> usize {
    let res: usize;
    unsafe {
        llvm_asm!("mov ($1), $0":"=r"(res): "r"(ptr))
        };
    res
}

Here we write a dereference function using assembly instructions. We know there is no way the OS is involved here.

As you see, this code will output 100 as expected. But let's now instead create a pointer with the address 99999999999999 which we know is invalid and see what happens when we pass that into the same function:

#![feature(llvm_asm)]
fn main() {
    let t = 99999999999999 as *const usize;
    let x = dereference(t);

    println!("{}", x);
}
fn dereference(ptr: *const usize) -> usize {
    let res: usize;
    unsafe {
        llvm_asm!("mov ($1), $0":"=r"(res): "r"(ptr));
    }

    res
}

Now we get a segmentation fault. Not surprising really, but how does the CPU know that we're not allowed to dereference this memory?

  • Does the CPU ask the OS if this process is allowed to access this memory location every time we dereference something?
  • Won't that be very slow?
  • How does the CPU know that it has an OS running on top of it at all?
  • Do all CPUs know what a segmentation fault is?
  • Why do we get an error message at all and not just a crash?

Down the rabbit hole

Yes, this is a small rabbit hole. It turns out that there is a great deal of co-operation between the OS and the CPU, but maybe not the way you naively would think.

Many modern CPUs provide some basic infrastructure that Operating Systems use. This infrastructure gives us the security and stability we expect. Actually, most advanced CPUs provide a lot more options than operating systems like Linux, BSD and Windows actually use.

There are especially two things that I want to address here:

  1. How the CPU prevents us from accessing memory we're not supposed to access
  2. How the CPU handles asynchronous events like I/O

We'll cover the first one here and the second in the next chapter.

If you want to know more about how this works in detail I will absolutely recommend that you give Philipp Oppermann's excellent series a read. It's extremely well written and will answer all these questions and many more.

How does the CPU prevent us from accessing memory we're not supposed to?

As I mentioned, modern CPUs already have definitions of some basic concepts built in. Some examples of this are:

  • Virtual memory and page tables
  • Page faults and exceptions
  • Privilege levels

Exactly how this works will differ depending on the exact CPU so we'll treat them in general terms here.

Most modern CPUs, however, have an MMU (Memory Management Unit). This is a part of the CPU (often etched on the same die, even). The MMU's job is to translate the virtual addresses we use in our programs to physical addresses.

When the OS starts a process (like our program) it sets up a page table for our process, and makes sure a special register on the CPU points to this page table.

Now, when we try to dereference t_ptr in the code above, the address is at some point sent to the MMU for translation, which looks it up in the page table to translate it to a physical address in memory where it can fetch the data.

In the first case, it will point to a memory address on our stack that holds the value 100.

When we pass in 99999999999999 and ask it to fetch what's stored at that address (which is what dereferencing does) it looks for the translation in the page table but can't find it.

The CPU then treats this as a page fault.

At boot, the OS provided the CPU with an Interrupt Descriptor Table. This table has a predefined format where the OS provides handlers for the predefined exceptions the CPU can encounter.

Since the OS provided a pointer to a function that handles Page Fault the CPU jumps to that function when we try to dereference 99999999999999 and thereby hands over control to the Operating System.

The OS then prints a nice message for us letting us know that we encountered what it calls a segmentation fault. This message will therefore vary depending on the OS you run the code on.

But can't we just change the page table in the CPU?

Now, this is where Privilege Level comes in. Most modern operating systems operate with two Ring Levels. Ring 0, the kernel space, and Ring 3, user-space.

[Figure: privilege rings]

Most CPUs have a concept of more rings than what most modern operating systems use. There are historical reasons for this, which is also why Ring 0 and Ring 3 are used (and not 1 and 2).

Now, every entry in the page table has additional information about it; among that information is what ring it belongs to. This information is set up when your OS boots up.

Code executed in Ring 0 has almost unrestricted access to external devices, memory and is free to change registers that provide security at the hardware level.

Now, the code you write in Ring 3 will typically have extremely restricted access to I/O and certain CPU registers (and instructions). Trying to issue an instruction or set a register from Ring 3 to change the page table will be prevented by the CPU itself. The CPU will then treat this as an exception and jump to the handler for that exception provided by the OS.

This is also the reason why you have no other choice than to cooperate with the OS and handle I/O tasks through syscalls. The system wouldn't be very secure if this wasn't the case.

Interrupts, Firmware and I/O

We're nearing the end of the general CS subjects in this book, and we'll start to dig our way out of the rabbit hole soon.

This part tries to tie things together and look at how the whole computer works as a system to handle I/O and concurrency.

Let's get to it!

A simplified overview

Let's go through some of the steps where we imagine that we read from a network card:

[Figure: simplified overview of reading from the network card]

Disclaimer We're making things simple here. This is a rather complex operation but we'll focus on what interests us most and skip a few steps along the way.

1. Our code

We register a socket. This happens by issuing a syscall to the OS. Depending on the OS we either get a file descriptor (macOS/Linux) or a socket (Windows).

The next step is that we register our interest in read events on that socket.

2. Registering events with the OS

This is handled in one of three ways (a minimal sketch of alternatives A and B follows the list):

  A. We tell the operating system that we're interested in `Read` events, but we want to wait for them to happen by `yielding` control over our thread to the OS. The OS then suspends our thread by storing the register state and switches to some other thread.

     From our perspective, this will block our thread until we have data to read.

  B. We tell the operating system that we're interested in `Read` events, but we just want a handle to a task which we can `poll` to check if the event is ready or not.

     The OS will not suspend our thread, so this will not block our code.

  C. We tell the operating system that we are probably going to be interested in many events, but we want to subscribe to one event queue. When we `poll` this queue, it will block our thread until one or more events occur.

     This will block our thread while we wait for events to occur.
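Here is a minimal sketch of alternatives A and B using only the standard library. The address, buffer size and sleep interval are arbitrary, and it assumes something is already listening on that port (alternative C needs an event queue like epoll, kqueue or IOCP, which we cover later):

use std::io::{ErrorKind, Read};
use std::net::TcpStream;

// Alternative A: a plain blocking read. The OS suspends our thread
// until data arrives on the socket.
fn read_blocking(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let mut buf = vec![0u8; 4096];
    let n = stream.read(&mut buf)?; // the thread is parked here until data is ready
    buf.truncate(n);
    Ok(buf)
}

// Alternative B: set the socket to non-blocking and poll it ourselves.
// The read returns immediately with `WouldBlock` if no data is ready yet.
fn read_polling(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    stream.set_nonblocking(true)?;
    let mut buf = vec![0u8; 4096];
    loop {
        match stream.read(&mut buf) {
            Ok(n) => {
                buf.truncate(n);
                return Ok(buf);
            }
            // no data yet: we're free to do other work before checking again
            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() -> std::io::Result<()> {
    // assumes something is listening on this port (e.g. `nc -l 8080`)
    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
    let first = read_blocking(&mut stream)?;
    let second = read_polling(&mut stream)?;
    println!("read {} + {} bytes", first.len(), second.len());
    Ok(())
}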

My next book will be about alternative C since that is a very interesting model of handling I/O events that's going to be important later on to understand why Rust's concurrency abstractions are modeled the way they are. For that reason we won't cover this in detail here.

3. The Network Card

We're skipping some steps here, but they're not vital to our understanding.

Meanwhile on the network card there is a small microcontroller running specialized firmware. We can imagine that this microcontroller is polling in a busy loop checking if any data is incoming.

The exact way the Network Card handles its internals can be different from this (and most likely is). The important part is that there is a very simple but specialized CPU running on the network card doing work to check if there are incoming events.

Once the firmware registers incoming data, it issues a Hardware Interrupt.

4. Hardware Interrupt

This is a very simplified explanation. If you're interested in knowing more about how this works, I can recommend Robert Mustacchi's excellent article Turtles on the wire: understanding how the OS uses the modern NIC.

Modern CPUs have a set of Interrupt Request Lines for handling events that occur from external devices. A CPU has a fixed set of interrupt lines.

A hardware interrupt is an electrical signal that can occur at any time. The CPU immediately interrupts its normal workflow to handle the interrupt by saving the state of its registers and looking up the interrupt handler. The interrupt handlers are defined in the Interrupt Descriptor Table.

5. Interrupt Handler

The Interrupt Descriptor Table (IDT) is a table where the OS (or a driver) registers handlers for different interrupts that may occur. Each entry points to a handler function for a specific interrupt. The handler function for a Network Card would typically be registered and handled by a driver for that card.

The IDT is not stored on the CPU as the diagram might suggest. It's located in a fixed and known location in main memory. The CPU only holds a pointer to the table in one of its registers.

6. Writing the data

This is a step that might vary a lot depending on the CPU and the firmware on the network card. If the network card and the CPU support Direct Memory Access (which should be the standard on all modern systems today), the network card will write data directly to a set of buffers the OS has already set up in main memory.

In such a system the firmware on the Network Card might issue an Interrupt when the data is written to memory. DMA is very efficient since the CPU is only notified when the data is already in memory. On older systems the CPU needed to devote resources to handle the data transfer from the network card.

The DMAC (Direct Memory Access Controller) is added to the diagram since, in such a system, it would control the access to memory. It's not part of the CPU as depicted in the diagram above. We're deep enough in the rabbit hole now, and this is not really important for us right now, so let's move on.

7. The driver

The driver would normally handle the communication between the OS and the Network Card. At some point the buffers are filled, and the network card issues an Interrupt. The CPU then jumps to the handler of that interrupt. The interrupt handler for this exact type of interrupt is registered by the driver, so it's actually the driver that handles this event and in turn informs the kernel that the data is ready to be read.

8. Reading the data

Depending on whether we chose alternative A, B or C the OS will:

  1. Wake our thread
  2. Return Ready on the next poll
  3. Wake the thread and return a Read event for the handler we registered.

Interrupts

As I hinted at above, there are two kinds of interrupts:

  1. Hardware Interrupts
  2. Software Interrupts

They are very different in nature.

Hardware Interrupts

Hardware interrupts are created by sending an electrical signal through an Interrupt Request Line (IRQ). These hardware lines signal the CPU directly.

Software Interrupts

These are interrupts issued from software instead of hardware. As in the case of a hardware interrupt, the CPU jumps to the Interrupt Descriptor Table and runs the handler for the specified interrupt.

Firmware

Firmware doesn't get much attention from most of us; however, it's a crucial part of the world we live in. It runs on all kinds of hardware and has all kinds of strange and peculiar ways to make the computers we program on work.

When I think about firmware, I think about the scenes from Star Wars where they walk into a bar with all kinds of strange and obscure creatures. I imagine the world of firmware is much like this, few of us know what they do or how they work on a particular system.

Now, firmware needs a microcontroller or similar to be able to work. Even the CPU has firmware which makes it work. That means there are many more small "CPUs" on our system than the cores we program against.

Why is this important? Well, you remember that concurrency is all about efficiency, right? Since we have many small CPUs already doing work for us on our system, one of our concerns is to not replicate or duplicate that work when we write code.

If a network card has firmware that continually checks whether new data has arrived, it's pretty wasteful if we duplicate that by letting our CPU continually check whether new data has arrived as well. It's much better if we either check once in a while or, even better, get notified when data has arrived for us.

Strategies for handling I/O

Before we dive into writing some code, we'll finish off this part of the book by talking a bit about different strategies for handling I/O and concurrency. Just note that I'm covering I/O in general here, but I use network communication as the main example. Different strategies can have different strengths depending on what type of I/O we're talking about.

1. Using OS threads

Now, one way of accomplishing this is letting the OS take care of everything for us. We do this by simply spawning a new OS thread for each task we want to accomplish and writing code like we normally would.
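As a minimal sketch of this approach (a toy echo server; the address and buffer size are picked arbitrarily):

use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    // one OS thread per connection: each handler blocks freely on I/O and
    // the OS scheduler takes care of switching between them
    for stream in listener.incoming() {
        let mut stream = stream?;
        thread::spawn(move || {
            let mut buf = [0u8; 1024];
            // blocking read: the OS parks this thread until data arrives
            if let Ok(n) = stream.read(&mut buf) {
                let _ = stream.write_all(&buf[..n]); // echo it back
            }
        });
    }
    Ok(())
}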

Pros:

  • Simple
  • Easy to code
  • Reasonably performant
  • You get parallelism for free

Cons:

  • OS level threads come with a rather large stack. If you have many tasks waiting simultaneously (like you would in a web-server under heavy load) you'll run out of memory pretty soon.
  • There are a lot of syscalls involved. This can be pretty costly when the number of tasks is high.
  • The OS has many things it needs to handle. It might not switch back to your thread as fast as you'd wish
  • The OS doesn't know which tasks to prioritize, and you might want to give some tasks a higher priority than others.

2. Green threads

Another common way of handling this is green threads. Languages like Go use this to great success. In many ways, this is similar to what the OS does, but the runtime can be better adjusted and suited to your specific needs.

Pros:

  • Simple to use for the user. The code will look like it does when using OS threads
  • Reasonably performant
  • Memory usage is less of a problem since each green thread starts with a small stack that can grow
  • You are in full control over how threads are scheduled and if you want you can prioritize them differently.

Cons:

  • You need a runtime, and by having that you are duplicating part of the work the OS already does. The runtime will have a cost which in some cases can be substantial.
  • Can be difficult to implement in a flexible way to handle a wide variety of tasks

3. Poll based event loops supported by the OS

The third way we're covering today is the one that most closely matches an ideal solution. In this solution we register an interest in an event, and then let the OS tell us when it's ready.

The way this works is that we tell the OS that we're interested in knowing when data is arriving for us on the network card. The network card issues an interrupt when something has happened, in which case the driver lets the OS know that the data is ready.

Now, we still need a way to "suspend" many tasks while waiting, and this is where Node's "runtime" or Rust's Futures come into play.

Pros:

  • Close to optimal resource utilization
  • It's very efficient
  • Gives us the maximum amount of flexibility to decide how to handle the events that occur

Cons:

  • Different operating systems have different ways of handling these kinds of queues. Some of them are difficult to reconcile with each other. Some operating systems have limitations on which I/O operations support this method.
  • Great flexibility comes with a good deal of complexity
  • Difficult to write an ergonomic API with an abstraction layer that accounts for the differences between the operating systems without introducing unwanted costs.
  • Only solves part of the problem—the programmer still needs a strategy for suspending tasks that are waiting.

Final note

The Node runtime uses a combination of both 1 and 3, but it tries to force all I/O to use alternative 3. This design is also part of the reason why Node is so good at handling many connections concurrently. Node uses a callback-based approach to suspend tasks.

Rust's async story is modeled around option 3, and one of the reasons it has taken a long time is related to the cons of this method and choosing a way to model how tasks should be suspended. Rust's Futures model a task as a State Machine where a suspension point represents a state.

Epoll, Kqueue and IOCP

There are some well-known libraries which implement a cross platform event queue using Epoll, Kqueue and IOCP for Linux, Mac, and Windows, respectively.

Part of Node's runtime is based on libuv, which is a cross platform asynchronous I/O library. libuv is not only used in Node but also forms the foundation of how Julia and Pyuv create a cross platform event queue; most languages have bindings for it.

In Rust we have mio - Metal IO. Mio powers the OS event queue used in Tokio, which is a runtime that provides I/O, networking, scheduling etc. Mio is to Tokio what libuv is to Node.

Tokio powers many web frameworks, among them Actix Web, which is known to be very performant.

Since we want to understand how everything works, I decided to create an extremely simplified version of an event queue. I called it minimio since it's greatly inspired by mio.

I have written about how this works in detail in Epoll, Kqueue and IOCP explained. In that book we also create the event loop which we will use as the cross platform event loop in this book. You can visit the code at its Github repository if you're curious.

Nevertheless, we'll give each of them a brief introduction here so you know the basics.

Why use an OS-backed event queue?

If you remember my previous chapters, you know that we need to cooperate closely with the OS to make I/O operations as efficient as possible. Operating systems like Linux, macOS and Windows provide several ways of performing I/O, both blocking and non-blocking.

Blocking operations are the least flexible for us as programmers since we yield control to the OS, which suspends our thread. The big advantage is that our thread gets woken up once the event we're waiting for is ready.

Non-blocking methods are more flexible but need to have a way to tell us if a task is ready or not. This is most often done by returning some kind of data that says if it's Ready or NotReady. One drawback is that we need to check this status regularly to be able to tell if the state has changed.
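
As a small illustration of that drawback, here is a hedged sketch using only the standard library: we set the socket to non-blocking mode and have to keep checking for WouldBlock ourselves.

use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("www.google.com:80")?;
    stream.write_all(b"GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n")?;
    stream.set_nonblocking(true)?;

    let mut buf = [0u8; 4096];
    loop {
        match stream.read(&mut buf) {
            Ok(0) => break, // the server closed the connection
            Ok(n) => println!("read {} bytes", n),
            // NotReady: we have to come back and check again later
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}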

Event queuing via Epoll/kqueue/IOCP is a way to enjoy the flexibility of a non-blocking method without its aforementioned drawback.

We will not cover methods like poll and select, but I have an article for you here if you want to learn a bit about these methods and how they differ from epoll.

Readiness-based event queues

Epoll and Kqueue are known as readiness-based event queues, which means they let you know when an action is ready to be performed. An example of this is a socket that is ready to be read from.

Basically, this is what happens when we want to read data from a socket using epoll/kqueue:

  1. We create an event queue by calling the syscall epoll_create or kqueue.
  2. We ask the OS for a file descriptor representing a network socket.
  3. Through another syscall, we register an interest in Read events on this socket. It's important that we also tell the OS we expect to receive a notification in the event queue we created in (1) when the event is ready.
  4. Next, we call epoll_wait or kevent to wait for an event. This will block (suspend) the thread it's called on.
  5. When the event is ready, our thread is unblocked (resumed), and we return from our "wait" call with data about the event that occurred.
  6. We call read on the socket we created in 2.
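
For the curious, here is a hedged sketch of these steps on Linux. It steps outside our standard-library-only rule by using the libc crate, and the token value 42 is arbitrary, but it maps directly to the list above:

use std::net::TcpStream;
use std::os::unix::io::AsRawFd;

fn wait_until_readable(stream: &TcpStream) -> std::io::Result<()> {
    unsafe {
        // 1. Create the event queue
        let epfd = libc::epoll_create1(0);
        if epfd < 0 {
            return Err(std::io::Error::last_os_error());
        }

        // 3. Register an interest in Read events on this socket (the socket
        //    from step 2 is the `TcpStream` we got passed in)
        let mut event = libc::epoll_event {
            events: libc::EPOLLIN as u32,
            u64: 42, // a token identifying this event when it's returned to us
        };
        if libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, stream.as_raw_fd(), &mut event) < 0 {
            return Err(std::io::Error::last_os_error());
        }

        // 4./5. Block until the OS tells us the socket is ready to be read
        let mut ready: [libc::epoll_event; 1] = std::mem::zeroed();
        if libc::epoll_wait(epfd, ready.as_mut_ptr(), 1, -1) < 0 {
            return Err(std::io::Error::last_os_error());
        }

        // 6. A call to `read` on the socket will now return data without blocking
        libc::close(epfd);
    }
    Ok(())
}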

Completion-based event queues

IOCP stands for I/O Completion Ports and is a completion-based event queue. This type of queue notifies you when events are completed. An example of this is when data has been read into a buffer.

Below is a basic breakdown of what happens in this type of event queue:

  1. We create an event queue by calling the syscall CreateIoCompletionPort.
  2. We create a buffer and ask the OS to give us a handle to a socket.
  3. We register an interest in Read events on this socket with another syscall, but this time we also pass in the buffer we created in (2) to which the data will be read.
  4. Next, we call GetQueuedCompletionStatusEx, which will block until an event has completed.
  5. Our thread is unblocked, and our buffer is now filled with the data we're interested in.

Epoll

Epoll is the Linux way of implementing an event queue. In terms of functionality, it has a lot in common with Kqueue. The advantage of using epoll over other similar methods on Linux like select or poll is that epoll was designed to work very efficiently with a large number of events.

Kqueue

Kqueue is the macOS way of implementing an event queue. It originated in BSD and is used in operating systems such as FreeBSD and OpenBSD. In terms of high level functionality, it's similar to Epoll in concept but different in actual use.

Some argue it's a bit more complex to use and a bit more abstract and "general".

IOCP

IOCP or Input Output Completion Ports is the way Windows handles this type of event queue.

A Completion Port will let you know when an event has Completed. Now this might sound like a minor difference, but it's not. This is especially apparent when you want to write a library since abstracting over both means you'll either have to model IOCP as readiness-based or model epoll/kqueue as completion-based.

Lending out a buffer to the OS also provides some challenges since it's very important that this buffer stays untouched while waiting for an operation to return.

My experience investigating this suggests that getting readiness-based models to behave like the completion-based models is easier than the other way around. This means you should get IOCP to work first and then fit epoll or kqueue into that design.

Introducing our main example

Now we've finally come to the part of this book where we will write some code.

The Node event loop is a complex piece of software developed over many years. We will have to simplify things a lot.

We'll try to implement the parts that are important for us to understand Node a little better and most importantly use it as an example where we can use our knowledge from the previous chapters to make something that actually works.

Our main goal here is to explore async concepts. Using Node as an example is mostly for fun.

We want to write something like this:

/// Think of this function as the javascript program you have written
fn javascript() {
    print("First call to read test.txt");
    Fs::read("test.txt", |result| {
        let text = result.into_string().unwrap();
        let len = text.len();
        print(format!("First count: {} characters.", len));

        print(r#"I want to create a "magic" number based on the text."#);
        Crypto::encrypt(text.len(), |result| {
            let n = result.into_int().unwrap();
            print(format!(r#""Encrypted" number is: {}"#, n));
        })
    });

    print("Registering immediate timeout 1");
    set_timeout(0, |_res| {
        print("Immediate1 timed out");
    });
    print("Registering immediate timeout 2");
    set_timeout(0, |_res| {
        print("Immediate2 timed out");
    });

    // let's read the file again and display the text
    print("Second call to read test.txt");
    Fs::read("test.txt", |result| {
        let text = result.into_string().unwrap();
        let len = text.len();
        print(format!("Second count: {} characters.", len));

        // aaand one more time but not in parallel.
        print("Third call to read test.txt");
        Fs::read("test.txt", |result| {
            let text = result.into_string().unwrap();
            print_content(&text, "file read");
        });
    });

    print("Registering a 3000 and a 500 ms timeout");
    set_timeout(3000, |_res| {
        print("3000ms timer timed out");
        set_timeout(500, |_res| {
            print("500ms timer(nested) timed out");
        });
    });

    print("Registering a 1000 ms timeout");
    set_timeout(1000, |_res| {
        print("SETTIMEOUT");
    });

    // `http_get_slow` lets us define a latency we want to simulate
    print("Registering http get request to google.com");
    Http::http_get_slow("http//www.google.com", 2000, |result| {
        let result = result.into_string().unwrap();
        print_content(result.trim(), "web call");
    });
}

fn main() {
    let rt = Runtime::new();
    rt.run(javascript);
}

We'll have print statements at strategic places in our code so we can get a view of what's actually happening when and where.

Right off the bat you'll see that something is strange with the Rust code we have written in our example.

The code uses callbacks when we have an async operation we want to execute just like Javascript does, and we have "magic" modules like Fs or Crypto that we can call, just like when you import modules from Node.

Our code here is mostly calling functions that register an event and stores a callback to be run when the event is ready.

An example of this is the set_timeout function:


#![allow(unused_variables)]
fn main() {
set_timeout(0, |_res| {
    print("Immediate1 timed out");
});
}

What we really do here is register interest in a timeout event, and when that event occurs we want to run the callback |_res| { print("Immediate1 timed out"); }.

Now the parameter _res is an argument that is passed into our callback. In javascript it would be left out, but since we use a typed language we have created a type called Js.

Js is an enum that represents Javascript types. In the case of set_timeout it's Js::Undefined, in the case of Fs::read it's a Js::String, and so on.

When we run this code, here is what our output will look like:

Thread: main     First call to read test.txt
Thread: main     Registering immediate timeout 1
Thread: main     Registered timer event id: 2
Thread: main     Registering immediate timeout 2
Thread: main     Registered timer event id: 3
Thread: main     Second call to read test.txt
Thread: main     Registering a 3000 and a 500 ms timeout
Thread: main     Registered timer event id: 5
Thread: main     Registering a 1000 ms timeout
Thread: main     Registered timer event id: 6
Thread: main     Registering http get request to google.com
Thread: pool3    received a task of type: File read
Thread: pool2    received a task of type: File read
Thread: main     Event with id: 7 registered.
Thread: main     ===== TICK 1 =====
Thread: main     Immediate1 timed out
Thread: main     Immediate2 timed out
Thread: pool3    finished running a task of type: File read.
Thread: pool2    finished running a task of type: File read.
Thread: main     First count: 39 characters.
Thread: main     I want to create a "magic" number based on the text.
Thread: pool3    received a task of type: Encrypt
Thread: main     ===== TICK 2 =====
Thread: main     SETTIMEOUT
Thread: main     Second count: 39 characters.
Thread: main     Third call to read test.txt
Thread: main     ===== TICK 3 =====
Thread: pool2    received a task of type: File read
Thread: pool3    finished running a task of type: Encrypt.
Thread: main     "Encrypted" number is: 63245986
Thread: main     ===== TICK 4 =====
Thread: pool2    finished running a task of type: File read.

===== THREAD main START CONTENT - FILE READ =====
Hello world! This is text to encrypt!
... [Note: Abbreviated for display] ...
===== END CONTENT =====

Thread: main     ===== TICK 5 =====
Thread: epoll    epoll event 7 is ready

===== THREAD main START CONTENT - WEB CALL =====
HTTP/1.1 302 Found
Server: Cowboy
Location: http://http/www.google.com
... [Note: Abbreviated for display] ...
===== END CONTENT =====

Thread: main     ===== TICK 6 =====
Thread: epoll    epoll event timeout is ready
Thread: main     ===== TICK 7 =====
Thread: main     3000ms timer timed out
Thread: main     Registered timer event id: 10
Thread: epoll    epoll event timeout is ready
Thread: main     ===== TICK 8 =====
Thread: main     500ms timer(nested) timed out
Thread: pool0    received a task of type: Close
Thread: pool1    received a task of type: Close
Thread: pool2    received a task of type: Close
Thread: pool3    received a task of type: Close
Thread: epoll    received event of type: Close
Thread: main     FINISHED

Don't worry, we'll explain everything, but I just wanted to start off by explaining where we want to end up.

What is Node?

We have to start with a short explanation of what Node is, just so we're on the same page.

Node is a Javascript runtime allowing Javascript to run on your desktop (or server). Javascript was originally designed as a scripting language for the browser, which means that it relies on the browser to both interpret it and provide a runtime for it.

This also means that Javascript on the desktop needs to be both interpreted (or compiled) and provided with a runtime to be able to do anything meaningful. On the desktop, the V8 javascript engine compiles Javascript, and Node provides the runtime.

Javascript has one advantage from a language design perspective: Everything is designed to be handled asynchronously. And as you know by now, this is pretty crucial if we want to make the most out of our hardware, especially if you have a lot of I/O operations to take care of.

One such scenario is a Web server. Web servers handle a lot of I/O tasks whether it's reading from the file system or communicating via the network card.

Why Node In Particular?

  • Javascript is unavoidable when doing web development for the browser. Using Javascript on the server allows programmers to use the same language for both front-end and back-end development.
  • There is a potential for code reuse between the back-end and the front-end
  • The design of Node allows it to make very performant web servers
  • Working with JSON and web APIs is very easy when you only deal with Javascript

Helpful Facts

Let's start off by debunking some myths that might make it easier to follow along when we start to code.

The Javascript Event Loop

Javascript is a scripting language and can't do much on its own. It doesn't have an event loop. Now in a web browser, the browser provides a runtime, which includes an event loop. And on the server, Node provides this functionality.

You might say that Javascript as a language would be difficult to run (due to its callback-based model) without some sort of event loop, but that's beside the point.

Node is Multithreaded

Contrary to what I've seen claimed on several occasions, Node uses a thread pool, so it's multithreaded. However, the part of Node that "progresses" your code does indeed run on a single thread. When we say "don't block the event loop", we're referring to this thread, since blocking it will prevent Node from making progress on other tasks.

We'll see exactly why blocking this thread is a problem and how that's handled.

The V8 Javascript Engine

Now, this is where we need to focus a bit. The V8 engine is a Javascript JIT compiler. That means that when you write a for loop, the V8 engine translates it to instructions that run on your CPU. There are many Javascript engines, but Node was originally implemented on top of the V8 engine.

The V8 engine itself isn't very useful for us; it just interprets our Javascript. It can't do I/O, set up a runtime or anything like that. Writing Javascript only with V8 will be a very limited experience.

Since we write Rust (even though we made it look a bit "javascripty"), we'll not cover the part of interpreting javascript. We'll just focus on how Node works and how it handles concurrency since that's our main focus right now.

Node's Event Loop(s)

Node internally divides its real work into two categories:

I/O-bound tasks

Tasks that mainly wait for some external event to occur are handled by the cross platform epoll/kqueue/IOCP event queue implemented in libuv and in our case minimio.

CPU-bound tasks

Tasks that are predominantly CPU intensive are handled by a thread pool. The default size of this thread pool is 4 threads, but that can be configured by the Node runtime.

I/O tasks which can't be handled by the cross platform event queue are also handled here, which is the case with file reads that we use in our example.

Most C++ extensions for Node use this thread pool to perform their work, and that is one of many reasons they are used for calculation-intensive tasks.

Further Information

If you do want to know more about the Node event loop, I can refer you to one short page of the libuv documentation and two talks that I find great (and correct) on this subject:

Libuv Design Overview

The first talk is held by @piscisaureus and is an excellent 15 minute overview. I especially recommend this one as it's short and to the point.

The second one is slightly longer, but it's also an excellent talk, held by Bryan Hughes.

Now, relax, get a cup of tea and sit back while we go through everything together.

What's our plan?

I'll briefly list what we need to do to get this working here:

We need two event queues:

  1. We need a thread pool to execute our CPU intensive tasks, or tasks that we want to run asynchronously but not in our OS-backed event queue
  2. We need to make a simple cross platform epoll/kqueue/IOCP event loop. Now, this turns out to be extremely interesting, but it's also a lot of code, so I split this section off into a separate "companion book" for those who want to explore it further. Here we use that library, called minimio.

Granted, we're breaking our rule that we don't use any dependencies, but it's our own dependency which we'll explain fully in due time.

We need a runtime

Our runtime will:

  1. Store our callbacks to be run at a later point
  2. Send tasks to be executed on the thread pool
  3. Register interests with the OS (through minimio)
  4. Poll our two event sources for new events
  5. Handle timers
  6. Provide a way for "modules" like Fs and Crypto to register tasks
  7. Progress all our tasks until we're finished

We need a few modules

  1. For handling file system tasks Fs
  2. For handling http calls Http
  3. For handling cryptological tasks Crypto

We need to make some helpers

We need some helpers to make our code readable and to provide the output we want to see. In contrast to any real runtime, we're interested in knowing what happens and when. To help with that we define three extra methods:

print which prints out a message that first tells us what thread the message is being outputted from and then a message we provide.

print_content does the same as print but is a way for us to print out more than a message in a nice way.

current is just a shortcut for us to get the name of the current thread. Since we want to track what's happening, we're going to need to print out which thread is issuing what output, so this will avoid cluttering up our code too much along the way.
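
Just to give you an idea of the shape of these helpers, here is a hedged sketch of print and current. The real definitions follow the same idea: prefix every message with the name of the thread it's printed from.

use std::thread;

fn current() -> String {
    thread::current().name().unwrap_or("<unnamed>").to_string()
}

fn print(t: impl std::fmt::Display) {
    println!("Thread: {}\t {}", current(), t);
}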

Minimio

Minimio is a cross platform epoll/kqueue/IOCP based event loop that we will cover in the next book. I originally included it here, but implementing that for three platforms turned out to be a bit more involved than I first thought and needed more space than would fit in this book.

It's also easier to just read up on epoll/kqueue/IOCP when it's concentrated in a separate book.

Implementing our own runtime

Let's start to get some code written down; we have a lot to do.

The way we'll go about this is that I'll go through everything in the order I find easiest to parse and understand. That also means that sometimes I have to introduce a bit of code that will only be explained later. Try not to worry if you don't understand something at first; we'll cover it all in due time.

The very first thing we need to do is to create a Rust project to run our code in:

cargo new async-basics
cd async-basics

Now, as I've explained, we'll need to use the minimio library (which will be explained in a separate book, but you can already look through the source code if you want to):

In Cargo.toml

[dependencies]
minimio = {git = "https://github.com/cfsamson/examples-minimio", branch = "master"}

A second option is to clone the repository containing all the code we're going to write and go through that as we go along:

git clone https://github.com/cfsamson/examples-node-eventloop

The next thing we need is a Runtime struct to hold all the state our runtime needs.

First navigate to main.rs (located in src/main.rs).

We'll write everything in one file this time, in roughly the same order as we go through it in this book.

Runtime struct

I've added comments to the code so it's easier to remember and understand.


#![allow(unused_variables)]
fn main() {
pub struct Runtime {
    /// Available threads for the threadpool
    available_threads: Vec<usize>,
    /// Callbacks scheduled to run
    callbacks_to_run: Vec<(usize, Js)>,
    /// All registered callbacks
    callback_queue: HashMap<usize, Box<dyn FnOnce(Js)>>,
    /// Number of pending epoll events, only used by us to print for this example
    epoll_pending_events: usize,
    /// Our event registrator which registers interest in events with the OS
    epoll_registrator: minimio::Registrator,
    /// The handle to our epoll thread
    epoll_thread: thread::JoinHandle<()>,
    /// None = infinite, Some(n) = timeout in n ms, Some(0) = immediate
    epoll_timeout: Arc<Mutex<Option<i32>>>,
    /// Channel used by both our threadpool and our epoll thread to send events
    /// to the main loop
    event_receiver: Receiver<PollEvent>,
    /// Creates an unique identity for our callbacks
    identity_token: usize,
    /// The number of events pending. When this is zero, we're done
    pending_events: usize,
    /// Handles to our threads in the threadpool
    thread_pool: Vec<NodeThread>,
    /// Holds all our timers, and an Id for the callback to run once they expire
    timers: BTreeMap<Instant, usize>,
    /// A struct to temporarily hold timers to remove. We let Runtime have
    /// ownership so we can reuse the same memory
    timers_to_remove: Vec<Instant>,
}
}

We'll cover every one of these fields in the coming chapters as we use them.

I'll continue by defining some of the types we use here.

Task


#![allow(unused_variables)]
fn main() {
struct Task {
    task: Box<dyn Fn() -> Js + Send + 'static>,
    callback_id: usize,
    kind: ThreadPoolTaskKind,
}

impl Task {
    fn close() -> Self {
        Task {
            task: Box::new(|| Js::Undefined),
            callback_id: 0,
            kind: ThreadPoolTaskKind::Close,
        }
    }
}
}

We need a task object, which represents a task we want to finish in our thread pool. I'll go through the types in this object in a later chapter so don't worry too much about them now if you find them hard to grasp. Everything will be explained.

We also create an implementation of a Close task. We need this to clean up after ourselves and close down the thread pool.

|| Js::Undefined might seem strange, but it's only a closure that returns Js::Undefined. We need it because we don't want to make task an Option just for this one case.

That way we don't have to match or map on task all the way through our code, which is more than enough to parse already.

NodeThread

NodeThread represents a thread in our thread pool. As you can see, we have a JoinHandle (which we get when we call thread::spawn) and the sending part of a channel. This channel sends messages of the type Task.


#![allow(unused_variables)]
fn main() {
#[derive(Debug)]
struct NodeThread {
    pub(crate) handle: JoinHandle<()>,
    sender: Sender<Task>,
}
}

We've introduced two new types in our Task struct: Js and ThreadPoolTaskKind. First we'll cover ThreadPoolTaskKind.

In our example, we have three kinds of tasks: FileRead, which means a file has been read; Encrypt, which represents an operation from our Crypto module; and Close, which is used to let the threads in our threadpool know that we're closing the loop and lets them finish up before we exit our process.

ThreadPoolTaskKind

As you might understand, this object is only used in the threadpool.


#![allow(unused_variables)]
fn main() {
pub enum ThreadPoolTaskKind {
    FileRead,
    Encrypt,
    Close,
}
}
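
Since the threadpool prints the task kind with {} (as you'll see later), the enum also needs a Display implementation. Here is a hedged sketch of one that would produce the task names we saw in the output earlier:

use std::fmt;

impl fmt::Display for ThreadPoolTaskKind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ThreadPoolTaskKind::FileRead => write!(f, "File read"),
            ThreadPoolTaskKind::Encrypt => write!(f, "Encrypt"),
            ThreadPoolTaskKind::Close => write!(f, "Close"),
        }
    }
}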

Js

Next is our Js object. This represents different Javascript types, and it's only used to make our code look more "javascripty", but it's also convenient for us to abstract over the return types of closures.

We'll also implement two convenience methods on this object to make our "javascripty" code look a bit cleaner.

We already know the return types based on our module's documentation - just like you would know them from the documentation when using a Node module - but we need to actually handle the types in Rust, so these methods make that slightly easier for us.


#![allow(unused_variables)]
fn main() {
#[derive(Debug)]
pub enum Js {
    Undefined,
    String(String),
    Int(usize),
}

impl Js {
    /// Convenience method since we know the types
    fn into_string(self) -> Option<String> {
        match self {
            Js::String(s) => Some(s),
            _ => None,
        }
    }

    /// Convenience method since we know the types
    fn into_int(self) -> Option<usize> {
        match self {
            Js::Int(n) => Some(n),
            _ => None,
        }
    }
}
}
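
As a quick, hypothetical standalone example (assuming the Js enum above is in scope), this is how the callbacks in our javascript function end up using these convenience methods:

fn main() {
    let result = Js::String("Hello world!".to_string());
    let text = result.into_string().unwrap();
    assert_eq!(text.len(), 12);

    let result = Js::Int(63245986);
    let n = result.into_int().unwrap();
    println!(r#""Encrypted" number is: {}"#, n);
}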

PollEvent

Next we have PollEvent. While ThreadPoolTaskKind represents the kinds of tasks we can send to the threadpool, this enum represents the events we can accept back from both our epoll-based event queue and our threadpool.


#![allow(unused_variables)]
fn main() {
/// Describes the three main events our epoll-eventloop handles
enum PollEvent {
    /// An event from the `threadpool` with a tuple containing the `thread id`,
    /// the `callback_id` and the data which the we expect to process in our
    /// callback
    Threadpool((usize, usize, Js)),
    /// An event from the epoll-based eventloop holding the `event_id` for the
    /// event
    Epoll(usize),
    Timeout,
}
}

static RUNTIME

Lastly we have another convenience for us, and it's also necessary to make our Javascript code look a bit like Javascript.

First we have a static variable which represents our Runtime. It's actually a pointer to our runtime which we initialize to a null-pointer from the start.

We need to use unsafe to edit this. I'll explain later why this is safe, but I also want to mention that it could be avoided by using lazy_static. That would, however, require us to add lazy_static as a dependency (which would be fine since it contains no "magic" that we'd want to explain in this book), and it would also make our code less readable, and it's complex enough already.


#![allow(unused_variables)]
fn main() {
static mut RUNTIME: *mut Runtime = std::ptr::null_mut();
}
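
To see why we want this, here is a hedged sketch of how a function like set_timeout could reach the runtime through this pointer. The rt.set_timeout method is assumed here; the real module implementations follow the same pattern:

fn set_timeout(ms: u64, cb: impl FnOnce(Js) + 'static) {
    // Safety: RUNTIME is set once at the start of `run` on the main thread,
    // and we only access it from that same thread while the runtime is alive.
    let rt = unsafe { &mut *RUNTIME };
    // A hypothetical method on Runtime that registers the timer and stores the callback
    rt.set_timeout(ms, cb);
}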

Let's now move on and look at what the heart of the runtime looks like: the main loop

Moving on

We've already gotten really far by introducing most of our runtime in this first chapter. The next chapter will focus on implementing all the functionality we need for this to work.

The main loop

Let's put our event loop logic in the run function of our Runtime. The code we present in this chapter is the body of this run function.

The run function on our Runtime will consume self so it's the last thing that we'll be able to call on this instance of our Runtime.

I'll include the whole method last so you can see it all together.


#![allow(unused_variables)]
fn main() {
impl Runtime {
    pub fn run(mut self, f: impl Fn()) {
        ...
    }
}
}

Initialization


#![allow(unused_variables)]
fn main() {
let rt_ptr: *mut Runtime = &mut self;
unsafe { RUNTIME = rt_ptr };
let mut ticks = 0; // just for us printing out

// First we run our "main" function
f();
}

The first two lines are just a hack we use in our code to make it "look" more like Javascript. We take the pointer to self and set it in the global variable RUNTIME.

We could instead pass our runtime around, but that wouldn't be very ergonomic. Another option would be to use the lazy_static crate to initialize this field in a slightly safer way, but then we'd have to explain what lazy_static does to keep our promise of minimal "magic".

To be honest, we only set this once, it's set at the start of our event loop, and we only access it from the same thread we created it on. It might not be pretty, but it's safe.

ticks is only a counter that keeps track of how many times we've looped, which we use for display purposes.

The last and least visible part of this code is actually where we kick everything off, calling f(). f will be the code we wrote in the javascript function in the last chapter. If this is empty nothing will happen.

Starting the event loop


#![allow(unused_variables)]
fn main() {
// ===== EVENT LOOP =====
while self.pending_events > 0 {
    ticks += 1;
}

self.pending_events keeps track of how many pending events we have, so that when no events are left we exit the loop since our event loop is finished.

So where do these events come from? In our javascript function f, which we introduced in the chapter Introducing our main example, you probably noticed that we called functions like set_timeout and Fs::read. These functions are defined in the Node runtime (as they are in ours), and their main responsibility is to create tasks and register interest in events. When one of these tasks or interests is registered, this counter is increased.

ticks += 1 just increases our tick counter by one.

1. Process timers

self.process_expired_timers();

This method checks if any timers have expired. If we have expired timers, we schedule their callbacks to run at the first call to self.run_callbacks().

Worth noting here is that timers with a timeout of 0 will already have timed out by the time we reach this function so their events will be processed.
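
To make this concrete, here is a hedged sketch of what process_expired_timers could look like as a method inside impl Runtime, given the timers, timers_to_remove and callbacks_to_run fields we defined earlier:

fn process_expired_timers(&mut self) {
    // Borrow this field separately to keep the borrow checker happy
    let timers_to_remove = &mut self.timers_to_remove;

    // Every timer with an `Instant` at or before "now" has expired
    self.timers
        .range(..=Instant::now())
        .for_each(|(k, _)| timers_to_remove.push(*k));

    while let Some(key) = timers_to_remove.pop() {
        let callback_id = self.timers.remove(&key).expect("timer to be registered");
        // Schedule the callback to run in the next callback step
        self.callbacks_to_run.push((callback_id, Js::Undefined));
    }
}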

2. Callbacks

self.run_callbacks();

Now we could have chosen to run the callbacks in the timer step but since this is the next step of our loop we do it here instead.

This step might seem unnecessary here, but in Node it serves a purpose. Some types of callbacks are deferred to run on the next iteration of the loop, which means that they're not run immediately. We won't implement this functionality in our example, but it's worth noting.
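
For reference, a hedged sketch of run_callbacks as a method inside impl Runtime: we drain the scheduled callbacks, look up each stored closure by its id and call it with the data it was waiting for.

fn run_callbacks(&mut self) {
    while let Some((callback_id, data)) = self.callbacks_to_run.pop() {
        let cb = self
            .callback_queue
            .remove(&callback_id)
            .expect("callback to be registered");
        cb(data);
        // One less event to wait for before the loop can finish
        self.pending_events -= 1;
    }
}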

3. Idle/Prepare

This is a step mostly used by Node's internals. It's not important for understanding the big picture here, but I included it since it's something you'll see in Node's documentation, so you know where we're at in the loop at this point.

4. Poll

This is an important step. This is where we'll receive events from our thread pool or our epoll event queue.

I refer to the epoll/kqueue/IOCP event queue as epoll here just so you know that it's not only epoll we're waiting for. From now on I will refer to the cross platform event queue as epoll in the code for brevity.

Calculate time until next timeout (if any)

The first thing we do is to check if we have any timers. If we have timers that will time out, we calculate how many milliseconds it is until the first one times out. We'll need this to make sure we don't block and forget about our timers.


#![allow(unused_variables)]
fn main() {
let next_timeout = self.get_next_timer();

let mut epoll_timeout_lock = self.epoll_timeout.lock().unwrap();
*epoll_timeout_lock = next_timeout;
// We release the lock before we wait in `recv`
drop(epoll_timeout_lock);
}

self.epoll_timeout is a Mutex, so we need to lock it to be able to change the value it holds. Now, this is important: we need to make sure the lock is released before we go on to wait for events. Our epoll thread locks self.epoll_timeout and reads its value before it calls poll.

If we're still holding the lock, we'll end up in a deadlock. drop(epoll_timeout_lock) releases the lock. We'll explain how this works in a bit more detail in the next chapter.
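
A hedged sketch of how get_next_timer could be implemented, given the timers BTreeMap: the first key is the timer that expires soonest, and None means there are no timers, so the epoll thread may block indefinitely.

fn get_next_timer(&self) -> Option<i32> {
    self.timers.iter().next().map(|(&instant, _)| {
        // How long until the earliest timer expires (0 if it already has)
        let remaining = instant.saturating_duration_since(Instant::now());
        remaining.as_millis() as i32
    })
}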

Wait for events


#![allow(unused_variables)]
fn main() {
if let Ok(event) = self.event_receiver.recv() {
    match event {
        PollEvent::Timeout => (),
        PollEvent::Threadpool((thread_id, callback_id, data)) => {
            self.process_threadpool_events(thread_id, callback_id, data);
        }
        PollEvent::Epoll(event_id) => {
            self.process_epoll_events(event_id);
        }
    }
}
self.run_callbacks();
}

Both our threadpool threads and our epoll thread hold the sending part of the channel whose receiving end is self.event_receiver. If a thread in the threadpool finishes a task, or if the epoll thread receives notification that an event is ready, a PollEvent is sent through the channel and received here.

This will block our main thread until something happens, or a timeout occurs.

Note: Our epoll thread will read the timeout value we set in self.epoll_timeout, so if nothing happens before the timeout expires it will emit a PollEvent::Timeout event which simply causes our main event loop to continue and handle that timer.

Depending on whether it was a PollEvent::Timeout, PollEvent::Threadpool or a PollEvent::Epoll type of event that occurred, we handle the event accordingly.

We'll explain these methods in the following chapters.

5. Check


#![allow(unused_variables)]
fn main() {
// ===== CHECK =====
// a set_immediate function could be added pretty easily but we won't do that here
}

Node implements a check "hook" to the event loop next. Calls to setImmediate execute here. I just include it for completeness but we won't do anything in this phase.

6. Close Callbacks


#![allow(unused_variables)]
fn main() {
// ===== CLOSE CALLBACKS ======
// Release resources, we won't do that here, it's just another "hook" for our "extensions"
// to use. We release resources in every callback instead.
}

I pretty much explain this step in the comments. Typically releasing resources, like closing sockets, is done here.

Cleaning up

Since our run function basically will be the start and end of our Runtime we also need to clean up after ourselves. The following code makes sure all threads finish, release their resources and run all destructors:


#![allow(unused_variables)]
fn main() {
// We clean up our resources, makes sure all destructors run.
for thread in self.thread_pool.into_iter() {
    thread.sender.send(Task::close()).expect("threadpool cleanup");
    thread.handle.join().unwrap();
}

self.epoll_registrator.close_loop().unwrap();
self.epoll_thread.join().unwrap();

print("FINISHED");
}

First we loop through every thread in our threadpool and send a "close" Task to each of them. Then we call join on each JoinHandle. Calling join waits for the associated thread to finish, so we know all destructors have run.

Next we call close_loop() on our epoll_registrator to signal the OS event queue that we want to close the loop and release our resources. We also join this thread so we don't end our process until all resources are released.

The final run function


#![allow(unused_variables)]
fn main() {
pub fn run(mut self, f: impl Fn()) {
    let rt_ptr: *mut Runtime = &mut self;
    unsafe { RUNTIME = rt_ptr };

    // just for us printing out during execution
    let mut ticks = 0;

    // First we run our "main" function
    f();

    // ===== EVENT LOOP =====
    while self.pending_events > 0 {
        ticks += 1;
        // NOT PART OF LOOP, JUST FOR US TO SEE WHAT TICK IS EXECUTING
        print(format!("===== TICK {} =====", ticks));

        // ===== 1. TIMERS =====
        self.process_expired_timers();

        // ===== 2. CALLBACKS =====
        // Timer callbacks and if for some reason we have postponed callbacks
        // to run on the next tick. Not possible in our implementation though.
        self.run_callbacks();

        // ===== 3. IDLE/PREPARE =====
        // we won't use this

        // ===== 4. POLL =====
        // First we need to check if we have any outstanding events at all,
        // because if we don't, we're finished. If we didn't check, we'd wait forever.
        if self.pending_events == 0 {
            break;
        }

        // We want to get the time to the next timeout (if any) and we
        // set the timeout of our epoll wait to the same as the timeout
        // for the next timer. If there is none, we set it to infinite (None)
        let next_timeout = self.get_next_timer();

        let mut epoll_timeout_lock = self.epoll_timeout.lock().unwrap();
        *epoll_timeout_lock = next_timeout;
        // We release the lock before we wait in `recv`
        drop(epoll_timeout_lock);

        // We handle one event at a time, even though multiple events could be
        // returned on the same poll. We won't cover that here, but there are
        // several ways of handling this.
        if let Ok(event) = self.event_receiver.recv() {
            match event {
                PollEvent::Timeout => (),
                PollEvent::Threadpool((thread_id, callback_id, data)) => {
                    self.process_threadpool_events(thread_id, callback_id, data);
                }
                PollEvent::Epoll(event_id) => {
                    self.process_epoll_events(event_id);
                }
            }
        }
        self.run_callbacks();

        // ===== 5. CHECK =====
        // a set_immediate function could be added pretty easily but we
        // won't do that here

        // ===== 6. CLOSE CALLBACKS ======
        // Release resources, we won't do that here, but this is typically
        // where sockets etc are closed.
    }

    // We clean up our resources, makes sure all destructors run.
    for thread in self.thread_pool.into_iter() {
        thread.sender.send(Task::close()).expect("threadpool cleanup");
        thread.handle.join().unwrap();
    }

    self.epoll_registrator.close_loop().unwrap();
    self.epoll_thread.join().unwrap();

    print("FINISHED");
}
}

Shortcuts

I'll mention some obvious shortcuts right here so you are aware of them. There are many "exceptions" that we don't cover in our example. We are focusing on the big picture just so we're on the same page. The process.nextTick function and the setImmediate function are two examples of this.

We don't cover the case where a server under heavy load might have too many callbacks to reasonably run in one poll, which means that we could starve our I/O resources while waiting for them to finish. There are probably several similar cases that a production runtime would have to care about.

As you'll probably notice, implementing a simple version is more than enough work for us to cover in this book, but hopefully you'll find yourself in pretty good shape to dig further once we're finished.

Setting up our runtime

The Threadpool

We still don't have a threadpool or an I/O event loop running, so the next step is to set these up.

Let's take this step by step

Are you ready? Let's go!

The first thing we do is to add a new method that returns an instance of our Runtime:


#![allow(unused_variables)]
fn main() {
impl Runtime {
    pub fn new() -> Self {
}

First up is our thread pool. The first thing we do is to set up a channel which our threads can use to send messages to our main thread.

The Threadpool variant of PollEvent holds a tuple (usize, usize, Js), which will be the thread_id, the callback_id and the data returned when we run our Task.

The Receiver part will be stored in our Runtime, and the Sender part will be cloned to each of our threads.

Node defaults to 4 threads which we will copy. This is configurable in Node but we will take a shortcut and hard code it:


#![allow(unused_variables)]
fn main() {
let (event_sender, event_receiver) = channel::<PollEvent>();
let mut threads = Vec::with_capacity(4);

for i in 0..4 {
    let (evt_sender, evt_receiver) = channel::<Task>();
    let event_sender = event_sender.clone();

    let handle = thread::Builder::new()
        .name(format!("pool{}", i))
        .spawn(move || {

            while let Ok(task) = evt_receiver.recv() {
                print(format!("received a task of type: {}", task.kind));

                if let ThreadPoolTaskKind::Close = task.kind {
                    break;
                };

                let res = (task.task)();
                print(format!("finished running a task of type: {}.", task.kind));

                let event = PollEvent::Threadpool((i, task.callback_id, res));
                event_sender.send(event).expect("threadpool");
            }
        })
        .expect("Couldn't initialize thread pool.");

    let node_thread = NodeThread {
        handle,
        sender: evt_sender,
    };

    threads.push(node_thread);
}

}

Next up is actually creating our threads. for i in 0..4 is an iterator over the values 0, 1, 2 and 3. Since we push each thread to a Vec these values will be treated as both the Id of the thread and the index it has in our Vec.

Next up we create a new channel which we will use to send messages to our threads. Each thread keeps its Receiver, and we'll store the Sender part in the struct NodeThread, which will represent a thread in our threadpool.


#![allow(unused_variables)]
fn main() {
let (evt_sender, evt_receiver) = channel::<Task>();
let event_sender = event_sender.clone();
}

As you see here, we also clone the Sender part which we'll pass on to each thread so they can send messages to our main thread.

After that's done, we build our thread. We'll use thread::Builder::new() instead of thread::spawn since we want to give each thread a name. We'll only use this name when we print from our threads, so it's clear which thread printed the message.


#![allow(unused_variables)]
fn main() {
let handle = thread::Builder::new()
        .name(format!("pool{}", i))
        .spawn(move || {
}

You'll also see that we finally spawn our thread here and pass in a closure.

Why do we need the move keyword in this closure?

The reason is that this closure is spawned from the main thread, so any environment it closes over needs to be owned, since it can't reference values on the stack of the main thread. I'll leave you with a relevant quote from the chapter about closures in TRPL:

...they give a closure its own stack frame. Without move, a closure may be tied to the stack frame that created it, while a move closure is self-contained.

The body of our new threads is really simple; most of the lines are about printing out information for us to see:


#![allow(unused_variables)]
fn main() {
while let Ok(task) = evt_receiver.recv() {
        print(format!("received a task of type: {}", task.kind));

        if let ThreadPoolTaskKind::Close = task.kind {
            break;
        };

        let res = (task.task)();
        print(format!("finished running a task of type: {}.", task.kind));

        let event = PollEvent::Threadpool((i, task.callback_id, res));
        event_sender.send(event).expect("threadpool");
    }
})
}

The first thing we do is to listen on the Receiver part of the channel (remember, we kept the Sender part in our main thread). This function will actually park our thread until we receive a message, so it consumes no resources while waiting.

When we get a task we first print out what kind of task we got.

The next thing we do is to check if this was a Close task. If it was, we break out of our loop, which in turn will close the thread.

If it wasn't a Close task we run our task let res = (task.task)();. This is where the work will actually be done. We know from the signature of this task that it returns a Js object once it's finished.

The next thing we do is to print out that we finished running a task, before we send a PollEvent::Threadpool event with thread_id, the callback_id and the data returned as a Js object back to our main thread.

Back in our main thread again, we finally store the JoinHandle and the Sender part of the channel in our NodeThread struct and push it to our collection of threads (which now represents our threadpool).

The Epoll Thread

This will handle our Epoll/Kqueue/IOCP thread. This thread will only wait for incoming events reported by the OS, and once that's done it will send the Id of the event to our main thread which in turn will actually handle the event and call the callback.

The code here is a bit more involved, but we'll take it step by step below.

The code looks like this:


#![allow(unused_variables)]
fn main() {
let mut poll = minimio::Poll::new().expect("Error creating epoll queue");
let registrator = poll.registrator();
let epoll_timeout = Arc::new(Mutex::new(None));
let epoll_timeout_clone = epoll_timeout.clone();

let epoll_thread = thread::Builder::new()
    .name("epoll".to_string())
    .spawn(move || {
        let mut events = minimio::Events::with_capacity(1024);

        loop {
            let epoll_timeout_handle = epoll_timeout_clone.lock().unwrap();
            let timeout = *epoll_timeout_handle;
            drop(epoll_timeout_handle);

            match poll.poll(&mut events, timeout) {
                Ok(v) if v > 0 => {
                    for i in 0..v {
                        let event = events.get_mut(i).expect("No events in event list.");
                        print(format!("epoll event {} is ready", event.id()));

                        let event = PollEvent::Epoll(event.id());
                        event_sender.send(event).expect("epoll event");
                    }
                }
                Ok(v) if v == 0 => {
                    print("epoll event timeout is ready");
                    event_sender.send(PollEvent::Timeout).expect("epoll timeout");
                }
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                    print("received event of type: Close");
                    break;
                }
                Err(e) => panic!("{:?}", e),
                _ => unreachable!(),
            }
        }
    })
    .expect("Error creating epoll thread");
}

Let's start by initializing some variables:


#![allow(unused_variables)]
fn main() {
let mut poll = minimio::Poll::new().expect("Error creating epoll queue");
let registrator = poll.registrator();
let epoll_timeout = Arc::new(Mutex::new(None));
let epoll_timeout_clone = epoll_timeout.clone();
}

The first thing we do is to instantiate a new minimio::Poll. This is the main entry point into our kqueue/epoll/iocp event queue.

minimio::Poll does several things under the hood. It sets up a structure for us to store some information about the state of the event queue, and, most importantly, makes a syscall to the underlying OS asking for a handle to either an epoll instance, a kqueue or a Completion Port. We won't register anything here yet, but we need this handle later to make sure we register interest with the queue we're polling.

Next up, also part of minimio, is the Registrator. This struct is "detached" from the Poll struct, but it holds a copy of the same handle to the event queue.

This way we can store the Registrator in our main thread and send off the Poll instance to our epoll thread. Our registrator can only register events to the queue and that's it.

How can Registrator know that the epoll thread hasn't stopped?

We'll cover this in detail in the next book, but both Poll and Registrator hold a reference to an AtomicBool whose only job is to indicate whether the queue is "alive" or not. In the Drop implementation of Poll we set this flag to false, in which case a call to register an event will return an Err.
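
The idea can be illustrated with this hedged sketch (the names are hypothetical, and the actual implementation lives in minimio):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct Registrator {
    is_poll_dead: Arc<AtomicBool>,
    // ...plus a copy of the handle to the OS event queue
}

impl Registrator {
    fn register_interest(&self) -> Result<(), &'static str> {
        if self.is_poll_dead.load(Ordering::SeqCst) {
            // The Poll instance has been dropped, so the queue is gone
            return Err("Poll instance is closed");
        }
        // ...otherwise make the syscall that registers our interest
        Ok(())
    }
}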

epoll_timeout is the time until the next timeout. If there are no more timeouts, the value is None. We wrap this in an Arc<Mutex<>> since we'll be writing to it from the main thread and reading it from the epoll thread.

epoll_timeout_clone is just increasing the ref-count on our Arc so that we can send this to our epoll thread.

Next up is spawning our thread. We do this the exact same way as for the thread pool, but we name the thread epoll.


#![allow(unused_variables)]
fn main() {
let epoll_thread = thread::Builder::new()
    .name("epoll".to_string())
    .spawn(move || {
}

Now we're inside the epoll thread and will define what this thread needs to do to poll and handle events.

First we allocate a buffer to hold event objects that we get from our poll instance. These objects contain information about the event that's occurred including a token we pass in when we register the event. This token identifies what event has occurred. In our case the token is a simple usize.


#![allow(unused_variables)]
fn main() {
let mut events = minimio::Events::with_capacity(1024);
}

We allocate the buffer here, outside the loop, since we want to avoid allocating a new buffer on every turn of our loop.

Basically, our epoll thread will run a loop which continuously polls for new events.


#![allow(unused_variables)]
fn main() {
loop {
}

The interesting logic is inside the loop, and first we read the timeout value which should be synced with the next timeout that expires in our main loop.


#![allow(unused_variables)]
fn main() {
let epoll_timeout_handle = epoll_timeout_clone.lock().unwrap();
let timeout = *epoll_timeout_handle;
drop(epoll_timeout_handle);
}

To do this we first need to lock the mutex, so we know we have exclusive access to the timeout value. The value behind epoll_timeout_handle is of the type Option<i32>. Since i32 implements the Copy trait (and therefore Option<i32> does too), we can dereference the guard, which in this case copies the value and stores it in our timeout variable.

drop(epoll_timeout_handle) is not something you'll see often. The MutexGuard we get in return when we call epoll_timeout_clone.lock().unwrap() is a RAII guard, which means that it holds a resource (in this case the lock on the mutex) until it's deallocated (released). In Rust, the release happens when the value is dropped, which normally is at the end of its scope ({...}).

We need to release the lock here since the next call (to poll) will block until an event occurs. If we held on to the lock while blocked, the main thread would deadlock when trying to write a new value to epoll_timeout.

The next part is a handful, but bear in mind that much of what we do here is printing out information for us to observe.

Calling poll will block the loop until an event occurs or the timeout has elapsed. poll takes in an exclusive reference to our event buffer, and an Option<i32> as a timeout. A value of None will block indefinitely.

When we say block here we mean that the OS parks our thread and switches context to another thread. However, it keeps track of the fact that our epoll thread is waiting for events and wakes it up again when any of the events we have registered interest in has happened.


#![allow(unused_variables)]
fn main() {
match poll.poll(&mut events, timeout) {
    Ok(v) if v > 0 => {
        for i in 0..v {
            let event = events.get_mut(i).expect("No events in event list.");
            print(format!("epoll event {} is ready", event.id()));

            let event = PollEvent::Epoll(event.id());
            event_sender.send(event).expect("epoll event");
        }
    }
    Ok(v) if v == 0 => {
        print("epoll event timeout is ready");
        event_sender.send(PollEvent::Timeout).expect("epoll timeout");
    }
    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
        print("received event of type: Close");
        break;
    }
    Err(e) => panic!("{:?}", e),
    _ => unreachable!(),
}
}

We match on the result of the poll so when the OS returns we choose what to do.

We basically have 4 cases we are concerned about:

  1. We get an Ok(n) where n is larger than 0, in this case we have events to process
  2. We get an Ok(n) where n is 0, we know this either is a spurious wakeup or that a timeout has occurred
  3. We get an Err of kind Interrupted, in which case we treat this as a close signal and we close the loop
  4. We get an Err which is not of type Interrupted, we know something bad has happened, and we panic!

If you haven't seen the syntax Ok(v) if v > 0 before it's what we call a match guard which lets us refine what we're matching against. In this case, we only match on values of v larger than 0.

For completeness, I'll also explain Err(ref e) if e.kind() == ...: the ref keyword tells the compiler that we want a reference to e and don't want to take ownership of it.

The last case _ => unreachable!() is just needed since the compiler doesn't realize that we're matching on all values of Ok() here. The value is of type Ok(usize) so it can't be negative, and we're telling the compiler here that we've got all cases covered.

Lastly, we create a Runtime struct and store all the data we've initialized so far in it:


#![allow(unused_variables)]
fn main() {
  Runtime {
    available_threads: (0..4).collect(),
    callbacks_to_run: vec![],
    callback_queue: HashMap::new(),
    epoll_pending_events: 0,
    epoll_registrator: registrator,
    epoll_thread,
    epoll_timeout,
    event_receiver,
    identity_token: 0,
    pending_events: 0,
    thread_pool: threads,
    timers: BTreeMap::new(),
    timers_to_remove: vec![],
}
}

Worth noting is that we know all threads are available here so (0..4).collect() will just create a Vec<usize> with the values [0, 1, 2, 3].

In Rust, when we write...


#![allow(unused_variables)]
fn main() {
...
epoll_registrator: registrator,
epoll_thread,
...
}

...we're assigning registrator to the field epoll_registrator, which is a slightly more descriptive name. Since we already have a variable with the name epoll_thread, we don't need to write epoll_thread: epoll_thread; the compiler figures that out for us.

Now, the final initialization code for our runtime breaks all "best practices" for how long a method should be, but in our case I find it easier to write about this if we don't need to jump between functions too much and can just cover all the logic from a to z:


#![allow(unused_variables)]
fn main() {
impl Runtime {
    pub fn new() -> Self {
        // ===== THE REGULAR THREADPOOL =====
        let (event_sender, event_receiver) = channel::<PollEvent>();
        let mut threads = Vec::with_capacity(4);

        for i in 0..4 {
            let (evt_sender, evt_receiver) = channel::<Task>();
            let event_sender = event_sender.clone();

            let handle = thread::Builder::new()
                .name(format!("pool{}", i))
                .spawn(move || {

                    while let Ok(task) = evt_receiver.recv() {
                        print(format!("received a task of type: {}", task.kind));

                        if let ThreadPoolTaskKind::Close = task.kind {
                            break;
                        };

                        let res = (task.task)();
                        print(format!("finished running a task of type: {}.", task.kind));

                        let event = PollEvent::Threadpool((i, task.callback_id, res));
                        event_sender.send(event).expect("threadpool");
                    }
                })
                .expect("Couldn't initialize thread pool.");

            let node_thread = NodeThread {
                handle,
                sender: evt_sender,
            };

            threads.push(node_thread);
        }

        // ===== EPOLL THREAD =====
        let mut poll = minimio::Poll::new().expect("Error creating epoll queue");
        let registrator = poll.registrator();
        let epoll_timeout = Arc::new(Mutex::new(None));
        let epoll_timeout_clone = epoll_timeout.clone();

        let epoll_thread = thread::Builder::new()
            .name("epoll".to_string())
            .spawn(move || {
                let mut events = minimio::Events::with_capacity(1024);

                loop {
                    let epoll_timeout_handle = epoll_timeout_clone.lock().unwrap();
                    let timeout = *epoll_timeout_handle;
                    drop(epoll_timeout_handle);

                    match poll.poll(&mut events, timeout) {
                        Ok(v) if v > 0 => {
                            for i in 0..v {
                                let event = events.get_mut(i).expect("No events in event list.");
                                print(format!("epoll event {} is ready", event.id()));

                                let event = PollEvent::Epoll(event.id());
                                event_sender.send(event).expect("epoll event");
                            }
                        }
                        Ok(v) if v == 0 => {
                            print("epoll event timeout is ready");
                            event_sender.send(PollEvent::Timeout).expect("epoll timeout");
                        }
                        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                            print("received event of type: Close");
                            break;
                        }
                        Err(e) => panic!("{:?}", e),
                        _ => unreachable!(),
                    }
                }
            })
            .expect("Error creating epoll thread");

        Runtime {
            available_threads: (0..4).collect(),
            callbacks_to_run: vec![],
            callback_queue: HashMap::new(),
            epoll_pending_events: 0,
            epoll_registrator: registrator,
            epoll_thread,
            epoll_timeout,
            event_receiver,
            identity_token: 0,
            pending_events: 0,
            thread_pool: threads,
            timers: BTreeMap::new(),
            timers_to_remove: vec![],
        }
    }
}
}

Timers

1. Check expired timers

The first step in the event loop is checking for expired timers, and we do this in the self.process_expired_timers() function:


#![allow(unused_variables)]
fn main() {
    fn process_expired_timers(&mut self) {
        // Need an intermediate variable to please the borrowchecker
        let timers_to_remove = &mut self.timers_to_remove;

        self.timers
            .range(..=Instant::now())
            .for_each(|(k, _)| timers_to_remove.push(*k));

        while let Some(key) = self.timers_to_remove.pop() {
            let callback_id = self.timers.remove(&key).unwrap();
            self.callbacks_to_run.push((callback_id, Js::Undefined));
        }
    }

    fn get_next_timer(&self) -> Option<i32> {
        self.timers.iter().nth(0).map(|(&instant, _)| {
            let mut time_to_next_timeout = instant - Instant::now();
            if time_to_next_timeout < Duration::new(0, 0) {
                time_to_next_timeout = Duration::new(0, 0);
            }
            time_to_next_timeout.as_millis() as i32
        })
    }
}

The first thing to note here is that we check self.timers, and to understand the rest of the syntax we'll have to look at what kind of collection this is.

Now I chose a BTreeMap<Instant, usize> for this collection. The reason is that I want the Instants ordered chronologically. When I add a timer, I calculate at what instant it's supposed to run and add that to this collection.

BTrees are a very good data structure when you know that your keys will be ordered.

Choosing a BTreeMap here allows me to get a range, range(..=Instant::now()), which is from the start of the map up to and including the instant NOW.
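As a small standalone illustration (with made-up timers, not our actual Runtime), this is what range(..=Instant::now()) gives us:


use std::collections::BTreeMap;
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let mut timers: BTreeMap<Instant, usize> = BTreeMap::new();

    // two timers that have already expired and one far in the future
    timers.insert(now - Duration::from_millis(10), 1);
    timers.insert(now - Duration::from_millis(5), 2);
    timers.insert(now + Duration::from_secs(60), 3);

    // `range(..=Instant::now())` walks the keys in sorted order, from the
    // start of the map up to and including "now"
    let expired: Vec<usize> = timers
        .range(..=Instant::now())
        .map(|(_, &callback_id)| callback_id)
        .collect();

    assert_eq!(expired, vec![1, 2]);
}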

Now I take every key in this range and add it to timers_to_remove. The reason is that I found no good way to both get a range and remove the keys in one operation without allocating a small buffer every time. You can iterate over the range, but due to the ownership rules you can't remove the entries at the same time, and we do want to remove the timers we're done with.

The event loop will run repeatedly, so avoiding any allocations inside the loop is smart. There is no need to have this overhead.


#![allow(unused_variables)]
fn main() {
while let Some(key) = timers_to_remove.pop() {
    let callback_id = self.timers.remove(&key).unwrap();
    self.callbacks_to_run.push((callback_id, Js::Undefined));
}
}

The next step is to take every timer that has expired, remove the timer from our self.timers collection and get their callback_id.

As I explained in the previous chapter, this is a unique Id for this callback. What's important here is that we don't run the callback immediately. Node actually registers callbacks to be run on the next tick. The exception is timers that have either timed out or have a timeout of 0: these don't wait for the next tick but are invoked immediately, as you'll see next.

Anyway, for now we add the callback ids to self.callbacks_to_run.

Before we continue, let's recap by looking at what members of the Runtime struct we used here:


#![allow(unused_variables)]
fn main() {
pub struct Runtime {
    pending_events: usize,
    callbacks_to_run: Vec<(usize, Js)>,
    timers: BTreeMap<Instant, usize>,
}
}

Callbacks

2. Process callbacks

The next step is to handle any callbacks we've scheduled to run.


#![allow(unused_variables)]
fn main() {
fn run_callbacks(&mut self) {
    while let Some((callback_id, data)) = self.callbacks_to_run.pop() {
        let cb = self.callback_queue.remove(&callback_id).unwrap();
        cb(data);
        self.pending_events -= 1;
    }
}
}

Not all of Node's callbacks are processed here. Some callbacks are called directly in the poll phase we'll introduce below. It's not difficult to implement, but it adds unnecessary complexity to our example, so we schedule all callbacks to be run in this step of the process. As long as you know this is an oversimplification you're going to be alright :)

Here we pop off all callbacks that are scheduled to run. As you can see from our last update on the Runtime struct, callbacks_to_run is a Vec of tuples, each containing a callback_id and an argument of type Js.

So when we've got a callback_id we find the corresponding callback we have stored in self.callback_queue and remove the entry. What we get in return is a callback of type Box<dyn FnOnce(Js)>. We're going to explain this type more later but it's basically a closure stored on the heap that takes one argument of type Js.

cb(data) runs the code in this closure. After it's done it's time to decrease our counter of pending events: self.pending_events -= 1;.

Now, this step is important. As you might understand, any long running code in this callback is going to block our event loop, preventing it from progressing. So no new callbacks are handled and no new events are registered. This is why it's bad to write code that blocks the event loop.

Let's recap by looking at what members of the Runtime struct we used here:


#![allow(unused_variables)]
fn main() {
pub struct Runtime {
    callback_queue: HashMap<usize, Box<dyn FnOnce(Js)>>,
}
}

The Threadpool

The threadpool is where we'll process the CPU intensive tasks and the I/O tasks which can't be reasonably handled by epoll, kqueue or IOCP. One of these tasks is file system operations.

The reason for doing file I/O in the thread pool is complex, but the main takeaway is that due to how files are cached and how the hard drive works, most often the file I/O will be Ready almost immediately, so waiting for that in an event queue has very little effect in practice.

The second reason is that while Windows does have a completion based model, Linux and macOS don't. Reading a file into a buffer which your process controls can take some time, and if we do that in our main loop it will block a little bit, which we really try to avoid.

By doing this in a thread pool we make sure that these operations won't block our main event loop and only notify us once the data is ready for us in memory.

The code we need to add to process events from the thread pool is short and simple:


#![allow(unused_variables)]
fn main() {
fn process_threadpool_events(&mut self, thread_id: usize, callback_id: usize, data: Js) {
    self.callbacks_to_run.push((callback_id, data));
    self.available_threads.push(thread_id);
}
}

We take thread_id, callback_id and data as arguments. We get this through the channel we have shared with our threadpool threads.

Once we have that information we push our callbacks into the queue of callbacks_to_run which will run on the next call to run_callbacks we went through in the previous chapter.

Lastly we take the id of the thread that sent the finished task and put it back into our pool of available threads.

I/O event queue

The I/O event queue is what handles most of our I/O tasks. We'll go through how we register events to that queue later on, but once an event is ready it sends the event_id through our channel.


#![allow(unused_variables)]
fn main() {
fn process_epoll_events(&mut self, event_id: usize) {
    self.callbacks_to_run.push((event_id, Js::Undefined));
    self.epoll_pending_events -= 1;
}
}

As we'll see later on, the way we designed this, we actually made our event_id and callback_id the same value since both represent a unique value for this event. It simplifies things slightly for us.

We add the callback_id to the collection of callbacks to run. We pass in Js::Undefined since we'll not actually pass any data along here. You'll see why when we reach the Http module chapter, but the main point is that the I/O queue doesn't return any data itself; it just tells us that data is ready to be read.

Lastly, purely for our own bookkeeping, we decrement the count of outstanding epoll_pending_events so we keep track of how many events we have pending.

Why even keep track of how many epoll_events are pending? We don't use this value here, but I added it to make it easier to create some print statements showing the status of our runtime at different points. However, there are good reasons to keep track of these events even if we don't use them.

One area where we're taking shortcuts all the way through is security. If someone were to build a public facing server out of this, we would need to account for slow networks and malicious users.

Since we use IOCP, which is a completion based model, we allocate memory for a buffer for each Read or Write event. When we lend this memory to the OS, we're in a weird situation. We own it, but we can't touch it. There are several ways in which we could register interest in more events than actually occur, and thereby allocate memory that is just held in the buffers. Now if someone wanted to crash our server, they could cause this intentionally.

A good practice is therefore to create a "high watermark" by keeping track of the number of pending events, and when we reach that watermark, we queue events instead of registering them with the OS.

By extension, this is also why you should always have a timeout on these events so that you at some point can reclaim the memory and return a timeout error if necessary.
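To make the watermark idea concrete, here is a minimal standalone sketch. Everything in it (the Pressure struct, high_watermark, backlog) is made up for this illustration and is not part of our runtime:


// A made-up sketch: these names don't exist in our runtime.
struct Pressure {
    pending: usize,        // events currently registered with the OS
    high_watermark: usize, // max buffers we're willing to lend to the OS
    backlog: Vec<usize>,   // event ids we hold back until pressure drops
}

impl Pressure {
    fn register(&mut self, event_id: usize) {
        if self.pending < self.high_watermark {
            // below the watermark: hand the event (and its buffer) to the OS
            self.pending += 1;
            println!("registering event {} with the OS", event_id);
        } else {
            // at the watermark: hold on to the event ourselves instead of
            // allocating yet another buffer the OS will hold
            println!("watermark reached, queueing event {}", event_id);
            self.backlog.push(event_id);
        }
    }

    fn completed(&mut self) {
        self.pending -= 1;
        // room freed up: register a queued event if we have one
        if let Some(event_id) = self.backlog.pop() {
            self.register(event_id);
        }
    }
}

fn main() {
    let mut p = Pressure { pending: 0, high_watermark: 2, backlog: vec![] };
    for id in 0..4 {
        p.register(id);
    }
    p.completed(); // one buffer comes back, so event 3 gets registered
}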

Cleaning up

Lastly we clean up after ourselves by joining all our threads so we know that all destructors have run without errors after we're finished.


#![allow(unused_variables)]
fn main() {
// We clean up our resources, make sure all destructors run.
for thread in self.thread_pool.into_iter() {
    thread.sender.send(Task::close()).expect("threadpool cleanup");
    thread.handle.join().unwrap();
}

self.epoll_registrator.close_loop().unwrap();
self.epoll_thread.join().unwrap();

print("FINISHED");
}

Before we join each thread in our thread pool we send it a Close task, which unparks the thread and breaks out of the loop it's running.

How we clean up after ourselves in minimio will be covered in the book that covers this topic specifically.

If you want to read more about why it's a good practice to join all threads and clean up after us before we exit, I can recommend the article Join Your Threads written by @matklad.

Infrastructure

Now, for everything to work together we need a few helpers for our infrastructure.

First of all, we need a way to get the id of an available thread.


#![allow(unused_variables)]
fn main() {
   fn get_available_thread(&mut self) -> usize {
        match self.available_threads.pop() {
            Some(thread_id) => thread_id,
            // We would normally return None and not panic!
            None => panic!("Out of threads."),
        }
    }
}

As you see, we take one huge shortcut here. If we run out of threads, we panic!. This is not good, and we should rather implement logic to queue these requests and run them as soon as a thread is available. However, our code is already getting long, and it's not very important for our goal of learning about async.

Maybe implementing such a queue is a good reader exercise? Feel free to fork the repository and go ahead :)
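If you want a starting point, here is a rough standalone sketch of the idea. The Pool struct and its fields are made up for this illustration and are not part of our Runtime:


// Standalone sketch: tasks are just labels here.
struct Pool {
    available_threads: Vec<usize>,
    task_queue: Vec<&'static str>, // tasks waiting for a free thread
}

impl Pool {
    // Instead of panicking when we're out of threads, we queue the task.
    fn dispatch(&mut self, task: &'static str) {
        match self.available_threads.pop() {
            Some(thread_id) => println!("sending {:?} to thread {}", task, thread_id),
            None => self.task_queue.push(task),
        }
    }

    // When a thread finishes we hand it a queued task before marking it free.
    // A real scheduler would probably use a VecDeque to get FIFO ordering.
    fn thread_finished(&mut self, thread_id: usize) {
        if let Some(task) = self.task_queue.pop() {
            println!("sending queued {:?} to thread {}", task, thread_id);
        } else {
            self.available_threads.push(thread_id);
        }
    }
}

fn main() {
    let mut pool = Pool { available_threads: vec![0, 1], task_queue: vec![] };
    pool.dispatch("task a");
    pool.dispatch("task b");
    pool.dispatch("task c"); // no free thread, gets queued
    pool.thread_finished(1); // thread 1 picks up "task c"
}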

The next thing we need to do is to create a unique identity for our callbacks.


#![allow(unused_variables)]
fn main() {
/// If we hit max we just wrap around
fn generate_identity(&mut self) -> usize {
    self.identity_token = self.identity_token.wrapping_add(1);
    self.identity_token
}

fn generate_cb_identity(&mut self) -> usize {
    let ident = self.generate_identity();
    let taken = self.callback_queue.contains_key(&ident);

    // if there is a collision or the identity is already there, we loop until we
    // find a new one. We don't cover the case where there are `usize::max_value()`
    // callbacks waiting, since if we're fast and queue a new event
    // every nanosecond, that would still take 585 years on a 64 bit system.
    if !taken {
        ident
    } else {
        loop {
            let possible_ident = self.generate_identity();
            if !self.callback_queue.contains_key(&possible_ident) {
                break possible_ident;
            }
        }
    }
}
}

The function generate_cb_identity is where it all happens; generate_identity is just a small helper so we avoid the long functions we had in the introduction.

Now, there are some important considerations to be aware of. Even though we use several threads, we can use a regular usize here since only one thread will be generating ids. It would cause problems if several threads tried to read and generate new ids at the same time.

We use the wrapping_add method on usize to get the next id; this means that when we reach 18446744073709551615 we wrap around to 0 again.
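Just to make the wrap-around concrete, here is a tiny example you can run on its own:


fn main() {
    let id: usize = usize::MAX; // 18446744073709551615 on a 64 bit system
    // a normal `+ 1` would overflow (and panic in debug builds),
    // `wrapping_add` simply wraps around to 0
    assert_eq!(id.wrapping_add(1), 0);
    assert_eq!(0usize.wrapping_add(1), 1);
}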

We do check if our callback_queue contains our key (even though that is unlikely by design), and if it's taken we just generate a new one until we find an available one.

Next up is the method we use to add a callback to our callback_queue:


#![allow(unused_variables)]
fn main() {
/// Adds a callback to the queue and returns the key
fn add_callback(&mut self, ident: usize, cb: impl FnOnce(Js) + 'static) {
    let boxed_cb = Box::new(cb);
    self.callback_queue.insert(ident, boxed_cb);
}
}

If you haven't seen the signature cb: impl FnOnce(Js) + 'static before I'll explain it briefly here.

The impl ... means that we accept an argument that implements the trait FnOnce(Js) with a 'static lifetime. FnOnce is a trait implemented by closures. There are three main traits a closure can implement in Rust, and FnOnce is the one you'll use if you plan on consuming what you capture from the environment.

Since you consume the captured variables, a closure implementing FnOnce can only be called once. Our closure will take ownership of resources we create in our main thread and consume them. We want this since, once consumed, the resources we used will be cleaned up as a result of Rust's RAII pattern. It's implicit that FnOnce returns () in this case so we don't have to write FnOnce(Js) -> ().

Since callbacks are meant to only be called once, this is a perfectly fine bound for us to use here.

Now, a trait doesn't have a size, so for the compiler to be able to allocate space for it on the stack we either need to take a reference like &dyn FnOnce(Js) or place it on the heap using Box. We do the latter since that's the only thing that makes sense for our use case. A Box is a pointer to a heap allocated value which we do know the size of, so we store that pointer in our callback_queue HashMap.

What makes a closure? A function in Rust can be defined as easily as || { }. If this is all we write, it's the same as a function pointer, equivalent to just referencing my_method (without parentheses). It becomes a closure as soon as you "close" over your environment by referencing variables that aren't owned by the function.

The Fn traits are automatically implemented, and whether a closure implements Fn, FnMut or FnOnce depends on whether you take ownership of a non-Copy variable, take a shared reference &, or take an exclusive reference &mut (often called a mutable reference).
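Here is a small, self-contained example (separate from our runtime) showing how what you capture decides which trait you get, and how we can Box a FnOnce closure so it can be stored:


fn main() {
    let name = String::from("event loop");

    // Only reads `name` through a shared reference: the closure implements `Fn`
    let greet = || println!("hello, {}", name);
    greet();
    greet();

    // Mutates a captured variable: the closure implements `FnMut` (but not `Fn`)
    let mut count = 0;
    let mut tick = || count += 1;
    tick();
    tick();

    // Takes ownership of `name` and consumes it: `FnOnce`, callable only once
    let consume = move || drop(name);

    // Closures have no fixed size, so to store one we put it behind a `Box`
    let boxed: Box<dyn FnOnce()> = Box::new(consume);
    boxed(); // calling it a second time would not compile

    println!("count is {}", count);
}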

Now that we got some closure basics out of the way we can move on. The next method is how we register I/O work. This is how we register an epoll event with our runtime:


#![allow(unused_variables)]
fn main() {
pub fn register_event_epoll(&mut self, token: usize, cb: impl FnOnce(Js) + 'static) {
    self.add_callback(token, cb);

    print(format!("Event with id: {} registered.", token));
    self.pending_events += 1;
    self.epoll_pending_events += 1;
}
}

The first thing we do is to add the callback to our callback_queue by calling the method we explained previously. Next we do a print statement; since we want to print out the flow of our program we need to add these at strategic places.

One important thing to note here: our token in this case is already guaranteed to be unique. We generate it in the Http module (which is the only one registering events by using this method in our example). The reason for this will become clear in a few short chapters. Just note that we don't need to call generate_cb_identity here.

We increase the counters on both pending_events and epoll_pending_events.

Our next method registers work for the thread pool:


#![allow(unused_variables)]
fn main() {
pub fn register_event_threadpool(
    &mut self,
    task: impl Fn() -> Js + Send + 'static,
    kind: ThreadPoolTaskKind,
    cb: impl FnOnce(Js) + 'static,
) {
    let callback_id = self.generate_cb_identity();
    self.add_callback(callback_id, cb);

    let event = Task {
        task: Box::new(task),
        callback_id,
        kind,
    };

    // we are not going to implement a real scheduler here, just a LIFO queue
    let available = self.get_available_thread();
    self.thread_pool[available].sender.send(event).expect("register work");
    self.pending_events += 1;
}
}

Let's first have a look at the arguments to this function (aside from &mut self).

task: impl Fn() -> Js + Send + 'static is a task we want to run on a separate thread. This closure has the bound Fn() -> Js + Send + 'static, which means it's a closure that takes no arguments but returns a type of Js. It needs to be Send since we're sending this task to another thread.

kind: ThreadPoolTaskKind lets us know what kind of task this is. We do this for two reasons:

  1. We need to be able to signal a Close event to our threads
  2. We want to be able to print the kind of task each event received.

As you understand, we wouldn't strictly need a kind for every task, but since we want to print out what each thread received we need some way of knowing what kind of task it was.

The last argument cb: impl FnOnce(Js) + 'static is our callback. It's not a coincidence that our task returns a type of Js and our callback takes a Js as an argument. The result of the work we do in our thread is the input to our callback. This closure doesn't need to be Send since we don't pass the callback itself to the thread pool.

Next we generate a new identity with self.generate_cb_identity() and we add the callback to our callback queue.

Then we construct a new Task (we bind it to a variable named event), and as I have shown earlier, we need to Box the closure.

Now, the last part could be made arbitrarily complex. This is where you decide how you want to schedule your work to the thread pool. In our case we just get an available thread (and panic! if we're out of threads - ouch), and we send our task to that thread which then runs it until it's finished.

You could make priorities based on TaskKind, you could try to decide which tasks are short and which are long and prioritize them based on load. A lot of exciting things could be done here. We will choose the simplest possible one though, and just push them directly to a thread in the order they come.

The last part of the "infrastructure" is a function to set a timeout.


#![allow(unused_variables)]
fn main() {
    fn set_timeout(&mut self, ms: u64, cb: impl Fn(Js) + 'static) {
        // Is it theoretically possible to get two equal instants? If so we'll have a bug...
        let now = Instant::now();
        let cb_id = self.generate_cb_identity();
        self.add_callback(cb_id, cb);
        let timeout = now + Duration::from_millis(ms);
        self.timers.insert(timeout, cb_id);
        self.pending_events += 1;
        print(format!("Registered timer event id: {}", cb_id));
    }
}

Set timeout uses std::time::Instant to get a representation of "now". It's the first thing we do since the user expects the timeout to be calculated from "now", and some of our operations here might take a little time.

We generate an identity for the callback cb passed in to set_timeout and add that callback to our callback queue.

We add the duration in milliseconds to our Instant so we know at what time our timeout times out.

We insert the callback_id into our BTreeMap with the calculated Instant as the key.

We increase the counter for pending_events and print out a message for us to be able to follow the flow of our program.

This might be a good time to talk briefly about our choice of a BTreeMap as the collection we store timers in.

From the documentation we can read "In theory, a binary search tree (BST) is the optimal choice for a sorted map, as a perfectly balanced BST performs the theoretical minimum amount of comparisons necessary to find an element (log2n)." Now, this isn't a Binary Tree but a BTree. While a BST allocates one node for each value, a BTree allocates a small Vec of values for each node. Modern computers read much more data into their caches than we normally ask for, and that's one reason they love contiguous parts of memory. A BTree results in better cache efficiency, which often trumps the gains of the theoretically more optimal algorithm in a true BST.

Lastly, since we're talking about searching sorted collections here, and timeouts are a perfect example of exactly that, we'll of course use this when it's so readily available to us in Rust's standard library.

Modules

We're soon at the end now. Actually, our core runtime is finished, but we need a way to actually make it work with our "javascripty" code.

The modules here would be the equivalent of "C++" modules in Node, and in theory we could allow for people to make modules that register work to be either done in our threadpool or our epoll event queue.

The first one we'll cover is also the simplest, but it will also introduce us to why we stored a pointer to our Runtime in a global static variable.


#![allow(unused_variables)]
fn main() {
pub fn set_timeout(ms: u64, cb: impl Fn(Js) + 'static) {
    let rt = unsafe { &mut *(RUNTIME as *mut Runtime) };
    rt.set_timeout(ms, cb);
}
}

To be able to just call set_timeout(...) in our "javascript" without injecting our runtime into the javascript function (which we could do if we wanted to write better Rust), we need to dereference a mutable pointer to our runtime.

Now, we know this is safe for two reasons.

  1. RUNTIME is only ever accessed from one thread
  2. We know that all calls to our modules will be in Runtime.run(...) at which point we know that RUNTIME will be a valid pointer

Now, it's not pretty, and that's also why I explain why we do it and why it's safe here. Normally, all unsafe code should have a comment explaining why it's used and why it's safe.

Once we have dereferenced the pointer to RUNTIME we can call methods on it, and in this case we simply call rt.set_timeout(ms, cb), which sets a timeout as you saw in the last chapter.

File module

The Fs module contains File operations. Right now we only expose a read method since that's all we need. As you can imagine we can add all sorts of methods here.


#![allow(unused_variables)]
fn main() {
struct Fs;
impl Fs {
    fn read(path: &'static str, cb: impl Fn(Js) + 'static) {
        let work = move || {
            // Let's simulate that there is a very large file we're reading allowing us to actually
            // observe how the code is executed
            thread::sleep(std::time::Duration::from_secs(1));
            let mut buffer = String::new();
            fs::File::open(&path)
                .unwrap()
                .read_to_string(&mut buffer)
                .unwrap();
            Js::String(buffer)
        };
        let rt = unsafe { &mut *RUNTIME };
        rt.register_event_threadpool(work, ThreadPoolTaskKind::FileRead, cb);
    }
}
}

We simply create an empty struct Fs;. This is one of Rust's Zero Sized Types (ZSTs) and does not occupy any space, but it's still useful for us.
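A quick way to convince yourself that Fs really is zero-sized (the String comparison is just for contrast):


struct Fs;

fn main() {
    // a unit struct like `Fs` takes up no space at all
    assert_eq!(std::mem::size_of::<Fs>(), 0);
    // ...while a `String` (pointer, length, capacity) is three words on 64 bit
    assert_eq!(std::mem::size_of::<String>(), 24);
}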

The read method takes a file path and a callback as arguments. The callback will get called once the operation is finished and accepts a Js object as an input.

let work = move || {... is a closure. This closure is the actual Task that we want to run on our threadpool. None of the code in work will actually run here, we only define what we want to do when work() gets called.

In our closure we first wait for a second. These operations are so fast (and our file is so small) that if we want to observe what's going on in any meaningful way we need to slow things down. Let's pretend it's a huge file that takes a second to read.

We read the file into a buffer and then return Js::String(buffer).

You might remember from the Infrastructure chapter that our register_event_threadpool method received a task argument task: impl Fn() -> Js + Send + 'static. As you see here, our closure returns a Js object and takes no arguments, which means it conforms to this signature. The Fn trait will be automatically implemented. Send is an auto trait that the compiler implements for us when it's safe to do so, so if we tried to send types that are !Send to our thread by referencing them in our closure we would get an error.

The last part is that we dereference our runtime and call rt.register_event_threadpool(work, ThreadPoolTaskKind::FileRead, cb) to register the task with our threadpool.

Bonus material

You might be wondering why we (and libuv and Rust's own tokio) do file operations in the threadpool and not in our epoll event queue? It's I/O, isn't it?

There are actually several reasons for this:

First and foremost: the OS will cache a lot of files that are frequently accessed, and when the data is cached it will be available immediately. There is no real I/O in such a case. In addition, most programs tend to access the same files over and over, so a cache hit will often be the case. Think of a web server, for example; there's often a very limited amount of data accessed on disk.

Now, if data is cached most of the time, so it's readily available, it can be more expensive to register an event with the epoll event queue, get an immediate notification that the event is Ready, and then perform the read. It might actually be faster to just read it in a blocking manner right away.

However, in our design the file will be read on our main thread, which means that if it's a large file it will still take some time to read it from the OS cache to your process memory (your buffer) and that will block our entire event loop.

Better do that in the thread pool.

Secondly, the support for async file operations is limited and implemented to a varying degree of quality. The only system that does this pretty well is Windows, since it uses a completion based model (which means it can let you know when the data has been read into your buffer).

With the introduction of io_uring Linux has arguably made significant improvements in this regard, and now supports a completion based model as well. At the time of writing this book it's still early, but the reports so far have been very promising. We might expect to see changes in the way we handle cross platform event loops in the future now that there are two major systems supporting high performance completion based models.

It makes sense for a completion based model to try to do this asynchronously, but since the real gains are so small and the code complexity is high (especially when you're writing a server that is cross platform), most implementations find that using a thread pool gives good enough performance.

To sum it all up:

Threadpool:

  • Less code complexity
  • Good performance
  • Very little penalty in most use cases (like web servers)
  • Synchronous file operations are well optimized on most platforms

Async file I/O:

  • Increased code complexity
  • Poor and limited APIs (Linux and macOS have different limitations)
  • Weak platform support (does not work very well with a readiness based model)
  • Little to no real gain for most use cases

You want to know more, you say? Of course, I have an article for you if you want to learn even more about this specific topic.

Crypto module

As you'll soon understand, we won't actually cover cryptography here, but we'll simulate a CPU intensive operation that would block if not run on the threadpool.

Without further ado: The Glorious Crypto Module


#![allow(unused_variables)]
fn main() {
struct Crypto;
impl Crypto {
    fn encrypt(n: usize, cb: impl Fn(Js) + 'static + Clone) {
        let work = move || {
            fn fibonacchi(n: usize) -> usize {
                match n {
                    0 => 0,
                    1 => 1,
                    _ => fibonacchi(n - 1) + fibonacchi(n - 2),
                }
            }

            let fib = fibonacchi(n);
            Js::Int(fib)
        };

        let rt = unsafe { &mut *RUNTIME };
        rt.register_event_threadpool(work, ThreadPoolTaskKind::Encrypt, cb);
    }
}
}

Well, um, this is disappointing if you're into crypto. I'm sorry. The logic here is not very interesting: we take a number n as argument and calculate the n'th fibonacchi number recursively, a famous and inefficient way of putting your CPU to work.

The overall function will be familiar and look very similar to what we do in our Fs module since this module also sends work to the threadpool.

We create a closure (remember that none of the code in the closure will be run here, but it will get called in one of our worker threads).

Once that is finished we return the result as Js::Int(fib).

Lastly we dereference our RUNTIME and call rt.register_event_threadpool(work, ThreadPoolTaskKind::Encrypt, cb) to register our task with the threadpool.

Let's move on, we're very soon at the finish line.

Http module

The Http module is probably the one I personally find most interesting. There are two reasons for this:

  1. We create a Http GET request from scratch and wait for the response
  2. The method epoll_registrator.register() is the result of a huge amount of research and work to get right. Look forward to the next book where we dive into that.

Let's look at the code first and then step through it:


#![allow(unused_variables)]
fn main() {
struct Http;
impl Http {
    pub fn http_get_slow(url: &str, delay_ms: u32, cb: impl Fn(Js) + 'static + Clone) {
        let rt: &mut Runtime = unsafe { &mut *RUNTIME };
        // Don't worry, http://slowwly.robertomurray.co.uk is a site for simulating a delayed
        // response from a server. Perfect for our use case.
        let adr = "slowwly.robertomurray.co.uk:80";
        let mut stream = minimio::TcpStream::connect(adr).unwrap();

        let request = format!(
            "GET /delay/{}/url/http://{} HTTP/1.1\r\n\
             Host: slowwly.robertomurray.co.uk\r\n\
             Connection: close\r\n\
             \r\n",
            delay_ms, url
        );

        stream
            .write_all(request.as_bytes())
            .expect("Error writing to stream");

        let token = rt.generate_cb_identity();
        rt.epoll_registrator
            .register(&mut stream, token, minimio::Interests::READABLE)
            .unwrap();

        let wrapped = move |_n| {
            let mut stream = stream;
            let mut buffer = String::new();
            stream
                .read_to_string(&mut buffer)
                .expect("Stream read error");
            cb(Js::String(buffer));
        };

        rt.register_event_epoll(token, wrapped);
    }
}
}

First, we call the method http_get_slow since we're simulating a slow response. Again, this is so we can see and control how the events occur, since we're trying to learn.

In the function body, the first thing we do is dereference our runtime. We need to use a bit of its functionality here so we do this right away.

The next step is to create a minimio::TcpStream.


#![allow(unused_variables)]
fn main() {
let adr = "slowwly.robertomurray.co.uk:80";
let mut stream = minimio::TcpStream::connect(adr).unwrap();
}

Now why not the regular TcpStream from the standard library?

Well, you do remember that kqueue and epoll are readiness based and IOCP is completion based right?

Well, to have a single ergonomic API for all platforms we need to abstract over something, and we (like mio) choose to abstract over TcpStream.

While with kqueue and epoll we can just read from the socket when a Read event is ready, on Windows we need the TcpStream to create a buffer that it hands over to the OS. So when we call read on the TcpStream on Windows, we read from this buffer.

We connect to slowwly.robertomurray.co.uk:80 which is just a site Robert Murray has created to simulate slow responses. He's kind enough to let us all use it. We can choose the delay we want on each request.

Next we construct a http GET request. The line breaks and spacing are important here, as is the empty line at the end that terminates the headers.


#![allow(unused_variables)]
fn main() {
let request = format!(
            "GET /delay/{}/url/http://{} HTTP/1.1\r\n\
             Host: slowwly.robertomurray.co.uk\r\n\
             Connection: close\r\n\
             \r\n",
            delay_ms, url
        );
}

You might wonder why we use \r\n as line breaks here instead of the standard \n? (The extra trailing \ you see in the source just continues the string literal on the next line; it's not part of the request itself.)

This is because a http GET request expects CRLF and not just LF; using only \n will not result in a valid GET request.
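To make this concrete, here is a small standalone example (using made-up values for delay_ms and url) showing what the formatted request actually contains:


fn main() {
    let delay_ms = 1000;
    let url = "www.example.com"; // example values, not the ones from our module
    let request = format!(
        "GET /delay/{}/url/http://{} HTTP/1.1\r\n\
         Host: slowwly.robertomurray.co.uk\r\n\
         Connection: close\r\n\
         \r\n",
        delay_ms, url
    );

    // every header line is terminated by CRLF, and the header section itself
    // is terminated by an empty line (a second CRLF in a row)
    assert!(request.ends_with("\r\n\r\n"));
    // the trailing `\` in the source just joins the lines of the string
    // literal; it does not end up in the request itself
    assert!(!request.contains('\\'));
}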

We construct the request by passing in the delay we want and the address we want to be redirected to.

Next we write this request to our TcpStream. In a real implementation this would have been done by issuing the write in an async manner and then registering an event for when the write has completed.


#![allow(unused_variables)]
fn main() {
stream
    .write_all(request.as_bytes())
    .expect("Error writing to stream");
}

We write it in a blocking manner here since implementing both read and write would require much more code, and our understanding of how this works wouldn't really benefit that much (our understanding of how a web server works would, though, but that's beyond our scope today).

Now we get to the exciting part.

First we need to generate a token. This token will follow our event and be passed on to the OS. When the OS returns it also returns this token so we know what event occurred.


#![allow(unused_variables)]
fn main() {
let token = rt.generate_cb_identity();
}

For convenience we use the same token as our callback_id since it will be unique, and events <-> callbacks will map 1:1 the way we have designed this.

Our next call actually issues a syscall to the underlying OS and registers our interest in an event of type Readable.


#![allow(unused_variables)]
fn main() {
rt.epoll_registrator
            .register(&mut stream, token, minimio::Interests::READABLE)
            .unwrap();
}

The reason we need &mut stream here is Windows and IOCP. Our stream holds a buffer that we'll pass on to Windows. This buffer must not be touched while we've lent it to the OS, and this way we leverage Rust's borrow checker to help us make sure of that.

However, this has a drawback. We don't really need it to be &mut on Linux and macOS, so on these systems we might get a compiler warning letting us know it doesn't need to be &mut. There are ways around this, but in the interest of actually finishing both books I had to stop somewhere, and this is not the end of the world.

The main point here is that we register an interest to read in a non-blocking manner.

I'll repeat this part of the code so you have it right in front of you while I explain:


#![allow(unused_variables)]
fn main() {
    let wrapped = move |_n| {
            let mut stream = stream;
            let mut buffer = String::new();
            stream
                .read_to_string(&mut buffer)
                .expect("Stream read error");
            cb(Js::String(buffer));
        };
}

Since our callback expects a Js::String we can't actually pass the buffer. We need to wrap our callback so we first read out the data to a String which we then can pass to our code.

Lastly we register the I/O event with our runtime:


#![allow(unused_variables)]
fn main() {
rt.register_event_epoll(token, wrapped);
}

Bonus section

Now, in a better implementation we would not issue a blocking call like read_to_string, because it might block at some point if not all the data we need is present at once.

The right thing to do would be to read parts of the data into a buffer by calling stream.read(..) in a loop; at some point this might return an error of kind WouldBlock, at which point we re-wrap our callback and re-register our read event so we can read the rest later.
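A rough sketch of what such a read loop could look like. This is not the code in our runtime; it's a standalone illustration over anything that implements Read, and the function name try_read is made up:


use std::io::{self, Read};

// Try to drain what's currently available from a non-blocking source.
// Returns `true` if we hit EOF (we're done), `false` if we got `WouldBlock`
// and need to re-register our interest and try again on the next event.
fn try_read(stream: &mut impl Read, buffer: &mut Vec<u8>) -> io::Result<bool> {
    let mut chunk = [0u8; 4096];
    loop {
        match stream.read(&mut chunk) {
            Ok(0) => return Ok(true),                       // EOF, all data read
            Ok(n) => buffer.extend_from_slice(&chunk[..n]), // got data, keep going
            // nothing more right now (or a spurious wakeup): the caller
            // should re-register the event and call us again later
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(false),
            // retry reads that were interrupted by a signal
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    // `&[u8]` implements `Read`, so we can exercise the loop without a socket
    let mut fake_stream: &[u8] = b"HTTP/1.1 200 OK\r\n\r\nhello";
    let mut buffer = Vec::new();
    let done = try_read(&mut fake_stream, &mut buffer)?;
    println!("done: {}, read {} bytes", done, buffer.len());
    Ok(())
}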

Another reason this might block is that threads might get what is called a "spurious" wakeup.

What is that? Well, the OS might wake up the thread, without there being any data there. The reason for this is kind of hard to get confirmed, but as far as I understand, this can happen if the OS is in doubt if an event occurred or not. It does apparently happen that some interrupts might get issued without the OS getting notified. This can be caused by unfortunate timing since there are some bits of code that needs to be executed "uninterrupted" and there is a way for the OS to instruct the CPU to filter some interrupts for short periods. The OS can't be "optimistic" in the sense that it assumes the event didn't happen since that would cause the process to wait indefinitely if the event did occur. So instead it might just wake up the thread.

Also, there are performance optimizations in operating systems that might cause them to choose to wake up waiting threads in a process for unknown reasons. Therefore, accounting for spurious wakeups is part of the "contract" between the programmer and the OS.

Either way, the OS assumes we have logic in place to account for this and just re-register our event if it was a spurious wakeup.

In our implementation, whether the data is not fully available or if it was a spurious wakeup, we'll end up blocking. It's fine, we're just trying to understand. We're not re-implementing libuv anyway.

The last part is registering this event with our epoll_thread.

Now, we're practically at the finish line. All the interesting parts are covered, we just need a few more small things to get it all up and running.

Putting the pieces together

The first step to get this to run is obvious. We need a main function. Fortunately it's a short one:

fn main() {
    let rt = Runtime::new();
    rt.run(javascript);
}

We instantiate our runtime, and we pass in a pointer to our javascript function.

The next parts are just helper methods to let us print out interesting things about our Runtime while it's running.

For those new to Rust I'll spend the time to explain them anyway:


#![allow(unused_variables)]
fn main() {
fn current() -> String {
    thread::current().name().unwrap().to_string()
}
}

The current function returns the name of the thread it's called from.

Since we have several threads handling tasks for us, this will help us keep track of what goes on where.


#![allow(unused_variables)]
fn main() {
fn print(t: impl std::fmt::Display) {
    println!("Thread: {}\t {}", current(), t);
}
}

This function takes an argument that implements the Display trait. This trait defines how we want a type to be displayed as text. We call our current function to get the name of the current thread and output everything to stdout.

The next function is a bit of an introduction to iterators. When we have content to print we use this one:


#![allow(unused_variables)]
fn main() {
fn print_content(t: impl std::fmt::Display, descr: &str) {
    println!(
        "\n===== THREAD {} START CONTENT - {} =====",
        current(),
        descr.to_uppercase()
    );

    let content = format!("{}", t);
    let lines = content.lines().take(2);
    let main_cont: String = lines.map(|l| format!("{}\n", l)).collect();
    let opt_location = content.find("Location");

    let opt_location = opt_location.map(|loc| {
        content[loc..]
        .lines()
        .nth(0)
        .map(|l| format!("{}\n",l))
        .unwrap_or(String::new())
    });

    println!(
        "{}{}... [Note: Abbreviated for display] ...",
        main_cont,
        opt_location.unwrap_or(String::new())
    );

    println!("===== END CONTENT =====\n");
}

}

Let's explain the iterators step by step:

First we have:


#![allow(unused_variables)]
fn main() {
let content = format!("{}", t);
let lines = content.lines().take(2);
let main_cont: String = lines.map(|l| format!("{}\n", l)).collect();
}

Content is just the content we want to print out. This is tailor made to print out the interesting parts of our http response, namely the first few lines and the location header so we know which call it was that got returned.

We only want to print out the two first lines so we get an iterator over those by calling content.lines().take(2).

The next step is to create a String out of these two lines.

lines.map(|l| format!("{}\n", l)).collect(); takes every line and maps it to a new string with \n appended to preserve the line breaks (if we don't do that, the lines come out as a single line). Then we collect that into a String.

We know they collect to a String since we annotated the type of main_cont in let main_cont: String.


#![allow(unused_variables)]
fn main() {
let opt_location = content.find("Location");
let opt_location = opt_location.map(|loc| {
        content[loc..]
        .lines()
        .nth(0)
        .map(|l| format!("{}\n",l))
        .unwrap_or(String::new())
    });
}

The next part finds the location of the "Location" header. find returns an Option, just keep that in mind.

In the next step we use map again, but this time we use map on an Option type. In this context, map means that if there is Some value we want to work with it, so in map(|loc| {..., loc is the index where the Location header was found.

Now what we do is that we take a range of our content string starting from the index of where we found the Location header, and all the way to the end of the string.

From this range we get an iterator over its lines by calling lines(), and we take the first line from this iterator with nth(0). Again, this returns an Option, so we use map once more to define what we'll do in the case that it's Some.

This means that if the first line of content[loc..] is Some(line), we pass that line into our closure map(|l| format!("{}\n", l)), where l is this line.

We simply map this line into a String with a line break appended to it.

Lastly we unwrap the result of these operations. If you kept your tongue straight all the way through, this should either be Some(the_first_line_starting_with_Location) or None. If it's None we just pass in an empty String.
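If this chaining of find, map and unwrap_or feels unfamiliar, here is the same idea on a small standalone string:


fn main() {
    let content = "HTTP/1.1 302 Found\nLocation: http://www.example.com\nConnection: close";

    // `find` gives us `Some(index)` if the text contains "Location", else `None`
    let location_line = content
        .find("Location")
        .map(|loc| content[loc..].lines().nth(0).unwrap_or("").to_string())
        .unwrap_or(String::new());

    assert_eq!(location_line, "Location: http://www.example.com");
}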

Lastly we output everything.

Congratulations

You made it! Well done my friend! Now, there are a few small chapters left but we're actually done going through all async basics and implementing our runtime. So relax, and give yourself a pat on the back. All the hard work is finished!

Final code

Remember, for this to run you need to create a cargo binary project, and add the following dependency to your Cargo.toml:

[dependencies]
minimio = {git = "https://github.com/cfsamson/examples-io-eventloop", branch = "master"}

Important
In the root of your project, add a file called test.txt, and add the text:

Hello world! This is a text to encrypt!

What you write in the file is not important, but how many characters there are is, since we calculate the fibonacchi number based on the number of characters in the file.

Add too much text and you'll wait for a long time. Of course, you can just change the logic yourself now to do something else.

In main.rs you can paste in this code:

/// Think of this function as the javascript program you have written
fn javascript() {
    print("First call to read test.txt");
    Fs::read("test.txt", |result| {
        let text = result.into_string().unwrap();
        let len = text.len();
        print(format!("First count: {} characters.", len));

        print(r#"I want to create a "magic" number based on the text."#);
        Crypto::encrypt(text.len(), |result| {
            let n = result.into_int().unwrap();
            print(format!(r#""Encrypted" number is: {}"#, n));
        })
    });

    print("Registering immediate timeout 1");
    set_timeout(0, |_res| {
        print("Immediate1 timed out");
    });
    print("Registering immediate timeout 2");
    set_timeout(0, |_res| {
        print("Immediate2 timed out");
    });

    // let's read the file again and display the text
    print("Second call to read test.txt");
    Fs::read("test.txt", |result| {
        let text = result.into_string().unwrap();
        let len = text.len();
        print(format!("Second count: {} characters.", len));

        // aaand one more time but not in parallel.
        print("Third call to read test.txt");
        Fs::read("test.txt", |result| {
            let text = result.into_string().unwrap();
            print_content(&text, "file read");
        });
    });

    print("Registering a 3000 and a 500 ms timeout");
    set_timeout(3000, |_res| {
        print("3000ms timer timed out");
        set_timeout(500, |_res| {
            print("500ms timer(nested) timed out");
        });
    });

    print("Registering a 1000 ms timeout");
    set_timeout(1000, |_res| {
        print("SETTIMEOUT");
    });

    // `http_get_slow` let's us define a latency we want to simulate
    print("Registering http get request to google.com");
    Http::http_get_slow("www.google.com", 2000, |result| {
        let result = result.into_string().unwrap();
        print_content(result.trim(), "web call");
    });
}

fn main() {
    let rt = Runtime::new();
    rt.run(javascript);
}

// ===== THIS IS OUR "NODE LIBRARY" =====
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs;
use std::io::{self, Read, Write};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use minimio;

static mut RUNTIME: *mut Runtime = std::ptr::null_mut();

struct Task {
    task: Box<dyn Fn() -> Js + Send + 'static>,
    callback_id: usize,
    kind: ThreadPoolTaskKind,
}

impl Task {
    fn close() -> Self {
        Task {
            task: Box::new(|| Js::Undefined),
            callback_id: 0,
            kind: ThreadPoolTaskKind::Close,
        }
    }
}

pub enum ThreadPoolTaskKind {
    FileRead,
    Encrypt,
    Close,
}

impl fmt::Display for ThreadPoolTaskKind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use ThreadPoolTaskKind::*;
        match self {
            FileRead => write!(f, "File read"),
            Encrypt => write!(f, "Encrypt"),
            Close => write!(f, "Close"),
        }
    }
}

#[derive(Debug)]
pub enum Js {
    Undefined,
    String(String),
    Int(usize),
}

impl Js {
    /// Convenience method since we know the types
    fn into_string(self) -> Option<String> {
        match self {
            Js::String(s) => Some(s),
            _ => None,
        }
    }

    /// Convenience method since we know the types
    fn into_int(self) -> Option<usize> {
        match self {
            Js::Int(n) => Some(n),
            _ => None,
        }
    }
}

/// NodeThread represents a thread in our threadpool. Each thread has a JoinHandle
/// and the transmitting part of a channel which is used to inform our main loop
/// about what events have occurred.
#[derive(Debug)]
struct NodeThread {
    pub(crate) handle: JoinHandle<()>,
    sender: Sender<Task>,
}

pub struct Runtime {
    /// Available threads for the threadpool
    available_threads: Vec<usize>,
    /// Callbacks scheduled to run
    callbacks_to_run: Vec<(usize, Js)>,
    /// All registered callbacks
    callback_queue: HashMap<usize, Box<dyn FnOnce(Js)>>,
    /// Number of pending epoll events, only used by us to print for this example
    epoll_pending_events: usize,
    /// Our event registrator which registers interest in events with the OS
    epoll_registrator: minimio::Registrator,
    // The handle to our epoll thread
    epoll_thread: thread::JoinHandle<()>,
    /// None = infinite, Some(n) = timeout in n ms, Some(0) = immediate
    epoll_timeout: Arc<Mutex<Option<i32>>>,
    /// Channel used by both our threadpool and our epoll thread to send events
    /// to the main loop
    event_receiver: Receiver<PollEvent>,
    /// Creates an unique identity for our callbacks
    identity_token: usize,
    /// The number of events pending. When this is zero, we're done
    pending_events: usize,
    /// Handles to our threads in the threadpool
    thread_pool: Vec<NodeThread>,
    /// Holds all our timers, and an Id for the callback to run once they expire
    timers: BTreeMap<Instant, usize>,
    /// A collection to temporarily hold timers to remove. We let Runtime have
    /// ownership so we can reuse the same memory
    timers_to_remove: Vec<Instant>,
}

/// Describes the three main events our epoll-eventloop handles
enum PollEvent {
    /// An event from the `threadpool` with a tuple containing the `thread id`,
    /// the `callback_id` and the data which the we expect to process in our
    /// callback
    Threadpool((usize, usize, Js)),
    /// An event from the epoll-based eventloop holding the `event_id` for the
    /// event
    Epoll(usize),
    Timeout,
}

impl Runtime {
    pub fn new() -> Self {
        // ===== THE REGULAR THREADPOOL =====
        let (event_sender, event_receiver) = channel::<PollEvent>();
        let mut threads = Vec::with_capacity(4);

        for i in 0..4 {
            let (evt_sender, evt_receiver) = channel::<Task>();
            let event_sender = event_sender.clone();

            let handle = thread::Builder::new()
                .name(format!("pool{}", i))
                .spawn(move || {

                    while let Ok(task) = evt_receiver.recv() {
                        print(format!("received a task of type: {}", task.kind));

                        if let ThreadPoolTaskKind::Close = task.kind {
                            break;
                        };

                        let res = (task.task)();
                        print(format!("finished running a task of type: {}.", task.kind));

                        let event = PollEvent::Threadpool((i, task.callback_id, res));
                        event_sender.send(event).expect("threadpool");
                    }
                })
                .expect("Couldn't initialize thread pool.");

            let node_thread = NodeThread {
                handle,
                sender: evt_sender,
            };

            threads.push(node_thread);
        }

        // ===== EPOLL THREAD =====
        let mut poll = minimio::Poll::new().expect("Error creating epoll queue");
        let registrator = poll.registrator();
        let epoll_timeout = Arc::new(Mutex::new(None));
        let epoll_timeout_clone = epoll_timeout.clone();

        let epoll_thread = thread::Builder::new()
            .name("epoll".to_string())
            .spawn(move || {
                let mut events = minimio::Events::with_capacity(1024);

                loop {
                    let epoll_timeout_handle = epoll_timeout_clone.lock().unwrap();
                    let timeout = *epoll_timeout_handle;
                    drop(epoll_timeout_handle);

                    match poll.poll(&mut events, timeout) {
                        Ok(v) if v > 0 => {
                            for i in 0..v {
                                let event = events.get_mut(i).expect("No events in event list.");
                                print(format!("epoll event {} is ready", event.id()));

                                let event = PollEvent::Epoll(event.id());
                                event_sender.send(event).expect("epoll event");
                            }
                        }
                        Ok(v) if v == 0 => {
                            print("epoll event timeout is ready");
                            event_sender.send(PollEvent::Timeout).expect("epoll timeout");
                        }
                        Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                            print("received event of type: Close");
                            break;
                        }
                        Err(e) => panic!("{:?}", e),
                        _ => unreachable!(),
                    }
                }
            })
            .expect("Error creating epoll thread");

        Runtime {
            available_threads: (0..4).collect(),
            callbacks_to_run: vec![],
            callback_queue: HashMap::new(),
            epoll_pending_events: 0,
            epoll_registrator: registrator,
            epoll_thread,
            epoll_timeout,
            event_receiver,
            identity_token: 0,
            pending_events: 0,
            thread_pool: threads,
            timers: BTreeMap::new(),
            timers_to_remove: vec![],
        }
    }

    /// This is the event loop. There are several things we could do here to
    /// make it a better implementation. One is to set a max backlog of callbacks
    /// to execute in a single tick, so we don't starve the threadpool or file
    /// handlers. Another is to dynamically decide if, and for how long, the
    /// thread could be allowed to be parked, for example by looking at the
    /// backlog of events and disabling parking if there is any backlog. Some of
    /// our Vecs will only grow and never shrink, so during a period of very high
    /// load the memory usage will stay higher than we need until a restart. This
    /// could be dealt with by using a different kind of data structure such as a
    /// `LinkedList`.
    pub fn run(mut self, f: impl Fn()) {
        let rt_ptr: *mut Runtime = &mut self;
        unsafe { RUNTIME = rt_ptr };

        // just for us, printing out which tick is executing
        let mut ticks = 0;

        // First we run our "main" function
        f();

        // ===== EVENT LOOP =====
        while self.pending_events > 0 {
            ticks += 1;
            // NOT PART OF LOOP, JUST FOR US TO SEE WHAT TICK IS EXECUTING
            print(format!("===== TICK {} =====", ticks));

            // ===== 1. TIMERS =====
            self.process_expired_timers();

            // ===== 2. CALLBACKS =====
            // Timer callbacks, and any callbacks that for some reason were postponed
            // to run on the next tick (not possible in our implementation though).
            self.run_callbacks();

            // ===== 3. IDLE/PREPARE =====
            // we won't use this

            // ===== 4. POLL =====
            // First we need to check if we have any outstanding events at all;
            // if not, we're finished. Without this check we would block below forever.
            if self.pending_events == 0 {
                break;
            }

            // We want to get the time to the next timeout (if any) and we
            // set the timeout of our epoll wait to the same as the timeout
            // for the next timer. If there is none, we set it to infinite (None)
            let next_timeout = self.get_next_timer();

            let mut epoll_timeout_lock = self.epoll_timeout.lock().unwrap();
            *epoll_timeout_lock = next_timeout;
            // We release the lock before we wait in `recv`
            drop(epoll_timeout_lock);

            // We handle one event at a time even though multiple events could be
            // ready on the same poll. We won't cover that here, but there are
            // several ways of handling it.
            if let Ok(event) = self.event_reciever.recv() {
                match event {
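                    // A timeout just means the epoll thread woke us because the next
                    // timer is (probably) due. Nothing to do here; expired timers are
                    // handled in the TIMERS step at the top of the next tick.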
                    PollEvent::Timeout => (),
                    PollEvent::Threadpool((thread_id, callback_id, data)) => {
                        self.process_threadpool_events(thread_id, callback_id, data);
                    }
                    PollEvent::Epoll(event_id) => {
                        self.process_epoll_events(event_id);
                    }
                }
            }
            self.run_callbacks();

            // ===== 5. CHECK =====
            // a `setImmediate` function could be added pretty easily, but we
            // won't do that here

            // ===== 6. CLOSE CALLBACKS ======
            // Release resources, we won't do that here, but this is typically
            // where sockets etc are closed.
        }

        // We clean up our resources, making sure all destructors run.
        for thread in self.thread_pool.into_iter() {
            thread.sender.send(Task::close()).expect("threadpool cleanup");
            thread.handle.join().unwrap();
        }

        self.epoll_registrator.close_loop().unwrap();
        self.epoll_thread.join().unwrap();

        print("FINISHED");
    }

    fn process_expired_timers(&mut self) {
        // Need an intermediate variable to please the borrow checker
        let timers_to_remove = &mut self.timers_to_remove;

        self.timers
            .range(..=Instant::now())
            .for_each(|(k, _)| timers_to_remove.push(*k));

        while let Some(key) = self.timers_to_remove.pop() {
            let callback_id = self.timers.remove(&key).unwrap();
            self.callbacks_to_run.push((callback_id, Js::Undefined));
        }
    }

    fn get_next_timer(&self) -> Option<i32> {
        self.timers.iter().next().map(|(&instant, _)| {
            // `Duration` can't be negative, so if the timer is already due we
            // saturate to zero instead of risking a panic on the subtraction.
            let time_to_next_timeout = instant.saturating_duration_since(Instant::now());
            time_to_next_timeout.as_millis() as i32
        })
    }

    fn run_callbacks(&mut self) {
        while let Some((callback_id, data)) = self.callbacks_to_run.pop() {
            let cb = self.callback_queue.remove(&callback_id).unwrap();
            cb(data);
            self.pending_events -= 1;
        }
    }

    fn process_epoll_events(&mut self, event_id: usize) {
        self.callbacks_to_run.push((event_id, Js::Undefined));
        self.epoll_pending_events -= 1;
    }

    fn process_threadpool_events(&mut self, thread_id: usize, callback_id: usize, data: Js) {
        self.callbacks_to_run.push((callback_id, data));
        self.available_threads.push(thread_id);
    }

    fn get_available_thread(&mut self) -> usize {
        match self.available_threads.pop() {
            Some(thread_id) => thread_id,
            // We would normally return `None` and queue the request instead of panicking!
            None => panic!("Out of threads."),
        }
    }

    /// If we hit max we just wrap around
    fn generate_identity(&mut self) -> usize {
        self.identity_token = self.identity_token.wrapping_add(1);
        self.identity_token
    }

    fn generate_cb_identity(&mut self) -> usize {
        let ident = self.generate_identity();
        let taken = self.callback_queue.contains_key(&ident);

        // If there is a collision, i.e. the identity is already taken, we loop until
        // we find a new one. We don't cover the case where there are `usize::MAX`
        // callbacks waiting, since even if we queued a new event every nanosecond it
        // would still take about 585.5 years to run out of identities on a 64 bit system.
        if !taken {
            ident
        } else {
            loop {
                let possible_ident = self.generate_identity();
                if !self.callback_queue.contains_key(&possible_ident) {
                    break possible_ident;
                }
            }
        }
    }

    /// Adds a callback to the callback queue under the given identity
    fn add_callback(&mut self, ident: usize, cb: impl FnOnce(Js) + 'static) {
        let boxed_cb = Box::new(cb);
        self.callback_queue.insert(ident, boxed_cb);
    }

    pub fn register_event_epoll(&mut self, token: usize, cb: impl FnOnce(Js) + 'static) {
        self.add_callback(token, cb);

        print(format!("Event with id: {} registered.", token));
        self.pending_events += 1;
        self.epoll_pending_events += 1;
    }

    pub fn register_event_threadpool(
        &mut self,
        task: impl Fn() -> Js + Send + 'static,
        kind: ThreadPoolTaskKind,
        cb: impl FnOnce(Js) + 'static,
    ) {
        let callback_id = self.generate_cb_identity();
        self.add_callback(callback_id, cb);

        let event = Task {
            task: Box::new(task),
            callback_id,
            kind,
        };

        // we are not going to implement a real scheduler here, just a LIFO queue
        let available = self.get_available_thread();
        self.thread_pool[available].sender.send(event).expect("register work");
        self.pending_events += 1;
    }

    fn set_timeout(&mut self, ms: u64, cb: impl Fn(Js) + 'static) {
        // NB! If two timers are registered at the exact same `Instant`, the second
        // would overwrite the first in our `BTreeMap`. We ignore that edge case here.
        let now = Instant::now();

        let cb_id = self.generate_cb_identity();
        self.add_callback(cb_id, cb);

        let timeout = now + Duration::from_millis(ms);
        self.timers.insert(timeout, cb_id);

        self.pending_events += 1;
        print(format!("Registered timer event id: {}", cb_id));
    }
}

pub fn set_timeout(ms: u64, cb: impl Fn(Js) + 'static) {
    let rt = unsafe { &mut *(RUNTIME as *mut Runtime) };
    rt.set_timeout(ms, cb);
}

// ===== THESE ARE PLUGINS WRITTEN IN C++ FOR THE NODE RUNTIME, OR PART OF THE RUNTIME ITSELF =====
// Dereferencing the raw pointer to our runtime is not strictly needed, but it's mostly there to
// emulate a bit of the same feeling as when you use modules in javascript. We could instead pass
// the runtime in as a reference to our startup function.

struct Crypto;
impl Crypto {
    fn encrypt(n: usize, cb: impl Fn(Js) + 'static + Clone) {
        let work = move || {
            fn fibonacci(n: usize) -> usize {
                match n {
                    0 => 0,
                    1 => 1,
                    _ => fibonacci(n - 1) + fibonacci(n - 2),
                }
            }

            let fib = fibonacci(n);
            Js::Int(fib)
        };

        let rt = unsafe { &mut *RUNTIME };
        rt.register_event_threadpool(work, ThreadPoolTaskKind::Encrypt, cb);
    }
}

struct Fs;
impl Fs {
    fn read(path: &'static str, cb: impl Fn(Js) + 'static) {
        let work = move || {
            // Let's simulate that we're reading a very large file, which lets us
            // actually observe how the code is executed
            thread::sleep(std::time::Duration::from_secs(1));
            let mut buffer = String::new();
            fs::File::open(&path)
                .unwrap()
                .read_to_string(&mut buffer)
                .unwrap();
            Js::String(buffer)
        };
        let rt = unsafe { &mut *RUNTIME };
        rt.register_event_threadpool(work, ThreadPoolTaskKind::FileRead, cb);
    }
}

struct Http;
impl Http {
    pub fn http_get_slow(url: &str, delay_ms: u32, cb: impl Fn(Js) + 'static + Clone) {
        let rt: &mut Runtime = unsafe { &mut *RUNTIME };

        // Don't worry, http://slowwly.robertomurray.co.uk is a site for simulating a delayed
        // response from a server. Perfect for our use case.
        let adr = "slowwly.robertomurray.co.uk:80";
        let mut stream = minimio::TcpStream::connect(adr).unwrap();
        let request = format!(
            "GET /delay/{}/url/http://{} HTTP/1.1\r\n\
             Host: slowwly.robertomurray.co.uk\r\n\
             Connection: close\r\n\
             \r\n",
            delay_ms, url
        );

        stream
            .write_all(request.as_bytes())
            .expect("Error writing to stream");

        let token = rt.generate_cb_identity();

        rt.epoll_registrator
            .register(&mut stream, token, minimio::Interests::READABLE)
            .unwrap();

        let wrapped = move |_n| {
            let mut stream = stream;
            let mut buffer = String::new();

            stream
                .read_to_string(&mut buffer)
                .expect("Stream read error");

            cb(Js::String(buffer));
        };

        rt.register_event_epoll(token, wrapped);
    }
}

fn print(t: impl std::fmt::Display) {
    println!("Thread: {}\t {}", current(), t);
}

fn print_content(t: impl std::fmt::Display, descr: &str) {
    println!(
        "\n===== THREAD {} START CONTENT - {} =====",
        current(),
        descr.to_uppercase()
    );

    let content = format!("{}", t);
    let lines = content.lines().take(2);
    let main_cont: String = lines.map(|l| format!("{}\n", l)).collect();
    let opt_location = content.find("Location");

    let opt_location = opt_location.map(|loc| {
        content[loc..]
        .lines()
        .nth(0)
        .map(|l| format!("{}\n",l))
        .unwrap_or(String::new())
    });

    println!(
        "{}{}... [Note: Abbreviated for display] ...",
        main_cont,
        opt_location.unwrap_or(String::new())
    );

    println!("===== END CONTENT =====\n");
}

fn current() -> String {
    thread::current().name().unwrap().to_string()
}

Shortcuts and improvements

There are several things we could do to make this a better implementation, and I want to point the most important ones out here.

One is to set a max backlog of callbacks to execute in a single tick, so we don't starve the threadpool or file handlers. In other words, limit the number of callbacks we execute in a single iteration.
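A minimal sketch of what that could look like, reusing the `run_callbacks` method from above (the `MAX_CALLBACKS_PER_TICK` constant is an arbitrary addition of mine, not part of the example code):

fn run_callbacks(&mut self) {
    // Hypothetical cap on how much work a single tick is allowed to do.
    const MAX_CALLBACKS_PER_TICK: usize = 32;
    let mut ran = 0;

    while let Some((callback_id, data)) = self.callbacks_to_run.pop() {
        let cb = self.callback_queue.remove(&callback_id).unwrap();
        cb(data);
        self.pending_events -= 1;

        ran += 1;
        if ran >= MAX_CALLBACKS_PER_TICK {
            // Anything left in `callbacks_to_run` is simply picked up on the
            // next tick instead of starving the rest of the loop.
            break;
        }
    }
}

The exact number matters less than the fact that a single tick can no longer run an unbounded amount of work.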

Another is to dynamically decide if, and for how long, the thread should be allowed to park, for example by looking at the backlog of events and disabling any waiting at all when there is a backlog.
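Building on the sketch above, the POLL step could check for leftover work before choosing a timeout. This is only an illustration of the idea, not something the example code does:

// If the previous phase left callbacks behind (for instance because we capped
// how many run per tick), we don't want to park at all, so we poll with a zero
// timeout. Otherwise we wait until the next timer is due, exactly as before.
let next_timeout = if !self.callbacks_to_run.is_empty() {
    Some(0)
} else {
    self.get_next_timer()
};

let mut epoll_timeout_lock = self.epoll_timeout.lock().unwrap();
*epoll_timeout_lock = next_timeout;
drop(epoll_timeout_lock);

With a zero timeout `poll` returns immediately, the epoll thread sends a `PollEvent::Timeout`, and the main thread gets straight back to running callbacks instead of parking in `recv`.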

When it comes to data structures, there is a lot we could have done. If you look closely, you'll see that some of our Vecs only grow and never shrink, so after a period of very high load the memory usage stays higher than it needs to be until a restart. This could be dealt with explicitly, or a different data structure without this property could have been chosen (like a linked list).
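For example, we could hand back excess capacity once the queues are drained. This is only a sketch; the threshold is arbitrary, and `shrink_to_fit` is just the standard `Vec` method:

// Occasionally release capacity that was allocated during a burst of load.
// Doing this on every tick would be wasteful, so a real implementation would
// only do it now and then, or when capacity far exceeds what is currently used.
if self.callbacks_to_run.is_empty() && self.callbacks_to_run.capacity() > 1024 {
    self.callbacks_to_run.shrink_to_fit();
}
if self.timers_to_remove.is_empty() && self.timers_to_remove.capacity() > 1024 {
    self.timers_to_remove.shrink_to_fit();
}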

Some of the shortcuts (like panicking when we have no available threads in the threadpool) are especially limiting. If you were to improve this example, that would be one of the first things to fix.
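One way to fix it (a sketch only, assuming we add a hypothetical `task_queue: Vec<Task>` field to `Runtime`) is to let `get_available_thread` return an `Option`, queue the task when every thread is busy, and hand queued tasks to threads as they report back in:

fn get_available_thread(&mut self) -> Option<usize> {
    // No panic: the caller decides what to do when all threads are busy.
    self.available_threads.pop()
}

fn process_threadpool_events(&mut self, thread_id: usize, callback_id: usize, data: Js) {
    self.callbacks_to_run.push((callback_id, data));

    // If tasks piled up while every thread was busy, hand the next one straight
    // to the thread that just finished instead of marking it as available.
    if let Some(next_task) = self.task_queue.pop() {
        self.thread_pool[thread_id].sender.send(next_task).expect("queued work");
    } else {
        self.available_threads.push(thread_id);
    }
}

`register_event_threadpool` would then match on `get_available_thread()` and push the task onto `task_queue` in the `None` case instead of panicking.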

There are probably several other aspects that deserve to be mentioned here, but I'll leave it at this for now.

Conclusion

Congratulations!!

What a ride, huh? We're finished, so take a break and get some fresh air. You deserve it.

We covered a lot in this book in a fairly condensed manner. I hope I kept my initial promise about what we were going to learn, and that you got something useful out of it.

Anyway, I want to thank you for reading all the way to the end, and I really do hope you enjoyed it.

Feel free to visit the repository of this book or the repository for our example code. I would love to hear from you if you think there are improvements I could make, or if you just found it a good read.

Until next time!

Carl Fredrik Samson