package org.apache.activeblaze;

import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activeblaze.impl.network.Network;
import org.apache.activeblaze.impl.network.NetworkFactory;
import org.apache.activeblaze.impl.processor.ChainedProcessor;
import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
import org.apache.activeblaze.impl.reliable.ReliableFactory;
import org.apache.activeblaze.util.IdGenerator;
import org.apache.activeblaze.util.PropertyUtil;
import org.apache.activeblaze.wire.Buffer;
import org.apache.activeblaze.wire.Packet;

/* loaded from: input_file:org/apache/activeblaze/BlazeChannelImpl.class */
public class BlazeChannelImpl extends DefaultChainedProcessor implements BlazeChannel, ExceptionListener {
    protected Processor broadcast;
    protected List<SubscriptionHolder> topicMessageListeners = new CopyOnWriteArrayList();
    protected final IdGenerator idGenerator = new IdGenerator();
    protected BlazeConfiguration configuration = new BlazeConfiguration();
    private String id = this.idGenerator.generateId();
    protected Buffer producerId = new Buffer(this.id);

    @Override // org.apache.activeblaze.BlazeChannel
    public String getId() {
        return this.id;
    }

    public String getName() {
        return getId();
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public void addBlazeTopicMessageListener(String str, BlazeMessageListener blazeMessageListener) throws Exception {
        this.topicMessageListeners.add(new SubscriptionHolder(str, true, blazeMessageListener));
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public void addBlazeTopicMessageListener(Subscription subscription, BlazeMessageListener blazeMessageListener) throws Exception {
        this.topicMessageListeners.add(new SubscriptionHolder(subscription, blazeMessageListener));
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public void removeBlazeTopicMessageListener(String str, BlazeMessageListener blazeMessageListener) throws Exception {
        this.topicMessageListeners.remove(new SubscriptionHolder(str, true, blazeMessageListener));
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public void removeBlazeTopicMessageListener(Subscription subscription, BlazeMessageListener blazeMessageListener) throws Exception {
        this.topicMessageListeners.remove(new SubscriptionHolder(subscription, blazeMessageListener));
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doInit() throws Exception {
        super.doInit();
        URI uri = new URI(PropertyUtil.addPropertiesToURIFromBean(getConfiguration().getBroadcastURI(), getConfiguration()));
        URI uri2 = null;
        String managementURI = getConfiguration().getManagementURI();
        if (managementURI != null && managementURI.length() > 0) {
            uri2 = new URI(getConfiguration().getManagementURI());
        }
        Network network = NetworkFactory.get(uri, uri2, getConfiguration().getReliableBroadcast());
        network.setName(getName());
        this.broadcast = configureProcess(network);
        this.broadcast.init();
    }

    protected final ChainedProcessor configureProcess(ChainedProcessor chainedProcessor) throws Exception {
        int maxPacketSize = getConfiguration().getMaxPacketSize();
        chainedProcessor.setPrev(this);
        chainedProcessor.setExceptionListener(this);
        chainedProcessor.setMaxPacketSize(maxPacketSize);
        return chainedProcessor;
    }

    protected ChainedProcessor getReliability(String str) throws Exception {
        return ReliableFactory.get(str);
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doShutDown() throws Exception {
        super.doShutDown();
        Processor processor = this.broadcast;
        if (processor != null) {
            processor.shutDown();
        }
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doStart() throws Exception {
        super.doStart();
        this.broadcast.start();
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.BaseService
    public void doStop() throws Exception {
        super.doStop();
        this.broadcast.stop();
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public void broadcast(String str, BlazeMessage blazeMessage) throws Exception {
        broadcast(new Destination(str, true), blazeMessage);
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public void broadcast(Destination destination, BlazeMessage blazeMessage) throws Exception {
        blazeMessage.setDestination(destination);
        preProcessMessage(destination, blazeMessage);
        this.broadcast.downStream(blazeMessage);
    }

    @Override // org.apache.activeblaze.impl.processor.DefaultChainedProcessor, org.apache.activeblaze.Processor
    public void upStream(Packet packet) throws Exception {
        processPacket(packet);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void preProcessMessage(Destination destination, BlazeMessage blazeMessage) {
        blazeMessage.setDestination(destination);
        blazeMessage.setProducerId(this.id);
        if (blazeMessage.getId() == null) {
            blazeMessage.setId(this.idGenerator.generateId());
        }
    }

    protected void processPacket(Packet packet) throws Exception {
        if (packet instanceof BlazeMessage) {
            dispatch((BlazeMessage) packet);
        }
    }

    @Override // org.apache.activeblaze.BlazeChannel
    public BlazeConfiguration getConfiguration() {
        return this.configuration;
    }

    public void setConfiguration(BlazeConfiguration blazeConfiguration) {
        this.configuration = blazeConfiguration;
    }

    @Override // org.apache.activeblaze.ExceptionListener
    public void onException(Exception exc) {
        doFireException(exc);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void dispatch(BlazeMessage blazeMessage) throws Exception {
        if (blazeMessage == null || blazeMessage.getDestination() == null) {
            return;
        }
        Buffer name = blazeMessage.getDestination().getName();
        for (SubscriptionHolder subscriptionHolder : this.topicMessageListeners) {
            if (subscriptionHolder.getSubscription().matches(name)) {
                subscriptionHolder.getListener().onMessage(blazeMessage);
            }
        }
    }
}
