package org.apache.activeblaze.group;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activeblaze.BlazeChannelImpl;
import org.apache.activeblaze.BlazeMessage;
import org.apache.activeblaze.BlazeMessageListener;
import org.apache.activeblaze.BlazeNoRouteException;
import org.apache.activeblaze.BlazeRuntimeException;
import org.apache.activeblaze.Destination;
import org.apache.activeblaze.Processor;
import org.apache.activeblaze.Subscription;
import org.apache.activeblaze.SubscriptionHolder;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.impl.transport.BaseTransport;
import org.apache.activeblaze.impl.transport.TransportFactory;
import org.apache.activeblaze.util.AsyncGroupRequest;
import org.apache.activeblaze.util.LRUCache;
import org.apache.activeblaze.util.PropertyUtil;
import org.apache.activeblaze.util.SendRequest;
import org.apache.activeblaze.wire.Buffer;
import org.apache.activeblaze.wire.MemberImpl;
import org.apache.activeblaze.wire.Packet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activeblaze/group/BlazeGroupChannelImpl.class */
public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
    private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
    private String name;
    protected Processor unicast;
    private MemberImpl local;
    private BlazeMessageListener inboxListener;
    private Group group;
    protected Buffer inboxAddress;
    protected int inBoxPort;
    protected Map<String, SendRequest> messageRequests = new LRUCache(10000);
    private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList();
    protected final Object localMutex = new Object();

    /* JADX INFO: Access modifiers changed from: protected */
    public BlazeGroupChannelImpl(String str) {
        this.name = str;
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doInit() throws Exception {
        super.doInit();
        BaseTransport baseTransport = TransportFactory.get(new URI(PropertyUtil.addPropertiesToURIFromBean(getConfiguration().getUnicastURI(), getConfiguration())));
        baseTransport.setName(getId() + "-Unicast");
        this.unicast = configureProcess(baseTransport, getConfiguration().getReliableUnicast());
        this.unicast.init();
        URI localURI = baseTransport.getLocalURI();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(localURI.getHost(), localURI.getPort());
        this.inboxAddress = new Buffer(inetSocketAddress.getAddress().getAddress());
        this.inBoxPort = inetSocketAddress.getPort();
        this.local = createLocal(localURI);
        this.group = createGroup();
    }

    protected final ChainedProcessor configureProcess(ChainedProcessor chainedProcessor, String str) throws Exception {
        int maxPacketSize = getConfiguration().getMaxPacketSize();
        ChainedProcessor reliability = getReliability(str);
        reliability.setPrev(this);
        reliability.setExceptionListener(this);
        reliability.setMaxPacketSize(maxPacketSize);
        reliability.setEnd(chainedProcessor);
        return reliability;
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl
    protected ChainedProcessor getReliability(String str) throws Exception {
        return ReliableFactory.get(str);
    }

    protected MemberImpl createLocal(URI uri) throws Exception {
        return new MemberImpl(getId(), getName(), 0L, 0L, uri);
    }

    protected Group createGroup() {
        return new Group(this);
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doShutDown() throws Exception {
        super.doShutDown();
        if (this.group != null) {
            this.group.shutDown();
        }
        if (this.unicast != null) {
            this.unicast.shutDown();
        }
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doStart() throws Exception {
        super.doStart();
        this.unicast.start();
        this.group.start();
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doStop() throws Exception {
        super.doStop();
        this.group.stop();
        this.unicast.stop();
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.group.BlazeGroupChannel
    public String getName() {
        String str;
        synchronized (this.localMutex) {
            str = this.name;
        }
        return str;
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void setName(String str) {
        synchronized (this.localMutex) {
            this.name = str;
            if (this.local != null) {
                this.local.setName(str);
            }
        }
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessageListener getInboxListener() {
        return this.inboxListener;
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void setInboxListener(BlazeMessageListener blazeMessageListener) {
        this.inboxListener = blazeMessageListener;
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.BlazeChannel
    public BlazeGroupConfiguration getConfiguration() {
        return (BlazeGroupConfiguration) this.configuration;
    }

    public MemberImpl getLocalMember() {
        MemberImpl memberImpl;
        synchronized (this.localMutex) {
            memberImpl = this.local;
        }
        return memberImpl;
    }

    protected void setLocalMember(MemberImpl memberImpl) {
        synchronized (this.localMutex) {
            this.local = memberImpl;
        }
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void addMemberChangedListener(MemberChangedListener memberChangedListener) throws Exception {
        init();
        this.group.addMemberChangedListener(memberChangedListener);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void removeMemberChangedListener(MemberChangedListener memberChangedListener) throws Exception {
        init();
        this.group.removeMemberChangedListener(memberChangedListener);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public Member getMemberById(String str) throws Exception {
        init();
        return this.group.getMemberById(str);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public Member getMemberByName(String str) throws Exception {
        init();
        return this.group.getMemberByName(str);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public Member getAndWaitForMemberByName(String str, int i) throws Exception {
        init();
        return this.group.getAndWaitForMemberByName(str, i);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public Set<Member> getMembers() throws Exception {
        init();
        return this.group.getMembers();
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void send(String str, BlazeMessage blazeMessage) throws Exception {
        send(new Destination(new Buffer(str), false), blazeMessage);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void send(Destination destination, BlazeMessage blazeMessage) throws Exception {
        while (true) {
            MemberImpl queueDestination = getQueueDestination(destination.getName());
            if (queueDestination == null) {
                return;
            }
            try {
                send(queueDestination, destination.getName(), blazeMessage);
                return;
            } catch (BlazeNoRouteException e) {
                LOG.debug("No response - resending to another client", e);
            }
        }
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void send(Member member, BlazeMessage blazeMessage) throws Exception {
        send((MemberImpl) member, member.getInBoxDestination(), blazeMessage);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessage sendRequest(Member member, BlazeMessage blazeMessage) throws Exception {
        return sendRequest(member, blazeMessage, 0);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessage sendRequest(Member member, BlazeMessage blazeMessage, int i) throws Exception {
        return sendRequest((MemberImpl) member, member.getInBoxDestination(), blazeMessage, i);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessage sendRequest(String str, BlazeMessage blazeMessage) throws Exception {
        return sendRequest(new Destination(str, false), blazeMessage, 0);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessage sendRequest(Destination destination, BlazeMessage blazeMessage) throws Exception {
        return sendRequest(destination, blazeMessage, 0);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessage sendRequest(String str, BlazeMessage blazeMessage, int i) throws Exception {
        return sendRequest(new Destination(str, false), blazeMessage, i);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public BlazeMessage sendRequest(Destination destination, BlazeMessage blazeMessage, int i) throws Exception {
        Buffer name = destination.getName();
        long j = 0;
        long j2 = i;
        if (i > 0) {
            j = System.currentTimeMillis() + i;
        }
        while (!isStopped()) {
            if (i != 0 && j2 <= 0) {
                return null;
            }
            MemberImpl queueDestination = getQueueDestination(name);
            if (queueDestination != null) {
                try {
                    BlazeMessage sendRequest = sendRequest(queueDestination, destination, blazeMessage, (int) j2);
                    if (sendRequest != null) {
                        if (i > 0) {
                        }
                        return sendRequest;
                    }
                    if (i > 0) {
                        j2 = (int) Math.max(j - System.currentTimeMillis(), 0L);
                    }
                } catch (BlazeNoRouteException e) {
                    if (i > 0) {
                        j2 = (int) Math.max(j - System.currentTimeMillis(), 0L);
                    }
                } catch (Throwable th) {
                    if (i > 0) {
                    }
                    throw th;
                }
            } else {
                this.group.waitForNewMember((int) j2);
            }
        }
        return null;
    }

    public BlazeMessage sendRequest(MemberImpl memberImpl, String str, BlazeMessage blazeMessage, int i) throws Exception {
        return sendRequest(memberImpl, new Destination(str, false), blazeMessage, i);
    }

    public BlazeMessage sendRequest(MemberImpl memberImpl, Destination destination, BlazeMessage blazeMessage, int i) throws Exception {
        BlazeMessage blazeMessage2 = null;
        if (memberImpl != null) {
            SendRequest sendRequest = new SendRequest();
            preProcessMessage(memberImpl.getAddress(), destination, blazeMessage);
            synchronized (this.messageRequests) {
                this.messageRequests.put(blazeMessage.getId(), sendRequest);
            }
            this.unicast.downStream(blazeMessage);
            blazeMessage2 = (BlazeMessage) sendRequest.get(i);
        }
        return blazeMessage2;
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void sendReply(Member member, BlazeMessage blazeMessage, String str) throws Exception {
        preProcessMessage(((MemberImpl) member).getAddress(), new Destination(member.getInBoxDestination(), false), blazeMessage);
        blazeMessage.setCorrelationId(str);
        this.unicast.downStream(blazeMessage);
    }

    protected void send(MemberImpl memberImpl, String str, BlazeMessage blazeMessage) throws Exception {
        send(memberImpl, new Buffer(str), blazeMessage);
    }

    protected void send(MemberImpl memberImpl, Buffer buffer, BlazeMessage blazeMessage) throws Exception {
        preProcessMessage(memberImpl.getAddress(), new Destination(buffer, false), blazeMessage);
        this.unicast.downStream(blazeMessage);
    }

    protected void preProcessMessage(InetSocketAddress inetSocketAddress, Destination destination, BlazeMessage blazeMessage) {
        preProcessMessage(destination, blazeMessage);
        blazeMessage.setTo(inetSocketAddress);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void addBlazeQueueMessageListener(String str, BlazeMessageListener blazeMessageListener) throws Exception {
        init();
        this.queueMessageListeners.add(new SubscriptionHolder(str, false, blazeMessageListener));
        buildLocal();
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener blazeMessageListener) throws Exception {
        init();
        this.queueMessageListeners.add(new SubscriptionHolder(subscription, blazeMessageListener));
        buildLocal();
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void removeBlazeQueueMessageListener(String str, BlazeMessageListener blazeMessageListener) throws Exception {
        init();
        this.queueMessageListeners.remove(new SubscriptionHolder(str, false, blazeMessageListener));
        buildLocal();
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void removeBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener blazeMessageListener) throws Exception {
        init();
        this.queueMessageListeners.remove(new SubscriptionHolder(subscription, blazeMessageListener));
        buildLocal();
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.BlazeChannel
    public void addBlazeTopicMessageListener(String str, BlazeMessageListener blazeMessageListener) throws Exception {
        init();
        super.addBlazeTopicMessageListener(str, blazeMessageListener);
        buildLocal();
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl, org.apache.activeblaze.BlazeChannel
    public void removeBlazeTopicMessageListener(String str, BlazeMessageListener blazeMessageListener) throws Exception {
        init();
        super.removeBlazeTopicMessageListener(str, blazeMessageListener);
        buildLocal();
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void addToGroup(String str) throws Exception {
        init();
        this.local.addToGroup(str);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public void removeFromGroup(String str) throws Exception {
        init();
        this.local.removeFromGroup(str);
    }

    @Override // org.apache.activeblaze.group.BlazeGroupChannel
    public List<String> getGroups() throws Exception {
        init();
        return this.local.getGroups();
    }

    @Override // org.apache.activeblaze.BlazeChannelImpl
    protected void processPacket(Packet packet) throws Exception {
        if (!isStarted() || processRequest(packet)) {
            return;
        }
        if (packet instanceof BlazeMessage) {
            doProcessBlazeMessage((BlazeMessage) packet);
        } else if (packet instanceof Member) {
            doProcessMember((MemberImpl) packet);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean processRequest(Packet packet) {
        SendRequest remove;
        boolean z = false;
        String correlationId = packet.getCorrelationId();
        if (correlationId != null && correlationId.length() > 0) {
            synchronized (this.messageRequests) {
                remove = this.messageRequests.remove(correlationId);
            }
            if (remove != null) {
                remove.put(correlationId, packet);
                z = true;
            }
        }
        return z;
    }

    protected void doProcessBlazeMessage(BlazeMessage blazeMessage) throws Exception {
        Destination destination = blazeMessage.getDestination();
        if (destination != null) {
            if (destination.isTopic()) {
                dispatch(blazeMessage);
                return;
            }
            if (this.inboxListener != null && this.producerId.equals(destination.getName())) {
                this.inboxListener.onMessage(blazeMessage);
                return;
            }
            int i = 0;
            for (SubscriptionHolder subscriptionHolder : this.queueMessageListeners) {
                if (subscriptionHolder.getSubscription().matches(destination.getName())) {
                    subscriptionHolder.getListener().onMessage(blazeMessage);
                    this.queueMessageListeners.remove(i);
                    this.queueMessageListeners.add(subscriptionHolder);
                }
                i++;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Group getGroup() {
        return this.group;
    }

    protected BlazeMessage createMessage(String str) {
        return new BlazeGroupMessage(this.group.getMemberById(str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void doProcessMember(MemberImpl memberImpl) throws Exception {
        this.group.processMember(memberImpl);
    }

    public void broadcastManagementMessage(Packet packet) throws Exception {
        this.broadcast.downStreamManagement(packet);
    }

    public void sendGroupRequest(AsyncGroupRequest asyncGroupRequest, MemberImpl memberImpl, Packet packet) throws Exception {
        SendRequest sendRequest = new SendRequest();
        asyncGroupRequest.add(packet.getId(), sendRequest);
        synchronized (this.messageRequests) {
            this.messageRequests.put(packet.getId(), sendRequest);
        }
        packet.setReliable(true);
        packet.setTo(memberImpl.getAddress());
        this.unicast.downStream(packet);
    }

    public void broadcastManagementMessage(Packet packet, String str) throws Exception {
        packet.setCorrelationId(str);
        packet.setReliable(true);
        this.broadcast.downStreamManagement(packet);
    }

    public void sendManagementMessage(InetSocketAddress inetSocketAddress, Packet packet) throws Exception {
        packet.setReliable(false);
        packet.setTo(inetSocketAddress);
        this.unicast.downStream(packet);
    }

    public void sendReply(MemberImpl memberImpl, Packet packet, String str) throws Exception {
        packet.setReliable(false);
        packet.setTo(memberImpl.getAddress());
        packet.setCorrelationId(str);
        this.unicast.downStream(packet);
    }

    protected MemberImpl getQueueDestination(Buffer buffer) {
        MemberImpl memberImpl = null;
        Map<Subscription, List<MemberImpl>> queueMap = this.group.getQueueMap();
        List<MemberImpl> list = queueMap.get(new Subscription(buffer, false));
        if (list == null) {
            Iterator<Subscription> it = queueMap.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Subscription next = it.next();
                if (next.matches(buffer)) {
                    list = queueMap.get(next);
                    break;
                }
            }
        }
        if (list != null && !list.isEmpty()) {
            memberImpl = list.remove(0);
            list.add(memberImpl);
        }
        return memberImpl;
    }

    protected void buildLocal() {
        if (!isInitialized()) {
            throw new BlazeRuntimeException("Not Initialized");
        }
        try {
            synchronized (this.localMutex) {
                MemberImpl mo0clone = getLocalMember().mo0clone();
                mo0clone.clearSubscriptions();
                Iterator<SubscriptionHolder> it = this.topicMessageListeners.iterator();
                while (it.hasNext()) {
                    mo0clone.addSubscription(it.next().getSubscription());
                }
                Iterator<SubscriptionHolder> it2 = this.queueMessageListeners.iterator();
                while (it2.hasNext()) {
                    mo0clone.addSubscription(it2.next().getSubscription());
                }
                this.group.processMemberUpdate(this.local, mo0clone);
                this.group.broadcastHeartBeat(mo0clone);
                this.local = mo0clone;
                this.group.updateLocal(this.local);
            }
        } catch (Exception e) {
            LOG.error("Failed to update local member ", e);
        }
    }
}
