This package contains the logic for the recovery functionality.
Preliminaries
Recoveries are started on data nodes when a data node discovers shard assignments to itself in the cluster state. The master node sets up these shard allocations in the cluster state (see ShardRouting). If a data node finds shard allocations that require recovery on itself, it executes the required recoveries, starting with the logic in IndicesClusterStateService.createOrUpdateShards(org.elasticsearch.cluster.ClusterState). As the data nodes execute the steps of the recovery state machine, they report success or failure back to the master node via the transport actions in ShardStateAction, which then update the shard routing in the cluster state to reflect the status of the recovered shards or to handle failures in the recovery process. Recoveries can have various kinds of sources, which are modeled via the RecoverySource that ShardRouting.recoverySource() communicates to the recovery target for each shard routing. These sources and their state machines are described below. The actual recovery process for all of them is started by invoking IndexShard.startRecovery(org.elasticsearch.indices.recovery.RecoveryState, org.elasticsearch.indices.recovery.PeerRecoveryTargetService, org.elasticsearch.indices.recovery.PeerRecoveryTargetService.RecoveryListener, org.elasticsearch.repositories.RepositoriesService, java.util.function.BiConsumer<java.lang.String, org.elasticsearch.cluster.metadata.MappingMetadata>, org.elasticsearch.indices.IndicesService).
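The discovery step can be pictured as a filter over the routing table. The sketch below is a simplified, hypothetical model: `Assignment`, `RecoverySourceType`, and `LocalRecoveryPlanner` are illustrative stand-ins, not the real `ShardRouting` or `RecoverySource` API (the enum constants follow the kinds of recovery sources Elasticsearch uses, but treat them as an assumption). It selects the shard copies assigned to the local node that are still initializing and groups them by the kind of recovery source they need.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the kinds of recovery source a shard routing may carry.
enum RecoverySourceType { EMPTY_STORE, EXISTING_STORE, PEER, SNAPSHOT, LOCAL_SHARDS }

// Hypothetical, simplified view of one shard routing entry in the cluster state.
record Assignment(String shardId, String nodeId, boolean initializing, RecoverySourceType source) {}

final class LocalRecoveryPlanner {
    private LocalRecoveryPlanner() {}

    /** Groups the initializing shard copies assigned to {@code localNodeId} by recovery source type. */
    static Map<RecoverySourceType, List<String>> recoveriesToStart(List<Assignment> routing, String localNodeId) {
        Map<RecoverySourceType, List<String>> plan = new EnumMap<>(RecoverySourceType.class);
        for (Assignment a : routing) {
            // Only copies assigned to this node that have not started yet require a recovery here.
            if (a.nodeId().equals(localNodeId) && a.initializing()) {
                plan.computeIfAbsent(a.source(), k -> new ArrayList<>()).add(a.shardId());
            }
        }
        return plan;
    }
}
```

Grouping by source type reflects the text's point that each kind of source has its own state machine, even though all of them are entered through the same `IndexShard.startRecovery` call.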
Checkpoints
Aspects of the recovery logic are based on the concepts of local and global checkpoints. Each operation on a shard is tracked by a sequence number as well as by the primary term during which it was applied to the index. The sequence number up to which operations have been fully processed on a shard is that shard's local checkpoint. The sequence number up to which operations have been fully processed on all replicas of a shard is referred to as the global checkpoint. Comparing the local checkpoints of shard copies determines which operations would have to be replayed to a shard copy to bring it in sync with the primary shard. Operations retained in the RecoveryState.Translog or in soft deletes are available for this kind of replay, which moves a shard copy from a lower local checkpoint up to a higher one. The global checkpoint determines which operations have been safely processed on all shard copies and thus no longer have to be retained on the primary node for replay to replicas. The primary node tracks the global checkpoint for a shard via the ReplicationTracker. The primary term is tracked by the master node, stored in the cluster state, and incremented each time the primary node for a shard changes.
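To make the two checkpoint definitions concrete, here is a minimal sketch assuming operations are identified only by sequence numbers starting at 0. `CheckpointMath` and its methods are hypothetical and much simpler than the real `ReplicationTracker`; using -1 for "no operations processed" is likewise an assumption of this sketch. The local checkpoint is the highest sequence number with no gaps below it, the global checkpoint is the minimum local checkpoint over the tracked copies, and a lagging copy needs every operation above its own local checkpoint.

```java
import java.util.Collection;
import java.util.Set;
import java.util.stream.LongStream;

final class CheckpointMath {
    // Assumed marker for "no operation processed yet".
    static final long NO_OPS_PERFORMED = -1L;

    private CheckpointMath() {}

    /** Highest sequence number such that it and every lower one have been processed. */
    static long localCheckpoint(Set<Long> processed) {
        long checkpoint = NO_OPS_PERFORMED;
        while (processed.contains(checkpoint + 1)) {
            checkpoint++;
        }
        return checkpoint;
    }

    /** Operations up to the returned number are processed on every tracked copy. */
    static long globalCheckpoint(Collection<Long> localCheckpoints) {
        return localCheckpoints.stream().mapToLong(Long::longValue).min().orElse(NO_OPS_PERFORMED);
    }

    /** Sequence numbers a copy at {@code targetCheckpoint} must replay to reach {@code sourceCheckpoint}. */
    static long[] opsToReplay(long targetCheckpoint, long sourceCheckpoint) {
        // rangeClosed yields an empty stream when the target is already caught up.
        return LongStream.rangeClosed(targetCheckpoint + 1, sourceCheckpoint).toArray();
    }
}
```

For example, a copy that has processed operations 0, 1, 2, and 4 has local checkpoint 2: operation 4 does not count until the gap at 3 is filled.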
Retention Leases
The duration for which a shard retains individual operations for replay during recovery is governed by the RetentionLease functionality. More information about this functionality can be found in the org.elasticsearch.index.seqno package and the "History retention" section in the docs.
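As a rough illustration, assume each lease records only the lowest sequence number it wants to keep; the primary may then discard an operation only once no lease still covers it. `Lease` and `RetentionPolicy` are hypothetical types, not the `RetentionLease` API, and the real retention logic involves more inputs than leases alone.

```java
import java.util.List;

// Hypothetical lease: keeps every operation with seqNo >= retainingSeqNo.
record Lease(String id, long retainingSeqNo) {}

final class RetentionPolicy {
    private RetentionPolicy() {}

    /** Lowest sequence number any lease still needs, or Long.MAX_VALUE if there are no leases. */
    static long minRetainedSeqNo(List<Lease> leases) {
        return leases.stream().mapToLong(Lease::retainingSeqNo).min().orElse(Long.MAX_VALUE);
    }

    /** An operation may be discarded only if every lease has moved past it. */
    static boolean canDiscard(long seqNo, List<Lease> leases) {
        return seqNo < minRetainedSeqNo(leases);
    }
}
```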
1. Peer Recovery
Peer recovery is the process of bringing a shard copy on one node, referred to as the target node below, in sync with the shard copy on another node, referred to as the source node below. It is always the primary node of a shard that serves as the source of the recovery. At a high level, recovery happens by comparing and then synchronizing files and operations from the source to the target. Synchronizing the on-disk file structure on the target with that on the source node is referred to as file-based recovery. Synchronizing operations based on comparing checkpoints is commonly referred to as ops-based recovery. Because primaries and replicas are independent Lucene indices that execute their Lucene-level merges independently, the concrete on-disk file structures of a primary and a replica for a given shard will diverge over time, even if both copies hold exactly the same set of documents and operations. Peer recovery therefore tries to avoid file-based recovery where possible, to reduce the amount of data that has to be transferred. It prefers replaying just those operations that are missing on the target relative to the source, since copying files would transfer data that is, for the most part, already present on the target. Replaying operations is possible as long as the primary node retains the missing operations as soft deletes in its Lucene index.
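A minimal sketch of why retention matters, assuming the primary's retained history is available as a sorted map from sequence number to an operation description (`RetainedHistory` and `missingOperations` are hypothetical names): replaying is only possible if every sequence number above the target's local checkpoint, up to the primary's, is still retained. Any gap means the target cannot be caught up from operations alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;

final class RetainedHistory {
    private RetainedHistory() {}

    /**
     * Returns the operations the target is missing, or empty if the primary no longer retains
     * all of them, in which case files would have to be copied instead.
     */
    static Optional<List<String>> missingOperations(NavigableMap<Long, String> retained,
                                                    long targetLocalCheckpoint,
                                                    long primaryLocalCheckpoint) {
        List<String> ops = new ArrayList<>();
        for (long seqNo = targetLocalCheckpoint + 1; seqNo <= primaryLocalCheckpoint; seqNo++) {
            String op = retained.get(seqNo);
            if (op == null) {
                return Optional.empty(); // gap: this operation was already discarded
            }
            ops.add(op);
        }
        return Optional.of(ops);
    }
}
```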
State Machine
Peer recoveries are modeled via a RecoverySource.PeerRecoverySource. They start by moving the shard's state to IndexShardState.RECOVERING and then triggering the peer recovery through a call to PeerRecoveryTargetService.startRecovery(org.elasticsearch.index.shard.IndexShard, org.elasticsearch.cluster.node.DiscoveryNode, org.elasticsearch.indices.recovery.PeerRecoveryTargetService.RecoveryListener), which results in the following steps being executed.
The target shard starts out in stage RecoveryState.Stage.INIT. At the start of the peer recovery process, the target node tries to recover as far as possible from its local translog, if it contains any operations to recover. To do so, it first moves to stage RecoveryState.Stage.INDEX and then tries to recover as far as possible from existing files and the existing translog. During this process, it moves to RecoveryState.Stage.VERIFY_INDEX, verifying that the files on disk are not corrupted, and then to RecoveryState.Stage.TRANSLOG during recovery from the translog. The target node then sends a StartRecoveryRequest to the primary node of the shard being recovered. This triggers PeerRecoverySourceService.recover(org.elasticsearch.indices.recovery.StartRecoveryRequest, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoveryResponse>) on the primary node that receives the request. The StartRecoveryRequest contains information about the local state of the recovery target, based on which the recovery source determines the recovery mechanism (file-based or ops-based) to use.
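The stage progression above can be modeled, under simplifying assumptions, as an ordered enum whose transitions only move forward, plus an explicit reset back to `INIT` of the kind `IndexShard.resetRecoveryStage()` performs when file-based recovery starts. `Stage` and `StageTracker` are hypothetical stand-ins that list only the `RecoveryState.Stage` values named in this text; the real class may validate transitions differently.

```java
// Hypothetical stand-in listing only the stages named in this description, in order.
enum Stage { INIT, INDEX, VERIFY_INDEX, TRANSLOG, DONE }

final class StageTracker {
    private Stage current = Stage.INIT;

    Stage current() {
        return current;
    }

    /** Moves forward to {@code next}; moving backwards or staying put is rejected. */
    void advanceTo(Stage next) {
        if (next.ordinal() <= current.ordinal()) {
            throw new IllegalStateException("cannot move from " + current + " to " + next);
        }
        current = next;
    }

    /** Explicit reset, e.g. once the target learns the recovery will be file-based. */
    void reset() {
        current = Stage.INIT;
    }
}
```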
When determining whether to use ops-based recovery, the recovery source checks the following conditions, all of which must be true simultaneously for ops-based recovery to be executed:
- Target shard and source shard must share the same history UUID (stored under Engine.HISTORY_UUID_KEY) in their latest Lucene commit.
- The source must have retained all operations after the latest sequence number present on the target. See IndexShard.hasCompleteHistoryOperations(java.lang.String, org.elasticsearch.index.engine.Engine.HistorySource, long) for details.
- A peer recovery retention lease must exist for the target shard, and it must retain a sequence number below or equal to the starting sequence number of the recovery.
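The three preconditions amount to a single conjunction. The sketch below assumes the source has already gathered the relevant facts: `TargetState`, `SourceState`, and `useOpsBasedRecovery` are hypothetical names, and the history completeness check is reduced to a boolean instead of a call to `IndexShard.hasCompleteHistoryOperations`.

```java
import java.util.OptionalLong;

// Hypothetical summary of what the target reports about itself.
record TargetState(String historyUuid, long startingSeqNo) {}

// Hypothetical summary of the source's view of the target.
record SourceState(String historyUuid, boolean hasCompleteHistoryFromStart, OptionalLong leaseRetainingSeqNo) {}

final class RecoveryPlanner {
    private RecoveryPlanner() {}

    /** True only if all three preconditions for an ops-based recovery hold at the same time. */
    static boolean useOpsBasedRecovery(TargetState target, SourceState source) {
        boolean sameHistory = source.historyUuid().equals(target.historyUuid());
        boolean leaseCoversStart = source.leaseRetainingSeqNo().isPresent()
            && source.leaseRetainingSeqNo().getAsLong() <= target.startingSeqNo();
        return sameHistory && source.hasCompleteHistoryFromStart() && leaseCoversStart;
    }
}
```

If this returns false, the source falls back to file-based recovery.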
In case the preconditions for ops-based recovery aren't met, file-based recovery is executed first.
To trigger file-based recovery, the source node executes phase 1 of the recovery by invoking RecoverySourceHandler.phase1(org.apache.lucene.index.IndexCommit, long, java.util.function.IntSupplier, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoverySourceHandler.SendFileResult>). Using the information about the files on the target node found in the StartRecoveryRequest, phase 1 determines which segment files must be copied to the recovery target. The information about these files is then sent to the recovery target via a RecoveryFilesInfoRequest. Once the recovery target has received the list of files that will be copied to it, RecoverySourceHandler.sendFiles(org.elasticsearch.index.store.Store, org.elasticsearch.index.store.StoreFileMetadata, java.util.function.IntSupplier, org.elasticsearch.action.ActionListener<java.lang.Void>) is invoked, which sends the segment files over to the recovery target via a series of RecoveryFileChunkRequest messages. Receiving a RecoveryFilesInfoRequest indicates to the target that the recovery will be file-based, so it invokes IndexShard.resetRecoveryStage() to reset the recovery back to the INIT stage and then prepares for receiving files, advancing to the next recovery stage.
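Phase 1's decision about what to copy boils down to comparing per-file metadata. A minimal sketch, assuming each segment file is described only by name, length, and checksum (`FileMeta` and `Phase1Planner` are hypothetical; the real comparison works on `StoreFileMetadata` and is more involved): a file is copied if the target lacks it or holds a version whose length or checksum differs.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical per-file metadata for a segment file.
record FileMeta(String name, long length, String checksum) {
    boolean isSameAs(FileMeta other) {
        return other != null && length == other.length && checksum.equals(other.checksum);
    }
}

final class Phase1Planner {
    private Phase1Planner() {}

    /** Names of source files that are missing on the target or differ from the target's copy. */
    static SortedSet<String> filesToCopy(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        SortedSet<String> toCopy = new TreeSet<>();
        for (FileMeta file : source.values()) {
            // Identical files are reused on the target; everything else must be transferred.
            if (!file.isSameAs(target.get(file.name()))) {
                toCopy.add(file.name());
            }
        }
        return toCopy;
    }
}
```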
Once all the file chunks have been received by the recovery target, the source node creates a retention lease for the latest global checkpoint, ensuring that all remaining operations from the latest global checkpoint onward are retained for replay in the next step of the recovery. After creating the retention lease, and before moving on to the next step of the peer recovery process, the source also sends a RecoveryCleanFilesRequest to the target. The target handles this request as follows:
- The file chunks from the previous step were saved to temporary file names. They are now renamed to their original names.
- Clean up all files in the shard directory that are not part of the recovering shard copy.
- Trigger creation of a new translog on the target. This advances the recovery stage on the target.
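The file-related part of this handling can be pictured on an in-memory directory listing. In this sketch, `CleanFilesSketch` and the temporary names used in the example (a `recovery.abc.` prefix) are hypothetical; it renames received temporary files to their original names and then removes every file that is not part of the recovered copy. Creating the new translog is not modeled.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class CleanFilesSketch {
    private CleanFilesSketch() {}

    /**
     * @param directory      current file names in the shard directory (not modified)
     * @param tempToOriginal temporary names written during chunk transfer, mapped to their final names
     * @param expected       names of all files that belong to the recovered shard copy
     * @return the directory listing after renaming and cleanup
     */
    static Set<String> cleanFiles(Set<String> directory, Map<String, String> tempToOriginal, Set<String> expected) {
        Set<String> result = new TreeSet<>(directory);
        // Step 1: rename each received temporary file to its original name.
        for (Map.Entry<String, String> e : tempToOriginal.entrySet()) {
            if (result.remove(e.getKey())) {
                result.add(e.getValue());
            }
        }
        // Step 2: delete everything that is not part of the recovering shard copy.
        result.retainAll(expected);
        return result;
    }
}
```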
After the segment file synchronization from the source to the target has finished or was skipped, the translog-based recovery step is executed by invoking RecoverySourceHandler.prepareTargetForTranslog(int, org.elasticsearch.action.ActionListener<org.elasticsearch.core.TimeValue>) on the recovery source. This sends a RecoveryPrepareForTranslogOperationsRequest to the recovery target, containing the estimated number of translog operations that have to be copied to the target. On the target, handling this request triggers a call to IndexShard.openEngineAndSkipTranslogRecovery(), which opens a new engine and translog, after which the target responds to the recovery source. Once the recovery source receives that response, it invokes RecoverySourceHandler.phase2(long, long, org.elasticsearch.index.translog.Translog.Snapshot, long, long, org.elasticsearch.index.seqno.RetentionLeases, long, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoverySourceHandler.SendSnapshotResult>) to replay outstanding translog operations on the target. This is done by sending a series of RecoveryTranslogOperationsRequest messages to the target, which responds with RecoveryTranslogOperationsResponses containing the maximum persisted local checkpoint of the target. Tracking the maximum of the received local checkpoint values is necessary for the next step, finalizing the recovery.
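A minimal, synchronous sketch of the phase 2 loop, assuming operations are sent in fixed-size batches and that each batch is acknowledged with the target's persisted local checkpoint. `TranslogTarget`, `Phase2Sketch`, and the batching scheme are hypothetical; the real handler sends its requests asynchronously.

```java
import java.util.List;

// Hypothetical target endpoint: applies a batch and answers with its persisted local checkpoint.
interface TranslogTarget {
    long applyBatch(List<Long> seqNos);
}

final class Phase2Sketch {
    private Phase2Sketch() {}

    /** Replays {@code seqNos} in batches and returns the highest local checkpoint the target reported. */
    static long replay(List<Long> seqNos, int batchSize, TranslogTarget target) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long maxLocalCheckpoint = -1L; // assumed marker for "nothing acknowledged yet"
        for (int from = 0; from < seqNos.size(); from += batchSize) {
            List<Long> batch = seqNos.subList(from, Math.min(from + batchSize, seqNos.size()));
            // Keep the maximum of all reported checkpoints rather than relying on the last response.
            maxLocalCheckpoint = Math.max(maxLocalCheckpoint, target.applyBatch(batch));
        }
        return maxLocalCheckpoint;
    }
}
```

The returned maximum is what the finalization step relies on when the source marks the target as in sync.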
After replaying the translog operations on the target, the recovery is finalized by a call to RecoverySourceHandler.finalizeRecovery(long, long, org.elasticsearch.action.ActionListener<java.lang.Void>) on the source. Knowing that the target has received all operations up to the maximum local checkpoint tracked in the previous step, the source (which is also the primary) can now update its in-sync checkpoint state by calling ReplicationTracker.markAllocationIdAsInSync(java.lang.String, long). Once the in-sync sequence number information has been persisted successfully, the source sends a RecoveryFinalizeRecoveryRequest to the target. This request contains the global checkpoint as well as a sequence number above which the target can trim all operations from its translog: every operation above this number has just been replayed in the previous step, in the same or a newer version than the one in the existing translog on the target. This step also advances the target to its next recovery stage.
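The trimming instruction can be illustrated on a drastically simplified translog. Assuming the target's existing translog is just a list of sequence numbers (`TrimSketch` is hypothetical), every entry above the trim point is dropped, because the same or newer versions of those operations were just replayed.

```java
import java.util.List;
import java.util.stream.Collectors;

final class TrimSketch {
    private TrimSketch() {}

    /** Keeps only the existing translog operations at or below {@code trimAboveSeqNo}. */
    static List<Long> trimAbove(List<Long> existingSeqNos, long trimAboveSeqNo) {
        return existingSeqNos.stream()
            .filter(seqNo -> seqNo <= trimAboveSeqNo)
            .collect(Collectors.toList());
    }
}
```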
After the finalization step, the recovery source sends a RecoveryResponse to the target, which is implemented as a response to the initial StartRecoveryRequest that the target sent to initiate the recovery. This leads to a call to IndexShard.postRecovery(java.lang.String), which moves the recovery state to stage RecoveryState.Stage.DONE, triggers a refresh of the shard, and moves the shard to state IndexShardState.POST_RECOVERY. Finally, the recovery target sends a ShardStateAction.StartedShardEntry transport message to the master node to inform it that the shard has started successfully.
After receiving the StartedShardEntry, the master node updates the cluster state to reflect the state of the now fully recovered recovery target by executing the ShardStateAction.ShardStartedClusterStateTaskExecutor. The resulting cluster state update is then observed by IndexShard.updateShardState(org.elasticsearch.cluster.routing.ShardRouting, long, java.util.function.BiConsumer<org.elasticsearch.index.shard.IndexShard, org.elasticsearch.action.ActionListener<org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask>>, long, java.util.Set<java.lang.String>, org.elasticsearch.cluster.routing.IndexShardRoutingTable), which updates the shard state on the target node to IndexShardState.STARTED, thus completing the peer recovery.
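Seen from the target's shard state, a successful peer recovery walks through the three `IndexShardState` values named above. The transition table below is a hypothetical, deliberately partial model (`ShardState` and `ShardLifecycle` are illustrative names) that covers only those states and only the happy path; the real enum has more values and transitions.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical subset of shard states, limited to those named in this description.
enum ShardState { RECOVERING, POST_RECOVERY, STARTED }

final class ShardLifecycle {
    // Allowed successors on the happy path of a peer recovery.
    private static final Map<ShardState, Set<ShardState>> NEXT = Map.of(
        ShardState.RECOVERING, EnumSet.of(ShardState.POST_RECOVERY), // after postRecovery on the target
        ShardState.POST_RECOVERY, EnumSet.of(ShardState.STARTED),    // after the master marks the shard started
        ShardState.STARTED, EnumSet.noneOf(ShardState.class));

    private ShardState state = ShardState.RECOVERING;

    ShardState state() {
        return state;
    }

    void moveTo(ShardState next) {
        if (!NEXT.get(state).contains(next)) {
            throw new IllegalStateException("illegal transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

Modeling `STARTED` as reachable only from `POST_RECOVERY` captures the point that the target cannot declare itself started; it has to wait for the cluster state update from the master.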
Class Summary
- An exception marking that this recovery attempt should be ignored (probably because the shard has already been recovered).
- File chunks are sent/requested sequentially by at most one thread at any time.
- The source recovery accepts recovery requests from other peer shards and starts the recovery process from this source shard to the target shard.
- The recovery target handles recoveries of peer shards of the shard+node to recover to.
- This class holds a collection of all ongoing recoveries on the current node (i.e., the node is the target node of those recoveries).
- RecoverySourceHandler handles the three phases of shard recovery, which is everything relating to copying the segment files as well as sending translog operations across the wire once the segments have been copied.
- Keeps track of state related to shard recovery.
- Represents a recovery where the current node is the target node of the recovery.
- Represents a request for starting a peer recovery.