package org.apache.activeblaze.impl.reliable.swp;

import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activeblaze.BlazeNoRouteException;
import org.apache.activeblaze.impl.reliable.ReliableBuffer;
import org.apache.activeblaze.wire.Ack;
import org.apache.activeblaze.wire.Control;
import org.apache.activeblaze.wire.Nack;
import org.apache.activeblaze.wire.Packet;
import org.apache.activeblaze.wire.PacketType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.class */
public class ProducerProcessor {
    static final Log LOG = LogFactory.getLog(ProducerProcessor.class);
    private static final long NOT_SET = -1;
    private final SwpProcessor swp;
    private final SocketAddress peerAddress;
    private int maxWindowSize = 1024;
    private int rtt = 5000;
    private ReliableBuffer replayBuffer = new ReliableBuffer();
    private final Lock lock = new ReentrantLock();
    private final Condition full = this.lock.newCondition();
    private long lastAckId = NOT_SET;
    private long lastAckTime = NOT_SET;
    private AtomicLong sendSequence = new AtomicLong(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProducerProcessor(SwpProcessor swpProcessor, SocketAddress socketAddress) {
        this.swp = swpProcessor;
        this.peerAddress = socketAddress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processOutbound(Packet packet) throws BlazeNoRouteException {
        packet.setMessageSequence(this.sendSequence.incrementAndGet());
        this.lock.lock();
        try {
            this.replayBuffer.addPacket(packet);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this + " Sending " + packet + "  - buffer size = " + this.replayBuffer.size());
            }
            if (this.replayBuffer.getBufferSize() < this.maxWindowSize || this.full.await(this.rtt, TimeUnit.MILLISECONDS)) {
                return;
            }
            this.replayBuffer.clear();
            throw new BlazeNoRouteException("No route to " + packet.getTo());
        } catch (InterruptedException e) {
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Packet processInbound(Packet packet) throws Exception {
        Packet packet2 = null;
        if (packet != null) {
            int packetType = packet.getPacketType();
            if (packetType == PacketType.ACK.getNumber()) {
                Ack ack = (Ack) packet;
                long startSequence = ack.getStartSequence();
                long endSequence = ack.getEndSequence();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this + " Got Ack = " + ack.getId() + ": " + ack.getStartSequence() + "," + ack.getEndSequence() + " [" + this.replayBuffer.size() + "]");
                }
                this.replayBuffer.removePackets(startSequence, endSequence);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(this + " Processed Ack = " + ack.getId() + ": " + ack.getStartSequence() + "," + ack.getEndSequence() + " [" + this.replayBuffer.size() + "]");
                }
                this.lastAckId = ack.getMessageSequence();
                this.lastAckTime = System.currentTimeMillis();
                if (this.replayBuffer.getBufferSize() <= this.maxWindowSize) {
                    this.lock.lock();
                    try {
                        this.full.signalAll();
                        this.lock.unlock();
                    } catch (Throwable th) {
                        this.lock.unlock();
                        throw th;
                    }
                }
            } else if (packetType == PacketType.NACK.getNumber()) {
                this.lastAckTime = System.currentTimeMillis();
                Nack nack = (Nack) packet;
                this.lastAckId = nack.getMessageSequence();
                LOG.debug(this + " Got Nack = " + nack);
                List<Packet> packets = this.replayBuffer.getPackets(nack.getStartSequence(), nack.getEndSequence());
                LOG.debug(this + " Replaying " + packets);
                Iterator<Packet> it = packets.iterator();
                while (it.hasNext()) {
                    this.swp.sendDownStream(it.next());
                }
            } else {
                packet2 = packet;
            }
        }
        return packet2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean control(long j) throws Exception {
        boolean z = false;
        if (this.lastAckTime + (this.rtt / 2) < j) {
            Control control = new Control();
            this.lock.lock();
            try {
                control.setLastMessageSequence(this.lastAckId);
                control.setTo(this.peerAddress);
                LOG.debug(this + " Sending Control message " + control);
                this.lock.unlock();
                this.swp.sendDownStream(control);
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        } else if (this.lastAckTime + (this.rtt * 2) < j) {
            LOG.debug(this + " Not valid: Last AckTime " + this.lastAckTime + " , " + this.rtt + " , " + j);
            z = true;
        }
        return z;
    }

    public String toString() {
        return "ProducerProcessor(" + this.peerAddress + ")";
    }
}
