Pull to refresh

Comments 52

Очень интересный подход к шардированию.
Только вопрос с ханками возник. Вы сознательно пошли на то что менять количество нод на которых присутствует бакет можно только на кратное количество. Нет ли с этим проблем? Разве это не уменьшение гибкости по сравнению с дроблением и переносом чанков?

менять количество нод на которых присутствует бакет можно только на кратное количество

Не совсем так.

Автоматика запрограммирована именно на такое поведение, оно продиктовано следующей логикой. Увеличивать количество шардов, на которых присутствует бакет надо условно при превышении порога нагрузки от этого бакета на 1 шард. Но если нагрузка от бакета превысила порог на одном шарде, то значит она превысила порог и на всех других шардах где он присутствует, так как она везде равномерная.

Математически проще всегда удваивать количество шардов. Сами ханки тоже делятся по порогу количества ключей почти одновременно, так как число ключей в них растет синхронно.

Кроме того, тут заложена некоторая идея экспоненциального роста: Если нагрузка от бакета растет, то раскинем его на х2 шардов, чтобы у бакета было еще пространство расти по нагрузке в 2 раза.

Но в целом мы всегда можем руками развезти бакет на произвольное количество шардов, не только степень двойки. Условно 4 -> 5, или 4 -> 10. Наша схема позволяет нам это делать и не ограничивает этой гибкости.

Более корректно говорить что переезжают метаданные, так как в постгресе мы храним именно метаданные объектов.

В случае 4->5 переедет по 20% с каждого шарда на новый 5й шард. Тогда на всех шардах присутствия бакета останется одинаковое количество ханков. Мы поддерживаем равномерность размазывания бакета по шардам.

А как достигается равномерность в таком случае? Заранее создаётся много маленьких ханков, примерно как VNodes в Cassandra?

Нет, заранее ханки не создаются. Они создаются при сплите - когда живые ханки растут и в них становится больше 100к ключей. При превышении порога приходит сплиттер и делит ханк на 2 ханка по 50к.

Равномерность достигается за счет мувера. Мувер смотрит на каких шардах присутствует бакет, и выравнивает число ханков на каждом шарде присутствия, перенося ханки с более заполненных шардов на менее заполненные.

Немного подумав, назрел вопрос.

А как вы сплитите ханки? Чанки, сплитнуть легко, ключи в них отсортированы? Если не отсортированы ключи в ханках, то понятно, вы сплит делаете по пространству хешей, но тогда получается, что при листинге надо читать весь ханк и сортировать в памяти, что дорого, особенно если ханков много. Если отсортированы ключи в ханке, сплиттер сам переваривает всё в памяти (100к ключей), перебирая все ключи и выуживая нужные хеши на новый ханк?

Так же сопутствующий вопрос: Вы говорите, что, благодаря ханкам и профилю нагрузки на них, вы ходите в малое количество шардов для листинга, но весдь в пределах шарда надо сходить в каждый ханк, это не проблема если в бакете 1млрд+ объектов к 10к+ ханков?
Спасибо.

  1. Сплит ханков происходит по отдельному индексу (по хешу). Все что на этом этапе требуется - посчитать счетчики для нового ханка, то есть пересчитать объекты с хэшом в нужном диапазоне. Да, перебираем, но не 100к ключей, а сразу около половины - 50к ключей.

  2. Листинг происходит не так как вы говорите. Листингу не нужно заходить в каждый ханк. Индекс по ключу все еще есть. Внутри шарда листинг отбирает нужное количество ключей (не больше 1000), идя последовательно по именам ключей. На шарде могут быть миллиарды объектов, но под условие запроса подходит не больше 1000 и именно их запрос листинга вытягивает с шарда. Другими словами, запрос листинга не знает о существовании ханков.

По второму ответу:
получается у вас есть 2 индекса: по хешу в пределах ханка и по имени ключа в рамках всего шарда, т.е. мувер, при переезде ханка, должен ещё и подчистить в индексе имён ключей, который находится вне рамок ханка?
Спасибо.

Верно. И это происходит из коробки средствами postgres, муверу совсем не требуется совершать для этого отдельных действий.

Ну и индекс по хешу - не в пределах ханка, а в пределах всего шарда, это обычный индекс внутри базы данных

Спасибо. Всё гениальное - просто.

Вы разработали интересный механизм шардирования, по нему вопросов нет. Вопрос в другом, почему именно PostgreSQL для хранения метаданных при ваших масштабах? Это отличная СУБД общего назначения, но она никогда не умела горизонтальное в горизонтальное масштабирование. Не проще ли было выбрать для хранения метаданных СУБД, которая такое шардирование умеет из коробки? Например, Apache Cassandra: при правильном выборе ключа партиционирования вы можете размазывать нагрузку в таблице по кластеру любым нужным вам способом. Поддержка нескольких датацентров, резервирование, репликация, восстановление узлов после отказа, добавление новых узлов в кластер и вывод их из кластера - там все это уже есть.

Пожалуй самый честный ответ будет - потому что у нас была экспертиза в PostgreSQL.

Так исторически сложилось, что мы создавали S3 совместно с командой MDB PostgreSQL, у которой был большой опыт в том как строить высоконагруженные сервисы.

А у вас в компании нет инициативы по переходу на YDB повсеместно? Например, как гугл переводит все внутренние проекты на google spanner.

(пока я читал статью, думал про вопрос выбора PG - мое изначальное предположение было, что YDB использует ваш сервис, и вы не хотели иметь циклическую зависемость друг на друга, поэтому решили выбрать PG; но тогда я бы спросил - что не обязательно использовать облачную YDB, и можно деплоить свой кластер YDB)

Нет, общего перехода на YDB нет, выбираем технологии под задачи.

К слову, в некоторых новых регионах гусь использует YDB, так что в зависимости от условий мы можем выбирать разные технологии.

А подскажите пожалуйста почему YDB не подходит под задачу?

Подходит.

Исторически сложилось, что мы сделали шарды на PostgreSQL

А в некоторых новых регионах гусь использует YDB

А как вы достигаете консистентности между метадата и дата ?

В метаданных хранится ссылка на данные, это обеспечивает консистентность

Возможно я не понял ваш вопрос, можете, пожалуйста, пояснить?

Как вы достигаете консистентности, например, при удалении объекта из s3db и MDS?

Может ли быть сутация, когда пользователь удалил объект, но он еще показывается в листинге?

Есть ли проблема, когда пользователь удалил объект по ключу и сразу же создал новый объект по такому же ключу?

При удалении объекта из s3db в базе данных он удаляется не сразу, а перекладывается в отдельную очередь. Потом отдельный фоновый процесс cleanup разбирает очередь удалений и удаляет данные из MDS.

А пользователю сразу отправляется 200 OK или он ждет, пока разбирается очередь?

сразу отправляется 200 OK

Спасибо большое за ответы!

А какой алгоритм при создании объекта?

Предположу, что его нужно сохранить в MDS, а потом отправить запись об этом в очередь. Либо положить blob и мету в очередь

Сохранить в MDS и потом положить запись со ссылкой на данные в MDS в базу. В этом процессе очередь не фигурирует

А как в такой схеме синхронизировать MDS с базой?

Например, объект сохранился в MDS, а в базу нет (скажем, приложение упало по OOM). Если пользователь не сделает ретрай, то объект сохранится в MDS навсегда, а база про это не узнает.

Кажется, в такой схеме нужно где-то хранить объект и мету до тех пор, пока всё это не сохранится и в БД, и в MDS

А ещё, кажется, всё таки очередь в процессре фигурирует. Вам же нужно отправить в очередь информацию о том, что появился новый объект, чтобы посчитать счетчики. Но эта проблема решается outbox паттерном, благо у вас postgres

Если объект сохранился в MDS, а в базу нет, то в MDS останется мусор навсегда, а пользователь получит 500. Но это все-таки достаточно редкий тип фейлов, когда приложение падает именно в тот самый короткий момент пока мы не успели добавить запись в базу (а запрос в базу достаточно быстрый).

Про очередь счетчиков вы правы. Она тут фигурирует. И да, в той же транзакции, в которой мы добавляем запись о новом объекте в базу.

Ситуация, когда пользователь удалил объект, но он еще показывается в листинге возможна, если не включен режим консистентности и листинг будет забираться с реплики. В Яндекс Облаке мы используем режим консистентности по умолчанию для всех пользователей. Во внутренней инсталяции Яндекса по умолчанию не используется режим консистентности, там такая ситуация возможна.

Если пользователь удалил объект по ключу и сразу же создал новый объект по такому же ключу, то никакой проблемы нет. В базе данных это будет новая строчка со своей ссылкой на данные в MDS.

Ну вот, операция добавить и удалить файл должна сделать это атомарно в двух системах : pg и MDS, как добиться атомарности ?

Не нужно поддерживать атомарность между pg и mds

То что записали в mds - станет ссылкой внутри строки pg. PG обеспечивает атомарность транзакций. С этой строкой всегда будет ассоциирована только эта ссылка на данные.

Добавили файл - он сначала записался в mds, потом строчка со ссылкой на данные записалась в PG.

Удалили файл - строчка попала в очередь на удаление, атомарно пропала из листинга пользователя.

Через какое-то время фоновый процесс cleanup приходит разгребать эту очередь, берет эту строчку и удаляет данные из mds.

Добавили файл - он сначала записался в mds, потом строчка со ссылкой на данные записалась в PG

А вот как гарантировать что строчка запишется в Pg? Что если она не запишется?

Такое может быть, тогда в mds останется мусор, а в пользователю вернется 500-ка

Пара вопросов возникла:
- при увеличении количества ханков вы перевозите часть ключей из существующих ханков на новые? Если да, тут такая же история с блокировками? Если нет, как решается потом вопрос с удалением ключей? Который есть на старом ханке, но после слита уже должен создаваться на новом.
- Задавал давно в других чатах/каналах яндекса, так и не нашёл ответ. Как работает механизм ретроспективного применения lifecycle (прим. автоудаление) на больших бакетах (миллиарды ключей), это фоновый процесс? который помечает на удаление (сколько времени он отрабатывает на больших бакетах) или есть какая то более умная логика?
Спасибо.

По первому вопросу. Тут работают все те же процессы, что и с чанками. Когда в одном ханке становится слишком много ключей - приходит сплиттер и делит его пополам. Когда мы хотим перераспределить нагрузку между шардами - приходит мувер и переносит ханк на другой шард. Тут такая же история с блокировками.

По второму вопросу. lifecycle - это фоновый процесс. Сначала пробегаемся по бакетам, смотрим где включен lifecycle и на каких префиксах. Делим эти префиксы еще дополнительно по нашим чанкам / ханкам и эти задания отправляем в очередь. Потом воркеры берут эти задания в работу и уже пробегаются по объектам и смотрят что нужно удалить.

За счет параллельности и большого количества маленьких задач обработка сходится за конечное время. Не упираться в базу получается за счет размазывания нагрузки по шардам. Обычно выполнение lifecycle занимает несколько часов. Да, каждую ночь lifecycle обрабатывает миллиарды ключей, как по миллиардным бакетам, так и по множеству маленьких бакетов.

Т.е. если в бакете, согласно правилу lifecycle, объект должен удалиться, например, в 00:00 UTC, то мы не гарантируем что он исчезнет из листинга, перестанет билиться и перестанет отдаваться на GET/HEAD в 00:00 UTC, но обещаем, что за конечное время, в пределах суток, он удалится. Вопрос в контексте ретроспективности, когда lifecycle повесили после того как объект появился в крупном бакете.
Спасибо.

Cам себе отвечу на вопрос
Из документации Amazon S3
"

When you specify the number of days in the Transition and Expiration actions in an S3 Lifecycle configuration, note the following:

  • The value that you specify is the number of days since object creation when the action will occur.

  • Amazon S3 calculates the time by adding the number of days specified in the rule to the object creation time and rounding the resulting time to the next day at midnight UTC. For example, if an object was created on 1/15/2014 at 10:30 AM UTC and you specify 3 days in a transition rule, then the transition date of the object would be calculated as 1/19/2014 00:00 UTC.

"
Теперь всё ясно. Нет требований к секундной точности.

Mover использует двухфазный коммит.

Хотелось бы уточнить, так как возможно неверно понял идею :) Двухфазность заключается именно в том, что шард отдающий и шард принимающий должны подтвердить муверу, что они готовы к коммиту?

Наверное можно и так сказать

Глобально при помощи two-phase-commit (2PC) мы гарантируем что мы точно не оставили дубликата чанка на старом шарде, а также что мы не удалили чанк на старом шарде не создав его на новом (т.е. не потеряли метаданные).

https://www.postgresql.org/docs/current/two-phase.html

Если на одном из шардов, участвующих в 2PC что-то пошло не так, то отдельный фоновый процесс потом ищет все висящие 2PC на всех шардах и понимает, в какую сторону нужно разрешить ситуацию. Либо откатывает PREPARED TRANSACTION, либо наоборот коммитит. Так обеспечивается согласованность метаданных объектов на разных шардах.

Как у вас работает режим консистентности? Есть ли какой-то дополнительный сервис-оракул, который знает, какая транзакция для конкретного ключа последняя?

Если представить ситуацию, что у вас есть два параллельных запроса: один на запись, другой на чтение. И запись уже произошла, но не все реплики обновили метаданные. Здесь запрос на чтение должен как-то понять, что реплика, с которой читаются данные, имеет последние метаданные.

Когда режим консистентности включен - он обеспечивается просто за счет хождения в мастер. Сервиса-оракула нет (к слову, у AWS кажется такой есть).

Да, в AWS S3 такой есть (сам работаю как раз в AWS S3 Index 😃).

Дополнительный вопрос, есть ли у вас какой-то кеш метаданных над postgres? Чтобы лишний раз не нагружать базу для чтения. Тут как раз сервис-оракул может сильно помочь, чтобы подобное реализовать. Какие у вас лимиты RPS на запись/чтение для шарда?

Очень рад такому контакту с зарубежным коллегой :)

Кеш метаданных над postgres есть, но он неконсистентный. Включаем его для клиентов по согласованию, это не является общим механизмом на всех. Он помогает разгружать базу для чтения.

Над сервисом-оракулом думали, но пока не выявили явной необходимости в нем.

Конкретные цифры лимитов RPS на шард не могу озвучить. Зависит от окружения, железа и вида нагрузки. Когда мы видим что нагрузка на шарды начинает превышать условные пороги - это знак к тому что пора расширять инсталяцию и докидывать новых шардов.

Очень рад такому контакту с зарубежным коллегой :)

Взаимно, интересно как похожая система устроена в других компаниях :)

Над сервисом-оракулом думали, но пока не выявили явной необходимости в нем.

Как раз, если понадобится сделать кеш консистентным, то оракул будет полезным :)

Конкретные цифры лимитов RPS на шард не могу озвучить.

В AWS S3 ограничения такие: 3,500 PUT/COPY/POST/DELETE или 5,500 GET/HEAD (источник). Еще сложности добавляет необходимость поддерживать AWS S3 Event Notifications с учетом требования доставки 100% событий. Есть ли в вашем хранилище похожая функциональность?

👍

Да, интересная статья про лимиты! Получается в архитектуре Амазона есть ограничения на префикс, которые физически лежат на одном "шарде"?

В нашей схеме именно такого ограничения нет, мы можем разделить пространство ключей (или хешей) на достаточно маленькие кусочки и разнести их по разным шардам. Но каждый отдельный шард, конечно, имеет свои ограничения.

Насколько я понимаю, аналогия Event Notifications в Яндекс Облаке - Audit Trails.

Та сложность, о которой вы говорите, вероятно связана с необходимостью формировать событие очереди notifications в той же транзакции, что и само действие (например, object created)?

Получается в архитектуре Амазона есть ограничения на префикс, которые физически лежат на одном "шарде"?

В AWS S3 префикс по сути это и есть шард. Там так же есть split шардов, то же самое, что и у вас: если RPS сильно большой, то шард (префикс) разделяется на несколько под-префиксов. В вырожденном случае шард будет состоять из одного ключа. Отсюда можно понять максимальные лимиты на запросы к одному ключу.

Та сложность, о которой вы говорите, вероятно связана с необходимостью формировать событие очереди notifications в той же транзакции, что и само действие (например, object created)?

В какой-то степени да, но там все немного сложнее. Система сама по себе распределенная и нет однозначного мастера. Запись происходит через кворум. Соответственно для реализации событий там свой велосипед с логом транзакций и его обработкой :)

Насколько я понимаю, аналогия Event Notifications в Яндекс Облаке - Audit Trails.

Не совсем. Сбор аудитных логов есть отдельно от Event Notifications в AWS и GCP.

Суть Event Notifications в том, что на события в сервисе можно повесить свои кастомные обработчики. Например, запустить cloud serverless function / lambda.

Впрочем, допускаю, что эту же функциональность вполне можно самому реализовать на основе аудитных логов, которые придётся постоянно мониторить.

Очень интересно! Я еще совсем зеленый в этих делах, не особо много понимаю, можете, пожалуйста, пояснить, что такое «перегрузка по месту или по CPU»?

Допустим размер шарда 1 Тб. Если занятое место достигает 90% (900 Гб), то мы говорим что шард перегружен по месту.

Допустим шард имеет 16 ядер процессора CPU. Если в среднем загружены более 14 ядер, то есть остается очень маленький запас и периодически мы упираемся в полку лимита - 16 ядер - то мы говорим что шард перегружен по CPU

Спасибо!

А что вы используете для очередей? Если саму PG - как на ней их строите?

Sign up to leave a comment.