Разработка → Распределённые вычисления поверх Ceph RADOS и AsyncMessenger

OlegBou 14 июня в 18:14 1,7k
Допустим, у вас имеются миллионы и миллионы хранимых объектов данных, например, данный портал или телеметрия с космического телескопа или адронного коллайдера.
Перемещение вычислений в сторону данных может приводить к снижению временных затрат на порядки за счёт исключения необходимости перемещения самих данных в сетевой среде для их обработки. В точном соответствии с притчей о горе и Магомеде. Именно этой цели служит класс RADOS, вызовы к которому могут выполняться функциями librados.

Асинхронная система сообщений существенно снижает накладные расходы самого сетевого уровня Ceph, а применение абстракций NetworkStack делает возможной реализацию различных протоколов стека (POSIX/ SPDK/ DPDK/ RDMA). В том числе применение объектов класса RADOS.

Программирование при помощи librados


Для достижения максимальной производительности и внутренней гибкости Ceph можно применять встроенные в библиотеку librados вызовы функций, доступные для большинства языком программирования, например, для C, C++, Python, PHP и Java. С этой целью вы сначала устанавливаете инструменты разработки. Например, для Debian- дистрибутивов:

$ sudo apt-get install build-essential
$ sudo apt-get install librados-dev

Затем в своём приложении мы:

  1. Выполняем необходимые подготовительные операции самого приложения (считывание параметров, подготовку справки об этом приложении и т.п.)
  2. Считываем файл настроек ceph.conf для получения мониторов
  3. Подключаемся к кластеру Ceph
  4. Открываем нужный пул RADOS
  5. Открываем образ для чтения/записи
  6. Выполняем необходимую операцию чтения/записи
  7. Закрываем соединение с пулом
  8. Закрываем соединение с кластером Ceph

(Пример приложения)

Порой приложение требует атомарности выполняемой операции, состоящей из нескольких действий, например, из записи собственно данных и их атрибутов. То есть, если в процессе выполнения атомарной (неделимой) последовательности действий происходит какое- либо прерывание или некий сбой, отвергаются все уже выполненные действия. Реальной записи или изменения не происходит. Долгое время разработчики Ceph не теряли надежды на поддержку атомарности за счёт средств btrfs, поскольку её реализация в рамках xfs приводит к снежному кому проблем, самой малой из которых является необходимость, как минимум, дублированной записи. В конце концов, начиная с выпуска Kraken для лежащего в основе OSD хранилища было принято решение по умолчанию отказываться от файлового хранения с применением POSIX файловых систем и применять упрощённую файловую систему BlueFS для хранения собственно данных и RocksDB для хранения метаданных и, возможно, отложенных записей (WAL, write-ahead log). Данное хранилище получило название BlueStore (указывающая на Blочную природу хранилища, изначально имевшего незатейливое название NewStore). Подробнее...

Теперь, имея в руках полноценную реализацию транзакций, не отягощённую избыточными накладными расходами, мы со спокойной совестью можем их использование в своих приложениях. Посмотрим как меняется скелет нашей программы:

  1. Выполняем необходимые подготовительные операции самого приложения (считывание параметров, подготовку справки об этом приложении и т.п.)
  2. Считываем файл настроек ceph.conf для получения мониторов
  3. Подключаемся к кластеру Ceph
  4. Открываем нужный пул RADOS
  5. Инициализируем списки буферов обмена (по числу необходимых действий) и заполняем их данными
  6. Создаём транзакцию
  7. Последовательно записываем данные буферов обмена в транзакцию
  8. Фиксируем транзакцию
  9. Закрываем соединение с пулом
  10. Закрываем соединение с кластером Ceph

(Пример приложения)

Ещё одной достойной функциональностью librados является организация приложений наблюдатель- уведомитель (watch — notify).

Работа наблюдателя организуется путём обратного вызова (callback). При создании наблюдателя через вызов соответствующей функции вы передаёте в таком вызове двумя параметрами ссылки на необходимые функции обратного вызова. Первая применяется для выполнения действия в случае получения уведомления, а вторая используется когда что-то пошло не так, в ней вы производите необходимые действия по отработке ошибок.

Например, таким образом вы создаёте моментальный снимок некоторого объекта. Клиент, которому требуется выполнить такой снимок, отправляет всем находящимся в ожидании для данного объекта клиентам (watcher) уведомления (выступая в качестве notifier), в котором он сообщает, что может сбросить свой кэш данного объекта и выполнить проверку на непротиворечивость данных.

(Пример приложения)

Распределённые вычисления при помощи классов RADOS Ceph


Библиотека librados работает с объектами, хранящимися в распределённой системе хранения RADOS (Reliable Autonomic Distributed Object Store), в основе которого лежат демоны хранения объектов (OSD, Object Storage Daemon), каждый из которых обслуживает некое собственное хранилище (например, уже упоминавшееся BlueStore).

Одним из примечательных свойств OSD является возможность локальной обработки хранимых в нём данных методами класса RADOS. Это открывает широкие возможности революционного увеличения производительности распределённой обработки хранимых данных, поскольку избавляет вас от необходимости обмена самими данными для такой обработки.

Например, вы можете вычислять хеш-значения и контрольные суммы данного объекта, осуществлять их проверку. Вы можете выполнять поиск в таких данных и их анализ, выставляя результаты, скажем, в метаданных самого объекта.

Одним из простейших способов такой разработки является применение языка сценариев Lua, который, начиная с выпуска Kraken встроен в класс RADOS. Данный сценарий, как правило, в виде строк JSON (скажем, в программе с применением librados на Python) передаётся в имеющийся объект класса RADOS, где он и исполняется.

Пример приложения изменения в объекте всех строчных букв на прописные. Отметим, что по умолчанию исполнение сценариев Lua в OSD отключено, для включения такой возможности необходимо внести приведённые в примере изменения в файлы настроек OSD.

К сожалению, в настоящее время функциональные возможности языка сценариев Lua достаточно ограничены. Для решения более сложных задач вам придётся скомпилировать исходный код Ceph со встроенным в него классом RADOS, написанным, например, на C или C++. В Примере приложения вычисления MD5 объекта приводятся пошаговые инструкции построения такого решения, а также сопоставляются результаты времён его работы в сравнении с аналогичным приложением, исполняющимся на клиенте, находящимся не на узле OSD и вынужденном считывать данные с OSD и записывать в него результат расчёта. результат сравнения даёт преимущество в абсолютных затратах времени в два порядка.

Поскольку данный метод обработки может приводить в случае ошибок к утрате данных и нарушению работы самого OSD, естественно, все применяемые методы должны быть тщательно проверены и иметь всеобъемлющие средства обработки ошибок.
Мы предлагаем подход к решению такой проблемы, основанный на выводе подобных вычислений в отдельный LXC с изолированным пространством имён, причём с возможностью применения вычислений на пробрасываемых в данный контейнер ресурсах GPU и/или FPGA, с последующим использованием обсуждаемой далее методики асинхронного обмена сообщениями.

Ceph Async Messenger


Дополнительные средства для распределённой обработки данных предоставляет имеющаяся в Ceph система асинхронного обмена сообщениями. Например, таким образом реализован механизм применения RDMA в реализации Mellanox.

Первоначально AsyncMessenger, судя по всему, разрабатывался в качестве расширения epoll, призванного полностью вытеснить SimpleMessenger, являющийся первоначальной системой обмена сообщений и лежащей в основе сетевого протокола Ceph. Такая потребность вызвана тем фактом, что для каждой пары участников однорангового взаимодействия (peering) в SimpleMessenger создаются 4 потока (по два с каждой стороны). С ростом числа участников это приводит к экспоненциальному росту общего числа потоков (thread) в узлах участников.

В настоящее время все три метода реализуют сетевой протокол Ceph, при этом применяя один общий рабочий пул.

В протоколе AsyncMessenger участвуют сервер и клиент. Сервер выполняет инициализацию, привязывается к file descriptor (fd) и осуществляет ожидание уведомлений по нему (listen). В отличие от определённых POSIX методов select() и poll(), epoll предоставляет механизм обработки сообщений со сложностью O(1), в отличие от O(n) для SimpleMessenger, исключая перебор событий, не имеющих активных fd. AsyncMessenger применяет библиотеку libevent, предоставляемую средствами epoll.

Обработка события осуществляется сервером по получению уведомления на fd, либо по тайм- ауту. Помимо этого сервер может добавлять ожидание fd, принимать соединения, добавлять приём fd и осуществлять взаимодействие.

Клиент инициирует установление соединения и устанавливает его для выполнения взаимодействия. Весь обмен сетевого уровня приложений обрабатывается машиной состояний AsyncConnection. Статья Wei Jin акцентирует основные моменты данного типа взаимодействия.

Дальнейшим развитием механизма AsyncMessenger является развитие абстракции NetworkStack, которая позволяет осуществлять распределённую поддержку различных сетевых стеков (Posix/ DPDK/ RDMA), а также встроенной в BlueStore поддержки SPDK.

Данный уровень абстракции реализует понятия ServerSocket и ConnectedSocket, причём первый ожидает поступления запросов, а второй собственно и осуществляет чтение и запись всех данных. Основные моменты реализации приложений с применением данных абстракций приводятся в статье Стек асинхронной системы сообщений Ceph того же Wei Jin.

Материалы:
Проголосовать:
+8
Сохранить: