/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.datafeed;

import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

class DatafeedJob {
    private static final Logger LOGGER = LogManager.getLogger(DatafeedJob.class);
    // NOTE(review): presumably the small slack (ms) added to each real-time wake-up timestamp — confirm against usages.
    private static final int NEXT_TASK_DELAY_MS = 100;
    // 900000 ms = 15 minutes. NOTE(review): presumably the upper bound on the interval between delayed-data checks — confirm.
    static final long MISSING_DATA_CHECK_INTERVAL_MS = 900000L;
    // Receives the info/warning/error audit messages this class emits for the job.
    private final AnomalyDetectionAuditor auditor;
    // Id of the job this datafeed posts data to; used in requests, audit messages and log lines.
    private final String jobId;
    // Attached to every post-data request; never null (checked in the constructor).
    private final DataDescription dataDescription;
    // Datafeed frequency in ms; timestamps are rounded down to multiples of this value.
    private final long frequencyMs;
    // Subtracted from the current time to obtain the end of the searchable time range.
    private final long queryDelayMs;
    // Client used to index annotations and execute the post-data, flush and persist actions.
    private final Client client;
    // Creates a new extractor for each [start, end) range that is searched.
    private final DataExtractorFactory dataExtractorFactory;
    // Receives the data counts of every posted batch; persisting is disallowed once the datafeed is isolated.
    private final DatafeedTimingStatsReporter timingStatsReporter;
    // Source of the current time; its value is used as epoch milliseconds throughout this class.
    private final Supplier<Long> currentTimeSupplier;
    // Queried for buckets with missing documents during real-time runs.
    private final DelayedDataDetector delayedDataDetector;
    // Only stored and exposed through its getter in this class.
    private final Integer maxEmptySearches;
    // Effective lookback start, set by runLookBack(); also the lower bound for the real-time start.
    private volatile long lookbackStartTimeMs;
    // End time of the latest finalized bucket, refreshed from flush responses when they carry one.
    private volatile long latestFinalBucketEndTimeMs;
    // Time (per currentTimeSupplier) at which the last delayed-data check ran.
    private volatile long lastDataCheckTimeMs;
    // Document id of the delayed-data annotation, or null while none has been indexed successfully.
    private volatile String lastDataCheckAnnotationId;
    // Last delayed-data annotation that was indexed; used to suppress identical repeats.
    private volatile Annotation lastDataCheckAnnotation;
    // Last time (epoch ms) up to which data has been handled, or null when not yet known.
    private volatile Long lastEndTimeMs;
    // True until stop() flips it to false.
    private AtomicBoolean running = new AtomicBoolean(true);
    // Set by isolate(); when true, processing returns early and skips flush, persist and auditing.
    private volatile boolean isIsolated;
    // True once any record has been processed, or when the constructor was told data was seen previously.
    private volatile boolean haveEverSeenData;

    /**
     * Creates a datafeed job bound to the given job id.
     *
     * <p>The initial "last end time" is the later of {@code latestFinalBucketEndTimeMs} and
     * {@code latestRecordTimeMs}; it is left unset ({@code null}) when that value is not positive.
     *
     * @param dataDescription must not be {@code null}
     * @throws NullPointerException if {@code dataDescription} is {@code null}
     */
    DatafeedJob(
        String jobId,
        DataDescription dataDescription,
        long frequencyMs,
        long queryDelayMs,
        DataExtractorFactory dataExtractorFactory,
        DatafeedTimingStatsReporter timingStatsReporter,
        Client client,
        AnomalyDetectionAuditor auditor,
        Supplier<Long> currentTimeSupplier,
        DelayedDataDetector delayedDataDetector,
        Integer maxEmptySearches,
        long latestFinalBucketEndTimeMs,
        long latestRecordTimeMs,
        boolean haveSeenDataPreviously
    ) {
        this.dataDescription = Objects.requireNonNull(dataDescription);

        // Identity and timing configuration.
        this.jobId = jobId;
        this.frequencyMs = frequencyMs;
        this.queryDelayMs = queryDelayMs;
        this.maxEmptySearches = maxEmptySearches;

        // Collaborators.
        this.client = client;
        this.auditor = auditor;
        this.dataExtractorFactory = dataExtractorFactory;
        this.timingStatsReporter = timingStatsReporter;
        this.delayedDataDetector = delayedDataDetector;
        this.currentTimeSupplier = currentTimeSupplier;

        // Progress restored from earlier runs.
        this.haveEverSeenData = haveSeenDataPreviously;
        this.latestFinalBucketEndTimeMs = latestFinalBucketEndTimeMs;
        long resumePoint = Math.max(latestFinalBucketEndTimeMs, latestRecordTimeMs);
        if (resumePoint > 0L) {
            this.lastEndTimeMs = resumePoint;
        }
    }

    /**
     * Marks this datafeed as isolated and tells the timing stats reporter to stop persisting.
     */
    void isolate() {
        isIsolated = true;
        timingStatsReporter.disallowPersisting();
    }

    /**
     * @return {@code true} once {@code isolate()} has been called
     */
    boolean isIsolated() {
        return isIsolated;
    }

    /**
     * @return the id of the job this datafeed sends data to
     */
    public String getJobId() {
        return jobId;
    }

    /**
     * @return the value passed to the constructor as {@code maxEmptySearches}; may be {@code null}
     */
    public Integer getMaxEmptySearches() {
        return maxEmptySearches;
    }

    /**
     * Asks the timing stats reporter to finish reporting.
     *
     * <p>Any exception raised by the reporter is logged at WARN level and not propagated.
     */
    public void finishReportingTimingStats() {
        try {
            timingStatsReporter.finishReporting();
        } catch (Exception reportingFailure) {
            // Deliberately best-effort: the failure is only logged.
            LOGGER.warn("[{}] Datafeed timing stats could not be reported due to: {}", jobId, reportingFailure);
        }
    }

    /**
     * Runs the lookback phase from {@code startTime} up to {@code endTime}, or up to
     * "now minus query delay" when {@code endTime} is {@code null}.
     *
     * @param startTime requested start in epoch ms; may be moved by {@code skipToStartTime}
     * @param endTime   explicit end in epoch ms, or {@code null} to continue in real-time afterwards
     * @return the epoch-ms timestamp of the next real-time run, or {@code null} when the datafeed is
     *         lookback-only, was stopped, or was isolated
     * @throws Exception any failure raised while extracting, posting or flushing data
     */
    Long runLookBack(long startTime, Long endTime) throws Exception {
        lookbackStartTimeMs = skipToStartTime(startTime);

        // The real-time default is always computed, even when an explicit end time is supplied.
        long realtimeEnd = currentTimeSupplier.get() - queryDelayMs;
        boolean isLookbackOnly = endTime != null;
        long lookbackEnd = isLookbackOnly ? endTime : realtimeEnd;

        if (lookbackEnd <= lookbackStartTimeMs) {
            // Nothing to look back over.
            if (isLookbackOnly) {
                return null;
            }
            auditor.info(jobId, Messages.getMessage("Datafeed started in real-time"));
            return nextRealtimeTimestamp();
        }

        Object[] startedArgs = new Object[] {
            DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lookbackStartTimeMs),
            endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lookbackEnd),
            TimeValue.timeValueMillis(frequencyMs).getStringRep()
        };
        String msg = Messages.getMessage("Datafeed started (from: {0} to: {1}) with frequency [{2}]", startedArgs);
        auditor.info(jobId, msg);
        LOGGER.info("[{}] {}", jobId, msg);

        FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
        flushRequest.setCalcInterim(true);
        run(lookbackStartTimeMs, lookbackEnd, flushRequest);

        if (shouldPersistAfterLookback(isLookbackOnly)) {
            sendPersistRequest();
        }

        boolean stillActive = isRunning() && !isIsolated;
        if (!stillActive) {
            // Stay silent when isolated; otherwise note that the lookback ended because of a stop.
            if (!isIsolated) {
                LOGGER.debug("[{}] Lookback finished after being stopped", jobId);
            }
            return null;
        }

        auditor.info(jobId, Messages.getMessage("Datafeed lookback completed"));
        LOGGER.info("[{}] Lookback has finished", jobId);
        if (isLookbackOnly) {
            return null;
        }
        auditor.info(jobId, Messages.getMessage("Datafeed continued in real-time"));
        return nextRealtimeTimestamp();
    }

    /**
     * Works out where the lookback should actually start.
     *
     * <ul>
     *   <li>No previous end time is known: start at {@code startTime}.</li>
     *   <li>{@code startTime} is not after the previous end time: resume one millisecond after it.</li>
     *   <li>Otherwise: flush the job with a skip-time of {@code startTime} and start at the end of the
     *       last finalized bucket reported by the flush response.</li>
     * </ul>
     *
     * <p>If the flush response carries no last-finalized-bucket end, {@code startTime} is used instead of
     * failing with a {@link NullPointerException}.
     *
     * @param startTime requested start time in epoch ms
     * @return the effective start time in epoch ms
     * @throws AnalysisProblemException if the flush request fails
     */
    private long skipToStartTime(long startTime) {
        // Snapshot the volatile field once so the null check and the arithmetic use the same value.
        Long lastEnd = lastEndTimeMs;
        if (lastEnd == null) {
            return startTime;
        }

        long resumeFrom = lastEnd + 1L;
        if (resumeFrom > startTime) {
            return resumeFrom;
        }

        FlushJobAction.Request request = new FlushJobAction.Request(jobId);
        request.setSkipTime(String.valueOf(startTime));
        Date lastFinalizedBucketEnd = flushJob(request).getLastFinalizedBucketEnd();
        if (lastFinalizedBucketEnd == null) {
            // run() already treats this value as optional; fall back to the requested start time.
            LOGGER.debug("[{}] Flush response had no last finalized bucket end; starting from [{}]", jobId, startTime);
            return startTime;
        }

        long skippedTo = lastFinalizedBucketEnd.getTime();
        LOGGER.info("[{}] Skipped to time [{}]", jobId, skippedTo);
        return skippedTo;
    }

    /**
     * Performs one real-time run: searches from just after the last handled time up to the start of
     * the current frequency interval (after applying the query delay), then checks for delayed data.
     *
     * @return the epoch-ms timestamp at which the next real-time run should happen
     * @throws Exception any failure raised while extracting, posting or flushing data
     */
    long runRealtime() throws Exception {
        long start;
        if (lastEndTimeMs == null) {
            start = lookbackStartTimeMs;
        } else {
            start = Math.max(lookbackStartTimeMs, lastEndTimeMs + 1L);
        }

        long end = toIntervalStartEpochMs(currentTimeSupplier.get() - queryDelayMs);

        FlushJobAction.Request flushRequest = new FlushJobAction.Request(jobId);
        flushRequest.setCalcInterim(true);
        flushRequest.setAdvanceTime(String.valueOf(end));

        run(start, end, flushRequest);
        checkForMissingDataIfNecessary();
        return nextRealtimeTimestamp();
    }

    /**
     * Runs the delayed-data check when the datafeed is running, not isolated and the check is due.
     *
     * <p>When buckets with missing documents are found, a warning is audited and the delayed-data
     * annotation is created, or updated if one was already indexed. Nothing is audited or indexed
     * when the new annotation has the same text, timestamp and end timestamp as the previous one.
     */
    private void checkForMissingDataIfNecessary() {
        if (!isRunning() || isIsolated || !checkForMissingDataTriggered()) {
            return;
        }

        lastDataCheckTimeMs = currentTimeSupplier.get();
        List<DelayedDataDetectorFactory.BucketWithMissingData> bucketsWithGaps =
            delayedDataDetector.detectMissingData(latestFinalBucketEndTimeMs);
        if (bucketsWithGaps.isEmpty()) {
            return;
        }

        long totalRecordsMissing = 0L;
        for (DelayedDataDetectorFactory.BucketWithMissingData gap : bucketsWithGaps) {
            totalRecordsMissing += gap.getMissingDocumentCount();
        }

        Bucket lastBucket = bucketsWithGaps.get(bucketsWithGaps.size() - 1).getBucket();
        // Bucket epoch and span are multiplied by 1000 to obtain the millisecond value Date expects.
        Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000L);
        Object[] messageArgs = new Object[] {
            totalRecordsMissing,
            XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime())
        };
        String msg = Messages.getMessage(
            "Datafeed has missed {0} documents due to ingest latency, latest bucket with missing data is [{1}]. Consider increasing query_delay",
            messageArgs
        );
        Annotation annotation = createAnnotation(bucketsWithGaps.get(0).getBucket().getTimestamp(), endTime, msg);

        Annotation previous = lastDataCheckAnnotation;
        boolean unchanged = previous != null
            && annotation.getAnnotation().equals(previous.getAnnotation())
            && annotation.getTimestamp().equals(previous.getTimestamp())
            && annotation.getEndTimestamp().equals(previous.getEndTimestamp());
        if (unchanged) {
            return;
        }

        auditor.warning(jobId, msg);
        if (lastDataCheckAnnotationId == null) {
            lastDataCheckAnnotationId = addAndSetDelayedDataAnnotation(annotation);
        } else {
            updateAnnotation(annotation);
        }
    }

    /**
     * Builds an annotation for this job covering {@code startTime} to {@code endTime}, using the
     * current time for both time arguments and {@code "_xpack"} for both user-name arguments.
     */
    private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
        Date now = new Date(currentTimeSupplier.get());
        String xpackUser = "_xpack";
        return new Annotation(msg, now, xpackUser, startTime, endTime, jobId, now, xpackUser, "annotation");
    }

    /**
     * Indexes a new delayed-data annotation and remembers it as the last one written.
     *
     * @param annotation the annotation to index into {@code .ml-annotations-write}
     * @return the id of the indexed document, or {@code null} if an {@link IOException} occurred
     *         (the failure is logged and audited)
     */
    private String addAndSetDelayedDataAnnotation(Annotation annotation) {
        try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
            IndexRequest indexRequest = new IndexRequest(".ml-annotations-write");
            indexRequest.source(xContentBuilder);
            // The request is executed with the thread context stashed under the "ml" origin.
            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin("ml")) {
                IndexResponse indexResponse = (IndexResponse) client.index(indexRequest).actionGet();
                lastDataCheckAnnotation = annotation;
                return indexResponse.getId();
            }
        } catch (IOException ex) {
            String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker.";
            LOGGER.error(errorMessage, ex);
            auditor.error(jobId, errorMessage);
            return null;
        }
    }

    /**
     * Re-indexes the existing delayed-data annotation under the same document id, taking the text,
     * timestamp and end timestamp from {@code annotation}.
     *
     * <p>An {@link IOException} is logged and audited rather than propagated.
     */
    private void updateAnnotation(Annotation annotation) {
        // Start from a copy of the last written annotation and overwrite only the changed parts.
        Annotation revised = new Annotation(lastDataCheckAnnotation);
        revised.setModifiedUsername("_xpack");
        revised.setModifiedTime(new Date(currentTimeSupplier.get()));
        revised.setAnnotation(annotation.getAnnotation());
        revised.setTimestamp(annotation.getTimestamp());
        revised.setEndTimestamp(annotation.getEndTimestamp());

        try (XContentBuilder content = revised.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
            IndexRequest request = new IndexRequest(".ml-annotations-write");
            request.id(lastDataCheckAnnotationId);
            request.source(content);
            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin("ml")) {
                client.index(request).actionGet();
                lastDataCheckAnnotation = revised;
            }
        } catch (IOException ex) {
            String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker.";
            LOGGER.error(errorMessage, ex);
            auditor.error(jobId, errorMessage);
        }
    }

    /**
     * @return {@code true} when the current time is later than the last check time plus the smaller of
     *         {@code MISSING_DATA_CHECK_INTERVAL_MS} and the delayed-data detector's window
     */
    private boolean checkForMissingDataTriggered() {
        long now = currentTimeSupplier.get();
        return now > lastDataCheckTimeMs + Math.min(MISSING_DATA_CHECK_INTERVAL_MS, delayedDataDetector.getWindow());
    }

    /**
     * Flips the running flag from {@code true} to {@code false}.
     *
     * @return {@code true} if this call changed the flag, {@code false} if it was already {@code false}
     */
    public boolean stop() {
        boolean wasRunning = running.compareAndSet(true, false);
        return wasRunning;
    }

    /**
     * @return the current value of the running flag; {@code false} after a successful {@code stop()}
     */
    public boolean isRunning() {
        return running.get();
    }

    /**
     * Extracts the data in {@code [start, end)} batch by batch, posts each batch to the job, and then
     * flushes the job with {@code flushRequest}.
     *
     * <p>Does nothing when {@code end <= start}. Returns silently, without flushing, as soon as the
     * datafeed is found to be isolated.
     *
     * @param start        inclusive start of the search range, epoch ms
     * @param end          exclusive end of the search range, epoch ms
     * @param flushRequest flush request sent after the extraction loop when still running and not isolated
     * @throws ExtractionProblemException if the extractor fails while fetching a batch
     * @throws AnalysisProblemException   if posting a batch fails (thrown after the loop), or the flush fails
     * @throws EmptyDataCountException    if no records were processed during this run
     */
    private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {
        Date lastFinalizedBucketEnd;
        if (end <= start) {
            return;
        }
        LOGGER.trace("[{}] Searching data in: [{}, {})", (Object)this.jobId, (Object)start, (Object)end);
        // A post-data failure is remembered here and only thrown after the bookkeeping below the loop.
        AnalysisProblemException error = null;
        long recordCount = 0L;
        DataExtractor dataExtractor = this.dataExtractorFactory.newExtractor(start, end);
        while (dataExtractor.hasNext()) {
            DataCounts counts;
            Optional extractedData;
            // Cancel the extractor once, when the datafeed has been isolated or stopped.
            if (!(!this.isIsolated && this.isRunning() || dataExtractor.isCancelled())) {
                dataExtractor.cancel();
            }
            if (this.isIsolated) {
                return;
            }
            try {
                extractedData = dataExtractor.next();
            }
            catch (Exception e) {
                LOGGER.debug("[" + this.jobId + "] error while extracting data", (Throwable)e);
                // Failures whose text mentions "doc values" are replaced by a more explanatory cause.
                if (e.toString().contains("doc values")) {
                    throw new ExtractionProblemException(this.nextRealtimeTimestamp(), (Throwable)new IllegalArgumentException("One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds using aggregations"));
                }
                throw new ExtractionProblemException(this.nextRealtimeTimestamp(), (Throwable)e);
            }
            // Isolation may have happened while the extractor was fetching; re-check before posting.
            if (this.isIsolated) {
                return;
            }
            if (!extractedData.isPresent()) continue;
            try (InputStream in = (InputStream)extractedData.get();){
                counts = this.postData(in, XContentType.JSON);
                LOGGER.trace("[{}] Processed another {} records", (Object)this.jobId, (Object)counts.getProcessedRecordCount());
                this.timingStatsReporter.reportDataCounts(counts);
            }
            catch (Exception e) {
                // Restore the interrupt flag if the failure was an interruption.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (this.isIsolated) {
                    return;
                }
                LOGGER.debug("[" + this.jobId + "] error while posting data", (Throwable)e);
                // shouldStop is set when the failure is a CONFLICT status exception.
                boolean shouldStop = this.isConflictException(e);
                error = new AnalysisProblemException(this.nextRealtimeTimestamp(), shouldStop, e);
                break;
            }
            this.haveEverSeenData |= (recordCount += counts.getProcessedRecordCount()) > 0L;
            if (counts.getLatestRecordTimeStamp() == null) continue;
            this.lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime();
        }
        // Advance the last end time to at least one millisecond before the extractor's end time.
        this.lastEndTimeMs = Math.max(this.lastEndTimeMs == null ? 0L : this.lastEndTimeMs, dataExtractor.getEndTime() - 1L);
        LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", (Object)this.jobId, error, (Object)recordCount, (Object)this.lastEndTimeMs, (Object)this.isRunning(), (Object)dataExtractor.isCancelled());
        if (error != null) {
            throw error;
        }
        // Flush only while still running and not isolated; keep the previous bucket end if the response has none.
        if (this.isRunning() && !this.isIsolated && (lastFinalizedBucketEnd = this.flushJob(flushRequest).getLastFinalizedBucketEnd()) != null) {
            this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.getTime();
        }
        if (recordCount == 0L) {
            throw new EmptyDataCountException(this.nextRealtimeTimestamp(), this.haveEverSeenData);
        }
    }

    /**
     * Reads {@code inputStream} fully and posts its content to the job.
     *
     * @param inputStream  the extracted data; read to the end but not closed here
     * @param xContentType content type declared for the posted bytes
     * @return the data counts carried by the post-data response
     * @throws IOException declared by this method; presumably raised when reading the stream fails
     */
    private DataCounts postData(InputStream inputStream, XContentType xContentType) throws IOException {
        PostDataAction.Request postRequest = new PostDataAction.Request(jobId);
        postRequest.setDataDescription(dataDescription);
        postRequest.setContent(Streams.readFully(inputStream), xContentType);
        try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin("ml")) {
            PostDataAction.Response postResponse =
                (PostDataAction.Response) client.execute(PostDataAction.INSTANCE, postRequest).actionGet();
            return postResponse.getDataCounts();
        }
    }

    /**
     * @return {@code true} only when {@code e} is an {@code ElasticsearchStatusException} whose status
     *         is {@code RestStatus.CONFLICT}
     */
    private boolean isConflictException(Exception e) {
        if (!(e instanceof ElasticsearchStatusException)) {
            return false;
        }
        ElasticsearchStatusException statusException = (ElasticsearchStatusException) e;
        return statusException.status() == RestStatus.CONFLICT;
    }

    /**
     * Computes when the next real-time run should happen: the start of the frequency interval that
     * contains "now + frequency", plus the query delay modulo the frequency, plus
     * {@code NEXT_TASK_DELAY_MS}.
     *
     * @return the next run time in epoch ms
     */
    private long nextRealtimeTimestamp() {
        long oneFrequencyAhead = currentTimeSupplier.get() + frequencyMs;
        long intervalStart = toIntervalStartEpochMs(oneFrequencyAhead);
        long queryDelayRemainder = queryDelayMs % frequencyMs;
        return intervalStart + queryDelayRemainder + NEXT_TASK_DELAY_MS;
    }

    /**
     * Rounds {@code epochMs} towards zero to a multiple of the datafeed frequency.
     *
     * @param epochMs a time in epoch ms
     * @return the closest multiple of {@code frequencyMs} that is not further from zero than {@code epochMs}
     */
    private long toIntervalStartEpochMs(long epochMs) {
        // Equivalent to (epochMs / frequencyMs) * frequencyMs under Java's truncating division.
        long offsetIntoInterval = epochMs % frequencyMs;
        return epochMs - offsetIntoInterval;
    }

    /**
     * Sends {@code flushRequest} for this job under the "ml" origin and waits for the response.
     *
     * <p>Every {@link Exception} raised while sending the request is translated into an
     * {@code AnalysisProblemException}. That now includes failures from stashing or restoring the
     * thread context, which previously escaped untranslated. The stored context is always closed
     * via try-with-resources.
     *
     * @param flushRequest the request to execute
     * @return the flush response
     * @throws AnalysisProblemException wrapping the original failure; {@code shouldStop} is set when the
     *                                  failure is a CONFLICT status exception
     */
    private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) {
        try {
            LOGGER.trace("[" + jobId + "] Sending flush request");
            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin("ml")) {
                return (FlushJobAction.Response) client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
            }
        } catch (Exception e) {
            LOGGER.debug("[" + jobId + "] error while flushing job", e);
            boolean shouldStop = isConflictException(e);
            // The wrapper carries the next real-time timestamp for the caller to use.
            throw new AnalysisProblemException(nextRealtimeTimestamp(), shouldStop, e);
        }
    }

    /**
     * @param isLookbackOnly whether the datafeed ends after the lookback
     * @return {@code true} only when the datafeed continues past the lookback, is not isolated and is
     *         still running
     */
    private boolean shouldPersistAfterLookback(boolean isLookbackOnly) {
        if (isLookbackOnly || isIsolated) {
            return false;
        }
        return isRunning();
    }

    /**
     * Submits a persist request for this job under the "ml" origin without waiting for the result.
     *
     * <p>Any exception raised while submitting is logged at DEBUG level and not propagated.
     */
    private void sendPersistRequest() {
        try {
            LOGGER.trace("[" + jobId + "] Sending persist request");
            PersistJobAction.Request persistRequest = new PersistJobAction.Request(jobId);
            try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin("ml")) {
                client.execute(PersistJobAction.INSTANCE, persistRequest);
            }
        } catch (Exception persistFailure) {
            LOGGER.debug("[" + jobId + "] error while persisting job", persistFailure);
        }
    }

    /**
     * @return the last handled end time in epoch ms, or {@code null} when none has been recorded yet
     */
    Long lastEndTimeMs() {
        return lastEndTimeMs;
    }

    /**
     * Thrown when a run finished without processing any records.
     *
     * <p>Carries no message and no cause.
     */
    static class EmptyDataCountException extends RuntimeException {
        /** Whether the datafeed had seen data at the time the exception was created. */
        final boolean haveEverSeenData;
        /** Next real-time timestamp (epoch ms) computed when the exception was created. */
        final long nextDelayInMsSinceEpoch;

        EmptyDataCountException(long nextDelayInMsSinceEpoch, boolean haveEverSeenData) {
            super();
            this.haveEverSeenData = haveEverSeenData;
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
        }
    }

    /**
     * Thrown when the data extractor fails while fetching a batch; wraps the underlying cause.
     */
    static class ExtractionProblemException extends RuntimeException {
        /** Next real-time timestamp (epoch ms) computed when the exception was created. */
        final long nextDelayInMsSinceEpoch;

        ExtractionProblemException(long nextDelayInMsSinceEpoch, Throwable cause) {
            super(cause);
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
        }
    }

    /**
     * Thrown when posting data to the job or flushing the job fails; wraps the underlying cause.
     */
    static class AnalysisProblemException extends RuntimeException {
        /** Next real-time timestamp (epoch ms) computed when the exception was created. */
        final long nextDelayInMsSinceEpoch;
        /** Set when the underlying failure was a CONFLICT status exception. */
        final boolean shouldStop;

        AnalysisProblemException(long nextDelayInMsSinceEpoch, boolean shouldStop, Throwable cause) {
            super(cause);
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
            this.shouldStop = shouldStop;
        }
    }
}

