Pull to refresh

Атомарная обработка блоков данных без блокировки

Reading time 5 min
Views 13K
Использование алгоритмов без блокировки всегда было чем-то пугающим для разработчика. Очень трудно представить себе организацию доступа к данным без блокировки, таким образом, чтобы два или более потока не могли одновременно обрабатывать один и тот же блок данных. Большинство разработчиков используют стандартные контейнеры типа стеков или связных списков без блокировки, но не более того. В этой же статье я хотел бы рассказать, как организовать доступ к данным в многопоточной среде без блокировки.

Основная идея такого метода заключается в том, что каждый поток использует отдельный буфер, в который копирует данные из основного буфера, обрабатывает их и затем меняет местами указатель на свой буфер с указателем на основной буфер. Рассмотрим следующий код:

#include <stdio.h>

struct data_s {
  int a;
  int b;
};
struct data_s reader_tmp, writer_tmp, data;
struct data_s *reader_tmp_ptr, *writer_tmp_ptr, *data_ptr;
int done = 0;

void process(struct data_s *data) {
  data->a++;
  data->b++;
}
void* writer(void* p) {
  struct data_s *tmp_ptr;
  int i;
  for(i = 0; i < 1000000; i++) {
    do {
      tmp_ptr = data_ptr;
      writer_tmp_ptr->a = tmp_ptr->a;
      writer_tmp_ptr->b = tmp_ptr->b;
      process(writer_tmp_ptr);
    } while(!__sync_compare_and_swap(&data_ptr, tmp_ptr, writer_tmp_ptr));
    writer_tmp_ptr = tmp_ptr;
  }
}

void* reader(void *p) {
  struct data_s *tmp_ptr;
  int a, b;
  while(!done) {
    do {
      tmp_ptr = data_ptr;
      reader_ptr->a = tmp_ptr->a;
      reader_ptr->b = tmp_ptr->b;
      a = tmp_ptr->a;
      b = tmp_ptr->b;
    } while(!__sync_bool_compare_and_swap(&data_ptr, tmp_ptr, reader_tmp_ptr));
    reader_tmp_ptr = tmp_ptr;
    printf(“data = {%d, %d}\n”, a, b);
  }
}

int main() {
  pthread_t reader_thread, writer_thread;
  data.a = 0;
  data.b = 0;
  data_ptr = &data;
  writer_tmp_ptr = &writer_tmp;
  reader_tmp_ptr = &reader_tmp;
  pthread_create(&read_thread, NULL, reader, NULL);
  pthread_create(&write_thread, NULL, writer, NULL);
  pthread_join(write_thread, NULL);
  done = 1;
  pthread_join(read_thread, NULL);
  return 0;
}

В приведённом коде данные перед обработкой копируются из буфера, на который указывает data_ptr в буфер, на который указывает writer_tmp_ptr. А затем эти указатели меняются местами. Причём в data_ptr writer_tmp_ptr записывается с использованием атомарной операции compare_and_swap, которая сравнивает первый аргумент со вторым и если они совпадают, то записывает третий аргумент в первый и возвращает true. Иначе, возвращает false. Для чего это нужно? Рассмотрим на примере функции reader. Допустим поток, выполняющий эту функцию приостановился после строчки a = tmp_ptr->a; В этот момент, tmp_ptr указывает на data. Тут же начал работать поток, выполняющий функцию writer. Выполнив первую итерацию он поменял местами writer_tmp_ptr и data_ptr и начал следующую итерацию, остановившись после строчки data->b++; В данной ситуации writer_tmp_ptr указывает на data и tmp_ptr в функции reader указывает на data. Получается одновременное чтение и модификация одного и того же буфера. Но так как указатели data_ptr и tmp_ptr уже не совпадают, то операция compare_and_swap обнаружит коллизию и выполнит операцию чтения ещё раз. Почему же присваивание reader_tmp_ptr = tmp_ptr не проходит такую проверку?

Всё просто. Переменная reader_tmp_ptr является специфичной переменной для потока, в котором она выполняется. В данном примере я сделал её глобальной, что не совсем правильно, т.к. в случае, с несколькими читающими потоками, пришлось бы заводить ещё одну глобальную переменную для второго потока, и внутри функции определять, какой поток в данный момент выполняется, чтоб использовать ту или иную переменную в качестве уникального для потока указателя на буфер. Оптимальный вариант — это использование т.н. специфичные для потока переменные. Например, библиотека pthread имеет такие замечательные функции, как pthread_getspecific/pthread_setspecific. Целью же написания этого кода было наглядно показать читателю, как работает данный алгоритм. Без оптимизаций, которые могут только запутать представление о самой сути.

Казалось бы, всё идеально, программа должна выводить на экран пары одинаковых значений, но не так всё просто. Представим также, что поток, выполняющий функцию reader, остановился после строчки a = tmp_ptr->a; после чего, поток, выполняющий функцию writer, завершил 2 итерации и выполняет третью. Остановившись после завершения функции process. Далее поток, выполняющий функцию reader возобновляет свою работу. В этой ситуации значения переменных a и b не совпадут, но операция compare_and_swap вернёт true, т.к. data_ptr снова указвает на data, другими словами data_ptr и tmp_ptr снова совпадают. Это называется проблема ABA. Одним из способов решения этой проблемы, является добавление к указателю счётчика, который увеличивается каждый раз, когда ему присваивается новое значение. В следующем примере такая проблема отсутствует.

#include <stdio.h>
#include <stdint.h>
#include <pthread.h>

struct data_s {
  int a;
  int b;
};

struct data_pointer_s {
  union {
    uint64_t qw[2];
    struct {
      struct data_s *data_ptr;
      uint64_t aba_counter;
    };
  };
};

static inline char cas128bit(volatile struct data_pointer_s *a, struct data_pointer_s b, struct data_pointer_s c) {
  char result;
  __asm__ __volatile__(
    "lock cmpxchg16b %1\n\t"
    "setz %0\n"
    : "=q" (result)
    , "+m" (a->qw)
    : "a" (b.data_ptr), "d" (b.aba_counter)
    , "b" (c.data_ptr), "c" (c.aba_counter)
    : "cc"
  );
  return result;
}
struct data_s reader_tmp, writer_tmp, data;
struct data_pointer_s reader_tmp_ptr, writer_tmp_ptr, data_ptr;
int done = 0;

void process(struct data_s *data) {
  data->a++;
  data->b++;
}

void* writer(void* p) {
  struct data_pointer_s tmp_ptr;
  int i;
  for(i = 0; i < 1000000; i++) {
    do {
      tmp_ptr = data_ptr;
      writer_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;
      writer_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;
      process(writer_tmp_ptr.data_ptr);
      writer_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1;
    } while(!cas128bit(&data_ptr, tmp_ptr, writer_tmp_ptr));
    writer_tmp_ptr = tmp_ptr;
  }
}

void* reader(void *p) {
  struct data_pointer_s tmp_ptr;
  int a, b;
  while(!done) {
    do {
      tmp_ptr = data_ptr;
      reader_tmp_ptr.data_ptr->a = tmp_ptr.data_ptr->a;
      reader_tmp_ptr.data_ptr->b = tmp_ptr.data_ptr->b;
      a = tmp_ptr.data_ptr->a;
      b = tmp_ptr.data_ptr->b;
      reader_tmp_ptr.aba_counter = tmp_ptr.aba_counter + 1;
    } while(!cas128bit(&data_ptr, tmp_ptr, reader_tmp_ptr));
    reader_tmp_ptr = tmp_ptr;
    printf("data = {%d, %d}\n", a, b);
  }
}

int main() {
  pthread_t reader_thread, writer_thread;
  
  data.a = 0;
  data.b = 0;
  data_ptr.data_ptr = &data;
  data_ptr.aba_counter = 0;
  writer_tmp_ptr.data_ptr = &writer_tmp;
  writer_tmp_ptr.aba_counter = 0;
  reader_tmp_ptr.data_ptr = &reader_tmp;
  reader_tmp_ptr.aba_counter = 0;
  
  pthread_create(&reader_thread, NULL, reader, NULL);
  pthread_create(&writer_thread, NULL, writer, NULL);

  pthread_join(writer_thread, NULL);
  done = 1;
  pthread_join(reader_thread, NULL);
  return 0;
}

Следует отметить, что эффективность данного кода зависит от объёма копируемых данных и от сложности функции process. Если требуется атомарная обработка блоков данных, объёмами в несколько десятков мегабайт, то использование мьютексов было бы намного эффективнее. Также неплохо было бы рассмотреть возможность добавления небольшой задержки(порядка нескольких микросекунд) каждый раз после того как compare_and_swap возвращает false, чтоб дать другому потоку возможность закончить операцию. Опять-таки, наличие задержки, и время напрямую будут зависить от специфики выполняемой задачи.

Отдельно хотелось бы выразить благодарность пользователю vladvic за помощь в понимании и представлении того, как действует данный алгорим.
Tags:
Hubs:
+14
Comments 4
Comments Comments 4

Articles