Мьютексы

Последнее обновление: 17.05.2024

При работе с многопоточными программами на Rust крайне важно защитить общие данные от одновременного доступа, который может привести к гонке данных (data race) и непредсказуемому поведению. Для управления доступа к общим ресурсам язык Rust предоставляет такой инструмент как мьютекс (mutex - сокращение от "mutual exclusion" - "взаимное исключение"). Мьютекс представляет примитив синхронизации в Rust, который помогает защитить общие данные в нескольких потоках и который гарантирует, что только один поток может одновременно получить доступ к защищенным данным, предотвращая гонку данных.

Мьютекс реализован как смарт-указатель Mutex, который присутствует в стандартной библиотеке Rust в модуле std::sync. Для управления доступом Mutex предоставляет метод lock. Когда поток получает блокировку с помощью метода lock(), это гарантирует, что никакие другие потоки не смогут одновременно изменять или получать доступ к защищенным данным. Этот механизм синхронизации необходим для предотвращения гонок за данными и обеспечения упорядоченного выполнения параллельного кода.

Кроме того, мьютекс обеспечивает защиту от потенциальных утечек ресурсов, автоматически снимая блокировку, когда блокировка мьютекса (обычно представленная в виде переменной) выходит за пределы области действия. Этот механизм автоматического снятия упрощает управление блокировками и снижает вероятность случайного возникновения взаимоблокировок или других проблем синхронизации в программах Rust.

Наглядная схема работы Mutex:

указатель Mutex в Rust

Рассмотрим простейший пример создания и применения мьютекса.

use std::sync::Mutex;

fn main() {

    let data = Mutex::new(22); //  создаем мьютекс для доступа к общему ресурсу - числу 22

    {
        let mut guard = data.lock().unwrap(); // блокируем мьютекс, получаем доступ к ресурсу
        *guard *= 2; // различные действия с ресурсом
    } // после завершения области действия блокировки Mutex автоматически разблокируется

    println!("Data: {:?}", data.lock().unwrap()); // Data: 44
}

Сначала мы создаем мьютекс:

let data = Mutex::new(22);

В функцию Mutex::new() передается собственно то значение, которое выступает в качестве общего разделяемого ресурса - в примере выше это просто число 22.

Во вложенном блоке кода с помощью метода lock() получаем блокировку в виде объекта Result, а для получения значения из Result далее по цепочки вызываем метод unwrap()

let mut guard = data.lock().unwrap();

То есть guard здесь - это объект блокировки, через который мы можем получить общий ресурс. И в частности, далее увеличиваем этот общий ресурс в два раза:

*guard *= 2;

Для обращения к значению применяется операция разыменовывания *

Когда завершается блок кода, в котором получена переменная guard, соотственно заканчивается область действия этой переменной, поэтому блокировка автоматически снимается (ведь она больше не нужна), и нам не надо явным образом вызывать метод unlock() для разблокировки мьютекса

В конце получаем значение из мьютекса. Опять же для получания выполняем метод lock():

println!("Data: {:?}", data.lock().unwrap()); // Data: 44

Однако приведенный выше пример не очень демонстративный, поскольку мьютекс все таки задуман для работы в многопоточной среде. Чтобы обеспечить безопасное совместное использование данных между потоками, Mutex можно объединить со смарт-указателем Arc (атомарным счетчиком ссылок), что позволит нескольким потокам совместно владеть данными, защищенными Mutex. Эта комбинация обеспечивает как безопасность потоков, так и подсчет ссылок, гарантируя, что данные переживут потоки, которые их используют. Теперь возьмем пример с несколькими потоками. Допустим, нам надо, чтобы потоки приложения обращались к некоторому общему ресурсу. К примеру, общий ресурс представляет число, а потоки должны увеличивать это число на 1. Для разграничения доступа к общему ресурсу применим мьютексы:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {

    // создаем разделяемый ресурс с помощью Arc и Mutex
    let data = Arc::new(Mutex::new(0)); 
    let mut threads = vec![]; // вектор потоков

    for _ in 0..4 {

        let data_clone = Arc::clone(&data);  // копируем Arc
        // создаем поток
        let t = thread::spawn(move || {

            let mut data_lock = data_clone.lock().unwrap(); //  получаем эксклюзивный доступ к ресурсу для текущего потока
            *data_lock += 1; // изменяем общие данные
        });
        threads.push(t); // добавляем поток к вектор потоков
    }

    // ожидаем завершения потоков
    for t in threads {

        t.join().unwrap();
    }
    println!("Data: {}", data.lock().unwrap()); // Data: 4
}

В начале определяем разделяемый общий ресурс, причем засовываем его в мьютекс, которым инициализируется объект Arc:

let data = Arc::new(Mutex::new(0));

По сути ресурс в данном случае - это число 0 (но в принципе это может быть любой ресурс).

Далее создаем 4 потока. Перед созданием потока сначала создаем копию ресурса Arc:

for _ in 0..4 {

    let data_clone = Arc::clone(&data);  // копируем Arc

При запуске потока передаем в него копию ресурса Arc:

let t = thread::spawn(move || {

    let mut data_lock = data_clone.lock().unwrap(); //  получаем эксклюзивный доступ к ресурсу для текущего потока
    *data_lock += 1; // изменяем общие данные
});

В потоке блокируем доступ к ресурсу, применяя метод lock():

let mut data_lock = data.lock().unwrap();

После этого изменяем значение

*data_lock += 1; 

С помощью операции разыменовывания (*data_lock) мы можем обратиться непосредственно к самому значению - к числу.

При завершении контекста, в котором блокируется мьютекс, - в примере выше лямбда-выражение, которое передается в thread::spawn(), мьютекс разблокируется, и его захватывает другой поток.

Блокировка и разблокировка мьютекса имеет решающее значение для обеспечения эксклюзивного доступа. Когда поток блокирует мьютекс, он получает эксклюзивный контроль над защищенными данными. Как только поток завершает свою работу и блокировка выходит за пределы области действия, мьютекс автоматически разблокируется, позволяя другим потокам получить к нему доступ.

В итоге каждый из 4 потоков захватит мьютекс, увеличит в нем значение на 1, и программа даст следующий консольный вывод:

Data: 4

Теперь посмотрим на чуть более детальный пример:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn process_data(id:u32, data: Arc<Mutex<u32>>) {
    // получаем эксклюзивный доступ к ресурсу для текущего потока
    let mut data_lock = data.lock().unwrap();
    for _ in 0..4 {
            
        *data_lock = *data_lock  + 1;
        thread::sleep(Duration::from_millis(200));
        println!("Thread[{}] num={}", id, data_lock);
    }
    // когда data_lock выходит из области действия, мьютекс автоматически освобождается
}

fn main(){
    
    // создаем разделяемый ресурс с помощью Arc и Mutex
    let data = Arc::new(Mutex::new(0));

    // определяем вектор для хранения потоков
    let mut threads = vec! [];

    // создаем потоки
    for id in 0..4 {

        let data_clone = Arc::clone(&data); // копируем Arc
        let t = thread::spawn(move || process_data(id, data_clone));
        threads.push(t);
    }

    // ожидаем завершения потоков
    for t in threads {

        t.join().unwrap();
    }
    println!("{}", data.lock().unwrap());
}

В принципе тут похожий пример, только для ольшей ясности действия потока я вынес в отдельную функцию process_data, которая запускается в thread::spawn():

// создаем потоки
for id in 0..4 {

    let data_clone = Arc::clone(&data); // копируем Arc
    let t = thread::spawn(move || process_data(id, data_clone));
    threads.push(t);
}

В функцию process_data передается номер потока и копию ресурса Arc. В начале функции блокируем доступ, применяя метод lock():

let mut data_lock = data.lock().unwrap();

После этого изменяем значение

for _ in 0..4 {
            
    *data_lock = *data_lock  + 1;
    thread::sleep(Duration::from_millis(200));
    println!("Thread[{}] num={}", id, data_lock);
}

В итоге мы получим упорядоченный вывод:

Thread[0] num=1
Thread[0] num=2
Thread[0] num=3
Thread[0] num=4
Thread[2] num=5
Thread[2] num=6
Thread[2] num=7
Thread[2] num=8
Thread[1] num=9
Thread[1] num=10
Thread[1] num=11
Thread[1] num=12
Thread[3] num=13
Thread[3] num=14
Thread[3] num=15
Thread[3] num=16
16

И еще более сложный пример:

use std::sync::{Mutex, Arc};
use std::thread;
use std::time::Duration;
use std::collections::HashMap;

struct Bank {

    accounts: Mutex<HashMap<u32, f64>>  // банковские счета
}

impl Bank {
    // функция-конструктор
    fn new() -> Self {
        Bank {
            accounts: Mutex::new(HashMap::new()),
        }
    }
    // добавление денег на счет
    fn deposit(&self, account_id: u32, amount: f64) {

        let mut accounts = self.accounts.lock().unwrap();
        let balance = accounts.entry(account_id).or_insert(0.0);
        *balance += amount;
    }
    // снятие денег со счета
    fn withdraw(&self, account_id: u32, amount: f64) {

        let mut accounts = self.accounts.lock().unwrap();
        let balance = accounts.entry(account_id).or_insert(0.0);
        if *balance >= amount {
            *balance -= amount;
        } 
        else {

            println!("Недостаточно денег на счете: {}", account_id);
        }
    }
    // проверка баланса
    fn check_balance(&self, account_id: u32) -> f64 {

        let accounts = self.accounts.lock().unwrap();
        *accounts.get(&account_id).unwrap_or(&0.0)
    }
}

fn main() {

    let bank = Arc::new(Bank::new());   // банк
    let mut threads = vec![];

    for id in 1..4 {

        let bank_clone = Arc::clone(&bank);
        // создаем поток клиента банка
        let account_thread = thread::spawn(move || {

            // чтобы данные для счетов различались, умножаем на номер счета
            bank_clone.deposit(id, 30.0 * id as f64);
            thread::sleep(Duration::from_millis(200)); // имитация долгой работы
            bank_clone.withdraw(id, 10.0 * id as f64);
            let balance = bank_clone.check_balance(id);
            println!("Счет {} - Остаток на счете: {:.2}", id, balance);
        });

        threads.push(account_thread);
    }

    // ожидаем завершения потоков
    for t in threads {

        t.join().unwrap();
    }
}

Здесь определена структура Bank, которая представляет банк и которая содержит объект Mutex<HashMap<u32, f64>> для хранения банковских счетов. Каждый счет представлен записью в HashMap, где ключи - представляют числовые номера счетов, а значения типа f64 - количество денег на счете. Несколько потоков имитируют банковские транзакции, включая добавление на счет, снятие средств и проверки баланса. Мьютекс гарантирует, что только один поток может одновременно получить доступ к банковским счетам, предотвращая гонки данных и обеспечивая согласованность данных. Консольный вывод программы:

Счет 3 - Остаток на счете: 60.00
Счет 2 - Остаток на счете: 40.00
Счет 1 - Остаток на счете: 20.00

Задача производитель-потребитель

Мьютексы нередко используются в многопоточных сценариях производитель-потребитель для синхронизации доступа к общему буферу. Пример демонстрации шаблона производитель-потребитель на основе Mutex:

use std::sync::{Arc, Mutex};
use std::thread;

const BUFFER_SIZE: usize = 4; // количество производимых элементов одним производителем

fn main() {

    let buffer = Arc::new(Mutex::new(Vec::new())); // общие данные

    let mut threads = vec![];

    // Потоки производителей
    for i in 1..3 {

        // копируем Arc
        let buffer_clone = Arc::clone(&buffer);
        // запускаем поток
        let producer = thread::spawn(move || {

            for j in 0..BUFFER_SIZE {

                let mut buffer = buffer_clone.lock().unwrap();
                let value = i * BUFFER_SIZE + j;
                println!("Producer{} produced {}", i, value);
                buffer.push(value); // добавляем данные в вектор
            }
        });
        threads.push(producer);
    }

    // потоки потребителей
    for i in 1..3 {

        let buffer_clone = Arc::clone(&buffer);
        let consumer = thread::spawn(move || {

            for _ in 0..BUFFER_SIZE {

                let mut buffer = buffer_clone.lock().unwrap(); // получаем данные из буфера
                if let Some(value) = buffer.pop() { // если в буфере есть данные
                    println!("Consumer{} consumed {}", i, value); // потребляем их
                }
            }
        });
        threads.push(consumer);
    }

    for t in threads { t.join().unwrap(); }
}

В этом примере несколько потоков-производителей добавляют элементы в общий буфер, а потоки-потребители удаляют и потребляют элементы из одного и того же буфера. Мьютекс обеспечивает монопольный доступ к буферу, предотвращая гонки данных. Консольный вывод:

Producer2 produced 8
Producer2 produced 9
Producer2 produced 10
Producer2 produced 11
Consumer1 consumed 11
Consumer1 consumed 10
Consumer1 consumed 9
Consumer1 consumed 8
Producer1 produced 4
Producer1 produced 5
Producer1 produced 6
Producer1 produced 7
Consumer2 consumed 7
Consumer2 consumed 6
Consumer2 consumed 5
Consumer2 consumed 4
Помощь сайту
Юмани:
410011174743222
Номер карты:
4048415020898850