/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.dataframe;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.watcher.watch.Payload;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

public class DataFrameAnalyticsTask
extends AllocatedPersistentTask
implements StartDataFrameAnalyticsAction.TaskMatcher {
    private static final Logger LOGGER = LogManager.getLogger(DataFrameAnalyticsTask.class);
    private final Client client;
    private final ClusterService clusterService;
    private final DataFrameAnalyticsManager analyticsManager;
    private final DataFrameAnalyticsAuditor auditor;
    private final StartDataFrameAnalyticsAction.TaskParams taskParams;
    @Nullable
    private volatile Long reindexingTaskId;
    private volatile boolean isReindexingFinished;
    private volatile boolean isStopping;
    private volatile boolean isMarkAsCompletedCalled;
    private final ProgressTracker progressTracker = new ProgressTracker();

    public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers, Client client, ClusterService clusterService, DataFrameAnalyticsManager analyticsManager, DataFrameAnalyticsAuditor auditor, StartDataFrameAnalyticsAction.TaskParams taskParams) {
        super(id, type, action, "data_frame_analytics-" + taskParams.getId(), parentTask, headers);
        this.client = Objects.requireNonNull(client);
        this.clusterService = Objects.requireNonNull(clusterService);
        this.analyticsManager = Objects.requireNonNull(analyticsManager);
        this.auditor = Objects.requireNonNull(auditor);
        this.taskParams = Objects.requireNonNull(taskParams);
    }

    public StartDataFrameAnalyticsAction.TaskParams getParams() {
        return this.taskParams;
    }

    public void setReindexingTaskId(Long reindexingTaskId) {
        LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", (Object)this.taskParams.getId(), (Object)reindexingTaskId, (Object)this.reindexingTaskId);
        this.reindexingTaskId = reindexingTaskId;
    }

    public void setReindexingFinished() {
        this.isReindexingFinished = true;
    }

    public boolean isStopping() {
        return this.isStopping;
    }

    public ProgressTracker getProgressTracker() {
        return this.progressTracker;
    }

    protected void onCancelled() {
        this.stop(this.getReasonCancelled(), TimeValue.ZERO);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void markAsCompleted() {
        DataFrameAnalyticsTask dataFrameAnalyticsTask = this;
        synchronized (dataFrameAnalyticsTask) {
            if (this.isMarkAsCompletedCalled) {
                return;
            }
            this.isMarkAsCompletedCalled = true;
        }
        this.persistProgress(() -> super.markAsCompleted());
    }

    public void markAsFailed(Exception e) {
        this.persistProgress(() -> super.markAsFailed(e));
    }

    public void stop(String reason, TimeValue timeout) {
        this.isStopping = true;
        ActionListener reindexProgressListener = ActionListener.wrap(aVoid -> this.doStop(reason, timeout), e -> {
            LOGGER.error((Message)new ParameterizedMessage("[{}] Error updating reindexing progress", (Object)this.taskParams.getId()), (Throwable)e);
            this.doStop(reason, timeout);
        });
        this.updateReindexTaskProgress((ActionListener<Void>)reindexProgressListener);
    }

    private void doStop(String reason, TimeValue timeout) {
        if (this.reindexingTaskId != null) {
            this.cancelReindexingTask(reason, timeout);
        }
        this.analyticsManager.stop(this);
    }

    private void cancelReindexingTask(String reason, TimeValue timeout) {
        TaskId reindexTaskId = new TaskId(this.clusterService.localNode().getId(), this.reindexingTaskId.longValue());
        LOGGER.debug("[{}] Cancelling reindex task [{}]", (Object)this.taskParams.getId(), (Object)reindexTaskId);
        CancelTasksRequest cancelReindex = new CancelTasksRequest();
        cancelReindex.setTaskId(reindexTaskId);
        cancelReindex.setReason(reason);
        cancelReindex.setTimeout(timeout);
        CancelTasksResponse cancelReindexResponse = (CancelTasksResponse)this.client.admin().cluster().cancelTasks(cancelReindex).actionGet();
        Throwable firstError = null;
        if (!cancelReindexResponse.getNodeFailures().isEmpty()) {
            firstError = ((ElasticsearchException)cancelReindexResponse.getNodeFailures().get(0)).getRootCause();
        }
        if (!cancelReindexResponse.getTaskFailures().isEmpty()) {
            firstError = ((TaskOperationFailure)cancelReindexResponse.getTaskFailures().get(0)).getCause();
        }
        if (firstError != null && !(ExceptionsHelper.unwrapCause((Throwable)firstError) instanceof ResourceNotFoundException)) {
            throw ExceptionsHelper.serverError((String)("[" + this.taskParams.getId() + "] Error cancelling reindex task"), (Throwable)firstError);
        }
        LOGGER.debug("[{}] Reindex task was successfully cancelled", (Object)this.taskParams.getId());
    }

    public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
        DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, this.getAllocationId(), reason);
        this.updatePersistentTaskState((PersistentTaskState)newTaskState, ActionListener.wrap(updatedTask -> {
            this.auditor.info(this.getParams().getId(), Messages.getMessage((String)"Successfully updated analytics task state to [{0}]", (Object[])new Object[]{state}));
            LOGGER.info("[{}] Successfully update task state to [{}]", (Object)this.getParams().getId(), (Object)state);
        }, e -> LOGGER.error((Message)new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]", new Object[]{this.getParams().getId(), state, reason}), (Throwable)e)));
    }

    public void updateReindexTaskProgress(ActionListener<Void> listener) {
        this.getReindexTaskProgress((ActionListener<Integer>)ActionListener.wrap(reindexTaskProgress -> {
            this.progressTracker.reindexingPercent.set(Math.max(1, reindexTaskProgress));
            listener.onResponse(null);
        }, arg_0 -> listener.onFailure(arg_0)));
    }

    private void getReindexTaskProgress(ActionListener<Integer> listener) {
        TaskId reindexTaskId = this.getReindexTaskId();
        if (reindexTaskId == null) {
            listener.onResponse((Object)(this.isReindexingFinished ? 100 : 0));
            return;
        }
        GetTaskRequest getTaskRequest = new GetTaskRequest();
        getTaskRequest.setTaskId(reindexTaskId);
        this.client.admin().cluster().getTask(getTaskRequest, ActionListener.wrap(taskResponse -> {
            TaskResult taskResult = taskResponse.getTask();
            BulkByScrollTask.Status taskStatus = (BulkByScrollTask.Status)taskResult.getTask().getStatus();
            int progress = (int)((double)taskStatus.getCreated() * 100.0 / (double)taskStatus.getTotal());
            listener.onResponse((Object)progress);
        }, error -> {
            if (ExceptionsHelper.unwrapCause((Throwable)error) instanceof ResourceNotFoundException) {
                listener.onResponse((Object)(this.isReindexingFinished ? 100 : 0));
            } else {
                listener.onFailure(error);
            }
        }));
    }

    @Nullable
    private TaskId getReindexTaskId() {
        try {
            return new TaskId(this.clusterService.localNode().getId(), this.reindexingTaskId.longValue());
        }
        catch (NullPointerException e) {
            return null;
        }
    }

    private void persistProgress(Runnable runnable) {
        LOGGER.debug("[{}] Persisting progress", (Object)this.taskParams.getId());
        GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(this.taskParams.getId());
        ClientHelper.executeAsyncWithOrigin((Client)this.client, (String)"ml", (ActionType)GetDataFrameAnalyticsStatsAction.INSTANCE, (ActionRequest)getStatsRequest, (ActionListener)ActionListener.wrap(statsResponse -> {
            GetDataFrameAnalyticsStatsAction.Response.Stats stats = (GetDataFrameAnalyticsStatsAction.Response.Stats)statsResponse.getResponse().results().get(0);
            IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
            indexRequest.id(DataFrameAnalyticsTask.progressDocId(this.taskParams.getId()));
            indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder();){
                new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
                indexRequest.source(jsonBuilder);
            }
            ClientHelper.executeAsyncWithOrigin((Client)this.client, (String)"ml", (ActionType)IndexAction.INSTANCE, (ActionRequest)indexRequest, (ActionListener)ActionListener.wrap(indexResponse -> {
                LOGGER.debug("[{}] Successfully indexed progress document", (Object)this.taskParams.getId());
                runnable.run();
            }, indexError -> {
                LOGGER.error((Message)new ParameterizedMessage("[{}] cannot persist progress as an error occurred while indexing", (Object)this.taskParams.getId()), (Throwable)indexError);
                runnable.run();
            }));
        }, e -> {
            LOGGER.error((Message)new ParameterizedMessage("[{}] cannot persist progress as an error occurred while retrieving stats", (Object)this.taskParams.getId()), (Throwable)e);
            runnable.run();
        }));
    }

    public static StartingState determineStartingState(String jobId, List<PhaseProgress> progressOnStart) {
        PhaseProgress lastIncompletePhase = null;
        for (PhaseProgress phaseProgress : progressOnStart) {
            if (phaseProgress.getProgressPercent() >= 100) continue;
            lastIncompletePhase = phaseProgress;
            break;
        }
        if (lastIncompletePhase == null) {
            return StartingState.FINISHED;
        }
        LOGGER.debug("[{}] Last incomplete progress [{}, {}]", (Object)jobId, (Object)lastIncompletePhase.getPhase(), (Object)lastIncompletePhase.getProgressPercent());
        switch (lastIncompletePhase.getPhase()) {
            case "reindexing": {
                return lastIncompletePhase.getProgressPercent() == 0 ? StartingState.FIRST_TIME : StartingState.RESUMING_REINDEXING;
            }
            case "loading_data": 
            case "analyzing": 
            case "writing_results": {
                return StartingState.RESUMING_ANALYZING;
            }
        }
        LOGGER.warn("[{}] Unexpected progress phase [{}]", (Object)jobId, (Object)lastIncompletePhase.getPhase());
        return StartingState.FIRST_TIME;
    }

    public static String progressDocId(String id) {
        return "data_frame_analytics-" + id + "-progress";
    }

    public static class ProgressTracker {
        public static final String REINDEXING = "reindexing";
        public static final String LOADING_DATA = "loading_data";
        public static final String ANALYZING = "analyzing";
        public static final String WRITING_RESULTS = "writing_results";
        public final AtomicInteger reindexingPercent = new AtomicInteger(0);
        public final AtomicInteger loadingDataPercent = new AtomicInteger(0);
        public final AtomicInteger analyzingPercent = new AtomicInteger(0);
        public final AtomicInteger writingResultsPercent = new AtomicInteger(0);

        public List<PhaseProgress> report() {
            return Arrays.asList(new PhaseProgress(REINDEXING, this.reindexingPercent.get()), new PhaseProgress(LOADING_DATA, this.loadingDataPercent.get()), new PhaseProgress(ANALYZING, this.analyzingPercent.get()), new PhaseProgress(WRITING_RESULTS, this.writingResultsPercent.get()));
        }
    }

    public static enum StartingState {
        FIRST_TIME,
        RESUMING_REINDEXING,
        RESUMING_ANALYZING,
        FINISHED;

    }
}

