При работе с многопоточными программами на Rust крайне важно защитить общие данные от одновременного доступа, который может привести к гонке данных (data race) и непредсказуемому поведению. Для управления доступа к общим ресурсам язык Rust предоставляет такой инструмент как мьютекс (mutex - сокращение от "mutual exclusion" - "взаимное исключение"). Мьютекс представляет примитив синхронизации в Rust, который помогает защитить общие данные в нескольких потоках и который гарантирует, что только один поток может одновременно получить доступ к защищенным данным, предотвращая гонку данных.
Мьютекс реализован как смарт-указатель Mutex, который присутствует в стандартной библиотеке Rust в модуле std::sync. Для управления доступом Mutex предоставляет метод lock. Когда поток получает блокировку с помощью метода lock(), это гарантирует, что никакие другие потоки не смогут одновременно изменять или получать доступ к защищенным данным. Этот механизм синхронизации необходим для предотвращения гонок за данными и обеспечения упорядоченного выполнения параллельного кода.
Кроме того, мьютекс обеспечивает защиту от потенциальных утечек ресурсов, автоматически снимая блокировку, когда блокировка мьютекса (обычно представленная в виде переменной) выходит за пределы области действия. Этот механизм автоматического снятия упрощает управление блокировками и снижает вероятность случайного возникновения взаимоблокировок или других проблем синхронизации в программах Rust.
Наглядная схема работы Mutex:
Рассмотрим простейший пример создания и применения мьютекса.
use std::sync::Mutex;
fn main() {
let data = Mutex::new(22); // создаем мьютекс для доступа к общему ресурсу - числу 22
{
let mut guard = data.lock().unwrap(); // блокируем мьютекс, получаем доступ к ресурсу
*guard *= 2; // различные действия с ресурсом
} // после завершения области действия блокировки Mutex автоматически разблокируется
println!("Data: {:?}", data.lock().unwrap()); // Data: 44
}
Сначала мы создаем мьютекс:
let data = Mutex::new(22);
В функцию Mutex::new() передается собственно то значение, которое выступает в качестве общего разделяемого ресурса - в примере выше это просто число 22.
Во вложенном блоке кода с помощью метода lock() получаем блокировку в виде объекта Result, а для получения значения из Result далее по цепочки вызываем метод unwrap()
let mut guard = data.lock().unwrap();
То есть guard здесь - это объект блокировки, через который мы можем получить общий ресурс. И в частности, далее увеличиваем этот общий ресурс в два раза:
*guard *= 2;
Для обращения к значению применяется операция разыменовывания *
Когда завершается блок кода, в котором получена переменная guard, соотственно заканчивается область действия этой переменной, поэтому блокировка автоматически снимается (ведь она больше не нужна),
и нам не надо явным образом вызывать метод unlock() для разблокировки мьютекса
В конце получаем значение из мьютекса. Опять же для получания выполняем метод lock():
println!("Data: {:?}", data.lock().unwrap()); // Data: 44
Однако приведенный выше пример не очень демонстративный, поскольку мьютекс все таки задуман для работы в многопоточной среде. Чтобы обеспечить безопасное совместное использование данных между потоками, Mutex можно объединить со смарт-указателем Arc (атомарным счетчиком ссылок), что позволит нескольким потокам совместно владеть данными, защищенными Mutex. Эта комбинация обеспечивает как безопасность потоков, так и подсчет ссылок, гарантируя, что данные переживут потоки, которые их используют. Теперь возьмем пример с несколькими потоками. Допустим, нам надо, чтобы потоки приложения обращались к некоторому общему ресурсу. К примеру, общий ресурс представляет число, а потоки должны увеличивать это число на 1. Для разграничения доступа к общему ресурсу применим мьютексы:
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
// создаем разделяемый ресурс с помощью Arc и Mutex
let data = Arc::new(Mutex::new(0));
let mut threads = vec![]; // вектор потоков
for _ in 0..4 {
let data_clone = Arc::clone(&data); // копируем Arc
// создаем поток
let t = thread::spawn(move || {
let mut data_lock = data_clone.lock().unwrap(); // получаем эксклюзивный доступ к ресурсу для текущего потока
*data_lock += 1; // изменяем общие данные
});
threads.push(t); // добавляем поток к вектор потоков
}
// ожидаем завершения потоков
for t in threads {
t.join().unwrap();
}
println!("Data: {}", data.lock().unwrap()); // Data: 4
}
В начале определяем разделяемый общий ресурс, причем засовываем его в мьютекс, которым инициализируется объект Arc:
let data = Arc::new(Mutex::new(0));
По сути ресурс в данном случае - это число 0 (но в принципе это может быть любой ресурс).
Далее создаем 4 потока. Перед созданием потока сначала создаем копию ресурса Arc:
for _ in 0..4 {
let data_clone = Arc::clone(&data); // копируем Arc
При запуске потока передаем в него копию ресурса Arc:
let t = thread::spawn(move || {
let mut data_lock = data_clone.lock().unwrap(); // получаем эксклюзивный доступ к ресурсу для текущего потока
*data_lock += 1; // изменяем общие данные
});
В потоке блокируем доступ к ресурсу, применяя метод lock():
let mut data_lock = data.lock().unwrap();
После этого изменяем значение
*data_lock += 1;
С помощью операции разыменовывания (*data_lock) мы можем обратиться непосредственно к самому значению - к числу.
При завершении контекста, в котором блокируется мьютекс, - в примере выше лямбда-выражение, которое передается в thread::spawn(), мьютекс разблокируется, и его захватывает другой поток.
Блокировка и разблокировка мьютекса имеет решающее значение для обеспечения эксклюзивного доступа. Когда поток блокирует мьютекс, он получает эксклюзивный контроль над защищенными данными. Как только поток завершает свою работу и блокировка выходит за пределы области действия, мьютекс автоматически разблокируется, позволяя другим потокам получить к нему доступ.
В итоге каждый из 4 потоков захватит мьютекс, увеличит в нем значение на 1, и программа даст следующий консольный вывод:
Data: 4
Теперь посмотрим на чуть более детальный пример:
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn process_data(id:u32, data: Arc<Mutex<u32>>) {
// получаем эксклюзивный доступ к ресурсу для текущего потока
let mut data_lock = data.lock().unwrap();
for _ in 0..4 {
*data_lock = *data_lock + 1;
thread::sleep(Duration::from_millis(200));
println!("Thread[{}] num={}", id, data_lock);
}
// когда data_lock выходит из области действия, мьютекс автоматически освобождается
}
fn main(){
// создаем разделяемый ресурс с помощью Arc и Mutex
let data = Arc::new(Mutex::new(0));
// определяем вектор для хранения потоков
let mut threads = vec! [];
// создаем потоки
for id in 0..4 {
let data_clone = Arc::clone(&data); // копируем Arc
let t = thread::spawn(move || process_data(id, data_clone));
threads.push(t);
}
// ожидаем завершения потоков
for t in threads {
t.join().unwrap();
}
println!("{}", data.lock().unwrap());
}
В принципе тут похожий пример, только для ольшей ясности действия потока я вынес в отдельную функцию process_data, которая запускается в thread::spawn():
// создаем потоки
for id in 0..4 {
let data_clone = Arc::clone(&data); // копируем Arc
let t = thread::spawn(move || process_data(id, data_clone));
threads.push(t);
}
В функцию process_data передается номер потока и копию ресурса Arc. В начале функции блокируем доступ, применяя метод lock():
let mut data_lock = data.lock().unwrap();
После этого изменяем значение
for _ in 0..4 {
*data_lock = *data_lock + 1;
thread::sleep(Duration::from_millis(200));
println!("Thread[{}] num={}", id, data_lock);
}
В итоге мы получим упорядоченный вывод:
Thread[0] num=1 Thread[0] num=2 Thread[0] num=3 Thread[0] num=4 Thread[2] num=5 Thread[2] num=6 Thread[2] num=7 Thread[2] num=8 Thread[1] num=9 Thread[1] num=10 Thread[1] num=11 Thread[1] num=12 Thread[3] num=13 Thread[3] num=14 Thread[3] num=15 Thread[3] num=16 16
И еще более сложный пример:
use std::sync::{Mutex, Arc};
use std::thread;
use std::time::Duration;
use std::collections::HashMap;
struct Bank {
accounts: Mutex<HashMap<u32, f64>> // банковские счета
}
impl Bank {
// функция-конструктор
fn new() -> Self {
Bank {
accounts: Mutex::new(HashMap::new()),
}
}
// добавление денег на счет
fn deposit(&self, account_id: u32, amount: f64) {
let mut accounts = self.accounts.lock().unwrap();
let balance = accounts.entry(account_id).or_insert(0.0);
*balance += amount;
}
// снятие денег со счета
fn withdraw(&self, account_id: u32, amount: f64) {
let mut accounts = self.accounts.lock().unwrap();
let balance = accounts.entry(account_id).or_insert(0.0);
if *balance >= amount {
*balance -= amount;
}
else {
println!("Недостаточно денег на счете: {}", account_id);
}
}
// проверка баланса
fn check_balance(&self, account_id: u32) -> f64 {
let accounts = self.accounts.lock().unwrap();
*accounts.get(&account_id).unwrap_or(&0.0)
}
}
fn main() {
let bank = Arc::new(Bank::new()); // банк
let mut threads = vec![];
for id in 1..4 {
let bank_clone = Arc::clone(&bank);
// создаем поток клиента банка
let account_thread = thread::spawn(move || {
// чтобы данные для счетов различались, умножаем на номер счета
bank_clone.deposit(id, 30.0 * id as f64);
thread::sleep(Duration::from_millis(200)); // имитация долгой работы
bank_clone.withdraw(id, 10.0 * id as f64);
let balance = bank_clone.check_balance(id);
println!("Счет {} - Остаток на счете: {:.2}", id, balance);
});
threads.push(account_thread);
}
// ожидаем завершения потоков
for t in threads {
t.join().unwrap();
}
}
Здесь определена структура Bank, которая представляет банк и которая содержит объект Mutex<HashMap<u32, f64>>
для хранения банковских счетов. Каждый счет представлен записью в HashMap, где ключи - представляют числовые номера счетов, а значения типа f64 - количество денег на счете. Несколько потоков имитируют банковские транзакции, включая добавление на счет, снятие средств и проверки баланса.
Мьютекс гарантирует, что только один поток может одновременно получить доступ к банковским счетам, предотвращая гонки данных и обеспечивая согласованность данных. Консольный вывод программы:
Счет 3 - Остаток на счете: 60.00 Счет 2 - Остаток на счете: 40.00 Счет 1 - Остаток на счете: 20.00
Мьютексы нередко используются в многопоточных сценариях производитель-потребитель для синхронизации доступа к общему буферу. Пример демонстрации шаблона производитель-потребитель на основе Mutex:
use std::sync::{Arc, Mutex};
use std::thread;
const BUFFER_SIZE: usize = 4; // количество производимых элементов одним производителем
fn main() {
let buffer = Arc::new(Mutex::new(Vec::new())); // общие данные
let mut threads = vec![];
// Потоки производителей
for i in 1..3 {
// копируем Arc
let buffer_clone = Arc::clone(&buffer);
// запускаем поток
let producer = thread::spawn(move || {
for j in 0..BUFFER_SIZE {
let mut buffer = buffer_clone.lock().unwrap();
let value = i * BUFFER_SIZE + j;
println!("Producer{} produced {}", i, value);
buffer.push(value); // добавляем данные в вектор
}
});
threads.push(producer);
}
// потоки потребителей
for i in 1..3 {
let buffer_clone = Arc::clone(&buffer);
let consumer = thread::spawn(move || {
for _ in 0..BUFFER_SIZE {
let mut buffer = buffer_clone.lock().unwrap(); // получаем данные из буфера
if let Some(value) = buffer.pop() { // если в буфере есть данные
println!("Consumer{} consumed {}", i, value); // потребляем их
}
}
});
threads.push(consumer);
}
for t in threads { t.join().unwrap(); }
}
В этом примере несколько потоков-производителей добавляют элементы в общий буфер, а потоки-потребители удаляют и потребляют элементы из одного и того же буфера. Мьютекс обеспечивает монопольный доступ к буферу, предотвращая гонки данных. Консольный вывод:
Producer2 produced 8 Producer2 produced 9 Producer2 produced 10 Producer2 produced 11 Consumer1 consumed 11 Consumer1 consumed 10 Consumer1 consumed 9 Consumer1 consumed 8 Producer1 produced 4 Producer1 produced 5 Producer1 produced 6 Producer1 produced 7 Consumer2 consumed 7 Consumer2 consumed 6 Consumer2 consumed 5 Consumer2 consumed 4