RwLock (Read-Write Lock или блокировка чтения-записи) применяется в многопоточном программировании на Rust, позволяя нескольким потокам одновременно читать данные, и только один поток при этом имеет монопольный доступ для записи. Данный инструмент полезен для сценариев с различными требованиями к чтению и записи.
Наглядная схема действия RwLock:
Для получения монопольной блокировки на запись у RwLock применяется метод write(), а для чтения значения - метод read()
Рассмотрим простейший пример:
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
fn write(data: Arc<RwLock<i32>>) {
// устанавливаем блокировку на запись
let mut data_lock = data.write().unwrap();
for pass in 1..4 {
*data_lock += 1; // изменяем значение
println!("Writer (Pass{}) - value: {}", pass, *data_lock);
thread::sleep(Duration::from_millis(400)); // имитация долгой работы
}
}
fn read(id: u32, data: Arc<RwLock<i32>>) {
// получаем значение для чтения
let value = data.read().unwrap();
for pass in 1..4 {
println!("Reader {} (Pass{}) - value: {}", id, pass, value);
thread::sleep(Duration::from_millis(200)); // имитация долгой работы
}
}
fn main() {
let value = Arc::new(RwLock::new(0)); // определяем объект RwLock
let mut threads = vec![];
let value_copy = Arc::clone(&value);
// создаем поток писателя
let writer = thread::spawn(move || write(value_copy));
threads.push(writer);
for id in 1..4 {
let value_copy = Arc::clone(&value);
// создаем поток читателя
let reader = thread::spawn(move || read(id, value_copy));
threads.push(reader);
}
// ожидаем завершения потоков
for t in threads {
t.join().unwrap();
}
}
Здесь в функции main сначала создаем собственно объект RwLock, который обернут в Arc:
let value = Arc::new(RwLock::new(0)); // определяем объект RwLock
Для создания объекта RwLock применяется метод-конструктор RwLock::new(), в который передается собственно то значение, доступом к которому будет управлять RwLock. В нашем случае это число 0.
Вектор threads будет содержать все запускаемые потоки:
let mut threads = vec![];
Далее создаем поток условного "писателя". Он будет выполнять функцию write(), в которую передается копия объекта Arc.
let value_copy = Arc::clone(&value); // создаем поток писателя let writer = thread::spawn(move || write(value_copy));
В этой функции write сначала с помощью метода write() устанавливаем блокировку на запись и в цикле четыре раза последовательно изменяем значение, прибавляя к нему 1:
fn write(data: Arc<RwLock<i32>>) {
let mut data_lock = data.write().unwrap();
for pass in 1..4 {
*data_lock += 1;
println!("Writer (Pass{}) - value: {}", pass, *data_lock);
thread::sleep(Duration::from_millis(400)); // имитация долгой работы
}
}
Для имитации долгой работы устанавливается задержка в 400 миллисекунд. И для наглядности выводим текущее значение на консоль. При завершении области действия блокировки - в данном случае при завершении функции блокировка на запись автоматически удаляется.
Далее в функции main создаем три потока на чтение:
for id in 1..4 {
let value_copy = Arc::clone(&value);
// создаем поток читателя
let reader = thread::spawn(move || read(id, value_copy));
threads.push(reader);
}
При запуске потока читателя запускается функция read, в которую передаем номер читателя и копию объекта Arc. В функции read() получаем значение из RwLock и выводим его на консоль:
fn read(id: u32, data: Arc<RwLock<i32>>) {
// получаем значение для чтения
let value = data.read().unwrap();
for pass in 1..4 {
println!("Reader {} (Pass{}) - value: {}", id, pass, value);
thread::sleep(Duration::from_millis(200)); // имитация долгой работы
}
}
Опять же для наглядности устанавливаем небольшую задержку в 200 миллисекунд (меньше чем у писателя). Стоит отметить, что при чтении никакой блокировки не происходит.
Запустим приложение и проинспектируем результат:
Writer (Pass1) - value: 1 Writer (Pass2) - value: 2 Writer (Pass3) - value: 3 Reader 2 (Pass1) - value: 3 Reader 1 (Pass1) - value: 3 Reader 3 (Pass1) - value: 3 Reader 2 (Pass2) - value: 3 Reader 1 (Pass2) - value: 3 Reader 3 (Pass2) - value: 3 Reader 2 (Pass3) - value: 3 Reader 1 (Pass3) - value: 3 Reader 3 (Pass3) - value: 3
Сразу можно отметить, что конкретный консольный вывод недетерминарован. Но отметим некоторые моменты. В первую очередь, когда один поток получает монопольное право на запись, другие потоки ни читать, ни записывать данные в RwLock НЕ могут. В частности, по моему выводу видно, что пока поток писателя изменял значение, никто из потоков-читателей это значение не считывал.
Другой момент - при чтении блокировки не происходит, и по результату программы мы видим, что все три потока после выполнения метода data.read().unwrap() не блокируют друг друга.
Другой пример:
use std::thread;
use std::sync::{RwLock, Arc};
fn main() {
let data = Arc::new(RwLock::new(0));
let mut threads = vec![];
for thread_id in 1..4 {
let data = Arc::clone(&data);
let my_thread = thread::spawn(move || {
for pass in 1..4{
let mut write_guard = data.write().unwrap(); // получаем блокировку на запись
*write_guard += 1; // записываем - изменяем значение
println!("Thread{} writes {} (pass {})", thread_id, *write_guard, pass);
// сбрасываем блокировку на запись, чтобы другие потоки могли ее получить
drop(write_guard);
thread::sleep(std::time::Duration::from_millis(200)); // небольшая задержка
let read_guard = data.read().unwrap(); // получаем блокировку на чтение
println!("Thread{} reads {} (pass {})", thread_id, *read_guard, pass); // считываем данные
}
});
threads.push(my_thread);
}
for t in threads { t.join().unwrap(); }
println!("Final Result: {:?}", *data.read().unwrap());
}
Здесь запускаем 3 потока. Причем чтение и запись данных совмещены в одном потоке. Каждый поток проходит 3 цикла чтения-записи. Вначале поток получает блокировку на запись и изменяет данные:
for pass in 1..4{
let mut write_guard = data.write().unwrap(); // получаем блокировку на запись
*write_guard += 1; // записываем - изменяем значение
println!("Thread{} writes {} (pass {})", thread_id, *write_guard, pass);
Далее освобождаем блокировку на запись с помощью функции drop() и выполняем небольшую задержку:
drop(write_guard); thread::sleep(std::time::Duration::from_millis(200)); // небольшая задержка
Поскольку блокировка на запись освобождена, а блокировку на чтение поток еще не получил, то другие потоки могут в это время получить блокировку на запись и изменить данные.
Далее поток получает блокировку на чтение данные и считывает данные:
let read_guard = data.read().unwrap(); // получаем блокировку на чтение
println!("Thread{} reads {} (pass {})", thread_id, *read_guard, pass);
Пример консольного вывода программы:
Thread2 writes 1 (pass 1) Thread1 writes 2 (pass 1) Thread3 writes 3 (pass 1) Thread1 reads 3 (pass 1) Thread1 writes 4 (pass 2) Thread2 reads 4 (pass 1) Thread3 reads 4 (pass 1) Thread3 writes 5 (pass 2) Thread2 writes 6 (pass 2) Thread1 reads 6 (pass 2) Thread3 reads 6 (pass 2) Thread2 reads 6 (pass 2) Thread2 writes 7 (pass 3) Thread1 writes 8 (pass 3) Thread3 writes 9 (pass 3) Thread2 reads 9 (pass 3) Thread1 reads 9 (pass 3) Thread3 reads 9 (pass 3) Final Result: 9
Еще один пример - ограничим чтение 1 секундой:
use std::sync::{RwLock, Arc};
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let data = Arc::new(RwLock::new(22));
let mut threads = vec![];
for i in 0..3 {
let data = Arc::clone(&data);
let new_thread = thread::spawn(move || {
let timeout_duration = Duration::from_secs(1);
let start_time = Instant::now(); // получаем текущий момент времени
loop {
thread::sleep(Duration::from_millis(200));
if start_time.elapsed() >= timeout_duration { // проверяем, прошла ли 1 секунда
println!("Thread {} timed out.", i);
break; // если время вышло, выход из цикла
} else {
if let Ok(data_guard) = data.read() { // получаем блокировку на чтение
println!("Thread {} read {} successfully.", i, *data_guard);
}
}
}
});
threads.push(new_thread);
}
for t in threads { t.join().unwrap(); }
}
Пример работы программы:
Thread 0 read 22 successfully. Thread 2 read 22 successfully. Thread 1 read 22 successfully. Thread 0 read 22 successfully. Thread 1 read 22 successfully. Thread 2 read 22 successfully. Thread 0 read 22 successfully. Thread 1 read 22 successfully. Thread 2 read 22 successfully. Thread 0 read 22 successfully. Thread 2 read 22 successfully. Thread 1 read 22 successfully. Thread 0 timed out. Thread 1 timed out. Thread 2 timed out.
Может сложиться впечатление, что Mutex и RwLock выполняют одну и ту же функцию в приложении - блокировку доступа к общим данным. Однако при выборе между ними следует учитывать характер общих данных:
Если данные часто обновляются, а поддержание согласованности данных посредством эксклюзивного доступа имеет решающее значение, то применяется Mutex. Mutex подходит для ситуаций, когда как чтение, так и запись требуют монопольного доступа или когда отдельные блокировки усложняют работу без явного преимущества в производительности.
Если данные читаются чаще, чем записываются, то RwLock предлагает более эффективный подход. При использовании мьютекса в этой ситуации один поток, который хочет прочитать данные, должен ждать, когда другой поток прочитает эти данные, что снижает общую производительность. RwLock позволяет устранить подобные недостатки и отлично подходит для рабочих нагрузок с интенсивным чтением, повышая производительность за счет одновременного чтения и обеспечения безопасной записи. Разрешение нескольких одновременных операций чтения может значительно повысить производительность в таких сценариях.
В то же время стоит отметить, что характеристики производительности Mutex и RwLock могут различаться в зависимости от платформы, поэтому при выборе инструмента рекомендуется провести сравнительный анализ и профилирование, специфичные для конкретной задачи.