package io.camunda.zeebe.shared.management;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import io.camunda.zeebe.broker.client.api.BrokerClient;
import io.camunda.zeebe.broker.client.api.BrokerClusterState;
import io.camunda.zeebe.broker.system.configuration.FlowControlCfg;
import io.camunda.zeebe.gateway.admin.BrokerAdminRequest;
import io.camunda.zeebe.logstreams.impl.flowcontrol.LimitSerializer;
import io.camunda.zeebe.protocol.impl.encoding.AdminResponse;
import io.camunda.zeebe.shared.management.FlowControlEndpoint;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:io/camunda/zeebe/shared/management/FlowControlServiceImpl.class */
public class FlowControlServiceImpl implements FlowControlEndpoint.FlowControlService {
    private static final Logger LOG = LoggerFactory.getLogger(FlowControlServiceImpl.class);
    private final BrokerClient client;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus.class */
    public static final class FlowControlStatus extends Record {
        private final int partitionId;
        private final JsonNode flowControlConfig;

        FlowControlStatus(int i, JsonNode jsonNode) {
            this.partitionId = i;
            this.flowControlConfig = jsonNode;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, FlowControlStatus.class), FlowControlStatus.class, "partitionId;flowControlConfig", "FIELD:Lio/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus;->partitionId:I", "FIELD:Lio/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus;->flowControlConfig:Lcom/fasterxml/jackson/databind/JsonNode;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, FlowControlStatus.class), FlowControlStatus.class, "partitionId;flowControlConfig", "FIELD:Lio/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus;->partitionId:I", "FIELD:Lio/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus;->flowControlConfig:Lcom/fasterxml/jackson/databind/JsonNode;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, FlowControlStatus.class, Object.class), FlowControlStatus.class, "partitionId;flowControlConfig", "FIELD:Lio/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus;->partitionId:I", "FIELD:Lio/camunda/zeebe/shared/management/FlowControlServiceImpl$FlowControlStatus;->flowControlConfig:Lcom/fasterxml/jackson/databind/JsonNode;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public int partitionId() {
            return this.partitionId;
        }

        public JsonNode flowControlConfig() {
            return this.flowControlConfig;
        }
    }

    @Autowired
    public FlowControlServiceImpl(BrokerClient brokerClient) {
        this.client = brokerClient;
    }

    @Override // io.camunda.zeebe.shared.management.FlowControlEndpoint.FlowControlService
    public CompletableFuture<Map<Integer, JsonNode>> get() {
        BrokerClusterState topology = this.client.getTopologyManager().getTopology();
        List list = topology.getPartitions().stream().map(num -> {
            return fetchFlowConfigOnPartition(topology, num);
        }).toList();
        return CompletableFuture.allOf((CompletableFuture[]) list.toArray(new CompletableFuture[0])).thenApply(r5 -> {
            return (Map) list.stream().map((v0) -> {
                return v0.join();
            }).collect(Collectors.toMap((v0) -> {
                return v0.partitionId();
            }, (v0) -> {
                return v0.flowControlConfig();
            }));
        });
    }

    @Override // io.camunda.zeebe.shared.management.FlowControlEndpoint.FlowControlService
    public CompletableFuture<Map<Integer, JsonNode>> set(FlowControlCfg flowControlCfg) {
        LOG.info("Setting flow control configuration to {}", flowControlCfg);
        try {
            byte[] serialize = flowControlCfg.serialize();
            BrokerClusterState topology = this.client.getTopologyManager().getTopology();
            return CompletableFuture.allOf((CompletableFuture[]) topology.getPartitions().stream().map(num -> {
                return broadcastOnPartition(topology, num, brokerAdminRequest -> {
                    brokerAdminRequest.setFlowControlConfiguration(serialize);
                });
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).thenCompose(r3 -> {
                return get();
            });
        } catch (JsonProcessingException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private CompletableFuture<Void> broadcastOnPartition(BrokerClusterState brokerClusterState, Integer num, Consumer<BrokerAdminRequest> consumer) {
        return CompletableFuture.allOf((CompletableFuture[]) getMembers(brokerClusterState, num).stream().map(num2 -> {
            BrokerAdminRequest brokerAdminRequest = new BrokerAdminRequest();
            brokerAdminRequest.setBrokerId(num2.intValue());
            brokerAdminRequest.setPartitionId(num.intValue());
            consumer.accept(brokerAdminRequest);
            return this.client.sendRequest(brokerAdminRequest);
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    private CompletableFuture<FlowControlStatus> fetchFlowConfigOnPartition(BrokerClusterState brokerClusterState, Integer num) {
        int leaderForPartition = brokerClusterState.getLeaderForPartition(num.intValue());
        BrokerAdminRequest brokerAdminRequest = new BrokerAdminRequest();
        brokerAdminRequest.setBrokerId(leaderForPartition);
        brokerAdminRequest.setPartitionId(num.intValue());
        brokerAdminRequest.getFLowControlConfiguration();
        return this.client.sendRequest(brokerAdminRequest).thenApply(brokerResponse -> {
            return new FlowControlStatus(num.intValue(), LimitSerializer.deserialize(((AdminResponse) brokerResponse.getResponse()).getPayload()));
        });
    }

    private IntHashSet getMembers(BrokerClusterState brokerClusterState, Integer num) {
        int leaderForPartition = brokerClusterState.getLeaderForPartition(num.intValue());
        Set set = (Set) Optional.ofNullable(brokerClusterState.getFollowersForPartition(num.intValue())).orElseGet(Set::of);
        Set set2 = (Set) Optional.ofNullable(brokerClusterState.getInactiveNodesForPartition(num.intValue())).orElseGet(Set::of);
        IntHashSet intHashSet = new IntHashSet(brokerClusterState.getReplicationFactor());
        intHashSet.add(leaderForPartition);
        intHashSet.addAll(set);
        intHashSet.addAll(set2);
        return intHashSet;
    }
}
