/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.esql.enrich;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.VersionId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LocalCircuitBreaker;
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.SinkOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.core.security.user.User;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.enrich.EnrichQuerySourceOperator;
import org.elasticsearch.xpack.esql.enrich.MergePositionsOperator;
import org.elasticsearch.xpack.esql.enrich.QueryList;
import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;

public class EnrichLookupService {
    public static final String LOOKUP_ACTION_NAME = "indices:data/read/esql/lookup";
    private final ClusterService clusterService;
    private final SearchService searchService;
    private final TransportService transportService;
    private final Executor executor;
    private final BigArrays bigArrays;
    private final BlockFactory blockFactory;
    private final LocalCircuitBreaker.SizeSettings localBreakerSettings;

    public EnrichLookupService(ClusterService clusterService, SearchService searchService, TransportService transportService, BigArrays bigArrays, BlockFactory blockFactory) {
        this.clusterService = clusterService;
        this.searchService = searchService;
        this.transportService = transportService;
        this.executor = transportService.getThreadPool().executor("search");
        this.bigArrays = bigArrays;
        this.blockFactory = blockFactory;
        this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
        transportService.registerRequestHandler(LOOKUP_ACTION_NAME, (Executor)transportService.getThreadPool().executor("esql_worker"), in -> new LookupRequest(in, blockFactory), (TransportRequestHandler)new TransportHandler());
    }

    /**
     * Performs an enrich lookup asynchronously.
     *
     * <p>First verifies the enrich privilege via {@link #hasEnrichPrivilege}, then resolves the single
     * local shard of {@code index} and sends a {@link LookupRequest} to the node holding it as a child
     * request of {@code parentTask}. The page carried by the response is handed to {@code outListener}.
     *
     * @param sessionId     session identifier forwarded in the request
     * @param parentTask    task the remote lookup is attached to
     * @param index         name of the enrich index to look up against
     * @param inputDataType data type of the values in the input page
     * @param matchType     match type forwarded in the request
     * @param matchField    field of the enrich index to match on
     * @param extractFields fields to extract from matching documents
     * @param inputPage     page whose first block holds the values to look up
     * @param outListener   receives the result page, or the failure
     */
    public void lookupAsync(String sessionId, CancellableTask parentTask, String index, DataType inputDataType, String matchType, String matchField, List<NamedExpression> extractFields, Page inputPage, ActionListener<Page> outListener) {
        ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
        // Capture the caller's thread context so the final callback runs with it restored.
        ActionListener<Page> listener = ContextPreservingActionListener.wrapPreservingContext(outListener, threadContext);
        hasEnrichPrivilege(listener.delegateFailureAndWrap((delegate, ignored) -> {
            ClusterState clusterState = clusterService.state();
            String[] targetIndices = new String[] { index };
            GroupShardsIterator<ShardIterator> shardIterators = clusterService.operationRouting().searchShards(clusterState, targetIndices, Map.of(), "_local");
            if (shardIterators.size() != 1) {
                delegate.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", index));
                return;
            }
            ShardIterator shardIt = shardIterators.get(0);
            ShardRouting shardRouting = shardIt.nextOrNull();
            ShardId shardId = shardIt.shardId();
            if (shardRouting == null) {
                delegate.onFailure(new UnavailableShardsException(shardId, "enrich index is not available"));
                return;
            }
            DiscoveryNode targetNode = clusterState.nodes().get(shardRouting.currentNodeId());
            LookupRequest lookupRequest = new LookupRequest(sessionId, shardId, inputDataType, matchType, matchField, inputPage, extractFields);
            // The request is sent under the "enrich" origin; the stashed context is restored when the try block exits.
            try (ThreadContext.StoredContext unused = threadContext.stashWithOrigin("enrich")) {
                ActionListener<LookupResponse> responseListener = delegate.map(LookupResponse::takePage);
                transportService.sendChildRequest(
                    targetNode,
                    LOOKUP_ACTION_NAME,
                    lookupRequest,
                    parentTask,
                    TransportRequestOptions.EMPTY,
                    new ActionListenerResponseHandler<>(responseListener, in -> new LookupResponse(in, blockFactory), executor)
                );
            }
        }));
    }

    /**
     * Checks that the current user holds the {@code MONITOR_ENRICH} cluster privilege.
     *
     * <p>Completes {@code outListener} successfully when the check passes or is skipped, and with an
     * authorization error naming the missing privileges otherwise.
     *
     * @param outListener notified with {@code null} on success, or with the failure
     */
    private void hasEnrichPrivilege(ActionListener<Void> outListener) {
        Settings settings = clusterService.getSettings();
        // NOTE(review): the check is skipped whenever the security setting is not explicitly present in the
        // node settings, not only when it is explicitly false - confirm this is intended.
        boolean securityEnabled = settings.hasValue(XPackSettings.SECURITY_ENABLED.getKey()) && XPackSettings.SECURITY_ENABLED.get(settings);
        if (securityEnabled == false) {
            outListener.onResponse(null);
            return;
        }

        ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
        User user = new SecurityContext(Settings.EMPTY, threadContext).getUser();
        if (user == null) {
            outListener.onFailure(new IllegalStateException("missing or unable to read authentication info on request"));
            return;
        }

        HasPrivilegesRequest request = new HasPrivilegesRequest();
        request.username(user.principal());
        request.clusterPrivileges(ClusterPrivilegeResolver.MONITOR_ENRICH.name());
        request.indexPrivileges(new RoleDescriptor.IndicesPrivileges[0]);
        request.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);

        ActionListener<HasPrivilegesResponse> listener = outListener.delegateFailureAndWrap((l, resp) -> {
            if (resp.isCompleteMatch() == false) {
                // Build one "privilege [x] is missing" fragment per privilege the user lacks.
                String detailed = resp.getClusterPrivileges()
                    .entrySet()
                    .stream()
                    .filter(entry -> entry.getValue() == false)
                    .map(entry -> "privilege [" + entry.getKey() + "] is missing")
                    .collect(Collectors.joining(", "));
                String message = "user [" + user.principal() + "] doesn't have sufficient privileges to perform enrich lookup: " + detailed;
                l.onFailure(Exceptions.authorizationError(message));
                return;
            }
            l.onResponse(null);
        });
        transportService.sendRequest(
            transportService.getLocalNode(),
            "cluster:admin/xpack/security/user/has_privileges",
            request,
            TransportRequestOptions.EMPTY,
            new ActionListenerResponseHandler<>(listener, HasPrivilegesResponse::new, executor)
        );
    }

    /**
     * Runs the lookup for one request against the given shard and reports the resulting page.
     *
     * <p>If every value of the first input block is null, a page of constant-null blocks is returned
     * immediately. Otherwise a {@link Driver} is assembled from a query source operator, a field
     * extraction operator, a {@link MergePositionsOperator} and an output operator, and started on
     * this service's executor. If the driver produces no page, a page of constant-null blocks is
     * returned instead.
     *
     * <p>Resources created before the driver is started are collected in a list and closed if the
     * driver never starts; any exception raised while setting up is delivered to {@code listener}.
     *
     * @param sessionId     session identifier, used in the driver name and description
     * @param task          task whose cancellation cancels the driver
     * @param shardId       shard of the enrich index to search
     * @param inputDataType data type of the input values
     * @param matchType     one of {@code "match"}, {@code "range"} or {@code "geo_match"}
     * @param matchField    field of the enrich index to match on
     * @param inputPage     page whose first block holds the values to look up
     * @param extractFields fields to extract from matching documents
     * @param listener      receives the result page, or the failure
     */
    private void doLookup(String sessionId, CancellableTask task, ShardId shardId, DataType inputDataType, String matchType, String matchField, Page inputPage, List<NamedExpression> extractFields, ActionListener<Page> listener) {
        Block inputBlock = inputPage.getBlock(0);
        if (inputBlock.areAllValuesNull()) {
            // Build the page first so that a failure while allocating it is reported through the
            // listener instead of escaping this method without notifying anyone.
            Page nullPage;
            try {
                nullPage = createNullResponse(inputPage.getPositionCount(), extractFields);
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(nullPage);
            return;
        }
        List<Releasable> releasables = new ArrayList<>(6);
        boolean started = false;
        try {
            ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0L, AliasFilter.EMPTY);
            SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT);
            releasables.add(searchContext);
            LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(blockFactory.breaker(), localBreakerSettings.overReservedBytes(), localBreakerSettings.maxOverReservedBytes());
            releasables.add(localBreaker);
            DriverContext driverContext = new DriverContext(bigArrays, blockFactory.newChildFactory(localBreaker));

            ElementType[] mergingTypes = new ElementType[extractFields.size()];
            for (int i = 0; i < extractFields.size(); ++i) {
                mergingTypes[i] = PlannerUtils.toElementType(extractFields.get(i).dataType());
            }
            // Extracted fields are merged from channel 2 onwards.
            int[] mergingChannels = IntStream.range(0, extractFields.size()).map(i -> i + 2).toArray();

            MergePositionsOperator mergePositionsOperator;
            OrdinalBytesRefBlock ordinalsBytesRefBlock = inputBlock instanceof BytesRefBlock ? ((BytesRefBlock) inputBlock).asOrdinals() : null;
            if (ordinalsBytesRefBlock != null) {
                // Ordinal-encoded input: query the dictionary only and let the ordinals select positions.
                inputBlock = ordinalsBytesRefBlock.getDictionaryVector().asBlock();
                var selectedPositions = ordinalsBytesRefBlock.getOrdinalsBlock();
                mergePositionsOperator = new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory());
            } else {
                // The range block is created here, so it is closed here once the operator has been constructed.
                try (var selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock()) {
                    mergePositionsOperator = new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory());
                }
            }
            releasables.add(mergePositionsOperator);

            SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
            MappedFieldType fieldType = searchExecutionContext.getFieldType(matchField);
            QueryList queryList = switch (matchType) {
                case "match", "range" -> QueryList.termQueryList(fieldType, searchExecutionContext, inputBlock, inputDataType);
                case "geo_match" -> QueryList.geoShapeQuery(fieldType, searchExecutionContext, inputBlock, inputDataType);
                default -> throw new EsqlIllegalArgumentException("illegal match type " + matchType);
            };
            EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(driverContext.blockFactory(), 256, queryList, searchExecutionContext.getIndexReader());
            releasables.add(queryOperator);
            Operator extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, extractFields);
            releasables.add(extractFieldsOperator);

            AtomicReference<Page> result = new AtomicReference<>();
            OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set);
            releasables.add(outputOperator);

            Driver driver = new Driver(
                "enrich-lookup:" + sessionId,
                System.currentTimeMillis(),
                System.nanoTime(),
                driverContext,
                () -> lookupDescription(sessionId, shardId, inputDataType, matchType, matchField, extractFields, inputPage.getPositionCount()),
                queryOperator,
                List.of(extractFieldsOperator, mergePositionsOperator),
                outputOperator,
                Driver.DEFAULT_STATUS_INTERVAL,
                Releasables.wrap(searchContext, localBreaker)
            );
            task.addListener(() -> {
                String reason = Objects.requireNonNullElse(task.getReasonCancelled(), "task was cancelled");
                driver.cancel(reason);
            });
            ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
            Driver.start(threadContext, executor, driver, 10000, listener.map(ignored -> {
                Page out = result.get();
                if (out == null) {
                    out = createNullResponse(inputPage.getPositionCount(), extractFields);
                }
                return out;
            }));
            started = true;
        } catch (Exception e) {
            listener.onFailure(e);
        } finally {
            // Once the driver has been started these resources are no longer closed from here.
            if (started == false) {
                Releasables.close(releasables);
            }
        }
    }

    /**
     * Builds the operator that loads the requested fields from the single shard behind
     * {@code searchContext}.
     *
     * <p>For an {@link Alias}, the block loader is resolved from the name of the aliased child
     * expression, while the produced field keeps the alias name.
     *
     * @param searchContext search context of the enrich shard
     * @param driverContext supplies the block factory for the operator
     * @param extractFields fields to load
     * @return a {@link ValuesSourceReaderOperator} over shard index {@code 0}
     */
    private static Operator extractFieldsOperator(SearchContext searchContext, DriverContext driverContext, List<NamedExpression> extractFields) {
        EsPhysicalOperationProviders.DefaultShardContext shardContext = new EsPhysicalOperationProviders.DefaultShardContext(
            0,
            searchContext.getSearchExecutionContext(),
            searchContext.request().getAliasFilter()
        );
        List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>(extractFields.size());
        for (NamedExpression extractField : extractFields) {
            String sourceName = extractField instanceof Alias
                ? ((NamedExpression) ((Alias) extractField).child()).name()
                : extractField.name();
            boolean unsupported = extractField.dataType() == DataType.UNSUPPORTED;
            BlockLoader loader = shardContext.blockLoader(sourceName, unsupported, MappedFieldType.FieldExtractPreference.NONE);
            ElementType elementType = PlannerUtils.toElementType(extractField.dataType());
            fields.add(new ValuesSourceReaderOperator.FieldInfo(extractField.name(), elementType, shardIdx -> {
                // Only shard index 0 is registered with the reader below.
                if (shardIdx == 0) {
                    return loader;
                }
                throw new IllegalStateException("only one shard");
            }));
        }
        ValuesSourceReaderOperator.ShardContext readerShard = new ValuesSourceReaderOperator.ShardContext(
            searchContext.searcher().getIndexReader(),
            () -> searchContext.newSourceLoader()
        );
        return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, List.of(readerShard), 0);
    }

    /**
     * Creates a page with one constant-null block per extract field.
     *
     * <p>If anything fails before the page is returned, every block allocated so far is closed.
     * Success is tracked with an explicit flag rather than by inspecting the last array slot, so an
     * empty {@code extractFields} list no longer triggers an {@link ArrayIndexOutOfBoundsException}
     * in the cleanup path, and blocks are also released when constructing the page itself fails.
     *
     * @param positionCount number of positions in each block
     * @param extractFields fields the response must provide a block for
     * @return a page of {@code extractFields.size()} constant-null blocks
     */
    private Page createNullResponse(int positionCount, List<NamedExpression> extractFields) {
        Block[] blocks = new Block[extractFields.size()];
        boolean success = false;
        try {
            for (int i = 0; i < blocks.length; ++i) {
                blocks[i] = blockFactory.newConstantNullBlock(positionCount);
            }
            Page page = new Page(blocks);
            success = true;
            return page;
        } finally {
            if (success == false) {
                // Slots that were never filled are still null here, as in the original cleanup path.
                Releasables.close(blocks);
            }
        }
    }

    /**
     * Formats a human-readable, single-line description of a lookup from its parameters.
     *
     * @return a string of the form {@code ENRICH_LOOKUP( session=... ,shard=... ,...)}
     */
    private static String lookupDescription(String sessionId, ShardId shardId, DataType inputDataType, String matchType, String matchField, List<NamedExpression> extractFields, int positionCount) {
        StringBuilder description = new StringBuilder("ENRICH_LOOKUP(");
        description.append(" session=").append(sessionId);
        description.append(" ,shard=").append(shardId);
        description.append(" ,input_type=").append(inputDataType);
        description.append(" ,match_type=").append(matchType);
        description.append(" ,match_field=").append(matchField);
        description.append(" ,extract_fields=").append(extractFields);
        description.append(" ,positions=").append(positionCount);
        description.append(")");
        return description.toString();
    }

    private class TransportHandler
    implements TransportRequestHandler<LookupRequest> {
        private TransportHandler() {
        }

        public void messageReceived(LookupRequest request, TransportChannel channel, Task task) {
            request.incRef();
            ActionListener listener = ActionListener.runBefore((ActionListener)new ChannelActionListener(channel), request::decRef);
            EnrichLookupService.this.doLookup(request.sessionId, (CancellableTask)task, request.shardId, request.inputDataType, request.matchType, request.matchField, request.inputPage, request.extractFields, (ActionListener<Page>)listener.delegateFailureAndWrap((l, outPage) -> ActionListener.respondAndRelease((ActionListener)l, (RefCounted)new LookupResponse((Page)outPage, EnrichLookupService.this.blockFactory))));
        }
    }

    private static class LookupRequest
    extends TransportRequest
    implements IndicesRequest {
        private final String sessionId;
        private final ShardId shardId;
        private final DataType inputDataType;
        private final String matchType;
        private final String matchField;
        private final Page inputPage;
        private final List<NamedExpression> extractFields;
        private final Page toRelease;
        private final RefCounted refs = AbstractRefCounted.of(this::releasePage);

        LookupRequest(String sessionId, ShardId shardId, DataType inputDataType, String matchType, String matchField, Page inputPage, List<NamedExpression> extractFields) {
            this.sessionId = sessionId;
            this.shardId = shardId;
            this.inputDataType = inputDataType;
            this.matchType = matchType;
            this.matchField = matchField;
            this.inputPage = inputPage;
            this.toRelease = null;
            this.extractFields = extractFields;
        }

        LookupRequest(StreamInput in, BlockFactory blockFactory) throws IOException {
            super(in);
            this.sessionId = in.readString();
            this.shardId = new ShardId(in);
            String inputDataType = in.getTransportVersion().onOrAfter((VersionId)TransportVersions.ESQL_EXTENDED_ENRICH_INPUT_TYPE) ? in.readString() : "unknown";
            this.inputDataType = EsqlDataTypes.fromTypeName(inputDataType);
            this.matchType = in.readString();
            this.matchField = in.readString();
            try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory);){
                this.inputPage = new Page((StreamInput)bsi);
            }
            this.toRelease = this.inputPage;
            PlanStreamInput planIn = new PlanStreamInput(in, PlanNameRegistry.INSTANCE, in.namedWriteableRegistry(), null);
            this.extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
        }

        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(this.sessionId);
            out.writeWriteable((Writeable)this.shardId);
            if (out.getTransportVersion().onOrAfter((VersionId)TransportVersions.ESQL_EXTENDED_ENRICH_INPUT_TYPE)) {
                out.writeString(this.inputDataType.typeName());
            }
            out.writeString(this.matchType);
            out.writeString(this.matchField);
            out.writeWriteable((Writeable)this.inputPage);
            PlanStreamOutput planOut = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE, null);
            planOut.writeNamedWriteableCollection(this.extractFields);
        }

        public String[] indices() {
            return new String[]{this.shardId.getIndexName()};
        }

        public IndicesOptions indicesOptions() {
            return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
        }

        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
            return new CancellableTask(id, type, action, "", parentTaskId, headers){

                public String getDescription() {
                    return EnrichLookupService.lookupDescription(sessionId, shardId, inputDataType, matchType, matchField, extractFields, inputPage.getPositionCount());
                }
            };
        }

        private void releasePage() {
            if (this.toRelease != null) {
                Releasables.closeExpectNoException(() -> ((Page)this.toRelease).releaseBlocks());
            }
        }

        public void incRef() {
            this.refs.incRef();
        }

        public boolean tryIncRef() {
            return this.refs.tryIncRef();
        }

        public boolean decRef() {
            return this.refs.decRef();
        }

        public boolean hasReferences() {
            return this.refs.hasReferences();
        }
    }

    private static class LookupResponse
    extends TransportResponse {
        private Page page;
        private final RefCounted refs = AbstractRefCounted.of(this::releasePage);
        private final BlockFactory blockFactory;
        private long reservedBytes = 0L;

        LookupResponse(Page page, BlockFactory blockFactory) {
            this.page = page;
            this.blockFactory = blockFactory;
        }

        LookupResponse(StreamInput in, BlockFactory blockFactory) throws IOException {
            try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory);){
                this.page = new Page((StreamInput)bsi);
            }
            this.blockFactory = blockFactory;
        }

        public void writeTo(StreamOutput out) throws IOException {
            long bytes = this.page.ramBytesUsedByBlocks();
            this.blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "serialize enrich lookup response");
            this.reservedBytes += bytes;
            this.page.writeTo(out);
        }

        Page takePage() {
            Page p = this.page;
            this.page = null;
            return p;
        }

        private void releasePage() {
            this.blockFactory.breaker().addWithoutBreaking(-this.reservedBytes);
            if (this.page != null) {
                Releasables.closeExpectNoException(() -> ((Page)this.page).releaseBlocks());
            }
        }

        public void incRef() {
            this.refs.incRef();
        }

        public boolean tryIncRef() {
            return this.refs.tryIncRef();
        }

        public boolean decRef() {
            return this.refs.decRef();
        }

        public boolean hasReferences() {
            return this.refs.hasReferences();
        }
    }
}

