/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.action.bulk;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestModifier;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
 * Shared coordinating-side skeleton for bulk transport actions.
 *
 * <p>For every incoming {@link BulkRequest} this class:
 * <ol>
 *   <li>registers the request with {@link IndexingPressure} and makes sure the returned
 *       {@link Releasable} is closed before the caller's listener is notified;</li>
 *   <li>waits (up to the request timeout) for a retryable global WRITE cluster block to clear;</li>
 *   <li>forks onto the {@code write} or {@code system_write} executor;</li>
 *   <li>resolves ingest pipelines and, if any index request has one, either runs them locally or
 *       forwards the whole request through the {@link IngestActionForwarder};</li>
 *   <li>otherwise hands the request to {@link #doInternalExecute}.</li>
 * </ol>
 */
public abstract class TransportAbstractBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
    private static final Logger logger = LogManager.getLogger(TransportAbstractBulkAction.class);

    protected final ThreadPool threadPool;
    protected final ClusterService clusterService;
    protected final IndexingPressure indexingPressure;
    protected final SystemIndices systemIndices;
    private final IngestService ingestService;
    private final IngestActionForwarder ingestForwarder;
    /** Clock for elapsed-time measurements; {@link #buildTookInMillis(long)} treats its readings as nanoseconds. */
    protected final LongSupplier relativeTimeProvider;
    /** Executor obtained from the thread pool under the name {@code "write"}. */
    protected final Executor writeExecutor;
    /** Executor obtained from the thread pool under the name {@code "system_write"}. */
    protected final Executor systemWriteExecutor;
    private final ActionType<BulkResponse> bulkAction;

    /**
     * Creates the action and registers an {@link IngestActionForwarder} as a cluster state applier.
     *
     * @param action               action type; its name is used to register this handler and it is the
     *                             action used when forwarding a request for ingest
     * @param transportService     transport service passed to the superclass and to the ingest forwarder
     * @param actionFilters        action filters passed to the superclass
     * @param requestReader        reader used to deserialize incoming bulk requests
     * @param threadPool           source of the {@code write} and {@code system_write} executors
     * @param clusterService       provides the cluster state and the local node
     * @param ingestService        default ingest service returned by {@link #getIngestService(BulkRequest)}
     * @param indexingPressure     tracker that every coordinating bulk operation is registered with
     * @param systemIndices        used to decide whether a request only targets system indices
     * @param relativeTimeProvider clock backing {@link #relativeTime()}
     */
    public TransportAbstractBulkAction(ActionType<BulkResponse> action, TransportService transportService, ActionFilters actionFilters, Writeable.Reader<BulkRequest> requestReader, ThreadPool threadPool, ClusterService clusterService, IngestService ingestService, IndexingPressure indexingPressure, SystemIndices systemIndices, LongSupplier relativeTimeProvider) {
        // The handler is registered with the direct executor; the fork to a write executor is done
        // explicitly in forkAndExecute once indexing pressure has been accounted for.
        super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.ingestService = ingestService;
        this.indexingPressure = indexingPressure;
        this.systemIndices = systemIndices;
        this.writeExecutor = threadPool.executor("write");
        this.systemWriteExecutor = threadPool.executor("system_write");
        this.ingestForwarder = new IngestActionForwarder(transportService);
        clusterService.addStateApplier(this.ingestForwarder);
        this.relativeTimeProvider = relativeTimeProvider;
        this.bulkAction = action;
    }

    /**
     * Entry point: accounts for the request in {@link IndexingPressure}, picks the executor and
     * continues with the cluster-block check.
     *
     * @param task        task associated with this request
     * @param bulkRequest the bulk request to execute
     * @param listener    notified with the response or failure, after the indexing-pressure
     *                    {@link Releasable} has been closed
     */
    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        int indexingOps = bulkRequest.numberOfActions();
        long indexingBytes = bulkRequest.ramBytesUsed();
        boolean isOnlySystem = TransportBulkAction.isOnlySystem(bulkRequest, this.clusterService.state().metadata().getIndicesLookup(), this.systemIndices);
        Releasable releasable = this.indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
        // From here on every completion path must go through releasingListener so the pressure is released.
        ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, () -> releasable.close());
        Executor executor = isOnlySystem ? this.systemWriteExecutor : this.writeExecutor;
        this.ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
    }

    /**
     * Forks immediately when there is no global WRITE block. A non-retryable block fails the request;
     * a retryable one is waited on until a cluster state without a global WRITE-level block arrives,
     * the cluster service closes, or the request timeout elapses.
     */
    private void ensureClusterStateThenForkAndExecute(final Task task, final BulkRequest bulkRequest, final Executor executor, final ActionListener<BulkResponse> releasingListener) {
        ClusterState initialState = this.clusterService.state();
        final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
        if (blockException == null) {
            this.forkAndExecute(task, bulkRequest, executor, releasingListener);
            return;
        }
        if (!blockException.retryable()) {
            releasingListener.onFailure(blockException);
            return;
        }
        logger.trace("cluster is blocked, waiting for it to recover", blockException);
        ClusterStateObserver clusterStateObserver = new ClusterStateObserver(initialState, this.clusterService, bulkRequest.timeout(), logger, this.threadPool.getThreadContext());
        clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener(){

            @Override
            public void onNewClusterState(ClusterState state) {
                TransportAbstractBulkAction.this.forkAndExecute(task, bulkRequest, executor, releasingListener);
            }

            @Override
            public void onClusterServiceClose() {
                releasingListener.onFailure(new NodeClosedException(TransportAbstractBulkAction.this.clusterService.localNode()));
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // Report the block that was originally observed rather than a generic timeout.
                releasingListener.onFailure(blockException);
            }
        }, newState -> !newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
    }

    /** Submits the pipeline-resolution and execution step to {@code executor}. */
    private void forkAndExecute(final Task task, final BulkRequest bulkRequest, final Executor executor, final ActionListener<BulkResponse> releasingListener) {
        executor.execute(new ActionRunnable<BulkResponse>(releasingListener){

            @Override
            protected void doRun() {
                TransportAbstractBulkAction.this.applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
            }
        });
    }

    /**
     * Resolves the ingest pipelines of every index request in the bulk and, when at least one request
     * has a pipeline, starts ingest processing (locally on an ingest node, otherwise by forwarding).
     *
     * @return {@code true} if ingest handling took over the request and {@code listener} will be
     *         completed by it; {@code false} if the caller should continue with normal execution
     * @throws IllegalArgumentException if an {@link IndexRequest} arrives with an auto-generated
     *         timestamp already set
     */
    private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener) {
        boolean hasIndexRequestsWithPipelines = false;
        Metadata metadata = this.clusterService.state().getMetadata();
        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
            IndexRequest indexRequest = TransportAbstractBulkAction.getIndexWriteRequest(actionRequest);
            if (indexRequest != null) {
                IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
                hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
            }
            // -1 is treated as "not set" here; NOTE(review): presumably mirrors a constant on IndexRequest — confirm.
            if (actionRequest instanceof IndexRequest && ((IndexRequest)actionRequest).getAutoGeneratedTimestamp() != -1L) {
                throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
            }
        }
        if (!hasIndexRequestsWithPipelines) {
            return false;
        }
        ActionListener.run(listener, l -> {
            if (Assertions.ENABLED) {
                boolean arePipelinesResolved = bulkRequest.requests().stream().map(TransportAbstractBulkAction::getIndexWriteRequest).filter(Objects::nonNull).allMatch(IndexRequest::isPipelineResolved);
                assert (arePipelinesResolved) : bulkRequest;
            }
            if (this.clusterService.localNode().isIngestNode()) {
                this.processBulkIndexIngestRequest(task, bulkRequest, executor, metadata, (ActionListener<BulkResponse>)l);
            } else {
                this.ingestForwarder.forwardIngestRequest(this.bulkAction, bulkRequest, (ActionListener<?>)l);
            }
        });
        return true;
    }

    /**
     * Runs the ingest pipelines for {@code original} on this node and, once they complete, re-enters
     * {@link #applyPipelinesAndDoInternalExecute} with the request produced by the
     * {@link BulkRequestModifier}.
     */
    private void processBulkIndexIngestRequest(final Task task, BulkRequest original, final Executor executor, Metadata metadata, ActionListener<BulkResponse> listener) {
        long ingestStartTimeInNanos = System.nanoTime();
        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
        this.getIngestService(original).executeBulkRequest(original.numberOfActions(), () -> bulkRequestModifier, bulkRequestModifier::markItemAsDropped, indexName -> this.shouldStoreFailure((String)indexName, metadata, this.threadPool.absoluteTimeInMillis()), bulkRequestModifier::markItemForFailureStore, bulkRequestModifier::markItemAsFailed, (originalThread, exception) -> {
            if (exception != null) {
                logger.debug("failed to execute pipeline for a bulk request", (Throwable)exception);
                listener.onFailure((Exception)exception);
                return;
            }
            long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
            final BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
            final ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
            if (bulkRequest.requests().isEmpty()) {
                // Nothing is left to execute after ingest: answer with an empty response right away.
                actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
                return;
            }
            ActionRunnable<BulkResponse> runnable = new ActionRunnable<BulkResponse>(actionListener){

                @Override
                protected void doRun() {
                    TransportAbstractBulkAction.this.applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
                }

                @Override
                public boolean isForceExecution() {
                    return true;
                }
            };
            // Only hop to the executor when the completion callback arrived on a different thread
            // than the one reported as the original thread; otherwise continue inline.
            if (originalThread == Thread.currentThread()) {
                runnable.run();
            } else {
                executor.execute(runnable);
            }
        }, executor);
    }

    /**
     * Decides whether an ingest failure for the given index should be recorded by marking the item
     * for the failure store instead of failing it.
     *
     * @param indexName   name of the index the failed item targets
     * @param metadata    cluster metadata captured when pipelines were resolved
     * @param epochMillis value of {@code threadPool.absoluteTimeInMillis()} at the time of the check
     * @return {@code true} if the failure should be stored
     */
    protected abstract boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis);

    /**
     * Extracts the {@link IndexRequest} carried by a write request.
     *
     * @param docWriteRequest the bulk item to inspect
     * @return the request itself if it is an {@link IndexRequest}; for an {@link UpdateRequest} its
     *         {@code doc()} when {@code docAsUpsert()} is set, otherwise its {@code upsertRequest()}
     *         (which may be {@code null}); {@code null} for any other request type
     */
    public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteRequest) {
        if (docWriteRequest instanceof IndexRequest) {
            return (IndexRequest)docWriteRequest;
        }
        if (docWriteRequest instanceof UpdateRequest) {
            UpdateRequest updateRequest = (UpdateRequest)docWriteRequest;
            return updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
        }
        return null;
    }

    /**
     * Returns the ingest service used to run pipelines for {@code request}. The default
     * implementation ignores the request and returns the service passed to the constructor.
     */
    protected IngestService getIngestService(BulkRequest request) {
        return this.ingestService;
    }

    /** Returns the current reading of {@link #relativeTimeProvider}. */
    protected long relativeTime() {
        return this.relativeTimeProvider.getAsLong();
    }

    /**
     * Computes the elapsed time since {@code startTimeNanos}.
     *
     * @param startTimeNanos an earlier reading of {@link #relativeTime()}
     * @return elapsed time in milliseconds
     */
    protected long buildTookInMillis(long startTimeNanos) {
        return TimeUnit.NANOSECONDS.toMillis(this.relativeTime() - startTimeNanos);
    }

    /**
     * Captures the start time, then either lets ingest handling take over or delegates to
     * {@link #doInternalExecute}.
     */
    private void applyPipelinesAndDoInternalExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener) {
        // The start time must come from the same clock, in the same unit, that buildTookInMillis
        // subtracts it from. Reading threadPool.relativeTimeInMillis() here mixed a millisecond
        // value into a nanosecond computation and bypassed the injected relativeTimeProvider.
        long relativeStartTimeNanos = this.relativeTime();
        if (!this.applyPipelines(task, bulkRequest, executor, listener)) {
            this.doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
        }
    }

    /**
     * Executes a bulk request that needs no further ingest processing.
     *
     * @param task                   task associated with the request
     * @param bulkRequest            the request to execute
     * @param executor               executor the request was forked onto
     * @param listener               listener to complete with the bulk response or a failure
     * @param relativeStartTimeNanos reading of {@link #relativeTime()} taken just before pipelines
     *                               were resolved; suitable as the argument to
     *                               {@link #buildTookInMillis(long)}
     */
    protected abstract void doInternalExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener, long relativeStartTimeNanos);
}

