/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.transform.transforms;

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformIndexer;
import org.elasticsearch.xpack.transform.transforms.TransformProgressGatherer;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;

class ClientTransformIndexer
extends TransformIndexer {
    // Logger shared by all instances of this indexer.
    private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class);
    // Audit throttling used by shouldAuditOnFinish: an audit entry is written when logCount reaches a multiple of logEvery.
    private long logEvery = 1L;
    private long logCount = 0L;
    // Client used to run search and bulk requests and to gather the initial progress.
    private final Client client;
    // Reloads the transform config and persists checkpoints and the state/stats document.
    private final TransformConfigManager transformsConfigManager;
    // Creates new checkpoints and detects whether the source has changed.
    private final CheckpointProvider checkpointProvider;
    // Parent task; consulted for task state, checkpoint number, retry limit and used to shut down / mark as failed.
    private final TransformTask transformTask;
    // Failures counted by handleFailure; reset when a checkpoint finishes and after the task was marked as failed.
    private final AtomicInteger failureCount;
    // When true, the next bulk response with failures is audited; cleared after auditing and
    // re-armed by a failure-free bulk response or a finished checkpoint.
    private volatile boolean auditBulkFailures = true;
    // Outcome of the change detection done in onStart; when false, onFinish and doSaveState return early.
    private volatile boolean hasSourceChanged = true;
    // Message of the last exception audited by handleFailure, used to avoid auditing the same message twice in a row.
    private volatile String lastAuditedExceptionMessage = null;
    // Set once the deletion of old stored documents was requested in doSaveState; reset if that deletion fails.
    private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
    // Time at which onStart last observed a change in the source; stays null until a change was detected.
    private volatile Instant changesLastDetectedAt;

    /**
     * Creates an indexer that runs on the "generic" executor of the parent task's thread pool.
     *
     * <p>{@code parentTask}, {@code auditor}, {@code initialState}, {@code transformsConfigManager},
     * {@code checkpointProvider} and {@code client} are validated with
     * {@code ExceptionsHelper.requireNonNull}. A {@code null} {@code initialStats} is replaced by a
     * fresh {@link TransformIndexerStats}.
     */
    ClientTransformIndexer(
        TransformConfigManager transformsConfigManager,
        CheckpointProvider checkpointProvider,
        AtomicReference<IndexerState> initialState,
        TransformIndexerPosition initialPosition,
        Client client,
        TransformAuditor auditor,
        TransformIndexerStats initialStats,
        TransformConfig transformConfig,
        Map<String, String> fieldMappings,
        TransformProgress transformProgress,
        TransformCheckpoint lastCheckpoint,
        TransformCheckpoint nextCheckpoint,
        TransformTask parentTask
    ) {
        super(
            ExceptionsHelper.requireNonNull(parentTask, "parentTask").getThreadPool().executor("generic"),
            ExceptionsHelper.requireNonNull(auditor, "auditor"),
            transformConfig,
            fieldMappings,
            ExceptionsHelper.requireNonNull(initialState, "initialState"),
            initialPosition,
            initialStats == null ? new TransformIndexerStats() : initialStats,
            transformProgress,
            lastCheckpoint,
            nextCheckpoint
        );
        this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
        this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
        this.client = ExceptionsHelper.requireNonNull(client, "client");
        // parentTask was already validated while building the super() arguments.
        this.transformTask = parentTask;
        this.failureCount = new AtomicInteger(0);
    }

    /**
     * Prepares an indexer run and eventually delegates to {@code super.onStart}.
     *
     * <p>The steps are chained through listeners and run in this order:
     * <ol>
     *   <li>If the task is in the FAILED state the listener is failed immediately.</li>
     *   <li>If the task already has a checkpoint and this is an initial run, the source is checked
     *       for changes. Without changes the listener receives {@code false} and nothing else runs.</li>
     *   <li>For continuous transforms the configuration is reloaded. A
     *       {@link ResourceNotFoundException} fails the run, any other reload failure is audited and
     *       the run continues with the configuration already held.</li>
     *   <li>On an initial run a new checkpoint is created; for checkpoints above 1 the progress is
     *       reset to zero counts, otherwise the initial progress is gathered (and set to
     *       {@code null} if gathering fails).</li>
     * </ol>
     *
     * @param now      timestamp handed through to {@code super.onStart}
     * @param listener notified with the outcome of the start sequence
     */
    @Override
    protected void onStart(long now, ActionListener<Boolean> listener) {
        if (TransformTaskState.FAILED == transformTask.getTaskState()) {
            logger.debug("[{}] attempted to start while failed.", getJobId());
            listener.onFailure(new ElasticsearchException("Attempted to start a failed transform [{}].", getJobId()));
            return;
        }

        // Last link of the chain: create the checkpoint / progress if needed, then start.
        ActionListener<Void> updateConfigListener = ActionListener.wrap(ignored -> {
            if (!initialRun()) {
                super.onStart(now, listener);
                return;
            }
            createCheckpoint(ActionListener.wrap(newCheckpoint -> {
                this.nextCheckpoint = newCheckpoint;
                if (this.nextCheckpoint.getCheckpoint() > 1L) {
                    // Later checkpoints start counting from scratch, without a known total.
                    this.progress = new TransformProgress(null, Long.valueOf(0L), Long.valueOf(0L));
                    super.onStart(now, listener);
                    return;
                }
                TransformProgressGatherer.getInitialProgress(
                    client,
                    buildFilterQuery(),
                    getConfig(),
                    ActionListener.wrap(initialProgress -> {
                        logger.trace("[{}] reset the progress from [{}] to [{}].", getJobId(), this.progress, initialProgress);
                        this.progress = initialProgress;
                        super.onStart(now, listener);
                    }, progressFailure -> {
                        // Progress is informational only, so the run starts without it.
                        this.progress = null;
                        logger.warn(
                            new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()),
                            progressFailure
                        );
                        super.onStart(now, listener);
                    })
                );
            }, listener::onFailure));
        }, listener::onFailure);

        // Middle link of the chain: refresh the configuration of continuous transforms.
        ActionListener<Void> changedSourceListener = ActionListener.wrap(ignored -> {
            if (!isContinuous()) {
                updateConfigListener.onResponse(null);
                return;
            }
            transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(reloadedConfig -> {
                this.transformConfig = reloadedConfig;
                logger.debug("[{}] successfully refreshed transform config from index.", getJobId());
                updateConfigListener.onResponse(null);
            }, reloadFailure -> {
                String msg = TransformMessages.getMessage(
                    "Failed to reload transform configuration for transform [{0}]",
                    getJobId()
                );
                logger.error(msg, reloadFailure);
                if (reloadFailure instanceof ResourceNotFoundException) {
                    updateConfigListener.onFailure(new TransformConfigReloadingException(msg, reloadFailure));
                } else {
                    this.auditor.warning(getJobId(), msg);
                    updateConfigListener.onResponse(null);
                }
            }));
        }, listener::onFailure);

        // First link of the chain: change detection, only when a previous checkpoint exists.
        boolean checkForChanges = transformTask.getCheckpoint() > 0L && initialRun();
        if (!checkForChanges) {
            hasSourceChanged = true;
            changedSourceListener.onResponse(null);
            return;
        }
        sourceHasChanged(ActionListener.wrap(changed -> {
            hasSourceChanged = changed;
            if (changed) {
                changesLastDetectedAt = Instant.now();
                logger.debug("[{}] source has changed, triggering new indexer run.", getJobId());
                changedSourceListener.onResponse(null);
            } else {
                logger.trace("[{}] source has not changed, finish indexer early.", getJobId());
                listener.onResponse(false);
            }
        }, detectionFailure -> {
            hasSourceChanged = true;
            listener.onFailure(detectionFailure);
        }));
    }

    /**
     * @return the checkpoint provider this indexer was constructed with
     */
    public CheckpointProvider getCheckpointProvider() {
        return checkpointProvider;
    }

    /**
     * @return the time at which {@code onStart} last detected a change in the source, or
     *         {@code null} if no change has been detected so far
     */
    Instant getChangesLastDetectedAt() {
        return changesLastDetectedAt;
    }

    /**
     * Triggers an indexer run unless the task has failed or the indexer is currently
     * indexing or stopping.
     *
     * @param now timestamp handed through to {@code super.maybeTriggerAsyncJob}
     * @return {@code false} if the trigger was ignored, otherwise the result of the super call
     */
    @Override
    public synchronized boolean maybeTriggerAsyncJob(long now) {
        if (TransformTaskState.FAILED == transformTask.getTaskState()) {
            logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
            return false;
        }

        IndexerState currentState = getState();
        boolean busy = IndexerState.INDEXING.equals(currentState) || IndexerState.STOPPING.equals(currentState);
        if (busy) {
            logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), currentState);
            return false;
        }

        return super.maybeTriggerAsyncJob(now);
    }

    /**
     * Executes the search request with the headers stored in the transform configuration.
     * If the task is in the FAILED state no request is sent and {@code nextPhase} is failed instead.
     *
     * @param request   the search to execute
     * @param nextPhase receives the search response or the failure
     */
    protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
        boolean taskFailed = transformTask.getTaskState() == TransformTaskState.FAILED;
        if (taskFailed) {
            logger.debug("[{}] attempted to search while failed.", getJobId());
            nextPhase.onFailure(
                new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId())
            );
        } else {
            ClientHelper.executeWithHeadersAsync(
                transformConfig.getHeaders(),
                "transform",
                client,
                SearchAction.INSTANCE,
                request,
                nextPhase
            );
        }
    }

    /**
     * Executes the bulk request with the headers stored in the transform configuration.
     *
     * <p>A response containing item failures is turned into a {@link BulkIndexingException}. The
     * first such response is audited; further ones stay silent until a failure-free bulk response
     * (or a finished checkpoint) re-arms the audit. If the task is in the FAILED state no request
     * is sent and {@code nextPhase} is failed instead.
     *
     * @param request   the bulk request to execute
     * @param nextPhase receives the failure-free bulk response or the failure
     */
    protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
        if (TransformTaskState.FAILED == transformTask.getTaskState()) {
            logger.debug("[{}] attempted to bulk index while failed.", getJobId());
            nextPhase.onFailure(
                new ElasticsearchException("Attempted to do a bulk index request for failed transform [{}].", getJobId())
            );
            return;
        }

        ClientHelper.executeWithHeadersAsync(
            transformConfig.getHeaders(),
            "transform",
            client,
            BulkAction.INSTANCE,
            request,
            ActionListener.wrap(bulkResponse -> {
                if (!bulkResponse.hasFailures()) {
                    auditBulkFailures = true;
                    nextPhase.onResponse(bulkResponse);
                    return;
                }

                int failedItems = 0;
                for (BulkItemResponse item : bulkResponse.getItems()) {
                    if (item.isFailed()) {
                        failedItems++;
                    }
                }
                if (auditBulkFailures) {
                    auditor.warning(
                        getJobId(),
                        "Experienced at least ["
                            + failedItems
                            + "] bulk index failures. See the logs of the node running the transform for details. "
                            + bulkResponse.buildFailureMessage()
                    );
                    auditBulkFailures = false;
                }
                nextPhase.onFailure(
                    new BulkIndexingException(
                        "Bulk index experienced failures. See the logs of the node running the transform for details."
                    )
                );
            }, nextPhase::onFailure)
        );
    }

    /**
     * Persists the state and stats of the transform and then runs {@code next}.
     *
     * <p>Nothing is persisted (but {@code next} still runs) when the task is in the FAILED state,
     * when the indexer is aborting, or when the last run found no source changes and the indexer
     * is not being stopped. A non-continuous transform that is STARTED and has reached checkpoint 1
     * is switched to STOPPED before persisting. After the document is written (or writing failed)
     * a STOPPED task is shut down. The first successful write also requests the deletion of old
     * stored documents; if that deletion fails it is retried on a later save.
     *
     * @param indexerState the indexer state to persist
     * @param position     the indexer position to persist
     * @param next         always executed exactly once when this method's work is done
     */
    protected void doSaveState(IndexerState indexerState, TransformIndexerPosition position, Runnable next) {
        if (this.transformTask.getTaskState() == TransformTaskState.FAILED) {
            logger.debug("[{}] attempted to save state and stats while failed.", this.getJobId());
            next.run();
            return;
        }
        if (indexerState.equals(IndexerState.ABORTING)) {
            next.run();
            return;
        }
        // The run exited early because nothing changed; only a stop still has to be persisted.
        if (!this.hasSourceChanged && !indexerState.equals(IndexerState.STOPPED)) {
            next.run();
            return;
        }

        TransformTaskState taskState = this.transformTask.getTaskState();
        // A non-continuous transform is complete after its first checkpoint, so it stops itself.
        if (indexerState.equals(IndexerState.STARTED) && this.transformTask.getCheckpoint() == 1L && !this.isContinuous()) {
            indexerState = IndexerState.STOPPED;
            this.auditor.info(this.transformConfig.getId(), "Transform finished indexing all data, initiating stop");
            logger.info("[{}] transform finished indexing all data, initiating stop.", this.transformConfig.getId());
        }
        if (indexerState.equals(IndexerState.STOPPED)) {
            taskState = TransformTaskState.STOPPED;
        }

        final TransformState state = new TransformState(
            taskState,
            indexerState,
            position,
            this.transformTask.getCheckpoint(),
            this.transformTask.getStateReason(),
            this.getProgress()
        );
        logger.debug("[{}] updating persistent state of transform to [{}].", this.transformConfig.getId(), state.toString());

        final SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = this.transformTask.getSeqNoPrimaryTermAndIndex();
        this.transformsConfigManager.putOrUpdateTransformStoredDoc(
            new TransformStoredDoc(this.getJobId(), state, (TransformIndexerStats) this.getStats()),
            seqNoPrimaryTermAndIndex,
            ActionListener.wrap(r -> {
                this.transformTask.updateSeqNoPrimaryTermAndIndex(seqNoPrimaryTermAndIndex, r);
                if (state.getTaskState().equals(TransformTaskState.STOPPED)) {
                    this.transformTask.shutdown();
                }
                // Only one save at a time wins the flag and triggers the cleanup of old documents.
                if (this.oldStatsCleanedUp.compareAndSet(false, true)) {
                    this.transformsConfigManager.deleteOldTransformStoredDocuments(this.getJobId(), ActionListener.wrap(nil -> {
                        logger.trace("[{}] deleted old transform stats and state document", this.getJobId());
                        next.run();
                    }, e -> {
                        // ParameterizedMessage substitutes the job id into the "[{}]" placeholder; the
                        // previous LoggerMessageFormat.format(String, String, Object...) call bound the
                        // job id to the second String parameter and left the placeholder unresolved.
                        logger.warn(
                            new ParameterizedMessage("[{}] failed deleting old transform configurations.", this.getJobId()),
                            e
                        );
                        // Allow a later save to retry the cleanup.
                        this.oldStatsCleanedUp.set(false);
                        next.run();
                    }));
                } else {
                    next.run();
                }
            }, statsExc -> {
                logger.error(
                    new ParameterizedMessage("[{}] updating stats of transform failed.", this.transformConfig.getId()),
                    statsExc
                );
                this.auditor.warning(this.getJobId(), "Failure updating stats of transform: " + statsExc.getMessage());
                // Even if the state could not be persisted, a stopped task must still be shut down.
                if (state.getTaskState().equals(TransformTaskState.STOPPED)) {
                    this.transformTask.shutdown();
                }
                next.run();
            })
        );
    }

    /**
     * Entry point for failures of an indexer run. Delegates to {@link #handleFailure(Exception)}
     * and logs, rather than propagates, anything that handler itself throws.
     *
     * @param exc the failure reported for the run
     */
    protected void onFailure(Exception exc) {
        try {
            handleFailure(exc);
        } catch (Exception unexpected) {
            logger.error(
                new ParameterizedMessage("[{}] transform encountered an unexpected internal exception: ", getJobId()),
                unexpected
            );
        }
    }

    /**
     * Finalizes an indexer run: advances the checkpoint, resets failure bookkeeping, completes the
     * progress counters, updates the checkpoint averages in the stats and audits the completion.
     * Any exception thrown along the way is reported through {@code listener.onFailure}.
     *
     * @param listener notified once the bookkeeping is done
     */
    @Override
    protected void onFinish(ActionListener<Void> listener) {
        try {
            // The run exited early because no changes were found: no checkpoint was completed.
            if (!this.hasSourceChanged) {
                listener.onResponse(null);
                return;
            }
            // NOTE(review): the listener is handed to super.onFinish AND completed at the end of this
            // method; this is only correct if super.onFinish does not notify it — confirm in TransformIndexer.
            super.onFinish(listener);
            long checkpoint = this.transformTask.incrementCheckpoint();
            // The checkpoint that was being worked on becomes the last completed one.
            this.lastCheckpoint = this.getNextCheckpoint();
            this.nextCheckpoint = null;
            // A finished checkpoint clears previously counted failures and the state reason.
            this.failureCount.set(0);
            this.transformTask.setStateReason(null);
            // Top up the processed counter so a finished checkpoint does not report less than the total.
            if (this.progress != null && this.progress.getPercentComplete() != null && this.progress.getPercentComplete() < 100.0) {
                this.progress.incrementDocsProcessed(this.progress.getTotalDocs() - this.progress.getDocumentsProcessed());
            }
            // Checkpoint averages are only updated for checkpoints after the first one.
            if (this.lastCheckpoint != null && this.lastCheckpoint.getCheckpoint() > 1L) {
                long docsIndexed = 0L;
                long docsProcessed = 0L;
                if (this.progress != null) {
                    docsIndexed = this.progress.getDocumentsIndexed();
                    docsProcessed = this.progress.getDocumentsProcessed();
                }
                // Duration is measured from the checkpoint timestamp; negative values are clamped to 0 below.
                long durationMs = System.currentTimeMillis() - this.lastCheckpoint.getTimestamp();
                ((TransformIndexerStats)this.getStats()).incrementCheckpointExponentialAverages(durationMs < 0L ? 0L : durationMs, docsIndexed, docsProcessed);
            }
            // Auditing is throttled, see shouldAuditOnFinish.
            if (this.shouldAuditOnFinish(checkpoint)) {
                this.auditor.info(this.getJobId(), "Finished indexing for transform checkpoint [" + checkpoint + "].");
            }
            logger.debug("[{}] finished indexing for transform checkpoint [{}].", (Object)this.getJobId(), (Object)checkpoint);
            // Re-arm bulk failure auditing for the next checkpoint.
            this.auditBulkFailures = true;
            listener.onResponse(null);
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    /**
     * Decides whether the completion of a checkpoint should be audited, thinning out the audit
     * entries as the checkpoint number grows.
     *
     * <p>Every call increments an internal counter; an audit is due when that counter is a multiple
     * of the current interval. When an audit is due for a non-zero checkpoint, the interval is
     * recomputed as {@code 10^floor(log10(checkpoint))}, capped at 1000, and the counter restarts.
     *
     * @param completedCheckpoint number of the checkpoint that just finished
     * @return {@code true} if the completion should be audited
     */
    protected boolean shouldAuditOnFinish(long completedCheckpoint) {
        logCount += 1;
        boolean auditDue = logCount % logEvery == 0L;
        if (!auditDue) {
            return false;
        }

        if (completedCheckpoint != 0L) {
            int magnitude = (int) Math.floor(Math.log10(completedCheckpoint));
            if (magnitude >= 3) {
                logEvery = 1000L;
            } else {
                logEvery = (int) Math.pow(10.0, magnitude);
            }
            logCount = 0L;
        }
        return true;
    }

    /**
     * Records in the audit trail and in the log that the transform has stopped.
     */
    protected void onStop() {
        auditor.info(transformConfig.getId(), "Transform has stopped.");
        logger.info("[{}] transform has stopped.", transformConfig.getId());
    }

    /**
     * Records the abort request in the audit trail and in the log, then shuts down the parent task.
     */
    protected void onAbort() {
        auditor.info(transformConfig.getId(), "Received abort request, stopping transform.");
        logger.info("[{}] transform received abort request. Stopping indexer.", transformConfig.getId());
        transformTask.shutdown();
    }

    /**
     * Creates the checkpoint following the last one and stores it through the config manager.
     *
     * <p>The listener receives the new checkpoint only after it was stored. A failure in either
     * step is logged and reported as a {@link RuntimeException} that keeps the original
     * exception as its cause.
     *
     * @param listener receives the stored checkpoint or the wrapped failure
     */
    @Override
    protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
        checkpointProvider.createNextCheckpoint(getLastCheckpoint(), ActionListener.wrap(created -> {
            transformsConfigManager.putTransformCheckpoint(created, ActionListener.wrap(stored -> {
                listener.onResponse(created);
            }, storeFailure -> {
                logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", getJobId()), storeFailure);
                listener.onFailure(
                    new RuntimeException("Failed to create checkpoint due to " + storeFailure.getMessage(), storeFailure)
                );
            }));
        }, retrieveFailure -> {
            logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", getJobId()), retrieveFailure);
            listener.onFailure(
                new RuntimeException("Failed to retrieve checkpoint due to " + retrieveFailure.getMessage(), retrieveFailure)
            );
        }));
    }

    /**
     * Asks the checkpoint provider whether the source changed since the last checkpoint.
     *
     * <p>A failing check is logged and audited, and then reported to the listener as
     * "no change" ({@code false}) so the update is skipped until the next check; the listener's
     * {@code onFailure} is never invoked from here.
     *
     * @param hasChangedListener receives {@code true} if a change was detected, otherwise {@code false}
     */
    @Override
    protected void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
        checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(changed -> {
            logger.trace("[{}] change detected [{}].", getJobId(), changed);
            hasChangedListener.onResponse(changed);
        }, checkFailure -> {
            logger.warn(
                new ParameterizedMessage(
                    "[{}] failed to detect changes for transform. Skipping update till next check.",
                    getJobId()
                ),
                checkFailure
            );
            auditor.warning(
                getJobId(),
                "Failed to detect changes for transform, skipping update till next check. Exception: " + checkFailure.getMessage()
            );
            hasChangedListener.onResponse(false);
        }));
    }

    /**
     * Tells whether retrying cannot help for the given failure: a missing index, a failed
     * aggregation extraction, or a transform configuration that could not be reloaded.
     *
     * @param e the failure to classify
     * @return {@code true} if the failure is of one of the irrecoverable types
     */
    private boolean isIrrecoverableFailure(Exception e) {
        if (e instanceof IndexNotFoundException) {
            return true;
        }
        if (e instanceof AggregationResultUtils.AggregationExtractionException) {
            return true;
        }
        return e instanceof TransformConfigReloadingException;
    }

    /**
     * Handles a failure of an indexer run.
     *
     * <p>The exception is logged, then offered to {@code handleCircuitBreakingException}; if that
     * handler reports it as handled nothing else happens. Irrecoverable failures, and failures that
     * exceed the task's retry limit, fail the indexer. Any other failure is audited, unless its
     * description equals the one audited last, so repeated identical failures do not flood the
     * audit trail.
     *
     * @param e the failure to handle
     */
    synchronized void handleFailure(Exception e) {
        logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", this.getJobId()), e);
        if (this.handleCircuitBreakingException(e)) {
            return;
        }

        boolean irrecoverable = this.isIrrecoverableFailure(e);
        // The failure counter is only incremented for failures that could be retried.
        if (irrecoverable || this.failureCount.incrementAndGet() > this.transformTask.getNumFailureRetries()) {
            String failureMessage = irrecoverable
                ? "task encountered irrecoverable failure: " + e.getMessage()
                : "task encountered more than "
                    + this.transformTask.getNumFailureRetries()
                    + " failures; latest failure: "
                    + e.getMessage();
            this.failIndexer(failureMessage);
            return;
        }

        // Exceptions may carry no message; fall back to toString() instead of dereferencing null,
        // which previously raised a NullPointerException and dropped the audit entry.
        String description = e.getMessage() != null ? e.getMessage() : e.toString();
        if (!description.equals(this.lastAuditedExceptionMessage)) {
            this.auditor.warning(
                this.getJobId(),
                "Transform encountered an exception: " + description + " Will attempt again at next scheduled trigger."
            );
            this.lastAuditedExceptionMessage = description;
        }
    }

    /**
     * Marks the transform as failed: logs and audits the failure message and asks the parent task
     * to switch to the failed state. The failure counter is reset once the task was marked
     * successfully; if marking fails, that is logged and the counter is left untouched.
     *
     * @param failureMessage description of why the transform failed
     */
    @Override
    protected void failIndexer(String failureMessage) {
        logger.error("[{}] transform has failed; experienced: [{}].", this.getJobId(), failureMessage);
        this.auditor.error(this.getJobId(), failureMessage);
        this.transformTask.markAsFailed(
            failureMessage,
            ActionListener.wrap(
                r -> this.failureCount.set(0),
                // Previously ignored silently; log it so a task that could not be marked as failed is traceable.
                e -> logger.warn(new ParameterizedMessage("[{}] failed to mark transform as failed.", this.getJobId()), e)
            )
        );
    }

    /**
     * Raised when the transform configuration cannot be reloaded because it no longer exists;
     * treated as an irrecoverable failure by this indexer.
     */
    private static class TransformConfigReloadingException extends ElasticsearchException {

        /**
         * @param msg   failure message, passed to {@link ElasticsearchException} together with {@code args}
         * @param cause the exception that made reloading fail
         * @param args  arguments passed along with {@code msg}
         */
        TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
            super(msg, cause, args);
        }
    }

    /**
     * Raised when a bulk response contains item failures.
     */
    private static class BulkIndexingException extends ElasticsearchException {

        /**
         * @param msg  failure message, passed to {@link ElasticsearchException} together with {@code args}
         * @param args arguments passed along with {@code msg}
         */
        BulkIndexingException(String msg, Object... args) {
            super(msg, args);
        }
    }
}

