Каналы в языке Rust представляют механизм, который позволяет взаимодействовать потокам друг с другом. В Rust функциональность каналов доступна через модуль std::sync::mpsc. К слову, "mpsc" означает "много производителей, один потребитель". Это означает, что несколько потоков могут отправлять данные одному получателю, что обеспечивает эффективную и синхронизированную связь в многопоточной среде. Рассмотрим простейший пример:
use std::thread;
use std::sync::mpsc;
fn main() {
let (sender, receiver) = mpsc::channel(); // создаем канал
let sender_clone = sender.clone(); // создаем копию отправителя
// запускаем новый поток
let worker = thread::spawn(move || {
sender_clone.send("Hello METANIT.COM").unwrap(); // посылаем в поток сообщение
});
// получаем из потока сообщение
let received_data = receiver.recv().unwrap();
// выводим его на консоль
println!("Received message: {}", received_data);
// ждем завершения вторичного потока
worker.join().unwrap();
}
Для создания канала применяется метод mpsc::channel(), который возворащает кортеж из двух элементов - получателя (первый элемент) и отправителя сообщения (второй элемент):
let (sender, receiver) = mpsc::channel(); // создаем канал
В данном случае создаваемый канал используется для связи между вторичным потокам worker и главным потоком - функцией main. И чтобы сохранить владение объектом отправителя в основном потоке, создаем копию отправителя:
let sender_clone = sender.clone(); // создаем копию отправителя
Затем создаем вторичный поток worker, который отправляет данные с помощью метода send(), используя копию отправителя:
let worker = thread::spawn(move || {
sender_clone.send("Hello METANIT.COM").unwrap(); // посылаем в поток сообщение
});
Для получения сообщения у объекта получателя вызываем метод recv():
let received_data = receiver.recv().unwrap();
Консольный вывод:
Received message: Hello METANIT.COM
Естественно мы можем передавать между потоками не только примитивные данные или строки, но и более сложные по своему характеру данные, например, объекты пользовательских структур:
use std::thread;
use std::sync::mpsc;
// структура для передачи между потоками
struct Message {
id: u32,
text: String
}
fn main() {
let (sender, receiver) = mpsc::channel(); // создаем канал
let sender_clone = sender.clone(); // создаем копию отправителя
// запускаем новый поток
let worker = thread::spawn(move || {
let message = Message { id: 22, text: "Hello METANIT.COM".to_string()};
sender_clone.send(message).unwrap(); // посылаем в поток сообщение
});
// получаем из потока сообщение
let received_message = receiver.recv().unwrap();
// выводим его на консоль
println!("Received message ID: {} Text: {}", received_message.id, received_message.text);
// ждем завершения вторичного потока
worker.join().unwrap();
}
При этом стоит учитывать, что при передаче объекта методу send() для отправки в поток в этот метод также переходит владение объектом. Соответственно после отправки
мы не сможем использовать в потоке отправителя ранее отправленный объект:
// запускаем новый поток отправителя
let worker = thread::spawn(move || {
let message = Message { id: 22, text: "Hello METANIT.COM".to_string()};
sender_clone.send(message).unwrap(); // посылаем в поток сообщение
// после этого мы не можем использовать объект message
// println!("{}", message.text); // после отправки message нельзя использовать
});
Аналогичным образом можно отправлять и получать наборы значений:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
String::from("Hello World"),
String::from("Hello METANIT.COM"),
String::from("Hello Rust"),
];
// в цикле отправляем элементы вектора
for mes in messages {
tx.send(mes).unwrap();
thread::sleep(Duration::from_secs(1)); // задержка для имитации работы
}
});
// в цикле получаем отправленные значения
for received in rx {
println!("{received}");
}
}
Здесь в потоке в цикле отправляются все элементы-сообщения из вектора. Для их получения мы работаем с получателем rx как с итератором и через цикле перебираем его, извлекая очередное сообщение.
Как ранее было сказано, "mpsc" расшифровывается как "multiple producer, single consumer", то есть множество производителей (отправителей) и один потребитель (получатель). Сымитируем несколько отправителей, которые отправляют данные одному получателю:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone(); // клонируем отправителя
// поток первого отправителя
thread::spawn(move || {
let messages = vec![
String::from("Hello World"),
String::from("Hello METANIT.COM"),
String::from("Hello Rust"),
];
// в цикле отправляем элементы вектора
for mes in messages {
tx.send(mes).unwrap();
thread::sleep(Duration::from_secs(1)); // задержка для имитации работы
}
});
// поток второго отправителя
thread::spawn(move || {
let messages = vec![
String::from("Good bye C++"),
String::from("Good bye Python"),
];
// в цикле отправляем элементы вектора
for mes in messages {
tx1.send(mes).unwrap();
thread::sleep(Duration::from_secs(1)); // задержка для имитации работы
}
});
// в цикле получаем отправленные значения
for received in rx {
println!("{received}");
}
}
Ключевой момент состоит в клонировании производителя в переменную tx1. Затем, как и в общем случае, создаем поток и отправляем в нем данные в канал. Консольный вывод программы:
Hello World Good bye C++ Hello METANIT.COM Good bye Python Hello Rust