package org.apache.nifi.processors.azure.eventhub.checkpoint;

import com.azure.core.util.CoreUtils;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ClusterNodeDisconnectedException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ConcurrentStateModificationException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.StateNotAvailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStore.class */
/**
 * {@link CheckpointStore} implementation that keeps Event Hubs partition ownerships and
 * checkpoints in the component state exposed by a NiFi {@link StateManager}, always using
 * {@link Scope#CLUSTER}.
 *
 * <p>Every write is a compare-and-set: the state is read, a modified copy of the map is built and
 * then handed to {@link StateManager#replace}. When the replace is rejected a
 * {@link ConcurrentStateModificationException} is raised and the whole read-modify-write
 * sequence is retried, up to {@value #MAX_RETRIES} times.
 */
public class ComponentStateCheckpointStore implements CheckpointStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(ComponentStateCheckpointStore.class);

    /** Maximum number of retries after a rejected (concurrently modified) state update. */
    private static final int MAX_RETRIES = 10;

    private final String clientId;
    private final StateManager stateManager;

    /**
     * @param str          client identifier; only used as a prefix in debug log lines
     * @param stateManager state manager that backs the store
     */
    public ComponentStateCheckpointStore(String str, StateManager stateManager) {
        this.clientId = str;
        this.stateManager = stateManager;
    }

    /**
     * Triggers {@link #cleanUpMono(String, String, String)} and subscribes to it without
     * registering any result or error callback.
     *
     * @param str  fully qualified namespace whose entries are kept
     * @param str2 event hub name whose entries are kept
     * @param str3 consumer group whose entries are kept
     */
    public void cleanUp(String str, String str2, String str3) {
        cleanUpMono(str, str2, str3).subscribe();
    }

    /**
     * Removes every ownership and checkpoint entry that does not belong to the given
     * namespace / event hub / consumer group (compared case-insensitively). Entries whose key
     * carries neither the ownership nor the checkpoint prefix are left untouched. The state is
     * only written when at least one entry was dropped.
     *
     * @param str  fully qualified namespace whose entries are kept
     * @param str2 event hub name whose entries are kept
     * @param str3 consumer group whose entries are kept
     * @return a {@code Mono} completing when the clean-up has finished
     */
    Mono<Void> cleanUpMono(String str, String str2, String str3) {
        return getState()
                .doFirst(() -> debug("cleanUp() -> Entering [{}, {}, {}]", str, str2, str3))
                .flatMap(oldState -> {
                    Map<String, String> oldMap = oldState.toMap();
                    Map<String, String> newMap = oldMap.entrySet().stream()
                            .filter(entry -> isRetainedKey(entry.getKey(), str, str2, str3))
                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

                    int removed = oldMap.size() - newMap.size();
                    if (removed > 0) {
                        debug("cleanUp() -> Removed {} item(s)", removed);
                        return updateState(oldState, newMap);
                    }
                    debug("cleanUp() -> Nothing to clean up");
                    return Mono.<Void>empty();
                })
                .doOnSuccess(ignored -> debug("cleanUp() -> Succeeded"))
                .retryWhen(createRetrySpec("cleanUp"))
                // placed after retryWhen so that only the final failure is reported
                .doOnError(failure -> debug("cleanUp() -> Failed: {}", failure.getMessage()));
    }

    /**
     * Lists the stored ownerships of the given namespace / event hub / consumer group
     * (compared case-insensitively).
     *
     * @param str  fully qualified namespace
     * @param str2 event hub name
     * @param str3 consumer group
     * @return the matching ownership records; fails with {@link ClusterNodeDisconnectedException}
     *         when the state is flagged as belonging to a disconnected node
     */
    public Flux<PartitionOwnership> listOwnership(String str, String str2, String str3) {
        return getState()
                .doFirst(() -> debug("listOwnership() -> Entering [{}, {}, {}]", str, str2, str3))
                .flatMapMany(state -> {
                    checkDisconnectedNode(state);
                    return getOwnerships(state);
                })
                .filter(ownership -> ownership.getFullyQualifiedNamespace().equalsIgnoreCase(str)
                        && ownership.getEventHubName().equalsIgnoreCase(str2)
                        && ownership.getConsumerGroup().equalsIgnoreCase(str3))
                .doOnNext(ownership -> debug("listOwnership() -> Returning {}", ComponentStateCheckpointStoreUtils.ownershipToString(ownership)))
                .doOnComplete(() -> debug("listOwnership() -> Succeeded"))
                .doOnError(failure -> debug("listOwnership() -> Failed: {}", failure.getMessage()));
    }

    /**
     * Tries to claim the requested partitions.
     *
     * <p>A request is skipped when the partition already has a stored ownership with a non-empty
     * ETag that differs from the ETag carried by the request, i.e. the requester did not see the
     * latest version of that record. Every accepted request is stored with the current time as
     * last-modified time and a freshly generated ETag. All accepted claims are written in a
     * single state update; nothing is written when no request was accepted.
     *
     * @param list ownerships the caller wants to claim
     * @return the ownership records that were actually claimed, emitted after the state update
     *         succeeded
     */
    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> list) {
        return getState()
                .doFirst(() -> debug("claimOwnership() -> Entering [{}]", ComponentStateCheckpointStoreUtils.ownershipListToString(list)))
                .flatMapMany(oldState -> {
                    checkDisconnectedNode(oldState);

                    Map<String, String> newMap = new HashMap<>(oldState.toMap());
                    List<PartitionOwnership> claimed = new ArrayList<>();
                    // one timestamp for the whole batch
                    long timestamp = System.currentTimeMillis();

                    for (PartitionOwnership requested : list) {
                        String key = ComponentStateCheckpointStoreUtils.createOwnershipKey(requested);

                        String storedValue = oldState.get(key);
                        if (storedValue != null) {
                            PartitionOwnership stored = ComponentStateCheckpointStoreUtils.convertOwnership(key, storedValue);
                            String storedETag = stored.getETag();
                            if (StringUtils.isNotEmpty(storedETag) && !storedETag.equals(requested.getETag())) {
                                debug("claimOwnership() -> Already claimed {}", ComponentStateCheckpointStoreUtils.ownershipToString(stored));
                                // ETag mismatch: leave the existing owner's record alone
                                continue;
                            }
                        }

                        PartitionOwnership accepted = newOwnershipFrom(requested, timestamp);
                        claimed.add(accepted);
                        newMap.put(key, ComponentStateCheckpointStoreUtils.createOwnershipValue(accepted));
                        debug("claimOwnership() -> Claiming {}", ComponentStateCheckpointStoreUtils.ownershipToString(accepted));
                    }

                    if (claimed.isEmpty()) {
                        return Flux.<PartitionOwnership>empty();
                    }
                    return updateState(oldState, newMap).thenMany(Flux.fromIterable(claimed));
                })
                .doOnNext(ownership -> debug("claimOwnership() -> Returning {}", ComponentStateCheckpointStoreUtils.ownershipToString(ownership)))
                .doOnComplete(() -> debug("claimOwnership() -> Succeeded"))
                .retryWhen(createRetrySpec("claimOwnership"))
                // placed after retryWhen so that only the final failure is reported
                .doOnError(failure -> debug("claimOwnership() -> Failed: {}", failure.getMessage()));
    }

    /**
     * Lists the stored checkpoints of the given namespace / event hub / consumer group
     * (compared case-insensitively).
     *
     * @param str  fully qualified namespace
     * @param str2 event hub name
     * @param str3 consumer group
     * @return the matching checkpoints; fails with {@link ClusterNodeDisconnectedException}
     *         when the state is flagged as belonging to a disconnected node
     */
    public Flux<Checkpoint> listCheckpoints(String str, String str2, String str3) {
        return getState()
                .doFirst(() -> debug("listCheckpoints() -> Entering [{}, {}, {}]", str, str2, str3))
                .flatMapMany(state -> {
                    checkDisconnectedNode(state);
                    return getCheckpoints(state);
                })
                .filter(item -> item.getFullyQualifiedNamespace().equalsIgnoreCase(str)
                        && item.getEventHubName().equalsIgnoreCase(str2)
                        && item.getConsumerGroup().equalsIgnoreCase(str3))
                .doOnNext(item -> debug("listCheckpoints() -> Returning {}", ComponentStateCheckpointStoreUtils.checkpointToString(item)))
                .doOnComplete(() -> debug("listCheckpoints() -> Succeeded"))
                .doOnError(failure -> debug("listCheckpoints() -> Failed: {}", failure.getMessage()));
    }

    /**
     * Stores the given checkpoint, replacing any value previously stored under the same key.
     *
     * @param checkpoint checkpoint to persist
     * @return a {@code Mono} completing when the state update succeeded
     */
    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        return getState()
                .doFirst(() -> debug("updateCheckpoint() -> Entering [{}]", ComponentStateCheckpointStoreUtils.checkpointToString(checkpoint)))
                .flatMap(oldState -> {
                    checkDisconnectedNode(oldState);

                    Map<String, String> newMap = new HashMap<>(oldState.toMap());
                    newMap.put(ComponentStateCheckpointStoreUtils.createCheckpointKey(checkpoint),
                            ComponentStateCheckpointStoreUtils.createCheckpointValue(checkpoint));
                    return updateState(oldState, newMap);
                })
                .doOnSuccess(ignored -> debug("updateCheckpoint() -> Succeeded"))
                .retryWhen(createRetrySpec("updateCheckpoint"))
                // placed after retryWhen so that only the final failure is reported
                .doOnError(failure -> debug("updateCheckpoint() -> Failed: {}", failure.getMessage()));
    }

    /**
     * Decides whether a state entry survives {@link #cleanUpMono(String, String, String)}:
     * keys without the ownership/checkpoint prefix always do, prefixed keys only when their
     * partition context matches the given namespace, event hub and consumer group.
     */
    private static boolean isRetainedKey(String key, String namespace, String eventHubName, String consumerGroup) {
        boolean storeEntry = key.startsWith(CheckpointStoreKeyPrefix.OWNERSHIP.keyPrefix())
                || key.startsWith(CheckpointStoreKeyPrefix.CHECKPOINT.keyPrefix());
        if (!storeEntry) {
            return true;
        }
        PartitionContext context = ComponentStateCheckpointStoreUtils.convertPartitionContext(key);
        return context.getFullyQualifiedNamespace().equalsIgnoreCase(namespace)
                && context.getEventHubName().equalsIgnoreCase(eventHubName)
                && context.getConsumerGroup().equalsIgnoreCase(consumerGroup);
    }

    /** Copies the identifying fields of a requested ownership and stamps it with the given time and a new random ETag. */
    private static PartitionOwnership newOwnershipFrom(PartitionOwnership requested, long timestamp) {
        return new PartitionOwnership()
                .setFullyQualifiedNamespace(requested.getFullyQualifiedNamespace())
                .setEventHubName(requested.getEventHubName())
                .setConsumerGroup(requested.getConsumerGroup())
                .setPartitionId(requested.getPartitionId())
                .setOwnerId(requested.getOwnerId())
                .setLastModifiedTime(timestamp)
                .setETag(CoreUtils.randomUuid().toString());
    }

    /**
     * Builds the retry policy shared by all writing operations: retry only on
     * {@link ConcurrentStateModificationException}, at most {@value #MAX_RETRIES} times.
     *
     * @param str name of the calling operation, used in the retry log line
     */
    private Retry createRetrySpec(String str) {
        return Retry.max(MAX_RETRIES)
                .filter(failure -> failure instanceof ConcurrentStateModificationException)
                .doBeforeRetry(signal -> debug(str + "() -> Retry: {}", signal))
                .onRetryExhaustedThrow((spec, signal) -> new ConcurrentStateModificationException(
                        String.format("Retrials of concurrent state modifications has been exhausted (%d retrials)", MAX_RETRIES)));
    }

    /** Converts all entries with the ownership key prefix into {@link PartitionOwnership} objects. */
    private Flux<PartitionOwnership> getOwnerships(StateMap stateMap) {
        return getEntries(stateMap, CheckpointStoreKeyPrefix.OWNERSHIP.keyPrefix(), ComponentStateCheckpointStoreUtils::convertOwnership);
    }

    /** Converts all entries with the checkpoint key prefix into {@link Checkpoint} objects. */
    private Flux<Checkpoint> getCheckpoints(StateMap stateMap) {
        return getEntries(stateMap, CheckpointStoreKeyPrefix.CHECKPOINT.keyPrefix(), ComponentStateCheckpointStoreUtils::convertCheckpoint);
    }

    /**
     * Selects the state entries whose key starts with the given prefix and converts each
     * key/value pair with the supplied function. The conversion happens eagerly, before the
     * returned {@code Flux} is subscribed to.
     *
     * @param stateMap   state to read from
     * @param str        key prefix to select
     * @param biFunction converter receiving the entry's key and value
     */
    private <T> Flux<T> getEntries(StateMap stateMap, String str, BiFunction<String, String, T> biFunction) throws ProcessException {
        List<T> converted = stateMap.toMap().entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(str))
                .map(entry -> biFunction.apply(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        return Flux.fromIterable(converted);
    }

    /**
     * Rejects the state when its {@code CLUSTERED} marker entry parses to {@code true}.
     * NOTE(review): presumably that marker is only visible through the cluster scope when a
     * locally cached copy is being served to a disconnected node - confirm against the
     * component that writes it.
     *
     * @throws ClusterNodeDisconnectedException when the marker is {@code true}
     */
    private void checkDisconnectedNode(StateMap stateMap) {
        boolean disconnected = Boolean.parseBoolean(stateMap.get(CheckpointStoreKey.CLUSTERED.key()));
        if (disconnected) {
            throw new ClusterNodeDisconnectedException("The node has been disconnected from the cluster, the checkpoint store is not accessible");
        }
    }

    /**
     * Logs at debug level, prefixing the message with the client id.
     *
     * @param str    SLF4J message template; its {@code {}} placeholders are filled from {@code objArr}
     * @param objArr arguments for the template
     */
    private void debug(String str, Object... objArr) {
        if (LOGGER.isDebugEnabled()) {
            // The prefix must be part of the template and the client id the first argument;
            // otherwise the placeholders inside str would never be substituted.
            LOGGER.debug("[clientId={}] " + str, ArrayUtils.addFirst(objArr, this.clientId));
        }
    }

    /**
     * Reads the cluster-scoped state lazily (on subscription, hence again on every retry).
     * Any exception thrown by the state manager is wrapped in {@link StateNotAvailableException}.
     */
    private Mono<StateMap> getState() {
        return Mono.defer(() -> {
            try {
                StateMap state = this.stateManager.getState(Scope.CLUSTER);
                return Mono.just(state);
            } catch (Exception e) {
                return Mono.error(new StateNotAvailableException(e));
            }
        });
    }

    /**
     * Replaces the cluster-scoped state with {@code map}, provided it is still at the version
     * represented by {@code stateMap}.
     *
     * @param stateMap state the new map was derived from
     * @param map      complete new content of the state
     * @return an empty {@code Mono} on success; an error {@code Mono} carrying
     *         {@link ConcurrentStateModificationException} when the replace was rejected, or
     *         {@link StateNotAvailableException} when the state manager threw
     */
    private Mono<Void> updateState(StateMap stateMap, Map<String, String> map) {
        return Mono.defer(() -> {
            try {
                boolean replaced = this.stateManager.replace(stateMap, map, Scope.CLUSTER);
                if (replaced) {
                    return Mono.<Void>empty();
                }
                return Mono.<Void>error(new ConcurrentStateModificationException(
                        String.format("Component state with version [%s] has been modified by another instance", stateMap.getStateVersion().orElse("new"))));
            } catch (Exception e) {
                return Mono.<Void>error(new StateNotAvailableException(e));
            }
        });
    }
}
