package org.codehaus.wadi.location.balancing;

import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.group.MessageExchangeException;
import org.codehaus.wadi.group.Peer;
import org.codehaus.wadi.servicespace.LifecycleState;
import org.codehaus.wadi.servicespace.ServiceSpace;
import org.codehaus.wadi.servicespace.ServiceSpaceLifecycleEvent;
import org.codehaus.wadi.servicespace.ServiceSpaceListener;

/* loaded from: input_file:org/codehaus/wadi/location/balancing/BasicPartitionBalancerSingletonService.class */
public class BasicPartitionBalancerSingletonService implements PartitionBalancerSingletonService {
    private static final Log log = LogFactory.getLog(BasicPartitionBalancerSingletonService.class);
    private final ServiceSpace serviceSpace;
    private final PartitionBalancer partitionBalancer;
    private final SynchronousQueue<Boolean> rebalancingFlag;
    private Thread thread;
    private LeavingServiceSpaceMonitor leavingServiceSpaceMonitor;

    /* loaded from: input_file:org/codehaus/wadi/location/balancing/BasicPartitionBalancerSingletonService$LeavingServiceSpaceMonitor.class */
    private class LeavingServiceSpaceMonitor implements ServiceSpaceListener {
        private LeavingServiceSpaceMonitor() {
        }

        @Override // org.codehaus.wadi.servicespace.ServiceSpaceListener
        public void receive(ServiceSpaceLifecycleEvent serviceSpaceLifecycleEvent, Set<Peer> set) {
            if (serviceSpaceLifecycleEvent.getState() == LifecycleState.FAILED) {
                BasicPartitionBalancerSingletonService.this.queueRebalancing();
            }
        }
    }

    public BasicPartitionBalancerSingletonService(ServiceSpace serviceSpace, PartitionBalancer partitionBalancer) {
        if (null == serviceSpace) {
            throw new IllegalArgumentException("serviceSpace is required");
        }
        if (null == partitionBalancer) {
            throw new IllegalArgumentException("partitionBalancer is required");
        }
        this.serviceSpace = serviceSpace;
        this.partitionBalancer = partitionBalancer;
        this.rebalancingFlag = new SynchronousQueue<>();
        this.leavingServiceSpaceMonitor = new LeavingServiceSpaceMonitor();
    }

    @Override // org.codehaus.wadi.core.Lifecycle
    public void start() throws Exception {
        this.partitionBalancer.start();
        this.thread = new Thread(this, "WADI Partition Balancer");
        this.thread.start();
        this.serviceSpace.addServiceSpaceListener(this.leavingServiceSpaceMonitor);
    }

    @Override // org.codehaus.wadi.core.Lifecycle
    public void stop() throws Exception {
        this.serviceSpace.removeServiceSpaceListener(this.leavingServiceSpaceMonitor);
        this.rebalancingFlag.put(Boolean.FALSE);
        this.thread.join();
        this.partitionBalancer.stop();
    }

    @Override // org.codehaus.wadi.location.balancing.PartitionBalancerSingletonService
    public void queueRebalancing() {
        log.info("Queueing partition rebalancing");
        try {
            this.rebalancingFlag.put(Boolean.TRUE);
        } catch (InterruptedException e) {
            throw ((IllegalStateException) new IllegalStateException().initCause(e));
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.rebalancingFlag.take() == Boolean.TRUE) {
            try {
                try {
                    this.partitionBalancer.balancePartitions();
                } catch (MessageExchangeException e) {
                    log.warn("Rebalancing has failed", e);
                    scheduleRebalancing();
                }
            } catch (InterruptedException e2) {
                log.error("Coordinator thread interrupted", e2);
                return;
            }
        }
    }

    @Override // org.codehaus.wadi.servicespace.SingletonService
    public void onBecomeSingletonDueToMembershipUpdate() {
        queueRebalancing();
    }

    public String toString() {
        return "PartitionManager for ServiceSpace [" + this.serviceSpace.getServiceSpaceName() + "]";
    }

    protected void scheduleRebalancing() {
        log.warn("Will retry rebalancing in [500] ms");
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        queueRebalancing();
    }
}
