База по шардированию базы

ARTICLES 19.12.22 19.12.22 285
Бесплатные курсына главную сниппетов

Возможность горизонтального масштабирования это одно из важнейших нефункциональных требований индустрии в последнее время. Рост бизнеса со стороны IT выглядит чаще всего как рост нагрузки и цены отказа системы. Нам всем хочется создавать такие приложения, которые будут одинаково быстро и стабильно работать как с сотней, так и с сотней тысяч клиентов. Для этого необходимо еще на стадии проектирования закладывать потенциал для масштабирования, одним из способов которого является шардирование.

Мы на пальцах рассмотрим что такое шардирование, как оно помогает в масштабировании и даже рассмотрим тот самый этап «роста».

О чём речь?

Шардинг (или шардирование) — это разделение хранилища на несколько независимых частей, шардов (от англ. shard — осколок). Не путайте шардирование с репликацией, в случае которой выделенные экземпляры базы данных являются не составными частями общего хранилища, а копиями друг друга. 

Шардирование помогает оптимизировать хранение данных приложения за счёт их распределения между инсталляциями БД (которые находятся на разных железках), что улучшает отзывчивость сервиса, так как размер данных в целом на каждом инстансе станет меньше. 

Шардирование — это разновидность партиционирования (от англ. partition — деление, раздел). Отличие в том, что партиционирование подразумевает разделение данных внутри одной БД, а шардирование распределяет их по разным экземплярам БД. 

Способы шардирования

Осуществить шардирование можно несколькими способами:

  1. Средствами БД. Некоторые базы — MongoDB, Elasticsearch, ClickHouse и другие — умеют самостоятельно распределять данные между своими экземплярами, для этого достаточно настроить конфигурацию. На мой взгляд, это лучший вариант.

  2. Надстройками к БД. Самый спорный способ — применение надстроек, которые выполняют шардирование, например Vitess или Citus, поскольку при этом есть риск потери данных и производительности.

  3. Клиентскими средствами. В этом случае экземпляры БД даже не подозревают о существовании друг друга, шардированием управляет стороннее приложение — со всеми вытекающими рисками.

Методы работы в этих способах схожи: мы выбираем ключ для распределения данных (это может быть идентификатор, временная метка или хеш записи) и в соответствии с ним записываем информацию в нужный шард. Как правило, ключи стараются выбирать так, чтобы данные были равномерно распределены по шардам. Сделать это не сложно — достаточно ориентироваться на текущее содержимое БД. 

Важно учитывать для чего вы делаете шардирование. В случае если требуется распределить нагрузку на запись, необходимо подобрать такой ключ, который обеспечит равномерное распределение запросов между инстансами. Нельзя забывать и о «горячих» данных, запросы к которым происходят чаще, из-за чего нагрузка на шарды оказывается неравномерной. Для этого можно добавить в приложение метрику, показывающую, сколько раз в какой шард будут попадать данные по конкретному ключу. 

Пример шардирования

Давайте в качестве примера сделаем клиентское шардирование горячо любимой в Ozon PostgreSQL. Приложение будет на Go, а мигрировать будем с помощью Goose. Для начала нам надо добавить сами шарды, то есть развернуть еще одну инсталляцию БД. Отвлекаться на детальный разбор того, как правильно раскатывать PostgreSQL, мы не будем.

Добавим в наш Storage маппинг шардов: 

// Обозначим количество шардов.
    const bucketQuantity = 2

    const (
    Shard1 ShardNum = iota
    Shard2
    )
    // Для лучшей семантики.
    type ShardNum int
    type shardMap map[ShardNum]*sqlx.DB

    type Storage struct {
    shardMap shardMap
    }

Напишем конструктор для Storage который возьмёт на себя все задачи по инициализации соединений с БД.

func initShardMap(ctx context.Context, dsns map[ShardNum]string) shardMap {
    m := make(shardMap, len(dsns))
    for sh, dsn := range dsns {
    m[sh] = discoveryShard(ctx, dsn)
    }

    return m
    }

    func discoveryShard(ctx context.Context, dsn string) *sqlx.DB {
    db, err := sqlx.ConnectContext(ctx, "postgres", dsn)
    if err != nil {
    panic(err)
    }

    return db
    }

    func NewStorage(ctx context.Context, dsns map[ShardNum]string) *Storage {
    return &Storage{
    shardMap: initShardMap(ctx, dsns),
    }
    }

Переходим к работе с данными. Реализуем методы для их записи в шарды и чтения оттуда. Начинается всё с определения того, в какой шард идти. 

При условии равномерности распределения наших ID (представим, что это действительно так) нам хватит классического остатка от деления. Выглядеть это будет примерно так:

func (s *Storage) shardByItemID(itemID int64) ShardNum {
    return ShardNum(itemID % bucketQuantity)
    }

У нас есть вот такой незаурядный  метод чтения из БД. Тут стоит обратить внимание на то, что мы выполняем запрос на инстансе БД из нашего маппинга, а получаем инстанс (*sqlx.DB) по идентификатору шарда из сигнатуры.

func (s *Storage) getItemsByID(ctx context.Context, shard *sqlx.DB, itemsIDs []int64) ([]models.Item, error) {
    items := make([]models.Item, 0)

    query, args, err := sq.
    Select(itemsTableFields...).
    From(itemsTable).
    Where(sq.Eq{itemIDField: itemsIDs}).
    PlaceholderFormat(sq.Dollar).
    ToSql()
    if err != nil {
    err = errors.Wrap(err, "[create query]")
    return items, err
    }

    err = shard.SelectContext(ctx, &items, query, args...)
    return items, err
    }

Сам идентификатор шарда мы получаем чуть выше, когда распределяем наши ItemIDs по кубышкам. Само распределение выглядит вот так:

func (s *Storage) sortItemsIDsByShard(itemIDs ...int64) map[ShardNum][]int64 {
    shardToItems := make(map[ShardNum][]int64)

    for _, id := range itemIDs {
    shardID := s.shardByItemID(id)
    if _, ok := shardToItems[shardID]; !ok {
    shardToItems[shardID] = make([]int64, 0)
    }

    shardToItems[shardID] = append(shardToItems[shardID], id)
    }

    return shardToItems
    }

Ну и инфраструктурная обёрточка — чтобы запросы выполнялись параллельно. Вот так будет выглядеть публичный метод получения Item. Кажется, что он довольно большой, но в действительности  большую часть метода съедают раскручивания каналов.

func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) {
    shardToItems := s.sortItemsIDsByShard(itemIDs...)

    respChan := make(chan []models.Item, len(shardToItems))
    errChan := make(chan error, len(shardToItems))
    wg := &sync.WaitGroup{}

    for shardID, ids := range shardToItems {
    wg.Add(1)
    shard := s.shardMap[shardID]
    go s.asyncGetItemsByID(ctx, shard, ids, wg, respChan, errChan)
    }

    wg.Wait()
    close(respChan)
    close(errChan)

    result := make([]models.Item, 0)
    for items := range respChan {
    result = append(result, items...)
    }

    errs := make([]error, 0, len(errChan))
    for e := range errChan {
    errs = append(errs, e)
    }
    err := multierr.Combine(errs...)

    return result, err
    }

Для того чтобы не терять смысл getItemsByID за нагромождением каналов и Wait-групп, мы просто обернем всё это в asyncGetItemsByID:

unc (s *Storage) asyncGetItemsByID(
    ctx context.Context,
    shard *sqlx.DB,
    itemsIDs []int64,
    wg *sync.WaitGroup,
    resp chan<- []models.Item,
    errs chan<- error,
    ) {
    defer wg.Done()
    items, err := s.getItemsByID(ctx, shard, itemsIDs)
    if err != nil {
    errs <- errors.Wrapf(err, "[getItemsByID] can't select from shard %d", shard)
    }

    resp <- items
    }
Всё то же самое мы проделываем для записи данных в шарды:
func (s *Storage) AddItems(ctx context.Context, items ...models.Item) error {
            itemsByShardMap := s.itemsByShard(items...)
            errChan := make(chan error, len(itemsByShardMap))
            wg := &sync.WaitGroup{}

            for shardID, items := range itemsByShardMap {
            wg.Add(1)
            shard := s.shardMap[shardID]
            go s.asyncAddItems(ctx, errChan, wg, shard, items...)
            }

            wg.Wait()
            close(errChan)

            errs := make([]error, 0, len(errChan))
            for e := range errChan {
            errs = append(errs, e)
            }

            return multierr.Combine(errs...)
            }

            func (s *Storage) itemsByShard(items ...models.Item) map[ShardNum][]models.Item {
            itemsByShard := make(map[ShardNum][]models.Item)

            for _, item := range items {
            shardID := s.shardByItemID(item.ID)
            if _, ok := itemsByShard[shardID]; !ok {
            itemsByShard[shardID] = make([]models.Item, 0)
            }

            itemsByShard[shardID] = append(itemsByShard[shardID], item)
            }

            return itemsByShard
            }

            func (s *Storage) asyncAddItems(
                ctx context.Context,
                errChan chan<- error, wg *sync.WaitGroup,
                shard *sqlx.DB,
                items ...models.Item) {
            defer wg.Done()
            err := s.addItems(ctx, shard, items...)
            errChan <- errors.Wrapf(err, "[asyncAddItems] can't insert to shard")
            }

            func (s *Storage) addItems(ctx context.Context, shard *sqlx.DB, items ...models.Item) error {
            q := sq.
            Insert(itemsTable).
            Columns(itemsTableFields...).
            PlaceholderFormat(sq.Dollar)

            for _, item := range items {
            q = q.Values(item.ID, item.CreatedAt)
            }

            query, args, err := q.ToSql()
            if err != nil {
            return errors.Wrap(err, "[create query]")
            }

            _, err = shard.DB.ExecContext(ctx, query, args...)
            return err
            }
        

Ну и скриптик для миграции всего этого дела:

#!/usr/bin/env bash
    export MIGRATION_DIR=./migrations/

    if [ "${STAGE}" = "production" ]; then
    if [ "$1" = "--dryrun" ]; then
    goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status
    goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" status
    else
    goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up
    goose -dir ${MIGRATION_DIR} postgres "user=${USER2} password=${PASSWORD2} dbname=${DBNAME2} host=${HOST2} port=${PORT2} sslmode=disable" up
    fi

    elif [ "${STAGE}" = "staging" ]; then
    if [ "$1" = "--dryrun" ]; then
    goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" status
    else
    goose -dir ${MIGRATION_DIR} postgres "user=${USER1} password=${PASSWORD1} dbname=${DBNAME1} host=${HOST1} port=${PORT1} sslmode=disable" up
    fi
    elif [ "${STAGE}" = "development" ]; then
    exit 0
    fi

Очень удобно шардировать в приложении, где еще нет данных, а следовательно нет необходимости их перетаскивать. Но что делать, если мы шардим рабочее приложение? Тут, как говорится у нас на Руси, case-by-case.

Можно просто скопировать данные в другой шард, а потом просто удалить их из источника, но на практике я такого не встречал. 

Для полноты картины разберём вариант решардинга в условиях, когда нам не хотелось бы останавливать сервис. Писать данные будем только в новый маппинг шардов, а вот читать их будем сразу из старого и нового. 

Начинается всё с создания дублирующей схемы шардов внутри Storage. Внесём изменения в константы:

const legacyBucketQuantity = 2
    const bucketQuantity = 3

    const (
    Shard1 ShardNum = iota
    Shard2
    Shard3
    )

Заведём внутри Storage shardMapLegacy, который содержит дорешардинговый маппинг:

type Storage struct {
    shardMapLegacy shardMap
    shardMap   	shardMap
    }

Ну и инициализация. В конструктор Storage теперь будем передавать также две схемы:

func NewStorage(ctx context.Context, dsns map[ShardNum]string, dsnsLegacy map[ShardNum]string) *Storage {
    return &Storage{
    shardMap:       initShardMap(ctx, dsns),
    shardMapLegacy: initShardMap(ctx, dsnsLegacy),
    }
    }

Заводим метод для получения shardID, чтобы после переноса данных его удалить:

func (s *Storage) legacyShardByItemID(itemID int64) ShardNum {
    return ShardNum(itemID % legacyBucketQuantity)
    }

Ну и ещё чуть-чуть дублирования кода. Речь о практически полной копии sortItemsIDsByShard; разница лишь в том, что для получения идентификатора шарда мы используем ранее модифицированную функцию. 

func (s *Storage) sortItemsIDsByLegacyShard(itemIDs ...int64) map[ShardNum][]int64 {
    shardToItems := make(map[ShardNum][]int64)

    for _, id := range itemIDs {
    shardID := s.legacyShardByItemID(id)
    if _, ok := shardToItems[shardID]; !ok {
    shardToItems[shardID] = make([]int64, 0)
    }

    shardToItems[shardID] = append(shardToItems[shardID], id)
    }

    return shardToItems
    }

Метод добавления Items в изменении не нуждается, так как мы условились, что данные пишем всегда в свежие шарды, а вот GetItems надо подправить. Теперь он будет конкурентно выполнять запрос сразу в две схемы, а полученные данных мы будем склеивать, отдавая предпочтение данным с актуального маппинга шардов.

func (s *Storage) GetItems(ctx context.Context, itemIDs ...int64) ([]models.Item, error) {
    wg := &sync.WaitGroup{}

    resultLegacy := make([]models.Item, 0)
    resultActual := make([]models.Item, 0)

    var err error

    wg.Add(1)
    go func() {
    defer wg.Done()

    res, e := s.getItems(ctx, itemIDs...)
    err = multierr.Append(err, e)
    resultActual = res
    }()

    wg.Add(1)
    go func() {
    defer wg.Done()

    res, e := s.getItemsFromLegacyShardMap(ctx, itemIDs...)
    err = multierr.Append(err, e)
    resultLegacy = res
    }()

    wg.Wait()
    result := mergeItems(resultActual, resultLegacy)

    return result, err
    }

Склейка результата выглядит так: 

func mergeItems(items, legacyItems []models.Item) []models.Item {
    itemsMap := make(map[models.Item]struct{})

    for _, item := range legacyItems {
    itemsMap[item] = struct{}{}
    }

    for _, item := range items {
    itemsMap[item] = struct{}{}
    }

    mergedItems := make([]models.Item, 0, len(itemsMap))
    for item, _ := range itemsMap {
    mergedItems = append(mergedItems, item)
    }

    return mergedItems
    }

Опционально можно добавить метрику для отслеживания частоты запросов по старому маппингу, которая будет сигнализировать нам о том, что данные перетащились и можно отключать дублирующий легаси-флоу. 

При таком подходе остаётся только один вопрос: что делать с «мёртвыми» данными,  которые лежат не в своих шардах после решардинга? 

Вариант в лоб: предположим, что мы зарешардились с двух до четырёх шардов, идём — и на каждом легаси-шарде выполняем запрос на удаление записей, где полученный для ID ключ шардирования не соответствует текущему шарду:

DELETE FROM items
    WHERE ctid IN (
    SELECT ctid
    FROM items
    WHERE id % 4 NOT IN (2, 2-4)
    );

З.Ы. Вариант SQL-запроса предполагает, что мы храним гошный UInt64 в постгревом BigInt. В этом случае положительные гошные числа могут превратиться в отрицательные постгревые, поэтому делаем NOT IN для ренджа.

Иногда встречаются системы, где данные имеют свойства ‎‎‎‎«протухать». В таких системах самое логичное оставить данные после решардинга, и дождаться пока они «протухнут».

И пара слов об упячках, с которыми я сталкивался, — о партиционировании внутри одной БД и шардинге целыми партициями. Поначалу кажется, что это логично и даже элегантно. Ведь для решардинга достаточно просто перетащить целую партицию с одного шарда в другой. И это ФАТАЛЬНАЯ ОШИБКА. Со временем вы устанете от трёхэтажного мата негодования, вызванного пятиэтажными пакетными запросами, из-за которых горячие данные не будут нормально попадать в кеш. Такой способ работает лишь в том случае, если партиционирование выполняется по дате, но запросы, как правило, обращаются к свежим или старым данным, как, например, во многих OLAP-системах. В остальных случаях перспективнее держать данные в рамках одной партиции, а решардить их путём постепенного переноса, если, конечно, БД не предусматривает своих вариантов решения проблемы решардинга.

Вместо вывода

Рано или поздно вам придётся заняться решардингом. А потом ещё раз, и ещё, и ещё, особенно, если бизнес будет расти и данных будет становиться всё больше. Поэтому приберегите инструменты, которые вам помогли однажды, и ничего страшного, если запускать вы их будете раз в полгода. 

Клиентское шардирование — надо. 

Ключ шардирования выбираем с умом, предварительно медитируем над метриками, чтобы чётко видеть картину того, как данные пишутся, запрашиваются и хранятся.

Не так страшно шардирование, как решардинг. Важно заранее подумать о том, как вы будете решать вопросы консистентности при решардинге.

 

на главную сниппетов
Курсы