package org.apache.activeblaze.impl.reliable.swp;

import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activeblaze.impl.reliable.ReliableBuffer;
import org.apache.activeblaze.wire.Ack;
import org.apache.activeblaze.wire.Nack;
import org.apache.activeblaze.wire.Packet;
import org.apache.activeblaze.wire.PacketType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.class */
/**
 * Receiving side of the sliding-window protocol for a single peer.
 *
 * <p>In-sequence packets are passed straight upstream and counted; once
 * {@code maxWindowSize} packets have been consumed (or {@link #control(long)}
 * decides enough time has passed) an {@link Ack} covering the consumed range is
 * sent downstream. Out-of-sequence packets are parked in a {@link ReliableBuffer}
 * and a {@link Nack} for the missing range is sent downstream.
 *
 * <p>Sequence bookkeeping and ack/nack emission are serialized with an internal
 * {@link ReentrantLock}; the replay buffer is drained while synchronized on the
 * buffer itself.
 */
public class ConsumerProcessor {
    static final Log LOG = LogFactory.getLog(ConsumerProcessor.class);

    /** Marker for "no sequence number seen yet". */
    private static final long NOT_SET = -1;

    /** {@link System#currentTimeMillis()} value recorded when the last Ack/Nack was sent. */
    private long lastAckTime;

    /** Number of in-sequence packets consumed since the last Ack. */
    private int bufferSize;

    private final SwpProcessor swp;
    private final SocketAddress peerAddress;

    /** Number of consumed packets that triggers an Ack. */
    private int maxWindowSize = 64;

    /** Added to {@code lastAckTime} in {@link #control(long)}; presumably milliseconds - TODO confirm. */
    private int rtt = 1000;

    /** Start of the range reported by the next Ack. */
    private long firstSequence = NOT_SET;

    /** Sequence number of the most recent in-order packet. */
    private long lastSequence = NOT_SET;

    /** Holds packets that arrived out of sequence. */
    private ReliableBuffer replayBuffer = new ReliableBuffer();

    /** Guards sequence state and ack/nack emission. */
    private final Lock lock = new ReentrantLock();

    /** Source of message sequence numbers for outgoing Acks and Nacks. */
    private final AtomicLong ackSequence = new AtomicLong();

    /** Most recently sent Ack or Nack; re-sent when a control packet arrives. */
    private Packet lastAck = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a processor for one remote peer.
     *
     * @param swpProcessor  owner used to send packets upstream and downstream
     * @param socketAddress address set as the destination of outgoing Acks/Nacks
     */
    public ConsumerProcessor(SwpProcessor swpProcessor, SocketAddress socketAddress) {
        this.swp = swpProcessor;
        this.peerAddress = socketAddress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles one packet received from the peer.
     *
     * <ul>
     *   <li>Control packet: re-sends the last Ack/Nack, if there is one.</li>
     *   <li>Next expected sequence (or the very first packet): records it, passes it
     *       upstream and sends an Ack once the window is full.</li>
     *   <li>Anything else: stores it in the replay buffer; if the buffer can supply an
     *       ordered run, that run is passed upstream, otherwise a Nack for the gap is
     *       sent (unless the packet is itself a replay).</li>
     * </ul>
     *
     * @param packet the inbound packet
     * @throws Exception propagated from the upstream/downstream send calls
     */
    public void processInBound(Packet packet) throws Exception {
        if (packet.getPacketType() == PacketType.CONTROL.getNumber()) {
            Packet previousAck = this.lastAck;
            if (previousAck != null) {
                this.swp.sendDownStream(previousAck);
            }
            return;
        }

        long messageSequence = packet.getMessageSequence();
        if (messageSequence == this.lastSequence + 1 || this.lastSequence == NOT_SET) {
            consumeInSequence(packet, messageSequence);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(this + " Packet not in sequence " + packet.getMessageSequence() + " FROM: " + packet.getFrom() + " Last sequence is " + this.lastSequence);
        }
        this.replayBuffer.addPacketInOrder(packet);

        List<Packet> ordered = null;
        if (this.replayBuffer.size() > 1) {
            synchronized (this.replayBuffer) {
                ordered = this.replayBuffer.getOrderedBuffer();
                if (!ordered.isEmpty()) {
                    this.replayBuffer.clear();
                }
            }
        }
        if (ordered != null && !ordered.isEmpty()) {
            // NOTE(review): lastSequence is not advanced for packets delivered from the
            // replay buffer; behavior kept as in the original - confirm this is intended.
            for (Packet buffered : ordered) {
                this.swp.sendUpStream(buffered);
            }
            return;
        }

        if (packet.isReplayed() || this.replayBuffer.isEmpty()) {
            return;
        }
        sendNack(packet);
    }

    /**
     * Records an in-order packet, passes it upstream and sends an Ack when the
     * number of consumed packets reaches the window size.
     */
    private void consumeInSequence(Packet packet, long messageSequence) throws Exception {
        this.lock.lock();
        try {
            if (this.lastSequence == NOT_SET) {
                this.firstSequence = messageSequence;
            }
            this.lastSequence = messageSequence;
            this.bufferSize++;
        } finally {
            this.lock.unlock();
        }
        // Delivery happens outside the lock, as in the original.
        this.swp.sendUpStream(packet);
        if (LOG.isDebugEnabled()) {
            LOG.debug(this + " consumed packet " + packet.getMessageSequence() + " FROM: " + packet.getFrom());
        }
        if (this.bufferSize >= this.maxWindowSize) {
            sendAck();
        }
    }

    /**
     * Sends a Nack covering the gap between the last in-order sequence and the
     * given out-of-order packet, and remembers it as the last Ack/Nack sent.
     */
    private void sendNack(Packet packet) throws Exception {
        Nack nack = new Nack();
        this.lock.lock();
        try {
            nack.setStartSequence(this.lastSequence + 1);
            nack.setEndSequence(packet.getMessageSequence() - 1);
            nack.setMessageSequence(this.ackSequence.incrementAndGet());
            nack.setResponseRequired(false);
            nack.setTo(this.peerAddress);
            this.swp.sendDownStream(nack);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence());
            }
            this.lastAck = nack;
        } finally {
            // Must be released even if sendDownStream throws; otherwise every later
            // lock() on this processor from another thread would block forever.
            this.lock.unlock();
        }
        this.lastAckTime = System.currentTimeMillis();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Periodic check: sends an Ack if more than {@code rtt} has elapsed since the
     * last Ack/Nack, or if the window is full.
     *
     * @param j the current time, compared against {@code lastAckTime + rtt}
     * @return always {@code false}
     * @throws Exception propagated from {@link #sendAck()}
     */
    public boolean control(long j) throws Exception {
        if (this.lastAckTime + this.rtt < j || this.bufferSize >= this.maxWindowSize) {
            sendAck();
        }
        return false;
    }

    /**
     * Sends an Ack for the range {@code firstSequence..lastSequence}, resets the
     * consumed-packet counter and starts the next range at {@code lastSequence}.
     *
     * @throws Exception propagated from the downstream send
     */
    void sendAck() throws Exception {
        this.lock.lock();
        try {
            this.bufferSize = 0;
            Ack ack = new Ack();
            ack.setStartSequence(this.firstSequence);
            ack.setEndSequence(this.lastSequence);
            ack.setMessageSequence(this.ackSequence.incrementAndGet());
            ack.setResponseRequired(false);
            ack.setTo(this.peerAddress);
            this.lastAckTime = System.currentTimeMillis();
            this.firstSequence = this.lastSequence;
            if (LOG.isDebugEnabled()) {
                LOG.debug(this + " Sent Ack " + ack);
            }
            this.swp.sendDownStream(ack);
            this.lastAck = ack;
        } finally {
            this.lock.unlock();
        }
    }

    @Override
    public String toString() {
        return "ConsumerProcessor(" + this.peerAddress + ")";
    }
}
