Pull to refresh

Потоки, блокировки и условные переменные в C++11 [Часть 1]

Reading time 8 min
Views 433K
В первой части этой статьи основное внимание будет уделено потокам и блокировкам в С++11, условные переменные во всей своей красе будут подробно рассмотрены во второй части

Потоки


В C++11, работа с потокам осуществляется по средствам класса std::thread (доступного из заголовочного файла <thread>), который может работать с регулярными функциями, лямбдами и функторами. Кроме того, он позволяет вам передавать любое число параметров в функцию потока.
#include <thread>
 
void threadFunction()
{
     // do smth
}
 
int main()
{
     std::thread thr(threadFunction);
     thr.join();
     return 0;
}

В этом примере, thr — это объект, представляющий поток, в котором будет выполняться функция threadFunction(). Вызов join блокирует вызывающий поток (в нашем случае — поток main) до тех пор, пока thr (а точнее threadFunction()) не выполнит свою работу. Если функция потока возвращает значение — оно будет проигнорировано. Однако принять функция может любое количество параметров.
void threadFunction(int i, double d, const std::string &s)
{
     std::cout << i << ", " << d << ", " << s << std::endl;
}
 
int main()
{
     std::thread thr(threadFunction, 1, 2.34, "example");
     thr.join();
     return 0;
}

Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в std::ref или std::cref, как в примере:
void threadFunction(int &a)
{
     a++;
}
 
int main()
{
     int a = 1;
     std::thread thr(threadFunction, std::ref(a));
     thr.join();
     std::cout << a << std::endl; 
     return 0;
}

Программа напечатает в консоль 2. Если не использовать std::ref, то результатом работы программы будет 1.

Помимо метода join, следует рассмотреть еще один, похожий метод — detach.
detach позволяет отсоединить поток от объекта, иными словами, сделать его фоновым. К отсоединенным потокам больше нельзя применять join.
int main()
{
     std::thread thr(threadFunction);
     thr.detach();
     return 0;
}

Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):
try
{
     std::thread thr1(threadFunction);
     std::thread thr2(threadFunction);
 
     thr1.join();
     thr2.join();
}

catch (const std::exception &ex)
{
     std::cout << ex.what() << std::endl;
}

Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.
std::mutex                       g_mutex;
std::vector<std::exception_ptr>  g_exceptions;

void throw_function()
{
     throw std::exception("something wrong happened");
}

void threadFunction()
{
     try
     {
          throw_function();
     }
     catch (...)
     {
          std::lock_guard<std::mutex> lock(g_mutex);
          g_exceptions.push_back(std::current_exception());
     }
}

int main()
{
     g_exceptions.clear();
     std::thread thr(threadFunction);
     thr.join();

     for(auto &e: g_exceptions)
     {
          try 
          {
               if(e != nullptr)
                    std::rethrow_exception(e);
          }
          catch (const std::exception &e)
          {
               std::cout << e.what() << std::endl;
          }
     }
     return 0;
}

Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые <thread>, в пространстве имен std::this_thread:
  • get_id: возвращает id текущего потока
  • yield: говорит планировщику выполнять другие потоки, может использоваться при активном ожидании
  • sleep_for: блокирует выполнение текущего потока в течение установленного периода
  • sleep_until: блокирует выполнение текущего потока, пока не будет достигнут указанный момент времени

Блокировки


В последнем примере, я должен был синхронизировать доступ к вектору g_exceptions, чтобы быть уверенным, что только один поток одновременно может вставить новый элемент. Для этого я использовал мьютекс и блокировку на мьютекс. Мьютекс — базовый элемент синхронизации и в С++11 представлен в 4 формах в заголовочном файле <mutex>:

Приведу пример использования std::mutex с упомянутыми ранее функциями-помощниками get_id() и sleep_for():
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
 
std::mutex g_lock;
 
void threadFunction()
{
     g_lock.lock();
 
     std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
     std::this_thread::sleep_for(std::chrono::seconds(rand()%10));
     std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
 
     g_lock.unlock();
}
 
int main()
{
     srand((unsigned int)time(0));
     std::thread t1(threadFunction);
     std::thread t2(threadFunction);
     std::thread t3(threadFunction);
     t1.join();
     t2.join();
     t3.join();
     return 0;
}

Программа должна выдавать примерно следующее:
entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424

Перед обращением к общим данным, мьютекс должен быть заблокирован методом lock, а после окончания работы с общими данными — разблокирован методом unlock.

Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе std::vector), имеющий методы add() для добавления одного элемента и addrange() для добавления нескольких элементов.
Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование va_args. Также, метод dump() не должен принадлежать контейнеру, а должен быть автономной функцией. Цель этого примера в том, что показать основные концепции использования мьютексов, а не не сделать полноценный, безошибочный, потокобезопасный контейнер.
template <typename T>
class container 
{
     std::mutex _lock;
     std::vector<T> _elements;
public:
     void add(T element) 
     {
          _lock.lock();
          _elements.push_back(element);
          _lock.unlock();
     } 
     void addrange(int num, ...)
     {
          va_list arguments;
          va_start(arguments, num);
          for (int i = 0; i < num; i++)
          {
               _lock.lock();
               add(va_arg(arguments, T));
               _lock.unlock();
          }
          va_end(arguments); 
     }
     void dump()
     {
          _lock.lock();
          for(auto e: _elements)
          std::cout << e << std::endl;
          _lock.unlock();
     }
};
 
void threadFunction(container<int> &c)
{
     c.addrange(3, rand(), rand(), rand());
}
 
int main()
{
     srand((unsigned int)time(0));
     container<int> cntr;
     std::thread t1(threadFunction, std::ref(cntr));
     std::thread t2(threadFunction, std::ref(cntr));
     std::thread t3(threadFunction, std::ref(cntr));
     t1.join();
     t2.join();
     t3.join();
     cntr.dump();
     return 0;
}

При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова unlock), что невозможно. Здесь и выходит на сцену std::recursive_mutex, который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock бросит исключение std::system_error. Поэтому, решение проблемы в коде выше (кроме изменения реализации addrange(), чтобы не вызывались lock и unlock), заключается в замене мьютекса на std::recursive_mutex.
template <typename T>
class container 
{
     std::recursive_mutex _lock;
     // ...
};

Теперь, результат работы программы будет следующего вида:
6334
18467
41
6334
18467
41
6334
18467
41

Вы, наверное, заметили, что при вызове threadFunction(), генерируются одни и те же числа. Это происходит потому, что функция void srand (unsigned int seed); инициализирует seed только для потока main. В других потоках, генератор псевдо-случайных чисел не инициализируется и получаются каждый раз одни и те же числа.
Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:
  • lock_guard: когда объект создан, он пытается получить мьютекс (вызывая lock()), а когда объект уничтожен, он автоматически освобождает мьютекс (вызывая unlock())
  • unique_lock: в отличие от lock_guard, также поддерживает отложенную блокировку, временную блокировку, рекурсивную блокировку и использование условных переменных

С учетом этого, мы можем переписать класс контейнер следующим образом:
template <typename T>
class container 
{
     std::recursive_mutex _lock;
     std::vector<T> _elements;
public:
     void add(T element) 
     {
          std::lock_guard<std::recursive_mutex> locker(_lock);
          _elements.push_back(element);
     }
     void addrange(int num, ...)
     {
          va_list arguments;
          va_start(arguments, num);
          for (int i = 0; i < num; i++)
          {
               std::lock_guard<std::recursive_mutex> locker(_lock);
               add(va_arg(arguments, T));
          }
          va_end(arguments); 
     }
     void dump()
     {
          std::lock_guard<std::recursive_mutex> locker(_lock);
          for(auto e: _elements)
               std::cout << e << std::endl;
     }
};

Можно поспорить насчет того, что метод dump() должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' 
                                                  to ‘std::recursive_mutex &'

Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов lock() и unlock(). Таким образом, аргумент lock_guard не может быть константой. Решение этой проблемы заключается в том, чтобы сделать мьютекс mutable, тогда спецификатор const будет игнорироваться и это позволит изменять состояние из константных функций.
template <typename T>
class container 
{
     mutable std::recursive_mutex _lock;
     std::vector<T> _elements;
public:
     void dump() const
     {
          std::lock_guard<std::recursive_mutex> locker(_lock);
          for(auto e: _elements)
               std::cout << e << std::endl;
     }
};

Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:
  • defer_lock типа defer_lock_t: не получать мьютекс
  • try_to_lock типа try_to_lock_t: попытаться получить мьютекс без блокировки
  • adopt_lock типа adopt_lock_t: предполагается, что у вызывающего потока уже есть мьютекс

Объявлены они следующим образом:
struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
 
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();

Помимо «оберток» для мьютексов, std также предоставляет несколько методов для блокировки одного или нескольких мьютексов:
  • lock: блокирует мьютекс, используя алгоритм избегания deadlock'ов (используя lock(), try_lock() и unlock())
  • try_lock: пытается блокировать мьютексы в порядке, в котором они были указаны

Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция exchange(), которая меняет местами два элемента разных контейнеров. Для потокобезопасности, функция синхронизирует доступ к этим контейнерам, получая мьютекс, связанный с каждым контейнером.
template <typename T>
class container 
{
public:
     std::mutex _lock;
     std::set<T> _elements;
     void add(T element) 
     {
          _elements.insert(element);
     }
     void remove(T element) 
     {
          _elements.erase(element);
     }
};
 
void exchange(container<int> &c1, container<int> &c2, int value)
{
     c1._lock.lock();
     std::this_thread::sleep_for(std::chrono::seconds(1)); // симулируем deadlock
     c2._lock.lock();    
     c1.remove(value);
     c2.add(value);
     c1._lock.unlock();
     c2._lock.unlock();
}

Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).
int main()
{
     srand((unsigned int)time(NULL));
     container<int> cntr1; 
     cntr1.add(1);
     cntr1.add(2);
     cntr1.add(3);
     container<int> cntr2; 
     cntr2.add(4);
     cntr2.add(5);
     cntr2.add(6);
     std::thread t1(exchange, std::ref(cntr1), std::ref(cntr2), 3);
     std::thread t2(exchange, std::ref(cntr2), std::ref(cntr1), 6);
     t1.join();
     t2.join();
     return 0;
}

Для решения этой проблемы можно использовать std::lock, который гарантирует блокировку безопасным (с точки зрения взаимоблокировки) способом:
void exchange(container<int> &c1, container<int> &c2, int value)
{
     std::lock(c1._lock, c2._lock); 
     c1.remove(value);
     c2.add(value);
     c1._lock.unlock();
     c2._lock.unlock();
}

Продолжение: условные переменные
Tags:
Hubs:
+67
Comments 14
Comments Comments 14

Articles