package org.apache.activeblaze.impl.transport;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.net.SocketAddress;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.activeblaze.wire.Buffer;
import org.apache.activeblaze.wire.BufferInputStream;
import org.apache.activeblaze.wire.BufferOutputStream;
import org.apache.activeblaze.wire.Packet;
import org.apache.activeblaze.wire.PacketAudit;
import org.apache.activeblaze.wire.PacketType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activeblaze/impl/transport/BaseTransport.class */
public abstract class BaseTransport extends ThreadChainedProcessor {
    private static final Log LOG = LogFactory.getLog(BaseTransport.class);
    protected static final short MAGIC = 4011;
    static final int DEFAULT_BUFFER_SIZE = 524288;
    private URI localURI;
    private Buffer bufferOfLocalURI;
    private LinkedList<Packet> initializingList;
    private Disruptor<EventPacket> disruptor;
    private RingBuffer<EventPacket> ringBuffer;
    private ExecutorService executorService;
    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private int soTimeout = 2000;
    private int timeToLive = 1;
    private boolean loopBack = false;
    protected final PacketAudit audit = new PacketAudit();
    private boolean broadcast = true;
    private boolean enableAudit = false;
    private int maxDispatchQueueSize = 1024;
    private BufferOutputStream bufferOut = new BufferOutputStream(1024);
    private byte[] intBuffer = new byte[4];

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activeblaze/impl/transport/BaseTransport$EventPacket.class */
    public static class EventPacket {
        private Packet packet;

        private EventPacket() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Packet getPacket() {
            return this.packet;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setPacket(Packet packet) {
            this.packet = packet;
        }
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doInit() throws Exception {
        super.doInit();
        this.audit.init();
        if (this.localURI != null) {
            this.bufferOfLocalURI = new Buffer(this.localURI.toString());
        }
        this.initializingList = new LinkedList<>();
        int maxDispatchQueueSize = getMaxDispatchQueueSize();
        if ((maxDispatchQueueSize & (maxDispatchQueueSize - 1)) != 0) {
            int i = ((maxDispatchQueueSize - 1) | (this.maxDispatchQueueSize >> 1) | (this.maxDispatchQueueSize >> 2) | (this.maxDispatchQueueSize >> 4) | (this.maxDispatchQueueSize >> 16)) + 1;
            LOG.warn("maxDispatchQueueSize has to be a power of 2 - setting from " + getMaxDispatchQueueSize() + " to " + i);
            setMaxDispatchQueueSize(i);
        }
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doShutDown() throws Exception {
        this.initializingList.clear();
        super.doShutDown();
        this.audit.shutDown();
    }

    @Override // org.apache.activeblaze.impl.transport.ThreadChainedProcessor, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doStart() throws Exception {
        super.doStart();
        this.audit.start();
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: org.apache.activeblaze.impl.transport.BaseTransport.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, BaseTransport.this.getLocalURI() + "-DisruptorThread");
                thread.setDaemon(true);
                return thread;
            }
        });
        this.disruptor = new Disruptor<>(new EventFactory<EventPacket>() { // from class: org.apache.activeblaze.impl.transport.BaseTransport.2
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public EventPacket m15newInstance() {
                return new EventPacket();
            }
        }, getMaxDispatchQueueSize(), this.executorService);
        this.disruptor.handleEventsWith(new EventHandler[]{new EventHandler<EventPacket>() { // from class: org.apache.activeblaze.impl.transport.BaseTransport.3
            public void onEvent(EventPacket eventPacket, long j, boolean z) throws Exception {
                Packet packet = eventPacket.getPacket();
                if (packet != null) {
                    try {
                        BaseTransport.this.passUpStream(packet);
                    } catch (InterruptedException e) {
                    } catch (Exception e2) {
                        String str = "";
                        try {
                            str = packet.toString();
                        } catch (Throwable th) {
                        }
                        BaseTransport.LOG.error("Caught an exception processing a packet: " + str, e2);
                        BaseTransport.this.stopInternal();
                    }
                }
            }
        }});
        this.ringBuffer = this.disruptor.start();
        Iterator<Packet> it = this.initializingList.iterator();
        while (it.hasNext()) {
            addToDisruptor(it.next());
        }
        this.initializingList.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void passUpStream(Packet packet) throws Exception {
        super.upStream(packet);
    }

    @Override // org.apache.activeblaze.impl.transport.ThreadChainedProcessor, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doStop() throws Exception {
        super.doStop();
        this.audit.stop();
        if (this.disruptor != null) {
            this.disruptor.shutdown();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    public URI getLocalURI() {
        return this.localURI;
    }

    public void setLocalURI(URI uri) {
        this.localURI = uri;
        if (this.localURI != null) {
            this.bufferOfLocalURI = new Buffer(this.localURI.toString());
        }
    }

    public int getBufferSize() {
        return this.bufferSize;
    }

    public void setBufferSize(int i) {
        this.bufferSize = i;
    }

    public int getSoTimeout() {
        return this.soTimeout;
    }

    public void setSoTimeout(int i) {
        this.soTimeout = i;
    }

    public int getTimeToLive() {
        return this.timeToLive;
    }

    public void setTimeToLive(int i) {
        this.timeToLive = i;
    }

    public boolean isLoopBack() {
        return this.loopBack;
    }

    public void setLoopBack(boolean z) {
        this.loopBack = z;
    }

    protected PacketAudit getAudit() {
        return this.audit;
    }

    public boolean isBroadcast() {
        return this.broadcast;
    }

    public void setBroadcast(boolean z) {
        this.broadcast = z;
    }

    public boolean isEnableAudit() {
        return this.enableAudit;
    }

    public void setEnableAudit(boolean z) {
        this.enableAudit = z;
    }

    public Buffer getBufferOfLocalURI() {
        return this.bufferOfLocalURI;
    }

    public final String toString() {
        return ("" + System.identityHashCode(this) + ": ") + (this.localURI != null ? this.localURI.toString() : " Uninitialized Transport");
    }

    public int getMaxDispatchQueueSize() {
        return this.maxDispatchQueueSize;
    }

    public void setMaxDispatchQueueSize(int i) {
        this.maxDispatchQueueSize = i;
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.Processor
    public void upStream(Packet packet) throws Exception {
        if (isStopped()) {
            return;
        }
        if (this.enableAudit && this.audit.isDuplicate(packet)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(toString() + " Ignoring duplicate packet: " + packet);
            }
        } else if (this.ringBuffer == null) {
            this.initializingList.add(packet);
        } else {
            addToDisruptor(packet);
        }
    }

    private void addToDisruptor(Packet packet) {
        long next = this.ringBuffer.next();
        ((EventPacket) this.ringBuffer.get(next)).setPacket(packet);
        this.ringBuffer.publish(next);
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.Processor
    public final synchronized void downStream(Packet packet) throws Exception {
        if (isInitialized()) {
            if (isEnableAudit()) {
                this.audit.isDuplicate(packet);
            }
            this.bufferOut.reset();
            this.bufferOut.writeShort(MAGIC);
            this.bufferOut.skip(4);
            this.bufferOut.writeByte(packet.getPacketType());
            packet.write(this.bufferOut);
            int length = this.bufferOut.length() - 6;
            this.intBuffer[0] = (byte) (length >>> 24);
            this.intBuffer[1] = (byte) (length >>> 16);
            this.intBuffer[2] = (byte) (length >>> 8);
            this.intBuffer[3] = (byte) (length >>> 0);
            System.arraycopy(this.intBuffer, 0, this.bufferOut.getBuffer(), this.bufferOut.getOffset() + 2, this.intBuffer.length);
            sendData(packet.getTo(), this.bufferOut.getBuffer(), this.bufferOut.getOffset(), this.bufferOut.length());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void processData(SocketAddress socketAddress, Buffer buffer) throws Exception {
        upStream(buildPacket(socketAddress, buffer));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Packet buildPacket(SocketAddress socketAddress, Buffer buffer) throws Exception {
        Packet packet = null;
        BufferInputStream bufferInputStream = new BufferInputStream(buffer);
        short readShort = bufferInputStream.readShort();
        if (readShort == MAGIC) {
            bufferInputStream.readInt();
            Packet createPacket = PacketType.valueOf(bufferInputStream.readByte()).createPacket();
            createPacket.read(bufferInputStream);
            createPacket.setFrom(socketAddress);
            packet = createPacket;
        } else {
            LOG.warn("Bad Packet magic " + ((int) readShort));
        }
        return packet;
    }

    protected abstract void sendData(SocketAddress socketAddress, byte[] bArr, int i, int i2) throws Exception;
}
