package org.apache.hudi.client.transaction.lock.models;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.class */
public class TestLockProviderHeartbeatManager {
    private ScheduledExecutorService mockScheduler;
    private Logger mockLogger;
    private ScheduledFuture<?> mockFuture;
    private HeartbeatManager manager;
    private static final String LOGGER_ID = "test-owner";
    private ScheduledExecutorService actualExecutorService;

    @BeforeEach
    void setUp() {
        this.mockScheduler = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
        this.mockLogger = (Logger) Mockito.mock(Logger.class);
        this.mockFuture = (ScheduledFuture) Mockito.mock(ScheduledFuture.class);
        this.actualExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "Heartbeat-Test-Thread");
            thread.setDaemon(true);
            return thread;
        });
    }

    @AfterEach
    void tearDown() throws Exception {
        if (this.manager != null) {
            this.manager.close();
            this.manager = null;
        }
        this.actualExecutorService.shutdownNow();
    }

    @Test
    void testStartHeartbeatSuccess() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq(100L), ArgumentMatchers.eq(100L), (TimeUnit) ArgumentMatchers.eq(TimeUnit.MILLISECONDS))).thenAnswer(invocationOnMock -> {
            return this.mockFuture;
        });
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
    }

    @Test
    void testStartHeartbeatAlreadyRunning() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            return this.mockFuture;
        });
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        Assertions.assertFalse(this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Logger) Mockito.verify(this.mockLogger)).warn("Owner {}: Heartbeat is already running.", LOGGER_ID);
    }

    @Test
    void testStartHeartbeatSchedulerException() {
        ((ScheduledExecutorService) Mockito.doThrow(new Throwable[]{new RejectedExecutionException("Scheduler failure")}).when(this.mockScheduler)).scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertFalse(this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Logger) Mockito.verify(this.mockLogger)).error((String) ArgumentMatchers.eq("Owner {}: Unable to schedule heartbeat task. {}"), ArgumentMatchers.eq(LOGGER_ID), ArgumentMatchers.any(RejectedExecutionException.class));
    }

    @Test
    void testStopHeartbeatNeverStarted() {
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertFalse(this.manager.stopHeartbeat(true));
        ((Logger) Mockito.verify(this.mockLogger)).warn("Owner {}: No active heartbeat task to stop.", LOGGER_ID);
    }

    @Test
    void testStopHeartbeatAlreadyRequested() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            return this.mockFuture;
        });
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        Mockito.when(Boolean.valueOf(this.mockFuture.cancel(true))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.mockFuture.isDone())).thenReturn(false).thenReturn(true);
        Assertions.assertTrue(this.manager.stopHeartbeat(true));
        Assertions.assertFalse(this.manager.stopHeartbeat(true));
        ((Logger) Mockito.verify(this.mockLogger)).warn("Owner {}: No active heartbeat task to stop.", LOGGER_ID);
    }

    @Test
    void testHeartbeatUnableToAcquireSemaphore() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            Runnable runnable = (Runnable) invocationOnMock.getArgument(0);
            atomicReference.set(new Thread(() -> {
                runnable.run();
                countDownLatch.countDown();
            }));
            return this.mockFuture;
        });
        Mockito.when(Boolean.valueOf(this.mockFuture.cancel(true))).thenReturn(true);
        Semaphore semaphore = (Semaphore) Mockito.mock(Semaphore.class);
        Mockito.when(Boolean.valueOf(semaphore.tryAcquire())).thenReturn(false);
        Mockito.when(Boolean.valueOf(semaphore.tryAcquire(ArgumentMatchers.eq(LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS), (TimeUnit) ArgumentMatchers.eq(TimeUnit.MILLISECONDS)))).thenReturn(true);
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, this.mockScheduler, 100L, LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, () -> {
            return true;
        }, semaphore, this.mockLogger);
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Thread) atomicReference.get()).start();
        Assertions.assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS));
        Assertions.assertTrue(this.manager.stopHeartbeat(true));
        ((Logger) Mockito.verify(this.mockLogger)).error("Owner {}: Heartbeat semaphore should be acquirable at the start of every heartbeat!", LOGGER_ID);
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
    }

    @Test
    void testStopHeartbeatMockSuccessfulCancel() {
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            return this.mockFuture;
        });
        Mockito.when(Boolean.valueOf(this.mockFuture.cancel(true))).thenReturn(true);
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        this.manager.startHeartbeatForThread(Thread.currentThread());
        Mockito.when(Boolean.valueOf(this.mockFuture.isDone())).thenReturn(false).thenReturn(true);
        Assertions.assertTrue(this.manager.stopHeartbeat(true));
    }

    @Test
    void testHeartbeatTaskHandlesInterrupt() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            Runnable runnable = (Runnable) invocationOnMock.getArgument(0);
            atomicReference.set(new Thread(() -> {
                runnable.run();
                countDownLatch.countDown();
            }));
            return this.mockFuture;
        });
        Mockito.when(Boolean.valueOf(this.mockFuture.cancel(true))).thenReturn(true);
        this.manager = createDefaultManagerWithMocks(() -> {
            return false;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Thread) atomicReference.get()).start();
        ((Thread) atomicReference.get()).interrupt();
        Assertions.assertTrue(countDownLatch.await(500L, TimeUnit.MILLISECONDS), "Heartbeat task did not run in time");
        Assertions.assertFalse(this.manager.stopHeartbeat(true));
        ((Logger) Mockito.verify(this.mockLogger)).warn("Owner {}: No active heartbeat task to stop.", LOGGER_ID);
        ((Logger) Mockito.verify(this.mockLogger)).debug("Owner {}: Heartbeat started with interval: {} ms", LOGGER_ID, 100L);
        ((Logger) Mockito.verify(this.mockLogger)).info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", LOGGER_ID, true);
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
    }

    @Test
    void testHeartbeatTaskNullWriter() {
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.manager.startHeartbeatForThread((Thread) null);
        });
    }

    @Test
    void testHeartbeatTaskImmediateDeadMonitoringThread() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            Runnable runnable = (Runnable) invocationOnMock.getArgument(0);
            atomicReference.set(new Thread(() -> {
                runnable.run();
                countDownLatch.countDown();
            }));
            return this.mockFuture;
        });
        Mockito.when(Boolean.valueOf(this.mockFuture.cancel(false))).thenReturn(false);
        Thread thread = new Thread(() -> {
        });
        thread.start();
        thread.join();
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(thread));
        ((Thread) atomicReference.get()).start();
        Assertions.assertTrue(countDownLatch.await(500L, TimeUnit.MILLISECONDS), "Heartbeat task did not run in time");
        ((Logger) Mockito.verify(this.mockLogger)).warn("Owner {}: Monitored thread is no longer alive.", LOGGER_ID);
        ((Logger) Mockito.verify(this.mockLogger)).info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.", LOGGER_ID, false);
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
    }

    @Test
    void testHeartbeatTaskRenewalException() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        Mockito.when(this.mockScheduler.scheduleAtFixedRate((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class))).thenAnswer(invocationOnMock -> {
            Runnable runnable = (Runnable) invocationOnMock.getArgument(0);
            atomicReference.set(new Thread(() -> {
                runnable.run();
                countDownLatch.countDown();
            }));
            return this.mockFuture;
        });
        this.manager = createDefaultManagerWithMocks(() -> {
            throw new RuntimeException("Renewal error");
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        ((Thread) atomicReference.get()).start();
        Assertions.assertTrue(countDownLatch.await(500L, TimeUnit.MILLISECONDS), "Heartbeat task did not run in time");
        ((Logger) Mockito.verify(this.mockLogger)).error((String) ArgumentMatchers.eq("Owner {}: Heartbeat function threw exception {}"), ArgumentMatchers.eq(LOGGER_ID), ArgumentMatchers.any(RuntimeException.class));
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
    }

    @Test
    void testHeartbeatStopWaitsForHeartbeatTaskToFinish() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.manager = createDefaultManagerWithRealExecutor(() -> {
            try {
                Assertions.assertTrue(countDownLatch.await(500L, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        new Thread(() -> {
            Assertions.assertTrue(this.manager.stopHeartbeat(false));
            countDownLatch2.countDown();
        }).start();
        countDownLatch.countDown();
        Assertions.assertTrue(countDownLatch2.await(500L, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
        ((Logger) Mockito.verify(this.mockLogger)).debug("Owner {}: Heartbeat task successfully terminated.", LOGGER_ID);
    }

    @Test
    void testHeartbeatUnableToStopHeartbeatTask() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, this.actualExecutorService, 100L, 5000L, () -> {
            try {
                countDownLatch2.countDown();
                Assertions.assertTrue(countDownLatch.await(10000L, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        }, new Semaphore(1), this.mockLogger);
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            Assertions.assertFalse(this.manager.stopHeartbeat(false));
            countDownLatch3.countDown();
        });
        Assertions.assertTrue(countDownLatch2.await(500L, TimeUnit.MILLISECONDS), "Heartbeat task did not start.");
        thread.start();
        Assertions.assertTrue(countDownLatch3.await(7000L, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
        Assertions.assertTrue(this.manager.hasActiveHeartbeat());
        ((Logger) Mockito.verify(this.mockLogger)).error("Owner {}: Heartbeat is still in flight!", LOGGER_ID);
        countDownLatch.countDown();
    }

    @Test
    void testHeartbeatInterruptStopHeartbeatTask() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, this.actualExecutorService, 100L, 5000L, () -> {
            try {
                countDownLatch2.countDown();
                Assertions.assertTrue(countDownLatch.await(10000L, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        }, new Semaphore(1), this.mockLogger);
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            Assertions.assertFalse(this.manager.stopHeartbeat(false));
            countDownLatch3.countDown();
        });
        Assertions.assertTrue(countDownLatch2.await(500L, TimeUnit.MILLISECONDS), "Heartbeat task did not start.");
        thread.start();
        thread.interrupt();
        Assertions.assertTrue(countDownLatch3.await(7000L, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
        Assertions.assertTrue(this.manager.hasActiveHeartbeat());
        ((Logger) Mockito.verify(this.mockLogger)).warn("Owner {}: Interrupted while waiting for heartbeat termination.", LOGGER_ID);
        countDownLatch.countDown();
    }

    @Test
    void testHeartbeatTaskValidateStop() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.manager = createDefaultManagerWithRealExecutor(() -> {
            countDownLatch.countDown();
            return true;
        });
        Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
        Assertions.assertTrue(countDownLatch.await(2000L, TimeUnit.MILLISECONDS), "Heartbeat did not renew twice in time");
        Assertions.assertEquals(0L, countDownLatch.getCount(), "Heartbeat did not execute exactly twice");
        Assertions.assertTrue(this.manager.hasActiveHeartbeat());
        Assertions.assertTrue(this.manager.stopHeartbeat(false));
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
    }

    @Test
    void testDefaultManagerRapidStartStop1Ms() {
        this.manager = new LockProviderHeartbeatManager(LOGGER_ID, 1L, () -> {
            return true;
        });
        for (int i = 0; i < 100; i++) {
            Assertions.assertTrue(this.manager.startHeartbeatForThread(Thread.currentThread()));
            Assertions.assertTrue(this.manager.hasActiveHeartbeat());
            Assertions.assertTrue(this.manager.stopHeartbeat(true));
            Assertions.assertFalse(this.manager.hasActiveHeartbeat());
        }
    }

    @Test
    void testClose() throws Exception {
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        this.manager.close();
        Assertions.assertFalse(this.manager.hasActiveHeartbeat());
    }

    @Test
    void testClose_StopsHeartbeatAndShutsDownScheduler() throws Exception {
        Mockito.when(Boolean.valueOf(this.mockScheduler.awaitTermination(5L, TimeUnit.SECONDS))).thenReturn(true);
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        this.manager.close();
        ((ScheduledExecutorService) Mockito.verify(this.mockScheduler)).shutdown();
        ((ScheduledExecutorService) Mockito.verify(this.mockScheduler, Mockito.never())).shutdownNow();
    }

    @Test
    void testClose_ForceShutdownWhenTerminationTimesOut() throws Exception {
        Mockito.when(Boolean.valueOf(this.mockScheduler.awaitTermination(5L, TimeUnit.SECONDS))).thenReturn(false);
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        this.manager.close();
        ((ScheduledExecutorService) Mockito.verify(this.mockScheduler)).shutdown();
        ((ScheduledExecutorService) Mockito.verify(this.mockScheduler)).shutdownNow();
    }

    @Test
    void testClose_HandlesInterruptedException() throws Exception {
        Mockito.when(Boolean.valueOf(this.mockScheduler.awaitTermination(5L, TimeUnit.SECONDS))).thenThrow(new Throwable[]{new InterruptedException()});
        this.manager = createDefaultManagerWithMocks(() -> {
            return true;
        });
        this.manager.close();
        ((ScheduledExecutorService) Mockito.verify(this.mockScheduler)).shutdown();
        ((ScheduledExecutorService) Mockito.verify(this.mockScheduler)).shutdownNow();
        Assertions.assertTrue(Thread.currentThread().isInterrupted(), "Thread should be interrupted after exception handling");
    }

    private LockProviderHeartbeatManager createDefaultManagerWithMocks(Supplier<Boolean> supplier) {
        return new LockProviderHeartbeatManager(LOGGER_ID, this.mockScheduler, 100L, LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, supplier, new Semaphore(1), this.mockLogger);
    }

    private LockProviderHeartbeatManager createDefaultManagerWithRealExecutor(Supplier<Boolean> supplier) {
        return new LockProviderHeartbeatManager(LOGGER_ID, this.actualExecutorService, 100L, LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, supplier, new Semaphore(1), this.mockLogger);
    }
}
