/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.core.async;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.CharBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Streams;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncResponse;
import org.elasticsearch.xpack.core.async.AsyncSearchSecurity;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.SearchStatusResponse;
import org.elasticsearch.xpack.core.security.SecurityContext;

public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
    private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class);
    public static final String HEADERS_FIELD = "headers";
    public static final String RESPONSE_HEADERS_FIELD = "response_headers";
    public static final String EXPIRATION_TIME_FIELD = "expiration_time";
    public static final String RESULT_FIELD = "result";
    private static final int ASYNC_TASK_INDEX_MAPPINGS_VERSION = 0;
    private final String index;
    private final ThreadContext threadContext;
    private final Client client;
    final AsyncSearchSecurity security;
    private final Client clientWithOrigin;
    private final NamedWriteableRegistry registry;
    private final Writeable.Reader<R> reader;
    private final BigArrays bigArrays;
    private volatile long maxResponseSize;
    private final ClusterService clusterService;
    private final CircuitBreaker circuitBreaker;

    static Settings settings() {
        return Settings.builder().put("index.codec", "best_compression").put("index.number_of_shards", 1).put("index.auto_expand_replicas", "0-1").build();
    }

    private static XContentBuilder mappings() {
        try {
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("_doc").startObject("_meta").field("version", Version.CURRENT).field("managed_index_mappings_version", 0).endObject().field("dynamic", "strict").startObject("properties").startObject(HEADERS_FIELD).field("type", "object").field("enabled", "false").endObject().startObject(RESPONSE_HEADERS_FIELD).field("type", "object").field("enabled", "false").endObject().startObject(RESULT_FIELD).field("type", "object").field("enabled", "false").endObject().startObject(EXPIRATION_TIME_FIELD).field("type", "long").endObject().endObject().endObject().endObject();
            return builder;
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to build mappings for .async-search", e);
        }
    }

    public static SystemIndexDescriptor getSystemIndexDescriptor() {
        return SystemIndexDescriptor.builder().setIndexPattern(".async-search*").setDescription("Async search results").setPrimaryIndex(".async-search").setMappings(AsyncTaskIndexService.mappings()).setSettings(AsyncTaskIndexService.settings()).setVersionMetaKey("version").setOrigin("async_search").build();
    }

    public AsyncTaskIndexService(String index, ClusterService clusterService, ThreadContext threadContext, Client client, String origin, Writeable.Reader<R> reader, NamedWriteableRegistry registry, BigArrays bigArrays) {
        this.index = index;
        this.threadContext = threadContext;
        this.client = client;
        this.security = new AsyncSearchSecurity(index, new SecurityContext(clusterService.getSettings(), client.threadPool().getThreadContext()), client, origin);
        this.clientWithOrigin = new OriginSettingClient(client, origin);
        this.registry = registry;
        this.reader = reader;
        this.bigArrays = bigArrays;
        this.maxResponseSize = SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.get(clusterService.getSettings()).getBytes();
        clusterService.getClusterSettings().addSettingsUpdateConsumer(SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING, v -> {
            this.maxResponseSize = v.getBytes();
        });
        this.clusterService = clusterService;
        this.circuitBreaker = bigArrays.breakerService().getBreaker("request");
    }

    public Client getClientWithOrigin() {
        return this.clientWithOrigin;
    }

    public Client getClient() {
        return this.client;
    }

    public AsyncSearchSecurity getSecurity() {
        return this.security;
    }

    public void createResponseForEQL(String docId, Map<String, String> headers, R response, ActionListener<DocWriteResponse> listener) {
        this.indexResponse(docId, headers, null, response, false, listener);
    }

    public void createResponseForEQL(String docId, Map<String, String> headers, Map<String, List<String>> responseHeaders, R response, ActionListener<DocWriteResponse> listener) {
        this.indexResponse(docId, headers, responseHeaders, response, false, listener);
    }

    public void createResponse(String docId, Map<String, String> headers, R response, ActionListener<DocWriteResponse> listener) {
        this.indexResponse(docId, headers, null, response, true, listener);
    }

    public void updateResponse(String docId, Map<String, List<String>> responseHeaders, R response, ActionListener<UpdateResponse> listener) {
        this.updateResponse(docId, responseHeaders, response, listener, false);
    }

    private void indexResponse(String docId, Map<String, String> headers, @Nullable Map<String, List<String>> responseHeaders, R response, boolean limitToMaxResponseSize, ActionListener<DocWriteResponse> listener) {
        try {
            ReleasableBytesStreamOutput buffer = this.allocateBuffer(limitToMaxResponseSize);
            listener = ActionListener.runBefore(listener, buffer::close);
            XContentBuilder source = XContentFactory.jsonBuilder(buffer).startObject().field(HEADERS_FIELD, headers).field(EXPIRATION_TIME_FIELD, response.getExpirationTime());
            if (responseHeaders != null) {
                source.field(RESPONSE_HEADERS_FIELD, responseHeaders);
            }
            this.addResultFieldAndFinish((Writeable)response, source);
            this.clientWithOrigin.index(new IndexRequest(this.index).create(true).id(docId).source(buffer.bytes(), source.contentType()), listener);
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    private void updateResponse(String docId, Map<String, List<String>> responseHeaders, R response, ActionListener<UpdateResponse> listener, boolean isFailure) {
        ReleasableBytesStreamOutput buffer = null;
        try {
            buffer = this.allocateBuffer(!isFailure);
            XContentBuilder source = XContentFactory.jsonBuilder(buffer).startObject().field(RESPONSE_HEADERS_FIELD, responseHeaders);
            this.addResultFieldAndFinish((Writeable)response, source);
            this.clientWithOrigin.update(((UpdateRequest)new UpdateRequest().index(this.index)).id(docId).doc(buffer.bytes(), source.contentType()).retryOnConflict(5), ActionListener.runBefore(listener, buffer::close));
        }
        catch (Exception e) {
            Releasables.close(buffer);
            if (isFailure) {
                listener.onFailure(e);
            }
            Throwable cause = ExceptionsHelper.unwrapCause(e);
            if (!(cause instanceof DocumentMissingException) && !(cause instanceof VersionConflictEngineException)) {
                logger.error(() -> "failed to store async-search [" + docId + "]", (Throwable)e);
                this.updateResponse(docId, responseHeaders, response.convertToFailure(e), ActionListener.running(() -> listener.onFailure(e)), true);
            }
            listener.onFailure(e);
        }
    }

    private ReleasableBytesStreamOutput allocateBuffer(boolean limitToMaxResponseSize) {
        return limitToMaxResponseSize ? new ReleasableBytesStreamOutputWithLimit(0, this.bigArrays.withCircuitBreaking(), this.maxResponseSize) : new ReleasableBytesStreamOutput(0, this.bigArrays.withCircuitBreaking());
    }

    private void addResultFieldAndFinish(Writeable response, XContentBuilder source) throws IOException {
        source.directFieldAsBase64(RESULT_FIELD, os -> {
            os = Streams.noCloseStream(os);
            TransportVersion minNodeVersion = this.clusterService.state().getMinTransportVersion();
            TransportVersion.writeVersion(minNodeVersion, new OutputStreamStreamOutput((OutputStream)os));
            if (minNodeVersion.onOrAfter(TransportVersions.V_7_15_0)) {
                os = CompressorFactory.COMPRESSOR.threadLocalOutputStream((OutputStream)os);
            }
            try (OutputStreamStreamOutput out = new OutputStreamStreamOutput((OutputStream)os);){
                out.setTransportVersion(minNodeVersion);
                response.writeTo(out);
            }
        }).endObject();
        source.flush();
    }

    public void updateExpirationTime(String docId, long expirationTimeMillis, ActionListener<UpdateResponse> listener) {
        Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
        UpdateRequest request = ((UpdateRequest)new UpdateRequest().index(this.index)).id(docId).doc(source, XContentType.JSON).retryOnConflict(5);
        this.clientWithOrigin.update(request, listener);
    }

    public void deleteResponse(AsyncExecutionId asyncExecutionId, ActionListener<DeleteResponse> listener) {
        try {
            DeleteRequest request = new DeleteRequest(this.index).id(asyncExecutionId.getDocId());
            this.clientWithOrigin.delete(request, listener);
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    public static <T extends AsyncTask> T getTask(TaskManager taskManager, AsyncExecutionId asyncExecutionId, Class<T> tClass) throws IOException {
        Task task = taskManager.getTask(asyncExecutionId.getTaskId().getId());
        if (!tClass.isInstance(task)) {
            return null;
        }
        AsyncTask asyncTask = (AsyncTask)((Object)task);
        if (!asyncTask.getExecutionId().equals(asyncExecutionId)) {
            return null;
        }
        return (T)asyncTask;
    }

    public <T extends AsyncTask> T getTaskAndCheckAuthentication(TaskManager taskManager, AsyncExecutionId asyncExecutionId, Class<T> tClass) throws IOException {
        T asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncExecutionId, tClass);
        if (asyncTask == null) {
            return null;
        }
        if (!this.security.currentUserHasAccessToTask((AsyncTask)asyncTask)) {
            throw new ResourceNotFoundException(asyncExecutionId.getEncoded() + " not found", new Object[0]);
        }
        return asyncTask;
    }

    public void getResponse(AsyncExecutionId asyncExecutionId, boolean restoreResponseHeaders, ActionListener<R> listener) {
        this.getResponseFromIndex(asyncExecutionId, restoreResponseHeaders, true, listener);
    }

    private void getResponseFromIndex(AsyncExecutionId asyncExecutionId, boolean restoreResponseHeaders, boolean checkAuthentication, ActionListener<R> outerListener) {
        GetRequest getRequest = new GetRequest(this.index).preference(asyncExecutionId.getEncoded()).id(asyncExecutionId.getDocId()).realtime(true);
        this.clientWithOrigin.get(getRequest, outerListener.delegateFailure((listener, getResponse) -> {
            R resp;
            if (!getResponse.isExists()) {
                listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded(), new Object[0]));
                return;
            }
            try {
                BytesReference source = getResponse.getSourceInternal();
                long reservedBytes = (long)source.length() * 2L;
                this.circuitBreaker.addEstimateBytesAndMaybeBreak(reservedBytes, "decode async response");
                listener = ActionListener.runAfter(listener, () -> this.circuitBreaker.addWithoutBreaking(-reservedBytes));
                resp = this.parseResponseFromIndex(asyncExecutionId, source, restoreResponseHeaders, checkAuthentication);
            }
            catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            ActionListener.respondAndRelease(listener, resp);
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive exception aggregation
     */
    private R parseResponseFromIndex(AsyncExecutionId asyncExecutionId, BytesReference source, boolean restoreResponseHeaders, boolean checkAuthentication) {
        try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, source, XContentType.JSON);){
            String string;
            XContentParserUtils.ensureExpectedToken(parser.nextToken(), XContentParser.Token.START_OBJECT, parser);
            AsyncResponse<Object> resp = null;
            Long expirationTime = null;
            block22: while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
                parser.nextToken();
                switch (parser.currentName()) {
                    case "result": {
                        resp = this.decodeResponse(parser.charBuffer());
                        break;
                    }
                    case "expiration_time": {
                        expirationTime = (long)((Long)parser.numberValue());
                        break;
                    }
                    case "headers": {
                        Map headers = (Map)XContentParserUtils.parseFieldsValue(parser);
                        if (!checkAuthentication || this.security.currentUserHasAccessToTaskWithHeaders(headers)) continue block22;
                        throw new ResourceNotFoundException(asyncExecutionId.getEncoded(), new Object[0]);
                    }
                    case "response_headers": {
                        Map responseHeaders = (Map)XContentParserUtils.parseFieldsValue(parser);
                        if (!restoreResponseHeaders) continue block22;
                        AsyncTaskIndexService.restoreResponseHeadersContext(this.threadContext, responseHeaders);
                        break;
                    }
                    default: {
                        XContentParserUtils.parseFieldsValue(parser);
                    }
                }
            }
            Objects.requireNonNull(resp, "Get result doesn't include [result] field");
            Objects.requireNonNull(expirationTime, "Get result doesn't include [expiration_time] field");
            try {
                string = resp.withExpirationTime(expirationTime);
            }
            catch (Throwable throwable) {
                resp.decRef();
                throw throwable;
            }
            resp.decRef();
            return (R)string;
        }
        catch (IOException e) {
            throw new ElasticsearchParseException("Failed to parse the get result", (Throwable)e, new Object[0]);
        }
    }

    public <T extends AsyncTask, SR extends SearchStatusResponse> void retrieveStatus(GetAsyncStatusRequest request, TaskManager taskManager, Class<T> tClass, Function<T, SR> statusProducerFromTask, TriFunction<R, Long, String, SR> statusProducerFromIndex, ActionListener<SR> originalListener) {
        ActionListener<SearchStatusResponse> outerListener = originalListener.delegateFailure((listener, resp) -> {
            if (resp.getExpirationTime() < System.currentTimeMillis()) {
                listener.onFailure(new ResourceNotFoundException(request.getId(), new Object[0]));
            } else {
                listener.onResponse(resp);
            }
        });
        this.security.currentUserCanSeeStatusOfAllSearches(ActionListener.wrap(canSeeAll -> {
            AsyncExecutionId asyncExecutionId = AsyncExecutionId.decode(request.getId());
            try {
                Object asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncExecutionId, tClass);
                if (asyncTask != null) {
                    if (canSeeAll.booleanValue() || this.security.currentUserHasAccessToTask((AsyncTask)asyncTask)) {
                        SearchStatusResponse response = (SearchStatusResponse)statusProducerFromTask.apply(asyncTask);
                        outerListener.onResponse(response);
                    } else {
                        outerListener.onFailure(new ResourceNotFoundException(request.getId(), new Object[0]));
                    }
                } else {
                    boolean checkAuthentication = canSeeAll == false;
                    this.getResponseFromIndex(asyncExecutionId, false, checkAuthentication, outerListener.map(resp -> (SearchStatusResponse)statusProducerFromIndex.apply(resp, resp.getExpirationTime(), asyncExecutionId.getEncoded())));
                }
            }
            catch (Exception exc) {
                outerListener.onFailure(exc);
            }
        }, outerListener::onFailure));
    }

    private R decodeResponse(final CharBuffer encodedBuffer) throws IOException {
        InputStream encodedIn = Base64.getDecoder().wrap(new InputStream(){

            @Override
            public int read() {
                if (encodedBuffer.hasRemaining()) {
                    return encodedBuffer.get();
                }
                return -1;
            }
        });
        TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn));
        assert (version.onOrBefore(TransportVersion.current())) : version + " >= " + TransportVersion.current();
        StreamInput input = version.onOrAfter(TransportVersions.V_7_15_0) ? CompressorFactory.COMPRESSOR.threadLocalStreamInput(encodedIn) : new InputStreamStreamInput(encodedIn);
        try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(input, this.registry);){
            ((StreamInput)in).setTransportVersion(version);
            AsyncResponse asyncResponse = (AsyncResponse)this.reader.read(in);
            return (R)asyncResponse;
        }
    }

    public static void restoreResponseHeadersContext(ThreadContext threadContext, Map<String, List<String>> responseHeaders) {
        for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
            for (String value : entry.getValue()) {
                threadContext.addResponseHeader(entry.getKey(), value);
            }
        }
    }

    private static class ReleasableBytesStreamOutputWithLimit
    extends ReleasableBytesStreamOutput {
        private final long limit;

        ReleasableBytesStreamOutputWithLimit(int expectedSize, BigArrays bigArrays, long limit) {
            super(expectedSize, bigArrays);
            this.limit = limit;
        }

        @Override
        protected void ensureCapacity(long offset) {
            if (offset > this.limit) {
                throw new IllegalArgumentException("Can't store an async search response larger than [" + this.limit + "] bytes. This limit can be set by changing the [" + SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.");
            }
            super.ensureCapacity(offset);
        }
    }
}

