Rust : Tokio

De Justine's wiki
Aller à la navigation Aller à la recherche

A propos de cette page

Il s'agit de mes notes issues du tuto d'utilisation de l'environnement Tokio : ici

Présentation

Tokio est un runtime asynchrone pour Rust, et fournit les outils pour créer des applications asynchrone utilisant le réseau. Les composants principaux sont:

  • Un runtime multi-process pour l'exécution de code asynchrone
  • Une version asynchrone de la librairie standard
  • Un grand ecosystème de librairies associées.

Son rôle est avant tout d'accélerer les applications dans le cas où elles sont dépendante d'IO réseau en grande quantité, et pas dépendantes du CPU. Il n'est pas non plus intéressant pour accéder à un grand nombre de fichiers simultanément car les OS n'ont généralement pas d'API asynchrone pour les filesystems.

Setup

Le but du tutoriel est de montrer comment implémenter un client et un serveur Redis, avec un petit ensemble de commandes Redis. Ce projet s'appelle Mini-Redis et est sur Github.

Avec une version récente de Rust, on commence par le serveur mini-redis, qui nous permettra de tester notre client.

cargo install mini-redis
//Lancer le serveur
mini-redis-server
//Depuis un autre terminal
mini-redis-cli get foo
//Doit renvoyer (nil)

Hello Tokio

On commence par créer une application très simple, qui va se connecter au serveur mini-redis et passer la clef "hello" à "world".

cargo new my-redis
cd my-redis
//Cargo.toml
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"

Puis dans le main.rs

use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Set the key "hello" with value "world"
    client.set("hello", "world".into()).await?;

    // Get key "hello"
    let result = client.get("hello").await?;

    println!("got value from the server; result={:?}", result);

    Ok(())
}

En faisant un cargo run avec le serveur mini-redis fonctionnel dans un autre terminal, on a bien le résultat attendu.

En détail :

let mut client = client::connect("127.0.0.1:6379").await?;

Fonction fournie par mini-redis qui donne un handle sur un client tcp. L'opération est asynchrone, mais le code ressemble à du code synchrone; on sait qu'il est asynchrone grace à "await".

Programmation asynchrone ?

La plupart du temps, les programmes exécutés dans l'ordre dans lequel ils sont écrits. Si une tâche prend du temps, le thread est bloqué le temps que ça termine, ce qui peut être le cas pour une connexion TCP via laquelle un échange de données a lieu.

Avec la programmation asynchrone, les opération qui ne peuvent pas se terminer immédiatement vont en arrière plan. Le thread n'est pas bloqué et peut faire d'autres choses en attendant. Quand la tâche en arrière plan se termine, elle n'est plus suspendue et peut continuer. La programmation asynchrone peut permettre d'avoir des applications plus rapides, mais aussi bien plus compliquées. Elles forcent à gérer l'état des différentes tâches du programme.

Compile-time green threading (j'ai pas envie de traduire)

Rust implémente l'asynchrone avec les mots async et await. Les fonctions qui font de l'asynchrone sont marquées avec async:

pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> //etc

Les fonctions en "async fn" sont traduites par Rust lors de la compilation vers des routines asynchrones. N'importe quel appel à .await dans la fonction asynchrone renvoie le contrôle des opérations au thread, afin qu'il puisse faire autre chose pendant que les opérations se terminent en fond.

Utiliser async / await

Les fonctions asynchrones sont appellées comme n'importe quelle autre fonction, mais ne renvoient pas une valeur représentant le résultat de leurs opérations. Elles renvoient une valeur qui représente l'opération. Il faut utiliser .await sur cette valeur afin d'obtenir le résultat. Exemple:

async fn say_world() {
    println!("world");
}

#[tokio::main]
async fn main() {
    // Calling `say_world()` does not execute the body of `say_world()`.
    let op = say_world();

    // This println! comes first
    println!("hello");

    // Calling `.await` on `op` starts executing `say_world`.
    op.await;
}

Renvoie

hello
world

La valeur de retour d'une "async fn" est un type anonyme qui implément le trait "Future".

Fonction main asynchrone

La fonction main utilisée ici est différente de ce qu'on trouve habituellement : elle est asynchrone et annotée avec "#[tokio::main]". Une fonction async est nécessaire car on veut entrer dans un environnement asynchrone. Cependant, elle doit être exécutée par un runtime, qui contient le task scheduler, les I/O, les timers, etc. C'est le rôle de la macro #[tokio::main].

Cette macro sert à transformer notre 'async fn main' en une 'fn main()' synchrone qui initialize une instance du runtime et lance la fonction main asynchrone.

Avec ça:

#[tokio::main]
async fn main() {
    println!("hello");
}

On a en réalité

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

Features de Tokio

Tokio a beaucoup de fonctionnalités : TCP, UDP, sockets unix, etc. Nous avons ici utilisé la feature "full" pour l'exemple mais on peut en enlever pour alléger la compilation.

Spawning

On bouge le code précendent en example.

mkdir -p examples
mv src/main.rs examples/hello-redis.rs

Sockets entrants

On va avoir besoin de sockets TCP entrants sur le port 6379. On le fait avec tokio::net::TcpListener.

Many of Tokio's types are named the same as their synchronous equivalent in the Rust standard library. When it makes sense, Tokio exposes the same APIs as std but using async fn.

On bind nos sockets sur le port 6379 et on les accepte dans une boucle. Chaque socket est process puis fermé.

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis **frames** instead of
    // byte streams. The `Connection` type is defined by mini-redis.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

On peut ensuite le lancer en tant que serveur et lancer notre exemple "hello-redis" dans un second terminal, ce qui donne

Error: "unimplemented"

Et le serveur donne:

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

Concurrence

Le problème de notre serveur, au delà du fait qu'il ne renvoie que des erreurs, et qu'il ne process les requêtes que une par une - il reste bloqué dans sa boucle d'acceptation jusque à ce que la réponse soit entièrement écrite sur le socket. Hors, on veut gérer beaucoup de connections simultanées, on a besoin de concurrence ! On a besoin de concurrence, mais pas forcément de parrallélisme. Concurrence = plusieurs tâches en même temps, parrallélisme = plusieurs threads. Tokio peut gérer un grand nombre de tâches sur un seul thread !

On va donc spawn une nouvelle tâche pour chaque nouvelle connection. Notre main ressemble désormais à ça :

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

Tâches

Un "green thread" est un thread qui est schedule par un runtime ou une machine virtuelle, afin de reproduire un environnement multi-threadé.

Une tâche Tokio est un "green thread" asynchrone. On la créée en passant un block async a tokio::spawn, qui renvoie un JoinHandle. Le bloc en question peut renvoyer une valeur, que l'on récupère en appellant .await sur le JoinHandle afin de récupérer un result. Par exemple :

 #[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });

    // Do some other work

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

Les tâches sont donc managées par le scheduler; elles peuvent être lancées sur le même thread ou sur un autre. Elles sont très légères, ne demandant que 64 octets de mémoire. On peut sans problème en appeller des milliers.

Static

sur les statiques Cependant, le lifetime d'une tâche est 'static. Pour rappel, un lifetime 'static signifie que la durée de vie court jusqu'à la fin du programme; la variable est immutable; elle ne peut être créée qu'à la compilation. Attention cependant, un &'static T est différent d'un T: 'static. &'static T est une référence immutable que l'on peut garder un temps indéfini. Ce n'est possible que si T est immutable et ne bouge pas après la création de sa référence. Après ça, j'ai pas tout compris... C'est compliqué, bon sang !

Bref, le code suivant ne compile pas :

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

Le compilateur renvoie un "function requires argument type to outlive `'static`". En effet, par défaut, les variable ne sont pas "move" dans les blocs asynchrones. Ici v continue d'appartenir à la fonction main. On a donc le même problème qu'avec un thread : on peut utiliser "move" et perdre la possession de la variable, ou bien utiliser un Arc ou autre.


Send

Les tâches tokio doivent implémenter "Send". Cela permet à tokio de déplacer les tâches entre des threads pendant qu'elles sont suspendues à un .await. Tasks are Send when all data that is held across .await calls is Send. En gros, tout ce qui est utilisé après .await dans la tâche doit être Send pour être sauvegardé. Je crois ?

Ca fonctionne :

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}

Pas ça

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` is used after `.await`. It must be persisted to
        // the task's state.
        yield_now().await;

        println!("{}", rc);
    });
}