package cern.c2mon.server.elasticsearch.client;

import cern.c2mon.server.elasticsearch.config.ElasticsearchProperties;
import cern.c2mon.server.elasticsearch.domain.IndexMetadata;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:cern/c2mon/server/elasticsearch/client/ElasticsearchClientTransport.class */
/**
 * {@link ElasticsearchClient} implementation backed by an Elasticsearch {@link TransportClient}.
 *
 * <p>The transport client is created in the constructor; a background thread named
 * {@code EsClusterFinder} then polls the cluster until it reports at least yellow health or the
 * configured setup timeout elapses.
 */
public final class ElasticsearchClientTransport implements ElasticsearchClient {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchClientTransport.class);

    /** Pause between two consecutive cluster health probes while waiting for yellow status. */
    private static final long YELLOW_STATUS_POLL_INTERVAL_MILLIS = 1000L;

    private final ElasticsearchProperties properties;
    private TransportClient client;

    /**
     * Creates the transport client from the given properties and starts waiting for the cluster
     * in the background.
     *
     * @param elasticsearchProperties connection and indexing settings
     */
    @Autowired
    public ElasticsearchClientTransport(ElasticsearchProperties elasticsearchProperties) {
        this.properties = elasticsearchProperties;
        setup();
        connectAsynchronously();
    }

    /**
     * Builds a bulk processor bound to this client, configured with the bulk action count, bulk
     * size (in MB), flush interval (in seconds) and concurrency taken from the properties.
     *
     * @param listener callback notified about bulk executions
     * @return a newly built bulk processor
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public BulkProcessor getBulkProcessor(BulkProcessor.Listener listener) {
        return BulkProcessor.builder(this.client, listener)
                .setBulkActions(this.properties.getBulkActions())
                .setBulkSize(new ByteSizeValue(this.properties.getBulkSize(), ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(this.properties.getBulkFlushInterval()))
                .setConcurrentRequests(this.properties.getConcurrentRequests())
                .build();
    }

    /**
     * Creates the index named by {@code indexMetadata} with the configured shard/replica counts.
     *
     * @param indexMetadata supplies the index name
     * @param str JSON mapping to register for the document type, or {@code null} for none
     * @return {@code true} if the creation was acknowledged; {@code false} if it was not
     *         acknowledged or the index already exists
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean createIndex(IndexMetadata indexMetadata, String str) {
        CreateIndexRequestBuilder prepareCreate = this.client.admin().indices().prepareCreate(indexMetadata.getName());
        prepareCreate.setSettings(Settings.builder()
                .put("number_of_shards", this.properties.getShardsPerIndex())
                .put("number_of_replicas", this.properties.getReplicasPerShard())
                .build());
        if (str != null) {
            prepareCreate.addMapping(ElasticsearchProperties.TYPE, str, XContentType.JSON);
        }
        log.debug("Creating new index with name {}", indexMetadata.getName());
        try {
            return prepareCreate.get().isAcknowledged();
        } catch (ResourceAlreadyExistsException e) {
            log.debug("Index already exists.", e);
            return false;
        }
    }

    /**
     * Indexes a JSON document into the index named by {@code indexMetadata}.
     *
     * @param indexMetadata supplies index name, routing and (optionally) the document id; a
     *        {@code null} or empty id leaves the request without an explicit id
     * @param str the JSON document source
     * @return {@code true} if the response status is {@code CREATED} or {@code OK};
     *         {@code false} otherwise or if the request failed
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean indexData(IndexMetadata indexMetadata, String str) {
        IndexRequest indexRequest = new IndexRequest(indexMetadata.getName(), ElasticsearchProperties.TYPE);
        if (indexMetadata.getId() != null && !indexMetadata.getId().isEmpty()) {
            indexRequest.id(indexMetadata.getId());
        }
        indexRequest.source(str, XContentType.JSON);
        indexRequest.routing(indexMetadata.getRouting());
        try {
            RestStatus status = this.client.index(indexRequest).get().status();
            return status == RestStatus.CREATED || status == RestStatus.OK;
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            log.error("Error indexing '#{}' to '{}'.", indexMetadata.getId(), indexMetadata.getName(), e);
            return false;
        } catch (ExecutionException e) {
            log.error("Error indexing '#{}' to '{}'.", indexMetadata.getId(), indexMetadata.getName(), e);
            return false;
        }
    }

    /**
     * Checks for the index by running a search against it.
     *
     * @param indexMetadata supplies index name and routing
     * @return {@code true} if the search response status is {@code OK}; {@code false} otherwise
     *         or if the search failed
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean isIndexExisting(IndexMetadata indexMetadata) {
        SearchRequest searchRequest = new SearchRequest(indexMetadata.getName());
        searchRequest.types(ElasticsearchProperties.TYPE);
        searchRequest.routing(indexMetadata.getRouting());
        try {
            return this.client.search(searchRequest).get().status() == RestStatus.OK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Error searching index '{}'.", indexMetadata.getName(), e);
            return false;
        } catch (ExecutionException e) {
            log.error("Error searching index '{}'.", indexMetadata.getName(), e);
            return false;
        }
    }

    /**
     * Updates the document identified by {@code indexMetadata.getId()}, inserting it when it
     * does not exist yet (upsert).
     *
     * @param indexMetadata supplies index name and document id
     * @param str the JSON document used both as partial update and as upsert source
     * @return {@code true} if the response status is {@code OK} (updated) or {@code CREATED}
     *         (inserted through the upsert); {@code false} otherwise or if the request failed
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean updateIndex(IndexMetadata indexMetadata, String str) {
        // NOTE(review): routing uses getId() here while indexData/deleteIndex use getRouting();
        // left unchanged because existing documents may depend on it - confirm this is intended.
        UpdateRequest updateRequest = new UpdateRequest(indexMetadata.getName(), ElasticsearchProperties.TYPE, indexMetadata.getId());
        updateRequest.doc(str, XContentType.JSON);
        updateRequest.routing(indexMetadata.getId());

        IndexRequest upsertRequest = new IndexRequest(indexMetadata.getName(), ElasticsearchProperties.TYPE, indexMetadata.getId());
        upsertRequest.source(str, XContentType.JSON);
        upsertRequest.routing(indexMetadata.getId());
        updateRequest.upsert(upsertRequest);

        try {
            RestStatus status = this.client.update(updateRequest).get().status();
            // An upsert that had to create the document reports CREATED rather than OK.
            return status == RestStatus.OK || status == RestStatus.CREATED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Error updating index '{}'.", indexMetadata.getName(), e);
            return false;
        } catch (ExecutionException e) {
            log.error("Error updating index '{}'.", indexMetadata.getName(), e);
            return false;
        }
    }

    /**
     * Deletes the document identified by {@code indexMetadata.getId()} from the index named by
     * {@code indexMetadata}.
     *
     * @param indexMetadata supplies index name, document id and routing
     * @return {@code true} if the response status is {@code OK}; {@code false} otherwise or if
     *         the request failed
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean deleteIndex(IndexMetadata indexMetadata) {
        DeleteRequest deleteRequest = new DeleteRequest(indexMetadata.getName(), ElasticsearchProperties.TYPE, indexMetadata.getId());
        deleteRequest.routing(indexMetadata.getRouting());
        try {
            return this.client.delete(deleteRequest).get().status() == RestStatus.OK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Error while deleting index", e);
            return false;
        } catch (ExecutionException e) {
            log.error("Error while deleting index", e);
            return false;
        }
    }

    /**
     * Starts the {@code EsClusterFinder} thread, which waits for the cluster to reach yellow
     * status and only then reports the connection as established.
     */
    private void connectAsynchronously() {
        log.info("Trying to connect to Elasticsearch cluster {} at {}:{}",
                this.properties.getClusterName(), this.properties.getHost(), this.properties.getPort());
        new Thread(() -> {
            try {
                waitForYellowStatus();
                log.info("Connected to Elasticsearch cluster {}", this.properties.getClusterName());
            } catch (IllegalStateException e) {
                // waitForYellowStatus() has already logged the cause with its stack trace;
                // catching here keeps the failure from escaping as an uncaught thread exception.
                log.error("Could not connect to Elasticsearch cluster {}: {}",
                        this.properties.getClusterName(), e.getMessage());
            }
        }, "EsClusterFinder").start();
    }

    /**
     * Blocks until the cluster reports yellow (or green) health, probing once per
     * {@link #YELLOW_STATUS_POLL_INTERVAL_MILLIS}.
     *
     * @throws IllegalStateException if the status is not reached within
     *         {@code ElasticsearchClientConfiguration.CLIENT_SETUP_TIMEOUT} milliseconds, or if
     *         waiting is interrupted
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public void waitForYellowStatus() {
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(ElasticsearchClientConfiguration.CLIENT_SETUP_TIMEOUT);
        final long startNanos = System.nanoTime();
        try {
            CompletableFuture.runAsync(() -> {
                while (true) {
                    log.info("Waiting for yellow status of Elasticsearch cluster...");
                    if (isClusterYellow()) {
                        log.info("Elasticsearch cluster is yellow");
                        return;
                    }
                    // The waiting caller's timeout does not stop this task, so the loop enforces
                    // the same deadline itself (and honours interruption) instead of polling forever.
                    boolean deadlineReached = System.nanoTime() - startNanos >= timeoutNanos;
                    if (deadlineReached || !sleep(YELLOW_STATUS_POLL_INTERVAL_MILLIS)) {
                        throw new IllegalStateException("Gave up waiting for Elasticsearch yellow status");
                    }
                }
            }).get(ElasticsearchClientConfiguration.CLIENT_SETUP_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Exception when waiting for yellow status", e);
            throw new IllegalStateException("Exception when waiting for Elasticsearch yellow status!", e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("Exception when waiting for yellow status", e);
            throw new IllegalStateException("Exception when waiting for Elasticsearch yellow status!", e);
        }
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * @param j sleep duration in milliseconds
     * @return {@code true} if the full sleep completed, {@code false} if the thread was
     *         interrupted (the interrupt flag is restored in that case)
     */
    private boolean sleep(long j) {
        try {
            Thread.sleep(j);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.debug("Waiting for yellow status interrupted", e);
            return false;
        }
    }

    /** Requests the cluster health, waiting at most 100 ms for yellow status. */
    private ClusterHealthResponse getClusterHealth() {
        return this.client.admin().cluster().prepareHealth()
                .setWaitForYellowStatus()
                .setTimeout(TimeValue.timeValueMillis(100L))
                .get();
    }

    /**
     * Probes the cluster health once.
     *
     * @return {@code true} if the cluster status is yellow or green; {@code false} for any other
     *         status or if no node is available yet
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean isClusterYellow() {
        try {
            ClusterHealthStatus status = getClusterHealth().getStatus();
            return status == ClusterHealthStatus.YELLOW || status == ClusterHealthStatus.GREEN;
        } catch (NoNodeAvailableException e) {
            log.info("Elasticsearch cluster not yet ready: {}", e.getMessage());
            log.trace("Elasticsearch cluster not yet ready: ", e);
            return false;
        }
    }

    /**
     * Creates the transport client from the configured node name, cluster name and HTTP flag,
     * and registers the configured host/port as transport address.
     *
     * <p>If the host name cannot be resolved the error is logged and the client is left without
     * a transport address.
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public void setup() {
        Settings settings = Settings.builder()
                .put("node.name", this.properties.getNodeName())
                .put("cluster.name", this.properties.getClusterName())
                .put("http.enabled", this.properties.isHttpEnabled())
                .build();
        this.client = new PreBuiltTransportClient(settings);
        try {
            this.client.addTransportAddress(
                    new TransportAddress(InetAddress.getByName(this.properties.getHost()), this.properties.getPort()));
        } catch (UnknownHostException e) {
            log.error("Error connecting to the Elasticsearch cluster at {}:{}",
                    this.properties.getHost(), this.properties.getPort(), e);
        }
    }

    /**
     * Checks whether a cluster health request can be completed at all, regardless of the
     * reported status.
     *
     * @return {@code true} if the health request returned, {@code false} if it threw
     */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public boolean isClientHealthy() {
        try {
            getClusterHealth();
            return true;
        } catch (Exception e) {
            // Boundary catch: any failure of the probe simply means "not healthy".
            log.error("An error occurred checking cluster health: ", e);
            return false;
        }
    }

    /** Closes the underlying transport client, if one was created. */
    @Override // cern.c2mon.server.elasticsearch.client.ElasticsearchClient
    public void close() {
        if (this.client != null) {
            this.client.close();
            log.info("Closed client {}", this.client.settings().get("node.name"));
        }
    }
}
