/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.cluster.action.shard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.StaleShard;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

public class ShardStateAction {
    /** Logger used by this action and by the nested handlers and executors declared in this file. */
    private static final Logger logger = LogManager.getLogger(ShardStateAction.class);
    /** Transport action name used for shard-started requests (registered in the constructor, sent by {@code shardStarted}). */
    public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
    /** Transport action name used for shard-failed requests (registered in the constructor, sent by the {@code *ShardFailed} methods). */
    public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
    private final TransportService transportService;
    private final ClusterService clusterService;
    private final ThreadPool threadPool;
    /** Deduplicator through which {@code remoteShardFailed} and {@code shardStarted} submit their requests. */
    private final ResultDeduplicator<TransportRequest, Void> remoteShardStateUpdateDeduplicator;
    /** Exception types that {@code isMasterChannelException} looks for; a match makes {@code sendShardAction} wait for a master and retry. */
    private static final Class<?>[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{NotMasterException.class, ConnectTransportException.class, FailedToCommitClusterStateException.class};

    /**
     * Stores the collaborating services, creates the request deduplicator and registers the
     * transport handlers for {@link #SHARD_STARTED_ACTION_NAME} and {@link #SHARD_FAILED_ACTION_NAME}.
     *
     * @param clusterService    used to read the current cluster state and to create the task queues of the handlers
     * @param transportService  used to send requests and to register the two request handlers
     * @param allocationService handed to both cluster state task executors
     * @param rerouteService    handed to both cluster state task executors
     * @param threadPool        supplies the thread context for the deduplicator and the cluster state observers
     */
    @Inject
    public ShardStateAction(ClusterService clusterService, TransportService transportService, AllocationService allocationService, RerouteService rerouteService, ThreadPool threadPool) {
        this.transportService = transportService;
        this.clusterService = clusterService;
        this.threadPool = threadPool;
        // Diamond rather than a raw type, so the assignment to the parameterised field is type-checked.
        this.remoteShardStateUpdateDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
        transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, EsExecutors.DIRECT_EXECUTOR_SERVICE, StartedShardEntry::new, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService)));
        transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, EsExecutors.DIRECT_EXECUTOR_SERVICE, FailedShardEntry::new, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService)));
    }

    /**
     * Sends {@code request} under {@code actionName} to the master node recorded in {@code currentState}.
     * <p>
     * When {@code currentState} has no master, or the send fails with an exception for which
     * {@link #isMasterChannelException} returns {@code true}, the request is handed to
     * {@link #waitForNewMasterAndRetry}. Any other failure is logged and passed to {@code listener};
     * a {@link RemoteTransportException} is replaced by its cause first (wrapped in an
     * {@link ElasticsearchException} when that cause is not an {@link Exception}).
     *
     * @param actionName   transport action name to send under
     * @param currentState cluster state used to look up the master and to seed the observer
     * @param request      the shard entry to send
     * @param listener     notified of the final outcome
     */
    private void sendShardAction(String actionName, ClusterState currentState, TransportRequest request, ActionListener<Void> listener) {
        ClusterStateObserver observer = new ClusterStateObserver(currentState, this.clusterService, null, logger, this.threadPool.getThreadContext());
        DiscoveryNode masterNode = currentState.nodes().getMasterNode();
        if (masterNode == null) {
            logger.warn("no master known for action [{}] for shard entry [{}]", actionName, request);
            this.waitForNewMasterAndRetry(actionName, observer, request, listener);
            return;
        }
        logger.debug("sending [{}] to [{}] for shard entry [{}]", actionName, masterNode.getId(), request);
        this.transportService.sendRequest(masterNode, actionName, request, TransportResponseHandler.empty(TransportResponseHandler.TRANSPORT_WORKER, listener.delegateResponse((l, exp) -> {
            if (ShardStateAction.isMasterChannelException(exp)) {
                this.waitForNewMasterAndRetry(actionName, observer, request, listener);
                return;
            }
            logger.warn(() -> Strings.format("unexpected failure while sending request [%s] to [%s] for shard entry [%s]", actionName, masterNode, request), exp);
            final Exception reported;
            if (exp instanceof RemoteTransportException) {
                // Report the remote cause rather than the transport wrapper.
                Throwable cause = exp.getCause();
                reported = cause instanceof Exception ? (Exception) cause : new ElasticsearchException(cause);
            } else {
                reported = exp;
            }
            listener.onFailure(reported);
        })));
    }

    /**
     * Reports whether {@link ExceptionsHelper#unwrap} finds one of the
     * {@link #MASTER_CHANNEL_EXCEPTIONS} types for {@code exp}.
     *
     * @param exp the failure to inspect
     * @return {@code true} when the unwrap call returns a non-null result
     */
    private static boolean isMasterChannelException(Throwable exp) {
        final Throwable masterChannelFailure = ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS);
        return masterChannelFailure != null;
    }

    /**
     * Builds a {@link FailedShardEntry} for a shard copy and submits it through the request
     * deduplicator, which sends it under {@link #SHARD_FAILED_ACTION_NAME} using the cluster state
     * current at the time the callback runs.
     *
     * @param shardId      shard whose copy failed
     * @param allocationId allocation id of the failed copy
     * @param primaryTerm  primary term of the reporter; asserted to be strictly positive
     * @param markAsStale  forwarded into the entry
     * @param message      reason for the failure
     * @param failure      optional exception describing the failure
     * @param listener     notified of the outcome
     */
    public void remoteShardFailed(ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, String message, @Nullable Exception failure, ActionListener<Void> listener) {
        assert primaryTerm > 0L : "primary term should be strictly positive";
        final FailedShardEntry failedShardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
        remoteShardStateUpdateDeduplicator.executeOnce(
            failedShardEntry,
            listener,
            (dedupedRequest, dedupedListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), dedupedRequest, dedupedListener)
        );
    }

    /**
     * @return the value reported by the request deduplicator's {@code size()}
     */
    int remoteShardRequestsInFlight() {
        final int inFlight = remoteShardStateUpdateDeduplicator.size();
        return inFlight;
    }

    /**
     * Calls {@code clear()} on the request deduplicator.
     */
    public void clearRemoteShardRequestDeduplicator() {
        final ResultDeduplicator<TransportRequest, Void> deduplicator = remoteShardStateUpdateDeduplicator;
        deduplicator.clear();
    }

    /**
     * Reports a failed local shard using the cluster state currently held by the cluster service.
     *
     * @param shardRouting routing of the failed shard
     * @param message      reason for the failure
     * @param failure      optional exception describing the failure
     * @param listener     notified of the outcome
     */
    public void localShardFailed(ShardRouting shardRouting, String message, @Nullable Exception failure, ActionListener<Void> listener) {
        final ClusterState latestState = clusterService.state();
        localShardFailed(shardRouting, message, failure, listener, latestState);
    }

    /**
     * Reports a failed local shard to the master found in {@code currentState}.
     * <p>
     * The entry is sent directly (not through the request deduplicator) with primary term
     * {@code 0} and {@code markAsStale = true}.
     *
     * @param shardRouting routing of the failed shard; supplies shard id and allocation id
     * @param message      reason for the failure
     * @param failure      optional exception describing the failure
     * @param listener     notified of the outcome
     * @param currentState cluster state used to locate the master
     */
    public void localShardFailed(ShardRouting shardRouting, String message, @Nullable Exception failure, ActionListener<Void> listener, ClusterState currentState) {
        final ShardId failedShardId = shardRouting.shardId();
        final String failedAllocationId = shardRouting.allocationId().getId();
        final FailedShardEntry failedShardEntry = new FailedShardEntry(failedShardId, failedAllocationId, 0L, message, failure, true);
        sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, failedShardEntry, listener);
    }

    /**
     * Registers a listener on {@code observer} that re-sends {@code request} once a cluster state
     * matching {@link ClusterStateObserver#NON_NULL_MASTER_PREDICATE} is observed.
     * <p>
     * If the cluster service closes first, {@code listener} fails with a {@link NodeClosedException}.
     * A timeout is not expected and trips an assertion.
     *
     * @param actionName transport action name to retry under
     * @param observer   observer to wait on
     * @param request    the shard entry to re-send
     * @param listener   notified of the final outcome
     */
    protected void waitForNewMasterAndRetry(final String actionName, ClusterStateObserver observer, final TransportRequest request, final ActionListener<Void> listener) {
        final ClusterStateObserver.Listener retryOnNewState = new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
                if (logger.isTraceEnabled()) {
                    logger.trace("new cluster state [{}] after waiting for master election for shard entry [{}]", state, request);
                }
                ShardStateAction.this.sendShardAction(actionName, state, request, listener);
            }

            @Override
            public void onClusterServiceClose() {
                logger.warn("node closed while execution action [{}] for shard entry [{}]", actionName, request);
                final DiscoveryNode localNode = ShardStateAction.this.clusterService.localNode();
                listener.onFailure(new NodeClosedException(localNode));
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // Not expected to be invoked for this wait.
                assert false;
            }
        };
        observer.waitForNextChange(retryOnNewState, ClusterStateObserver.NON_NULL_MASTER_PREDICATE);
    }

    /**
     * Reports a started shard using the cluster state currently held by the cluster service.
     *
     * @param shardRouting       routing of the started shard
     * @param primaryTerm        primary term to report
     * @param message            reason text carried in the request
     * @param timestampRange     timestamp range of the shard
     * @param eventIngestedRange event.ingested range of the shard
     * @param listener           notified of the outcome
     */
    public void shardStarted(ShardRouting shardRouting, long primaryTerm, String message, ShardLongFieldRange timestampRange, ShardLongFieldRange eventIngestedRange, ActionListener<Void> listener) {
        final ClusterState latestState = clusterService.state();
        shardStarted(shardRouting, primaryTerm, message, timestampRange, eventIngestedRange, listener, latestState);
    }

    /**
     * Builds a {@link StartedShardEntry} and submits it through the request deduplicator, which
     * sends it under {@link #SHARD_STARTED_ACTION_NAME} to the master found in {@code currentState}.
     *
     * @param shardRouting       routing of the started shard; supplies shard id and allocation id
     * @param primaryTerm        primary term to report
     * @param message            reason text carried in the request
     * @param timestampRange     timestamp range of the shard
     * @param eventIngestedRange event.ingested range of the shard
     * @param listener           notified of the outcome
     * @param currentState       cluster state used to locate the master
     */
    public void shardStarted(ShardRouting shardRouting, long primaryTerm, String message, ShardLongFieldRange timestampRange, ShardLongFieldRange eventIngestedRange, ActionListener<Void> listener, ClusterState currentState) {
        final StartedShardEntry startedShardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, timestampRange, eventIngestedRange);
        remoteShardStateUpdateDeduplicator.executeOnce(
            startedShardEntry,
            listener,
            (dedupedRequest, dedupedListener) -> sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, dedupedRequest, dedupedListener)
        );
    }

    /**
     * Transport handler for shard-started requests: every received request is submitted as a
     * {@link StartedShardUpdateTask} to an {@code URGENT}-priority task queue named {@code "shard-started"}.
     */
    private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
        private final MasterServiceTaskQueue<StartedShardUpdateTask> taskQueue;

        /**
         * @param clusterService                       used to create the task queue
         * @param shardStartedClusterStateTaskExecutor executor that processes the queued tasks
         */
        ShardStartedTransportHandler(ClusterService clusterService, ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor) {
            this.taskQueue = clusterService.createTaskQueue("shard-started", Priority.URGENT, shardStartedClusterStateTaskExecutor);
        }

        /**
         * Queues the request; the task's listener answers {@code channel} with an empty response.
         */
        @Override
        public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) {
            logger.debug("{} received shard started for [{}]", request.shardId, request);
            // Diamond instead of a raw ChannelActionListener so the mapped listener is type-checked.
            this.taskQueue.submitTask(
                "shard-started " + request,
                new StartedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
                null
            );
        }
    }

    /**
     * Cluster state task executor that applies a batch of shard-started tasks: it filters out tasks
     * that no longer apply, starts the remaining shards through the {@link AllocationService},
     * extends the per-index timestamp / event.ingested ranges, and requests a reroute after the
     * resulting state has been published.
     */
    public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<StartedShardUpdateTask> {
        private final AllocationService allocationService;
        private final RerouteService rerouteService;

        public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService) {
            this.allocationService = allocationService;
            this.rerouteService = rerouteService;
        }

        /**
         * Processes the batch against its initial state.
         * <p>
         * A task is completed successfully without any change when its shard copy is not in the
         * routing table, when it targets a primary with a positive primary term that differs from
         * the current one, or when the shard is not initializing. Duplicate tasks for the same
         * routing are completed together with the first one, but the routing is applied only once.
         * If applying the started shards throws, all remaining tasks fail with that exception and
         * the initial state is returned.
         *
         * @param batchExecutionContext the batch of tasks and the state to apply them to
         * @return the updated cluster state, or the initial state when applying failed
         */
        @Override
        public ClusterState execute(ClusterStateTaskExecutor.BatchExecutionContext<StartedShardUpdateTask> batchExecutionContext) throws Exception {
            List<ClusterStateTaskExecutor.TaskContext<StartedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
            List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size());
            Set<ShardRouting> seenShardRoutings = new HashSet<>();
            Map<Index, ClusterStateTimeRanges> updatedTimestampRanges = new HashMap<>();
            ClusterState initialState = batchExecutionContext.initialState();
            for (ClusterStateTaskExecutor.TaskContext<StartedShardUpdateTask> taskContext : batchExecutionContext.taskContexts()) {
                StartedShardUpdateTask startedShardUpdateTask = taskContext.getTask();
                StartedShardEntry startedShardEntry = startedShardUpdateTask.getEntry();
                ShardRouting matched = initialState.getRoutingTable().getByAllocationId(startedShardEntry.shardId, startedShardEntry.allocationId);
                if (matched == null) {
                    logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", startedShardEntry.shardId, startedShardEntry);
                    taskContext.success(startedShardUpdateTask::onSuccess);
                    continue;
                }
                if (matched.primary() && startedShardEntry.primaryTerm > 0L) {
                    IndexMetadata indexMetadata = initialState.metadata().index(startedShardEntry.shardId.getIndex());
                    assert indexMetadata != null;
                    long currentPrimaryTerm = indexMetadata.primaryTerm(startedShardEntry.shardId.id());
                    if (currentPrimaryTerm != startedShardEntry.primaryTerm) {
                        assert currentPrimaryTerm > startedShardEntry.primaryTerm : "received a primary term with a higher term than in the current cluster state (received [" + startedShardEntry.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
                        logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})", startedShardEntry.shardId, startedShardEntry, startedShardEntry.primaryTerm, currentPrimaryTerm);
                        taskContext.success(startedShardUpdateTask::onSuccess);
                        continue;
                    }
                }
                if (!matched.initializing()) {
                    assert matched.active() : "expected active shard routing for task " + startedShardEntry + " but found " + matched;
                    logger.debug("{} ignoring shard started task [{}] (shard exists but is not initializing: {})", startedShardEntry.shardId, startedShardEntry, matched);
                    taskContext.success(startedShardUpdateTask::onSuccess);
                    continue;
                }
                if (seenShardRoutings.contains(matched)) {
                    // Duplicate of a routing already scheduled: complete the task with the batch, apply the routing once.
                    logger.trace("{} ignoring shard started task [{}] (already scheduled to start {})", startedShardEntry.shardId, startedShardEntry, matched);
                    tasksToBeApplied.add(taskContext);
                    continue;
                }
                logger.debug("{} starting shard {} (shard started task: [{}])", startedShardEntry.shardId, matched, startedShardEntry);
                tasksToBeApplied.add(taskContext);
                shardRoutingsToBeApplied.add(matched);
                seenShardRoutings.add(matched);

                // Extend the index-level ranges, starting from any range already accumulated in this batch.
                Index index = startedShardEntry.shardId.getIndex();
                ClusterStateTimeRanges clusterStateTimeRanges = updatedTimestampRanges.get(index);
                IndexLongFieldRange currentTimestampMillisRange = clusterStateTimeRanges == null ? null : clusterStateTimeRanges.timestampRange();
                IndexLongFieldRange currentEventIngestedMillisRange = clusterStateTimeRanges == null ? null : clusterStateTimeRanges.eventIngestedRange();
                IndexMetadata indexMetadata = initialState.metadata().index(index);
                if (currentTimestampMillisRange == null) {
                    currentTimestampMillisRange = indexMetadata.getTimestampRange();
                }
                if (currentEventIngestedMillisRange == null) {
                    currentEventIngestedMillisRange = indexMetadata.getEventIngestedRange();
                }
                IndexLongFieldRange newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange(startedShardEntry.shardId.id(), indexMetadata.getNumberOfShards(), startedShardEntry.timestampRange);
                // The event.ingested range is only extended when the cluster's minimum transport version
                // is on or after EVENT_INGESTED_RANGE_IN_CLUSTER_STATE; otherwise it stays UNKNOWN.
                IndexLongFieldRange newEventIngestedMillisRange = IndexLongFieldRange.UNKNOWN;
                TransportVersion minTransportVersion = batchExecutionContext.initialState().getMinTransportVersion();
                if (minTransportVersion.onOrAfter(TransportVersions.EVENT_INGESTED_RANGE_IN_CLUSTER_STATE)) {
                    newEventIngestedMillisRange = currentEventIngestedMillisRange.extendWithShardRange(startedShardEntry.shardId.id(), indexMetadata.getNumberOfShards(), startedShardEntry.eventIngestedRange);
                }
                // Reference comparison on purpose; presumably extendWithShardRange returns the same
                // instance when nothing changed - TODO confirm against IndexLongFieldRange.
                if (newTimestampMillisRange != currentTimestampMillisRange || newEventIngestedMillisRange != currentEventIngestedMillisRange) {
                    updatedTimestampRanges.put(index, new ClusterStateTimeRanges(newTimestampMillisRange, newEventIngestedMillisRange));
                }
            }
            assert tasksToBeApplied.size() >= shardRoutingsToBeApplied.size();
            ClusterState maybeUpdatedState = initialState;
            try {
                maybeUpdatedState = this.allocationService.applyStartedShards(initialState, shardRoutingsToBeApplied);
                if (!updatedTimestampRanges.isEmpty()) {
                    Metadata.Builder metadataBuilder = Metadata.builder(maybeUpdatedState.metadata());
                    for (Map.Entry<Index, ClusterStateTimeRanges> entry : updatedTimestampRanges.entrySet()) {
                        ClusterStateTimeRanges timeRanges = entry.getValue();
                        metadataBuilder.put(IndexMetadata.builder(metadataBuilder.getSafe(entry.getKey())).timestampRange(timeRanges.timestampRange()).eventIngestedRange(timeRanges.eventIngestedRange(), maybeUpdatedState.getMinTransportVersion()));
                    }
                    maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build();
                }
                assert ShardStartedClusterStateTaskExecutor.assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);
                for (ClusterStateTaskExecutor.TaskContext<StartedShardUpdateTask> taskContext : tasksToBeApplied) {
                    StartedShardUpdateTask startedShardUpdateTask = taskContext.getTask();
                    taskContext.success(startedShardUpdateTask::onSuccess);
                }
            } catch (Exception e) {
                logger.warn(() -> Strings.format("failed to apply started shards %s", shardRoutingsToBeApplied), e);
                for (ClusterStateTaskExecutor.TaskContext<StartedShardUpdateTask> taskContext : tasksToBeApplied) {
                    taskContext.onFailure(e);
                }
            }
            return maybeUpdatedState;
        }

        /**
         * Assertion helper: every index whose primaries are all active must have complete timestamp
         * and event.ingested ranges. Always returns {@code true} so it can be used inside {@code assert}.
         */
        private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
            for (Map.Entry<String, IndexRoutingTable> cursor : clusterState.getRoutingTable().getIndicesRouting().entrySet()) {
                assert (!cursor.getValue().allPrimaryShardsActive() || clusterState.metadata().index(cursor.getKey()).getTimestampRange().isComplete()) : "index [" + cursor.getKey() + "] should have complete timestamp range, but got " + clusterState.metadata().index(cursor.getKey()).getTimestampRange() + " for " + cursor.getValue().prettyPrint();
                assert (!cursor.getValue().allPrimaryShardsActive() || clusterState.metadata().index(cursor.getKey()).getEventIngestedRange().isComplete()) : "index [" + cursor.getKey() + "] should have complete event.ingested range, but got " + clusterState.metadata().index(cursor.getKey()).getEventIngestedRange() + " for " + cursor.getValue().prettyPrint();
            }
            return true;
        }

        /**
         * Requests a {@code NORMAL}-priority reroute once the new state has been published; the
         * outcome of the reroute is only logged.
         */
        @Override
        public void clusterStatePublished(ClusterState newClusterState) {
            this.rerouteService.reroute("reroute after starting shards", Priority.NORMAL, ActionListener.wrap(r -> logger.trace("reroute after starting shards succeeded"), e -> logger.debug("reroute after starting shards failed", e)));
        }
    }

    /**
     * Transport handler for shard-failed requests: every received request is submitted as a
     * {@link FailedShardUpdateTask} to a {@code HIGH}-priority task queue named {@code "shard-failed"}.
     */
    private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
        private final MasterServiceTaskQueue<FailedShardUpdateTask> taskQueue;
        private static final String TASK_SOURCE = "shard-failed";

        /**
         * @param clusterService                      used to create the task queue
         * @param shardFailedClusterStateTaskExecutor executor that processes the queued tasks
         */
        ShardFailedTransportHandler(ClusterService clusterService, ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor) {
            this.taskQueue = clusterService.createTaskQueue(TASK_SOURCE, Priority.HIGH, shardFailedClusterStateTaskExecutor);
        }

        /**
         * Queues the request; the task's listener answers {@code channel} with an empty response.
         */
        @Override
        public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) {
            logger.debug(() -> Strings.format("%s received shard failed for [%s]", request.getShardId(), request), request.failure);
            // Diamond instead of a raw ChannelActionListener so the mapped listener is type-checked.
            this.taskQueue.submitTask(
                TASK_SOURCE,
                new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
                null
            );
        }
    }

    /**
     * Cluster state task executor that applies a batch of shard-failed tasks: it filters out tasks
     * that no longer apply, fails or marks stale the remaining shard copies through the
     * {@link AllocationService}, and requests a reroute when unassigned shards remain after
     * publication.
     */
    public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<FailedShardUpdateTask> {
        private final AllocationService allocationService;
        private final RerouteService rerouteService;

        public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService) {
            this.allocationService = allocationService;
            this.rerouteService = rerouteService;
        }

        /**
         * Processes the batch against its initial state.
         * <p>
         * Per task: an unknown index completes the task successfully without change; a positive
         * primary term that differs from the current one fails the task with a
         * {@link NoLongerPrimaryShardException}; a copy missing from the routing table is marked
         * stale when the primary term is positive and its allocation id is still in-sync, otherwise
         * the task completes successfully without change; a copy present in the routing table is failed.
         * If applying the changes throws, all remaining tasks fail with that exception and the
         * initial state is returned.
         *
         * @param batchExecutionContext the batch of tasks and the state to apply them to
         * @return the updated cluster state, or the initial state when applying failed
         */
        @Override
        public ClusterState execute(ClusterStateTaskExecutor.BatchExecutionContext<FailedShardUpdateTask> batchExecutionContext) throws Exception {
            List<ClusterStateTaskExecutor.TaskContext<FailedShardUpdateTask>> tasksToBeApplied = new ArrayList<>();
            List<FailedShard> failedShardsToBeApplied = new ArrayList<>();
            List<StaleShard> staleShardsToBeApplied = new ArrayList<>();
            ClusterState initialState = batchExecutionContext.initialState();
            for (ClusterStateTaskExecutor.TaskContext<FailedShardUpdateTask> taskContext : batchExecutionContext.taskContexts()) {
                FailedShardUpdateTask task = taskContext.getTask();
                FailedShardEntry failedShardEntry = task.entry();
                IndexMetadata indexMetadata = initialState.metadata().index(failedShardEntry.getShardId().getIndex());
                if (indexMetadata == null) {
                    logger.debug("{} ignoring shard failed task [{}] (unknown index {})", failedShardEntry.getShardId(), failedShardEntry, failedShardEntry.getShardId().getIndex());
                    taskContext.success(task::onSuccess);
                    continue;
                }
                // The primary term is only checked when the entry carries a positive one.
                if (failedShardEntry.primaryTerm > 0L) {
                    long currentPrimaryTerm = indexMetadata.primaryTerm(failedShardEntry.getShardId().id());
                    if (currentPrimaryTerm != failedShardEntry.primaryTerm) {
                        assert currentPrimaryTerm > failedShardEntry.primaryTerm : "received a primary term with a higher term than in the current cluster state (received [" + failedShardEntry.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
                        logger.debug("{} failing shard failed task [{}] (primary term {} does not match current term {})", failedShardEntry.getShardId(), failedShardEntry, failedShardEntry.primaryTerm, currentPrimaryTerm);
                        taskContext.onFailure(new NoLongerPrimaryShardException(failedShardEntry.getShardId(), "primary term [" + failedShardEntry.primaryTerm + "] did not match current primary term [" + currentPrimaryTerm + "]"));
                        continue;
                    }
                }
                ShardRouting matched = initialState.getRoutingTable().getByAllocationId(failedShardEntry.getShardId(), failedShardEntry.getAllocationId());
                if (matched == null) {
                    Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(failedShardEntry.getShardId().id());
                    if (failedShardEntry.primaryTerm > 0L && inSyncAllocationIds.contains(failedShardEntry.getAllocationId())) {
                        logger.debug("{} marking shard {} as stale (shard failed task: [{}])", failedShardEntry.getShardId(), failedShardEntry.getAllocationId(), failedShardEntry);
                        tasksToBeApplied.add(taskContext);
                        staleShardsToBeApplied.add(new StaleShard(failedShardEntry.getShardId(), failedShardEntry.getAllocationId()));
                        continue;
                    }
                    logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", failedShardEntry.getShardId(), failedShardEntry);
                    taskContext.success(task::onSuccess);
                    continue;
                }
                logger.debug("{} failing shard {} (shard failed task: [{}])", failedShardEntry.getShardId(), matched, task);
                tasksToBeApplied.add(taskContext);
                failedShardsToBeApplied.add(new FailedShard(matched, failedShardEntry.message, failedShardEntry.failure, failedShardEntry.markAsStale));
            }
            assert tasksToBeApplied.size() == failedShardsToBeApplied.size() + staleShardsToBeApplied.size();
            ClusterState maybeUpdatedState = initialState;
            // The changes are applied inside the context returned by dropHeadersContext(),
            // which is released when the block exits.
            try (Releasable ignored = batchExecutionContext.dropHeadersContext()) {
                maybeUpdatedState = this.applyFailedShards(initialState, failedShardsToBeApplied, staleShardsToBeApplied);
                for (ClusterStateTaskExecutor.TaskContext<FailedShardUpdateTask> taskContext : tasksToBeApplied) {
                    FailedShardUpdateTask task = taskContext.getTask();
                    taskContext.success(task::onSuccess);
                }
            } catch (Exception e) {
                logger.warn(() -> Strings.format("failed to apply failed shards %s", failedShardsToBeApplied), e);
                for (ClusterStateTaskExecutor.TaskContext<FailedShardUpdateTask> taskContext : tasksToBeApplied) {
                    taskContext.onFailure(e);
                }
            }
            return maybeUpdatedState;
        }

        /**
         * Delegates to {@link AllocationService#applyFailedShards}; package-private and non-final,
         * so it can be overridden.
         */
        ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> failedShards, List<StaleShard> staleShards) {
            return this.allocationService.applyFailedShards(currentState, failedShards, staleShards);
        }

        /**
         * Requests a {@code NORMAL}-priority reroute when the published state still has unassigned
         * shards; the outcome of the reroute is only logged.
         */
        @Override
        public void clusterStatePublished(ClusterState newClusterState) {
            int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size();
            if (numberOfUnassignedShards > 0) {
                String reason = org.elasticsearch.common.Strings.format("[%d] unassigned shards after failing shards", numberOfUnassignedShards);
                logger.trace("{}, scheduling a reroute", reason);
                this.rerouteService.reroute(reason, Priority.NORMAL, ActionListener.wrap(r -> logger.trace("{}, reroute completed", reason), e -> logger.debug(() -> Strings.format("%s, reroute failed", reason), e)));
            }
        }
    }

    /**
     * Transport request describing a failed shard copy.
     * <p>
     * {@link #equals} and {@link #hashCode} consider only shard id, allocation id, primary term and
     * {@code markAsStale}; {@code message} and {@code failure} are ignored, so entries that differ
     * only in those two fields compare equal.
     */
    public static class FailedShardEntry extends TransportRequest {
        final ShardId shardId;
        final String allocationId;
        final long primaryTerm;
        final String message;
        @Nullable
        final Exception failure;
        final boolean markAsStale;

        /**
         * Reads an entry from the stream, in the field order written by {@link #writeTo}.
         */
        FailedShardEntry(StreamInput in) throws IOException {
            super(in);
            this.shardId = new ShardId(in);
            this.allocationId = in.readString();
            this.primaryTerm = in.readVLong();
            this.message = in.readString();
            this.failure = in.readException();
            this.markAsStale = in.readBoolean();
        }

        public FailedShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, @Nullable Exception failure, boolean markAsStale) {
            this.shardId = shardId;
            this.allocationId = allocationId;
            this.primaryTerm = primaryTerm;
            this.message = message;
            this.failure = failure;
            this.markAsStale = markAsStale;
        }

        public ShardId getShardId() {
            return this.shardId;
        }

        public String getAllocationId() {
            return this.allocationId;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.shardId.writeTo(out);
            out.writeString(this.allocationId);
            out.writeVLong(this.primaryTerm);
            out.writeString(this.message);
            out.writeException(this.failure);
            out.writeBoolean(this.markAsStale);
        }

        @Override
        public String toString() {
            return org.elasticsearch.common.Strings.format("FailedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s], markAsStale [%b], failure [%s]}", this.shardId, this.allocationId, this.primaryTerm, this.message, this.markAsStale, this.failure != null ? ExceptionsHelper.stackTrace(this.failure) : null);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FailedShardEntry that = (FailedShardEntry) o;
            return Objects.equals(this.shardId, that.shardId) && Objects.equals(this.allocationId, that.allocationId) && this.primaryTerm == that.primaryTerm && this.markAsStale == that.markAsStale;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.shardId, this.allocationId, this.primaryTerm, this.markAsStale);
        }
    }

    /**
     * Transport request describing a started shard copy, including its timestamp and
     * event.ingested ranges.
     * <p>
     * The event.ingested range is only put on the wire when the stream's transport version is on or
     * after {@code EVENT_INGESTED_RANGE_IN_CLUSTER_STATE}; when reading from an older stream it is
     * set to {@link ShardLongFieldRange#UNKNOWN}.
     */
    public static class StartedShardEntry extends TransportRequest {
        final ShardId shardId;
        final String allocationId;
        final long primaryTerm;
        final String message;
        final ShardLongFieldRange timestampRange;
        final ShardLongFieldRange eventIngestedRange;

        /**
         * Reads an entry from the stream, in the field order written by {@link #writeTo}.
         */
        StartedShardEntry(StreamInput in) throws IOException {
            super(in);
            this.shardId = new ShardId(in);
            this.allocationId = in.readString();
            this.primaryTerm = in.readVLong();
            this.message = in.readString();
            this.timestampRange = ShardLongFieldRange.readFrom(in);
            this.eventIngestedRange = in.getTransportVersion().onOrAfter(TransportVersions.EVENT_INGESTED_RANGE_IN_CLUSTER_STATE) ? ShardLongFieldRange.readFrom(in) : ShardLongFieldRange.UNKNOWN;
        }

        public StartedShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, ShardLongFieldRange timestampRange, ShardLongFieldRange eventIngestedRange) {
            this.shardId = shardId;
            this.allocationId = allocationId;
            this.primaryTerm = primaryTerm;
            this.message = message;
            this.timestampRange = timestampRange;
            this.eventIngestedRange = eventIngestedRange;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.shardId.writeTo(out);
            out.writeString(this.allocationId);
            out.writeVLong(this.primaryTerm);
            out.writeString(this.message);
            this.timestampRange.writeTo(out);
            if (out.getTransportVersion().onOrAfter(TransportVersions.EVENT_INGESTED_RANGE_IN_CLUSTER_STATE)) {
                this.eventIngestedRange.writeTo(out);
            }
        }

        /** The two ranges are not part of the string form. */
        @Override
        public String toString() {
            return org.elasticsearch.common.Strings.format("StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}", this.shardId, this.allocationId, this.primaryTerm, this.message);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            StartedShardEntry that = (StartedShardEntry) o;
            return this.primaryTerm == that.primaryTerm && this.shardId.equals(that.shardId) && this.allocationId.equals(that.allocationId) && this.message.equals(that.message) && this.timestampRange.equals(that.timestampRange) && this.eventIngestedRange.equals(that.eventIngestedRange);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.shardId, this.allocationId, this.primaryTerm, this.message, this.timestampRange, this.eventIngestedRange);
        }
    }

    /**
     * {@link ElasticsearchException} subtype that records the shard it relates to.
     * NOTE(review): judging by the name, this signals that the sender is no longer the primary
     * for the shard - confirm against the throwing code.
     */
    public static final class NoLongerPrimaryShardException extends ElasticsearchException {
        /**
         * Creates the exception with the given message and attaches the shard id to it.
         *
         * @param shardId the shard the exception refers to
         * @param msg     the exception message, passed without format arguments
         */
        public NoLongerPrimaryShardException(ShardId shardId, String msg) {
            super(msg);
            setShard(shardId);
        }

        /**
         * Deserializes the exception from the given stream by delegating to the superclass.
         *
         * @param in the stream to read from
         * @throws IOException if reading from the stream fails
         */
        public NoLongerPrimaryShardException(StreamInput in) throws IOException {
            super(in);
        }
    }

    public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<Void> listener) implements ClusterStateTaskListener
    {
        public StartedShardEntry getEntry() {
            return this.entry;
        }

        @Override
        public void onFailure(Exception e) {
            if (e instanceof NotMasterException) {
                logger.debug(() -> Strings.format((String)"%s no longer master while starting shard [%s]", (Object[])new Object[]{this.entry.shardId, this.entry}));
            } else if (e instanceof FailedToCommitClusterStateException) {
                logger.debug(() -> Strings.format((String)"%s unexpected failure while starting shard [%s]", (Object[])new Object[]{this.entry.shardId, this.entry}), (Throwable)e);
            } else {
                logger.error(() -> Strings.format((String)"%s unexpected failure while starting shard [%s]", (Object[])new Object[]{this.entry.shardId, this.entry}), (Throwable)e);
            }
            this.listener.onFailure(e);
        }

        public void onSuccess() {
            this.listener.onResponse(null);
        }

        @Override
        public String toString() {
            return "StartedShardUpdateTask{entry=" + this.entry + ", listener=" + this.listener + "}";
        }
    }

    /**
     * Pairs two {@code IndexLongFieldRange} values under the names {@code timestampRange} and
     * {@code eventIngestedRange}.
     * NOTE(review): presumably these are the index-level ranges for the timestamp and
     * event-ingested fields as held in the cluster state - confirm against the usages.
     */
    record ClusterStateTimeRanges(IndexLongFieldRange timestampRange, IndexLongFieldRange eventIngestedRange) {
    }

    public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener<Void> listener) implements ClusterStateTaskListener
    {
        public void onSuccess() {
            this.listener.onResponse(null);
        }

        @Override
        public void onFailure(Exception e) {
            logger.log(MasterService.isPublishFailureException(e) ? Level.DEBUG : Level.ERROR, () -> Strings.format((String)"%s unexpected failure while failing shard [%s]", (Object[])new Object[]{this.entry.shardId, this.entry}), (Throwable)e);
            this.listener.onFailure(e);
        }
    }
}

