package io.trino.server.protocol.spooling;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.slice.Slices;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.server.ExternalUriInfo;
import io.trino.server.protocol.spooling.SpoolingConfig;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.HostAddress;
import io.trino.spi.spool.SpooledLocation;
import io.trino.spi.spool.SpooledSegmentHandle;
import io.trino.spi.spool.SpoolingManager;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.ServiceUnavailableException;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

/**
 * JAX-RS resource rooted at {@code /v1/spooled} that serves download and acknowledge
 * requests for spooled segments.
 *
 * <p>How a download is served depends on the {@link SpoolingConfig.SegmentRetrievalMode}
 * read from {@link SpoolingConfig} at construction time.
 */
@ResourceSecurity(ResourceSecurity.AccessType.PUBLIC)
@Path("/v1/spooled")
public class CoordinatorSegmentResource {
    private final SpoolingManager spoolingManager;
    private final SpoolingConfig.SegmentRetrievalMode retrievalMode;
    private final InternalNodeManager nodeManager;

    /**
     * Creates the resource.
     *
     * @param spoolingManager resolves segment handles and provides access to segment data
     * @param spoolingConfig source of the segment retrieval mode
     * @param internalNodeManager used to look up active nodes when redirecting to a worker
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig spoolingConfig, InternalNodeManager internalNodeManager) {
        this.spoolingManager = Objects.requireNonNull(spoolingManager, "spoolingManager is null");
        this.retrievalMode = Objects.requireNonNull(spoolingConfig, "config is null").getRetrievalMode();
        this.nodeManager = Objects.requireNonNull(internalNodeManager, "nodeManager is null");
    }

    /**
     * Serves a spooled segment according to the configured retrieval mode.
     *
     * <ul>
     *   <li>{@code STORAGE}: always fails with {@link ServiceUnavailableException}.</li>
     *   <li>{@code COORDINATOR_PROXY}: responds OK with the stream opened by the spooling manager.</li>
     *   <li>{@code WORKER_PROXY}: responds "see other" with the request URI re-targeted at the
     *       host and port of a randomly chosen active non-coordinator node.</li>
     *   <li>{@code COORDINATOR_STORAGE_REDIRECT}: responds "see other" with the direct URI
     *       supplied by the spooling manager.</li>
     * </ul>
     *
     * @param uriInfo request URI information, used to build the worker redirect
     * @param str segment identifier taken from the request path
     * @param httpHeaders request headers, passed to the spooling manager when resolving the handle
     * @return the response for the configured retrieval mode
     * @throws ServiceUnavailableException in {@code STORAGE} mode, or when no direct location
     *         is available in {@code COORDINATOR_STORAGE_REDIRECT} mode
     * @throws IOException declared for the spooling manager calls made while serving the segment
     */
    @Produces({"application/octet-stream"})
    @GET
    @Path("/download/{identifier}")
    public Response download(@Context UriInfo uriInfo, @PathParam("identifier") String str, @Context HttpHeaders httpHeaders) throws IOException {
        // The handle is resolved before the mode is inspected, so identifier resolution
        // happens (and may fail) regardless of the configured retrieval mode.
        SpooledSegmentHandle segmentHandle = handle(str, httpHeaders);
        return switch (retrievalMode) {
            case STORAGE -> throw new ServiceUnavailableException("Retrieval mode is STORAGE but segment resource was called");
            case COORDINATOR_PROXY -> Response.ok(spoolingManager.openInputStream(segmentHandle)).build();
            case WORKER_PROXY -> {
                HostAddress worker = randomActiveWorkerNode();
                yield Response.seeOther(uriInfo.getRequestUriBuilder()
                                .host(worker.getHostText())
                                .port(worker.getPort())
                                .build())
                        .build();
            }
            case COORDINATOR_STORAGE_REDIRECT -> {
                SpooledLocation.DirectLocation location = spoolingManager.directLocation(segmentHandle)
                        .orElseThrow(() -> new ServiceUnavailableException("Could not generate pre-signed URI"));
                yield Response.seeOther(location.directUri()).build();
            }
        };
    }

    /**
     * Acknowledges a spooled segment through the spooling manager.
     *
     * @param str segment identifier taken from the request path
     * @param httpHeaders request headers, passed to the spooling manager when resolving the handle
     * @return an OK response on success, or a server-error response whose entity is the
     *         string form of the {@link IOException} raised while acknowledging
     * @throws IOException declared for interface compatibility; I/O failures raised inside
     *         the method are converted into a server-error response instead
     */
    @GET
    @Path("/ack/{identifier}")
    public Response acknowledge(@PathParam("identifier") String str, @Context HttpHeaders httpHeaders) throws IOException {
        try {
            spoolingManager.acknowledge(handle(str, httpHeaders));
            return Response.ok().build();
        } catch (IOException e) {
            return Response.serverError().entity(e.toString()).build();
        }
    }

    /**
     * Builds a URI builder for this resource's root path on top of the external base URI.
     *
     * @param externalUriInfo provider of the external base URI
     * @return a builder positioned at the path declared on this resource class
     */
    public static UriBuilder spooledSegmentUriBuilder(ExternalUriInfo externalUriInfo) {
        return UriBuilder.fromUri(externalUriInfo.baseUriBuilder().build()).path(CoordinatorSegmentResource.class);
    }

    /**
     * Picks one active non-coordinator node uniformly at random.
     *
     * @return the host and port of the selected node
     * @throws com.google.common.base.VerifyException if there is no active non-coordinator node
     */
    public HostAddress randomActiveWorkerNode() {
        List<InternalNode> workers = nodeManager.getActiveNodesSnapshot().getAllNodes().stream()
                .filter(node -> !node.isCoordinator())
                .collect(ImmutableList.toImmutableList());
        Verify.verify(!workers.isEmpty(), "No active worker nodes available");
        int choice = ThreadLocalRandom.current().nextInt(workers.size());
        return workers.get(choice).getHostAndPort();
    }

    /**
     * Resolves the segment handle for an identifier by handing its UTF-8 bytes and the
     * request headers to the spooling manager.
     */
    private SpooledSegmentHandle handle(String str, HttpHeaders httpHeaders) {
        byte[] identifierBytes = str.getBytes(StandardCharsets.UTF_8);
        return spoolingManager.handle(Slices.wrappedBuffer(identifierBytes), httpHeaders.getRequestHeaders());
    }
}
