/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.enrich;

import java.util.concurrent.Semaphore;
import java.util.function.LongSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.enrich.EnrichPlugin;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.elasticsearch.xpack.enrich.EnrichPolicyRunner;
import org.elasticsearch.xpack.enrich.EnrichStore;
import org.elasticsearch.xpack.enrich.ExecuteEnrichPolicyTask;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;

public class EnrichPolicyExecutor {
    private static final Logger logger = LogManager.getLogger(EnrichPolicyExecutor.class);
    public static final String TASK_ACTION = "policy_execution";
    private final ClusterService clusterService;
    private final IndicesService indicesService;
    private final Client client;
    private final ThreadPool threadPool;
    private final IndexNameExpressionResolver indexNameExpressionResolver;
    private final LongSupplier nowSupplier;
    private final int fetchSize;
    private final EnrichPolicyLocks policyLocks;
    private final int maximumConcurrentPolicyExecutions;
    private final int maxForceMergeAttempts;
    private final Semaphore policyExecutionPermits;

    public EnrichPolicyExecutor(Settings settings, ClusterService clusterService, IndicesService indicesService, Client client, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, EnrichPolicyLocks policyLocks, LongSupplier nowSupplier) {
        this.clusterService = clusterService;
        this.indicesService = indicesService;
        this.client = client;
        this.threadPool = threadPool;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.nowSupplier = nowSupplier;
        this.policyLocks = policyLocks;
        this.fetchSize = (Integer)EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings);
        this.maximumConcurrentPolicyExecutions = (Integer)EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.get(settings);
        this.maxForceMergeAttempts = (Integer)EnrichPlugin.ENRICH_MAX_FORCE_MERGE_ATTEMPTS.get(settings);
        this.policyExecutionPermits = new Semaphore(this.maximumConcurrentPolicyExecutions);
    }

    public void coordinatePolicyExecution(ExecuteEnrichPolicyAction.Request request, ActionListener<ExecuteEnrichPolicyAction.Response> listener) {
        long nowTimestamp = this.nowSupplier.getAsLong();
        String policyName = request.getName();
        String enrichIndexName = EnrichPolicy.getIndexName((String)request.getName(), (long)nowTimestamp);
        Releasable policyLock = this.tryLockingPolicy(request.getName(), enrichIndexName);
        try {
            InternalExecutePolicyAction.Request internalRequest = new InternalExecutePolicyAction.Request(request.masterNodeTimeout(), request.getName(), enrichIndexName);
            internalRequest.setWaitForCompletion(request.isWaitForCompletion());
            internalRequest.setParentTask(request.getParentTask());
            this.client.execute((ActionType)InternalExecutePolicyAction.INSTANCE, (ActionRequest)internalRequest, ActionListener.wrap(response -> {
                if (response.getStatus() != null) {
                    logger.debug("Unlocking enrich policy [{}:{}] on complete with no task scheduled", (Object)policyName, (Object)enrichIndexName);
                    policyLock.close();
                    listener.onResponse(response);
                } else {
                    assert (response.getTaskId() != null) : "If the execute response does not have a status it must return a task id";
                    this.awaitTaskCompletionAndThenRelease(response.getTaskId(), () -> {
                        logger.debug("Unlocking enrich policy [{}:{}] on completion of task status", (Object)policyName, (Object)enrichIndexName);
                        policyLock.close();
                    }, policyName, enrichIndexName);
                    listener.onResponse(response);
                }
            }, e -> {
                logger.debug("Unlocking enrich policy [{}:{}] on failure to execute internal action", (Object)policyName, (Object)enrichIndexName);
                policyLock.close();
                listener.onFailure(e);
            }));
        }
        catch (Exception e2) {
            policyLock.close();
            throw e2;
        }
    }

    public void runPolicyLocally(ExecuteEnrichPolicyTask task, String policyName, String enrichIndexName, ActionListener<ExecuteEnrichPolicyStatus> listener) {
        try {
            EnrichPolicy policy = EnrichStore.getPolicy(policyName, this.clusterService.state());
            if (policy == null) {
                throw new ResourceNotFoundException("policy [{}] does not exist", new Object[]{policyName});
            }
            task.setStatus(new ExecuteEnrichPolicyStatus("SCHEDULED"));
            Runnable runnable = this.createPolicyRunner(policyName, policy, enrichIndexName, task, listener);
            this.threadPool.executor("generic").execute(runnable);
        }
        catch (Exception e) {
            task.setStatus(new ExecuteEnrichPolicyStatus("FAILED"));
            throw e;
        }
    }

    private Releasable tryLockingPolicy(String policyName, String enrichIndexName) {
        EnrichPolicyLocks.EnrichPolicyLock policyLock = this.policyLocks.lockPolicy(policyName, enrichIndexName);
        if (!this.policyExecutionPermits.tryAcquire()) {
            policyLock.close();
            throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName + "] would exceed maximum concurrent policy executions [" + this.maximumConcurrentPolicyExecutions + "]");
        }
        return () -> {
            try (EnrichPolicyLocks.EnrichPolicyLock enrichPolicyLock = policyLock;){
                this.policyExecutionPermits.release();
            }
        };
    }

    private void awaitTaskCompletionAndThenRelease(final TaskId taskId, final Releasable policyLock, final String policyName, final String enrichIndexName) {
        GetTaskRequest getTaskRequest = new GetTaskRequest().setTaskId(taskId).setWaitForCompletion(true).setTimeout(TimeValue.MAX_VALUE);
        this.client.admin().cluster().getTask(getTaskRequest, (ActionListener)new ActionListener<GetTaskResponse>(){

            public void onResponse(GetTaskResponse getTaskResponse) {
                policyLock.close();
            }

            public void onFailure(Exception exception) {
                if (ExceptionsHelper.unwrap((Throwable)exception, (Class[])new Class[]{ResourceNotFoundException.class}) != null) {
                    logger.debug("Assuming async policy [{}:{}] execution task [{}] has ended after not being able to retrieve it from remote host", (Object)policyName, (Object)enrichIndexName, (Object)taskId);
                    policyLock.close();
                } else if (ExceptionsHelper.unwrap((Throwable)exception, (Class[])new Class[]{ElasticsearchTimeoutException.class}) != null) {
                    logger.debug("Retrying task wait after encountering timeout during async policy execution result [{}:{}]", (Object)policyName, (Object)enrichIndexName);
                    EnrichPolicyExecutor.this.awaitTaskCompletionAndThenRelease(taskId, policyLock, policyName, enrichIndexName);
                } else {
                    logger.error("Emergency unlock for enrich policy [" + policyName + ":" + enrichIndexName + "] on failure to determine task status caused by unhandled exception", (Throwable)exception);
                    policyLock.close();
                }
            }
        });
    }

    private Runnable createPolicyRunner(String policyName, EnrichPolicy policy, String enrichIndexName, ExecuteEnrichPolicyTask task, ActionListener<ExecuteEnrichPolicyStatus> listener) {
        return new EnrichPolicyRunner(policyName, policy, task, listener, this.clusterService, this.indicesService, this.client, this.indexNameExpressionResolver, enrichIndexName, this.fetchSize, this.maxForceMergeAttempts);
    }
}

