Pull to refresh

Что означает I в ACID и как это можно использовать

Reading time8 min
Views5.9K

Пройдя много собеседований, выяснилось, что довольно приличная часть собеседующих, спрашивавших или как-то затрагивавших тему транзакций и их работы, не знают как работают транзакции и что означает для разработчика термин изоляция. Вплоть до архитектора в одной очень большой российской компании, для которого выводы, использованные мною для формулирования решения при прохождении архитектурной секции оказались чем-то вроде бреда. Пока готовится вторая статья (Миллиард абитуриентов МИРЭА 2), можно отвлечься и разобрать тему, продемонстрировать разработчикам что означает для них I в ACID.

Короткий пример

Изоляция позволяет нам гарантировать, что в случае конкурентного доступа к ресурсу, мы не видим, что делают другие транзакции. Но что это означает на практике?

Возьмем простой пример. Есть два потока и таблица1 с записями, так же есть таблица2, в которой могут быть записи с ссылками на первую. Поток1 добавляет в таблицу2 запись с ссылкой на таблицу1. Поток2 удаляет ту же самую запись из таблицы1. Оба закрывают свои транзакции единомоментно, вплоть до такта процессора. Вопрос, какой будет результат?

Подсказка

Целостность базы данных при любых раскладах должна быть сохранена, это основное требование к любой реляционной СУБД.

Еще пример

На самом деле изоляция позволяет нам реализовать механизм межсерверных блокировок. Это может быть очень полезно, когда нужно гарантировать состояние, обеспечить, что какая-то операция будет точно выполнена хотя бы один раз. Предлагаю рассмотреть на примере отправки уведомлений пользователю.

Необходимо отправлять уведомления массово нескольким пользователям. Можно конечно использовать кафку, но кафка не гарантирует, что сообщение будет точно обработано. Тем более что во время отправки сообщений в кафку может часть уйти, а часть не отправится - в итоге вы считаете, что сообщение ушло, а на деле - нет. Получите-распишитесь в испорченном состоянии, которое невозможно исправить. Кафка по сути своей вообще ничего не гарантирует и является ненадежным источником/приемником данных. Не тешьте себя иллюзиями. У разработчика микросервисов все кроме его стейта всегда является ненадежным, а стейт в БД. Почему бы не сохранить записи об уведомлении в СУБД, а дальше уже по одному отправлять и контроллировать процесс отправки? Так не только будет видно, сколько нужно еще отправить уведомлений, но и в случае необходимости перезапустить этот процесс.

Так как это сделать?

Изоляция дает нам возможность осуществлять конкурентный доступ к ресурсу, гарантируя что состояние БД будет валидным с точки зрения описанных вами правил.

Одним из простейших способов реализации блокировки является дополнительная колонка статуса записи. Например целое. Что бы заблокировать запись, нужно сделать следующий запрос:

UPDATE update_lock.lock_table t SET state = 1 WHERE t.id = ? AND t.state = 0

Обратите внимание, что нужно обязательно проверять, что t.state = 0, иначе работать не будет. Так же нужно обязательно закрыть транзакцию, что бы изменения были применены СУБД и остальные потоки увидели эти изменения.

Хорошо, вы можете не поверить, и сказать, что автор ничего не понимает в СУБД, но вот вам пример, который 100% надежен. Он не может никаким образом сломаться и работает быстрее, чем приведенный выше вариант.

Можно блокировать записи при помощи инструкции INSERT. Добавляется таблица, ссылки на оригинальную таблицу не обязательны, но вот существование главного ключа для уникальности - обязательно, все что остается - это сделать вставку:

INSERT INTO insert_lock.lock_table(id) VALUES(?)

Обязательно закрываем транзакцию. Если другой поток сделает вставку, то мы не сможем закрыть транзакцию и получим исключение - тогда откатываем изменения и пытаемся блокировать другую задачу.

А протестировать?

Для того, что бы убедится, что это действительно работает, напишем простейшую программу работающую напрямую с jdbc.

Для упрощения написания различных тестов сразу сделаем два абстрактаных класса, они помогут нам при написании тестов.

AbstractDBTest.java
package com.lastrix.dblock;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;

public abstract class AbstractDBTest {
    public static final int THREAD_COUNT = 8;

    private final int threadCount;

    protected AbstractDBTest() {
        this(THREAD_COUNT);
    }

    protected AbstractDBTest(int threadCount) {
        this.threadCount = threadCount;
    }

    /**
     * Execute test plan according to your configuration
     */
    public void run(){
        try (var connection = createConnection()) {
            setup(connection);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

        var start = Instant.now();
        doTest();
        System.out.println("Time taken = " + Duration.between(start, Instant.now()).toMillis() + " ms");
    }

    /**
     * This test creates N threads each of them will wait until each ready to execute plan.
     * The method will wait until all threads finished
     */
    private void doTest() {
        CountDownLatch latch = new CountDownLatch(threadCount);
        var complete = new CountDownLatch(threadCount);
        var runnable = new Runnable() {
            @Override
            public void run() {
                try (var connection = createConnection()) {
                    latch.countDown();
                    latch.await();
                    runJob(connection, getThreadId());
                } catch (Throwable e) {
                    throw new RuntimeException(e);
                } finally {
                    complete.countDown();
                }
            }
        };

        for (int i = 0; i < threadCount; i++) {
            new Thread(runnable, "test-thread-" + i).start();
        }

        try {
            complete.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static int getThreadId() {
        var n = Thread.currentThread().getName();
        var idx = n.lastIndexOf('-');
        return Integer.parseInt(n.substring(idx + 1));
    }

    private static Connection createConnection() throws SQLException {
        Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/test", "test", "test2002");
        connection.setAutoCommit(false);
        return connection;
    }

    /**
     * Create schema, tables and rows needed for tests
     * @param connection
     * @throws SQLException
     */
    protected abstract void setup(Connection connection) throws SQLException;

    /**
     * This method should perform actual testing
     * @param connection
     * @param tid
     * @throws Exception
     */
    protected abstract void runJob(Connection connection, int tid) throws Exception;
}

AbstractJobRunner.java
package com.lastrix.dblock;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Abstract class for handling locking/unlocking testing
 */
public abstract class AbstractJobRunner implements Runnable {
    protected final Connection connection;
    protected final int tid;
    private final int totalLockCount;

    protected AbstractJobRunner(Connection connection, int tid, int totalLockCount) {
        this.connection = connection;
        this.tid = tid;
        this.totalLockCount = totalLockCount;
    }

    @Override
    public final void run() {
        int locked = 0;
        while (locked < totalLockCount) {
            if (tryLock()) {
                locked++;
                System.out.println("Lock acquired by thread " + Thread.currentThread().getName());
                int lockedCount = getLockCounter().incrementAndGet();
                if (lockedCount > 1) {
                    System.out.println("Locked by multiple threads! " + lockedCount);
                }
                if (unlock()) {
                    System.out.println("Released lock by " + Thread.currentThread().getName());
                } else {
                    System.out.println("Failed to unlock by " + Thread.currentThread().getName());
                }
            }
        }
    }

    protected final void rollbackSafely() {
        try {
            connection.rollback();
        } catch (SQLException ignored) {
        }
    }

    /**
     * Try to lock row in database, return true if success
     *
     * @return boolean
     */
    protected abstract boolean tryLock();

    /**
     * Try to unlock row in database, return true if success, this method should be successful all the time
     *
     * @return boolean
     */
    protected abstract boolean unlock();

    /**
     * Get current lock counter for checking that only single thread acquired our lock
     *
     * @return AtomicInteger
     */
    protected abstract AtomicInteger getLockCounter();
}

Первым протестируем блокировки на основе запроса update. Что бы заблокировать запись, потребуется 2 запроса (да знаю, что есть select for update, не будьте душнилами).

            try (var selStmt = connection.prepareStatement("SELECT id FROM db_locks.update_lock t WHERE t.state = 0 ORDER BY id LIMIT 1;");
                 var updStmt = connection.prepareStatement("UPDATE db_locks.update_lock t SET state = 1 WHERE t.id = ? AND t.state = 0")
            ) {
                try (var rs = selStmt.executeQuery()) {
                    if (rs.next()) {
                        id = rs.getInt(1);
                    } else {
                        rollbackSafely();
                        return false;
                    }
                }
                updStmt.setInt(1, id);
                boolean success = updStmt.executeUpdate() > 0;
                connection.commit();
                return success;
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }

Если убрать в запросе update проверку, что state = 0, то в консоли можно будет увидеть предупреждение, что больше, чем один поток захватил лок. Такое происходит, потому что между двумя запросами была завершена транзакция другого потока, захватившего лок и мы видим эти изменения, при выполнении update.

Что бы разблокировать, нужно выполнить запрос, аналогичный блокировке:

            try (var stmt = connection.prepareStatement("UPDATE db_locks.update_lock t SET state = 0 WHERE t.id = ? AND t.state = 1")) {
                stmt.setInt(1, id);
                if (stmt.executeUpdate() == 0) {
                    rollbackSafely();
                    return false;
                }
                // we need to do this before commit, otherwise race-condition may occur
                getLockCounter().decrementAndGet();
                connection.commit();
                return true;
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }

Несмотря на то, что в нашем примере возможны только два состояния лока, использовать такую блокировку можно для много-этапных обработок, поэтому желательно обновлять запись, задавая исходное значение, которое мы ожидаем увидеть в момент изменения. Так можно избежать всяких странностей, да и отлаживать будет проще, так как сразу увидите проблему в логах.

Вариант с вставкой записи в спец-таблицу является более быстрым, чем обновление (можете объяснить в комментариях почему так), однако такой подход более ограничен. Что бы получить блокировку - осуществляем вставку записи в таблицу:

            try (var stmt = connection.prepareStatement("INSERT INTO db_locks.insert_lock(id) VALUES(?)")) {
                stmt.setInt(1, id);
                boolean success = stmt.executeUpdate() > 0;
                connection.commit();
                return success;
            } catch (SQLException e) {
                rollbackSafely();
                if (isValidFailure(e)) {
                    return false;
                }
                throw new RuntimeException(e);
            }

Что бы этот способ заработал как надо, нужно контроллировать исключения. В данном случае может произойти 2 исключения, а именно: ошибка нарушения целостности БД из-за дубликата главного ключа, а так же ошибка завершения транзакции из-за нарушения целостости БД из-за дубликата главного ключа. Ошибки эти разные, и проверять их надо отдельно:

            if (e.getMessage().contains("duplicate key value violates unique constraint")) {
                return true;
            }
            return e.getMessage().contains("current transaction is aborted, commands ignored until end of transaction block");

При появлении таких ошибок нужно обязательно сделать rollback, иначе в дальнейшем подключение будет некорректным - транзакция не закрыта, а значит все наши попытки добавить запись сразу будут завершаться первым исключением.

Что бы освободить такой лок нужно удалить запись из таблицы.

Хотя в статье не приводятся результаты производительности для таких блокировок, все же второй вариант работает намного быстрее (примерно в два раза), попробуйте самостоятельно выяснить причину.

Недостатки такой блокировки

Она работает довольно медленно, как и любой запрос к СУБД. 10-200 мкс на этом спокойно теряются, если осуществляется конкурентный доступ множеством потоков. В случае, если ваши фоновые задачи очень долгие (от 0,5 мс) - например запись в кафку или обращение к другому сервису, то данный подход уже является более предпочтительным, чем например костыль в виде межсервисных блокировок redis, особенно если вы его только для этого и используете.

Вторым недостатком является необходимость сбрасывать состояние блокировки по прохождении определенного таймаута (хотя в redis же то же самое должно быть). Все что требуется - это раз в определенный период выполнять запрос, который все "зависшие" записи сбросит в начальное состояние, что заставит задачу выполняться заново.

Заключение

В данной работе рассмотрен способ применения изолированности транзакций для реализации межсервисных блокировок с гарантированной целостностью состояния. Такой подход позволяет гарантировать, что данные не будут испорчены, из-за использования каких-то сторонних систем. Так же это позволяет сэкономить на оборудовании, так как не требуется ставить дополнительный софт, ради ненужного костыля.

Основным преимуществом является возможность написания таких алгоритмов, которые при обработке пользователя все эффекты сохраняют в БД, а уже затем они в фоне обрабатываются и применяются к системе в целом. Это гарантирует, что запрос пользователя будет обработан полностью, а если что-то серьезное произойдет и нужно будет освободить ресурсы, то у вас уже есть план для выполнения, так как все хранится в БД, что может оказаться полезным при обработке много-этапных и/или многокомпонентных эффектов запросов пользователя.

Исходный код можно найти в репозитории.

Для работодателей

https://hh.ru/resume/b33504daff020c31070039ed1f77794a774336

Спасите котика от изоляции!

Tags:
Hubs:
Total votes 10: ↑1 and ↓9-6
Comments12

Articles