/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.dataframe.process;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;

class DataFrameRowsJoiner
implements AutoCloseable {
    private static final Logger LOGGER = LogManager.getLogger(DataFrameRowsJoiner.class);
    private static final int RESULTS_BATCH_SIZE = 1000;
    private final String analyticsId;
    private final Client client;
    private final DataFrameDataExtractor dataExtractor;
    private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
    private LinkedList<RowResults> currentResults;
    private volatile String failure;

    DataFrameRowsJoiner(String analyticsId, Client client, DataFrameDataExtractor dataExtractor) {
        this.analyticsId = Objects.requireNonNull(analyticsId);
        this.client = Objects.requireNonNull(client);
        this.dataExtractor = Objects.requireNonNull(dataExtractor);
        this.dataFrameRowsIterator = new ResultMatchingDataFrameRows();
        this.currentResults = new LinkedList();
    }

    @Nullable
    String getFailure() {
        return this.failure;
    }

    void processRowResults(RowResults rowResults) {
        if (this.failure != null) {
            return;
        }
        try {
            this.addResultAndJoinIfEndOfBatch(rowResults);
        }
        catch (Exception e) {
            LOGGER.error((Message)new ParameterizedMessage("[{}] Failed to join results ", (Object)this.analyticsId), (Throwable)e);
            this.failure = "[" + this.analyticsId + "] Failed to join results: " + e.getMessage();
        }
    }

    private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
        this.currentResults.add(rowResults);
        if (this.currentResults.size() == 1000) {
            this.joinCurrentResults();
        }
    }

    private void joinCurrentResults() {
        BulkRequest bulkRequest = new BulkRequest();
        while (!this.currentResults.isEmpty()) {
            RowResults result = this.currentResults.pop();
            DataFrameDataExtractor.Row row = this.dataFrameRowsIterator.next();
            this.checkChecksumsMatch(row, result);
            bulkRequest.add(this.createIndexRequest(result, row.getHit()));
        }
        if (bulkRequest.numberOfActions() > 0) {
            this.executeBulkRequest(bulkRequest);
        }
        this.currentResults = new LinkedList();
    }

    private void checkChecksumsMatch(DataFrameDataExtractor.Row row, RowResults result) {
        if (row.getChecksum() != result.getChecksum()) {
            String msg = "Detected checksum mismatch for document with id [" + row.getHit().getId() + "]; ";
            msg = msg + "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; ";
            msg = msg + "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. ";
            msg = msg + "We rely on this index being immutable during a running analysis and so the results will be unreliable.";
            throw ExceptionsHelper.serverError((String)msg);
        }
    }

    private IndexRequest createIndexRequest(RowResults result, SearchHit hit) {
        LinkedHashMap<String, Object> source = new LinkedHashMap<String, Object>(hit.getSourceAsMap());
        source.putAll(result.getResults());
        IndexRequest indexRequest = new IndexRequest(hit.getIndex());
        indexRequest.id(hit.getId());
        indexRequest.source(source);
        indexRequest.opType(DocWriteRequest.OpType.INDEX);
        return indexRequest;
    }

    private void executeBulkRequest(BulkRequest bulkRequest) {
        BulkResponse bulkResponse = (BulkResponse)ClientHelper.executeWithHeaders(this.dataExtractor.getHeaders(), (String)"ml", (Client)this.client, () -> (BulkResponse)this.client.execute((ActionType)BulkAction.INSTANCE, (ActionRequest)bulkRequest).actionGet());
        if (bulkResponse.hasFailures()) {
            throw ExceptionsHelper.serverError((String)("failures while writing results [" + bulkResponse.buildFailureMessage() + "]"));
        }
    }

    @Override
    public void close() {
        try {
            this.joinCurrentResults();
        }
        catch (Exception e) {
            LOGGER.error((Message)new ParameterizedMessage("[{}] Failed to join results", (Object)this.analyticsId), (Throwable)e);
            this.failure = "[" + this.analyticsId + "] Failed to join results: " + e.getMessage();
        }
        finally {
            try {
                this.consumeDataExtractor();
            }
            catch (Exception e) {
                LOGGER.error((Message)new ParameterizedMessage("[{}] Failed to consume data extractor", (Object)this.analyticsId), (Throwable)e);
            }
        }
    }

    private void consumeDataExtractor() throws IOException {
        this.dataExtractor.cancel();
        while (this.dataExtractor.hasNext()) {
            this.dataExtractor.next();
        }
    }

    private class ResultMatchingDataFrameRows
    implements Iterator<DataFrameDataExtractor.Row> {
        private List<DataFrameDataExtractor.Row> currentDataFrameRows = Collections.emptyList();
        private int currentDataFrameRowsIndex;

        private ResultMatchingDataFrameRows() {
        }

        @Override
        public boolean hasNext() {
            return DataFrameRowsJoiner.this.dataExtractor.hasNext() || this.currentDataFrameRowsIndex < this.currentDataFrameRows.size();
        }

        @Override
        public DataFrameDataExtractor.Row next() {
            DataFrameDataExtractor.Row row = null;
            while ((row == null || row.shouldSkip()) && this.hasNext()) {
                this.advanceToNextBatchIfNecessary();
                row = this.currentDataFrameRows.get(this.currentDataFrameRowsIndex++);
            }
            if (row == null || row.shouldSkip()) {
                throw ExceptionsHelper.serverError((String)"no more data frame rows could be found while joining results");
            }
            return row;
        }

        private void advanceToNextBatchIfNecessary() {
            if (this.currentDataFrameRowsIndex >= this.currentDataFrameRows.size()) {
                this.currentDataFrameRows = this.getNextDataRowsBatch().orElse(Collections.emptyList());
                this.currentDataFrameRowsIndex = 0;
            }
        }

        private Optional<List<DataFrameDataExtractor.Row>> getNextDataRowsBatch() {
            try {
                return DataFrameRowsJoiner.this.dataExtractor.next();
            }
            catch (IOException e) {
                throw ExceptionsHelper.serverError((String)("error reading next batch of data frame rows [" + e.getMessage() + "]"));
            }
        }
    }
}

