Межпотоковое взаимодействие через каналы

Последнее обновление: 05.06.2024

Каналы в языке Rust представляют механизм, который позволяет взаимодействовать потокам друг с другом. В Rust функциональность каналов доступна через модуль std::sync::mpsc. К слову, "mpsc" означает "много производителей, один потребитель". Это означает, что несколько потоков могут отправлять данные одному получателю, что обеспечивает эффективную и синхронизированную связь в многопоточной среде. Рассмотрим простейший пример:

use std::thread;
use std::sync::mpsc;

fn main() {

    let (sender, receiver) = mpsc::channel(); // создаем канал

    let sender_clone = sender.clone(); // создаем копию отправителя 
    // запускаем новый поток
    let worker = thread::spawn(move || {

        sender_clone.send("Hello METANIT.COM").unwrap(); // посылаем в поток сообщение
    });
    // получаем из потока сообщение
    let received_data = receiver.recv().unwrap();
    // выводим его на консоль
    println!("Received message: {}", received_data);

    // ждем завершения вторичного потока
    worker.join().unwrap();
}

Для создания канала применяется метод mpsc::channel(), который возворащает кортеж из двух элементов - получателя (первый элемент) и отправителя сообщения (второй элемент):

let (sender, receiver) = mpsc::channel(); // создаем канал

В данном случае создаваемый канал используется для связи между вторичным потокам worker и главным потоком - функцией main. И чтобы сохранить владение объектом отправителя в основном потоке, создаем копию отправителя:

let sender_clone = sender.clone(); // создаем копию отправителя 

Затем создаем вторичный поток worker, который отправляет данные с помощью метода send(), используя копию отправителя:

let worker = thread::spawn(move || {

    sender_clone.send("Hello METANIT.COM").unwrap(); // посылаем в поток сообщение
});

Для получения сообщения у объекта получателя вызываем метод recv():

let received_data = receiver.recv().unwrap();

Консольный вывод:

Received message: Hello METANIT.COM

Естественно мы можем передавать между потоками не только примитивные данные или строки, но и более сложные по своему характеру данные, например, объекты пользовательских структур:

use std::thread;
use std::sync::mpsc;

// структура для передачи между потоками
struct Message {

    id: u32,
    text: String
}

fn main() {

    let (sender, receiver) = mpsc::channel(); // создаем канал

    let sender_clone = sender.clone(); // создаем копию отправителя 
    // запускаем новый поток
    let worker = thread::spawn(move || {

        let message = Message {  id: 22, text: "Hello METANIT.COM".to_string()};
        sender_clone.send(message).unwrap(); // посылаем в поток сообщение
    });
    // получаем из потока сообщение
    let received_message = receiver.recv().unwrap();
    // выводим его на консоль
    println!("Received message ID: {}  Text: {}", received_message.id, received_message.text);

    // ждем завершения вторичного потока
    worker.join().unwrap();
}

При этом стоит учитывать, что при передаче объекта методу send() для отправки в поток в этот метод также переходит владение объектом. Соответственно после отправки мы не сможем использовать в потоке отправителя ранее отправленный объект:

// запускаем новый поток отправителя
let worker = thread::spawn(move || {

    let message = Message {  id: 22, text: "Hello METANIT.COM".to_string()};
    sender_clone.send(message).unwrap(); // посылаем в поток сообщение
    // после этого мы не можем использовать объект message
    // println!("{}", message.text);    // после отправки message нельзя использовать
});

Аналогичным образом можно отправлять и получать наборы значений:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let messages = vec![
            String::from("Hello World"),
            String::from("Hello METANIT.COM"),
            String::from("Hello Rust"),
        ];
        // в цикле отправляем элементы вектора
        for mes in messages {
            tx.send(mes).unwrap();
            thread::sleep(Duration::from_secs(1));  // задержка для имитации работы
        }
    });
    // в цикле получаем отправленные значения
    for received in rx {
        println!("{received}");
    }
}

Здесь в потоке в цикле отправляются все элементы-сообщения из вектора. Для их получения мы работаем с получателем rx как с итератором и через цикле перебираем его, извлекая очередное сообщение.

Множество отправителей

Как ранее было сказано, "mpsc" расшифровывается как "multiple producer, single consumer", то есть множество производителей (отправителей) и один потребитель (получатель). Сымитируем несколько отправителей, которые отправляют данные одному получателю:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();   // клонируем отправителя

    // поток первого отправителя
    thread::spawn(move || {
        let messages = vec![
            String::from("Hello World"),
            String::from("Hello METANIT.COM"),
            String::from("Hello Rust"),
        ];
        // в цикле отправляем элементы вектора
        for mes in messages {
            tx.send(mes).unwrap();
            thread::sleep(Duration::from_secs(1));  // задержка для имитации работы
        }
    });
    // поток второго отправителя
     thread::spawn(move || {
        let messages = vec![
            String::from("Good bye C++"),
            String::from("Good bye Python"),
        ];
        // в цикле отправляем элементы вектора
        for mes in messages {
            tx1.send(mes).unwrap();
            thread::sleep(Duration::from_secs(1));  // задержка для имитации работы
        }
    });
    // в цикле получаем отправленные значения
    for received in rx {
        println!("{received}");
    }
}

Ключевой момент состоит в клонировании производителя в переменную tx1. Затем, как и в общем случае, создаем поток и отправляем в нем данные в канал. Консольный вывод программы:

Hello World
Good bye C++
Hello METANIT.COM
Good bye Python
Hello Rust
Помощь сайту
Юмани:
410011174743222
Номер карты:
4048415020898850