/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.dataframe.process;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcess;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessConfig;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessFactory;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsResultProcessor;
import org.elasticsearch.xpack.ml.dataframe.process.DataFrameRowsJoiner;
import org.elasticsearch.xpack.ml.dataframe.process.customprocessing.CustomProcessor;
import org.elasticsearch.xpack.ml.dataframe.process.customprocessing.CustomProcessorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

public class AnalyticsProcessManager {
    private static final Logger LOGGER = LogManager.getLogger(AnalyticsProcessManager.class);
    private final Client client;
    private final ThreadPool threadPool;
    private final AnalyticsProcessFactory<AnalyticsResult> processFactory;
    private final ConcurrentMap<Long, ProcessContext> processContextByAllocation = new ConcurrentHashMap<Long, ProcessContext>();
    private final DataFrameAnalyticsAuditor auditor;

    public AnalyticsProcessManager(Client client, ThreadPool threadPool, AnalyticsProcessFactory<AnalyticsResult> analyticsProcessFactory, DataFrameAnalyticsAuditor auditor) {
        this.client = Objects.requireNonNull(client);
        this.threadPool = Objects.requireNonNull(threadPool);
        this.processFactory = Objects.requireNonNull(analyticsProcessFactory);
        this.auditor = Objects.requireNonNull(auditor);
    }

    public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory, Consumer<Exception> finishHandler) {
        this.threadPool.generic().execute(() -> {
            if (task.isStopping()) {
                finishHandler.accept(null);
                return;
            }
            this.refreshDest(config);
            ProcessContext processContext = new ProcessContext(config.getId());
            if (this.processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
                finishHandler.accept((Exception)ExceptionsHelper.serverError((String)("[" + processContext.id + "] Could not create process as one already exists")));
                return;
            }
            BytesReference state = this.getModelState(config);
            if (processContext.startProcess(dataExtractorFactory, config, task, state)) {
                ExecutorService executorService = this.threadPool.executor("ml_job_comms");
                executorService.execute(() -> this.processResults(processContext));
                executorService.execute(() -> this.processData(task, config, processContext.dataExtractor, processContext.process, processContext.resultProcessor, finishHandler, state));
            } else {
                finishHandler.accept(null);
            }
        });
    }

    @Nullable
    private BytesReference getModelState(DataFrameAnalyticsConfig config) {
        if (!config.getAnalysis().persistsState()) {
            return null;
        }
        try (ThreadContext.StoredContext ignore = this.client.threadPool().getThreadContext().stashWithOrigin("ml");){
            SearchRequest searchRequest = new SearchRequest(new String[]{AnomalyDetectorsIndex.jobStateIndexPattern()});
            searchRequest.source().size(1).query((QueryBuilder)QueryBuilders.idsQuery().addIds(new String[]{config.getAnalysis().getStateDocId(config.getId())}));
            SearchResponse searchResponse = (SearchResponse)this.client.prepareSearch(new String[]{AnomalyDetectorsIndex.jobStateIndexPattern()}).setSize(1).setQuery((QueryBuilder)QueryBuilders.idsQuery().addIds(new String[]{config.getAnalysis().getStateDocId(config.getId())})).get();
            SearchHit[] hits = searchResponse.getHits().getHits();
            BytesReference bytesReference = hits.length == 0 ? null : hits[0].getSourceRef();
            return bytesReference;
        }
    }

    private void processResults(ProcessContext processContext) {
        try {
            processContext.resultProcessor.process(processContext.process);
        }
        catch (Exception e) {
            processContext.setFailureReason(e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, AnalyticsProcess<AnalyticsResult> process, AnalyticsResultProcessor resultProcessor, Consumer<Exception> finishHandler, BytesReference state) {
        try {
            ProcessContext processContext = (ProcessContext)this.processContextByAllocation.get(task.getAllocationId());
            this.writeHeaderRecord(dataExtractor, process);
            this.writeDataRows(dataExtractor, process, config.getAnalysis(), task.getProgressTracker());
            process.writeEndOfDataMessage();
            process.flushStream();
            this.restoreState(config, state, process, finishHandler);
            LOGGER.info("[{}] Waiting for result processor to complete", (Object)config.getId());
            resultProcessor.awaitForCompletion();
            processContext.setFailureReason(resultProcessor.getFailure());
            this.refreshDest(config);
            LOGGER.info("[{}] Result processor has completed", (Object)config.getId());
        }
        catch (Exception e) {
            if (!task.isStopping()) {
                String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", (Object)config.getId(), (Object)e.getMessage()).getFormattedMessage();
                LOGGER.error(errorMsg, (Throwable)e);
                ((ProcessContext)this.processContextByAllocation.get(task.getAllocationId())).setFailureReason(errorMsg);
            }
        }
        finally {
            this.closeProcess(task);
            ProcessContext processContext = (ProcessContext)this.processContextByAllocation.remove(task.getAllocationId());
            LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", (Object)config.getId(), (Object)this.processContextByAllocation.size());
            if (processContext.getFailureReason() == null) {
                LOGGER.info("[{}] Marking task completed", (Object)config.getId());
                finishHandler.accept(null);
            } else {
                LOGGER.error("[{}] Marking task failed; {}", (Object)config.getId(), (Object)processContext.getFailureReason());
                task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
            }
        }
    }

    private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess<AnalyticsResult> process, DataFrameAnalysis analysis, DataFrameAnalyticsTask.ProgressTracker progressTracker) throws IOException {
        CustomProcessor customProcessor = new CustomProcessorFactory(dataExtractor.getFieldNames()).create(analysis);
        String[] record = new String[dataExtractor.getFieldNames().size() + 2];
        record[record.length - 1] = "";
        long totalRows = process.getConfig().rows();
        long rowsProcessed = 0L;
        while (dataExtractor.hasNext()) {
            Optional<List<DataFrameDataExtractor.Row>> rows = dataExtractor.next();
            if (!rows.isPresent()) continue;
            for (DataFrameDataExtractor.Row row : rows.get()) {
                if (row.shouldSkip()) continue;
                String[] rowValues = row.getValues();
                System.arraycopy(rowValues, 0, record, 0, rowValues.length);
                record[record.length - 2] = String.valueOf(row.getChecksum());
                customProcessor.process(record);
                process.writeRecord(record);
            }
            progressTracker.loadingDataPercent.set((rowsProcessed += (long)rows.get().size()) >= totalRows ? 100 : (int)((double)rowsProcessed * 100.0 / (double)totalRows));
        }
    }

    private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsProcess<AnalyticsResult> process) throws IOException {
        List<String> fieldNames = dataExtractor.getFieldNames();
        String[] headerRecord = new String[fieldNames.size() + 2];
        for (int i = 0; i < fieldNames.size(); ++i) {
            headerRecord[i] = fieldNames.get(i);
        }
        headerRecord[headerRecord.length - 2] = ".";
        headerRecord[headerRecord.length - 1] = ".";
        process.writeRecord(headerRecord);
    }

    private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesReference state, AnalyticsProcess<AnalyticsResult> process, Consumer<Exception> failureHandler) {
        if (!config.getAnalysis().persistsState()) {
            LOGGER.debug("[{}] Analysis does not support state", (Object)config.getId());
            return;
        }
        if (state == null) {
            LOGGER.debug("[{}] No model state available to restore", (Object)config.getId());
            return;
        }
        LOGGER.debug("[{}] Restoring from previous model state", (Object)config.getId());
        this.auditor.info(config.getId(), "Restoring from previous model state");
        try (ThreadContext.StoredContext ignore = this.client.threadPool().getThreadContext().stashWithOrigin("ml");){
            process.restoreState(state);
        }
        catch (Exception e) {
            LOGGER.error((Message)new ParameterizedMessage("[{}] Failed to restore state", (Object)process.getConfig().jobId()), (Throwable)e);
            failureHandler.accept((Exception)ExceptionsHelper.serverError((String)"Failed to restore state", (Throwable)e));
        }
    }

    private AnalyticsProcess<AnalyticsResult> createProcess(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, AnalyticsProcessConfig analyticsProcessConfig, @Nullable BytesReference state) {
        ExecutorService executorService = this.threadPool.executor("ml_job_comms");
        AnalyticsProcess<AnalyticsResult> process = this.processFactory.createAnalyticsProcess(config, analyticsProcessConfig, state, executorService, this.onProcessCrash(task));
        if (!process.isProcessAlive()) {
            throw ExceptionsHelper.serverError((String)"Failed to start data frame analytics process");
        }
        return process;
    }

    private Consumer<String> onProcessCrash(DataFrameAnalyticsTask task) {
        return reason -> {
            ProcessContext processContext = (ProcessContext)this.processContextByAllocation.get(task.getAllocationId());
            if (processContext != null) {
                processContext.setFailureReason(reason);
                processContext.stop();
            }
        };
    }

    private void refreshDest(DataFrameAnalyticsConfig config) {
        ClientHelper.executeWithHeaders((Map)config.getHeaders(), (String)"ml", (Client)this.client, () -> (RefreshResponse)this.client.execute((ActionType)RefreshAction.INSTANCE, (ActionRequest)new RefreshRequest(new String[]{config.getDest().getIndex()})).actionGet());
    }

    private void closeProcess(DataFrameAnalyticsTask task) {
        String configId = task.getParams().getId();
        LOGGER.info("[{}] Closing process", (Object)configId);
        ProcessContext processContext = (ProcessContext)this.processContextByAllocation.get(task.getAllocationId());
        try {
            processContext.process.close();
            LOGGER.info("[{}] Closed process", (Object)configId);
        }
        catch (Exception e) {
            String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]", (Object)configId, (Object)e.getMessage()).getFormattedMessage();
            processContext.setFailureReason(errorMsg);
        }
    }

    public void stop(DataFrameAnalyticsTask task) {
        ProcessContext processContext = (ProcessContext)this.processContextByAllocation.get(task.getAllocationId());
        if (processContext != null) {
            LOGGER.debug("[{}] Stopping process", (Object)task.getParams().getId());
            processContext.stop();
        } else {
            LOGGER.debug("[{}] No process context to stop", (Object)task.getParams().getId());
            task.markAsCompleted();
        }
    }

    class ProcessContext {
        private final String id;
        private volatile AnalyticsProcess<AnalyticsResult> process;
        private volatile DataFrameDataExtractor dataExtractor;
        private volatile AnalyticsResultProcessor resultProcessor;
        private volatile boolean processKilled;
        private volatile String failureReason;

        ProcessContext(String id) {
            this.id = Objects.requireNonNull(id);
        }

        public String getId() {
            return this.id;
        }

        public boolean isProcessKilled() {
            return this.processKilled;
        }

        private synchronized void setFailureReason(String failureReason) {
            if (failureReason != null) {
                this.failureReason = failureReason;
            }
        }

        private String getFailureReason() {
            return this.failureReason;
        }

        public synchronized void stop() {
            LOGGER.debug("[{}] Stopping process", (Object)this.id);
            this.processKilled = true;
            if (this.dataExtractor != null) {
                this.dataExtractor.cancel();
            }
            if (this.process != null) {
                try {
                    this.process.kill();
                }
                catch (IOException e) {
                    LOGGER.error((Message)new ParameterizedMessage("[{}] Failed to kill process", (Object)this.id), (Throwable)e);
                }
            }
        }

        private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config, DataFrameAnalyticsTask task, @Nullable BytesReference state) {
            if (this.processKilled) {
                return false;
            }
            this.dataExtractor = dataExtractorFactory.newExtractor(false);
            AnalyticsProcessConfig analyticsProcessConfig = this.createProcessConfig(config, this.dataExtractor);
            LOGGER.trace("[{}] creating analytics process with config [{}]", (Object)config.getId(), (Object)Strings.toString((ToXContent)analyticsProcessConfig));
            if (analyticsProcessConfig.rows() == 0L) {
                LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", (Object)config.getId());
                return false;
            }
            this.process = AnalyticsProcessManager.this.createProcess(task, config, analyticsProcessConfig, state);
            DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), AnalyticsProcessManager.this.client, dataExtractorFactory.newExtractor(true));
            this.resultProcessor = new AnalyticsResultProcessor(this.id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker());
            return true;
        }

        private AnalyticsProcessConfig createProcessConfig(DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor) {
            DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
            Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
            AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(config.getId(), dataSummary.rows, dataSummary.cols, config.getModelMemoryLimit(), 1, config.getDest().getResultsField(), categoricalFields, config.getAnalysis());
            return processConfig;
        }
    }
}

