package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.util.StringUtils;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
@InterfaceAudience.Private
/* loaded from: input_file:BOOT-INF/lib/hbase-server-1.4.9.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.class */
public class ReplicationSource extends Thread implements ReplicationSourceInterface {
    /** Logger shared by this source and its nested worker classes. */
    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
    /** Initial capacity of each per-WAL-group queue; read from "hbase.regionserver.maxlogs" in init(). */
    private int queueSizePerGroup;
    /** Queue storage handed to init(); used for log positions and HFile references. */
    private ReplicationQueues replicationQueues;
    /** Peer registry handed to init(); consulted for peer status and table-CF configuration. */
    private ReplicationPeers replicationPeers;
    /** Private copy of the configuration created in init(). */
    private Configuration conf;
    /** Queue info parsed from the peer cluster znode name in init(). */
    private ReplicationQueueInfo replicationQueueInfo;
    /** Peer id as reported by {@link #replicationQueueInfo}. */
    private String peerId;
    /** Manager handed to init(). */
    private ReplicationSourceManager manager;
    /** Stop signal source; workers exit once it reports stopped. */
    private Stoppable stopper;
    /** Base sleep in milliseconds between retries ("replication.source.sleepforretries"). */
    private long sleepForRetries;
    /** File system handed to init(). */
    private FileSystem fs;
    /** Id of the local cluster, handed to init(). */
    private UUID clusterId;
    /** Id of the peer cluster; resolved from the endpoint in run(). */
    private UUID peerClusterId;
    /** Znode name of the queue this source serves, handed to init(). */
    private String peerClusterZnode;
    /** Upper bound for the retry sleep multiplier ("replication.source.maxretriesmultiplier"). */
    private int maxRetriesMultiplier;
    /** Metrics sink handed to init(). */
    private MetricsSource metrics;
    /** Queue size above which enqueueLog() emits a warning ("replication.source.log.queue.warn"). */
    private int logQueueWarnThreshold;
    /** Endpoint that actually ships the edits, handed to init(). */
    private ReplicationEndpoint replicationEndpoint;
    /** Filter chain assembled in run(). */
    private WALEntryFilter walEntryFilter;
    /** Throttler created in init() from a tenth of the current bandwidth. */
    private ReplicationThrottler throttler;
    /** Configured bandwidth ("replication.source.per.peer.node.bandwidth"). */
    private long defaultBandwidth;
    /** Bandwidth last applied to the throttler. */
    private long currentBandwidth;
    /** WAL queues keyed by WAL group prefix. Typed with the diamond instead of a raw HashMap. */
    private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
    /** Running total of WAL entries shipped by the workers. */
    private AtomicLong totalReplicatedEdits = new AtomicLong(0);
    /** Running total of operations shipped by the workers. */
    private AtomicLong totalReplicatedOperations = new AtomicLong(0);
    /** Set to true when run() starts; enqueueLog() only spawns workers while it is true. */
    private volatile boolean sourceRunning = false;
    /** Shipper threads keyed by WAL group id. */
    private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads = new ConcurrentHashMap<>();

    /* renamed from: org.apache.hadoop.hbase.replication.regionserver.ReplicationSource$1 */
    /* loaded from: input_file:BOOT-INF/lib/hbase-server-1.4.9.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource$1.class */
    class AnonymousClass1 implements Thread.UncaughtExceptionHandler {
        AnonymousClass1() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            ReplicationSource.LOG.error("Unexpected exception in ReplicationSource", th);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/hbase-server-1.4.9.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource$LogsComparator.class */
    /**
     * Orders WAL paths by the number that follows the last '.' in the file name.
     */
    public static class LogsComparator implements Comparator<Path> {
        /**
         * Compares two paths by their numeric file-name suffix.
         *
         * @throws NumberFormatException if either suffix is not a parsable long
         */
        @Override
        public int compare(Path path, Path path2) {
            final long firstTs = getTS(path);
            final long secondTs = getTS(path2);
            return Long.compare(firstTs, secondTs);
        }

        /** Parses everything after the last '.' of the file name as a long. */
        private static long getTS(Path path) {
            final String fileName = path.getName();
            final int suffixStart = fileName.lastIndexOf('.') + 1;
            return Long.parseLong(fileName.substring(suffixStart));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/hbase-server-1.4.9.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource$ReplicationSourceShipperThread.class */
    public class ReplicationSourceShipperThread extends Thread {
        /** Source this worker belongs to; passed to the manager when a recovered queue is closed. */
        ReplicationSourceInterface source;
        /** Id of the WAL group this worker ships. */
        String walGroupId;
        /** WAL paths of this group, ordered by {@link LogsComparator}. */
        PriorityBlockingQueue<Path> queue;
        /** Queue info; tells whether this is a recovered queue and which servers died. */
        ReplicationQueueInfo replicationQueueInfo;
        /** Last WAL position persisted through updateLogPosition(); -1 until the first update. */
        private long lastLoggedPosition = -1;
        /** Path of the WAL the last shipped batch ended in; set by shipEdits(). */
        private volatile Path currentPath;
        /**
         * Life-cycle state. Volatile because it is written via setWorkerState() (for example
         * from terminate()) and read by the run loop and by sibling workers.
         */
        private volatile WorkerState state;
        /**
         * Reader thread feeding this shipper. Volatile because startup() assigns it from the
         * starting thread while run() polls it for null on the worker thread.
         */
        volatile ReplicationSourceWALReaderThread entryReader;

        /* renamed from: org.apache.hadoop.hbase.replication.regionserver.ReplicationSource$ReplicationSourceShipperThread$1 */
        /* loaded from: input_file:BOOT-INF/lib/hbase-server-1.4.9.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource$ReplicationSourceShipperThread$1.class */
        public class AnonymousClass1 implements Thread.UncaughtExceptionHandler {
            AnonymousClass1() {
            }

            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                RSRpcServices.exitIfOOME(th);
                ReplicationSource.LOG.error("Unexpected exception in ReplicationSourceWorkerThread, currentPath=" + ReplicationSourceShipperThread.this.getCurrentPath(), th);
                ReplicationSource.this.stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
            }
        }

        /**
         * Creates a worker for one WAL group. The thread is not started here; see startup().
         *
         * @param str                        id of the WAL group
         * @param priorityBlockingQueue      queue of WAL paths for that group
         * @param replicationQueueInfo       info about the replication queue being served
         * @param replicationSourceInterface the owning source
         */
        public ReplicationSourceShipperThread(String str, PriorityBlockingQueue<Path> priorityBlockingQueue, ReplicationQueueInfo replicationQueueInfo, ReplicationSourceInterface replicationSourceInterface) {
            super();
            this.source = replicationSourceInterface;
            this.replicationQueueInfo = replicationQueueInfo;
            this.queue = priorityBlockingQueue;
            this.walGroupId = str;
        }

        /**
         * Main loop of the worker: takes batches from the WAL reader and ships them until the
         * worker is no longer active. For a recovered queue that has been drained, the last
         * worker to finish closes the queue and stops the replication endpoint.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            setWorkerState(WorkerState.RUNNING);
            while (isWorkerActive()) {
                if (!ReplicationSource.this.isPeerEnabled()) {
                    // Fixed one-interval sleep while the peer is disabled. The multiplier was
                    // never carried across iterations, so the return value is not needed.
                    ReplicationSource.this.sleepForRetries("Replication is disabled", 1);
                    continue;
                }
                int sleepMultiplier = 1;
                // Wait for startup() to create the reader. Also re-check the worker state:
                // once this thread is interrupted sleepForRetries() returns immediately, so
                // waiting on the null check alone would spin forever.
                while (this.entryReader == null && isWorkerActive()) {
                    if (ReplicationSource.this.sleepForRetries("Replication WAL entry reader thread not initialized", sleepMultiplier)) {
                        sleepMultiplier++;
                    }
                    if (sleepMultiplier == ReplicationSource.this.maxRetriesMultiplier) {
                        ReplicationSource.LOG.warn("Replication WAL entry reader thread not initialized");
                    }
                }
                ReplicationSourceWALReaderThread reader = this.entryReader;
                if (reader == null) {
                    // The worker was deactivated while waiting; the outer condition ends the loop.
                    continue;
                }
                try {
                    ReplicationSourceWALReaderThread.WALEntryBatch batch = reader.take();
                    shipEdits(batch);
                    // An empty batch with no sequence ids on a recovered queue means it is drained.
                    if (this.replicationQueueInfo.isQueueRecovered() && batch.getWalEntries().isEmpty() && batch.getLastSeqIds().isEmpty()) {
                        ReplicationSource.LOG.debug("Finished recovering queue for group " + this.walGroupId + " of peer " + ReplicationSource.this.peerClusterZnode);
                        ReplicationSource.this.metrics.incrCompletedRecoveryQueue();
                        setWorkerState(WorkerState.FINISHED);
                    }
                } catch (InterruptedException e) {
                    ReplicationSource.LOG.trace("Interrupted while waiting for next replication entry batch", e);
                    // Restore the flag so isWorkerActive() observes the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
            if (this.replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
                synchronized (this) {
                    // Short pause before inspecting the state of the sibling workers.
                    Threads.sleep(100L);
                    boolean allOtherWorkersFinished = true;
                    for (ReplicationSourceShipperThread worker : ReplicationSource.this.workerThreads.values()) {
                        if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
                            allOtherWorkersFinished = false;
                            break;
                        }
                    }
                    if (allOtherWorkersFinished) {
                        ReplicationSource.this.manager.closeRecoveredQueue(this.source);
                        if (this.source instanceof ReplicationSource) {
                            ((ReplicationSource) this.source).replicationEndpoint.stop();
                        }
                        ReplicationSource.LOG.info("Finished recovering queue " + ReplicationSource.this.peerClusterZnode + " with the following stats: " + ReplicationSource.this.getStats());
                    }
                }
            }
            // Any exit other than a drained recovered queue is recorded as STOPPED.
            if (this.state != WorkerState.FINISHED) {
                setWorkerState(WorkerState.STOPPED);
            }
        }

        /**
         * Removes the HFile references of every bulk-load cell in the edit and lowers the
         * HFile-refs queue metric by the number of store files of each store.
         *
         * @param wALEdit edit whose cells are scanned for the bulk-load qualifier
         * @throws IOException propagated from reading the descriptor or from the manager
         */
        private void cleanUpHFileRefs(WALEdit wALEdit) throws IOException {
            // Only the part of the znode name before the first '-' is used as the peer id.
            final String znode = ReplicationSource.this.peerClusterZnode;
            final String refsPeerId = znode.contains("-") ? ReplicationSource.this.peerClusterZnode.split("-")[0] : znode;
            for (Cell cell : wALEdit.getCells()) {
                if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                    continue;
                }
                for (WALProtos.StoreDescriptor store : WALEdit.getBulkLoadDescriptor(cell).getStoresList()) {
                    List<String> storeFiles = store.getStoreFileList();
                    ReplicationSource.this.manager.cleanUpHFileRefs(refsPeerId, storeFiles);
                    ReplicationSource.this.metrics.decrSizeOfHFileRefsQueue(storeFiles.size());
                }
            }
        }

        /**
         * Re-reads the bandwidth for this source and, when it differs from the value in use,
         * stores it and reconfigures the throttler with a tenth of it.
         */
        private void checkBandwidthChangeAndResetThrottler() {
            long latestBandwidth = ReplicationSource.this.getCurrentBandwidth();
            if (latestBandwidth != ReplicationSource.this.currentBandwidth) {
                // Plain field assignment; the synthetic access$802 accessor emitted by the
                // decompiler does not exist at source level.
                ReplicationSource.this.currentBandwidth = latestBandwidth;
                ReplicationSource.this.throttler.setBandwidth(ReplicationSource.this.currentBandwidth / 10.0d);
                ReplicationSource.LOG.info("ReplicationSource : " + ReplicationSource.this.peerId + " bandwidth throttling changed, currentBandWidth=" + ReplicationSource.this.currentBandwidth);
            }
        }

        /**
         * Ships one batch through the replication endpoint, retrying until the endpoint reports
         * success or the worker is no longer active.
         *
         * <p>An empty batch only advances the persisted log position (when it changed) and
         * refreshes the age-of-last-shipped metric. After a successful non-empty batch the
         * HFile references are cleaned up, the log position is persisted and the counters and
         * metrics are updated.
         *
         * <p>The success handling lives inside the try block so that the result variables are
         * definitely assigned where they are read and the checked IOException from HFile-ref
         * cleanup is handled by the same retry path as endpoint failures.
         *
         * @param wALEntryBatch batch produced by the WAL reader thread
         */
        protected void shipEdits(ReplicationSourceWALReaderThread.WALEntryBatch wALEntryBatch) {
            List<WAL.Entry> walEntries = wALEntryBatch.getWalEntries();
            long lastWalPosition = wALEntryBatch.getLastWalPosition();
            this.currentPath = wALEntryBatch.getLastWalPath();
            int sleepMultiplier = 0;
            if (walEntries.isEmpty()) {
                if (this.lastLoggedPosition != lastWalPosition) {
                    updateLogPosition(lastWalPosition);
                    ReplicationSource.this.metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), this.walGroupId);
                }
                return;
            }
            int heapSize = (int) wALEntryBatch.getHeapSize();
            while (isWorkerActive()) {
                try {
                    checkBandwidthChangeAndResetThrottler();
                    if (ReplicationSource.this.throttler.isEnabled()) {
                        long nextSleepInterval = ReplicationSource.this.throttler.getNextSleepInterval(heapSize);
                        if (nextSleepInterval > 0) {
                            try {
                                if (ReplicationSource.LOG.isTraceEnabled()) {
                                    ReplicationSource.LOG.trace("To sleep " + nextSleepInterval + "ms for throttling control");
                                }
                                Thread.sleep(nextSleepInterval);
                                ReplicationSource.this.throttler.resetStartTick();
                            } catch (InterruptedException e) {
                                ReplicationSource.LOG.debug("Interrupted while sleeping for throttling control");
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                    // The context is created per attempt and is local to this iteration.
                    ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
                    replicateContext.setEntries(walEntries).setSize(heapSize);
                    replicateContext.setWalGroupId(this.walGroupId);
                    long startNs = System.nanoTime();
                    boolean replicated = ReplicationSource.this.replicationEndpoint.replicate(replicateContext);
                    long endNs = System.nanoTime();
                    if (!replicated) {
                        // The endpoint asked for a retry; re-check the worker state first.
                        continue;
                    }
                    sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
                    if (this.lastLoggedPosition != lastWalPosition) {
                        for (WAL.Entry entry : walEntries) {
                            cleanUpHFileRefs(entry.getEdit());
                        }
                        updateLogPosition(lastWalPosition);
                    }
                    if (ReplicationSource.this.throttler.isEnabled()) {
                        ReplicationSource.this.throttler.addPushSize(heapSize);
                    }
                    ReplicationSource.this.totalReplicatedEdits.addAndGet(walEntries.size());
                    ReplicationSource.this.totalReplicatedOperations.addAndGet(wALEntryBatch.getNbOperations());
                    ReplicationSource.this.metrics.shipBatch(wALEntryBatch.getNbOperations(), heapSize, wALEntryBatch.getNbHFiles());
                    ReplicationSource.this.metrics.setAgeOfLastShippedOp(walEntries.get(walEntries.size() - 1).getKey().getWriteTime(), this.walGroupId);
                    if (ReplicationSource.LOG.isTraceEnabled()) {
                        ReplicationSource.LOG.trace("Replicated " + ReplicationSource.this.totalReplicatedEdits + " entries in total, or " + ReplicationSource.this.totalReplicatedOperations + " operations in " + ((endNs - startNs) / 1000000) + " ms");
                    }
                    return;
                } catch (Exception e2) {
                    ReplicationSource.LOG.warn(ReplicationSource.this.replicationEndpoint.getClass().getName() + " threw unknown exception:" + StringUtils.stringifyException(e2));
                    // Back off a little more after each failure, up to the configured maximum.
                    if (ReplicationSource.this.sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
                        sleepMultiplier++;
                    }
                }
            }
        }

        /**
         * Hands the new position for the current WAL to the manager (which also cleans old
         * logs) and remembers it as the last logged position.
         *
         * @param j WAL position to record
         */
        private void updateLogPosition(long j) {
            final boolean recoveredQueue = this.replicationQueueInfo.isQueueRecovered();
            final String queueZnode = ReplicationSource.this.peerClusterZnode;
            ReplicationSource.this.manager.logPositionAndCleanOldLogs(this.currentPath, queueZnode, j, recoveredQueue, false);
            this.lastLoggedPosition = j;
        }

        public void startup() {
            String name = Thread.currentThread().getName();
            AnonymousClass1 anonymousClass1 = new Thread.UncaughtExceptionHandler() { // from class: org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.ReplicationSourceShipperThread.1
                AnonymousClass1() {
                }

                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread, Throwable th) {
                    RSRpcServices.exitIfOOME(th);
                    ReplicationSource.LOG.error("Unexpected exception in ReplicationSourceWorkerThread, currentPath=" + ReplicationSourceShipperThread.this.getCurrentPath(), th);
                    ReplicationSource.this.stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
                }
            };
            Threads.setDaemonThreadRunning(this, name + ".replicationSource." + this.walGroupId + "," + ReplicationSource.this.peerClusterZnode, anonymousClass1);
            ReplicationSource.this.workerThreads.put(this.walGroupId, this);
            long j = 0;
            if (this.replicationQueueInfo.isQueueRecovered()) {
                j = getRecoveredQueueStartPos(0L);
                for (int i = 0; i <= ReplicationSource.this.maxRetriesMultiplier; i++) {
                    try {
                        locateRecoveredPaths();
                        break;
                    } catch (IOException e) {
                        ReplicationSource.LOG.error("Error while locating recovered queue paths, attempt #" + i);
                    }
                }
            }
            startWALReaderThread(name, anonymousClass1, j);
        }

        /**
         * Looks up the stored position of the WAL at the head of the queue.
         *
         * @param j value returned when the lookup fails
         * @return the stored position, or {@code j} if a ReplicationException occurred (in
         *         which case the worker is also terminated)
         */
        private long getRecoveredQueueStartPos(long j) {
            long startPosition = j;
            try {
                final String headWalName = this.queue.peek().getName();
                startPosition = ReplicationSource.this.replicationQueues.getLogPosition(ReplicationSource.this.peerClusterZnode, headWalName);
                if (ReplicationSource.LOG.isTraceEnabled()) {
                    ReplicationSource.LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + startPosition);
                }
            } catch (ReplicationException lookupFailure) {
                terminate("Couldn't get the position of this recovered queue " + ReplicationSource.this.peerClusterZnode, lookupFailure);
            }
            return startPosition;
        }

        /**
         * Creates the WAL reader for this group and starts it as a daemon thread.
         *
         * @param str                      name prefix for the reader thread
         * @param uncaughtExceptionHandler handler installed on the reader thread
         * @param j                        position at which reading starts
         */
        private void startWALReaderThread(String str, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, long j) {
            // The source-wide filter runs first, followed by the cluster-marking filter.
            ClusterMarkingEntryFilter clusterMarkingFilter = new ClusterMarkingEntryFilter(ReplicationSource.this.clusterId, ReplicationSource.this.peerClusterId, ReplicationSource.this.replicationEndpoint);
            ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(Lists.newArrayList(ReplicationSource.this.walEntryFilter, clusterMarkingFilter));
            this.entryReader = new ReplicationSourceWALReaderThread(ReplicationSource.this.manager, this.replicationQueueInfo, this.queue, j, ReplicationSource.this.fs, ReplicationSource.this.conf, readerFilter, ReplicationSource.this.metrics);
            final String readerThreadName = str + ".replicationSource.replicationWALReaderThread." + this.walGroupId + "," + ReplicationSource.this.peerClusterZnode;
            Threads.setDaemonThreadRunning(this.entryReader, readerThreadName, uncaughtExceptionHandler);
        }

        /**
         * Rewrites the queue of a recovered source so that every entry points at the current
         * location of its WAL. Paths that still exist are kept. Missing ones are looked up via
         * getReplSyncUpPath() when the stopper is a ReplicationSyncUp.DummyServer, and
         * otherwise in the WAL directories of the dead region servers. A WAL that cannot be
         * found anywhere keeps its original path and an error is logged.
         *
         * <p>The queue is only replaced when at least one path was missing.
         *
         * @throws IOException on file system errors, or when the rebuilt queue does not have
         *                     the same size as the original one
         */
        private void locateRecoveredPaths() throws IOException {
            boolean hasPathChanged = false;
            PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<>(ReplicationSource.this.queueSizePerGroup, new LogsComparator());
            for (Path path : this.queue) {
                if (ReplicationSource.this.fs.exists(path)) {
                    // Still where it was queued; nothing to do.
                    newPaths.add(path);
                    continue;
                }
                hasPathChanged = true;
                if (ReplicationSource.this.stopper instanceof ReplicationSyncUp.DummyServer) {
                    newPaths.add(getReplSyncUpPath(path));
                    continue;
                }
                Path relocated = findInDeadServerDirs(path);
                if (relocated != null) {
                    newPaths.add(relocated);
                } else {
                    ReplicationSource.LOG.error(String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
                    newPaths.add(path);
                }
            }
            if (!hasPathChanged) {
                return;
            }
            if (newPaths.size() != this.queue.size()) {
                ReplicationSource.LOG.error("Recovery queue size is incorrect");
                throw new IOException("Recovery queue size error");
            }
            this.queue.clear();
            for (Path relocatedPath : newPaths) {
                this.queue.add(relocatedPath);
            }
        }

        /**
         * Searches the WAL directory (and its splitting variant) of each dead region server for
         * a file with the same name as {@code path}.
         *
         * <p>Returns as soon as a location is found. Previously only the innermost loop was
         * left, so the search went on over the remaining servers and the caller always added
         * the original path as well, which made the size check fail.
         *
         * @return the first existing location, or {@code null} if there is none
         */
        private Path findInDeadServerDirs(Path path) throws IOException {
            List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
            ReplicationSource.LOG.info("NB dead servers : " + deadRegionServers.size());
            Path walRootDir = FSUtils.getWALRootDir(ReplicationSource.this.conf);
            for (String deadServerName : deadRegionServers) {
                Path deadRsDirectory = new Path(walRootDir, DefaultWALProvider.getWALDirectoryName(deadServerName));
                Path[] candidates = new Path[]{new Path(deadRsDirectory, path.getName()), new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), path.getName())};
                for (Path candidate : candidates) {
                    ReplicationSource.LOG.info("Possible location " + candidate.toUri().toString());
                    if (ReplicationSource.this.manager.getFs().exists(candidate)) {
                        ReplicationSource.LOG.info("Log " + path + " still exists at " + candidate);
                        return candidate;
                    }
                }
            }
            return null;
        }

        /**
         * Scans every directory directly under the manager's log directory for a file with the
         * same name as {@code path}.
         *
         * @param path WAL path whose current location is unknown
         * @return the path of the matching file inside the directory where it was found, or
         *         {@code path} itself when no directory contains such a file
         * @throws IOException if listing a directory fails
         */
        private Path getReplSyncUpPath(Path path) throws IOException {
            for (FileStatus serverDirStatus : ReplicationSource.this.fs.listStatus(ReplicationSource.this.manager.getLogDir())) {
                Path serverDir = serverDirStatus.getPath();
                for (FileStatus logStatus : ReplicationSource.this.fs.listStatus(serverDir)) {
                    // Build each candidate from the directory itself. Reusing one variable for
                    // both nested every later file name under the previous candidate, so the
                    // returned location was wrong unless the match was the first listed file.
                    Path candidate = new Path(serverDir, logStatus.getPath().getName());
                    if (candidate.getName().equals(path.getName())) {
                        ReplicationSource.LOG.info("Log " + candidate.getName() + " found at " + candidate);
                        return candidate;
                    }
                }
            }
            ReplicationSource.LOG.error("Didn't find path for: " + path.getName());
            return path;
        }

        /**
         * Returns the path reported by the WAL reader.
         *
         * @return the reader's current path, or {@code null} while the reader has not been
         *         created yet. The uncaught-exception handler calls this method, and the
         *         worker thread is started before the reader exists, so a null reader must
         *         not raise a NullPointerException here.
         */
        public Path getCurrentPath() {
            ReplicationSourceWALReaderThread reader = this.entryReader;
            return reader == null ? null : reader.getCurrentPath();
        }

        /**
         * @return the WAL position most recently recorded by updateLogPosition(), or -1 if
         *         none has been recorded yet
         */
        public long getCurrentPosition() {
            final long position = this.lastLoggedPosition;
            return position;
        }

        /**
         * @return true only while the stopper is not stopped, the worker state is RUNNING and
         *         this thread has not been interrupted
         */
        private boolean isWorkerActive() {
            if (ReplicationSource.this.stopper.isStopped()) {
                return false;
            }
            if (this.state != WorkerState.RUNNING) {
                return false;
            }
            return !isInterrupted();
        }

        /**
         * Stops this worker and its WAL reader: logs the reason, interrupts and joins the
         * reader, marks the worker STOPPED, then interrupts and joins the worker thread.
         *
         * @param str reason for the termination
         * @param exc cause, or {@code null} for an orderly shutdown (logged at info level)
         */
        private void terminate(String str, Exception exc) {
            if (exc == null) {
                ReplicationSource.LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + str);
            } else {
                ReplicationSource.LOG.error("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + str, exc);
            }
            // getRecoveredQueueStartPos() can call this during startup(), before the reader
            // has been created, so the reader may legitimately still be null.
            ReplicationSourceWALReaderThread reader = this.entryReader;
            if (reader != null) {
                reader.interrupt();
                Threads.shutdown(reader, ReplicationSource.this.sleepForRetries);
            }
            setWorkerState(WorkerState.STOPPED);
            interrupt();
            Threads.shutdown(this, ReplicationSource.this.sleepForRetries);
            ReplicationSource.LOG.info("ReplicationSourceWorker " + getName() + " terminated");
        }

        /**
         * Records the new state and, when a reader exists, tells it whether it should keep
         * running (true only for RUNNING).
         *
         * @param workerState the state to switch to
         */
        public void setWorkerState(WorkerState workerState) {
            this.state = workerState;
            if (this.entryReader == null) {
                return;
            }
            final boolean readerShouldRun = workerState == WorkerState.RUNNING;
            this.entryReader.setReaderRunning(readerShouldRun);
        }

        /** @return the state last set through setWorkerState(), or null if none was set */
        public WorkerState getWorkerState() {
            final WorkerState current = this.state;
            return current;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/hbase-server-1.4.9.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSource$WorkerState.class */
    /** Life-cycle states of a shipper worker thread. */
    public enum WorkerState {
        /** The worker loop is (or is about to be) processing batches. */
        RUNNING,

        /** The worker left its loop without draining a recovered queue, or was terminated. */
        STOPPED,

        /** The worker drained a recovered queue. */
        FINISHED;
    }

    /**
     * Creates an unconfigured source. All collaborators and settings are supplied later
     * through init().
     */
    public ReplicationSource() {
        super();
    }

    /**
     * Stores the collaborators, copies the configuration and reads the retry, queue-size and
     * bandwidth settings from it, then creates the throttler.
     *
     * @param configuration            configuration to copy; the copy is adjusted by decorateConf()
     * @param fileSystem               file system used to look up WALs
     * @param replicationSourceManager manager of this source
     * @param replicationQueues        queue storage
     * @param replicationPeers         peer registry
     * @param stoppable                stop signal source
     * @param str                      znode name of the queue this source serves
     * @param uuid                     id of the local cluster
     * @param replicationEndpoint      endpoint that ships the edits
     * @param metricsSource            metrics sink
     * @throws IOException declared by the interface
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void init(Configuration configuration, FileSystem fileSystem, ReplicationSourceManager replicationSourceManager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stoppable, String str, UUID uuid, ReplicationEndpoint replicationEndpoint, MetricsSource metricsSource) throws IOException {
        this.stopper = stoppable;

        // Work on a private copy so decorateConf() does not touch the caller's configuration.
        this.conf = HBaseConfiguration.create(configuration);
        decorateConf();
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);

        // Collaborators supplied by the caller.
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.manager = replicationSourceManager;
        this.fs = fileSystem;
        this.metrics = metricsSource;
        this.clusterId = uuid;

        // Identity of the queue and of the peer it belongs to.
        this.peerClusterZnode = str;
        final ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(str);
        this.replicationQueueInfo = queueInfo;
        this.peerId = queueInfo.getPeerId();
        this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
        this.replicationEndpoint = replicationEndpoint;

        // Bandwidth: the throttler is created with a tenth of the current value.
        this.defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0L);
        this.currentBandwidth = getCurrentBandwidth();
        this.throttler = new ReplicationThrottler(this.currentBandwidth / 10.0d);
        LOG.info("peerClusterZnode=" + str + ", ReplicationSource : " + this.peerId + ", currentBandwidth=" + this.currentBandwidth);
    }

    /**
     * Copies the replication codec setting, when it is non-empty, over the RPC codec setting
     * of this source's configuration.
     */
    private void decorateConf() {
        final String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
        if (!org.apache.commons.lang.StringUtils.isNotEmpty(replicationCodec)) {
            return;
        }
        this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }

    /**
     * Adds a WAL to the queue of its WAL group. The first WAL of a group creates the queue
     * and, if the source is already running, starts a worker for the group. A warning is
     * logged when the queue grows beyond the configured threshold.
     *
     * @param path WAL to enqueue
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void enqueueLog(Path path) {
        final String walGroup = DefaultWALProvider.getWALPrefixFromWALName(path.getName());
        PriorityBlockingQueue<Path> groupQueue = this.queues.get(walGroup);
        if (groupQueue == null) {
            groupQueue = new PriorityBlockingQueue<>(this.queueSizePerGroup, new LogsComparator());
            this.queues.put(walGroup, groupQueue);
            if (this.sourceRunning) {
                // New group on a running source: it needs its own worker, unless another
                // caller registered one first.
                final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(walGroup, groupQueue, this.replicationQueueInfo, this);
                final ReplicationSourceShipperThread existing = this.workerThreads.putIfAbsent(walGroup, worker);
                if (existing == null) {
                    LOG.debug("Starting up worker for wal group " + walGroup);
                    worker.startup();
                } else {
                    LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroup);
                }
            }
        }
        groupQueue.put(path);
        this.metrics.incrSizeOfLogQueue();
        final int queuedLogs = groupQueue.size();
        if (queuedLogs > this.logQueueWarnThreshold) {
            LOG.warn("WAL group " + walGroup + " queue size: " + queuedLogs + " exceeds value of replication.source.log.queue.warn: " + this.logQueueWarnThreshold);
        }
    }

    /**
     * Registers bulk-loaded HFiles for replication to this source's peer, unless the peer's
     * table-CF configuration excludes the table or the column family.
     *
     * @param tableName table the files were loaded into
     * @param bArr      column family name
     * @param list      HFile path pairs to register
     * @throws ReplicationException if the queue storage rejects the references
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void addHFileRefs(TableName tableName, byte[] bArr, List<Pair<Path, Path>> list) throws ReplicationException {
        // Only the part of the znode name before the first '-' is used as the peer id.
        String targetPeerId = this.peerClusterZnode;
        if (targetPeerId.contains("-")) {
            targetPeerId = this.peerClusterZnode.split("-")[0];
        }
        final Map<TableName, List<String>> tableCFs = this.replicationPeers.getPeer(targetPeerId).getTableCFs();
        if (tableCFs != null) {
            final List<String> families = tableCFs.get(tableName);
            // Skip when the table is not listed, or when it lists families and this one is missing.
            if (!tableCFs.containsKey(tableName) || (families != null && !families.contains(Bytes.toString(bArr)))) {
                LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(bArr) + " to peer id " + targetPeerId);
                return;
            }
        }
        // No table-CF restriction applies, or the table/family is covered by it.
        this.replicationQueues.addHFileRefs(targetPeerId, list);
        this.metrics.incrSizeOfHFileRefsQueue(list.size());
    }

    /**
     * Clears the metrics and, if the endpoint is starting or running, stops it and waits for
     * the stop to complete.
     */
    private void uninitialize() {
        LOG.debug("Source exiting " + this.peerId);
        this.metrics.clear();
        final boolean endpointStarting = this.replicationEndpoint.state() == Service.State.STARTING;
        if (!endpointStarting && this.replicationEndpoint.state() != Service.State.RUNNING) {
            return;
        }
        this.replicationEndpoint.stopAndWait();
    }

    /**
     * Thread body of the source: starts the replication endpoint, builds the WAL entry filter
     * chain, resolves the peer cluster id and starts one shipper worker per queued WAL group.
     *
     * <p>Any exception raised inside the method is logged and rethrown wrapped in a
     * RuntimeException.
     */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        // From here on enqueueLog() starts workers itself for WAL groups it has not seen before.
        this.sourceRunning = true;
        try {
            // Blocks until the endpoint start attempt has completed.
            if (((Service.State) this.replicationEndpoint.start().get()) != Service.State.RUNNING) {
                LOG.warn("ReplicationEndpoint was not started. Exiting");
                uninitialize();
                return;
            }
            // Filter chain: the system-table filter first, then the endpoint's own filter if it has one.
            ArrayList newArrayList = Lists.newArrayList(new SystemTableWALEntryFilter());
            WALEntryFilter wALEntryfilter = this.replicationEndpoint.getWALEntryfilter();
            if (wALEntryfilter != null) {
                newArrayList.add(wALEntryfilter);
            }
            this.walEntryFilter = new ChainWALEntryFilter(newArrayList);
            int i = 1;
            // Poll the endpoint for the peer's cluster id, sleeping longer after each miss
            // until the multiplier reaches its configured maximum.
            while (isSourceActive() && this.peerClusterId == null) {
                this.peerClusterId = this.replicationEndpoint.getPeerUUID();
                if (isSourceActive() && this.peerClusterId == null && sleepForRetries("Cannot contact the peer's zk ensemble", i)) {
                    i++;
                }
            }
            // Refuse to replicate into the local cluster unless the endpoint explicitly allows it.
            if (this.clusterId.equals(this.peerClusterId) && !this.replicationEndpoint.canReplicateToSameCluster()) {
                terminate("ClusterId " + this.clusterId + " is replicating to itself: peerClusterId " + this.peerClusterId + " which is not allowed by ReplicationEndpoint:" + this.replicationEndpoint.getClass().getName(), null, false);
                this.manager.closeQueue(this);
                return;
            }
            LOG.info("Replicating " + this.clusterId + " -> " + this.peerClusterId);
            // One worker per WAL group that was queued before the source started running.
            for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : this.queues.entrySet()) {
                String key = entry.getKey();
                ReplicationSourceShipperThread replicationSourceShipperThread = new ReplicationSourceShipperThread(key, entry.getValue(), this.replicationQueueInfo, this);
                // putIfAbsent guards against enqueueLog() having registered a worker for the same group.
                if (this.workerThreads.putIfAbsent(key, replicationSourceShipperThread) != null) {
                    LOG.debug("Someone has beat us to start a worker thread for wal group " + key);
                } else {
                    LOG.debug("Starting up worker for wal group " + key);
                    replicationSourceShipperThread.startup();
                }
            }
        } catch (Exception e) {
            LOG.warn("Error starting ReplicationEndpoint, exiting", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Sleeps for {@code sleepForRetries * i} milliseconds before the next retry.
     *
     * <p>If the sleep is interrupted, the interruption is logged at debug level and the
     * thread's interrupt flag is set again before returning.
     *
     * @param str reason for sleeping; only used in the trace log line
     * @param i   multiplier applied to the base sleep time
     * @return {@code true} if {@code i} is still below {@code maxRetriesMultiplier}
     */
    protected boolean sleepForRetries(String str, int i) {
        final long pauseMillis = this.sleepForRetries * i;
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace(str + ", sleeping " + this.sleepForRetries + " times " + i);
            }
            Thread.sleep(pauseMillis);
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping between retries");
            // Preserve the interrupt for whoever inspects the flag next.
            Thread.currentThread().interrupt();
        }
        final boolean belowMaxMultiplier = i < this.maxRetriesMultiplier;
        return belowMaxMultiplier;
    }

    /**
     * Asks {@code replicationPeers} for the status of the peer identified by {@code peerId}.
     *
     * @return the value returned by {@code getStatusOfPeer(peerId)}
     */
    protected boolean isPeerEnabled() {
        final boolean peerStatus = this.replicationPeers.getStatusOfPeer(this.peerId);
        return peerStatus;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void startup() {
        Threads.setDaemonThreadRunning(this, Thread.currentThread().getName() + ".replicationSource," + this.peerClusterZnode, new Thread.UncaughtExceptionHandler() { // from class: org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.1
            AnonymousClass1() {
            }

            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                ReplicationSource.LOG.error("Unexpected exception in ReplicationSource", th);
            }
        });
    }

    /**
     * Terminates this source without an associated exception.
     *
     * @param str reason for the termination, forwarded to {@code terminate(String, Exception)}
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void terminate(String str) {
        // No exception is associated with this request.
        final Exception noCause = null;
        terminate(str, noCause);
    }

    /**
     * Terminates this source by delegating to the three-argument overload with the final
     * flag set to {@code true}.
     *
     * @param str reason for the termination
     * @param exc exception that caused the termination, or {@code null}
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public void terminate(String str, Exception exc) {
        final boolean waitForShutdown = true;
        terminate(str, exc, waitForShutdown);
    }

    /**
     * Terminates this source: marks it as not running, stops and interrupts every worker
     * thread, and asks the replication endpoint (if any) to stop.
     *
     * <p>When {@code z} is {@code true} the method additionally shuts each worker down via
     * {@code Threads.shutdown} and waits up to {@code sleepForRetries * maxRetriesMultiplier}
     * milliseconds for the endpoint's stop future. Failures while waiting are logged as
     * warnings and not rethrown. If the wait is interrupted, the calling thread's interrupt
     * flag is set again so the interruption is not lost.
     *
     * @param str reason for the termination; used in the log message
     * @param exc exception that caused the termination, or {@code null} for a normal close
     * @param z   whether to wait for the workers and the endpoint to finish shutting down
     */
    public void terminate(String str, Exception exc, boolean z) {
        if (exc == null) {
            LOG.info("Closing source " + this.peerClusterZnode + " because: " + str);
        } else {
            LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + str, exc);
        }
        this.sourceRunning = false;

        Collection<ReplicationSourceShipperThread> workers = this.workerThreads.values();
        for (ReplicationSourceShipperThread worker : workers) {
            worker.setWorkerState(WorkerState.STOPPED);
            // run() registers a worker before calling startup() on it.
            // NOTE(review): presumably entryReader can still be unset in that window - confirm.
            if (worker.entryReader != null) {
                worker.entryReader.interrupt();
            }
            worker.interrupt();
        }

        ListenableFuture<?> endpointStop = this.replicationEndpoint != null ? this.replicationEndpoint.stop() : null;
        if (!z) {
            return;
        }

        for (ReplicationSourceShipperThread worker : workers) {
            Threads.shutdown(worker, this.sleepForRetries);
            LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
        }
        if (endpointStop == null) {
            return;
        }
        try {
            endpointStop.get(this.sleepForRetries * this.maxRetriesMultiplier, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + this.peerClusterZnode, e);
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + this.peerClusterZnode, e);
        }
    }

    /**
     * Returns the peer cluster znode of this source.
     *
     * @return the value of the {@code peerClusterZnode} field
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public String getPeerClusterZnode() {
        final String znode = this.peerClusterZnode;
        return znode;
    }

    /**
     * Returns the id of the peer this source replicates to.
     *
     * <p>Despite the method name, the value returned is the {@code peerId} field, not
     * {@code peerClusterId}.
     *
     * @return the value of the {@code peerId} field
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public String getPeerClusterId() {
        final String id = this.peerId;
        return id;
    }

    /**
     * Returns the first non-null current path reported by any worker thread.
     *
     * <p>Each worker's path is read exactly once, so the value returned is the same value
     * that was checked for {@code null}.
     *
     * @return a worker's current path, or {@code null} if no worker reports one
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public Path getCurrentPath() {
        for (ReplicationSourceShipperThread worker : this.workerThreads.values()) {
            // Read once: a second call could return a different value than the one checked.
            Path workerPath = worker.getCurrentPath();
            if (workerPath != null) {
                return workerPath;
            }
        }
        return null;
    }

    /**
     * Tells whether this source should keep working.
     *
     * @return {@code true} only if the stopper is not stopped and {@code sourceRunning} is set
     */
    private boolean isSourceActive() {
        if (this.stopper.isStopped()) {
            return false;
        }
        return this.sourceRunning;
    }

    /**
     * Builds a human-readable progress report for this source.
     *
     * <p>The report starts with the total number of replicated edits, followed by one line
     * per WAL group. Each line shows either the path and position currently being replicated
     * or a note that the group is waiting for a new log. Every per-group entry ends with a
     * newline so entries for several groups do not run together.
     *
     * @return the formatted statistics string
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public String getStats() {
        StringBuilder report = new StringBuilder();
        report.append("Total replicated edits: ").append(this.totalReplicatedEdits).append(", current progress: \n");
        for (Map.Entry<String, ReplicationSourceShipperThread> workerEntry : this.workerThreads.entrySet()) {
            String walGroupId = workerEntry.getKey();
            ReplicationSourceShipperThread worker = workerEntry.getValue();
            long position = worker.getCurrentPosition();
            Path currentPath = worker.getCurrentPath();
            report.append("walGroup [").append(walGroupId).append("]: ");
            if (currentPath != null) {
                report.append("currently replicating from: ").append(currentPath).append(" at position: ").append(position).append("\n");
            } else {
                // Terminate this entry with a newline too, matching the branch above.
                report.append("no replication ongoing, waiting for new log").append("\n");
            }
        }
        return report.toString();
    }

    /**
     * Returns the metrics object of this source.
     *
     * @return the value of the {@code metrics} field
     */
    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
    public MetricsSource getSourceMetrics() {
        final MetricsSource sourceMetrics = this.metrics;
        return sourceMetrics;
    }

    /**
     * Determines the bandwidth to use for this source's peer.
     *
     * <p>The peer's own bandwidth is used when the peer exists and reports a non-zero value;
     * otherwise {@code defaultBandwidth} is returned.
     *
     * @return the peer bandwidth, or {@code defaultBandwidth} when the peer is missing or
     *         reports {@code 0}
     */
    public long getCurrentBandwidth() {
        ReplicationPeer peer = this.replicationPeers.getPeer(this.peerId);
        long peerBandwidth = 0L;
        if (peer != null) {
            peerBandwidth = peer.getPeerBandwidth();
        }
        if (peerBandwidth == 0) {
            return this.defaultBandwidth;
        }
        return peerBandwidth;
    }

    /**
     * Synthetic accessor that assigns {@code currentBandwidth} on the given source and
     * returns the assigned value.
     *
     * <p>The decompiler could not decode this method and emitted a body that always threw
     * {@code UnsupportedOperationException}. Its partial pseudo-code showed the field
     * assignment {@code r0.currentBandwidth = r1}, which is restored here.
     * NOTE(review): the returned register was not decoded; returning the assigned value is
     * the usual shape of such accessors - confirm against the original bytecode.
     *
     * @param r6 the source whose {@code currentBandwidth} field is written
     * @param r7 the new bandwidth value
     * @return the value that was assigned
     */
    static /* synthetic */ long access$802(org.apache.hadoop.hbase.replication.regionserver.ReplicationSource r6, long r7) {
        r6.currentBandwidth = r7;
        return r7;
    }

    // Empty static initializer: it contains no statements.
    // NOTE(review): presumably left over from decompilation - confirm before removing.
    static {
    }
}
