Package org.elasticsearch.indices.recovery


This package contains the logic for the recovery functionality.

Preliminaries

Recoveries are started on data nodes as a result of data nodes discovering shard assignments to themselves in the cluster state. The master node sets up these shard allocations in the cluster state (see ShardRouting). If a data node finds shard allocations that require recovery on itself, it executes the required recoveries, starting at IndicesClusterStateService.createOrUpdateShards(org.elasticsearch.cluster.ClusterState). As the data nodes execute the steps of the recovery state machine, they report success or failure back to the master node via the transport actions in ShardStateAction. The master node then updates the shard routing in the cluster state to reflect the status of the recovered shards or to handle failures in the recovery process. Recoveries can have various kinds of sources, which are modeled via the RecoverySource that is communicated to the recovery target by ShardRouting.recoverySource() for each shard routing. These sources and their state machines are described below. The actual recovery process for all of them is started by invoking IndexShard.startRecovery(org.elasticsearch.indices.recovery.RecoveryState, org.elasticsearch.indices.recovery.PeerRecoveryTargetService, org.elasticsearch.indices.recovery.PeerRecoveryTargetService.RecoveryListener, org.elasticsearch.repositories.RepositoriesService, java.util.function.BiConsumer<java.lang.String, org.elasticsearch.cluster.metadata.MappingMetadata>, org.elasticsearch.indices.IndicesService).

Checkpoints

Aspects of the recovery logic are based on the concepts of local and global checkpoints. Each operation on a shard is tracked by a sequence number as well as the primary term during which it was applied to the index. The sequence number up to which operations have been fully processed on a shard is that shard's local checkpoint. The sequence number up to which operations on all replicas for a shard have been fully processed is referred to as the global checkpoint. Comparing the local checkpoints of shard copies makes it possible to determine which operations would have to be replayed to a shard copy to bring it in-sync with the primary shard. By retaining operations in the RecoveryState.Translog or in soft deletes, they are available for this kind of replay, which moves a shard copy from a lower local checkpoint up to a higher local checkpoint. The global checkpoint allows for determining which operations have been safely processed on all shard copies and thus don't have to be retained on the primary node for replay to replicas. The primary node tracks the global checkpoint for a shard via the ReplicationTracker. The primary term is tracked by the master node, stored in the cluster state, and incremented each time the primary node for a shard changes.
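The relationship between the two kinds of checkpoint can be illustrated with a minimal sketch. The class CheckpointSketch, its methods and the sentinel value -1 are hypothetical names chosen for illustration and are not the Elasticsearch implementation; the sketch only assumes that a local checkpoint advances over a contiguous run of processed sequence numbers and that the global checkpoint is the minimum of the tracked local checkpoints.

```java
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

/** Toy model of checkpoint bookkeeping; all names here are hypothetical. */
final class CheckpointSketch {

    /** Assumed sentinel meaning "no operation processed yet". */
    static final long NO_OPS = -1L;

    /** Sequence numbers that were processed while a lower one was still missing. */
    private final TreeSet<Long> processedAboveGap = new TreeSet<>();
    private long localCheckpoint = NO_OPS;

    /** Records one processed operation and advances the checkpoint over any closed gap. */
    void markProcessed(long seqNo) {
        if (seqNo <= localCheckpoint) {
            return; // already covered by the checkpoint
        }
        processedAboveGap.add(seqNo);
        // The checkpoint only moves while the next sequence number has been processed.
        while (processedAboveGap.remove(localCheckpoint + 1)) {
            localCheckpoint++;
        }
    }

    long localCheckpoint() {
        return localCheckpoint;
    }

    /** Everything at or below the smallest local checkpoint is processed on every tracked copy. */
    static long globalCheckpoint(Collection<Long> localCheckpoints) {
        return localCheckpoints.stream().mapToLong(Long::longValue).min().orElse(NO_OPS);
    }

    public static void main(String[] args) {
        CheckpointSketch copy = new CheckpointSketch();
        copy.markProcessed(0);
        copy.markProcessed(2); // sequence number 1 is still missing
        System.out.println("local checkpoint with gap: " + copy.localCheckpoint());
        copy.markProcessed(1); // closes the gap
        System.out.println("local checkpoint after gap closed: " + copy.localCheckpoint());
        System.out.println("global checkpoint: " + globalCheckpoint(List.of(2L, 5L, 3L)));
    }
}
```

In this model an operation processed above a gap does not move the local checkpoint until the gap is closed, which is why the local checkpoint is a safe starting point for replay.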

Retention Leases

The duration for which a shard retains individual operations for replay during recovery is governed by the RetentionLease functionality. More information about this functionality can be found in the org.elasticsearch.index.seqno package and the "History retention" section in the docs.

Recovery Types

1. Peer Recovery

Peer recovery is the process of bringing a shard copy on one node, referred to as the target node below, in-sync with the shard copy on another node, referred to as the source node below. It is always the primary node of a shard that serves as the source of the recovery. At a high level, recovery happens by a combination of comparing and subsequently synchronizing files and operations from the source to the target. Synchronizing the on-disk file structure on the target with that on the source node is referred to as file-based recovery. Synchronizing operations based on comparing checkpoints is commonly referred to as ops-based recovery. As primaries and replicas are independent Lucene indices that execute their Lucene level merges independently, the concrete on-disk file structure on a pair of primary and replica nodes for a given shard will diverge over time, even if both copies of the shard hold the exact same set of documents and operations. Peer recovery will therefore try to avoid file-based recovery where possible to reduce the amount of data that has to be transferred. Instead, it will prefer replaying just those operations missing on the target relative to the source, as this avoids copying files from source to target that could contain data that is for the most part already present on the target. Replaying operations is possible as long as the primary node retains the missing operations as soft-deletes in its Lucene index.
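The choice between replaying operations and falling back to copying files can be sketched as follows. This is a simplified illustration, not the logic in RecoverySourceHandler: the class OpsReplaySketch and its parameters are hypothetical, and it assumes the source retains a contiguous range of sequence numbers starting at minRetainedSeqNo. The real decision depends on further conditions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Toy decision between ops-based replay and a file-based fallback; names are hypothetical. */
final class OpsReplaySketch {

    /**
     * Returns the sequence numbers to replay on the target, or an empty Optional
     * when the source no longer retains everything the target is missing.
     */
    static Optional<List<Long>> opsToReplay(long minRetainedSeqNo, long sourceMaxSeqNo, long targetLocalCheckpoint) {
        // The target has fully processed everything up to its local checkpoint.
        long startingSeqNo = targetLocalCheckpoint + 1;
        if (startingSeqNo < minRetainedSeqNo) {
            // Some missing operations were already discarded on the source.
            return Optional.empty();
        }
        List<Long> missing = new ArrayList<>();
        for (long seqNo = startingSeqNo; seqNo <= sourceMaxSeqNo; seqNo++) {
            missing.add(seqNo);
        }
        return Optional.of(missing);
    }

    public static void main(String[] args) {
        // Source retains operations 5..10.
        System.out.println("target at 7: " + opsToReplay(5, 10, 7));
        System.out.println("target at 2: " + opsToReplay(5, 10, 2));
    }
}
```

An empty result in this sketch corresponds to the case where replay alone cannot bring the target in-sync, so files have to be copied first.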

State Machine

Peer recoveries are modeled via a RecoverySource.PeerRecoverySource. They start by moving the shard's state to IndexShardState.RECOVERING and then triggering the peer recovery through a call to PeerRecoveryTargetService.startRecovery(org.elasticsearch.index.shard.IndexShard, org.elasticsearch.cluster.node.DiscoveryNode, org.elasticsearch.indices.recovery.PeerRecoveryTargetService.RecoveryListener) which results in the following steps being executed.
  1. The target shard starts out with a RecoveryState at stage RecoveryState.Stage.INIT. At the start of the peer recovery process, the target node will try to recover from its local translog as far as possible, if there are any operations to recover from it. It will first move to stage RecoveryState.Stage.INDEX and then try to recover as far as possible from existing files and the existing translog. During this process, it will move to RecoveryState.Stage.VERIFY_INDEX, verifying that the files on disk are not corrupted, and then to RecoveryState.Stage.TRANSLOG during recovery from the translog. The target node then sends a StartRecoveryRequest to the primary node of the shard being recovered. This triggers PeerRecoverySourceService.recover(org.elasticsearch.indices.recovery.StartRecoveryRequest, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoveryResponse>) on the primary node that receives the request. The StartRecoveryRequest contains information about the local state of the recovery target, based on which the recovery source will determine the recovery mechanism (file-based or ops-based) to use.
  2. When determining whether to use ops-based recovery the recovery source will check the following conditions that must all be true simultaneously for ops-based recovery to be executed:
  3. In case the preconditions for ops-based recovery aren't met, file-based recovery is executed first. To trigger file-based recovery, the source node will execute phase 1 of the recovery by invoking RecoverySourceHandler.phase1(org.apache.lucene.index.IndexCommit, long, java.util.function.IntSupplier, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoverySourceHandler.SendFileResult>). Using the information about the files on the target node found in the StartRecoveryRequest, phase 1 will determine what segment files must be copied to the recovery target. The information about these files will then be sent to the recovery target via a RecoveryFilesInfoRequest. Once the recovery target has received the list of files that will be copied to it, RecoverySourceHandler.sendFiles(org.elasticsearch.index.store.Store, org.elasticsearch.index.store.StoreFileMetadata[], java.util.function.IntSupplier, org.elasticsearch.action.ActionListener<java.lang.Void>) is invoked which will send the segment files over to the recovery target via a series of RecoveryFileChunkRequest. Receiving a RecoveryFilesInfoRequest on the target indicates to it that the recovery will be file-based so it will invoke IndexShard.resetRecoveryStage() to reset the recovery back to INIT stage and then prepare for receiving files and move to stage INDEX again.
  4. Once all the file chunks have been received by the recovery target, a retention lease for the latest global checkpoint is created by the source node to ensure all remaining operations from the latest global checkpoint are retained for replay in the next step of the recovery. Also, after creating the retention lease and before moving on to the next step of the peer recovery process, a RecoveryCleanFilesRequest is sent from the source to the target. The target will handle this request by doing the following:
    • The file chunks from the previous step were saved to temporary file names. They are now renamed to their original names.
    • Clean up all files in the shard directory that are not part of the recovering shard copy.
    • Trigger creation of a new translog on the target. This moves the recovery stage on the target to RecoveryState.Stage.TRANSLOG.
  5. After the segment files synchronization from source to the target has finished or was skipped, the translog based recovery step is executed by invoking RecoverySourceHandler.prepareTargetForTranslog(int, org.elasticsearch.action.ActionListener<org.elasticsearch.core.TimeValue>) on the recovery source. This sends a RecoveryPrepareForTranslogOperationsRequest to the recovery target which contains the estimated number of translog operations that have to be copied to the target. On the target, this request is handled and triggers a call to IndexShard.openEngineAndSkipTranslogRecovery() which opens a new engine and translog and then responds back to the recovery source. Once the recovery source receives that response, it invokes RecoverySourceHandler.phase2(long, long, org.elasticsearch.index.translog.Translog.Snapshot, long, long, org.elasticsearch.index.seqno.RetentionLeases, long, org.elasticsearch.action.ActionListener<org.elasticsearch.indices.recovery.RecoverySourceHandler.SendSnapshotResult>) to replay outstanding translog operations on the target. This is done by sending a series of RecoveryTranslogOperationsRequest to the target which will respond with RecoveryTranslogOperationsResponses which contain the maximum persisted local checkpoint for the target. Tracking the maximum of the received local checkpoint values is necessary for the next step, finalizing the recovery.
  6. After replaying the translog operations on the target, the recovery is finalized by a call to RecoverySourceHandler.finalizeRecovery(long, long, org.elasticsearch.action.ActionListener<java.lang.Void>) on the source. With the knowledge that the target has received all operations up to the maximum local checkpoint tracked in the previous step, the source (which is also the primary) can now update its in-sync checkpoint state by a call to ReplicationTracker.markAllocationIdAsInSync(java.lang.String, long). Once the in-sync sequence number information has been persisted successfully, the source sends a RecoveryFinalizeRecoveryRequest to the target. This request contains the global checkpoint as well as a sequence number above which the target can trim all operations from its translog, since all operations above this number have just been replayed in the previous step and were either of the same or a newer version than those in the existing translog on the target. This step also moves the target to the recovery stage RecoveryState.Stage.FINALIZE.
  7. After the finalization step, the recovery source will send a RecoveryResponse to the target, which is implemented as a response to the initial StartRecoveryRequest that the target sent to initiate the recovery. This leads to a call to IndexShard.postRecovery(java.lang.String), which moves the recovery state to stage RecoveryState.Stage.DONE, triggers a refresh of the shard and moves the shard to state IndexShardState.POST_RECOVERY. Finally, the recovery target will send a ShardStateAction.StartedShardEntry transport message to the master to inform it about the successful start of the shard.
  8. After receiving the StartedShardEntry, the master will update the cluster state to reflect the state of the now fully recovered recovery target by executing the ShardStateAction.ShardStartedClusterStateTaskExecutor. The resulting cluster state update is then observed by IndexShard.updateShardState(org.elasticsearch.cluster.routing.ShardRouting, long, java.util.function.BiConsumer<org.elasticsearch.index.shard.IndexShard, org.elasticsearch.action.ActionListener<org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask>>, long, java.util.Set<java.lang.String>, org.elasticsearch.cluster.routing.IndexShardRoutingTable), which updates the shard state on the target node to IndexShardState.STARTED, thus completing the peer recovery.
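
The stage transitions named in the steps above can be summarized in a small sketch. The class RecoveryStageSketch is hypothetical and does not reproduce RecoveryState; it assumes that each stage may only be entered from the stage directly before it in the order INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE, and that a reset back to INIT is allowed at any point, as happens when a file-based recovery begins.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the target's recovery stages; names and rules are assumptions. */
final class RecoveryStageSketch {

    enum Stage { INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE }

    private Stage stage = Stage.INIT;
    private final List<Stage> history = new ArrayList<>(List.of(Stage.INIT));

    /** Moves forward by exactly one stage; any other jump is rejected. */
    void moveTo(Stage next) {
        if (next.ordinal() != stage.ordinal() + 1) {
            throw new IllegalStateException("cannot move from " + stage + " to " + next);
        }
        stage = next;
        history.add(next);
    }

    /** Models the reset that precedes receiving files in a file-based recovery. */
    void resetToInit() {
        stage = Stage.INIT;
        history.add(Stage.INIT);
    }

    Stage stage() {
        return stage;
    }

    List<Stage> history() {
        return List.copyOf(history);
    }

    public static void main(String[] args) {
        RecoveryStageSketch target = new RecoveryStageSketch();
        // Local recovery attempt before contacting the source.
        target.moveTo(Stage.INDEX);
        target.moveTo(Stage.VERIFY_INDEX);
        target.moveTo(Stage.TRANSLOG);
        // The source chose file-based recovery, so the target starts over.
        target.resetToInit();
        target.moveTo(Stage.INDEX);
        target.moveTo(Stage.VERIFY_INDEX);
        target.moveTo(Stage.TRANSLOG);
        target.moveTo(Stage.FINALIZE);
        target.moveTo(Stage.DONE);
        System.out.println(target.history());
    }
}
```

Rejecting out-of-order transitions in this way makes a skipped step fail loudly instead of leaving the target in an ambiguous stage.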
TODO: document other recovery types