package org.postgresql.core.v3.replication;

import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.postgresql.copy.CopyDual;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationType;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

/* loaded from: input_file:postgresql-42.7.1.jar:org/postgresql/core/v3/replication/V3PGReplicationStream.class */
public class V3PGReplicationStream implements PGReplicationStream {
    private static final Logger LOGGER = Logger.getLogger(V3PGReplicationStream.class.getName());
    public static final long POSTGRES_EPOCH_2000_01_01 = 946684800000L;
    private static final long NANOS_PER_MILLISECOND = 1000000;
    private final CopyDual copyDual;
    private final long updateInterval;
    private final ReplicationType replicationType;
    private long lastStatusUpdate;
    private volatile LogSequenceNumber lastReceiveLSN;
    private boolean closeFlag = false;
    private LogSequenceNumber lastServerLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastAppliedLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastFlushedLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber startOfLastMessageLSN = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber explicitlyFlushedLSN = LogSequenceNumber.INVALID_LSN;

    public V3PGReplicationStream(CopyDual copyDual, LogSequenceNumber logSequenceNumber, long j, ReplicationType replicationType) {
        this.lastReceiveLSN = LogSequenceNumber.INVALID_LSN;
        this.copyDual = copyDual;
        this.updateInterval = j * 1000000;
        this.lastStatusUpdate = System.nanoTime() - (j * 1000000);
        this.lastReceiveLSN = logSequenceNumber;
        this.replicationType = replicationType;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public ByteBuffer read() throws SQLException {
        ByteBuffer byteBuffer;
        checkClose();
        ByteBuffer byteBuffer2 = null;
        while (true) {
            byteBuffer = byteBuffer2;
            if (byteBuffer != null || !this.copyDual.isActive()) {
                break;
            }
            byteBuffer2 = readInternal(true);
        }
        return byteBuffer;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public ByteBuffer readPending() throws SQLException {
        checkClose();
        return readInternal(false);
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public LogSequenceNumber getLastReceiveLSN() {
        return this.lastReceiveLSN;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public LogSequenceNumber getLastFlushedLSN() {
        return this.lastFlushedLSN;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public LogSequenceNumber getLastAppliedLSN() {
        return this.lastAppliedLSN;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public void setFlushedLSN(LogSequenceNumber logSequenceNumber) {
        this.lastFlushedLSN = logSequenceNumber;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public void setAppliedLSN(LogSequenceNumber logSequenceNumber) {
        this.lastAppliedLSN = logSequenceNumber;
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public void forceUpdateStatus() throws SQLException {
        checkClose();
        updateStatusInternal(this.lastReceiveLSN, this.lastFlushedLSN, this.lastAppliedLSN, true);
    }

    @Override // org.postgresql.replication.PGReplicationStream
    public boolean isClosed() {
        return this.closeFlag || !this.copyDual.isActive();
    }

    private ByteBuffer readInternal(boolean z) throws SQLException {
        boolean z2 = false;
        while (true) {
            boolean z3 = z2;
            if (!this.copyDual.isActive()) {
                return null;
            }
            ByteBuffer receiveNextData = receiveNextData(z);
            if (z3 || isTimeUpdate()) {
                timeUpdateStatus();
            }
            if (receiveNextData == null) {
                return null;
            }
            byte b = receiveNextData.get();
            switch (b) {
                case 107:
                    z2 = processKeepAliveMessage(receiveNextData) | (this.updateInterval == 0);
                case 119:
                    return processXLogData(receiveNextData);
                default:
                    throw new PSQLException(GT.tr("Unexpected packet type during replication: {0}", Integer.toString(b)), PSQLState.PROTOCOL_VIOLATION);
            }
        }
    }

    private ByteBuffer receiveNextData(boolean z) throws SQLException {
        try {
            byte[] readFromCopy = this.copyDual.readFromCopy(z);
            if (readFromCopy != null) {
                return ByteBuffer.wrap(readFromCopy);
            }
            return null;
        } catch (PSQLException e) {
            if (e.getCause() instanceof SocketTimeoutException) {
                return null;
            }
            throw e;
        }
    }

    private boolean isTimeUpdate() {
        return this.updateInterval != 0 && System.nanoTime() - this.lastStatusUpdate >= this.updateInterval;
    }

    private void timeUpdateStatus() throws SQLException {
        updateStatusInternal(this.lastReceiveLSN, this.lastFlushedLSN, this.lastAppliedLSN, false);
    }

    private void updateStatusInternal(LogSequenceNumber logSequenceNumber, LogSequenceNumber logSequenceNumber2, LogSequenceNumber logSequenceNumber3, boolean z) throws SQLException {
        byte[] prepareUpdateStatus = prepareUpdateStatus(logSequenceNumber, logSequenceNumber2, logSequenceNumber3, z);
        this.copyDual.writeToCopy(prepareUpdateStatus, 0, prepareUpdateStatus.length);
        this.copyDual.flushCopy();
        this.explicitlyFlushedLSN = logSequenceNumber2;
        this.lastStatusUpdate = System.nanoTime();
    }

    private byte[] prepareUpdateStatus(LogSequenceNumber logSequenceNumber, LogSequenceNumber logSequenceNumber2, LogSequenceNumber logSequenceNumber3, boolean z) {
        ByteBuffer allocate = ByteBuffer.allocate(34);
        long nanoTime = System.nanoTime() / 1000000;
        long convert = TimeUnit.MICROSECONDS.convert(nanoTime - POSTGRES_EPOCH_2000_01_01, TimeUnit.MICROSECONDS);
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, " FE=> StandbyStatusUpdate(received: {0}, flushed: {1}, applied: {2}, clock: {3})", new Object[]{logSequenceNumber.asString(), logSequenceNumber2.asString(), logSequenceNumber3.asString(), new Date(nanoTime)});
        }
        allocate.put((byte) 114);
        allocate.putLong(logSequenceNumber.asLong());
        allocate.putLong(logSequenceNumber2.asLong());
        allocate.putLong(logSequenceNumber3.asLong());
        allocate.putLong(convert);
        if (z) {
            allocate.put((byte) 1);
        } else {
            allocate.put(logSequenceNumber == LogSequenceNumber.INVALID_LSN ? (byte) 1 : (byte) 0);
        }
        this.lastStatusUpdate = nanoTime;
        return allocate.array();
    }

    private boolean processKeepAliveMessage(ByteBuffer byteBuffer) {
        this.lastServerLSN = LogSequenceNumber.valueOf(byteBuffer.getLong());
        if (this.lastServerLSN.asLong() > this.lastReceiveLSN.asLong()) {
            this.lastReceiveLSN = this.lastServerLSN;
        }
        if (this.explicitlyFlushedLSN.asLong() >= this.startOfLastMessageLSN.asLong() && this.lastServerLSN.asLong() > this.explicitlyFlushedLSN.asLong() && this.lastServerLSN.asLong() > this.lastFlushedLSN.asLong()) {
            this.lastFlushedLSN = this.lastServerLSN;
        }
        long j = byteBuffer.getLong();
        boolean z = byteBuffer.get() != 0;
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "  <=BE Keepalive(lastServerWal: {0}, clock: {1} needReply: {2})", new Object[]{this.lastServerLSN.asString(), new Date(TimeUnit.MILLISECONDS.convert(j, TimeUnit.MICROSECONDS) + POSTGRES_EPOCH_2000_01_01), Boolean.valueOf(z)});
        }
        return z;
    }

    private ByteBuffer processXLogData(ByteBuffer byteBuffer) {
        long j = byteBuffer.getLong();
        this.startOfLastMessageLSN = LogSequenceNumber.valueOf(j);
        this.lastServerLSN = LogSequenceNumber.valueOf(byteBuffer.getLong());
        long j2 = byteBuffer.getLong();
        switch (this.replicationType) {
            case LOGICAL:
                this.lastReceiveLSN = LogSequenceNumber.valueOf(j);
                break;
            case PHYSICAL:
                this.lastReceiveLSN = LogSequenceNumber.valueOf(j + (byteBuffer.limit() - byteBuffer.position()));
                break;
        }
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "  <=BE XLogData(currWal: {0}, lastServerWal: {1}, clock: {2})", new Object[]{this.lastReceiveLSN.asString(), this.lastServerLSN.asString(), Long.valueOf(j2)});
        }
        return byteBuffer.slice();
    }

    private void checkClose() throws PSQLException {
        if (isClosed()) {
            throw new PSQLException(GT.tr("This replication stream has been closed.", new Object[0]), PSQLState.CONNECTION_DOES_NOT_EXIST);
        }
    }

    @Override // org.postgresql.replication.PGReplicationStream, java.lang.AutoCloseable
    public void close() throws SQLException {
        if (isClosed()) {
            return;
        }
        LOGGER.log(Level.FINEST, " FE=> StopReplication");
        this.copyDual.endCopy();
        this.closeFlag = true;
    }
}
