Pull to refresh
39.56

Квест по синхронизации аналитического и оперативного хранилищ в реальном времени без потерь на сотнях терабайт данных

Level of difficultyMedium
Reading time12 min
Views2.2K

Я работаю в компании STM Labs, где мы строим большие высоконагруженные системы класса Big Data. Эта статья написана по мотивам моего выступления на конференции Saint Highload 2023. Хочу рассказать вам увлекательную историю про то, как мы искали лучшее решение по синхронизации аналитического и оперативного хранилищ в реальном времени. Нам важно было сделать это без потерь, потому что на кону стояли сотни и более терабайт данных.

Сразу обозначу, чего в этой статье не будет:

  • Я не буду подробно говорить о типах СУБД и их различиях.

  • Я не буду делать обзор аналитических СУБД. Тут каждый выбирает сам.

  • Я не буду подробно останавливаться на архитектуре, отказоустойчивости и масштабировании СУБД MongoDB.

  • Я не буду делать обзор отличий OLAP и OLTP.

  • Я не буду делать обзор и сравнение реализаций CDC в различных СУБД.

Прежде чем начать это увлекательное путешествие, давайте сделаем небольшой обзор предметной области, чтобы в принципе понимать семантику данных, модель нагрузки и так далее.

Итак, предметная область — это большая, а точнее глобальная Track&Trace система для обеспечения единого процесса отслеживания статуса различных групп товаров. Система реализует поэкземплярный контроль движения товара в логистических цепочках, начиная от процесса производства. Далее идут логистика и дистрибуция, и заканчивается всё ритейлом, либо списанием, либо другим конечным состоянием, в которое переходит товар в конце своего жизненного цикла.

Пара слов об архитектуре самого процессинга

Это не предмет данной статьи, но для лучшего её понимания важно представлять, как в принципе устроен процессинг в нашем решении.

Итак, у нас есть некий источник событий или документов о движении товаров в логистических цепочках. Эти события поступают во входящую очередь, построенную на базе брокера Apache Kafka. Обработчики вычитывают сообщения из очереди и на основе этих данных выполняют некую бизнес-логику. Бизнес-логика — это, фактически, просто смена состояния товара при его движении по товаропроводящей цепочке в рамках определенной модели, описанной сложным конечным автоматом (finite state machine).

По мере движения товара меняются его текущий владелец, статус и другие атрибуты. Ну и самый правый элемент в архитектуре — это оперативное хранилище, в котором отражено актуальное состояние товара.

В качестве оперативного хранилища мы используем MongoDB.
В качестве оперативного хранилища мы используем MongoDB.

Описание задачи звучит максимально просто, но по факту она простой не является. Перед нами стоит задача синхронизации оперативных и аналитических данных. Давайте честно: у кого из нас вообще поднят отдельный аналитический кластер? Уверен, что некоторые читатели понимают всю боль и, самое главное, причины, почему не надо гонять аналитические запросы на оперативных данных.

Наши требования к синхронизации выглядят так:

Первое и самое важное требование — это, конечно же, надежность. Любая синхронизация должна быть надежной, как швейцарские часы — то есть мы должны сходиться в ноль без потерь!

Мы также должны хотя бы постараться обеспечить минимальное отставание данных в аналитическом кластере от оперативных данных.

Плюс обеспечить высокую отказоустойчивость всего решения и сделать так, чтобы наша синхронизация не создавала дополнительной паразитной нагрузки на оперативное хранилище — или минимизировать подобные издержки. Особенно это важно в часы наибольшей/пиковой нагрузки на систему.

Модель нагрузки выглядит так:

  1. Нагрузка на запись — 10 тысяч операций в секунду.

  2. Нагрузка на чтение — чуть побольше, 15–20 тысяч операций в секунду.

  3. Размер датасета — 150+ терабайт.

Долгий путь самурая

Наш путь самурая был длинным, поскольку делалось много попыток реализации разных подходов. Вся эпопея продлилась чуть больше года, но мы таки пришли к целевому решению, которое полностью нас устраивает.

Начинали мы так же, как начинают все. У нас просто было одно оперативное хранилище, и все аналитические запросы мы шарашили прямо туда. Слабоумие и отвага! Как вы понимаете, ничем хорошим это не закончилось: с ростом объемов данных в один прекрасный момент всё просто перестало работать.

Тогда мы всё-таки подняли отдельный аналитический кластер. Первое, что было реализовано — интервальные выгрузки, чтобы сделать переливку данных более контролируемой. Однако это решение нас не устраивало, потому что при таких переливках регулярно случались потери данных. Пришлось городить всякие сверки, перепроливки... В общем, это был кромешный ад! И тогда мы твердо решили, что надо что-то менять! Как вы поняли, это был спойлер, а самое интересное у нас впереди! Итак, путешествие начинается…

Итерация номер один. Прямые запросы в оперативное хранилище

Да, с этого все начинают... Понятно, что в погоне за функциональными возможностями никто не думает про архитектуру, особенно на старте проекта.

Очевидно, что объемы оперативных данных не появляются единомоментно. Я к тому, что мы можем себе позволить эту историю на начальном этапе реализации проекта или даже как целевой вариант, когда объемы небольшие — если это не Big Data или другое высоконагруженное решение. Тогда с этим можно жить всю жизнь и не городить никакой огород в виде отдельного аналитического кластера.

Итак, на старте проекта мы просто брали и действительно приземляли все аналитические запросы прямо в оперативное хранилище. Паразитная нагрузка?! Не, не слышали :)

Плюсы такого подхода лежат на поверхности:

  • это действительно дешево и сердито, так как не нужен отдельный аналитический кластер;

  • можно быстро клепать новые отчеты для бизнеса, буквально печь как горячие пирожки.

Минусы, в принципе, тоже очевидны:

  • с ростом объемов данных влияние на производительность работы всей системы возрастает;

  • аналитические запросы создают паразитную нагрузку на оперативное хранилище;

  • новые отчеты требуют создания дополнительных индексов. И так по нарастающей, как снежный ком… Это оказывает сильное влияние на вставку данных.

Ну и самая, на мой взгляд, ужасная вещь — это смешение OLAP- и OLTP-запросов. Мы все взрослые люди и давно не верим в существование Деда Мороза и Серебряной пули. Так и любая база данных имеет свою специализацию и предназначена для определенных целей и задач.

В результате мы получили деградацию производительности основных бизнес-процессов.

Никого это не устраивало, так что пришло время что-то менять и идти дальше.

Итерация номер два. Отдельный аналитический кластер

Небольшое уточнение - у нас на самом деле несколько аналитических потребителей - HDFS, Elastic, Clickhouse. Важно, что это не просто очередной кластер MongoDB.

Нам его предоставили, и тут мы задумались: «А как в него переливать данные?»

Самое простое решение — это интервальные выгрузки. Берем и переливаем периодически все дельты изменений за интервал. Делать это можно контролируемо, управляемо — то есть мы можем выбрать какие-то часы, когда нагрузка на систему относительно небольшая.

Плюсы здесь тоже очевидны: есть контролируемая управляемая нагрузка, аналитические запросы больше не выполняются на оперативных данных. Соответственно, можно удалить все аналитические индексы.

База данных буквально выдохнула, при вставке ушла деградация. Это было большое достижение, мы все очень радовались!

Но, к сожалению, по мере эксплуатации этого решения вылезали различные болячки. Интервальные выгрузки у нас базировались на так называемой дате последней операции (бизнес-дате). Смысл в том, что с товаром по мере его движения по логистической цепочке производятся различные операции. По дате последний операции мы и делали выгрузки. Просто брали временной интервал и выгружали все операции за него. Но проблема в том, что иногда в базе данных происходили обновления, при которых эта бизнес-дата не обновлялась, а значит дельта изменений не попадала в интервальную выгрузку и фактически аналитика не видела это изменение.

Второй проблемой стало то, что в этой истории нет нормальной обработки операции delete. Если что-то удаляется полностью, то в дельте этого нет! И нам пришлось буквально изолентой сбоку прикручивать костыль, который нас вообще не устраивал! Мы помучились какое-то время, даже пытались реализовать различные сверки, переливки, доливки и так далее… В конце концов нам это надоело, и мы перешли к следующему этапу — решили начать с чистого листа и всё переписать.

Итерация номер три, финальная. Или CDC против ETL

CDC или Change Data Capture — это механизм, который фактически построен поверх журнала опережающей записи. В MongoDB это oplog. Доступ к журналу опережающей записи возможен через API, плюс есть набор инструментов, которые позволяют вам этот журнал читать.

Самый большой плюс CDC-реализации именно в MongoDB – это то, что все изменения отслеживаются на уровне протокола репликации. Репликация у нас уже имелась, так как база была развернута в топологии отказоустойчивого кластера. Получается, бонусом и совершенно бесплатно в довесок к репликации мы получили готовый CDC.

В MongoDB реализация CDC называется Change Streams. Для работы с ним есть специальный API, который позволяет вам трекать все изменения.

Приведу пример на языке python:

cursor = db.inventory.watch(full_document="updateLookup")
document = next(cursor)

В примере мы отслеживаем изменения в коллекции inventory с помощью метода watch().

Этот метод возвращает нам курсор, по которому мы итерируемся и читаем все изменения, происходящие в базе данных.

Очень крутая фича именно в Change Streams – это то, что можно отслеживать изменения не только в одной коллекции, но и на уровне всего деплоймента. Для тех, кто не знаком с MongoDB, напомню, что коллекция — это аналог таблицы в обычной базе данных.

Плюс Change Streams утилизируют всю мощь aggregation framework. Для тех, кто не работал c MongoDB, но знаком с реляционными СУБД, поясню, что это аналог GROUP BY. Да, здесь всё чуть мощнее — это целый фреймворк, который позволяет делать фильтрацию, агрегацию и многие другие крутые вещи.

Однако помимо API вендор предоставляет компонент, который называется Mongo Connector. Этот компонент реализует два режима:

  • первый режим — это Sink, который позволяет читать данные из топика Kafka и сохранять эти данные MongoDB;

  • второй режим — Source. Он, собственно, про то, что нам нужно. Этот режим позволяет отслеживать все изменения в базе данных с помощью Change Streams и публиковать в топике Kafka.

Круто! Почему бы не попробовать это решение?! Интеграция с Kafka у нас уже готова.

Параметры и настройки Mongo Connector

Описание

connection.uri

Строка подключения к СУБД

database

Название БД для отслеживания изменений

collection

Название коллекции для отслеживания изменений

topic.namespace.map

Маппинг на название топика, куда отправляются события CDC

poll.max.batch.size=10

Максимальный размер пачки документов (кол-во) для пакетной обработки

poll.await.time.ms=1000 

Максимальное время сбора пакета документов

pipeline=[{$match: {"$and": [{"ns.coll": {$regex: /^coll_prefix/}}, { "fullDocument.entity_type" : 0 }]}},{ $addFields: { "storage": "MAIN" } }]

Пайплайн для агрегации и фильтрации. Тут фильтруем по префиксу названия коллекции (ns.coll) + плюс по атрибуту entity_type: 0 (документы, у которых значение этого атрибута отличается, фильтруются)

Плюс мы добавляем в формат события CDC статический атрибут "storage": "MAIN"

change.stream.full.document=updateLookup

Возвращает измененный полный документ. По умолчанию только дельта изменений

mongo.errors.tolerance=all

Продолжить процессинг в случае возникновения ошибок

mongo.errors.log.enable=true

Логировать ошибки

mongo.errors.deadletterqueue.topic.name=deadletterOP

Топик для ошибочных документов

startup.mode=timestamp

Передать все изменения, начиная с указанного времени

startup.mode.timestamp.start.at.operation.time

Время, с которого начинаем читать oplog после старта

bootstrap.servers

Подключение к Kafka

offset.flush.interval.ms=90000

Интервал сброса данных в Kafka

offset.flush.timeout.ms=17000 

Тайм-аут на запись в Kafka

offset.storage.file.filename

Файл для хранения текущего офсета

producer.max.request.size=18000000

Максимальный размер запроса для записи в Kafka

producer.buffer.memory=8388608 

Размер буфера в памяти для операции flush

producer.enable.idempotence=true

Идемпотентная запись

Документация по Mongo Connector очень скудная, многие вещи мы нашли буквально методом тыка, вернее reverse-инжиниринга, то есть копания в коде самого коннектора :)

Понятно, что есть настройки подключения к СУБД. Можно указать базу данных, для которой мы хотим отслеживать изменения. Можно указать название коллекции, на которой мы должны отслеживать эти изменения. Можно настроить mapping топиков, то есть куда какие изменения писать. Например:

colA → topicA, colB → topicB и так далее, в том числе более сложный маппинг.

Естественно, в коннекторе уже поддерживается пакетный режим обработки для обеспечения высокой производительности. Он копит изменения и сохраняет их в Kafka. Политика накопления конфигурируется — по времени ожидания изменений или по их количеству.

Я уже сказал ранее, что Change Streams реализует и использует всю мощь Aggregation Framework. В нашем решении мы подписываемся на получение изменений данных сразу в нескольких коллекциях по префиксу названия коллекций. Префикс названия задается в виде регулярного выражения.

Далее мы с помощью фильтрации анализируем только нужные нам документы в этих коллекциях, и если в документе присутствует поле entity_type со значением 0, то такие документы нас устраивают. Все остальные документы отсеиваются. Плюс мы сразу добавляем статические метаданные: например, поле storage со значение MAIN используется, чтобы указать источник данных и чтобы потребитель мог использовать эти метаданные для своих нужд.

Следующий важный параметр — это режим работы коннектора. По умолчанию, если вы не задаете этот режим (а он, кстати, не задан!) то Change Streams просто возвращают вам дельту изменений. А дальше, имея дельту изменений, вы должны каким-то образом получить измененный документ. Звучит сложно…

Мы долго плясали с бубном, искали разные другие режимы и нашли режим updateLookup. Эта штука дала нам фактически измененный полный консистентный документ, а не просто дельту изменений + описание этих изменений.

Помимо этого коннектор реализует логику обработки ошибок из коробки – error tolerance. Не надо отдельно реализовывать её в своем приложении. Бери и используй готовое.

Следующий плюс — удобство и гибкость чтения журнала опережающей записи oplog. Можно читать его от Адама и Евы, а можно с определенного времени. Эта фишка удобна для повторной обработки.

Очень крутая фича — это идемпотентная обработка в самом коннекторе. Опять же, не нужно париться об этом на уровне приложения.

Сравним голый API и Connector:

Критерий сравнения

Change Streams API

Connector

Требуется интеграция с MongoDB Change Streams API в приложении

Да

Нет

Поддерживает режим Sink

Нет

Да

Масштабирование из коробки

Нет

Средствами Kafka – добавлением партиций в топик

Демпфирование нагрузки из коробки

Нет

Да – очередь в Kafka

Сложность и стоимость инфраструктуры и эксплуатации

Проще/Дешевле

Сложнее/Дороже (требуется кластер Kafka + мощности под Connector)

Максимальная гибкость интеграции

Да

Нет, ограничены возможностями Connector + привязаны к Kafka

Как выглядит формат самого события CDC Event

{
  "schema": {
    "type": "string",
    "optional": false
  },
  "payload": "{}"
}

Самая интересная часть формата – это payload:

{
  "_id": {
    "_data": "826447837900000D5B2B022C0100296E5A1004AF54C14"
  },
  "operationType": "update",
  "clusterTime": {
    "$timestamp": {
      "t": 1682408313,
      "i": 3419
    }
  },
  "fullDocument": {},
  "ns": {
    "db": "prod",
    "coll": "product.history"
  },
  "documentKey": {
    "entity.id": “11103671001152nZ5zeBdkrqnMo",
    "_id": {
      "$oid": "643d30d39ea769ba2c8a4ffb"
    }
  },
  "updateDescription": {
    "updatedFields": {},
    "removedFields": []
  }
}

Хочу обратить отдельное внимание на поле operationType. При описании решения интервальных выгрузок я указал на проблему, связанную с невозможностью нормально реализовать логику обработки операции удаления — delete. Так вот, здесь она решена — у нас есть тип операции delete. Для этого существует специальный CDC event.

Далее идет, собственно, сам документ fullDocument — это уже измененный документ. То есть, если у вас включен режим updateLookup, в это поле будет записан полный консистентный измененный документ.

Чуть не забыл упомянуть про clusterTime — это фактически время, когда это изменение данных случилось на уровне базы данных. Это очень важный параметр, мы его используем для слайсинга данных, чтобы отсечь исторические данные по timestamp.

И наконец последняя структура — updateDescription. Это, собственно, описание тех полей документа, которые изменились и/или добавились. Все эти метаданные вы тоже можете использовать в своей бизнес-логике при необходимости.

Как выглядит общая укрупненная архитектура решения

Я не буду демонстрировать здесь какую-то монструозную архитектуру. Мы рассмотрим упрощенную схему, понятную всем:

Остановлюсь кратко на элементе CDC Processor – это наш микросервис, который занимается дополнительным post процессингом CDC-событий.

Еще один интересный квадратик – это элементы для репроцессинга. К сожалению, полностью отказаться от репроцессинга и какой-то ручной синхронизации нам не удалось, но мы это полностью автоматизировали. Сделали специальный режим ручной синхронизации и повторной обработки, которые фактически реализованы как набор команд, поступающих в специальную контролирующую очередь на базе Kafka.

CDC Processor вычитывает эти команды, исполняет их и отправляет результат выполнения нашим аналитическим потребителям.

Можно ли обойтись без CDC Processor в самом общем случае? Конечно можно! Уверен, что 90% задач можно решить, просто используя функционал самого Mongo Connector. В нашем случае потребовались дополнительная фильтрация, слайсинг и обогащения, а также смена формата для аналитических потребителей — для них мы распиливаем всю историю движения товара на отдельные события и отсекаем исторические. Какие плюсы у CDC Processor на перспективу развития решения? В будущем можно сделать его более универсальным и реализовать всю кастомную логику в виде плагинов.

Едем дальше. Мониторинг — куда же без него на production. С моей точки зрения, запуск системы на prod без мониторинга — это самоубийство. В рамках мониторинга мы, конечно, отслеживаем статус самих элементов архитектуры. Но самое важное — это мониторинг очередей. Отсутствие роста LAG во входящих очередях говорит о том, что приложение штатно переваривает всю входящую нагрузку с нужной производительностью. Рост LAG говорит о том, что приложение не справляется и его пора масштабировать в горизонт — ставить дополнительные инстансы для процессинга.

С исходящими очередями та же история: аналитические потребители тоже должны успевать вычитывать всё то, что мы им отгрузили… Конечно, в Kafka есть retention, но это для демпфирования пиковых нагрузок и аварий.

Итак, что же мы построили?

Итоги нашего долгого пути в разрезе сравнения двух подходов — ETL и CDC

Критерий сравнения

ETL

CDC

Надежная синхронизация оперативного хранилища с аналитическим кластером без потерь

Нет, есть завязка на бизнес- дату последней операции, которая не всегда обновляется Отсутствует нормальная обработка delete

Да, получаем все изменения документа, включая событие delete

Минимальное отставание данных в аналитической базе данных

Нет, интервальная выгрузка истории изменений. Данные отстают

Да, все изменения получаем в реальном времени

Минимальная равномерная дополнительная нагрузка на источник при синхронизации, в том числе в ЧНН

Нет, нагрузка выше, чем в CDC, т.к. есть дополнительный запрос всех изменений на интервале [start_dt, end_dt]

Да, работаем на уровне oplog поверх протокола репликации

Высокая отказоустойчивость при сбоях

Нет. Много точек отказа в самом ETL pipeline, которые сложно контролировать с точки зрения потенциальной потери данных при переливке

Да, есть встроенный механизм error tolerance и возможна повторная обработка

Вот такой вот получился эксперимент. Хотелось бы сказать, что все трюки и фишечки имплементированы профессионалами, не повторяйте на своём опыте! Но с другой стороны, может быть, кому-то будет полезно его повторить и сделать свои собственные выводы :)

Tags:
Hubs:
Total votes 7: ↑7 and ↓0+7
Comments6

Articles

Information

Website
stm-labs.ru
Registered
Founded
Employees
201–500 employees
Location
Россия
Representative
Катерина Школьникова