/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.enrich.action;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.util.BiConsumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.enrich.EnrichPlugin;
import org.elasticsearch.xpack.enrich.action.EnrichShardMultiSearchAction;

public class EnrichCoordinatorProxyAction
extends ActionType<SearchResponse> {
    public static final EnrichCoordinatorProxyAction INSTANCE = new EnrichCoordinatorProxyAction();
    public static final String NAME = "indices:data/read/xpack/enrich/coordinate_lookups";

    private EnrichCoordinatorProxyAction() {
        super(NAME, SearchResponse::new);
    }

    public static class Coordinator {
        final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
        final int maxLookupsPerRequest;
        final int maxNumberOfConcurrentRequests;
        final BlockingQueue<Slot> queue;
        final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
        volatile long remoteRequestsTotal = 0L;
        final AtomicLong executedSearchesTotal = new AtomicLong(0L);

        public Coordinator(Client client, Settings settings) {
            this(Coordinator.lookupFunction((ElasticsearchClient)client), (Integer)EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.get(settings), (Integer)EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.get(settings), (Integer)EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.get(settings));
        }

        Coordinator(BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction, int maxLookupsPerRequest, int maxNumberOfConcurrentRequests, int queueCapacity) {
            this.lookupFunction = lookupFunction;
            this.maxLookupsPerRequest = maxLookupsPerRequest;
            this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
            this.queue = new ArrayBlockingQueue<Slot>(queueCapacity);
        }

        void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
            try {
                this.queue.put(new Slot(searchRequest, listener));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("unable to add item to queue", e);
            }
            this.coordinateLookups();
        }

        EnrichStatsAction.Response.CoordinatorStats getStats(String nodeId) {
            return new EnrichStatsAction.Response.CoordinatorStats(nodeId, this.queue.size(), this.remoteRequestsCurrent.get(), this.remoteRequestsTotal, this.executedSearchesTotal.get());
        }

        synchronized void coordinateLookups() {
            while (!this.queue.isEmpty() && this.remoteRequestsCurrent.get() < this.maxNumberOfConcurrentRequests) {
                ArrayList slots = new ArrayList();
                this.queue.drainTo(slots, this.maxLookupsPerRequest);
                MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
                slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest));
                this.remoteRequestsCurrent.incrementAndGet();
                ++this.remoteRequestsTotal;
                this.lookupFunction.accept((Object)multiSearchRequest, (response, e) -> this.handleResponse(slots, (MultiSearchResponse)response, (Exception)e));
            }
        }

        void handleResponse(List<Slot> slots, MultiSearchResponse response, Exception e) {
            this.remoteRequestsCurrent.decrementAndGet();
            this.executedSearchesTotal.addAndGet(slots.size());
            if (response != null) {
                assert (slots.size() == response.getResponses().length);
                for (int i = 0; i < response.getResponses().length; ++i) {
                    MultiSearchResponse.Item responseItem = response.getResponses()[i];
                    Slot slot2 = slots.get(i);
                    if (responseItem.isFailure()) {
                        slot2.actionListener.onFailure(responseItem.getFailure());
                        continue;
                    }
                    slot2.actionListener.onResponse((Object)responseItem.getResponse());
                }
            } else if (e != null) {
                slots.forEach(slot -> slot.actionListener.onFailure(e));
            } else {
                throw new AssertionError((Object)"no response and no error");
            }
            this.coordinateLookups();
        }

        static BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction(ElasticsearchClient client) {
            return (request, consumer) -> {
                int slot = 0;
                HashMap<String, List> itemsPerIndex = new HashMap<String, List>();
                for (SearchRequest searchRequest : request.requests()) {
                    List items = itemsPerIndex.computeIfAbsent(searchRequest.indices()[0], k -> new ArrayList());
                    items.add(new Tuple((Object)slot, (Object)searchRequest));
                    ++slot;
                }
                AtomicInteger counter = new AtomicInteger(0);
                ConcurrentHashMap shardResponses = new ConcurrentHashMap();
                for (Map.Entry entry : itemsPerIndex.entrySet()) {
                    String enrichIndexName = (String)entry.getKey();
                    List enrichIndexRequestsAndSlots = (List)entry.getValue();
                    ActionListener listener = ActionListener.wrap(response -> {
                        shardResponses.put(enrichIndexName, new Tuple(response, null));
                        if (counter.incrementAndGet() == itemsPerIndex.size()) {
                            consumer.accept((Object)Coordinator.reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
                        }
                    }, e -> {
                        shardResponses.put(enrichIndexName, new Tuple(null, e));
                        if (counter.incrementAndGet() == itemsPerIndex.size()) {
                            consumer.accept((Object)Coordinator.reduce(request.requests().size(), itemsPerIndex, shardResponses), null);
                        }
                    });
                    MultiSearchRequest mrequest = new MultiSearchRequest();
                    enrichIndexRequestsAndSlots.stream().map(Tuple::v2).forEach(arg_0 -> ((MultiSearchRequest)mrequest).add(arg_0));
                    client.execute((ActionType)EnrichShardMultiSearchAction.INSTANCE, (ActionRequest)new EnrichShardMultiSearchAction.Request(mrequest), listener);
                }
            };
        }

        static MultiSearchResponse reduce(int numRequest, Map<String, List<Tuple<Integer, SearchRequest>>> itemsPerIndex, Map<String, Tuple<MultiSearchResponse, Exception>> shardResponses) {
            MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numRequest];
            for (Map.Entry<String, Tuple<MultiSearchResponse, Exception>> rspEntry : shardResponses.entrySet()) {
                List<Tuple<Integer, SearchRequest>> reqSlots = itemsPerIndex.get(rspEntry.getKey());
                if (rspEntry.getValue().v1() != null) {
                    MultiSearchResponse shardResponse = (MultiSearchResponse)rspEntry.getValue().v1();
                    for (int i = 0; i < shardResponse.getResponses().length; ++i) {
                        int slot = (Integer)reqSlots.get(i).v1();
                        items[slot] = shardResponse.getResponses()[i];
                    }
                    continue;
                }
                if (rspEntry.getValue().v2() != null) {
                    Exception e = (Exception)rspEntry.getValue().v2();
                    for (Tuple<Integer, SearchRequest> originSlot : reqSlots) {
                        items[((Integer)originSlot.v1()).intValue()] = new MultiSearchResponse.Item(null, e);
                    }
                    continue;
                }
                throw new AssertionError();
            }
            return new MultiSearchResponse(items, 1L);
        }

        static class Slot {
            final SearchRequest searchRequest;
            final ActionListener<SearchResponse> actionListener;

            Slot(SearchRequest searchRequest, ActionListener<SearchResponse> actionListener) {
                this.searchRequest = Objects.requireNonNull(searchRequest);
                this.actionListener = Objects.requireNonNull(actionListener);
            }
        }
    }

    public static class TransportAction
    extends HandledTransportAction<SearchRequest, SearchResponse> {
        private final Coordinator coordinator;

        @Inject
        public TransportAction(TransportService transportService, ActionFilters actionFilters, Coordinator coordinator) {
            super(EnrichCoordinatorProxyAction.NAME, transportService, actionFilters, SearchRequest::new);
            this.coordinator = coordinator;
        }

        protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
            assert (Thread.currentThread().getName().contains("write"));
            this.coordinator.schedule(request, listener);
        }
    }
}

