package org.apache.activeblaze.impl.reliable.swp;

import java.net.SocketAddress;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.SendRequest;
import org.apache.activeblaze.wire.Packet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activeblaze/impl/reliable/swp/SwpProcessor.class */
/**
 * Chained processor that keeps one {@link ProducerProcessor} and one
 * {@link ConsumerProcessor} per remote {@link SocketAddress}.
 * <p>
 * Outbound packets are cloned, handed to the producer registered for the
 * destination address and then forwarded down the chain. Inbound packets are
 * first offered to the producer registered for the sender (if any) and then to
 * the consumer for the sender.
 * <p>
 * While started, a daemon {@link Timer} periodically invokes
 * {@code control(long)} on every registered producer and consumer and evicts
 * the peers for which it returns {@code true}.
 */
public class SwpProcessor extends DefaultChainedProcessor {
    static final Log LOG = LogFactory.getLog(SwpProcessor.class);
    private Map<Packet, SendRequest> messageRequests;
    private Timer statusTimer;
    private int maxConcurrentRequests = 1000;
    private int maxWindowSize = 16384;
    private int windowSize = 0;
    private int rtt = 1000;
    protected AtomicInteger session = new AtomicInteger();
    /** Producers keyed by the remote address; also read and pruned by the status timer thread. */
    Map<SocketAddress, ProducerProcessor> producers = new ConcurrentHashMap<SocketAddress, ProducerProcessor>();
    /** Consumers keyed by the remote address; also read and pruned by the status timer thread. */
    Map<SocketAddress, ConsumerProcessor> consumers = new ConcurrentHashMap<SocketAddress, ConsumerProcessor>();

    /**
     * Clones the packet, passes the clone to the producer registered for its
     * destination (creating that producer on first use) and then forwards the
     * clone down the chain.
     *
     * @param packet the outbound packet; the instance itself is not forwarded
     * @throws Exception if the producer or the downstream chain fails
     */
    @Override
    public void downStream(Packet packet) throws Exception {
        Packet copy = packet.mo0clone();
        getProducer(copy.getTo()).processOutbound(copy);
        sendDownStream(copy);
    }

    /**
     * Offers an inbound packet to the producer registered for the sender, if
     * there is one, and, unless that producer returned {@code null}, to the
     * consumer for the sender (created on first use).
     *
     * @param packet the inbound packet
     * @throws Exception if either processor fails
     */
    @Override
    public void upStream(Packet packet) throws Exception {
        ProducerProcessor producer;
        synchronized (this.producers) {
            producer = this.producers.get(packet.getFrom());
        }
        Packet result = packet;
        if (producer != null) {
            result = producer.processInbound(packet);
        }
        if (result != null) {
            // NOTE(review): the consumer receives the original packet, not the value
            // returned by the producer - confirm that this is intended.
            getConsumer(packet.getFrom()).processInBound(packet);
        }
    }

    /**
     * Initializes the parent and allocates the request cache, sized by
     * {@link #getMaxConcurrentRequests()}.
     *
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void doInit() throws Exception {
        super.doInit();
        this.messageRequests = new LRUCache(this.maxConcurrentRequests);
    }

    /**
     * Starts the parent and schedules the periodic peer check on a daemon
     * timer. The check runs every {@code rtt / 4} milliseconds.
     *
     * @throws Exception if the parent fails to start
     */
    @Override
    public void doStart() throws Exception {
        super.doStart();
        TimerTask peerCheck = new TimerTask() {
            @Override
            public void run() {
                try {
                    checkPeers(System.currentTimeMillis());
                } catch (Exception e) {
                    // Swallowed on purpose: an exception escaping run() would
                    // terminate the timer thread and stop all further checks.
                    SwpProcessor.LOG.error("Failed to send heartbeat", e);
                }
            }
        };
        int period = this.rtt / 4;
        this.statusTimer = new Timer(true);
        this.statusTimer.scheduleAtFixedRate(peerCheck, period, period);
    }

    /**
     * Cancels the status timer from within its own thread, waits for that
     * cancellation to be carried out and then stops the parent.
     * <p>
     * Cancelling from a task run by the timer means a peer check that is
     * still executing has finished before this method returns.
     *
     * @throws Exception if the wait is interrupted or the parent fails to stop
     */
    @Override
    public void doStop() throws Exception {
        final Timer timer = this.statusTimer;
        if (timer != null) {
            final CountDownLatch cancelled = new CountDownLatch(1);
            try {
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        timer.cancel();
                        cancelled.countDown();
                    }
                }, 0L);
                cancelled.await();
            } catch (IllegalStateException alreadyStopped) {
                // schedule() rejects new tasks once the timer was cancelled or its
                // thread has terminated; nothing is left to wait for in that case.
                timer.cancel();
            } finally {
                // Drop the reference even when the wait was interrupted; the
                // scheduled task still cancels the timer through its own reference.
                this.statusTimer = null;
            }
        }
        super.doStop();
    }

    /**
     * Forwards a packet to the next processor down the chain, bypassing this
     * processor's own outbound handling.
     *
     * @param packet the packet to forward
     * @throws Exception if the downstream chain fails
     */
    public void sendDownStream(Packet packet) throws Exception {
        super.downStream(packet);
    }

    /**
     * Forwards a packet to the next processor up the chain, bypassing this
     * processor's own inbound handling.
     *
     * @param packet the packet to forward
     * @throws Exception if the upstream chain fails
     */
    public void sendUpStream(Packet packet) throws Exception {
        super.upStream(packet);
    }

    /**
     * @return the request map; {@code null} until {@link #doInit()} has run
     *         or a map has been set explicitly
     */
    public Map<Packet, SendRequest> getMessageRequests() {
        return this.messageRequests;
    }

    /**
     * @param map the request map to use from now on
     */
    public void setMessageRequests(Map<Packet, SendRequest> map) {
        this.messageRequests = map;
    }

    /**
     * @return the capacity handed to the request cache in {@link #doInit()}
     */
    public int getMaxConcurrentRequests() {
        return this.maxConcurrentRequests;
    }

    /**
     * @param i the capacity handed to the request cache; only read by
     *          {@link #doInit()}
     */
    public void setMaxConcurrentRequests(int i) {
        this.maxConcurrentRequests = i;
    }

    /**
     * @return the configured maximum window size
     */
    public int getMaxWindowSize() {
        return this.maxWindowSize;
    }

    /**
     * @param i the maximum window size
     */
    public void setMaxWindowSize(int i) {
        this.maxWindowSize = i;
    }

    /**
     * @return the configured window size
     */
    public int getWindowSize() {
        return this.windowSize;
    }

    /**
     * @param i the window size
     */
    public void setWindowSize(int i) {
        this.windowSize = i;
    }

    /**
     * Runs {@code control(now)} on every producer and then on every consumer,
     * evicting each peer for which it returns {@code true}.
     *
     * @param now the timestamp passed to every {@code control} call
     * @throws Exception if a {@code control} call fails
     */
    private void checkPeers(long now) throws Exception {
        // Removing entries while iterating is safe here: both maps are
        // ConcurrentHashMaps, whose iterators tolerate concurrent modification.
        for (Map.Entry<SocketAddress, ProducerProcessor> entry : this.producers.entrySet()) {
            if (entry.getValue().control(now)) {
                evictPeer(entry.getKey(), entry.getValue());
            }
        }
        for (Map.Entry<SocketAddress, ConsumerProcessor> entry : this.consumers.entrySet()) {
            if (entry.getValue().control(now)) {
                evictPeer(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * Logs the eviction and removes both the producer and the consumer
     * registered for the address.
     *
     * @param address the remote address to forget
     * @param processor the processor that triggered the eviction; only logged
     */
    private void evictPeer(SocketAddress address, Object processor) {
        LOG.info("Peer " + processor + "Not valid - removing");
        this.producers.remove(address);
        this.consumers.remove(address);
    }

    /**
     * Returns the producer for the address, creating and registering it on
     * first use. Lookup and registration happen under the {@code producers}
     * lock so concurrent callers share one instance.
     *
     * @param socketAddress the remote address
     * @return the producer registered for the address
     */
    private ProducerProcessor getProducer(SocketAddress socketAddress) {
        synchronized (this.producers) {
            ProducerProcessor producer = this.producers.get(socketAddress);
            if (producer == null) {
                producer = new ProducerProcessor(this, socketAddress);
                this.producers.put(socketAddress, producer);
            }
            return producer;
        }
    }

    /**
     * Returns the consumer for the address, creating and registering it on
     * first use. Lookup and registration happen under the {@code consumers}
     * lock, mirroring {@link #getProducer(SocketAddress)}, so two threads
     * handling the first packets of a new peer cannot each register their own
     * instance and overwrite one another.
     *
     * @param socketAddress the remote address
     * @return the consumer registered for the address
     */
    private ConsumerProcessor getConsumer(SocketAddress socketAddress) {
        synchronized (this.consumers) {
            ConsumerProcessor consumer = this.consumers.get(socketAddress);
            if (consumer == null) {
                consumer = new ConsumerProcessor(this, socketAddress);
                this.consumers.put(socketAddress, consumer);
            }
            return consumer;
        }
    }
}
