package com.google.cloud.bigtable.data.v2.stub.changestream;

import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Value;
import com.google.common.base.Preconditions;
import org.threeten.bp.Instant;

/* loaded from: input_file:com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.class */
final class ChangeStreamStateMachine<ChangeStreamRecordT> {
    // Builder handed to the constructor; every state handler forwards parsed data to it.
    private final ChangeStreamRecordAdapter.ChangeStreamRecordBuilder<ChangeStreamRecordT> builder;
    // State that will handle the next incoming message; replaced by whatever each handler returns.
    private State currentState;
    // Finished record waiting for consumeChangeStreamRecord(); null while nothing is pending.
    private ChangeStreamRecordT completeChangeStreamRecord;
    // The five counters below are only read when validate() builds a failure message.
    // They are incremented as messages/mods are processed and zeroed again by reset().
    private int numHeartbeats = 0;
    private int numCloseStreams = 0;
    private int numDataChanges = 0;
    private int numNonCellMods = 0;
    private int numCellChunks = 0;
    // Total value size announced by the ChunkInfo of the first chunk of a chunked SetCell.
    private int expectedTotalSizeOfChunkedSetCell = 0;
    // Running sum of the value bytes received so far for the chunked SetCell in progress.
    private int actualTotalSizeOfChunkedSetCell = 0;
    /**
     * Idle state: no record is being assembled, so any of the three message kinds may arrive.
     *
     * <p>A heartbeat or close-stream message is turned into a complete record immediately. A data
     * change is validated, starts a GC or user mutation on the builder, and is then handed to
     * {@code AWAITING_NEW_DATA_CHANGE} so its chunks get processed.
     */
    private final State AWAITING_NEW_STREAM_RECORD = new State() {
        // Every handler first checks that the previously completed record has been consumed.
        private static final String UNCONSUMED_RECORD = "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet.";

        @Override
        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
            validate(completeChangeStreamRecord == null, UNCONSUMED_RECORD);
            completeChangeStreamRecord = builder.onHeartbeat(heartbeat);
            return AWAITING_STREAM_RECORD_CONSUME;
        }

        @Override
        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
            validate(completeChangeStreamRecord == null, UNCONSUMED_RECORD);
            completeChangeStreamRecord = builder.onCloseStream(closeStream);
            return AWAITING_STREAM_RECORD_CONSUME;
        }

        @Override
        State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
            validate(completeChangeStreamRecord == null, UNCONSUMED_RECORD);
            validate(!dataChange.getRowKey().isEmpty(), "AWAITING_NEW_STREAM_RECORD: First data change missing rowKey.");
            validate(dataChange.hasCommitTimestamp(), "AWAITING_NEW_STREAM_RECORD: First data change missing commit timestamp.");
            validate(dataChange.getChunksCount() > 0, "AWAITING_NEW_STREAM_RECORD: First data change missing mods.");

            switch (dataChange.getType()) {
                case GARBAGE_COLLECTION:
                    validate(dataChange.getSourceClusterId().isEmpty(), "AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id.");
                    builder.startGcMutation(dataChange.getRowKey(), commitTimeOf(dataChange), dataChange.getTiebreaker());
                    break;
                case USER:
                    validate(!dataChange.getSourceClusterId().isEmpty(), "AWAITING_NEW_STREAM_RECORD: User initiated data change missing source cluster id.");
                    builder.startUserMutation(dataChange.getRowKey(), dataChange.getSourceClusterId(), commitTimeOf(dataChange), dataChange.getTiebreaker());
                    break;
                default:
                    // validate(false, ...) always throws, so no mutation is started for other types.
                    validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType());
                    break;
            }

            // The mutation header is in place; let the data-change state consume the chunks.
            return AWAITING_NEW_DATA_CHANGE.handleDataChange(dataChange);
        }

        /** Converts the commit timestamp (seconds + nanos) of the data change to an Instant. */
        private Instant commitTimeOf(ReadChangeStreamResponse.DataChange dataChange) {
            return Instant.ofEpochSecond(dataChange.getCommitTimestamp().getSeconds(), dataChange.getCommitTimestamp().getNanos());
        }
    };
    /**
     * State used while a mutation is being assembled from one or more DataChange messages.
     *
     * <p>Heartbeats and close-stream messages are rejected here. Each DataChange is walked mod by
     * mod and forwarded to the builder; once every mod has been applied,
     * {@code checkAndFinishMutationIfNeeded} decides whether the mutation is complete.
     */
    private final State AWAITING_NEW_DATA_CHANGE = new State() {
        @Override
        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
            throw new IllegalStateException("AWAITING_NEW_DATA_CHANGE: Can't handle a Heartbeat in the middle of building a ChangeStreamMutation.");
        }

        @Override
        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
            throw new IllegalStateException("AWAITING_NEW_DATA_CHANGE: Can't handle a CloseStream in the middle of building a ChangeStreamMutation.");
        }

        /**
         * Applies every mod of {@code dataChange} to the builder.
         *
         * @return this state when the message ends with an unfinished chunked SetCell, otherwise
         *     whatever {@code checkAndFinishMutationIfNeeded} returns
         * @throws InvalidInputException if a chunked SetCell violates one of the chunking checks
         * @throws IllegalStateException if a mod is of none of the handled kinds
         */
        @Override
        State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
            final int chunkCount = dataChange.getChunksCount();
            for (int index = 0; index < chunkCount; index++) {
                ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index);
                Mutation mod = chunk.getMutation();

                if (mod.hasSetCell()) {
                    Mutation.SetCell setCell = mod.getSetCell();

                    if (!chunk.hasChunkInfo()) {
                        // Unchunked SetCell: the whole cell is contained in this single mod.
                        builder.startCell(setCell.getFamilyName(), setCell.getColumnQualifier(), setCell.getTimestampMicros());
                        numCellChunks++;
                        builder.cellValue(setCell.getValue());
                        builder.finishCell();
                        continue;
                    }

                    ReadChangeStreamResponse.MutationChunk.ChunkInfo chunkInfo = chunk.getChunkInfo();
                    if (chunkInfo.getChunkedValueOffset() == 0) {
                        // Offset 0 marks the first chunk: remember the announced size and open the cell.
                        validate(chunkInfo.getChunkedValueSize() > 0, "AWAITING_NEW_DATA_CHANGE: First chunk of a chunked cell must have a positive chunked value size.");
                        expectedTotalSizeOfChunkedSetCell = chunkInfo.getChunkedValueSize();
                        actualTotalSizeOfChunkedSetCell = 0;
                        builder.startCell(setCell.getFamilyName(), setCell.getColumnQualifier(), setCell.getTimestampMicros());
                    } else {
                        // A later chunk continues a cell opened earlier, so it must lead this message.
                        validate(index == 0, "AWAITING_NEW_DATA_CHANGE: Non-first chunked SetCell must be the first mod of a DataChange.");
                    }

                    validate(chunkInfo.getChunkedValueSize() == expectedTotalSizeOfChunkedSetCell, "AWAITING_NEW_DATA_CHANGE: Chunked cell value size must be the same for all chunks.");
                    numCellChunks++;
                    builder.cellValue(setCell.getValue());
                    actualTotalSizeOfChunkedSetCell += setCell.getValue().size();

                    if (!chunkInfo.getLastChunk()) {
                        // More chunks of this cell are still to come; nothing may follow it in this message.
                        validate(index == chunkCount - 1, "AWAITING_NEW_DATA_CHANGE: Current mod is a chunked SetCell but not the last chunk, but it's not the last mod of the current response.");
                        return AWAITING_NEW_DATA_CHANGE;
                    }

                    builder.finishCell();
                    validate(actualTotalSizeOfChunkedSetCell == expectedTotalSizeOfChunkedSetCell, "Chunked value size in ChunkInfo doesn't match the actual total size. Expected total size: " + expectedTotalSizeOfChunkedSetCell + "; actual total size: " + actualTotalSizeOfChunkedSetCell);
                    continue;
                }

                if (mod.hasDeleteFromFamily()) {
                    numNonCellMods++;
                    builder.deleteFamily(mod.getDeleteFromFamily().getFamilyName());
                    continue;
                }

                if (mod.hasDeleteFromColumn()) {
                    numNonCellMods++;
                    Mutation.DeleteFromColumn deleteColumn = mod.getDeleteFromColumn();
                    builder.deleteCells(deleteColumn.getFamilyName(), deleteColumn.getColumnQualifier(), Range.TimestampRange.create(deleteColumn.getTimeRange().getStartTimestampMicros(), deleteColumn.getTimeRange().getEndTimestampMicros()));
                    continue;
                }

                if (mod.hasAddToCell()) {
                    Mutation.AddToCell addToCell = mod.getAddToCell();
                    builder.addToCell(addToCell.getFamilyName(), Value.fromProto(addToCell.getColumnQualifier()), Value.fromProto(addToCell.getTimestamp()), Value.fromProto(addToCell.getInput()));
                    continue;
                }

                if (mod.hasMergeToCell()) {
                    Mutation.MergeToCell mergeToCell = mod.getMergeToCell();
                    builder.mergeToCell(mergeToCell.getFamilyName(), Value.fromProto(mergeToCell.getColumnQualifier()), Value.fromProto(mergeToCell.getTimestamp()), Value.fromProto(mergeToCell.getInput()));
                    continue;
                }

                throw new IllegalStateException("Received unknown mod type. You may need to upgrade your Bigtable client.");
            }

            // Every mod of this message has been applied; finish the mutation or wait for more.
            return checkAndFinishMutationIfNeeded(dataChange);
        }
    };
    /**
     * State entered once a complete record has been produced. Every message kind is rejected
     * here with the same IllegalStateException message.
     */
    private final State AWAITING_STREAM_RECORD_CONSUME = new State() {
        // One message shared by all three handlers.
        private static final String RECORD_PENDING = "AWAITING_STREAM_RECORD_CONSUME: Skipping completed change stream record.";

        @Override
        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
            throw new IllegalStateException(RECORD_PENDING);
        }

        @Override
        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
            throw new IllegalStateException(RECORD_PENDING);
        }

        @Override
        State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
            throw new IllegalStateException(RECORD_PENDING);
        }
    };
    private final State ERROR = new State() { // from class: com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamStateMachine.4
        @Override // com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamStateMachine.State
        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
            throw new IllegalStateException("ERROR: Failed to handle Heartbeat.");
        }

        @Override // com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamStateMachine.State
        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
            throw new IllegalStateException("ERROR: Failed to handle CloseStream.");
        }

        @Override // com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamStateMachine.State
        State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
            throw new IllegalStateException("ERROR: Failed to handle DataChange.");
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine$InvalidInputException.class */
    /**
     * Unchecked exception thrown by {@code validate} when a check on an incoming message fails.
     */
    public static class InvalidInputException extends RuntimeException {
        /**
         * @param str the detail message, passed unchanged to {@link RuntimeException}
         */
        InvalidInputException(String str) {
            super(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine$State.class */
    /**
     * Base class of the state handlers. Each handler receives one kind of message and returns a
     * {@code State}. The defaults below reject every message with a message-less
     * IllegalStateException; subclasses override the handlers they support.
     */
    public static abstract class State {
        State() {
        }

        /** Handles a heartbeat message; this default implementation always throws. */
        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
            throw new IllegalStateException();
        }

        /** Handles a close-stream message; this default implementation always throws. */
        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
            throw new IllegalStateException();
        }

        /** Handles a data-change message; this default implementation always throws. */
        State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
            throw new IllegalStateException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a state machine that builds its records through the given builder.
     *
     * @param changeStreamRecordBuilder builder that receives all callbacks; it is reset as part of
     *     construction
     */
    public ChangeStreamStateMachine(ChangeStreamRecordAdapter.ChangeStreamRecordBuilder<ChangeStreamRecordT> changeStreamRecordBuilder) {
        this.builder = changeStreamRecordBuilder;
        // Must run after the builder is assigned: reset() calls builder.reset() and selects the initial state.
        reset();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Counts the heartbeat and passes it to the current state, adopting the state it returns.
     *
     * @param heartbeat the heartbeat message to process
     * @throws RuntimeException whatever the current state throws; the machine is switched to
     *     {@code ERROR} before the exception is rethrown
     */
    public void handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
        numHeartbeats += 1;
        final State next;
        try {
            next = currentState.handleHeartbeat(heartbeat);
        } catch (RuntimeException failure) {
            currentState = ERROR;
            throw failure;
        }
        currentState = next;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Counts the close-stream message and passes it to the current state, adopting the state it
     * returns.
     *
     * @param closeStream the close-stream message to process
     * @throws RuntimeException whatever the current state throws; the machine is switched to
     *     {@code ERROR} before the exception is rethrown
     */
    public void handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
        numCloseStreams += 1;
        final State next;
        try {
            next = currentState.handleCloseStream(closeStream);
        } catch (RuntimeException failure) {
            currentState = ERROR;
            throw failure;
        }
        currentState = next;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Counts the data change and passes it to the current state, adopting the state it returns.
     *
     * @param dataChange the data-change message to process
     * @throws RuntimeException whatever the current state throws; the machine is switched to
     *     {@code ERROR} before the exception is rethrown
     */
    public void handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
        numDataChanges += 1;
        final State next;
        try {
            next = currentState.handleDataChange(dataChange);
        } catch (RuntimeException failure) {
            currentState = ERROR;
            throw failure;
        }
        currentState = next;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the completed record and resets the machine so a new record can be started.
     *
     * @return the record that was pending
     * @throws IllegalStateException if no record is pending, or the machine is not in
     *     {@code AWAITING_STREAM_RECORD_CONSUME}
     */
    public ChangeStreamRecordT consumeChangeStreamRecord() {
        final ChangeStreamRecordT finishedRecord = completeChangeStreamRecord;
        Preconditions.checkState(finishedRecord != null, "No change stream record to consume.");
        Preconditions.checkState(AWAITING_STREAM_RECORD_CONSUME == currentState, "Change stream record is not ready to consume: " + currentState);
        // Grab the record first: reset() clears the field.
        reset();
        return finishedRecord;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether a finished record is available.
     *
     * @return {@code true} only when a record is stored and the machine is in
     *     {@code AWAITING_STREAM_RECORD_CONSUME}
     */
    public boolean hasCompleteChangeStreamRecord() {
        if (completeChangeStreamRecord == null) {
            return false;
        }
        return currentState == AWAITING_STREAM_RECORD_CONSUME;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether the machine is in any state other than {@code AWAITING_NEW_STREAM_RECORD}.
     *
     * @return {@code false} only while the machine is waiting for a new record to start
     */
    public boolean isChangeStreamRecordInProgress() {
        final boolean awaitingNewRecord = currentState == AWAITING_NEW_STREAM_RECORD;
        return !awaitingNewRecord;
    }

    /**
     * Puts the machine back into {@code AWAITING_NEW_STREAM_RECORD}: drops any pending record,
     * zeroes all counters and size trackers, and finally resets the builder.
     */
    private void reset() {
        currentState = AWAITING_NEW_STREAM_RECORD;
        completeChangeStreamRecord = null;

        // Message and mod counters.
        numHeartbeats = numCloseStreams = numDataChanges = 0;
        numNonCellMods = numCellChunks = 0;

        // Chunked SetCell size bookkeeping.
        expectedTotalSizeOfChunkedSetCell = actualTotalSizeOfChunkedSetCell = 0;

        builder.reset();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finishes the mutation when {@code dataChange} is flagged as done; otherwise keeps waiting
     * for further data changes.
     *
     * @param dataChange the data change whose mods have just been applied
     * @return {@code AWAITING_STREAM_RECORD_CONSUME} after the record has been finished, or
     *     {@code AWAITING_NEW_DATA_CHANGE} when the data change is not done
     * @throws InvalidInputException if the final data change has an empty token or no estimated
     *     low watermark
     */
    public State checkAndFinishMutationIfNeeded(ReadChangeStreamResponse.DataChange dataChange) {
        if (dataChange.getDone()) {
            validate(!dataChange.getToken().isEmpty(), "Last data change missing token");
            validate(dataChange.hasEstimatedLowWatermark(), "Last data change missing lowWatermark");
            final Instant lowWatermark = Instant.ofEpochSecond(dataChange.getEstimatedLowWatermark().getSeconds(), dataChange.getEstimatedLowWatermark().getNanos());
            completeChangeStreamRecord = builder.finishChangeStreamMutation(dataChange.getToken(), lowWatermark);
            return AWAITING_STREAM_RECORD_CONSUME;
        }
        return AWAITING_NEW_DATA_CHANGE;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Throws when a check on incoming data fails, appending the current counters to the message.
     *
     * @param z the condition that must hold
     * @param str message prefix describing the failed check
     * @throws InvalidInputException if {@code z} is {@code false}
     */
    public void validate(boolean z, String str) {
        if (z) {
            return;
        }
        final StringBuilder message = new StringBuilder();
        message.append(str);
        message.append(". numHeartbeats: ").append(numHeartbeats);
        message.append(", numCloseStreams: ").append(numCloseStreams);
        message.append(", numDataChanges: ").append(numDataChanges);
        message.append(", numNonCellMods: ").append(numNonCellMods);
        message.append(", numCellChunks: ").append(numCellChunks);
        message.append(", expectedTotalSizeOfChunkedSetCell: ").append(expectedTotalSizeOfChunkedSetCell);
        message.append(", actualTotalSizeOfChunkedSetCell: ").append(actualTotalSizeOfChunkedSetCell);
        throw new InvalidInputException(message.toString());
    }

    /** Synthetic accessor: post-increments {@code numCellChunks} and returns the previous value. */
    static /* synthetic */ int access$508(ChangeStreamStateMachine changeStreamStateMachine) {
        return changeStreamStateMachine.numCellChunks++;
    }

    /**
     * Synthetic accessor: adds {@code i} to {@code actualTotalSizeOfChunkedSetCell} and returns
     * the updated total.
     */
    static /* synthetic */ int access$712(ChangeStreamStateMachine changeStreamStateMachine, int i) {
        return changeStreamStateMachine.actualTotalSizeOfChunkedSetCell += i;
    }

    /** Synthetic accessor: post-increments {@code numNonCellMods} and returns the previous value. */
    static /* synthetic */ int access$808(ChangeStreamStateMachine changeStreamStateMachine) {
        return changeStreamStateMachine.numNonCellMods++;
    }
}
