package org.apache.activeblaze.impl.transport;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Map;
import org.apache.activeblaze.BlazeException;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.SendRequest;
import org.apache.activeblaze.wire.Ack;
import org.apache.activeblaze.wire.Buffer;
import org.apache.activeblaze.wire.Packet;

/* loaded from: input_file:org/apache/activeblaze/impl/transport/UdpTransport.class */
public class UdpTransport extends BaseTransport {
    DatagramChannel datagramChannel;
    ByteBuffer receiveBuffer;
    private Map<Packet, SendRequest> messageRequests = new LRUCache(5000);

    @Override // org.apache.activeblaze.impl.transport.BaseTransport, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doInit() throws Exception {
        super.doInit();
        if (getLocalURI() == null) {
            throw new BlazeException("localURI not set");
        }
        InetSocketAddress inetSocketAddress = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
        try {
            this.receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
            this.datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
            if (this.datagramChannel.isOpen()) {
                this.datagramChannel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_RCVBUF, (SocketOption) Integer.valueOf(getBufferSize()));
                this.datagramChannel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_SNDBUF, (SocketOption) Integer.valueOf(getBufferSize()));
                this.datagramChannel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_REUSEADDR, (SocketOption) true);
                this.datagramChannel.bind((SocketAddress) inetSocketAddress);
                URI localURI = getLocalURI();
                setLocalURI(new URI(localURI.getScheme(), localURI.getUserInfo(), localURI.getHost(), ((InetSocketAddress) this.datagramChannel.getLocalAddress()).getPort(), localURI.getPath(), localURI.getQuery(), localURI.getFragment()));
            }
        } catch (Exception e) {
            throw new BlazeException("Could not open Datagram channel for " + getLocalURI(), e);
        }
    }

    @Override // org.apache.activeblaze.impl.transport.BaseTransport, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doShutDown() throws Exception {
        super.doShutDown();
        if (this.datagramChannel != null) {
            this.datagramChannel.close();
        }
    }

    @Override // org.apache.activeblaze.impl.transport.ThreadChainedProcessor
    protected void doProcess() throws Exception {
        SendRequest remove;
        SocketAddress receive = this.datagramChannel.receive(this.receiveBuffer);
        this.receiveBuffer.flip();
        if (this.receiveBuffer.limit() > 0) {
            Packet buildPacket = buildPacket(receive, new Buffer(this.receiveBuffer));
            if (buildPacket.isResponse() && (remove = this.messageRequests.remove(buildPacket.getCorrelationId())) != null) {
                remove.put(buildPacket.getCorrelationId(), buildPacket);
            }
            if (buildPacket.isResponseRequired()) {
                Packet createAckPacket = createAckPacket(buildPacket);
                createAckPacket.setTo(receive);
                downStream(createAckPacket);
            }
            upStream(buildPacket);
        }
        this.receiveBuffer.clear();
    }

    @Override // org.apache.activeblaze.impl.transport.BaseTransport
    public void sendData(SocketAddress socketAddress, byte[] bArr, int i, int i2) throws Exception {
        if (isInitialized() && this.datagramChannel.isOpen()) {
            this.datagramChannel.send(ByteBuffer.wrap(bArr, i, i2), socketAddress);
        }
    }

    private Packet createAckPacket(Packet packet) {
        Ack ack = new Ack();
        ack.setStartSequence(packet.getMessageSequence());
        ack.setEndSequence(packet.getMessageSequence());
        ack.setCorrelationId(packet.getId());
        return ack;
    }
}
