Возможность горизонтального масштабирования это одно из важнейших нефункциональных требований индустрии в последнее время. Рост бизнеса со стороны IT выглядит чаще всего как рост нагрузки и цены отказа системы. Нам всем хочется создавать такие приложения, которые будут одинаково быстро и стабильно работать как с сотней, так и с сотней тысяч клиентов. Для этого необходимо еще на стадии проектирования закладывать потенциал для масштабирования, одним из способов которого является шардирование.
Мы на пальцах рассмотрим что такое шардирование, как оно помогает в масштабировании и даже рассмотрим тот самый этап «роста».
Шардинг (или шардирование) — это разделение хранилища на несколько независимых частей, шардов (от англ. shard — осколок). Не путайте шардирование с репликацией, в случае которой выделенные экземпляры базы данных являются не составными частями общего хранилища, а копиями друг друга.
Шардирование помогает оптимизировать хранение данных приложения за счёт их распределения между инсталляциями БД (которые находятся на разных железках), что улучшает отзывчивость сервиса, так как размер данных в целом на каждом инстансе станет меньше.
Шардирование — это разновидность партиционирования (от англ. partition — деление, раздел). Отличие в том, что партиционирование подразумевает разделение данных внутри одной БД, а шардирование распределяет их по разным экземплярам БД.
Осуществить шардирование можно несколькими способами:
Средствами БД. Некоторые базы — MongoDB, Elasticsearch, ClickHouse и другие — умеют самостоятельно распределять данные между своими экземплярами, для этого достаточно настроить конфигурацию. На мой взгляд, это лучший вариант.
Надстройками к БД. Самый спорный способ — применение надстроек, которые выполняют шардирование, например Vitess или Citus, поскольку при этом есть риск потери данных и производительности.
Клиентскими средствами. В этом случае экземпляры БД даже не подозревают о существовании друг друга, шардированием управляет стороннее приложение — со всеми вытекающими рисками.
Методы работы в этих способах схожи: мы выбираем ключ для распределения данных (это может быть идентификатор, временная метка или хеш записи) и в соответствии с ним записываем информацию в нужный шард. Как правило, ключи стараются выбирать так, чтобы данные были равномерно распределены по шардам. Сделать это не сложно — достаточно ориентироваться на текущее содержимое БД.
Важно учитывать для чего вы делаете шардирование. В случае если требуется распределить нагрузку на запись, необходимо подобрать такой ключ, который обеспечит равномерное распределение запросов между инстансами. Нельзя забывать и о «горячих» данных, запросы к которым происходят чаще, из-за чего нагрузка на шарды оказывается неравномерной. Для этого можно добавить в приложение метрику, показывающую, сколько раз в какой шард будут попадать данные по конкретному ключу.
Давайте в качестве примера сделаем клиентское шардирование горячо любимой в Ozon PostgreSQL. Приложение будет на Go, а мигрировать будем с помощью Goose. Для начала нам надо добавить сами шарды, то есть развернуть еще одну инсталляцию БД. Отвлекаться на детальный разбор того, как правильно раскатывать PostgreSQL, мы не будем.
Добавим в наш Storage маппинг шардов:
// Обозначим количество шардов.
const bucketQuantity = 2
const (
Shard1 ShardNum = iota
Shard2
)
// Для лучшей семантики.
type ShardNum int
type shardMap map[ShardNum]*sqlx.DB
type Storage struct {
shardMap shardMap
}
Напишем конструктор для Storage который возьмёт на себя все задачи по инициализации соединений с БД.
func initShardMap(ctx context.Context, dsns map[ShardNum]string) shardMap {
m := make(shardMap, len(dsns))
for sh, dsn := range dsns {
m[sh] = discoveryShard(ctx, dsn)
}
return m
}
func discoveryShard(ctx context.Context, dsn string) *sqlx.DB {
db, err := sqlx.ConnectContext(ctx, "postgres", dsn)
if err != nil {
panic(err)
}
return db
}
func NewStorage(ctx context.Context, dsns map[ShardNum]string) *Storage {
return &Storage{
shardMap: initShardMap(ctx, dsns),
}
}
Переходим к работе с данными. Реализуем методы для их записи в шарды и чтения оттуда. Начинается всё с определения того, в какой шард идти.
При условии равномерности распределения наших ID (представим, что это действительно так) нам хватит классического остатка от деления. Выглядеть это будет примерно так:
func (s *Storage) shardByItemID(itemID int64) ShardNum {
return ShardNum(itemID % bucketQuantity)
}
У нас есть вот такой незаурядный метод чтения из БД. Тут стоит обратить внимание на то, что мы выполняем запрос на инстансе БД из нашего маппинга, а получаем инстанс (*sqlx.DB) по идентификатору шарда из сигнатуры.
func (s *Storage) getItemsByID(ctx context.Context, shard *sqlx.DB, itemsIDs []int64) ([]models.Item, error) {
items := make([]models.Item, 0)
query, args, err := sq.
Select(itemsTableFields...).
From(itemsTable).
Where(sq.Eq{itemIDField: itemsIDs}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
err = errors.Wrap(err, "[create query]")
return items, err
}
err = shard.SelectContext(ctx, &items, query, args...)
return items, err
}
Сам идентификатор шарда мы получаем чуть выше, когда распределяем наши ItemIDs по кубышкам. Само распределение выглядит вот так:
func (s *Storage) sortItemsIDsByShard(itemIDs ...int64) map[ShardNum][]int64 {
shardToItems := make(map[ShardNum][]int64)
for _, id := range itemIDs {
shardID := s.shardByItemID(id)
if _, ok := shardToItems[shardID]; !ok {
shardToItems[shardID] = make([]int64, 0)
}
shardToItems[shardID] = append(shardToItems[shardID], id)
}
return shardToItems
}
Ну и инфраструктурная обёрточка — чтобы запросы выполнялись параллельно. Вот так будет выглядеть публичный метод получения Item. Кажется, что он довольно большой, но в действительности большую часть метода съедают раскручивания каналов.
func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) {
shardToItems := s.sortItemsIDsByShard(itemIDs...)
respChan := make(chan []models.Item, len(shardToItems))
errChan := make(chan error, len(shardToItems))
wg := &sync.WaitGroup{}
for shardID, ids := range shardToItems {
wg.Add(1)
shard := s.shardMap[shardID]
go s.asyncGetItemsByID(ctx, shard, ids, wg, respChan, errChan)
}
wg.Wait()
close(respChan)
close(errChan)
result := make([]models.Item, 0)
for items := range respChan {
result = append(result, items...)
}
errs := make([]error, 0, len(errChan))
for e := range errChan {
errs = append(errs, e)
}
err := multierr.Combine(errs...)
return result, err
}
Для того чтобы не терять смысл getItemsByID за нагромождением каналов и Wait-групп, мы просто обернем всё это в asyncGetItemsByID:
unc (s *Storage) asyncGetItemsByID(
ctx context.Context,
shard *sqlx.DB,
itemsIDs []int64,
wg *sync.WaitGroup,
resp chan<- []models.Item,
errs chan<- error,
) {
defer wg.Done()
items, err := s.getItemsByID(ctx, shard, itemsIDs)
if err != nil {
errs <- errors.Wrapf(err, "[getItemsByID] can't select from shard %d", shard)
}
resp <- items
}
func (s *Storage) AddItems(ctx context.Context, items ...models.Item) error {
itemsByShardMap := s.itemsByShard(items...)
errChan := make(chan error, len(itemsByShardMap))
wg := &sync.WaitGroup{}
for shardID, items := range itemsByShardMap {
wg.Add(1)
shard := s.shardMap[shardID]
go s.asyncAddItems(ctx, errChan, wg, shard, items...)
}
wg.Wait()
close(errChan)
errs := make([]error, 0, len(errChan))
for e := range errChan {
errs = append(errs, e)
}
return multierr.Combine(errs...)
}
func (s *Storage) itemsByShard(items ...models.Item) map[ShardNum][]models.Item {
itemsByShard := make(map[ShardNum][]models.Item)
for _, item := range items {
shardID := s.shardByItemID(item.ID)
if _, ok := itemsByShard[shardID]; !ok {
itemsByShard[shardID] = make([]models.Item, 0)
}
itemsByShard[shardID] = append(itemsByShard[shardID], item)
}
return itemsByShard
}
func (s *Storage) asyncAddItems(
ctx context.Context,
errChan chan<- error, wg *sync.WaitGroup,
shard *sqlx.DB,
items ...models.Item) {
defer wg.Done()
err := s.addItems(ctx, shard, items...)
errChan <- errors.Wrapf(err, "[asyncAddItems] can't insert to shard")
}
func (s *Storage) addItems(ctx context.Context, shard *sqlx.DB, items ...models.Item) error {
q := sq.
Insert(itemsTable).
Columns(itemsTableFields...).
PlaceholderFormat(sq.Dollar)
for _, item := range items {
q = q.Values(item.ID, item.CreatedAt)
}
query, args, err := q.ToSql()
if err != nil {
return errors.Wrap(err, "[create query]")
}
_, err = shard.DB.ExecContext(ctx, query, args...)
return err
}
Ну и скриптик для миграции всего этого дела:
#!/usr/bin/env bash
export MIGRATION_DIR=./migrations/
if [ "${STAGE}" = "production" ]; then
if [ "$1" = "--dryrun" ]; then
goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status
goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" status
else
goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up
goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" up
fi
elif [ "${STAGE}" = "staging" ]; then
if [ "$1" = "--dryrun" ]; then
goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status
else
goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up
fi
elif [ "${STAGE}" = "development" ]; then
exit 0
fi
Очень удобно шардировать в приложении, где еще нет данных, а следовательно нет необходимости их перетаскивать. Но что делать, если мы шардим рабочее приложение? Тут, как говорится у нас на Руси, case-by-case.
Приложение можно остановить? Прекрасно! Останавливаем разбор очередей, отключаем обработку запросов или вовсе останавливаем приложение, делаем резервную копию базы, если ещё не сделали, перетаскиваем данные в соответствии с выбранным ключом и снова вводим приложение в эксплуатацию, предварительно проведя регрессионное тестирование.
Приложение должно отвечать на запросы? Тогда делаем временную надстройку внутри репозитория. Пишем данные всегда в новые шарды. Читаем сначала из нового шарда; если там нет нужных данных, то обращаемся к старым шардам. Если данные оказываются в старом шарде, то при желании можно их переносить в новый — и в конце концов данные перераспределятся между шардами. Только не забудьте добавить метрики, чтобы не пропустить этот знаменательный момент. И обязательно проверьте, все ли данные переехали или только те, к которым идут запросы. Да, и на первых порах это не то чтобы положительно скажется на отзывчивости приложения, будьте к этому готовы.
Если приложение пишет данные в БД только по событиям из условной kafka, а синхронные запросы (REST/GRPC) только читающие (классическая ситуация для event sourcing), то мы отключаем чтение из kafka, выкатываем в prod инстанс версии приложения, которое уже живет с новой схемой шардов, но синхронные запросы шлем только на инстанс приложения предыдущей версии (оно же canary-deploy). Далее джоба внутри приложения последовательно читает данные по старому маппингу, и пишет в новый, после переноса можно сразу же и удалять данные в старой схеме.
Можно просто скопировать данные в другой шард, а потом просто удалить их из источника, но на практике я такого не встречал.
Для полноты картины разберём вариант решардинга в условиях, когда нам не хотелось бы останавливать сервис. Писать данные будем только в новый маппинг шардов, а вот читать их будем сразу из старого и нового.
Начинается всё с создания дублирующей схемы шардов внутри Storage. Внесём изменения в константы:
const legacyBucketQuantity = 2
const bucketQuantity = 3
const (
Shard1 ShardNum = iota
Shard2
Shard3
)
Заведём внутри Storage shardMapLegacy, который содержит дорешардинговый маппинг:
type Storage struct {
shardMapLegacy shardMap
shardMap shardMap
}
Ну и инициализация. В конструктор Storage теперь будем передавать также две схемы:
func NewStorage(ctx context.Context, dsns map[ShardNum]string, dsnsLegacy map[ShardNum]string) *Storage {
return &Storage{
shardMap: initShardMap(ctx, dsns),
shardMapLegacy: initShardMap(ctx, dsnsLegacy),
}
}
Заводим метод для получения shardID, чтобы после переноса данных его удалить:
func (s *Storage) legacyShardByItemID(itemID int64) ShardNum {
return ShardNum(itemID % legacyBucketQuantity)
}
Ну и ещё чуть-чуть дублирования кода. Речь о практически полной копии sortItemsIDsByShard; разница лишь в том, что для получения идентификатора шарда мы используем ранее модифицированную функцию.
func (s *Storage) sortItemsIDsByLegacyShard(itemIDs ...int64) map[ShardNum][]int64 {
shardToItems := make(map[ShardNum][]int64)
for _, id := range itemIDs {
shardID := s.legacyShardByItemID(id)
if _, ok := shardToItems[shardID]; !ok {
shardToItems[shardID] = make([]int64, 0)
}
shardToItems[shardID] = append(shardToItems[shardID], id)
}
return shardToItems
}
Метод добавления Items в изменении не нуждается, так как мы условились, что данные пишем всегда в свежие шарды, а вот GetItems надо подправить. Теперь он будет конкурентно выполнять запрос сразу в две схемы, а полученные данных мы будем склеивать, отдавая предпочтение данным с актуального маппинга шардов.
func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) {
wg := &sync.WaitGroup{}
resultLegacy := make([]models.Item, 0)
resultActual := make([]models.Item, 0)
var err error
wg.Add(1)
go func() {
defer wg.Done()
res, e := s.getItems(ctx, itemIDs...)
err = multierr.Append(err, e)
resultActual = res
}()
wg.Add(1)
go func() {
defer wg.Done()
res, e := s.getItemsFromLegacyShardMap(ctx, itemIDs...)
err = multierr.Append(err, e)
resultLegacy = res
}()
wg.Wait()
result := mergeItems(resultActual, resultLegacy)
return result, err
}
Склейка результата выглядит так:
func mergeItems(items, legacyItems []models.Item) []models.Item {
itemsMap := make(map[models.Item]struct{})
for _, item := range legacyItems {
itemsMap[item] = struct{}{}
}
for _, item := range items {
itemsMap[item] = struct{}{}
}
mergedItems := make([]models.Item, 0, len(itemsMap))
for item, _ := range itemsMap {
mergedItems = append(mergedItems, item)
}
return mergedItems
}
Опционально можно добавить метрику для отслеживания частоты запросов по старому маппингу, которая будет сигнализировать нам о том, что данные перетащились и можно отключать дублирующий легаси-флоу.
При таком подходе остаётся только один вопрос: что делать с «мёртвыми» данными, которые лежат не в своих шардах после решардинга?
Вариант в лоб: предположим, что мы зарешардились с двух до четырёх шардов, идём — и на каждом легаси-шарде выполняем запрос на удаление записей, где полученный для ID ключ шардирования не соответствует текущему шарду:
DELETE FROM items
WHERE ctid IN (
SELECT ctid
FROM items
WHERE id % 4 NOT IN (2, 2-4)
);
З.Ы. Вариант SQL-запроса предполагает, что мы храним гошный UInt64 в постгревом BigInt. В этом случае положительные гошные числа могут превратиться в отрицательные постгревые, поэтому делаем NOT IN для ренджа.
Иногда встречаются системы, где данные имеют свойства «протухать». В таких системах самое логичное оставить данные после решардинга, и дождаться пока они «протухнут».
И пара слов об упячках, с которыми я сталкивался, — о партиционировании внутри одной БД и шардинге целыми партициями. Поначалу кажется, что это логично и даже элегантно. Ведь для решардинга достаточно просто перетащить целую партицию с одного шарда в другой. И это ФАТАЛЬНАЯ ОШИБКА. Со временем вы устанете от трёхэтажного мата негодования, вызванного пятиэтажными пакетными запросами, из-за которых горячие данные не будут нормально попадать в кеш. Такой способ работает лишь в том случае, если партиционирование выполняется по дате, но запросы, как правило, обращаются к свежим или старым данным, как, например, во многих OLAP-системах. В остальных случаях перспективнее держать данные в рамках одной партиции, а решардить их путём постепенного переноса, если, конечно, БД не предусматривает своих вариантов решения проблемы решардинга.
Рано или поздно вам придётся заняться решардингом. А потом ещё раз, и ещё, и ещё, особенно, если бизнес будет расти и данных будет становиться всё больше. Поэтому приберегите инструменты, которые вам помогли однажды, и ничего страшного, если запускать вы их будете раз в полгода.
Клиентское шардирование — надо.
Ключ шардирования выбираем с умом, предварительно медитируем над метриками, чтобы чётко видеть картину того, как данные пишутся, запрашиваются и хранятся.
Не так страшно шардирование, как решардинг. Важно заранее подумать о том, как вы будете решать вопросы консистентности при решардинге.
на главную сниппетов