package org.infinispan.xsite.statetransfer;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.XSiteStateTransferMode;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.XSiteResponse;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.commands.XSiteStateTransferCancelSendCommand;
import org.infinispan.xsite.commands.XSiteStateTransferClearStatusCommand;
import org.infinispan.xsite.commands.XSiteStateTransferRestartSendingCommand;
import org.infinispan.xsite.commands.XSiteStateTransferStartSendCommand;
import org.infinispan.xsite.commands.XSiteStateTransferStatusRequestCommand;
import org.infinispan.xsite.response.AutoStateTransferResponseCollector;
import org.infinispan.xsite.status.SiteState;
import org.infinispan.xsite.status.TakeOfflineManager;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:org/infinispan/xsite/statetransfer/XSiteStateTransferManagerImpl.class */
public class XSiteStateTransferManagerImpl implements XSiteStateTransferManager {
    private static final Log log;
    private static final BiFunction<Throwable, String, Void> DEBUG_CANCEL_FAIL;

    @Inject
    RpcManager rpcManager;

    @Inject
    CommandsFactory commandsFactory;

    @Inject
    XSiteStateConsumer consumer;

    @Inject
    XSiteStateProvider provider;

    @Inject
    TakeOfflineManager takeOfflineManager;

    @ComponentName(KnownComponentNames.CACHE_NAME)
    @Inject
    String cacheName;

    @Inject
    BlockingManager blockingManager;
    private volatile boolean isStateTransferInProgress;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile int currentTopologyId = -1;
    private final ConcurrentMap<String, RemoteSiteStatus> sites = new ConcurrentHashMap();

    public XSiteStateTransferManagerImpl(Configuration configuration) {
        for (BackupConfiguration backupConfiguration : configuration.sites().allBackups()) {
            this.sites.put(backupConfiguration.site(), RemoteSiteStatus.fromConfiguration(backupConfiguration));
        }
    }

    @Start
    public void start() {
        this.sites.remove(this.rpcManager.getTransport().localSiteName());
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void notifyStatePushFinished(String str, Address address, boolean z) {
        RemoteSiteStatus remoteSiteStatus = this.sites.get(str);
        if (!$assertionsDisabled && remoteSiteStatus == null) {
            throw new AssertionError();
        }
        if (remoteSiteStatus.confirmStateTransfer(address, z)) {
            cancelStateTransferSending(str).exceptionally(th -> {
                log.xsiteCancelSendFailed(th, str);
                return null;
            });
            if (remoteSiteStatus.isSync()) {
                sendStateTransferFinishToRemoteSite(remoteSiteStatus).exceptionally(th2 -> {
                    log.xsiteCancelReceiveFailed(th2, getLocalSite(), str);
                    return null;
                });
            }
        }
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public final void startPushState(String str) {
        this.rpcManager.blocking(asyncStartPushState(validateSite(str)));
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public List<String> getRunningStateTransfers() {
        return (List) this.sites.values().stream().filter((v0) -> {
            return v0.isStateTransferInProgress();
        }).map((v0) -> {
            return v0.getSiteName();
        }).collect(Collectors.toList());
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public Map<String, StateTransferStatus> getStatus() {
        return (Map) this.sites.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((RemoteSiteStatus) entry.getValue()).getStatus();
        }));
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void clearStatus() {
        this.sites.values().forEach((v0) -> {
            v0.clearStatus();
        });
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void cancelPushState(String str) {
        RemoteSiteStatus validateSite = validateSite(str);
        validateSite.cancelStateTransfer();
        AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        CompletionStage<Void> cancelStateTransferSending = cancelStateTransferSending(str);
        cancelStateTransferSending.exceptionally(th -> {
            log.xsiteCancelSendFailed(th, str);
            return null;
        });
        aggregateCompletionStage.dependsOn(cancelStateTransferSending);
        if (validateSite.isSync()) {
            CompletionStage<Void> sendStateTransferFinishToRemoteSite = sendStateTransferFinishToRemoteSite(validateSite);
            sendStateTransferFinishToRemoteSite.exceptionally(th2 -> {
                log.xsiteCancelReceiveFailed(th2, getLocalSite(), str);
                return null;
            });
            aggregateCompletionStage.dependsOn(sendStateTransferFinishToRemoteSite);
        }
        aggregateCompletionStage.freeze().toCompletableFuture().join();
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public Map<String, StateTransferStatus> getClusterStatus() {
        XSiteStateTransferStatusRequestCommand buildXSiteStateTransferStatusRequestCommand = this.commandsFactory.buildXSiteStateTransferStatusRequestCommand();
        StatusResponseCollector statusResponseCollector = new StatusResponseCollector();
        getStatus().forEach(statusResponseCollector);
        return (Map) this.rpcManager.blocking(this.rpcManager.invokeCommandOnAll(buildXSiteStateTransferStatusRequestCommand, statusResponseCollector, this.rpcManager.getSyncRpcOptions()));
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void clearClusterStatus() {
        XSiteStateTransferClearStatusCommand buildXSiteStateTransferClearStatusCommand = this.commandsFactory.buildXSiteStateTransferClearStatusCommand();
        CompletionStage invokeCommandOnAll = this.rpcManager.invokeCommandOnAll(buildXSiteStateTransferClearStatusCommand, VoidResponseCollector.ignoreLeavers(), this.rpcManager.getSyncRpcOptions());
        buildXSiteStateTransferClearStatusCommand.invokeLocal(this);
        this.rpcManager.blocking(invokeCommandOnAll);
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public String getSendingSiteName() {
        return this.consumer.getSendingSiteName();
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void cancelReceive(String str) {
        CompletionStage<Void> sendToLocalSite = sendToLocalSite(this.commandsFactory.buildXSiteStateTransferFinishReceiveCommand(str));
        this.consumer.endStateTransfer(str);
        this.rpcManager.blocking(sendToLocalSite);
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void becomeCoordinator(String str) {
        startCoordinating(Collections.singleton(str), this.rpcManager.getMembers());
        if (this.isStateTransferInProgress) {
            doCancelSendingForRestart(str);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debugf("Restarting x-site state transfer for site %s", str);
        }
        try {
            this.rpcManager.blocking(restartStateTransferSending(str));
        } catch (Exception e) {
            log.failedToRestartXSiteStateTransfer(str, e);
        }
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public XSiteStateProvider getStateProvider() {
        return this.provider;
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public XSiteStateConsumer getStateConsumer() {
        return this.consumer;
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void startAutomaticStateTransferTo(ByteString byteString, boolean z) {
        log.debugf("startAutomaticStateTransferTo(%s, %s)", byteString, Boolean.valueOf(z));
        String byteString2 = byteString.toString();
        RemoteSiteStatus remoteSiteStatus = this.sites.get(byteString2);
        if (skipAutomaticStateTransferEnabled(byteString2, remoteSiteStatus)) {
            return;
        }
        isStateTransferRequired(remoteSiteStatus, z).whenComplete((bool, th) -> {
            if (th != null) {
                Log.XSITE.unableToStartXSiteAutStateTransfer(this.cacheName, byteString2, th);
            } else if (bool.booleanValue()) {
                bringSiteOnline(byteString2).thenRun(() -> {
                    asyncStartPushState(remoteSiteStatus);
                });
            } else {
                Log.XSITE.debugf("[%s] Cross-Site state transfer not required for site '%s'", this.cacheName, byteString2);
            }
        });
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public XSiteStateTransferMode stateTransferMode(String str) {
        RemoteSiteStatus remoteSiteStatus = this.sites.get(str);
        return remoteSiteStatus == null ? XSiteStateTransferMode.MANUAL : remoteSiteStatus.stateTransferMode();
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public boolean setAutomaticStateTransfer(String str, XSiteStateTransferMode xSiteStateTransferMode) {
        RemoteSiteStatus remoteSiteStatus = this.sites.get(str);
        return remoteSiteStatus != null && remoteSiteStatus.setStateTransferMode(xSiteStateTransferMode);
    }

    private boolean skipAutomaticStateTransferEnabled(String str, RemoteSiteStatus remoteSiteStatus) {
        if (remoteSiteStatus == null) {
            Log.XSITE.debugf("[%s] Cross-Site automatic state transfer not started for site '%s'. It is not a backup location for this cache", this.cacheName, str);
            return true;
        }
        if (remoteSiteStatus.isSync()) {
            Log.XSITE.debugf("[%s] Cross-Site automatic state transfer not started for site '%s'. The backup strategy is set to SYNC", this.cacheName, str);
            return true;
        }
        if (remoteSiteStatus.stateTransferMode() != XSiteStateTransferMode.MANUAL) {
            return false;
        }
        Log.XSITE.debugf("[%s] Cross-Site automatic state transfer not started for site '%s'. Automatic state transfer is disabled", this.cacheName, str);
        return true;
    }

    private CompletionStage<Boolean> isStateTransferRequired(RemoteSiteStatus remoteSiteStatus, boolean z) {
        String siteName = remoteSiteStatus.getSiteName();
        return this.rpcManager.invokeCommandOnAll(this.commandsFactory.buildXSiteAutoTransferStatusCommand(siteName), new AutoStateTransferResponseCollector(this.takeOfflineManager.getSiteState(siteName) == SiteState.OFFLINE, remoteSiteStatus.stateTransferMode(), z), this.rpcManager.getSyncRpcOptions()).thenApply((v0) -> {
            return v0.canDoAutomaticStateTransfer();
        });
    }

    private CompletionStage<Void> bringSiteOnline(String str) {
        return this.blockingManager.runBlocking(() -> {
            this.takeOfflineManager.bringSiteOnline(str);
        }, "bring-online-" + str);
    }

    @Override // org.infinispan.xsite.statetransfer.XSiteStateTransferManager
    public void onTopologyUpdated(CacheTopology cacheTopology, boolean z) {
        int topologyId = cacheTopology.getTopologyId();
        if (log.isDebugEnabled()) {
            log.debugf("Topology change. TopologyId: %s. State transfer in progress? %s", Integer.toString(topologyId), Boolean.valueOf(z));
        }
        this.currentTopologyId = topologyId;
        this.isStateTransferInProgress = z;
        List<Address> members = cacheTopology.getMembers();
        boolean equals = members.get(0).equals(this.rpcManager.getAddress());
        Collection<String> sitesMissingCoordinator = this.provider.getSitesMissingCoordinator(new HashSet(members));
        if (equals) {
            startCoordinating(sitesMissingCoordinator, members);
        }
        if (z) {
            this.sites.values().stream().filter((v0) -> {
                return v0.isStateTransferInProgress();
            }).map((v0) -> {
                return v0.getSiteName();
            }).forEach(this::doCancelSendingForRestart);
            return;
        }
        for (RemoteSiteStatus remoteSiteStatus : this.sites.values()) {
            if (remoteSiteStatus.restartStateTransfer(members)) {
                String siteName = remoteSiteStatus.getSiteName();
                if (log.isDebugEnabled()) {
                    log.debugf("Topology change detected! Restarting x-site state transfer for site %s", siteName);
                }
                try {
                    restartStateTransferSending(siteName);
                } catch (Exception e) {
                    log.failedToRestartXSiteStateTransfer(siteName, e);
                }
            }
        }
    }

    private CompletionStage<Void> asyncStartPushState(RemoteSiteStatus remoteSiteStatus) {
        String siteName = remoteSiteStatus.getSiteName();
        if (!remoteSiteStatus.startStateTransfer(this.rpcManager.getMembers())) {
            CompletableFuture.failedFuture(log.xsiteStateTransferAlreadyInProgress(siteName));
        }
        XSiteResponse xSiteResponse = null;
        if (remoteSiteStatus.isSync()) {
            xSiteResponse = this.rpcManager.invokeXSite(remoteSiteStatus.getBackup(), this.commandsFactory.buildXSiteStateTransferControlRequest(true));
        }
        if (!this.isStateTransferInProgress) {
            return xSiteResponse == null ? asyncStartLocalSend(remoteSiteStatus) : xSiteResponse.thenCompose(r5 -> {
                return asyncStartLocalSend(remoteSiteStatus);
            });
        }
        if (log.isDebugEnabled()) {
            log.debugf("Not starting state transfer to site '%s' while rebalance in progress. Waiting until it is finished!", siteName);
        }
        return xSiteResponse == null ? CompletableFutures.completedNull() : xSiteResponse;
    }

    private CompletionStage<Void> asyncStartLocalSend(RemoteSiteStatus remoteSiteStatus) {
        XSiteStateTransferStartSendCommand buildXSiteStateTransferStartSendCommand = this.commandsFactory.buildXSiteStateTransferStartSendCommand(remoteSiteStatus.getSiteName(), this.currentTopologyId);
        CompletionStage<Void> sendToLocalSite = sendToLocalSite(buildXSiteStateTransferStartSendCommand);
        buildXSiteStateTransferStartSendCommand.setOrigin(this.rpcManager.getAddress());
        buildXSiteStateTransferStartSendCommand.invokeLocal(this.provider);
        sendToLocalSite.exceptionally(th -> {
            handleFailure(remoteSiteStatus, th);
            return null;
        });
        return sendToLocalSite;
    }

    private String getLocalSite() {
        return this.rpcManager.getTransport().localSiteName();
    }

    private void doCancelSendingForRestart(String str) {
        try {
            if (log.isDebugEnabled()) {
                log.debugf("Canceling x-site state transfer for site %s", str);
            }
            CompletionStage<Void> cancelStateTransferSending = cancelStateTransferSending(str);
            if (log.isDebugEnabled()) {
                cancelStateTransferSending.exceptionally(th -> {
                    return DEBUG_CANCEL_FAIL.apply(th, str);
                });
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                DEBUG_CANCEL_FAIL.apply(e, str);
            }
        }
    }

    private RemoteSiteStatus validateSite(String str) throws NullPointerException, IllegalArgumentException {
        RemoteSiteStatus remoteSiteStatus = this.sites.get(Objects.requireNonNull(str, "Site name cannot be null."));
        if (remoteSiteStatus == null) {
            throw log.siteNotFound(str);
        }
        return remoteSiteStatus;
    }

    private CompletionStage<Void> cancelStateTransferSending(String str) {
        XSiteStateTransferCancelSendCommand buildXSiteStateTransferCancelSendCommand = this.commandsFactory.buildXSiteStateTransferCancelSendCommand(str);
        CompletionStage<Void> sendToLocalSite = sendToLocalSite(buildXSiteStateTransferCancelSendCommand);
        buildXSiteStateTransferCancelSendCommand.invokeLocal(this.provider);
        return sendToLocalSite;
    }

    private CompletionStage<Void> restartStateTransferSending(String str) {
        XSiteStateTransferRestartSendingCommand buildXSiteStateTransferRestartSendingCommand = this.commandsFactory.buildXSiteStateTransferRestartSendingCommand(str, this.currentTopologyId);
        CompletionStage<Void> sendToLocalSite = sendToLocalSite(buildXSiteStateTransferRestartSendingCommand);
        buildXSiteStateTransferRestartSendingCommand.setOrigin(this.rpcManager.getAddress());
        buildXSiteStateTransferRestartSendingCommand.invokeLocal(this.provider);
        return sendToLocalSite;
    }

    private void startCoordinating(Collection<String> collection, Collection<Address> collection2) {
        if (log.isDebugEnabled()) {
            log.debugf("Becoming the x-site state transfer coordinator for %s", collection);
        }
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            RemoteSiteStatus remoteSiteStatus = this.sites.get(it.next());
            if (!$assertionsDisabled && remoteSiteStatus == null) {
                throw new AssertionError();
            }
            remoteSiteStatus.startStateTransfer(collection2);
        }
    }

    private void handleFailure(RemoteSiteStatus remoteSiteStatus, Throwable th) {
        String siteName = remoteSiteStatus.getSiteName();
        if (log.isDebugEnabled()) {
            log.debugf(th, "Handle start state transfer failure to %s", siteName);
        }
        remoteSiteStatus.failStateTransfer();
        CompletionStage<Void> cancelStateTransferSending = cancelStateTransferSending(siteName);
        if (log.isDebugEnabled()) {
            cancelStateTransferSending.exceptionally(th2 -> {
                return DEBUG_CANCEL_FAIL.apply(th2, siteName);
            });
        }
        if (remoteSiteStatus.isSync()) {
            CompletionStage<Void> sendStateTransferFinishToRemoteSite = sendStateTransferFinishToRemoteSite(remoteSiteStatus);
            if (log.isDebugEnabled()) {
                sendStateTransferFinishToRemoteSite.exceptionally(th3 -> {
                    log.debugf(th3, "Exception while cancel receiving in remote site %s", siteName);
                    return null;
                });
            }
        }
    }

    private CompletionStage<Void> sendToLocalSite(CacheRpcCommand cacheRpcCommand) {
        return this.rpcManager.invokeCommandOnAll(cacheRpcCommand, VoidResponseCollector.ignoreLeavers(), fifoSyncRpcOptions());
    }

    private CompletionStage<Void> sendStateTransferFinishToRemoteSite(RemoteSiteStatus remoteSiteStatus) {
        return this.rpcManager.invokeXSite(remoteSiteStatus.getBackup(), this.commandsFactory.buildXSiteStateTransferControlRequest(false));
    }

    private RpcOptions fifoSyncRpcOptions() {
        RpcOptions syncRpcOptions = this.rpcManager.getSyncRpcOptions();
        return new RpcOptions(DeliverOrder.PER_SENDER, syncRpcOptions.timeout(), syncRpcOptions.timeUnit());
    }

    static {
        $assertionsDisabled = !XSiteStateTransferManagerImpl.class.desiredAssertionStatus();
        log = LogFactory.getLog(XSiteStateTransferManagerImpl.class);
        DEBUG_CANCEL_FAIL = (th, str) -> {
            log.debugf(th, "Unable to cancel x-site state transfer for site %s", str);
            return null;
        };
    }
}
