package io.camunda.zeebe.dynamic.config.changes;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.dynamic.config.ClusterConfigurationManager;
import io.camunda.zeebe.dynamic.config.api.ClusterConfigurationRequestFailedException;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeAppliers;
import io.camunda.zeebe.dynamic.config.changes.ConfigurationChangeCoordinator;
import io.camunda.zeebe.dynamic.config.state.ClusterConfiguration;
import io.camunda.zeebe.dynamic.config.state.ClusterConfigurationChangeOperation;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.util.Either;
import java.util.List;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/dynamic/config/changes/ConfigurationChangeCoordinatorImpl.class */
/**
 * {@link ConfigurationChangeCoordinator} implementation that validates requested configuration
 * changes by simulating them with no-op executors and, unless running as a dry run, starts them
 * through the {@link ClusterConfigurationManager}.
 *
 * <p>All work is scheduled through the injected {@link ConcurrencyControl}; results are reported
 * through {@link ActorFuture}s created by it.
 */
public class ConfigurationChangeCoordinatorImpl implements ConfigurationChangeCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationChangeCoordinatorImpl.class);
    private final ClusterConfigurationManager clusterTopologyManager;
    private final ConcurrencyControl executor;
    private final MemberId localMemberId;

    /**
     * @param clusterConfigurationManager manager used to read and update the cluster configuration
     * @param memberId id of the local member, compared against the coordinator id
     * @param concurrencyControl executor used to schedule work and to create result futures
     */
    public ConfigurationChangeCoordinatorImpl(ClusterConfigurationManager clusterConfigurationManager, MemberId memberId, ConcurrencyControl concurrencyControl) {
        this.clusterTopologyManager = clusterConfigurationManager;
        this.executor = concurrencyControl;
        this.localMemberId = memberId;
    }

    /** Returns the configuration as currently reported by the configuration manager. */
    @Override
    public ActorFuture<ClusterConfiguration> getClusterConfiguration() {
        return this.clusterTopologyManager.getClusterConfiguration();
    }

    /** Validates the requested operations and, if valid, starts applying them. */
    @Override
    public ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> applyOperations(ConfigurationChangeCoordinator.ConfigurationChangeRequest configurationChangeRequest) {
        return applyOrDryRun(false, configurationChangeRequest);
    }

    /** Validates the requested operations without updating the managed configuration. */
    @Override
    public ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> simulateOperations(ConfigurationChangeCoordinator.ConfigurationChangeRequest configurationChangeRequest) {
        return applyOrDryRun(true, configurationChangeRequest);
    }

    /**
     * Cancels the pending change with the given id.
     *
     * <p>The returned future fails with an {@code InvalidRequest} if the configuration is
     * uninitialized, no change is pending, or the pending change has a different id. On success it
     * completes with the configuration returned by {@code cancelPendingChanges()}.
     *
     * @param changeId id of the change to cancel
     * @return future completed with the configuration after cancellation
     */
    @Override
    public ActorFuture<ClusterConfiguration> cancelChange(long changeId) {
        final ActorFuture<ClusterConfiguration> result = this.executor.createFuture();
        this.executor.run(() -> {
            this.clusterTopologyManager
                .updateClusterConfiguration(current -> {
                    if (!validateCancel(changeId, current, result)) {
                        // validateCancel already failed the result; leave the configuration untouched
                        return current;
                    }
                    LOG.warn(
                        "Cancelling configuration change '{}'. Following operations have been already applied: {}. Following pending operations won't be applied: {}",
                        changeId,
                        current.pendingChanges().map(plan -> plan.completedOperations()).orElse(List.of()),
                        current.pendingChanges().map(plan -> plan.pendingOperations()).orElse(List.of()));
                    final ClusterConfiguration cancelled = current.cancelPendingChanges();
                    result.complete(cancelled);
                    return cancelled;
                })
                .onComplete((ignored, error) -> {
                    // If the update failed before the updater could complete the result (for example
                    // because the updater itself threw), report the failure instead of leaving the
                    // caller waiting forever. The isDone() guard avoids completing the future twice.
                    if (error != null && !result.isDone()) {
                        failFuture(result, error);
                    }
                });
        });
        return result;
    }

    /**
     * Shared entry point of {@link #applyOperations} and {@link #simulateOperations}.
     *
     * <p>Fetches the current configuration, rejects the request when it is not forced and the
     * local member is not the coordinator, resolves the operations from the request and hands them
     * to {@link #applyOrDryRunOnTopology}.
     *
     * @param dryRun {@code true} to only simulate, {@code false} to start the change
     */
    private ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> applyOrDryRun(boolean dryRun, ConfigurationChangeCoordinator.ConfigurationChangeRequest configurationChangeRequest) {
        final ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> result = this.executor.createFuture();
        this.executor.run(() -> {
            this.clusterTopologyManager.getClusterConfiguration().onComplete((currentConfiguration, error) -> {
                if (error != null) {
                    failFuture(result, error);
                    return;
                }
                if (!configurationChangeRequest.isForced() && !isCoordinator(currentConfiguration)) {
                    failFuture(result, new ClusterConfigurationRequestFailedException.InternalError(String.format("Cannot process request to change the configuration. The broker '%s' is not the coordinator.", this.localMemberId)));
                    return;
                }
                final Either<Exception, List<ClusterConfigurationChangeOperation>> operations = configurationChangeRequest.operations(currentConfiguration);
                if (operations.isLeft()) {
                    failFuture(result, operations.getLeft());
                } else {
                    applyOrDryRunOnTopology(dryRun, currentConfiguration, operations.get(), result);
                }
            }, this.executor);
        });
        return result;
    }

    /**
     * Validates the operations against the given configuration and then either simulates or starts
     * the change, completing {@code result} with the outcome.
     *
     * <p>An empty operation list completes immediately with the unchanged configuration and the id
     * of the last change (or {@code 0} if there is none).
     */
    private void applyOrDryRunOnTopology(boolean dryRun, ClusterConfiguration currentConfiguration, List<ClusterConfigurationChangeOperation> operations, ActorFuture<ConfigurationChangeCoordinator.ConfigurationChangeResult> result) {
        if (operations.isEmpty()) {
            final long lastChangeId = currentConfiguration.lastChange().map(change -> change.id()).orElse(0L);
            result.complete(new ConfigurationChangeCoordinator.ConfigurationChangeResult(currentConfiguration, currentConfiguration, lastChangeId, operations));
            return;
        }

        validateTopologyChangeRequest(currentConfiguration, operations).onComplete((simulatedConfiguration, validationError) -> {
            if (validationError != null) {
                failFuture(result, validationError);
                return;
            }

            final ActorFuture<ClusterConfiguration> started = this.executor.createFuture();
            if (dryRun) {
                // Dry run: compute the configuration with the change started, but do not persist it.
                started.complete(currentConfiguration.startConfigurationChange(operations));
            } else {
                applyTopologyChange(operations, currentConfiguration, simulatedConfiguration, started);
            }

            // Parameter names must differ from the enclosing lambda's parameters; Java does not
            // allow a nested lambda to redeclare them.
            started.onComplete((startedConfiguration, startError) -> {
                if (startError != null) {
                    failFuture(result, startError);
                    return;
                }
                final long changeId = startedConfiguration.pendingChanges().map(plan -> plan.id()).orElse(0L);
                result.complete(new ConfigurationChangeCoordinator.ConfigurationChangeResult(currentConfiguration, simulatedConfiguration, changeId, operations));
            });
        });
    }

    /**
     * Checks that a change may be started on the given configuration and simulates the operations
     * with no-op executors.
     *
     * @return future completed with the configuration resulting from the simulation, or failed
     *     with {@code OperationNotAllowed} (uninitialized configuration), {@code
     *     ConcurrentModificationException} (a change is already pending) or the simulation error
     */
    private ActorFuture<ClusterConfiguration> validateTopologyChangeRequest(ClusterConfiguration currentConfiguration, List<ClusterConfigurationChangeOperation> operations) {
        final ActorFuture<ClusterConfiguration> validation = this.executor.createFuture();
        if (currentConfiguration.isUninitialized()) {
            failFuture(validation, new ClusterConfigurationRequestFailedException.OperationNotAllowed("Cannot apply configuration change. The configuration is not initialized."));
        } else if (currentConfiguration.hasPendingChanges()) {
            failFuture(validation, new ClusterConfigurationRequestFailedException.ConcurrentModificationException(String.format("Cannot apply configuration change. Another configuration change [%s] is in progress.", currentConfiguration)));
        } else {
            final ConfigurationChangeAppliersImpl noopAppliers = new ConfigurationChangeAppliersImpl(new NoopPartitionChangeExecutor(), new NoopClusterMembershipChangeExecutor());
            simulateTopologyChange(currentConfiguration.startConfigurationChange(operations), noopAppliers, validation);
        }
        return validation;
    }

    /**
     * Starts the change on the managed configuration, but only if it still equals the configuration
     * the request was validated against; otherwise the update fails with a {@code
     * ConcurrentModificationException}.
     *
     * @param expectedFinalConfiguration simulated end state, used for logging only
     */
    private void applyTopologyChange(List<ClusterConfigurationChangeOperation> operations, ClusterConfiguration validatedConfiguration, ClusterConfiguration expectedFinalConfiguration, ActorFuture<ClusterConfiguration> result) {
        this.executor.run(() -> {
            this.clusterTopologyManager
                .updateClusterConfiguration(latest -> {
                    if (!latest.equals(validatedConfiguration)) {
                        throw new ClusterConfigurationRequestFailedException.ConcurrentModificationException("Topology changed while applying the change. Please retry.");
                    }
                    return latest.startConfigurationChange(operations);
                })
                .onComplete((updatedConfiguration, error) -> {
                    if (error != null) {
                        failFuture(result, error);
                        return;
                    }
                    LOG.debug("Applying the topology change has started. The resulting topology will be {}", expectedFinalConfiguration);
                    result.complete(updatedConfiguration);
                });
        });
    }

    /**
     * Applies the pending operations of {@code configuration} one at a time using the given
     * appliers, recursing until no pending change remains.
     *
     * <p>Completes {@code result} with the final configuration, or fails it with an {@code
     * InvalidRequest} wrapping the first init/apply error.
     */
    private void simulateTopologyChange(ClusterConfiguration configuration, ConfigurationChangeAppliersImpl appliers, ActorFuture<ClusterConfiguration> result) {
        if (!configuration.hasPendingChanges()) {
            result.complete(configuration);
            return;
        }

        final ConfigurationChangeAppliers.ClusterOperationApplier applier = appliers.getApplier(configuration.nextPendingOperation());
        final Either<Exception, UnaryOperator<ClusterConfiguration>> init = applier.init(configuration);
        if (init.isLeft()) {
            failFuture(result, new ClusterConfigurationRequestFailedException.InvalidRequest(init.getLeft()));
            return;
        }

        final ClusterConfiguration initialized = init.get().apply(configuration);
        applier.apply().onComplete((transformer, error) -> {
            if (error != null) {
                failFuture(result, new ClusterConfigurationRequestFailedException.InvalidRequest(error));
            } else {
                simulateTopologyChange(initialized.advanceConfigurationChange(transformer), appliers, result);
            }
        });
    }

    /**
     * Logs the error and completes the future exceptionally. Errors that are not already a {@link
     * ClusterConfigurationRequestFailedException} are wrapped in an {@code InternalError}.
     */
    private void failFuture(ActorFuture<?> future, Throwable error) {
        LOG.warn("Failed to handle topology request", error);
        if (error instanceof ClusterConfigurationRequestFailedException) {
            future.completeExceptionally(error);
        } else {
            future.completeExceptionally(new ClusterConfigurationRequestFailedException.InternalError(error));
        }
    }

    /**
     * Checks whether the change with the given id can be cancelled on the given configuration.
     *
     * @return {@code true} if the pending change has the given id; otherwise {@code false}, in
     *     which case {@code result} has already been failed with an {@code InvalidRequest}
     */
    private boolean validateCancel(long changeId, ClusterConfiguration configuration, ActorFuture<ClusterConfiguration> result) {
        if (configuration.isUninitialized()) {
            failFuture(result, new ClusterConfigurationRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because the topology is not initialized"));
            return false;
        }
        if (!configuration.hasPendingChanges()) {
            failFuture(result, new ClusterConfigurationRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because no change is in progress"));
            return false;
        }
        if (configuration.pendingChanges().orElseThrow().id() != changeId) {
            failFuture(result, new ClusterConfigurationRequestFailedException.InvalidRequest("Cannot cancel change " + changeId + " because it is not the current change"));
            return false;
        }
        return true;
    }

    /**
     * Returns whether the local member is the coordinator, i.e. the member with the smallest id
     * among the members of the given configuration. Returns {@code false} if there are no members.
     */
    private boolean isCoordinator(ClusterConfiguration configuration) {
        return this.localMemberId.equals(configuration.members().keySet().stream().min(MemberId::compareTo).orElse(null));
    }
}
