/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.slm;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration;
import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem;
import org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore;
import org.elasticsearch.xpack.slm.SnapshotLifecycleService;
import org.elasticsearch.xpack.slm.SnapshotLifecycleStats;
import org.elasticsearch.xpack.slm.UpdateSnapshotLifecycleStatsTask;

public class SnapshotRetentionTask
implements SchedulerEngine.Listener {
    private static final Logger logger = LogManager.getLogger(SnapshotRetentionTask.class);
    private static final AtomicBoolean running = new AtomicBoolean(false);
    private final Client client;
    private final ClusterService clusterService;
    private final LongSupplier nowNanoSupplier;
    private final ThreadPool threadPool;
    private final SnapshotHistoryStore historyStore;

    public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier, SnapshotHistoryStore historyStore, ThreadPool threadPool) {
        this.client = new OriginSettingClient(client, "index_lifecycle");
        this.clusterService = clusterService;
        this.nowNanoSupplier = nowNanoSupplier;
        this.historyStore = historyStore;
        this.threadPool = threadPool;
    }

    public void triggered(SchedulerEngine.Event event) {
        assert (event.getJobName().equals("slm-retention-job") || event.getJobName().equals("slm-execute-manual-retention-job")) : "expected id to be slm-retention-job or slm-execute-manual-retention-job but it was " + event.getJobName();
        ClusterState state = this.clusterService.state();
        if (SnapshotLifecycleService.slmStoppedOrStopping(state) && !event.getJobName().equals("slm-execute-manual-retention-job")) {
            logger.debug("skipping SLM retention as SLM is currently stopped or stopping");
            return;
        }
        if (running.compareAndSet(false, true)) {
            final SnapshotLifecycleStats slmStats = new SnapshotLifecycleStats();
            final Consumer<Exception> failureHandler = e -> {
                try {
                    logger.error("error during snapshot retention task", (Throwable)e);
                    slmStats.retentionFailed();
                    this.updateStateWithStats(slmStats);
                }
                finally {
                    running.set(false);
                }
            };
            try {
                final TimeValue maxDeletionTime = (TimeValue)LifecycleSettings.SLM_RETENTION_DURATION_SETTING.get(state.metaData().settings());
                logger.info("starting SLM retention snapshot cleanup task");
                slmStats.retentionRun();
                final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state);
                Set<String> repositioriesToFetch = policiesWithRetention.values().stream().map(SnapshotLifecyclePolicy::getRepository).collect(Collectors.toSet());
                if (repositioriesToFetch.isEmpty()) {
                    running.set(false);
                    return;
                }
                this.getAllRetainableSnapshots(repositioriesToFetch, new ActionListener<Map<String, List<SnapshotInfo>>>(){

                    public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
                        try {
                            Map<String, List> snapshotsToBeDeleted = allSnapshots.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((List)e.getValue()).stream().filter(snapshot -> SnapshotRetentionTask.snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention)).collect(Collectors.toList())));
                            SnapshotRetentionTask.this.maybeDeleteSnapshots(snapshotsToBeDeleted, maxDeletionTime, slmStats);
                            SnapshotRetentionTask.this.updateStateWithStats(slmStats);
                        }
                        finally {
                            running.set(false);
                        }
                    }

                    public void onFailure(Exception e) {
                        failureHandler.accept(e);
                    }
                }, failureHandler);
            }
            catch (Exception e2) {
                failureHandler.accept(e2);
            }
        } else {
            logger.trace("snapshot lifecycle retention task started, but a task is already running, skipping");
        }
    }

    static Map<String, SnapshotLifecyclePolicy> getAllPoliciesWithRetentionEnabled(ClusterState state) {
        SnapshotLifecycleMetadata snapMeta = (SnapshotLifecycleMetadata)state.metaData().custom("snapshot_lifecycle");
        if (snapMeta == null) {
            return Collections.emptyMap();
        }
        return snapMeta.getSnapshotConfigurations().entrySet().stream().filter(e -> ((SnapshotLifecyclePolicyMetadata)e.getValue()).getPolicy().getRetentionPolicy() != null).filter(e -> !((SnapshotLifecyclePolicyMetadata)e.getValue()).getPolicy().getRetentionPolicy().equals((Object)SnapshotRetentionConfiguration.EMPTY)).collect(Collectors.toMap(Map.Entry::getKey, e -> ((SnapshotLifecyclePolicyMetadata)e.getValue()).getPolicy()));
    }

    static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, List<SnapshotInfo>> allSnapshots, Map<String, SnapshotLifecyclePolicy> policies) {
        String policyId;
        if (snapshot.userMetadata() == null) {
            return false;
        }
        try {
            policyId = (String)snapshot.userMetadata().get("policy");
        }
        catch (Exception e) {
            logger.debug("unable to retrieve policy id from snapshot metadata [" + snapshot.userMetadata() + "]", (Throwable)e);
            return false;
        }
        if (policyId == null) {
            return false;
        }
        SnapshotLifecyclePolicy policy = policies.get(policyId);
        if (policy == null) {
            return false;
        }
        SnapshotRetentionConfiguration retention = policy.getRetentionPolicy();
        if (retention == null || retention.equals((Object)SnapshotRetentionConfiguration.EMPTY)) {
            return false;
        }
        String repository = policy.getRepository();
        boolean eligible = retention.getSnapshotDeletionPredicate(allSnapshots.get(repository).stream().filter(info -> Optional.ofNullable(info.userMetadata()).map(meta -> meta.get("policy")).map(pId -> pId.equals(policyId)).orElse(false)).collect(Collectors.toList())).test(snapshot);
        logger.debug("[{}] testing snapshot [{}] deletion eligibility: {}", (Object)repository, (Object)snapshot.snapshotId(), (Object)(eligible ? "ELIGIBLE" : "INELIGIBLE"));
        return eligible;
    }

    void getAllRetainableSnapshots(Collection<String> repositories, ActionListener<Map<String, List<SnapshotInfo>>> listener, Consumer<Exception> errorHandler) {
        if (repositories.isEmpty()) {
            listener.onResponse(Collections.emptyMap());
        }
        this.threadPool.generic().execute(() -> {
            final ConcurrentHashMap snapshots = new ConcurrentHashMap();
            CountDown countDown = new CountDown(repositories.size());
            final Runnable onComplete = () -> {
                if (countDown.countDown()) {
                    listener.onResponse((Object)snapshots);
                }
            };
            for (final String repository : repositories) {
                this.client.admin().cluster().prepareGetSnapshots(repository).execute((ActionListener)new ActionListener<GetSnapshotsResponse>(){

                    public void onResponse(GetSnapshotsResponse resp) {
                        HashSet<SnapshotState> retainableStates = new HashSet<SnapshotState>(Arrays.asList(SnapshotState.SUCCESS, SnapshotState.FAILED, SnapshotState.PARTIAL));
                        try {
                            snapshots.compute(repository, (k, previousSnaps) -> {
                                if (previousSnaps != null) {
                                    throw new IllegalStateException("duplicate snapshot retrieval for repository" + repository);
                                }
                                return resp.getSnapshots().stream().filter(info -> retainableStates.contains(info.state())).collect(Collectors.toList());
                            });
                            onComplete.run();
                        }
                        catch (Exception e) {
                            logger.error((Message)new ParameterizedMessage("exception computing snapshots for repository {}", (Object)repository), (Throwable)e);
                            throw e;
                        }
                    }

                    public void onFailure(Exception e) {
                        logger.warn((Message)new ParameterizedMessage("unable to retrieve snapshots for repository [{}]", (Object)repository), (Throwable)e);
                        onComplete.run();
                    }
                });
            }
        });
    }

    static String getPolicyId(SnapshotInfo snapshotInfo) {
        return Optional.ofNullable(snapshotInfo.userMetadata()).filter(meta -> meta.get("policy") != null).filter(meta -> meta.get("policy") instanceof String).map(meta -> (String)meta.get("policy")).orElseThrow(() -> new IllegalStateException("expected snapshot " + snapshotInfo + " to have a policy in its metadata, but it did not"));
    }

    private void maybeDeleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete, TimeValue maximumTime, SnapshotLifecycleStats slmStats) {
        int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
        if (count == 0) {
            logger.debug("no snapshots are eligible for deletion");
            return;
        }
        ClusterState state = this.clusterService.state();
        if (SnapshotRetentionTask.okayToDeleteSnapshots(state)) {
            this.deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
        } else {
            logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed");
            ClusterStateObserver observer = new ClusterStateObserver(this.clusterService, maximumTime, logger, this.threadPool.getThreadContext());
            CountDownLatch latch = new CountDownLatch(1);
            observer.waitForNextChange((ClusterStateObserver.Listener)new NoSnapshotRunningListener(observer, newState -> this.threadPool.executor("management").execute(() -> {
                try {
                    this.deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
                }
                finally {
                    latch.countDown();
                }
            }), e -> {
                latch.countDown();
                throw new ElasticsearchException((Throwable)e);
            }));
            try {
                latch.await();
            }
            catch (InterruptedException e2) {
                throw new ElasticsearchException((Throwable)e2);
            }
        }
    }

    void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete, TimeValue maximumTime, SnapshotLifecycleStats slmStats) {
        int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
        logger.info("starting snapshot retention deletion for [{}] snapshots", (Object)count);
        long startTime = this.nowNanoSupplier.getAsLong();
        AtomicInteger deleted = new AtomicInteger(0);
        AtomicInteger failed = new AtomicInteger(0);
        for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotsToDelete.entrySet()) {
            String repo = entry.getKey();
            List<SnapshotInfo> snapshots = entry.getValue();
            for (SnapshotInfo info : snapshots) {
                String policyId = SnapshotRetentionTask.getPolicyId(info);
                long deleteStartTime = this.nowNanoSupplier.getAsLong();
                this.deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, (ActionListener<AcknowledgedResponse>)ActionListener.wrap(acknowledgedResponse -> {
                    deleted.incrementAndGet();
                    if (acknowledgedResponse.isAcknowledged()) {
                        this.historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord((long)Instant.now().toEpochMilli(), (String)info.snapshotId().getName(), (String)policyId, (String)repo));
                    } else {
                        SnapshotHistoryItem.deletionPossibleSuccessRecord((long)Instant.now().toEpochMilli(), (String)info.snapshotId().getName(), (String)policyId, (String)repo, (String)"deletion request issued successfully, no acknowledgement received");
                    }
                }, e -> {
                    failed.incrementAndGet();
                    try {
                        SnapshotHistoryItem result = SnapshotHistoryItem.deletionFailureRecord((long)Instant.now().toEpochMilli(), (String)info.snapshotId().getName(), (String)policyId, (String)repo, (Exception)e);
                        this.historyStore.putAsync(result);
                    }
                    catch (IOException ex) {
                        logger.error((Message)new ParameterizedMessage("failed to record snapshot deletion failure for snapshot lifecycle policy [{}]", (Object)policyId), (Throwable)ex);
                    }
                }));
                long finishTime = this.nowNanoSupplier.getAsLong();
                TimeValue deletionTime = TimeValue.timeValueNanos((long)(finishTime - deleteStartTime));
                logger.debug("elapsed time for deletion of [{}] snapshot: {}", (Object)info.snapshotId(), (Object)deletionTime);
                TimeValue totalDeletionTime = TimeValue.timeValueNanos((long)(finishTime - startTime));
                if (totalDeletionTime.compareTo(maximumTime) <= 0) continue;
                logger.info("maximum snapshot retention deletion time reached, time spent: [{}], maximum allowed time: [{}], deleted [{}] out of [{}] snapshots scheduled for deletion, failed to delete [{}]", (Object)totalDeletionTime, (Object)maximumTime, (Object)deleted, (Object)count, (Object)failed);
                slmStats.deletionTime(totalDeletionTime);
                slmStats.retentionTimedOut();
                return;
            }
        }
        TimeValue totalElapsedTime = TimeValue.timeValueNanos((long)(this.nowNanoSupplier.getAsLong() - startTime));
        logger.debug("total elapsed time for deletion of [{}] snapshots: {}", (Object)deleted, (Object)totalElapsedTime);
        slmStats.deletionTime(totalElapsedTime);
    }

    void deleteSnapshot(final String slmPolicy, final String repo, final SnapshotId snapshot, final SnapshotLifecycleStats slmStats, final ActionListener<AcknowledgedResponse> listener) {
        logger.info("[{}] snapshot retention deleting snapshot [{}]", (Object)repo, (Object)snapshot);
        CountDownLatch latch = new CountDownLatch(1);
        this.client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName()).execute((ActionListener)new LatchedActionListener((ActionListener)new ActionListener<AcknowledgedResponse>(){

            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                if (acknowledgedResponse.isAcknowledged()) {
                    logger.debug("[{}] snapshot [{}] deleted successfully", (Object)repo, (Object)snapshot);
                } else {
                    logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", (Object)repo, (Object)snapshot);
                }
                slmStats.snapshotDeleted(slmPolicy);
                listener.onResponse((Object)acknowledgedResponse);
            }

            public void onFailure(Exception e) {
                logger.warn((Message)new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", (Object)repo, (Object)snapshot), (Throwable)e);
                slmStats.snapshotDeleteFailure(slmPolicy);
                listener.onFailure(e);
            }
        }, latch));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            logger.error((Message)new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", (Object)repo, (Object)snapshot), (Throwable)e);
            listener.onFailure((Exception)e);
            slmStats.snapshotDeleteFailure(slmPolicy);
        }
    }

    void updateStateWithStats(SnapshotLifecycleStats newStats) {
        this.clusterService.submitStateUpdateTask("update_slm_stats", (ClusterStateTaskConfig)new UpdateSnapshotLifecycleStatsTask(newStats));
    }

    public static boolean okayToDeleteSnapshots(ClusterState state) {
        SnapshotsInProgress snapshotsInProgress = (SnapshotsInProgress)state.custom("snapshots");
        if (snapshotsInProgress != null && snapshotsInProgress.entries().size() > 0) {
            return false;
        }
        SnapshotDeletionsInProgress deletionsInProgress = (SnapshotDeletionsInProgress)state.custom("snapshot_deletions");
        if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
            return false;
        }
        RepositoryCleanupInProgress repositoryCleanupInProgress = (RepositoryCleanupInProgress)state.custom("repository_cleanup");
        if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
            return false;
        }
        RestoreInProgress restoreInProgress = (RestoreInProgress)state.custom("restore");
        return restoreInProgress == null;
    }

    class NoSnapshotRunningListener
    implements ClusterStateObserver.Listener {
        private final Consumer<ClusterState> reRun;
        private final Consumer<Exception> exceptionConsumer;
        private final ClusterStateObserver observer;

        NoSnapshotRunningListener(ClusterStateObserver observer, Consumer<ClusterState> reRun, Consumer<Exception> exceptionConsumer) {
            this.observer = observer;
            this.reRun = reRun;
            this.exceptionConsumer = exceptionConsumer;
        }

        public void onNewClusterState(ClusterState state) {
            try {
                if (SnapshotRetentionTask.okayToDeleteSnapshots(state)) {
                    logger.debug("retrying SLM snapshot retention deletion after snapshot operation has completed");
                    this.reRun.accept(state);
                } else {
                    this.observer.waitForNextChange((ClusterStateObserver.Listener)this);
                }
            }
            catch (Exception e) {
                this.exceptionConsumer.accept(e);
            }
        }

        public void onClusterServiceClose() {
        }

        public void onTimeout(TimeValue timeout) {
            this.exceptionConsumer.accept(new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshot operations to complete"));
        }
    }
}

