package com.scalar.db.util.groupcommit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.scalar.db.util.groupcommit.KeyManipulator;
import java.io.Closeable;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point of the group commit mechanism.
 *
 * <p>Callers reserve a slot with {@link #reserve(Object)}, then either hand over the value with
 * {@link #ready(Object, Object)} or give the slot up with {@link #remove(Object)}. The bookkeeping
 * itself is delegated to a {@code GroupManager} and three worker objects (group-size-fix,
 * delayed-slot-move and group-cleanup) that are created and wired together in the constructor.
 *
 * @param <PARENT_KEY> parent key type
 * @param <CHILD_KEY> child key type
 * @param <FULL_KEY> full key type handed back to callers by {@link #reserve(Object)}
 * @param <EMIT_PARENT_KEY> parent key type passed to the emitter
 * @param <EMIT_FULL_KEY> full key type passed to the emitter
 * @param <V> type of the value stored in a slot
 */
@ThreadSafe
public class GroupCommitter<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(GroupCommitter.class);

    /** How long {@link #close()} sleeps between two checks of the remaining work. */
    private static final long CLOSE_POLL_INTERVAL_MILLIS = 200L;

    /** Minimum interval between two "still remains" log lines emitted by {@link #close()}. */
    private static final long CLOSE_LOG_INTERVAL_MILLIS = 10000L;

    private final GroupSizeFixWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupSizeFixWorker;
    private final DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> delayedSlotMoveWorker;
    private final GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupCleanupWorker;

    /** Only present when metrics monitor logging is enabled in the configuration. */
    @Nullable
    private final GroupCommitMonitor groupCommitMonitor;
    private final KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> keyManipulator;
    private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupManager;

    /** Set once {@link #close()} has been called; new reservations are rejected afterwards. */
    private final AtomicBoolean closing = new AtomicBoolean();

    /**
     * Creates the group manager and the workers and wires them together.
     *
     * @param label label passed to the workers and the monitor, and written to the startup log
     * @param groupCommitConfig configuration of the group commit
     * @param keyManipulator converter between the key representations
     */
    public GroupCommitter(String label, GroupCommitConfig groupCommitConfig, KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> keyManipulator) {
        logger.info("Starting GroupCommitter. Label: {}, Config: {}", label, groupCommitConfig);
        this.keyManipulator = keyManipulator;
        this.groupManager = createGroupManager(groupCommitConfig, keyManipulator);
        // Creation order follows the dependencies: the delayed-slot-move worker needs the cleanup
        // worker, and the group-size-fix worker needs both of them.
        this.groupCleanupWorker = createGroupCleanupWorker(label, groupCommitConfig, this.groupManager);
        this.delayedSlotMoveWorker = createDelayedSlotMoveWorker(label, groupCommitConfig, this.groupManager, this.groupCleanupWorker);
        this.groupSizeFixWorker = createGroupSizeFixWorker(label, groupCommitConfig, this.groupManager, this.delayedSlotMoveWorker, this.groupCleanupWorker);
        this.groupCommitMonitor = groupCommitConfig.metricsMonitorLogEnabled() ? createGroupCommitMonitor(label) : null;
    }

    /**
     * Takes a snapshot of the sizes reported by the three workers and the group manager.
     *
     * @return the current metrics
     */
    GroupCommitMetrics getMetrics() {
        return new GroupCommitMetrics(this.groupSizeFixWorker.size(), this.delayedSlotMoveWorker.size(), this.groupCleanupWorker.size(), this.groupManager.sizeOfNormalGroupMap(), this.groupManager.sizeOfDelayedGroupMap());
    }

    /**
     * Passes the emitter on to the group manager.
     *
     * @param emittable the emitter to use
     */
    public void setEmitter(Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emittable) {
        this.groupManager.setEmitter(emittable);
    }

    /**
     * Reserves a new slot for the given child key, retrying until the group manager returns a key.
     *
     * @param childKey the child key to reserve a slot for
     * @return the full key of the reserved slot
     * @throws GroupCommitException if {@link #close()} has already been called
     */
    public FULL_KEY reserve(CHILD_KEY childKey) {
        if (this.closing.get()) {
            throw new GroupCommitException(String.format("Reserving a new slot isn't allowed since already closed. Child key: %s", childKey));
        }
        while (true) {
            FULL_KEY fullKey = this.groupManager.reserveNewSlot(childKey);
            if (fullKey != null) {
                return fullKey;
            }
            // A null result is treated as a transient failure and simply retried.
            logger.debug("Failed to reserve a new value slot since the group was already size-fixed. Retrying. Key: {}", childKey);
        }
    }

    /**
     * Puts the value into the slot identified by the full key.
     *
     * <p>The group is looked up again before each attempt. If the first attempt is rejected, exactly
     * one retry is made; a second rejection is reported as a conflict.
     *
     * @param fullKey the full key previously returned by {@link #reserve(Object)}
     * @param value the value to put into the slot
     * @throws GroupCommitConflictException if the value could not be put even after the retry
     * @throws GroupCommitException declared for callers; not thrown directly by this method otherwise
     */
    public void ready(FULL_KEY fullKey, V value) throws GroupCommitException {
        KeyManipulator.Keys<PARENT_KEY, CHILD_KEY, FULL_KEY> keys = this.keyManipulator.keysFromFullKey(fullKey);
        boolean retried = false;
        while (true) {
            Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> group = this.groupManager.getGroup(keys);
            if (group.putValueToSlotAndWait(keys.childKey, value)) {
                return;
            }
            if (retried) {
                throw new GroupCommitConflictException(String.format("Failed to put a value to the slot. The slot might be already removed before this operation. Group: %s, Full key: %s, Value: %s", group, fullKey, value));
            }
            retried = true;
            logger.debug("The state of the group has been changed. Retrying. Group: {}, Keys: {}", group, keys);
        }
    }

    /**
     * Removes the slot identified by the full key. A failed removal is only logged at debug level.
     *
     * @param fullKey the full key of the slot to remove
     */
    public void remove(FULL_KEY fullKey) {
        boolean removed = this.groupManager.removeSlotFromGroup(this.keyManipulator.keysFromFullKey(fullKey));
        if (!removed) {
            logger.debug("Failed to remove the slot. Slots in a group that is already done can be automatically removed. Full key: {}", fullKey);
        }
    }

    /**
     * Stops accepting reservations, waits until the metrics report no remaining work, then closes the
     * monitor (if any) and the three workers.
     *
     * <p>The wait has no upper bound; a progress line is logged at most once per
     * {@link #CLOSE_LOG_INTERVAL_MILLIS}. Every resource is closed even if closing an earlier one
     * fails; the first failure is rethrown with later failures attached as suppressed exceptions.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.info("Closing GroupCommitter");
        this.closing.set(true);
        Instant nextLogTime = Instant.now().plusMillis(CLOSE_LOG_INTERVAL_MILLIS);
        while (true) {
            // NOTE(review): the sleep deliberately stays ahead of the first check, as in the original;
            // presumably it gives reserve() calls that already passed the closing check time to
            // register their slot - confirm before reordering.
            Uninterruptibles.sleepUninterruptibly(CLOSE_POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            GroupCommitMetrics metrics = getMetrics();
            if (!metrics.hasRemaining()) {
                break;
            }
            if (nextLogTime.isBefore(Instant.now())) {
                logger.info("Ongoing slot still remains. Metrics: {}", metrics);
                nextLogTime = Instant.now().plusMillis(CLOSE_LOG_INTERVAL_MILLIS);
            }
        }
        logger.info("No ongoing group remains. Closing all the resources");
        closeAll(
                () -> {
                    if (this.groupCommitMonitor != null) {
                        this.groupCommitMonitor.close();
                    }
                },
                this.groupSizeFixWorker::close,
                this.delayedSlotMoveWorker::close,
                this.groupCleanupWorker::close);
    }

    /**
     * Runs every closer in order, even when an earlier one throws a runtime exception.
     *
     * @param closers the close actions to run
     * @throws RuntimeException the first failure, with subsequent failures added as suppressed
     */
    private static void closeAll(Runnable... closers) {
        RuntimeException firstFailure = null;
        for (Runnable closer : closers) {
            try {
                closer.run();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Factory for the group manager; a separate method so that tests can substitute it.
     *
     * @param groupCommitConfig configuration of the group commit
     * @param keyManipulator converter between the key representations
     * @return a new group manager
     */
    @VisibleForTesting
    GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> createGroupManager(GroupCommitConfig groupCommitConfig, KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY> keyManipulator) {
        return new GroupManager<>(groupCommitConfig, keyManipulator);
    }

    /**
     * Creates the group cleanup worker and registers it with the group manager.
     *
     * @param label label passed to the worker
     * @param groupCommitConfig supplies the timeout check interval
     * @param groupManager the group manager the worker is registered with
     * @return the new worker
     */
    @VisibleForTesting
    GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> createGroupCleanupWorker(String label, GroupCommitConfig groupCommitConfig, GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupManager) {
        GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> worker = new GroupCleanupWorker<>(label, groupCommitConfig.timeoutCheckIntervalMillis(), groupManager);
        groupManager.setGroupCleanupWorker(worker);
        return worker;
    }

    /**
     * Creates the delayed slot move worker. Unlike the other two workers it is not registered with
     * the group manager here.
     *
     * @param label label passed to the worker
     * @param groupCommitConfig supplies the timeout check interval
     * @param groupManager the group manager passed to the worker
     * @param groupCleanupWorker the cleanup worker passed to the worker
     * @return the new worker
     */
    @VisibleForTesting
    DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> createDelayedSlotMoveWorker(String label, GroupCommitConfig groupCommitConfig, GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupManager, GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupCleanupWorker) {
        return new DelayedSlotMoveWorker<>(label, groupCommitConfig.timeoutCheckIntervalMillis(), groupManager, groupCleanupWorker);
    }

    /**
     * Creates the group size fix worker and registers it with the group manager.
     *
     * @param label label passed to the worker
     * @param groupCommitConfig supplies the timeout check interval
     * @param groupManager the group manager the worker is registered with
     * @param delayedSlotMoveWorker the delayed slot move worker passed to the worker
     * @param groupCleanupWorker the cleanup worker passed to the worker
     * @return the new worker
     */
    @VisibleForTesting
    GroupSizeFixWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> createGroupSizeFixWorker(String label, GroupCommitConfig groupCommitConfig, GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupManager, DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> delayedSlotMoveWorker, GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupCleanupWorker) {
        GroupSizeFixWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> worker = new GroupSizeFixWorker<>(label, groupCommitConfig.timeoutCheckIntervalMillis(), delayedSlotMoveWorker, groupCleanupWorker);
        groupManager.setGroupSizeFixWorker(worker);
        return worker;
    }

    /**
     * Creates the monitor, giving it {@link #getMetrics()} as its metrics source.
     *
     * @param label label passed to the monitor
     * @return the new monitor
     */
    @VisibleForTesting
    GroupCommitMonitor createGroupCommitMonitor(String label) {
        return new GroupCommitMonitor(label, this::getMetrics);
    }
}
