Some Background Information

Before we go into the details about Futures in Rust, let's take a quick look at the alternatives for handling concurrent programming in general and some pros and cons for each of them.

While we do that, we'll also explain some aspects of concurrency that will make it easier for us when we dive into Futures specifically.

For fun, I've added a small snippet of runnable code to most of the examples. If you're like me, that makes things way more interesting, and maybe you'll spot some things you haven't seen before along the way.

Threads provided by the operating system

Now, one way of accomplishing concurrent programming is letting the OS take care of everything for us. We do this by simply spawning a new OS thread for each task we want to accomplish and writing code like we normally would.

The runtime we use to handle concurrency for us is the operating system itself.

Advantages:

  • Simple
  • Easy to use
  • Switching between tasks is reasonably fast
  • You get parallelism for free

Drawbacks:

  • OS-level threads come with a rather large stack. If you have many tasks waiting simultaneously (like you would in a web server under heavy load), you'll run out of memory pretty fast (though the stack size can be tuned, as shown in the sketch right after the next example).
  • There are a lot of syscalls involved. This can be pretty costly when the number of tasks is high.
  • The OS has many things it needs to handle. It might not switch back to your thread as fast as you'd wish.
  • Might not be an option on some systems

Using OS threads in Rust looks like this:

use std::thread;

fn main() {
    println!("So we start the program here!");
    let t1 = thread::spawn(move || {
        thread::sleep(std::time::Duration::from_millis(200));
        println!("We create tasks which gets run when they're finished!");
    });

    let t2 = thread::spawn(move || {
        thread::sleep(std::time::Duration::from_millis(100));
        println!("We can even chain callbacks...");
        let t3 = thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(50));
            println!("...like this!");
        });
        t3.join().unwrap();
    });
    println!("While our tasks are executing we can do other stuff here.");

    t1.join().unwrap();
    t2.join().unwrap();
}
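
By the way, one of the drawbacks I listed was the rather large stack. The standard library does let you adjust the stack size when spawning a thread. Here's a minimal sketch using std::thread::Builder (the 64 KiB size is just a number I picked for this illustration):

use std::thread;

fn main() {
    // Ask for a 64 KiB stack instead of Rust's 2 MiB default for spawned threads.
    let handle = thread::Builder::new()
        .stack_size(64 * 1024)
        .spawn(|| {
            println!("Hello from a thread with a smaller stack!");
        })
        .unwrap();

    handle.join().unwrap();
}

This helps with the memory usage, but not with the syscall overhead or the scheduling latency, so it only gets you so far.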

OS threads sure have some pretty big advantages. So why all this talk about "async" and concurrency in the first place?

First, for computers to be efficient they need to multitask. Once you start to look under the covers (at how an operating system works, for example) you'll see concurrency everywhere. It's fundamental to everything we do.

Secondly, we have the web.

Web servers are all about I/O and handling small tasks (requests). When the number of small tasks is large it's not a good fit for OS threads as of today because of the memory they require and the overhead involved when creating new threads.

This gets even more problematic when the load is variable, which means that the number of tasks a program handles at any point in time is unpredictable. That's why you'll see so many async web frameworks and database drivers today.

However, for a huge number of problems, the standard OS threads will often be the right solution. So, just think twice about your problem before you reach for an async library.

Now, let's look at some other options for multitasking. What they all have in common is that they implement a way to do multitasking by having a "userland" runtime.

Green threads/stackful coroutines

In this book I'll use the term "green threads" to mean stackful coroutines, to differentiate them from the other continuation mechanisms described in this chapter. You will, however, see the term "green threads" used to describe a broader set of continuation mechanisms in other literature or in discussions on the internet.

Green threads use the same mechanism an OS does: creating a thread for each task, setting up a stack, saving the CPU's state, and jumping from one task (thread) to another by doing a "context switch".

We yield control to the scheduler (which is a central part of the runtime in such a system) which then continues running a different task.

Rust had green threads once, but they were removed before the language hit 1.0. The state of execution is stored in each task's stack, so in such a solution there would be no need for async, await, Future, or Pin. In many ways, green threads mimic how an operating system facilitates concurrency, and implementing them is a great learning experience.

The typical flow looks like this:

  1. Run some non-blocking code.
  2. Make a blocking call to some external resource.
  3. CPU "jumps" to the "main" thread which schedules a different thread to run and "jumps" to that stack.
  4. Run some non-blocking code on the new thread until a new blocking call or the task is finished.
  5. CPU "jumps" back to the "main" thread, schedules a new thread which is ready to make progress, and "jumps" to that thread.

These "jumps" are known as context switches. Your OS is doing it many times each second as you read this.

Advantages:

  1. Simple to use. The code will look like it does when using OS threads.
  2. A "context switch" is reasonably fast.
  3. Each stack only gets a little memory to start with so you can have hundreds of thousands of green threads running.
  4. It's easy to incorporate preemption which puts a lot of control in the hands of the runtime implementors.

Drawbacks:

  1. The stacks might need to grow. Solving this is not easy and will have a cost.
  2. You need to save the CPU state on every switch.
  3. It's not a zero cost abstraction (Rust had green threads early on and this was one of the reasons they were removed).
  4. Complicated to implement correctly if you want to support many different platforms.

A green threads example could look something like this:

The example below is adapted from an earlier gitbook I wrote about green threads, called Green Threads Explained in 200 lines of Rust. If you want to know what's going on, you'll find everything explained in detail in that book. The code below is wildly unsafe and is only here to show a real example; it's not in any way meant to showcase "best practice". Just so we're on the same page.

Below you'll find a complete, minimal implementation of green threads:

#![feature(naked_functions)]
use std::arch::asm;

const DEFAULT_STACK_SIZE: usize = 1024 * 1024 * 2;
const MAX_THREADS: usize = 4;
static mut RUNTIME: usize = 0;

pub struct Runtime {
    threads: Vec<Thread>,
    current: usize,
}

#[derive(PartialEq, Eq, Debug)]
enum State {
    Available,
    Running,
    Ready,
}

struct Thread {
    id: usize,
    stack: Vec<u8>,
    ctx: ThreadContext,
    state: State,
    task: Option<Box<dyn Fn()>>,
}

#[derive(Debug, Default)]
#[repr(C)]
struct ThreadContext {
    rsp: u64,
    r15: u64,
    r14: u64,
    r13: u64,
    r12: u64,
    rbx: u64,
    rbp: u64,
    thread_ptr: u64,
}

impl Thread {
    fn new(id: usize) -> Self {
        Thread {
            id,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Available,
            task: None,
        }
    }
}

impl Runtime {
    pub fn new() -> Self {
        let base_thread = Thread {
            id: 0,
            stack: vec![0_u8; DEFAULT_STACK_SIZE],
            ctx: ThreadContext::default(),
            state: State::Running,
            task: None,
        };

        let mut threads = vec![base_thread];
        threads[0].ctx.thread_ptr = &threads[0] as *const Thread as u64;
        let mut available_threads: Vec<Thread> = (1..MAX_THREADS).map(|i| Thread::new(i)).collect();
        threads.append(&mut available_threads);

        Runtime {
            threads,
            current: 0,
        }
    }

    pub fn init(&self) {
        unsafe {
            let r_ptr: *const Runtime = self;
            RUNTIME = r_ptr as usize;
        }
    }

    pub fn run(&mut self) -> ! {
        while self.t_yield() {}
        std::process::exit(0);
    }

    fn t_return(&mut self) {
        if self.current != 0 {
            self.threads[self.current].state = State::Available;
            self.t_yield();
        }
    }

    #[inline(never)]
    fn t_yield(&mut self) -> bool {
        let mut pos = self.current;
        while self.threads[pos].state != State::Ready {
            pos += 1;
            if pos == self.threads.len() {
                pos = 0;
            }
            if pos == self.current {
                return false;
            }
        }

        if self.threads[self.current].state != State::Available {
            self.threads[self.current].state = State::Ready;
        }

        self.threads[pos].state = State::Running;
        let old_pos = self.current;
        self.current = pos;

        unsafe {
            let old: *mut ThreadContext = &mut self.threads[old_pos].ctx;
            let new: *const ThreadContext = &self.threads[pos].ctx;
            asm!("call switch", in("rdi") old, in("rsi") new, clobber_abi("C"));
        }
        self.threads.len() > 0
    }

    pub fn spawn<F: Fn() + 'static>(f: F) {
        unsafe {
            let rt_ptr = RUNTIME as *mut Runtime;
            let available = (*rt_ptr)
                .threads
                .iter_mut()
                .find(|t| t.state == State::Available)
                .expect("no available thread.");

            let size = available.stack.len();
            let s_ptr = available.stack.as_mut_ptr().offset(size as isize);
            let s_ptr = (s_ptr as usize & !15) as *mut u8;
            available.task = Some(Box::new(f));
            available.ctx.thread_ptr = available as *const Thread as u64;
            std::ptr::write(s_ptr.offset(-16) as *mut u64, guard as u64);
            std::ptr::write(s_ptr.offset(-24) as *mut u64, skip as u64);
            std::ptr::write(s_ptr.offset(-32) as *mut u64, call as u64);
            available.ctx.rsp = s_ptr.offset(-32) as u64;
            available.state = State::Ready;
        }
    }
}

fn call(thread: u64) {
    let thread = unsafe { &*(thread as *const Thread) };
    if let Some(f) = &thread.task {
        f();
    }
}

#[naked]
unsafe extern "C" fn skip() {
    asm!("ret", options(noreturn))
}

fn guard() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        let rt = &mut *rt_ptr;
        println!("THREAD {} FINISHED.", rt.threads[rt.current].id);
        rt.t_return();
    };
}

pub fn yield_thread() {
    unsafe {
        let rt_ptr = RUNTIME as *mut Runtime;
        (*rt_ptr).t_yield();
    };
}

#[naked]
#[no_mangle]
unsafe extern "C" fn switch() {
    asm!(
        "mov 0x00[rdi], rsp",
        "mov 0x08[rdi], r15",
        "mov 0x10[rdi], r14",
        "mov 0x18[rdi], r13",
        "mov 0x20[rdi], r12",
        "mov 0x28[rdi], rbx",
        "mov 0x30[rdi], rbp",
        "mov rsp, 0x00[rsi]",
        "mov r15, 0x08[rsi]",
        "mov r14, 0x10[rsi]",
        "mov r13, 0x18[rsi]",
        "mov r12, 0x20[rsi]",
        "mov rbx, 0x28[rsi]",
        "mov rbp, 0x30[rsi]",
        "mov rdi, 0x38[rsi]",
        "ret", options(noreturn)
    );
}

#[cfg(not(windows))]
pub fn main() {
    let mut runtime = Runtime::new();
    runtime.init();
    Runtime::spawn(|| {
        println!("I haven't implemented a timer in this example.");
        yield_thread();
        println!("Finally, notice how the tasks are executed concurrently.");
    });
    Runtime::spawn(|| {
        println!("But we can still nest tasks...");
        Runtime::spawn(|| {
            println!("...like this!");
        })
    });
    runtime.run();
}

#[cfg(windows)]
fn main() { }

Still hanging in there? Good. Don't get frustrated if the code above is difficult to understand. If I hadn't written it myself, I would probably feel the same. You can always go back and read the book that explains it in detail later.

Callback-based approaches

If you've written JavaScript, which I assume most of us have, you probably already know what we're going to talk about in the next paragraphs.

If your exposure to JavaScript callbacks has given you any sort of PTSD earlier in life, close your eyes now and scroll down for 2-3 seconds. You'll find a link there that takes you to safety.

The whole idea behind a callback-based approach is to save a pointer to a set of instructions we want to run later, together with whatever state is needed. In Rust, this would be a closure. In the example below, we save this information in a HashMap, but it's not the only option.

The basic idea of not involving threads as a primary way to achieve concurrency is the common denominator for the rest of the approaches, including the one Rust uses today, which we'll soon get to.

Advantages:

  • Easy to implement in most languages
  • No context switching
  • Relatively low memory overhead (in most cases)

Drawbacks:

  • Since each task must save the state it needs for later, the memory usage will grow linearly with the number of callbacks in a chain of computations.
  • Can be hard to reason about. Many people already know this as "callback hell".
  • It's a very different way of writing a program, and it will require a substantial rewrite to go from a "normal" program flow to one that uses a "callback-based" flow.
  • Sharing state between tasks is a hard problem in Rust using this approach due to its ownership model.

An extremely simplified example of how a callback-based approach could look is:

fn program_main() {
    println!("So we start the program here!");
    set_timeout(200, || {
        println!("We create tasks with a callback that runs once the task finished!");
    });
    set_timeout(100, || {
        println!("We can even chain sub-tasks...");
        set_timeout(50, || {
            println!("...like this!");
        })
    });
    println!("While our tasks are executing we can do other stuff instead of waiting.");
}

fn main() {
    RT.with(|rt| rt.run(program_main));
}

use std::sync::mpsc::{channel, Receiver, Sender};
use std::{cell::RefCell, collections::HashMap, thread};

thread_local! {
    static RT: Runtime = Runtime::new();
}

struct Runtime {
    callbacks: RefCell<HashMap<usize, Box<dyn FnOnce()>>>,
    next_id: RefCell<usize>,
    evt_sender: Sender<usize>,
    evt_receiver: Receiver<usize>,
}

fn set_timeout(ms: u64, cb: impl FnOnce() + 'static) {
    RT.with(|rt| {
        let id = *rt.next_id.borrow();
        *rt.next_id.borrow_mut() += 1;
        rt.callbacks.borrow_mut().insert(id, Box::new(cb));
        let evt_sender = rt.evt_sender.clone();
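        // We use a regular OS thread as a stand-in for a timer here. It just
        // sleeps for the requested duration and then signals that the callback
        // with this `id` is ready to run.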
        thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(ms));
            evt_sender.send(id).unwrap();
        });
    });
}

impl Runtime {
    fn new() -> Self {
        let (evt_sender, evt_receiver) = channel();
        Runtime {
            callbacks: RefCell::new(HashMap::new()),
            next_id: RefCell::new(1),
            evt_sender,
            evt_receiver,
        }
    }

    fn run(&self, program: fn()) {
        program();
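        // Block on the event queue: each event id received maps to a callback
        // that is now ready to run.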
        for evt_id in &self.evt_receiver {
            let cb = self.callbacks.borrow_mut().remove(&evt_id).unwrap();
            cb();
            if self.callbacks.borrow().is_empty() {
                break;
            }
        }
    }
}

We're keeping this super simple, and you might wonder what the difference is between this approach and one using OS threads and passing the callbacks to the OS threads directly.

The difference is that in this example the callbacks are all run on the same thread. The OS threads we create are basically just used as timers, but they could represent any kind of resource that we'd have to wait for.

From callbacks to promises

You might start to wonder by now, when are we going to talk about Futures?

Well, we're getting there. You see, Promises, Futures, and other names for deferred computations are often used interchangeably.

There are formal differences between them, but we won't cover those here. It's worth explaining promises a bit since they're widely known due to their use in JavaScript. Promises also have a lot in common with Rust's Futures.

First of all, many languages have a concept of promises, but I'll use the one from JavaScript in the examples below.

Promises are one way to deal with the complexity which comes with a callback based approach.

Instead of:

setTimeout(() => {
  setTimeout(() => {
    setTimeout(() => {
      console.log("I'm the last one");
    }, 50);
  }, 100);
}, 200);

We can do this:

function timer(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

timer(200)
.then(() => timer(100))
.then(() => timer(50))
.then(() => console.log("I'm the last one"));

The change is even more substantial under the hood. You see, a promise is really a state machine which can be in one of three states: pending, fulfilled, or rejected.

When we call timer(200) in the sample above, we get back a promise in the state pending.
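
To make the state machine part concrete: if we were to model these three states in Rust, a minimal sketch could look like the enum below. Note that this Promise type is hypothetical and only here to illustrate the idea; it's not how JavaScript promises (or Rust's Futures) are actually defined.

// A hypothetical model of a promise. `T` is the value a fulfilled promise
// resolves to, and `E` is the error a rejected promise holds.
#[allow(dead_code)]
enum Promise<T, E> {
    Pending,
    Fulfilled(T),
    Rejected(E),
}

fn main() {
    // A timer that hasn't fired yet would be in the `Pending` state.
    let state: Promise<(), &str> = Promise::Pending;
    match state {
        Promise::Pending => println!("still waiting..."),
        Promise::Fulfilled(_) => println!("done!"),
        Promise::Rejected(err) => println!("failed: {}", err),
    }
}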

Since promises can be modeled as state machines, they also enable an even better syntax which allows us to write our last example like this:

async function run() {
    await timer(200);
    await timer(100);
    await timer(50);
    console.log("I'm the last one");
}

You can think of the run function as a pausable task consisting of several sub-tasks. At each "await" point, it yields control to the scheduler (in this case, the well-known JavaScript event loop).

Once one of the sub-tasks changes state to either fulfilled or rejected, the task is scheduled to continue to the next step.

Syntactically, Rust's Futures 0.1 was a lot like the promises example above, and Rust's Futures 0.3 is a lot like async/await in our last example.

Now, this is also where the similarities between JavaScript promises and Rust's Futures stop. The reason we went through all this is to get an introduction and to get into the right mindset for exploring Rust's Futures.

To avoid confusion later on, there's one difference you should know about: JavaScript promises are eagerly evaluated, which means that once a promise is created, it starts running its task. Rust's Futures, on the other hand, are lazily evaluated: they need to be polled before they do any work.
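
We can actually observe this laziness ourselves. In the sketch below, nothing inside the async block runs until the future is polled. I'm assuming the futures crate as a dependency here, purely because it ships a simple block_on executor that can drive a future to completion:

use futures::executor::block_on;

fn main() {
    // Creating the future runs none of its body yet...
    let future = async {
        println!("Now the future is doing its work!");
    };

    println!("The future exists, but nothing has happened yet.");

    // ...it only runs when something polls it. `block_on` polls the
    // future until it completes.
    block_on(future);
}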


PANIC BUTTON (next chapter)