/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.retention;

import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;

public class ExpiredForecastsRemover
implements MlDataRemover {
    private static final Logger LOGGER = LogManager.getLogger(ExpiredForecastsRemover.class);
    private static final int MAX_FORECASTS = 10000;
    private static final String RESULTS_INDEX_PATTERN = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";
    private final Client client;
    private final ThreadPool threadPool;
    private final long cutoffEpochMs;

    public ExpiredForecastsRemover(Client client, ThreadPool threadPool) {
        this.client = Objects.requireNonNull(client);
        this.threadPool = Objects.requireNonNull(threadPool);
        this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
    }

    @Override
    public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
        LOGGER.debug("Removing forecasts that expire before [{}]", (Object)this.cutoffEpochMs);
        ActionListener forecastStatsHandler = ActionListener.wrap(searchResponse -> this.deleteForecasts((SearchResponse)searchResponse, listener, isTimedOutSupplier), e -> listener.onFailure((Exception)new ElasticsearchException("An error occurred while searching forecasts to delete", (Throwable)e, new Object[0])));
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query((QueryBuilder)QueryBuilders.boolQuery().filter((QueryBuilder)QueryBuilders.termQuery((String)Result.RESULT_TYPE.getPreferredName(), (String)"model_forecast_request_stats")).filter((QueryBuilder)QueryBuilders.existsQuery((String)ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
        source.size(10000);
        source.trackTotalHits(true);
        source.sort("_doc");
        SearchRequest searchRequest = new SearchRequest(new String[]{RESULTS_INDEX_PATTERN});
        searchRequest.source(source);
        this.client.execute((ActionType)SearchAction.INSTANCE, (ActionRequest)searchRequest, (ActionListener)new ThreadedActionListener(LOGGER, this.threadPool, "ml_utility", forecastStatsHandler, false));
    }

    private void deleteForecasts(SearchResponse searchResponse, final ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
        List<ForecastRequestStats> forecastsToDelete;
        try {
            forecastsToDelete = this.findForecastsToDelete(searchResponse);
        }
        catch (IOException e) {
            listener.onFailure((Exception)e);
            return;
        }
        if (isTimedOutSupplier.get().booleanValue()) {
            listener.onResponse((Object)false);
            return;
        }
        DeleteByQueryRequest request = this.buildDeleteByQuery(forecastsToDelete);
        this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)request, (ActionListener)new ActionListener<BulkByScrollResponse>(){

            public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                try {
                    if (bulkByScrollResponse.getDeleted() > 0L) {
                        LOGGER.info("Deleted [{}] documents corresponding to [{}] expired forecasts", (Object)bulkByScrollResponse.getDeleted(), (Object)forecastsToDelete.size());
                    }
                    listener.onResponse((Object)true);
                }
                catch (Exception e) {
                    this.onFailure(e);
                }
            }

            public void onFailure(Exception e) {
                listener.onFailure((Exception)new ElasticsearchException("Failed to remove expired forecasts", (Throwable)e, new Object[0]));
            }
        });
    }

    private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
        ArrayList<ForecastRequestStats> forecastsToDelete = new ArrayList<ForecastRequestStats>();
        SearchHits hits = searchResponse.getHits();
        if (hits.getTotalHits().value > 10000L) {
            LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", (Object)10000, (Object)10000);
        }
        for (SearchHit hit : hits.getHits()) {
            try (StreamInput stream = hit.getSourceRef().streamInput();
                 XContentParser parser = XContentFactory.xContent((XContentType)XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, (InputStream)stream);){
                ForecastRequestStats forecastRequestStats = (ForecastRequestStats)ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
                if (forecastRequestStats.getExpiryTime().toEpochMilli() >= this.cutoffEpochMs) continue;
                forecastsToDelete.add(forecastRequestStats);
            }
        }
        return forecastsToDelete;
    }

    private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
        DeleteByQueryRequest request = new DeleteByQueryRequest();
        request.setSlices(0);
        request.indices(new String[]{RESULTS_INDEX_PATTERN});
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
        boolQuery.must((QueryBuilder)QueryBuilders.termsQuery((String)Result.RESULT_TYPE.getPreferredName(), (String[])new String[]{"model_forecast_request_stats", "model_forecast"}));
        for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
            boolQuery.should((QueryBuilder)QueryBuilders.boolQuery().must((QueryBuilder)QueryBuilders.termQuery((String)Job.ID.getPreferredName(), (String)forecastToDelete.getJobId())).must((QueryBuilder)QueryBuilders.termQuery((String)Forecast.FORECAST_ID.getPreferredName(), (String)forecastToDelete.getForecastId())));
        }
        BoolQueryBuilder query = QueryBuilders.boolQuery().filter((QueryBuilder)boolQuery);
        request.setQuery((QueryBuilder)query);
        request.getSearchRequest().source().sort("_doc");
        return request;
    }
}

