package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.util.LoggingContext;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/ErrorHandlerTest.class */
public class ErrorHandlerTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/pipeline/ErrorHandlerTest$TestConnectorConfig.class */
    public static final class TestConnectorConfig extends CommonConnectorConfig {
        protected TestConnectorConfig(Configuration configuration) {
            super(configuration, 0);
        }

        public String getContextName() {
            return "test";
        }

        public String getConnectorName() {
            return "test";
        }

        public EnumeratedValue getSnapshotMode() {
            return null;
        }

        public Optional<EnumeratedValue> getSnapshotLockingMode() {
            return Optional.empty();
        }

        protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
            return null;
        }
    }

    @Test
    public void noError() throws Exception {
        poll(queue());
    }

    @Test
    public void nonRetriableByDefault() throws Exception {
        Configuration empty = Configuration.empty();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        initErrorHandler(empty, queue, new IllegalArgumentException("This is my error"));
        pollAndAssertNonRetriable(queue);
    }

    @Test
    public void isRetriable() throws Exception {
        Configuration build = Configuration.create().build();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        initErrorHandler(build, queue, new IOException("This is my error"));
        pollAndAssertRetriable(queue);
    }

    @Test
    public void isRetryingWithMaxTimes() throws Exception {
        Configuration build = Configuration.create().with(CommonConnectorConfig.MAX_RETRIES_ON_ERROR, 2).build();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        IOException iOException = new IOException("This is my error");
        ErrorHandler initErrorHandler = initErrorHandler(build, queue, iOException);
        pollAndAssertRetriable(queue);
        ErrorHandler replaceErrorHandler = replaceErrorHandler(build, queue, new Exception(iOException), initErrorHandler);
        pollAndAssertRetriable(queue);
        replaceErrorHandler(build, queue, iOException, replaceErrorHandler);
        pollAndAssertNonRetriable(queue);
    }

    @Test
    public void isNotRetryingWithMaxRetries() throws Exception {
        Configuration build = Configuration.create().with(CommonConnectorConfig.MAX_RETRIES_ON_ERROR, 2).build();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        ErrorHandler initErrorHandler = initErrorHandler(build, queue, new IOException("This is my error"));
        pollAndAssertRetriable(queue);
        replaceErrorHandler(build, queue, new Exception("This is fatal"), initErrorHandler);
        pollAndAssertNonRetriable(queue);
    }

    @Test
    public void customRetriableMatch() throws Exception {
        Configuration build = Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*my error.*").build();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        initErrorHandler(build, queue, new IllegalArgumentException("This is my error to retry"));
        pollAndAssertRetriable(queue);
    }

    @Test
    public void customRetriableNoMatch() throws Exception {
        Configuration build = Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*not my error.*").build();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        initErrorHandler(build, queue, new IllegalArgumentException("This is my error to retry"));
        pollAndAssertNonRetriable(queue);
    }

    @Test
    public void customRetriableMatchNested() throws Exception {
        Configuration build = Configuration.create().with(CommonConnectorConfig.CUSTOM_RETRIABLE_EXCEPTION, ".*my error.*").build();
        ChangeEventQueue<DataChangeEvent> queue = queue();
        initErrorHandler(build, queue, new Exception("Main", new IllegalArgumentException("This is my error to retry")));
        pollAndAssertRetriable(queue);
    }

    private void pollAndAssertRetriable(ChangeEventQueue<DataChangeEvent> changeEventQueue) throws Exception {
        try {
            poll(changeEventQueue);
            Assert.fail("Exception must be thrown");
        } catch (ConnectException e) {
            Assertions.assertThat(e).isInstanceOf(RetriableException.class);
        }
    }

    private void pollAndAssertNonRetriable(ChangeEventQueue<DataChangeEvent> changeEventQueue) throws Exception {
        try {
            poll(changeEventQueue);
            Assert.fail("Exception must be thrown");
        } catch (ConnectException e) {
            Assertions.assertThat(e).isNotInstanceOf(RetriableException.class);
        }
    }

    private void poll(ChangeEventQueue<DataChangeEvent> changeEventQueue) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            changeEventQueue.poll();
            Thread.sleep(100L);
        }
    }

    private ErrorHandler initErrorHandler(Configuration configuration, ChangeEventQueue<DataChangeEvent> changeEventQueue, Throwable th) {
        return replaceErrorHandler(configuration, changeEventQueue, th, null);
    }

    private ErrorHandler replaceErrorHandler(Configuration configuration, ChangeEventQueue<DataChangeEvent> changeEventQueue, Throwable th, ErrorHandler errorHandler) {
        ErrorHandler errorHandler2 = new ErrorHandler(SourceConnector.class, new TestConnectorConfig(configuration), changeEventQueue, errorHandler);
        if (th != null) {
            errorHandler2.setProducerThrowable(th);
        }
        return errorHandler2;
    }

    private ChangeEventQueue<DataChangeEvent> queue() {
        return new ChangeEventQueue.Builder().pollInterval(Duration.ofMillis(1L)).maxBatchSize(1000).maxQueueSize(1000).loggingContextSupplier(() -> {
            return LoggingContext.forConnector("test", "test", "test");
        }).build();
    }
}
