package org.jumpmind.symmetric.service.impl;

import java.io.OutputStream;
import java.io.Writer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.SqlConstants;
import org.jumpmind.exception.InterruptedException;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.SequenceIdentifier;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.io.data.writer.ProtocolDataWriter;
import org.jumpmind.symmetric.io.data.writer.StagingDataWriter;
import org.jumpmind.symmetric.io.data.writer.StructureDataWriter;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeCommunication;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchWithPayload;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.SimpleRouterContext;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.TransformService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.util.Statistics;

/* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService.class */
public class DataExtractorService extends AbstractService implements IDataExtractorService, INodeCommunicationService.INodeCommunicationExecutor {
    // NOTE(review): the name suggests a re-query interval in milliseconds; it is not
    // referenced in the part of the file visible here - confirm against its usages.
    static final long MS_PASSED_BEFORE_BATCH_REQUERIED = 5000;
    // Collaborators below are all assigned from the constructor arguments.
    private IOutgoingBatchService outgoingBatchService;
    private IRouterService routerService;
    private IConfigurationService configurationService;
    private ITriggerRouterService triggerRouterService;
    private ITransformService transformService;
    private IDataService dataService;
    private INodeService nodeService;
    private IStatisticManager statisticManager;
    private IStagingManager stagingManager;
    private INodeCommunicationService nodeCommunicationService;
    private IClusterService clusterService;
    // Semaphores keyed by a String; created empty in the constructor.
    // NOTE(review): what the key represents is not visible in this part of the file.
    private Map<String, Semaphore> locks;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService$ExtractMode.class */
    /**
     * Selects how an extraction is performed.
     * NOTE(review): the per-constant notes below are inferred from the constant names only;
     * the code that switches on this enum is not visible here - confirm before relying on them.
     */
    public enum ExtractMode {
        // presumably: extract for a regular SymmetricDS client node - TODO confirm
        FOR_SYM_CLIENT,
        // presumably: extract for a client that receives the batch as a payload - TODO confirm
        FOR_PAYLOAD_CLIENT,
        // presumably: extract without sending - TODO confirm
        EXTRACT_ONLY
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService$ExtractRequestMapper.class */
    /**
     * Row mapper that turns one result row into an {@link ExtractRequest}.
     * The row must expose the columns {@code node_id}, {@code request_id},
     * {@code start_batch_id}, {@code end_batch_id}, {@code status}, {@code create_time},
     * {@code last_update_time}, {@code trigger_id} and {@code router_id}.
     */
    public class ExtractRequestMapper implements ISqlRowMapper<ExtractRequest> {
        ExtractRequestMapper() {
        }

        /**
         * Builds the request for the given row. The trigger router is resolved through the
         * enclosing service's trigger router service using the row's trigger and router ids.
         *
         * NOTE(review): the method name is a decompiler artifact (see the marker below); the
         * interface method is presumably {@code mapRow} - confirm before renaming.
         *
         * @param row the current result row
         * @return the populated extract request
         * @throws IllegalArgumentException if the status column does not name an
         *         {@code ExtractStatus} constant (thrown by {@code valueOf})
         */
        /* renamed from: mapRow, reason: merged with bridge method [inline-methods] */
        public ExtractRequest m53mapRow(Row row) {
            ExtractRequest extractRequest = new ExtractRequest();
            extractRequest.setNodeId(row.getString("node_id"));
            extractRequest.setRequestId(row.getLong("request_id"));
            extractRequest.setStartBatchId(row.getLong("start_batch_id"));
            extractRequest.setEndBatchId(row.getLong("end_batch_id"));
            // Upper-case with a fixed locale: the default-locale variant can produce
            // characters that never match an enum constant (e.g. dotted capital I under tr_TR).
            String statusName = row.getString("status").toUpperCase(java.util.Locale.ROOT);
            extractRequest.setStatus(ExtractRequest.ExtractStatus.valueOf(statusName));
            extractRequest.setCreateTime(row.getDateTime("create_time"));
            extractRequest.setLastUpdateTime(row.getDateTime("last_update_time"));
            extractRequest.setTriggerRouter(DataExtractorService.this.triggerRouterService.findTriggerRouterById(row.getString("trigger_id"), row.getString("router_id")));
            return extractRequest;
        }
    }

    /* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService$MultiBatchStagingWriter.class */
    public class MultiBatchStagingWriter implements IDataWriter {
        long maxBatchSize;
        StagingDataWriter currentDataWriter;
        List<OutgoingBatch> batches;
        List<OutgoingBatch> finishedBatches;
        IStagingManager stagingManager;
        String sourceNodeId;
        DataContext context;
        Table table;
        OutgoingBatch outgoingBatch;
        Batch batch;
        boolean inError = false;

        public MultiBatchStagingWriter(String str, IStagingManager iStagingManager, List<OutgoingBatch> list, long j) {
            this.sourceNodeId = str;
            this.maxBatchSize = j;
            this.stagingManager = iStagingManager;
            this.batches = new ArrayList(list);
            this.finishedBatches = new ArrayList(list.size());
        }

        public void open(DataContext dataContext) {
            this.context = dataContext;
            nextBatch();
            this.currentDataWriter = new StagingDataWriter(DataExtractorService.this.parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), this.sourceNodeId, Constants.STAGING_CATEGORY_OUTGOING, this.stagingManager, (IProtocolDataWriterListener[]) null);
            this.currentDataWriter.open(dataContext);
        }

        public void close() {
            if (this.currentDataWriter != null) {
                this.currentDataWriter.close();
            }
        }

        public Map<Batch, Statistics> getStatistics() {
            return this.currentDataWriter.getStatistics();
        }

        public void start(Batch batch) {
            this.batch = batch;
            this.currentDataWriter.start(batch);
        }

        public boolean start(Table table) {
            this.table = table;
            this.currentDataWriter.start(table);
            return true;
        }

        public void write(CsvData csvData) {
            this.outgoingBatch.incrementDataEventCount();
            this.outgoingBatch.incrementInsertEventCount();
            this.currentDataWriter.write(csvData);
            if (this.outgoingBatch.getDataEventCount() < this.maxBatchSize || this.batches.size() <= 0) {
                return;
            }
            this.currentDataWriter.end(this.table);
            this.currentDataWriter.end(this.batch, false);
            this.outgoingBatch.setByteCount(((Statistics) this.currentDataWriter.getStatistics().get(this.batch)).get("BYTECOUNT"));
            this.currentDataWriter.close();
            startNewBatch();
        }

        public void end(Table table) {
            if (this.currentDataWriter != null) {
                this.currentDataWriter.end(table);
                this.outgoingBatch.setByteCount(((Statistics) this.currentDataWriter.getStatistics().get(this.batch)).get("BYTECOUNT"));
            }
        }

        public void end(Batch batch, boolean z) {
            this.inError = z;
            if (this.currentDataWriter != null) {
                this.currentDataWriter.end(this.batch, z);
            }
        }

        protected void nextBatch() {
            if (this.outgoingBatch != null) {
                this.finishedBatches.add(this.outgoingBatch);
            }
            this.outgoingBatch = this.batches.remove(0);
            this.outgoingBatch.setDataEventCount(0L);
            this.outgoingBatch.setInsertEventCount(0L);
            Iterator<OutgoingBatch> it = this.finishedBatches.iterator();
            while (it.hasNext()) {
                IStagedResource stagedResource = DataExtractorService.this.getStagedResource(it.next());
                if (stagedResource != null) {
                    stagedResource.refreshLastUpdateTime();
                }
            }
        }

        protected void startNewBatch() {
            nextBatch();
            this.currentDataWriter = new StagingDataWriter(DataExtractorService.this.parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), this.sourceNodeId, Constants.STAGING_CATEGORY_OUTGOING, this.stagingManager, (IProtocolDataWriterListener[]) null);
            this.batch = new Batch(Batch.BatchType.EXTRACT, this.outgoingBatch.getBatchId(), this.outgoingBatch.getChannelId(), DataExtractorService.this.symmetricDialect.getBinaryEncoding(), this.sourceNodeId, this.outgoingBatch.getNodeId(), false);
            this.currentDataWriter.open(this.context);
            this.currentDataWriter.start(this.batch);
            this.currentDataWriter.start(this.table);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService$SelectFromSymDataSource.class */
    /**
     * Extract source that reads the captured data events of one outgoing batch through a
     * cursor obtained from the data service. A RELOAD event is expanded on the fly: a
     * {@link SelectFromTableSource} is created for it and its rows are returned before the
     * cursor is advanced again.
     */
    public class SelectFromSymDataSource implements IExtractDataReaderSource {
        private Batch batch;
        private OutgoingBatch outgoingBatch;
        private Table targetTable;
        private Table sourceTable;
        private TriggerHistory lastTriggerHistory;
        private String lastRouterId;
        private boolean requiresLobSelectedFromSource;
        private ISqlReadCursor<Data> cursor;
        private SelectFromTableSource reloadSource;
        private Node targetNode;
        private ProcessInfo processInfo;

        /**
         * @param outgoingBatch batch whose events are extracted
         * @param node          source node; its id becomes the batch's source node id
         * @param node2         target node, passed to reload selects
         * @param processInfo   receives the name of the table currently being reloaded
         */
        public SelectFromSymDataSource(OutgoingBatch outgoingBatch, Node node, Node node2, ProcessInfo processInfo) {
            this.processInfo = processInfo;
            this.outgoingBatch = outgoingBatch;
            this.batch = new Batch(Batch.BatchType.EXTRACT, outgoingBatch.getBatchId(), outgoingBatch.getChannelId(), DataExtractorService.this.symmetricDialect.getBinaryEncoding(), node.getNodeId(), outgoingBatch.getNodeId(), outgoingBatch.isCommonFlag());
            this.targetNode = node2;
        }

        public Batch getBatch() {
            return this.batch;
        }

        public Table getSourceTable() {
            return this.sourceTable;
        }

        public Table getTargetTable() {
            return this.targetTable;
        }

        /**
         * Returns the next row of the batch, or {@code null} when the batch is exhausted.
         * The cursor is opened lazily on the first call and closed as soon as it runs dry.
         * After each returned row the source/target table accessors describe that row.
         */
        public CsvData next() {
            if (this.cursor == null) {
                this.cursor = DataExtractorService.this.dataService.selectDataFor(this.batch);
            }
            Data data = null;
            // Drain an active reload select before touching the event cursor again.
            if (this.reloadSource != null) {
                data = (Data) this.reloadSource.next();
                this.targetTable = this.reloadSource.getTargetTable();
                this.sourceTable = this.reloadSource.getSourceTable();
                if (data == null) {
                    this.reloadSource.close();
                    this.reloadSource = null;
                }
            }
            if (data == null) {
                data = (Data) this.cursor.next();
                if (data != null) {
                    TriggerHistory triggerHistory = data.getTriggerHistory();
                    String str = (String) data.getAttribute("routerId");
                    if (data.getDataEventType() == DataEventType.RELOAD) {
                        String triggerId = triggerHistory.getTriggerId();
                        TriggerRouter triggerRouterForCurrentNode = DataExtractorService.this.triggerRouterService.getTriggerRouterForCurrentNode(triggerId, str, false);
                        if (triggerRouterForCurrentNode == null) {
                            DataExtractorService.this.log.warn("Could not find trigger router definition for {}:{}.  Skipping reload event with the data id of {}", new Object[]{triggerId, str, Long.valueOf(data.getDataId())});
                            return next();
                        }
                        this.processInfo.setCurrentTableName(triggerHistory.getSourceTableName());
                        this.reloadSource = new SelectFromTableSource(this.outgoingBatch, this.batch, new SelectFromTableEvent(this.targetNode, triggerRouterForCurrentNode, triggerHistory, data.getRowData()));
                        data = (Data) this.reloadSource.next();
                        this.sourceTable = this.reloadSource.getSourceTable();
                        this.targetTable = this.reloadSource.getTargetTable();
                        this.requiresLobSelectedFromSource = this.reloadSource.requiresLobsSelectedFromSource();
                        if (data == null) {
                            // The reload select produced no rows. Returning null here would be
                            // read as end-of-batch and drop every event still in the cursor, so
                            // carry on with the cursor instead. The cached trigger history is
                            // cleared because the table fields now describe the reloaded table;
                            // the next change row must look its tables up again.
                            this.lastTriggerHistory = null;
                            this.lastRouterId = null;
                            return next();
                        }
                    } else {
                        Trigger triggerById = DataExtractorService.this.triggerRouterService.getTriggerById(triggerHistory.getTriggerId(), false);
                        if (triggerById != null) {
                            // Table metadata is only looked up again when the trigger history or router changes.
                            if (this.lastTriggerHistory == null || this.lastTriggerHistory.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() || this.lastRouterId == null || !this.lastRouterId.equals(str)) {
                                this.sourceTable = DataExtractorService.this.lookupAndOrderColumnsAccordingToTriggerHistory(str, triggerHistory, false, true);
                                this.targetTable = DataExtractorService.this.lookupAndOrderColumnsAccordingToTriggerHistory(str, triggerHistory, true, false);
                                this.requiresLobSelectedFromSource = triggerById.isUseStreamLobs();
                            }
                            data.setNoBinaryOldData(this.requiresLobSelectedFromSource || DataExtractorService.this.symmetricDialect.getName().equals("mssql"));
                            this.outgoingBatch.incrementDataEventCount();
                        } else {
                            DataExtractorService.this.log.error("Could not locate a trigger with the id of {} for {}.  It was recorded in the hist table with a hist id of {}", new Object[]{triggerHistory.getTriggerId(), triggerHistory.getSourceTableName(), Integer.valueOf(triggerHistory.getTriggerHistoryId())});
                        }
                    }
                    this.lastTriggerHistory = triggerHistory;
                    this.lastRouterId = str;
                } else {
                    closeCursor();
                }
            }
            return data;
        }

        public boolean requiresLobsSelectedFromSource() {
            return this.requiresLobSelectedFromSource;
        }

        /** Closes and forgets the event cursor, if one is open. */
        protected void closeCursor() {
            if (this.cursor != null) {
                this.cursor.close();
                this.cursor = null;
            }
        }

        /** Closes the event cursor and any reload select still in progress. */
        public void close() {
            closeCursor();
            if (this.reloadSource != null) {
                this.reloadSource.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService$SelectFromTableEvent.class */
    /**
     * One unit of work for a {@link SelectFromTableSource}: either a ready-made {@link Data}
     * row to send as is, or a trigger router (plus optional initial load select) whose table
     * should be queried.
     */
    public class SelectFromTableEvent {
        private TriggerRouter triggerRouter;
        private TriggerHistory triggerHistory;
        private Node node;
        private Data data;
        private String initialLoadSelect;

        /**
         * Creates an event that selects from the trigger router's table. When no trigger
         * history is supplied, the newest history for the router's trigger is looked up.
         */
        public SelectFromTableEvent(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, String str) {
            this.node = node;
            this.triggerRouter = triggerRouter;
            this.initialLoadSelect = str;
            Trigger sourceTrigger = triggerRouter.getTrigger();
            if (triggerHistory != null) {
                this.triggerHistory = triggerHistory;
            } else {
                this.triggerHistory = DataExtractorService.this.triggerRouterService.getNewestTriggerHistoryForTrigger(sourceTrigger.getTriggerId(), sourceTrigger.getSourceCatalogName(), sourceTrigger.getSourceSchemaName(), sourceTrigger.getSourceTableName());
            }
        }

        /** Creates an event that carries an already built row; its trigger history is taken from the row. */
        public SelectFromTableEvent(Data data) {
            this.data = data;
            this.triggerHistory = data.getTriggerHistory();
        }

        /** @return {@code true} when this event wraps a ready-made row */
        public boolean containsData() {
            return this.data != null;
        }

        public Data getData() {
            return this.data;
        }

        public Node getNode() {
            return this.node;
        }

        public TriggerRouter getTriggerRouter() {
            return this.triggerRouter;
        }

        public TriggerHistory getTriggerHistory() {
            return this.triggerHistory;
        }

        public String getInitialLoadSelect() {
            return this.initialLoadSelect;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jumpmind/symmetric/service/impl/DataExtractorService$SelectFromTableSource.class */
    /**
     * Extract source that works through a list of {@link SelectFromTableEvent}s. Events that
     * wrap a row return that row directly; all other events open a cursor over the initial
     * load SQL of their trigger router and return every row of it, filtered through the
     * router service once a routing context exists.
     */
    public class SelectFromTableSource implements IExtractDataReaderSource {
        private OutgoingBatch outgoingBatch;
        private Batch batch;
        private Table targetTable;
        private Table sourceTable;
        private List<SelectFromTableEvent> selectFromTableEventsToSend;
        private SelectFromTableEvent currentInitialLoadEvent;
        private ISqlReadCursor<Data> cursor;
        private SimpleRouterContext routingContext;
        private Node node;
        private TriggerRouter triggerRouter;

        /** Source for a single event whose rows are counted on the given outgoing batch. */
        public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch, SelectFromTableEvent selectFromTableEvent) {
            this.outgoingBatch = outgoingBatch;
            List<SelectFromTableEvent> singleEvent = new ArrayList<SelectFromTableEvent>(1);
            singleEvent.add(selectFromTableEvent);
            init(batch, singleEvent);
        }

        /** Source for several events; no outgoing batch counters are maintained. */
        public SelectFromTableSource(Batch batch, List<SelectFromTableEvent> list) {
            init(batch, list);
        }

        /**
         * Copies the event list and resolves the batch's target node.
         *
         * @throws SymmetricException if the target node cannot be found
         */
        protected void init(Batch batch, List<SelectFromTableEvent> list) {
            this.selectFromTableEventsToSend = new ArrayList<SelectFromTableEvent>(list);
            this.batch = batch;
            this.node = DataExtractorService.this.nodeService.findNode(batch.getTargetNodeId());
            if (this.node == null) {
                throw new SymmetricException("Could not find a node represented by %s", this.batch.getTargetNodeId());
            }
        }

        public Batch getBatch() {
            return this.batch;
        }

        public Table getSourceTable() {
            return this.sourceTable;
        }

        public Table getTargetTable() {
            return this.targetTable;
        }

        /**
         * Returns the next row that passes routing, or {@code null} when nothing is left.
         * Rows are counted on the outgoing batch unless it is flagged as an extract job.
         */
        public CsvData next() {
            CsvData candidate;
            while (true) {
                candidate = selectNext();
                // Without a routing context every row is accepted unfiltered.
                if (candidate == null || this.routingContext == null) {
                    break;
                }
                DataMetaData metaData = new DataMetaData((Data) candidate, this.targetTable, this.triggerRouter.getRouter(), this.routingContext.getChannel());
                boolean hasInitialLoadSelect = StringUtils.isNotBlank(this.triggerRouter.getInitialLoadSelect());
                if (DataExtractorService.this.routerService.shouldDataBeRouted(this.routingContext, metaData, this.node, true, hasInitialLoadSelect, this.triggerRouter)) {
                    break;
                }
            }
            boolean countOnBatch = candidate != null && this.outgoingBatch != null && !this.outgoingBatch.isExtractJobFlag();
            if (countOnBatch) {
                this.outgoingBatch.incrementDataEventCount();
                this.outgoingBatch.incrementEventCount(candidate.getDataEventType());
            }
            return candidate;
        }

        /**
         * Produces the next raw row: starts the next pending event when none is active, then
         * reads from the open cursor, moving on to the following event when it is exhausted.
         */
        protected CsvData selectNext() {
            CsvData row = null;
            if (this.currentInitialLoadEvent == null && !this.selectFromTableEventsToSend.isEmpty()) {
                this.currentInitialLoadEvent = this.selectFromTableEventsToSend.remove(0);
                TriggerHistory history = this.currentInitialLoadEvent.getTriggerHistory();
                if (this.currentInitialLoadEvent.containsData()) {
                    // Ready-made row: hand it out directly, no cursor involved.
                    row = this.currentInitialLoadEvent.getData();
                    this.currentInitialLoadEvent = null;
                    String routerId = (String) row.getAttribute("routerId");
                    this.sourceTable = DataExtractorService.this.lookupAndOrderColumnsAccordingToTriggerHistory(routerId, history, false, true);
                    this.targetTable = DataExtractorService.this.lookupAndOrderColumnsAccordingToTriggerHistory(routerId, history, true, false);
                } else {
                    this.triggerRouter = this.currentInitialLoadEvent.getTriggerRouter();
                    if (this.routingContext == null) {
                        String targetNodeId = this.batch.getTargetNodeId();
                        NodeChannel channel;
                        if (this.batch != null) {
                            channel = DataExtractorService.this.configurationService.getNodeChannel(this.batch.getChannelId(), false);
                        } else {
                            channel = new NodeChannel(this.triggerRouter.getTrigger().getChannelId());
                        }
                        this.routingContext = new SimpleRouterContext(targetNodeId, channel);
                    }
                    String routerId = this.triggerRouter.getRouter().getRouterId();
                    this.sourceTable = DataExtractorService.this.lookupAndOrderColumnsAccordingToTriggerHistory(routerId, history, false, true);
                    this.targetTable = DataExtractorService.this.lookupAndOrderColumnsAccordingToTriggerHistory(routerId, history, true, false);
                    startNewCursor(history, this.triggerRouter, this.currentInitialLoadEvent.getInitialLoadSelect());
                }
            }
            if (this.cursor != null) {
                row = (CsvData) this.cursor.next();
                if (row == null) {
                    closeCursor();
                    // NOTE(review): this re-enters next(), so a row found here passes the
                    // routing check and the batch counters a second time in the outer next().
                    row = next();
                }
            }
            return row;
        }

        /** Closes the open cursor, if any, and marks the current event as finished. */
        protected void closeCursor() {
            if (this.cursor == null) {
                return;
            }
            this.cursor.close();
            this.cursor = null;
            this.currentInitialLoadEvent = null;
        }

        /**
         * Opens a cursor over the dialect's initial load SQL for the current event. Each row
         * is wrapped in an INSERT {@link Data} tagged with the router id.
         */
        protected void startNewCursor(final TriggerHistory triggerHistory, final TriggerRouter triggerRouter, String str) {
            // One comma fewer than there are columns.
            final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
            String initialLoadSql = DataExtractorService.this.symmetricDialect.createInitialLoadSqlFor(this.currentInitialLoadEvent.getNode(), triggerRouter, this.sourceTable, triggerHistory, DataExtractorService.this.configurationService.getChannel(triggerRouter.getTrigger().getChannelId()), str);
            ISqlRowMapper<Data> rowMapper = new ISqlRowMapper<Data>() {
                /* renamed from: mapRow, reason: merged with bridge method [inline-methods] */
                public Data m54mapRow(Row row) {
                    String csvRow = row.stringValue();
                    if (expectedCommaCount > StringUtils.countMatches(csvRow, ",")) {
                        throw new SymmetricException("The extracted row data did not have the expected (%d) number of columns: %s", Integer.valueOf(expectedCommaCount), csvRow);
                    }
                    Data extracted = new Data(0L, null, csvRow, DataEventType.INSERT, triggerHistory.getSourceTableName(), null, triggerHistory, SelectFromTableSource.this.batch.getChannelId(), null, null);
                    extracted.putAttribute("routerId", triggerRouter.getRouter().getRouterId());
                    return extracted;
                }
            };
            this.cursor = DataExtractorService.this.sqlTemplate.queryForCursor(initialLoadSql, rowMapper);
        }

        /** @return the stream-lobs flag of the active event's trigger; {@code false} when no event with a trigger router is active */
        public boolean requiresLobsSelectedFromSource() {
            if (this.currentInitialLoadEvent != null && this.currentInitialLoadEvent.getTriggerRouter() != null) {
                return this.currentInitialLoadEvent.getTriggerRouter().getTrigger().isUseStreamLobs();
            }
            return false;
        }

        public void close() {
            closeCursor();
        }
    }

    /**
     * Wires the service with its collaborators, creates the (initially empty) lock map and
     * installs the SQL map built for the dialect's platform.
     */
    public DataExtractorService(IParameterService iParameterService, ISymmetricDialect iSymmetricDialect, IOutgoingBatchService iOutgoingBatchService, IRouterService iRouterService, IConfigurationService iConfigurationService, ITriggerRouterService iTriggerRouterService, INodeService iNodeService, IDataService iDataService, ITransformService iTransformService, IStatisticManager iStatisticManager, IStagingManager iStagingManager, IClusterService iClusterService, INodeCommunicationService iNodeCommunicationService) {
        super(iParameterService, iSymmetricDialect);
        this.locks = new HashMap<String, Semaphore>();

        // Batch, routing and configuration services.
        this.outgoingBatchService = iOutgoingBatchService;
        this.routerService = iRouterService;
        this.configurationService = iConfigurationService;
        this.triggerRouterService = iTriggerRouterService;
        this.transformService = iTransformService;

        // Data and node services.
        this.dataService = iDataService;
        this.nodeService = iNodeService;
        this.nodeCommunicationService = iNodeCommunicationService;
        this.clusterService = iClusterService;

        // Infrastructure.
        this.statisticManager = iStatisticManager;
        this.stagingManager = iStagingManager;

        setSqlMap(new DataExtractorSqlMap(iSymmetricDialect.getPlatform(), createSqlReplacementTokens()));
    }

    /**
     * Stream variant of the configuration extract: wraps the stream in a writer and
     * delegates to the writer-based overload with an empty trailing argument array.
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public void extractConfigurationStandalone(Node node, OutputStream outputStream) {
        Writer streamWriter = TransportUtils.toWriter(outputStream);
        String[] noExtraArguments = new String[0];
        extractConfigurationStandalone(node, streamWriter, noExtraArguments);
    }

    /**
     * Writes the configuration for the given node to the writer as a single virtual
     * registration batch on the config channel.
     * <p>
     * The batch consists of two passes over the trigger routers built for the SymmetricDS
     * tables, skipping the "filesync" channel in both:
     * <ol>
     * <li>in reverse order, one SQL event per table carrying the dialect's purge statement;</li>
     * <li>in forward order, one select event per table, except that a table whose name ends
     * with the node identity table name gets a single INSERT row holding the node id.</li>
     * </ol>
     * When a trigger has no stored trigger history, a temporary one is built from the cached
     * table definition and given an id counted down from {@code Integer.MAX_VALUE}.
     *
     * @param node   node the configuration is extracted for
     * @param writer destination of the protocol output
     * @param strArr passed through unchanged to the trigger router service when the trigger
     *               routers are built
     * @throws IllegalStateException if a temporary trigger history is needed but the table
     *         definition is not available from the platform's table cache
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public void extractConfigurationStandalone(Node node, Writer writer, String... strArr) {
        Node identity = this.nodeService.findIdentity();
        Batch batch = new Batch(Batch.BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG, this.symmetricDialect.getBinaryEncoding(), identity.getNodeId(), node.getNodeId(), false);
        // A node that does not report a version is treated as running this version.
        String targetVersion = StringUtils.isBlank(node.getSymmetricVersion()) ? Version.version() : node.getSymmetricVersion();
        NodeGroupLink groupLink = new NodeGroupLink(this.parameterService.getNodeGroupId(), node.getNodeGroupId());
        List<TriggerRouter> triggerRouters = this.triggerRouterService.buildTriggerRoutersForSymmetricTables(targetVersion, groupLink, strArr);
        List<SelectFromTableEvent> events = new ArrayList<SelectFromTableEvent>(triggerRouters.size() * 2);

        // Pass 1: purge statements, in reverse table order.
        for (int index = triggerRouters.size() - 1; index >= 0; index--) {
            TriggerRouter triggerRouter = triggerRouters.get(index);
            if (!triggerRouter.getTrigger().getChannelId().equals("filesync")) {
                TriggerHistory triggerHistory = this.triggerRouterService.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId(), null, null, triggerRouter.getTrigger().getSourceTableName());
                if (triggerHistory == null) {
                    Trigger trigger = triggerRouter.getTrigger();
                    Table table = this.symmetricDialect.getPlatform().getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), false);
                    if (table == null) {
                        throw new IllegalStateException("Could not find a required table: " + triggerRouter.getTrigger().getSourceTableName());
                    }
                    triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(), this.symmetricDialect.getTriggerTemplate());
                    triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - index);
                }
                StringBuilder purgeSql = new StringBuilder(this.symmetricDialect.createPurgeSqlFor(node, triggerRouter, triggerHistory));
                addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger().getSourceTableName(), purgeSql);
                Data purgeData = new Data(1L, null, purgeSql.toString(), DataEventType.SQL, triggerHistory.getSourceTableName(), null, triggerHistory, triggerRouter.getTrigger().getChannelId(), null, null);
                purgeData.putAttribute("routerId", triggerRouter.getRouter().getRouterId());
                events.add(new SelectFromTableEvent(purgeData));
            }
        }

        // Pass 2: table contents, in forward table order.
        for (int index = 0; index < triggerRouters.size(); index++) {
            TriggerRouter triggerRouter = triggerRouters.get(index);
            if (!triggerRouter.getTrigger().getChannelId().equals("filesync")) {
                TriggerHistory triggerHistory = this.triggerRouterService.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger().getTriggerId(), null, null, null);
                if (triggerHistory == null) {
                    Trigger trigger = triggerRouter.getTrigger();
                    Table table = this.symmetricDialect.getPlatform().getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), false);
                    // Same guard as in the purge pass: fail with a clear message instead of
                    // handing a null table to the TriggerHistory constructor.
                    if (table == null) {
                        throw new IllegalStateException("Could not find a required table: " + trigger.getSourceTableName());
                    }
                    triggerHistory = new TriggerHistory(table, trigger, this.symmetricDialect.getTriggerTemplate());
                    triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - index);
                }
                if (triggerRouter.getTrigger().getSourceTableName().endsWith(TableConstants.SYM_NODE_IDENTITY)) {
                    // The identity table is not selected from; a single row with the target's node id is sent.
                    events.add(new SelectFromTableEvent(new Data(1L, null, node.getNodeId(), DataEventType.INSERT, triggerHistory.getSourceTableName(), null, triggerHistory, triggerRouter.getTrigger().getChannelId(), null, null)));
                } else {
                    events.add(new SelectFromTableEvent(node, triggerRouter, triggerHistory, null));
                }
            }
        }

        DataProcessor dataProcessor = new DataProcessor(new ExtractDataReader(this.symmetricDialect.getPlatform(), new SelectFromTableSource(batch, events)), new ProtocolDataWriter(this.nodeService.findIdentityNodeId(), writer, node.requires13Compatiblity()), "configuration extract");
        DataContext dataContext = new DataContext();
        dataContext.put(Constants.DATA_CONTEXT_TARGET_NODE, node);
        dataContext.put(Constants.DATA_CONTEXT_SOURCE_NODE, identity);
        dataProcessor.process(dataContext);
        if (triggerRouters.size() == 0) {
            this.log.error("{} attempted registration, but was sent an empty configuration", node);
        }
    }

    /**
     * Appends a {@code where created_at_node_id='...'} clause to {@code sb} when {@code str}
     * names the node table or the node security table (compared case-insensitively, using the
     * configured table prefix) and this node has an identity. Otherwise {@code sb} is untouched.
     *
     * @param str the table name to test
     * @param sb  the SQL text being built; appended to in place
     */
    private void addPurgeCriteriaToConfigurationTables(String str, StringBuilder sb) {
        String nodeTable = TableConstants.getTableName(this.parameterService.getTablePrefix(), TableConstants.SYM_NODE);
        boolean restrictToThisNode = nodeTable.equalsIgnoreCase(str);
        if (!restrictToThisNode) {
            String nodeSecurityTable = TableConstants.getTableName(this.parameterService.getTablePrefix(), TableConstants.SYM_NODE_SECURITY);
            restrictToThisNode = nodeSecurityTable.equalsIgnoreCase(str);
        }
        if (!restrictToThisNode) {
            return;
        }
        // The identity is only looked up once the table is known to be one of the two of interest.
        Node identity = this.nodeService.findIdentity();
        if (identity == null) {
            return;
        }
        sb.append(String.format(" where created_at_node_id='%s'", identity.getNodeId()));
    }

    /**
     * Narrows {@code outgoingBatches} down to the batches that should be extracted.
     * <p>
     * Steps, in order: filter out the {@code filesync} channel when file sync is enabled; filter
     * out batches on ignored channels, marking each of those OK, bumping its ignore count and
     * persisting it; filter out batches on suspended channels; finally, when no batch is in error
     * and load batches are present, drop the non-load batches.
     *
     * @param outgoingBatches the candidate batches; modified in place
     * @param channelMap      supplies the ignored and suspended channel ids
     * @return the batches remaining in {@code outgoingBatches}
     */
    private List<OutgoingBatch> filterBatchesForExtraction(OutgoingBatches outgoingBatches, ChannelMap channelMap) {
        if (this.parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) {
            outgoingBatches.filterBatchesForChannel("filesync");
        }

        List<OutgoingBatch> ignoredBatches = outgoingBatches.filterBatchesForChannels(channelMap.getIgnoreChannels());
        Iterator<OutgoingBatch> ignoredIterator = ignoredBatches.iterator();
        while (ignoredIterator.hasNext()) {
            OutgoingBatch ignored = ignoredIterator.next();
            ignored.setStatus(OutgoingBatch.Status.OK);
            ignored.incrementIgnoreCount();
            if (this.log.isDebugEnabled()) {
                this.log.debug("Batch {} is being ignored", Long.valueOf(ignored.getBatchId()));
            }
        }
        this.outgoingBatchService.updateOutgoingBatches(ignoredBatches);

        outgoingBatches.filterBatchesForChannels(channelMap.getSuspendChannels());

        if (outgoingBatches.containsBatchesInError() || !outgoingBatches.containsLoadBatches()) {
            return outgoingBatches.getBatches();
        }
        outgoingBatches.removeNonLoadBatches();
        return outgoingBatches.getBatches();
    }

    /**
     * Extracts the pending outgoing batches for {@code node} through a {@link StructureDataWriter}
     * and returns each extracted batch paired with the payload the writer recorded for its batch id.
     *
     * @param processInfo progress tracker handed to the extraction
     * @param node        the target node
     * @param payloadType the payload type for the writer and for each returned wrapper
     * @param z           forwarded to the {@code StructureDataWriter} constructor
     * @param z2          forwarded to the {@code StructureDataWriter} constructor
     * @param z3          forwarded to the {@code StructureDataWriter} constructor
     * @return one entry per extracted batch, or an empty list when nothing is eligible
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo, Node node, StructureDataWriter.PayloadType payloadType, boolean z, boolean z2, boolean z3) {
        OutgoingBatches pending = this.outgoingBatchService.getOutgoingBatches(node.getNodeId(), false);
        if (!pending.containsBatches()) {
            return Collections.emptyList();
        }

        List<OutgoingBatch> eligible = filterBatchesForExtraction(pending, this.configurationService.getSuspendIgnoreChannelLists(node.getNodeId()));
        if (eligible.size() <= 0) {
            return Collections.emptyList();
        }

        StructureDataWriter payloadWriter = new StructureDataWriter(this.symmetricDialect.getPlatform(), node.getDatabaseType(), payloadType, z3, this.symmetricDialect.getBinaryEncoding(), z, z2);
        List<OutgoingBatch> extracted = extract(processInfo, node, eligible, payloadWriter, ExtractMode.FOR_PAYLOAD_CLIENT);

        List<OutgoingBatchWithPayload> results = new ArrayList<OutgoingBatchWithPayload>();
        Iterator<OutgoingBatch> extractedIterator = extracted.iterator();
        while (extractedIterator.hasNext()) {
            OutgoingBatch batch = extractedIterator.next();
            OutgoingBatchWithPayload withPayload = new OutgoingBatchWithPayload(batch, payloadType);
            // The writer keys its payload map by batch id.
            withPayload.setPayload((List) payloadWriter.getPayloadMap().get(Long.valueOf(batch.getBatchId())));
            withPayload.setPayloadType(payloadType);
            results.add(withPayload);
        }
        return results;
    }

    /**
     * Extracts the pending outgoing batches for {@code node} and writes them, in protocol format,
     * to the writer opened on {@code iOutgoingTransport}.
     * <p>
     * When the {@code START_ROUTE_JOB} parameter is off, routing is triggered first via
     * {@code routeData(true)}.
     *
     * @param processInfo        progress tracker handed to the extraction
     * @param node               the target node
     * @param iOutgoingTransport supplies the channel suspend/ignore lists and the output writer
     * @return the batches that were processed, or an empty list when nothing is eligible
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public List<OutgoingBatch> extract(ProcessInfo processInfo, Node node, IOutgoingTransport iOutgoingTransport) {
        boolean routeJobRuns = this.parameterService.is(ParameterConstants.START_ROUTE_JOB);
        if (!routeJobRuns) {
            this.routerService.routeData(true);
        }

        OutgoingBatches pending = this.outgoingBatchService.getOutgoingBatches(node.getNodeId(), false);
        if (!pending.containsBatches()) {
            return Collections.emptyList();
        }

        List<OutgoingBatch> eligible = filterBatchesForExtraction(pending, iOutgoingTransport.getSuspendIgnoreChannelLists(this.configurationService, node));
        if (eligible.size() <= 0) {
            return Collections.emptyList();
        }

        // The transport's writer is only opened once there is something to send.
        ProtocolDataWriter protocolWriter = new ProtocolDataWriter(this.nodeService.findIdentityNodeId(), iOutgoingTransport.openWriter(), node.requires13Compatiblity());
        return extract(processInfo, node, eligible, protocolWriter, ExtractMode.FOR_SYM_CLIENT);
    }

    /**
     * Extracts a single outgoing batch to {@code writer} in {@code EXTRACT_ONLY} mode.
     * <p>
     * For the unrouted node id a placeholder {@link Node} is built from that id and this node's
     * group id; any other id is looked up through the node service.
     *
     * @param str    the node id the batch belongs to
     * @param j      the batch id
     * @param writer destination for the protocol output
     * @return {@code true} when the node and batch were found and the extraction returned at
     *         least one batch; {@code false} otherwise
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public boolean extractOnlyOutgoingBatch(String str, long j, Writer writer) {
        Node targetNode;
        if (Constants.UNROUTED_NODE_ID.equals(str)) {
            targetNode = new Node(str, this.parameterService.getNodeGroupId());
        } else {
            targetNode = this.nodeService.findNode(str);
        }
        if (targetNode == null) {
            return false;
        }

        OutgoingBatch batch = this.outgoingBatchService.findOutgoingBatch(j, str);
        if (batch == null) {
            return false;
        }

        ProtocolDataWriter protocolWriter = new ProtocolDataWriter(this.nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity());
        List<OutgoingBatch> singleBatch = new ArrayList<OutgoingBatch>(1);
        singleBatch.add(batch);
        List<OutgoingBatch> processed = extract(new ProcessInfo(), targetNode, singleBatch, protocolWriter, ExtractMode.EXTRACT_ONLY);
        return processed.size() > 0;
    }

    /**
     * Works through {@code list} in order for {@code node}: extracts each batch (or defers it to
     * the extract job), optionally sends it from staging, and records it as processed.
     * <p>
     * The loop stops early when a batch is still scheduled for extraction (status RQ) or when the
     * accumulated byte count reaches {@code TRANSPORT_MAX_BYTES_TO_SYNC} with batches remaining.
     * <p>
     * A {@link RuntimeException} raised while processing is not rethrown. The batch that was in
     * flight is flagged as in error (status ER unless it is IG), its SQL state/code/message are
     * recorded and it is persisted; the batches processed so far are still returned. The in-flight
     * batch is tracked in a variable declared outside the {@code try} so the handler can reach it.
     * <p>
     * Finally, the last extract time of every channel seen is updated for {@code node}.
     *
     * @param processInfo progress tracker updated per batch
     * @param node        the target node
     * @param list        the batches to process, in order
     * @param iDataWriter destination writer
     * @param extractMode controls persistence of status changes and whether batches are sent
     * @return the batches that were processed before the loop ended; empty when {@code list} is empty
     */
    protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node node, List<OutgoingBatch> list, IDataWriter iDataWriter, ExtractMode extractMode) {
        boolean streamToFileEnabled = this.parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
        if (list.size() <= 0) {
            return Collections.emptyList();
        }
        List<OutgoingBatch> processedBatches = new ArrayList<OutgoingBatch>(list.size());
        HashSet<String> channelsProcessed = new HashSet<String>();
        long batchesSelectedAtMs = System.currentTimeMillis();
        // Declared outside the try block so the error handler can identify the failing batch.
        OutgoingBatch currentBatch = null;
        try {
            long bytesSentCount = 0;
            int batchesSentCount = 0;
            long maxBytesToSync = this.parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
            for (int i = 0; i < list.size(); i++) {
                currentBatch = list.get(i);
                channelsProcessed.add(currentBatch.getChannelId());
                processInfo.incrementBatchCount();
                processInfo.setCurrentBatchId(currentBatch.getBatchId());
                currentBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, currentBatch);

                if (!currentBatch.isExtractJobFlag() || currentBatch.getStatus() == OutgoingBatch.Status.IG) {
                    processInfo.setStatus(ProcessInfo.Status.EXTRACTING);
                    currentBatch = extractOutgoingBatch(processInfo, node, iDataWriter, currentBatch, streamToFileEnabled, true, extractMode);
                } else if (!this.parameterService.is(ParameterConstants.INTITAL_LOAD_USE_EXTRACT_JOB)) {
                    // The extract job is disabled, so hand the batch back to normal extraction.
                    currentBatch.setStatus(OutgoingBatch.Status.NE);
                    currentBatch.setExtractJobFlag(false);
                } else if (currentBatch.getStatus() != OutgoingBatch.Status.RQ && currentBatch.getStatus() != OutgoingBatch.Status.IG && !isPreviouslyExtracted(currentBatch)) {
                    // NOTE(review): processing continues after rescheduling (no break here, unlike
                    // the RQ branch below) - confirm that this is intended.
                    this.log.info("Batch {} is marked as ready but it has been deleted.  Rescheduling it for extraction", currentBatch.getNodeBatchId());
                    if (changeBatchStatus(OutgoingBatch.Status.RQ, currentBatch, extractMode)) {
                        resetExtractRequest(currentBatch);
                    }
                } else if (currentBatch.getStatus() == OutgoingBatch.Status.RQ) {
                    this.log.info("Batch {} is not ready for delivery.  It is currently scheduled for extraction", currentBatch.getNodeBatchId());
                    break;
                }

                if (streamToFileEnabled || extractMode == ExtractMode.FOR_PAYLOAD_CLIENT) {
                    processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
                    currentBatch = sendOutgoingBatch(processInfo, node, currentBatch, iDataWriter, extractMode);
                }
                processedBatches.add(currentBatch);

                if (currentBatch.getStatus() != OutgoingBatch.Status.OK) {
                    currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
                    changeBatchStatus(OutgoingBatch.Status.LD, currentBatch, extractMode);
                    bytesSentCount += currentBatch.getByteCount();
                    batchesSentCount++;
                    if (bytesSentCount >= maxBytesToSync && processedBatches.size() < list.size()) {
                        this.log.info("Reached the total byte threshold after {} of {} batches were extracted for node '{}'.  The remaining batches will be extracted on a subsequent sync", new Object[]{Integer.valueOf(batchesSentCount), Integer.valueOf(list.size()), node.getNodeId()});
                        break;
                    }
                }
            }
        } catch (RuntimeException e) {
            SQLException sqlException = unwrapSqlException(e);
            if (currentBatch != null) {
                this.statisticManager.incrementDataExtractedErrors(currentBatch.getChannelId(), 1L);
                if (sqlException != null) {
                    currentBatch.setSqlState(sqlException.getSQLState());
                    currentBatch.setSqlCode(sqlException.getErrorCode());
                    currentBatch.setSqlMessage(sqlException.getMessage());
                } else {
                    currentBatch.setSqlMessage(getRootMessage(e));
                }
                currentBatch.revertStatsOnError();
                if (currentBatch.getStatus() != OutgoingBatch.Status.IG) {
                    currentBatch.setStatus(OutgoingBatch.Status.ER);
                }
                currentBatch.setErrorFlag(true);
                this.outgoingBatchService.updateOutgoingBatch(currentBatch);
                if (isStreamClosedByClient(e)) {
                    this.log.warn("Failed to transport batch {}.  The stream was closed by the client.  There is a good chance that a previously sent batch errored out and the stream was closed or there was a network error.  The error was: {}", currentBatch, getRootMessage(e));
                } else {
                    if (e instanceof ProtocolException) {
                        // Remove the staged copy of the batch when the failure was a protocol error.
                        IStagedResource stagedResource = getStagedResource(currentBatch);
                        if (stagedResource != null) {
                            stagedResource.delete();
                        }
                    }
                    this.log.error("Failed to extract batch {}", currentBatch, e);
                }
            } else {
                this.log.error("Could not log the outgoing batch status because the batch was null", e);
            }
        }

        // Record the extract time on every channel that was touched, even after a failure.
        Calendar now = Calendar.getInstance();
        for (String channelId : channelsProcessed) {
            NodeChannel nodeChannel = this.configurationService.getNodeChannel(channelId, node.getNodeId(), false);
            if (nodeChannel != null) {
                nodeChannel.setLastExtractTime(now.getTime());
                this.configurationService.saveNodeChannelControl(nodeChannel, false);
            }
        }
        return processedBatches;
    }

    /**
     * Sets {@code status} on the batch unless the batch is currently IG, then persists the batch
     * unless running in {@code EXTRACT_ONLY} mode.
     *
     * @param status        the status to apply
     * @param outgoingBatch the batch to update
     * @param extractMode   the current extract mode
     * @return {@code true} when the batch was persisted, {@code false} in {@code EXTRACT_ONLY} mode
     */
    protected final boolean changeBatchStatus(OutgoingBatch.Status status, OutgoingBatch outgoingBatch, ExtractMode extractMode) {
        boolean ignoredBatch = outgoingBatch.getStatus() == OutgoingBatch.Status.IG;
        if (!ignoredBatch) {
            outgoingBatch.setStatus(status);
        }
        boolean persist = extractMode != ExtractMode.EXTRACT_ONLY;
        if (persist) {
            this.outgoingBatchService.updateOutgoingBatch(outgoingBatch);
        }
        return persist;
    }

    /**
     * Returns a freshly queried copy of the batch when more than
     * {@code MS_PASSED_BEFORE_BATCH_REQUERIED} milliseconds have elapsed since {@code j};
     * otherwise returns {@code outgoingBatch} itself.
     *
     * @param j             reference timestamp in epoch milliseconds
     * @param outgoingBatch the batch as currently held in memory
     * @return the given batch, or whatever the batch service returns for its batch id and node id
     */
    protected final OutgoingBatch requeryIfEnoughTimeHasPassed(long j, OutgoingBatch outgoingBatch) {
        long elapsedMs = System.currentTimeMillis() - j;
        if (elapsedMs <= MS_PASSED_BEFORE_BATCH_REQUERIED) {
            return outgoingBatch;
        }
        return this.outgoingBatchService.findOutgoingBatch(outgoingBatch.getBatchId(), outgoingBatch.getNodeId());
    }

    /**
     * Extracts one outgoing batch through a transform writer.
     * <p>
     * Nothing is done (the batch is returned as-is) when the batch is already OK and the mode is
     * not {@code EXTRACT_ONLY}. Otherwise:
     * <ul>
     * <li>an IG batch has any staged resource deleted and is written as an empty batch flagged
     * as ignored;</li>
     * <li>a batch that has not been extracted before is read via {@code SelectFromSymDataSource}
     * and written out, guarded by a per-key {@link Semaphore} held in {@code locks}.</li>
     * </ul>
     * The semaphore permit is released in a {@code finally} block, and only if it was actually
     * acquired, so a failed extraction neither leaks the permit nor hides the original exception.
     * On a {@link RuntimeException} the batch's staged resource is closed and deleted before the
     * exception is rethrown.
     *
     * @param processInfo   progress tracker passed to the writers and the data source
     * @param node          the target node
     * @param iDataWriter   destination writer, used when {@code z} is {@code false}
     * @param outgoingBatch the batch to extract
     * @param z             when {@code true}, output goes to a {@link StagingDataWriter} in the
     *                      outgoing staging category and the lock key is the batch id; when
     *                      {@code false}, output goes to {@code iDataWriter} and the lock key is
     *                      the node-batch id
     * @param z2            when {@code true}, the batch is moved to QY before extraction and its
     *                      counts, extract time and byte count are updated afterwards
     * @param extractMode   the current extract mode
     * @return the batch, possibly re-queried when {@code z2} is {@code true}
     */
    public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node node, IDataWriter iDataWriter, OutgoingBatch outgoingBatch, boolean z, boolean z2, ExtractMode extractMode) {
        if (outgoingBatch.getStatus() == OutgoingBatch.Status.OK && ExtractMode.EXTRACT_ONLY != extractMode) {
            return outgoingBatch;
        }

        Node identity = this.nodeService.findIdentity();
        TransformWriter transformWriter = z
                ? createTransformDataWriter(identity, node, new ProcessInfoDataWriter(new StagingDataWriter(this.parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), this.nodeService.findIdentityNodeId(), Constants.STAGING_CATEGORY_OUTGOING, this.stagingManager, new IProtocolDataWriterListener[0]), processInfo))
                : createTransformDataWriter(identity, node, new ProcessInfoDataWriter(iDataWriter, processInfo));
        long startedAtMs = System.currentTimeMillis();
        long extractMillis = 0;
        long byteCount = 0;

        if (outgoingBatch.getStatus() == OutgoingBatch.Status.IG) {
            Batch batch = new Batch(Batch.BatchType.EXTRACT, outgoingBatch.getBatchId(), outgoingBatch.getChannelId(), this.symmetricDialect.getBinaryEncoding(), identity.getNodeId(), outgoingBatch.getNodeId(), outgoingBatch.isCommonFlag());
            batch.setIgnored(true);
            try {
                IStagedResource stagedResource = getStagedResource(outgoingBatch);
                if (stagedResource != null) {
                    stagedResource.delete();
                }
                DataContext dataContext = new DataContext(batch);
                dataContext.put(Constants.DATA_CONTEXT_TARGET_NODE, node);
                dataContext.put(Constants.DATA_CONTEXT_SOURCE_NODE, identity);
                transformWriter.open(dataContext);
                transformWriter.start(batch);
                transformWriter.end(batch, false);
            } finally {
                transformWriter.close();
            }
        } else if (!isPreviouslyExtracted(outgoingBatch)) {
            int permits = this.parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS);
            String lockKey = z ? Long.toString(outgoingBatch.getBatchId()) : outgoingBatch.getNodeBatchId();
            Semaphore lock = null;
            boolean acquired = false;
            try {
                synchronized (this.locks) {
                    lock = this.locks.get(lockKey);
                    if (lock == null) {
                        lock = new Semaphore(permits);
                        this.locks.put(lockKey, lock);
                    }
                    try {
                        lock.acquire();
                        acquired = true;
                    } catch (java.lang.InterruptedException e) {
                        // The unqualified name resolves to the project's unchecked exception,
                        // so the checked JDK type must be named explicitly here.
                        throw new InterruptedException(e);
                    }
                }
                synchronized (lock) {
                    // Re-check under the lock: another thread may have extracted it meanwhile.
                    if (!isPreviouslyExtracted(outgoingBatch)) {
                        outgoingBatch.setExtractCount(outgoingBatch.getExtractCount() + 1);
                        if (z2) {
                            changeBatchStatus(OutgoingBatch.Status.QY, outgoingBatch, extractMode);
                        }
                        outgoingBatch.resetStats();
                        ExtractDataReader dataReader = new ExtractDataReader(this.symmetricDialect.getPlatform(), new SelectFromSymDataSource(outgoingBatch, identity, node, processInfo));
                        DataContext dataContext = new DataContext();
                        dataContext.put(Constants.DATA_CONTEXT_TARGET_NODE, node);
                        dataContext.put(Constants.DATA_CONTEXT_SOURCE_NODE, identity);
                        new DataProcessor(dataReader, transformWriter, "extract").process(dataContext);
                        long elapsedMs = System.currentTimeMillis() - startedAtMs;
                        Statistics statistics = (Statistics) transformWriter.getNestedWriter().getStatistics().values().iterator().next();
                        // Time spent transforming is not counted as extract time.
                        extractMillis = elapsedMs - statistics.get("TRANSFORMMILLIS");
                        byteCount = statistics.get("BYTECOUNT");
                    }
                }
            } catch (RuntimeException e) {
                IStagedResource stagedResource = getStagedResource(outgoingBatch);
                if (stagedResource != null) {
                    stagedResource.close();
                    stagedResource.delete();
                }
                throw e;
            } finally {
                if (lock != null) {
                    if (acquired) {
                        lock.release();
                    }
                    synchronized (this.locks) {
                        // Drop the map entry once nobody holds a permit for this key.
                        if (lock.availablePermits() == permits) {
                            this.locks.remove(lockKey);
                        }
                    }
                }
            }
        }

        if (z2) {
            long dataEventCount = outgoingBatch.getDataEventCount();
            long insertEventCount = outgoingBatch.getInsertEventCount();
            outgoingBatch = requeryIfEnoughTimeHasPassed(startedAtMs, outgoingBatch);
            // Carry the larger in-memory counts over to a re-queried batch.
            if (dataEventCount > outgoingBatch.getDataEventCount()) {
                outgoingBatch.setDataEventCount(dataEventCount);
            }
            if (insertEventCount > outgoingBatch.getInsertEventCount()) {
                outgoingBatch.setInsertEventCount(insertEventCount);
            }
            if (extractMillis > 0) {
                outgoingBatch.setExtractMillis(extractMillis);
            }
            if (byteCount > 0) {
                outgoingBatch.setByteCount(byteCount);
                this.statisticManager.incrementDataBytesExtracted(outgoingBatch.getChannelId(), byteCount);
                this.statisticManager.incrementDataExtracted(outgoingBatch.getChannelId(), outgoingBatch.getExtractCount());
            }
        }
        return outgoingBatch;
    }

    /**
     * Looks up the staged resource for a batch in the outgoing staging category, addressed by the
     * batch's staged location and batch id.
     *
     * @param outgoingBatch the batch whose staged resource is wanted
     * @return whatever the staging manager finds for that path
     */
    protected IStagedResource getStagedResource(OutgoingBatch outgoingBatch) {
        Object[] stagingPath = new Object[]{
                Constants.STAGING_CATEGORY_OUTGOING,
                outgoingBatch.getStagedLocation(),
                Long.valueOf(outgoingBatch.getBatchId())
        };
        return this.stagingManager.find(stagingPath);
    }

    /**
     * Reports whether the batch already has a usable staged extraction: a staged resource that
     * is found, exists, and is not in the {@code CREATE} state. Logs at debug level when so.
     *
     * @param outgoingBatch the batch to check
     * @return {@code true} when an existing extraction can be reused
     */
    protected boolean isPreviouslyExtracted(OutgoingBatch outgoingBatch) {
        IStagedResource resource = getStagedResource(outgoingBatch);
        boolean alreadyExtracted = resource != null
                && resource.exists()
                && resource.getState() != IStagedResource.State.CREATE;
        if (alreadyExtracted && this.log.isDebugEnabled()) {
            this.log.debug("We have already extracted batch {}.  Using the existing extraction: {}", Long.valueOf(outgoingBatch.getBatchId()), resource);
        }
        return alreadyExtracted;
    }

    /**
     * Sends a previously staged batch to {@code iDataWriter}.
     * <p>
     * Skipped (batch returned unchanged) when the batch is already OK and the mode is not
     * {@code EXTRACT_ONLY}. Otherwise the sent count is incremented, the status is moved to SE,
     * the staged resource is read back through a {@link ProtocolDataReader}, and the sent
     * statement and byte counts are recorded from the writer's statistics.
     *
     * @param processInfo   progress tracker wrapped around the writer
     * @param node          the target node
     * @param outgoingBatch the batch to send
     * @param iDataWriter   destination writer
     * @param extractMode   the current extract mode
     * @return the batch, possibly re-queried after sending
     * @throws IllegalStateException when no staged resource is found for the batch
     */
    protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node node, OutgoingBatch outgoingBatch, IDataWriter iDataWriter, ExtractMode extractMode) {
        if (outgoingBatch.getStatus() == OutgoingBatch.Status.OK && ExtractMode.EXTRACT_ONLY != extractMode) {
            return outgoingBatch;
        }

        outgoingBatch.setSentCount(outgoingBatch.getSentCount() + 1);
        changeBatchStatus(OutgoingBatch.Status.SE, outgoingBatch, extractMode);
        long sendStartedAtMs = System.currentTimeMillis();

        IStagedResource staged = getStagedResource(outgoingBatch);
        if (staged == null) {
            throw new IllegalStateException(String.format("Could not find the staged resource for batch %s", outgoingBatch.getNodeBatchId()));
        }

        ProtocolDataReader stagedReader = new ProtocolDataReader(Batch.BatchType.EXTRACT, outgoingBatch.getNodeId(), staged);
        DataContext context = new DataContext();
        context.put(Constants.DATA_CONTEXT_TARGET_NODE, node);
        context.put(Constants.DATA_CONTEXT_SOURCE_NODE, this.nodeService.findIdentity());
        DataProcessor processor = new DataProcessor(stagedReader, new ProcessInfoDataWriter(iDataWriter, processInfo), "send from stage");
        processor.process(context);

        if (iDataWriter.getStatistics().size() <= 0) {
            this.log.warn("Could not find recorded statistics for batch {}", outgoingBatch.getNodeBatchId());
        } else {
            // Only the first statistics entry recorded by the writer is used.
            Statistics sentStats = (Statistics) iDataWriter.getStatistics().values().iterator().next();
            this.statisticManager.incrementDataSent(outgoingBatch.getChannelId(), sentStats.get("STATEMENTCOUNT"));
            this.statisticManager.incrementDataBytesSent(outgoingBatch.getChannelId(), sentStats.get("BYTECOUNT"));
        }
        return requeryIfEnoughTimeHasPassed(sendStartedAtMs, outgoingBatch);
    }

    /**
     * Extracts every existing outgoing batch of node {@code str} whose id lies in the inclusive
     * range {@code [j, j2]} and writes it to {@code writer} in protocol format.
     * <p>
     * Ids with no matching batch are skipped. When the node cannot be found and {@code str} is
     * the unrouted node id, a placeholder node carrying that id is used instead.
     *
     * @param writer destination for the protocol output
     * @param str    the node id the batches belong to
     * @param j      first batch id, inclusive
     * @param j2     last batch id, inclusive
     * @return {@code true} when at least one batch was extracted
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public boolean extractBatchRange(Writer writer, String str, long j, long j2) {
        boolean extractedAny = false;
        Node identity = this.nodeService.findIdentity();
        for (long batchId = j; batchId <= j2; batchId++) {
            OutgoingBatch batch = this.outgoingBatchService.findOutgoingBatch(batchId, str);
            if (batch == null) {
                continue;
            }
            Node target = this.nodeService.findNode(str);
            if (target == null && Constants.UNROUTED_NODE_ID.equals(str)) {
                target = new Node();
                target.setNodeId(Constants.UNROUTED_NODE_ID);
            }
            if (target == null) {
                continue;
            }
            ExtractDataReader reader = new ExtractDataReader(this.symmetricDialect.getPlatform(), new SelectFromSymDataSource(batch, identity, target, new ProcessInfo()));
            DataContext context = new DataContext();
            context.put(Constants.DATA_CONTEXT_TARGET_NODE, target);
            context.put(Constants.DATA_CONTEXT_SOURCE_NODE, this.nodeService.findIdentity());
            Node sourceNode = this.nodeService.findIdentity();
            ProtocolDataWriter protocolWriter = new ProtocolDataWriter(this.nodeService.findIdentityNodeId(), writer, target.requires13Compatiblity());
            TransformWriter transformWriter = createTransformDataWriter(sourceNode, target, protocolWriter);
            new DataProcessor(reader, transformWriter, "extract range").process(context);
            extractedAny = true;
        }
        return extractedAny;
    }

    /**
     * Extracts every outgoing batch the batch service returns for node {@code str}, the given
     * date range and the given channels, writing each to {@code writer} in protocol format.
     * <p>
     * When the node cannot be found and {@code str} is the unrouted node id, a placeholder node
     * carrying that id is used instead; otherwise batches for an unknown node are skipped.
     *
     * @param writer destination for the protocol output
     * @param str    the node id the batches belong to
     * @param date   start of the range, passed to the batch service
     * @param date2  end of the range, passed to the batch service
     * @param strArr channel ids, passed to the batch service
     * @return {@code true} when at least one batch was extracted
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public boolean extractBatchRange(Writer writer, String str, Date date, Date date2, String... strArr) {
        boolean extractedAny = false;
        Node identity = this.nodeService.findIdentity();
        List<OutgoingBatch> batchesInRange = this.outgoingBatchService.getOutgoingBatchRange(str, date, date2, strArr).getBatches();
        Iterator<OutgoingBatch> batchIterator = batchesInRange.iterator();
        while (batchIterator.hasNext()) {
            OutgoingBatch batch = batchIterator.next();
            Node target = this.nodeService.findNode(str);
            if (target == null && Constants.UNROUTED_NODE_ID.equals(str)) {
                target = new Node();
                target.setNodeId(Constants.UNROUTED_NODE_ID);
            }
            if (target == null) {
                continue;
            }
            ExtractDataReader reader = new ExtractDataReader(this.symmetricDialect.getPlatform(), new SelectFromSymDataSource(batch, identity, target, new ProcessInfo()));
            DataContext context = new DataContext();
            context.put(Constants.DATA_CONTEXT_TARGET_NODE, target);
            context.put(Constants.DATA_CONTEXT_SOURCE_NODE, this.nodeService.findIdentity());
            Node sourceNode = this.nodeService.findIdentity();
            ProtocolDataWriter protocolWriter = new ProtocolDataWriter(this.nodeService.findIdentityNodeId(), writer, target.requires13Compatiblity());
            TransformWriter transformWriter = createTransformDataWriter(sourceNode, target, protocolWriter);
            new DataProcessor(reader, transformWriter, "extract range").process(context);
            extractedAny = true;
        }
        return extractedAny;
    }

    /**
     * Wraps {@code iDataWriter} in a {@link TransformWriter} for the EXTRACT transform point.
     * <p>
     * When {@code node2} is given, the transforms configured for the link from {@code node}'s
     * group to {@code node2}'s group are looked up and passed along; otherwise (or when the
     * lookup yields {@code null}) the writer is given a {@code null} transform array.
     *
     * @param node        the source node
     * @param node2       the target node; may be {@code null}
     * @param iDataWriter the writer to wrap
     * @return the transform writer
     */
    protected TransformWriter createTransformDataWriter(Node node, Node node2, IDataWriter iDataWriter) {
        TransformTable[] transformTables = null;
        if (node2 != null) {
            NodeGroupLink link = new NodeGroupLink(node.getNodeGroupId(), node2.getNodeGroupId());
            List<TransformService.TransformTableNodeGroupLink> linkTransforms = this.transformService.findTransformsFor(link, TransformPoint.EXTRACT, true);
            if (linkTransforms != null) {
                transformTables = (TransformTable[]) linkTransforms.toArray(new TransformTable[linkTransforms.size()]);
            }
        }
        return new TransformWriter(this.symmetricDialect.getPlatform(), TransformPoint.EXTRACT, iDataWriter, this.transformService.getColumnTransforms(), transformTables);
    }

    /**
     * Builds a {@link Table} whose columns and primary keys follow the parsed column names held
     * in {@code triggerHistory}.
     * <p>
     * With {@code z2} set, the table comes from the platform's table cache and is copied and
     * filtered to those columns; otherwise a bare table is assembled from the names alone. The
     * catalog and schema start as the trigger history's source values. With {@code z} set and a
     * router found for {@code str}, the router's target catalog, schema and table name override
     * them: the none-token clears catalog/schema, and any non-blank value replaces it.
     *
     * @param str            the router id
     * @param triggerHistory supplies the source catalog, schema, table and column names
     * @param z              whether to apply the router's target overrides
     * @param z2             whether to start from the platform's cached table definition
     * @return the assembled table
     */
    protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String str, TriggerHistory triggerHistory, boolean z, boolean z2) {
        String catalog = triggerHistory.getSourceCatalogName();
        String schema = triggerHistory.getSourceSchemaName();
        String tableName = triggerHistory.getSourceTableName();

        Table result;
        if (!z2) {
            result = new Table(tableName);
            result.addColumns(triggerHistory.getParsedColumnNames());
            result.setPrimaryKeys(triggerHistory.getParsedPkColumnNames());
        } else {
            Table cached = this.platform.getTableFromCache(catalog, schema, tableName, false);
            result = cached.copyAndFilterColumns(triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames(), true);
        }
        result.setCatalog(catalog);
        result.setSchema(schema);

        Router router = this.triggerRouterService.getRouterById(str, false);
        if (router == null || !z) {
            return result;
        }

        String targetCatalog = router.getTargetCatalogName();
        if (StringUtils.equals(Constants.NONE_TOKEN, targetCatalog)) {
            result.setCatalog((String) null);
        } else if (StringUtils.isNotBlank(targetCatalog)) {
            result.setCatalog(targetCatalog);
        }

        String targetSchema = router.getTargetSchemaName();
        if (StringUtils.equals(Constants.NONE_TOKEN, targetSchema)) {
            result.setSchema((String) null);
        } else if (StringUtils.isNotBlank(targetSchema)) {
            result.setSchema(targetSchema);
        }

        String targetTable = router.getTargetTableName();
        if (StringUtils.isNotBlank(targetTable)) {
            result.setName(targetTable);
        }
        return result;
    }

    /**
     * Queues extract work for every node id returned by {@link #getExtractRequestNodes()}.
     * <p>
     * Nothing is queued when this node has no identity. Unless {@code z} is {@code true}, the
     * {@code INITIAL_LOAD_EXTRACT} cluster lock must be obtained first and is released afterwards.
     *
     * @param z when {@code true}, skip taking (and releasing) the cluster lock
     * @return the status collector passed to each queued node
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public RemoteNodeStatuses queueWork(boolean z) {
        RemoteNodeStatuses statuses = new RemoteNodeStatuses();
        if (this.nodeService.findIdentity() == null) {
            this.log.debug("Not running initial load extract service because this node does not have an identity");
            return statuses;
        }
        if (!z && !this.clusterService.lock(ClusterConstants.INITIAL_LOAD_EXTRACT)) {
            return statuses;
        }
        try {
            for (String nodeId : getExtractRequestNodes()) {
                queue(nodeId, statuses);
            }
        } finally {
            if (!z) {
                this.clusterService.unlock(ClusterConstants.INITIAL_LOAD_EXTRACT);
            }
        }
        return statuses;
    }

    /**
     * Hands the EXTRACT communication record for node {@code str} to the node communication
     * service for execution, provided at least one EXTRACT thread is available.
     *
     * @param str                the node id
     * @param remoteNodeStatuses the status collector passed to the execution
     */
    protected void queue(String str, RemoteNodeStatuses remoteNodeStatuses) {
        NodeCommunication.CommunicationType extractType = NodeCommunication.CommunicationType.EXTRACT;
        int freeThreads = this.nodeCommunicationService.getAvailableThreads(extractType);
        NodeCommunication communication = this.nodeCommunicationService.find(str, extractType);
        if (freeThreads <= 0) {
            return;
        }
        this.nodeCommunicationService.execute(communication, remoteNodeStatuses, this);
    }

    /**
     * Runs the {@code selectNodeIdsForExtractSql} query with the {@code NE} extract status as its
     * only argument.
     *
     * @return the node ids returned by the query
     */
    public List<String> getExtractRequestNodes() {
        String sql = getSql("selectNodeIdsForExtractSql");
        Object[] args = new Object[]{ExtractRequest.ExtractStatus.NE.name()};
        return this.sqlTemplate.query(sql, SqlConstants.STRING_MAPPER, args);
    }

    /**
     * Runs the {@code selectExtractRequestForNodeSql} query for a node, passing the node id and
     * the {@code NE} extract status as arguments.
     *
     * @param str the node id
     * @return the extract requests mapped from the query result
     */
    public List<ExtractRequest> getExtractRequestsForNode(String str) {
        String sql = getSql("selectExtractRequestForNodeSql");
        ExtractRequestMapper mapper = new ExtractRequestMapper();
        Object[] args = new Object[]{str, ExtractRequest.ExtractStatus.NE.name()};
        return this.sqlTemplate.query(sql, mapper, args);
    }

    /**
     * Runs the {@code resetExtractRequestStatus} update for a batch, binding the {@code NE}
     * status, the batch id (twice) and the batch's node id.
     *
     * @param outgoingBatch the batch whose extract request is reset
     */
    protected void resetExtractRequest(OutgoingBatch outgoingBatch) {
        String sql = getSql("resetExtractRequestStatus");
        Object[] args = new Object[]{
                ExtractRequest.ExtractStatus.NE.name(),
                Long.valueOf(outgoingBatch.getBatchId()),
                Long.valueOf(outgoingBatch.getBatchId()),
                outgoingBatch.getNodeId()
        };
        this.sqlTemplate.update(sql, args);
    }

    /**
     * Inserts a new extract request row in status {@code NE} within the given transaction.
     *
     * @param iSqlTransaction the transaction to insert through
     * @param str             the node id
     * @param triggerRouter   supplies the trigger id and router id for the request
     * @param j               bound as the third value (a BIGINT)
     * @param j2              bound as the fourth value (a BIGINT)
     */
    @Override // org.jumpmind.symmetric.service.IDataExtractorService
    public void requestExtractRequest(ISqlTransaction iSqlTransaction, String str, TriggerRouter triggerRouter, long j, long j2) {
        String sql = getSql("insertExtractRequestSql");
        String sequenceKeyName = this.symmetricDialect.getSequenceKeyName(SequenceIdentifier.REQUEST);
        String sequenceName = this.symmetricDialect.getSequenceName(SequenceIdentifier.REQUEST);
        Object[] values = new Object[]{
                str,
                ExtractRequest.ExtractStatus.NE.name(),
                Long.valueOf(j),
                Long.valueOf(j2),
                triggerRouter.getTrigger().getTriggerId(),
                triggerRouter.getRouter().getRouterId()
        };
        // JDBC type codes, position-for-position with the values above.
        int[] types = new int[]{
                java.sql.Types.VARCHAR,
                java.sql.Types.VARCHAR,
                java.sql.Types.BIGINT,
                java.sql.Types.BIGINT,
                java.sql.Types.VARCHAR,
                java.sql.Types.VARCHAR
        };
        iSqlTransaction.insertWithGeneratedKey(sql, sequenceKeyName, sequenceName, values, types);
    }

    /**
     * Runs the {@code updateExtractRequestStatus} statement within the given transaction,
     * binding the status name and {@code j}.
     *
     * @param iSqlTransaction the transaction to execute through
     * @param j               bound as the second value of the statement
     * @param extractStatus   the status whose name is bound as the first value
     */
    protected void updateExtractRequestStatus(ISqlTransaction iSqlTransaction, long j, ExtractRequest.ExtractStatus extractStatus) {
        String sql = getSql("updateExtractRequestStatus");
        Object[] args = new Object[]{extractStatus.name(), Long.valueOf(j)};
        iSqlTransaction.prepareAndExecute(sql, args);
    }

    /**
     * Works through the {@code NE} extract requests of the node referenced by
     * {@code nodeCommunication}.
     *
     * <p>For each request the batches in the request's batch range are loaded.
     * Unless every one of them already has status {@code OK}, they are
     * extracted through a {@link MultiBatchStagingWriter}. The range is then
     * re-read, and in a single transaction the request is marked {@code OK};
     * if any re-read batch is not {@code OK}, the originally loaded batches
     * are set to {@code NE} and saved in that same transaction.
     *
     * <p>No further request is started once the time spent in this call
     * exceeds {@code Constants.LONG_OPERATION_THRESHOLD}; the remaining
     * requests are left untouched.
     *
     * @param nodeCommunication supplies the id of the node whose requests are processed
     * @param remoteNodeStatus  not used by this implementation
     * @throws RuntimeException whatever extraction or the status update throws;
     *                          the process info is set to {@code ERROR} before rethrowing
     */
    @Override // org.jumpmind.symmetric.service.INodeCommunicationService.INodeCommunicationExecutor
    public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus remoteNodeStatus) {
        long startTime = System.currentTimeMillis();
        List<ExtractRequest> requests = getExtractRequestsForNode(nodeCommunication.getNodeId());
        for (int i = 0; i < requests.size() && System.currentTimeMillis() - startTime <= Constants.LONG_OPERATION_THRESHOLD; i++) {
            ExtractRequest request = requests.get(i);
            Node identity = this.nodeService.findIdentity();
            Node targetNode = this.nodeService.findNode(nodeCommunication.getNodeId());

            // Boxed once per request; only used as log message arguments.
            Long requestId = Long.valueOf(request.getRequestId());
            Long startBatchId = Long.valueOf(request.getStartBatchId());
            Long endBatchId = Long.valueOf(request.getEndBatchId());

            this.log.info("Extracting batches for request {}. Starting at batch {}.  Ending at batch {}", new Object[]{requestId, startBatchId, endBatchId});
            List<OutgoingBatch> batches = this.outgoingBatchService.getOutgoingBatchRange(request.getStartBatchId(), request.getEndBatchId()).getBatches();
            ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey(identity.getNodeId(), nodeCommunication.getNodeId(), ProcessInfoKey.ProcessType.INITIAL_LOAD_EXTRACT_JOB));
            try {
                // An empty range counts as "all OK", so batches.get(0) below is
                // only reached when at least one batch exists.
                boolean alreadyOk = true;
                for (OutgoingBatch batch : batches) {
                    if (batch.getStatus() != OutgoingBatch.Status.OK) {
                        alreadyOk = false;
                        break;
                    }
                }

                if (alreadyOk) {
                    this.log.info("Batches already had an OK status for request {}, batches {} to {}.  Not extracting", new Object[]{requestId, startBatchId, endBatchId});
                } else {
                    // The first batch is handed to the extractor; the writer
                    // receives the full batch list and the channel's max batch size.
                    int maxBatchSize = this.configurationService.getChannel(batches.get(0).getChannelId()).getMaxBatchSize();
                    MultiBatchStagingWriter writer = new MultiBatchStagingWriter(identity.getNodeId(), this.stagingManager, batches, maxBatchSize);
                    extractOutgoingBatch(processInfo, targetNode, writer, batches.get(0), false, false, ExtractMode.FOR_SYM_CLIENT);
                }

                // Re-read the range: statuses may have changed while extracting.
                List<OutgoingBatch> reloadedBatches = this.outgoingBatchService.getOutgoingBatchRange(request.getStartBatchId(), request.getEndBatchId()).getBatches();
                boolean okAfterExtract = true;
                for (OutgoingBatch batch : reloadedBatches) {
                    if (batch.getStatus() != OutgoingBatch.Status.OK) {
                        okAfterExtract = false;
                        break;
                    }
                }

                ISqlTransaction transaction = null;
                try {
                    transaction = this.sqlTemplate.startSqlTransaction();
                    updateExtractRequestStatus(transaction, request.getRequestId(), ExtractRequest.ExtractStatus.OK);
                    if (okAfterExtract) {
                        this.log.info("Batches already had an OK status for request {}, batches {} to {}.  Not updating the status to NE", new Object[]{requestId, startBatchId, endBatchId});
                    } else {
                        for (OutgoingBatch batch : batches) {
                            batch.setStatus(OutgoingBatch.Status.NE);
                            this.outgoingBatchService.updateOutgoingBatch(transaction, batch);
                        }
                    }
                    transaction.commit();
                } catch (Error e) {
                    if (transaction != null) {
                        transaction.rollback();
                    }
                    throw e;
                } catch (RuntimeException e) {
                    if (transaction != null) {
                        transaction.rollback();
                    }
                    throw e;
                } finally {
                    // Always release the transaction that was actually started.
                    // The previous failure path passed null to close(), leaving
                    // the transaction open.
                    close(transaction);
                }
                processInfo.setStatus(ProcessInfo.Status.DONE);
            } catch (RuntimeException e) {
                // The exception itself is rethrown to the caller, so only the
                // request context is logged here.
                this.log.debug("Failed to extract batches for request {}. Starting at batch {}.  Ending at batch {}", new Object[]{requestId, startBatchId, endBatchId});
                processInfo.setStatus(ProcessInfo.Status.ERROR);
                throw e;
            }
        }
    }
}
