/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.watcher.execution;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.execution.CurrentExecutions;
import org.elasticsearch.watcher.execution.ExecutionState;
import org.elasticsearch.watcher.execution.QueuedWatch;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.TriggeredWatch;
import org.elasticsearch.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.watcher.execution.WatchExecutor;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchLockService;
import org.elasticsearch.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;

public class ExecutionService
extends AbstractComponent {
    // Fallback used for maxStopTimeout when the setting below is absent (30 seconds).
    private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30L, TimeUnit.SECONDS);
    // Settings key read in the constructor to populate maxStopTimeout.
    private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "watcher.stop.timeout";
    // Collaborators injected through the constructor.
    private final HistoryStore historyStore;
    private final TriggeredWatchStore triggeredWatchStore;
    private final WatchExecutor executor;
    private final WatchStore watchStore;
    private final WatchLockService watchLockService;
    private final Clock clock;
    // Throttle period handed to every execution context this service creates.
    private final TimeValue defaultThrottlePeriod;
    // Passed to CurrentExecutions.sealAndAwaitEmpty(...) when the service stops.
    private final TimeValue maxStopTimeout;
    // Registry of in-flight executions; stays null until start() creates a fresh instance.
    private volatile CurrentExecutions currentExecutions = null;
    // Lifecycle flag flipped by start() and stop() via compareAndSet.
    private final AtomicBoolean started = new AtomicBoolean(false);

    @Inject
    public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, WatchStore watchStore, WatchLockService watchLockService, Clock clock, WatcherSettingsValidation settingsValidation) {
        super(settings);
        this.historyStore = historyStore;
        this.triggeredWatchStore = triggeredWatchStore;
        this.executor = executor;
        this.watchStore = watchStore;
        this.watchLockService = watchLockService;
        this.clock = clock;
        this.defaultThrottlePeriod = settings.getAsTime("watcher.execution.default_throttle_period", TimeValue.timeValueSeconds((long)5L));
        this.maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT);
        if (this.defaultThrottlePeriod.millis() < 0L) {
            settingsValidation.addError("watcher.execution.default_throttle_period", "time value cannot be negative");
        }
    }

    /**
     * Starts the service if it is not already started.
     *
     * <p>Starts the history and triggered-watch stores, creates a fresh
     * {@link CurrentExecutions} registry, then loads the triggered watches for
     * the given cluster state and submits them for execution. If any of these
     * steps throws, the started flag is reset and the exception is rethrown.
     *
     * @param state cluster state passed to the triggered watch store when loading
     * @throws Exception whatever the stores or the submission step throw
     */
    public void start(ClusterState state) throws Exception {
        if (this.started.get()) {
            return;
        }
        assert (this.executor.queue().isEmpty()) : "queue should be empty, but contains " + this.executor.queue().size() + " elements.";
        if (!this.started.compareAndSet(false, true)) {
            // Another caller won the race to start the service.
            return;
        }
        try {
            this.logger.debug("starting execution service", new Object[0]);
            this.historyStore.start();
            this.triggeredWatchStore.start();
            this.currentExecutions = new CurrentExecutions();
            final Collection<TriggeredWatch> pending = this.triggeredWatchStore.loadTriggeredWatches(state);
            this.executeTriggeredWatches(pending);
            this.logger.debug("started execution service", new Object[0]);
        }
        catch (Exception failure) {
            // Roll the flag back so a later start() can try again.
            this.started.set(false);
            throw failure;
        }
    }

    /**
     * Delegates validation of the given cluster state to the triggered watch store.
     *
     * @param state cluster state to validate
     * @return the store's verdict
     */
    public boolean validate(ClusterState state) {
        final boolean storeVerdict = triggeredWatchStore.validate(state);
        return storeVerdict;
    }

    /**
     * Stops the service if it is currently started; otherwise does nothing.
     *
     * <p>Queued tasks are drained from the executor queue and discarded, the
     * current-executions registry is sealed and awaited with
     * {@code maxStopTimeout}, and then the triggered-watch and history stores
     * are stopped.
     */
    public void stop() {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.logger.debug("stopping execution service", new Object[0]);
        // The drained tasks are dropped on purpose; only their count is reported.
        final List<Runnable> discarded = new ArrayList<Runnable>();
        final int cancelledTaskCount = this.executor.queue().drainTo(discarded);
        this.currentExecutions.sealAndAwaitEmpty(this.maxStopTimeout);
        this.triggeredWatchStore.stop();
        this.historyStore.stop();
        this.logger.debug("cancelled [{}] queued tasks", new Object[]{cancelledTaskCount});
        this.logger.debug("stopped execution service", new Object[0]);
    }

    /**
     * @return the current value of the started flag
     */
    public boolean started() {
        final boolean running = started.get();
        return running;
    }

    /**
     * @return the default throttle period read from the settings at construction time
     */
    public TimeValue defaultThrottlePeriod() {
        return defaultThrottlePeriod;
    }

    /**
     * @return the number of tasks currently sitting in the executor queue
     */
    public long executionThreadPoolQueueSize() {
        final long queued = executor.queue().size();
        return queued;
    }

    /**
     * @return the value reported by the executor's {@code largestPoolSize()}
     */
    public long executionThreadPoolMaxSize() {
        final long largest = executor.largestPoolSize();
        return largest;
    }

    /**
     * Returns snapshots of the executions currently registered, ordered by
     * execution time.
     *
     * <p>The registry field is {@code null} until {@code start()} has created
     * it, so a call made before the first start returns an empty list instead
     * of failing with a {@link NullPointerException}.
     *
     * @return a new, mutable list of snapshots sorted by {@code executionTime()}
     */
    public List<WatchExecutionSnapshot> currentExecutions() {
        final ArrayList<WatchExecutionSnapshot> snapshots = new ArrayList<WatchExecutionSnapshot>();
        // Read the volatile field once so the null check and the iteration see the same instance.
        final CurrentExecutions registry = this.currentExecutions;
        if (registry == null) {
            return snapshots;
        }
        for (WatchExecution watchExecution : registry) {
            snapshots.add(watchExecution.createSnapshot());
        }
        Collections.sort(snapshots, new Comparator<WatchExecutionSnapshot>(){

            @Override
            public int compare(WatchExecutionSnapshot e1, WatchExecutionSnapshot e2) {
                return e1.executionTime().compareTo((ReadableInstant)e2.executionTime());
            }
        });
        return snapshots;
    }

    /**
     * Returns a view of the watches waiting in the executor queue, ordered by
     * execution time.
     *
     * <p>The queue is copied first and the copy is inspected, so the result
     * reflects the queue contents at the moment of the copy. Every queued
     * task is cast to {@code WatchExecutionTask}.
     *
     * @return an empty list when nothing is queued, otherwise a new sorted list
     */
    public List<QueuedWatch> queuedWatches() {
        final List<Runnable> pending = new ArrayList<Runnable>(this.executor.queue());
        if (pending.isEmpty()) {
            return Collections.emptyList();
        }
        final List<QueuedWatch> result = new ArrayList<QueuedWatch>(pending.size());
        for (Runnable runnable : pending) {
            result.add(new QueuedWatch(((WatchExecutionTask)runnable).ctx));
        }
        final Comparator<QueuedWatch> byExecutionTime = new Comparator<QueuedWatch>(){

            @Override
            public int compare(QueuedWatch left, QueuedWatch right) {
                return left.executionTime().compareTo((ReadableInstant)right.executionTime());
            }
        };
        Collections.sort(result, byExecutionTime);
        return result;
    }

    /**
     * Turns trigger events into triggered watches, stores them asynchronously
     * and submits each successfully stored one for execution.
     *
     * <p>Events whose watch cannot be found in the watch store are logged and
     * skipped. When no event maps to a watch the method returns without
     * touching the triggered watch store, as {@code processEventsSync} does.
     *
     * @param events trigger events to process
     * @throws IllegalStateException if the service is not started
     * @throws Exception whatever the triggered watch store throws when asked to store
     */
    void processEventsAsync(Iterable<TriggerEvent> events) throws Exception {
        if (!this.started.get()) {
            throw new IllegalStateException("not started");
        }
        // Array-backed lists: the listener below looks entries up by slot index,
        // which is a linear walk per lookup on a linked list.
        final List<TriggeredWatch> triggeredWatches = new ArrayList<TriggeredWatch>();
        final List<TriggeredExecutionContext> contexts = new ArrayList<TriggeredExecutionContext>();
        DateTime now = this.clock.now(DateTimeZone.UTC);
        for (TriggerEvent event : events) {
            Watch watch = this.watchStore.get(event.jobName());
            if (watch == null) {
                this.logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", new Object[]{event.jobName()});
                continue;
            }
            TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event, this.defaultThrottlePeriod);
            contexts.add(ctx);
            triggeredWatches.add(new TriggeredWatch(ctx.id(), event));
        }
        this.logger.debug("saving watch records [{}]", new Object[]{triggeredWatches.size()});
        if (triggeredWatches.isEmpty()) {
            // Nothing to store or execute.
            return;
        }
        this.triggeredWatchStore.putAll(triggeredWatches, new ActionListener<List<Integer>>(){

            @Override
            public void onResponse(List<Integer> successFullSlots) {
                // Each slot is an index into the two parallel lists built above.
                for (Integer slot : successFullSlots) {
                    TriggeredWatch triggeredWatch = triggeredWatches.get(slot);
                    try {
                        ExecutionService.this.executeAsync(contexts.get(slot), triggeredWatch);
                    }
                    catch (Exception e) {
                        ExecutionService.this.logger.error("failed to execute watch [{}]", (Throwable)e, new Object[]{triggeredWatch.id()});
                    }
                }
            }

            @Override
            public void onFailure(Throwable e) {
                Throwable cause = ExceptionsHelper.unwrapCause((Throwable)e);
                if (cause instanceof EsRejectedExecutionException) {
                    // Rejections are logged at debug level only.
                    ExecutionService.this.logger.debug("failed to store watch records due to overloaded threadpool [{}]", new Object[]{ExceptionsHelper.detailedMessage((Throwable)e)});
                } else {
                    ExecutionService.this.logger.warn("failed to store watch records", e, new Object[0]);
                }
            }
        });
    }

    /**
     * Turns trigger events into triggered watches, stores them synchronously
     * and submits each successfully stored one for execution.
     *
     * <p>Events whose watch cannot be found in the watch store are logged and
     * skipped. When no event maps to a watch the method returns without
     * touching the triggered watch store.
     *
     * @param events trigger events to process
     * @throws IllegalStateException if the service is not started
     * @throws Exception whatever storing or submitting the watches throws
     */
    void processEventsSync(Iterable<TriggerEvent> events) throws Exception {
        if (!this.started.get()) {
            throw new IllegalStateException("not started");
        }
        // Array-backed lists: entries are looked up by slot index below,
        // which is a linear walk per lookup on a linked list.
        final List<TriggeredWatch> triggeredWatches = new ArrayList<TriggeredWatch>();
        final List<TriggeredExecutionContext> contexts = new ArrayList<TriggeredExecutionContext>();
        DateTime now = this.clock.now(DateTimeZone.UTC);
        for (TriggerEvent event : events) {
            Watch watch = this.watchStore.get(event.jobName());
            if (watch == null) {
                this.logger.warn("unable to find watch [{}] in the watch store, perhaps it has been deleted", new Object[]{event.jobName()});
                continue;
            }
            TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event, this.defaultThrottlePeriod);
            contexts.add(ctx);
            triggeredWatches.add(new TriggeredWatch(ctx.id(), event));
        }
        this.logger.debug("saving watch records [{}]", new Object[]{triggeredWatches.size()});
        if (triggeredWatches.isEmpty()) {
            return;
        }
        // Each returned slot is an index into the two parallel lists built above.
        List<Integer> slots = this.triggeredWatchStore.putAll(triggeredWatches);
        for (Integer slot : slots) {
            this.executeAsync(contexts.get(slot), triggeredWatches.get(slot));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    public WatchRecord execute(WatchExecutionContext ctx) {
        WatchRecord record;
        block36: {
            WatchLockService.Lock lock;
            block34: {
                record = null;
                lock = this.watchLockService.acquire(ctx.watch().id());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("acquired lock for [{}] -- [{}]", new Object[]{ctx.id(), System.identityHashCode(lock)});
                }
                this.currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
                if (ctx.knownWatch() && this.watchStore.get(ctx.watch().id()) == null) {
                    String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
                    this.logger.warn(message, new Object[0]);
                    record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);
                } else {
                    this.logger.debug("executing watch [{}]", new Object[]{ctx.id().watchId()});
                    try {
                        record = this.executeInner(ctx);
                    }
                    catch (Exception e) {
                        this.logger.warn("failed to execute watch [{}]", (Throwable)e, new Object[]{ctx.id()});
                        record = ctx.abortFailedExecution(ExceptionsHelper.detailedMessage((Throwable)e));
                    }
                    if (record != null && ctx.recordExecution()) {
                        try {
                            this.watchStore.updateStatus(ctx.watch());
                        }
                        catch (Exception e) {
                            this.logger.warn("failed to update watch status [{}]", (Throwable)e, new Object[]{ctx.id()});
                            record = new WatchRecord(record, ExecutionState.FAILED, LoggerMessageFormat.format((String)"failed to update watch status [{}]...{}", (Object[])new Object[]{ctx.id(), ExceptionsHelper.detailedMessage((Throwable)e)}));
                        }
                    }
                }
                if (!ctx.knownWatch() || record == null || !ctx.recordExecution()) break block34;
                try {
                    if (ctx.overrideRecordOnConflict()) {
                        this.historyStore.forcePut(record);
                    } else {
                        this.historyStore.put(record);
                    }
                }
                catch (Exception e) {
                    this.logger.error("failed to update watch record [{}]", (Throwable)e, new Object[]{ctx.id()});
                }
            }
            try {
                this.triggeredWatchStore.delete(ctx.id());
            }
            catch (Exception e) {
                this.logger.error("failed to delete triggered watch [{}]", (Throwable)e, new Object[]{ctx.id()});
            }
            this.currentExecutions.remove(ctx.watch().id());
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("releasing lock for [{}] -- [{}]", new Object[]{ctx.id(), System.identityHashCode(lock)});
            }
            lock.release();
            this.logger.trace("finished [{}]/[{}]", new Object[]{ctx.watch().id(), ctx.id()});
            break block36;
            catch (Exception e) {
                block35: {
                    try {
                        this.logger.warn("failed to execute watch [{}]", (Throwable)e, new Object[]{ctx.id()});
                        record = record != null ? new WatchRecord(record, ExecutionState.FAILED, LoggerMessageFormat.format((String)"failed to execute watch. {}", (String)ExceptionsHelper.detailedMessage((Throwable)e), (Object[])new Object[0])) : ctx.abortFailedExecution(ExceptionsHelper.detailedMessage((Throwable)e));
                        if (!ctx.knownWatch() || record == null || !ctx.recordExecution()) break block35;
                    }
                    catch (Throwable throwable) {
                        if (ctx.knownWatch() && record != null && ctx.recordExecution()) {
                            try {
                                if (ctx.overrideRecordOnConflict()) {
                                    this.historyStore.forcePut(record);
                                } else {
                                    this.historyStore.put(record);
                                }
                            }
                            catch (Exception e2) {
                                this.logger.error("failed to update watch record [{}]", (Throwable)e2, new Object[]{ctx.id()});
                            }
                        }
                        try {
                            this.triggeredWatchStore.delete(ctx.id());
                        }
                        catch (Exception e3) {
                            this.logger.error("failed to delete triggered watch [{}]", (Throwable)e3, new Object[]{ctx.id()});
                        }
                        this.currentExecutions.remove(ctx.watch().id());
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace("releasing lock for [{}] -- [{}]", new Object[]{ctx.id(), System.identityHashCode(lock)});
                        }
                        lock.release();
                        this.logger.trace("finished [{}]/[{}]", new Object[]{ctx.watch().id(), ctx.id()});
                        throw throwable;
                    }
                    try {
                        if (ctx.overrideRecordOnConflict()) {
                            this.historyStore.forcePut(record);
                        } else {
                            this.historyStore.put(record);
                        }
                    }
                    catch (Exception e4) {
                        this.logger.error("failed to update watch record [{}]", (Throwable)e4, new Object[]{ctx.id()});
                    }
                }
                try {
                    this.triggeredWatchStore.delete(ctx.id());
                }
                catch (Exception e5) {
                    this.logger.error("failed to delete triggered watch [{}]", (Throwable)e5, new Object[]{ctx.id()});
                }
                this.currentExecutions.remove(ctx.watch().id());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("releasing lock for [{}] -- [{}]", new Object[]{ctx.id(), System.identityHashCode(lock)});
                }
                lock.release();
                this.logger.trace("finished [{}]/[{}]", new Object[]{ctx.watch().id(), ctx.id()});
            }
        }
        return record;
    }

    /**
     * Submits the context to the executor as a {@code WatchExecutionTask}.
     *
     * <p>If the executor rejects the task, the execution is aborted with a
     * {@code FAILED} record that is written to the history store, and the
     * triggered watch is deleted from its store.
     *
     * @param ctx            context to execute
     * @param triggeredWatch the triggered watch the context was created for
     * @throws Exception whatever the stores throw while handling a rejection
     */
    private void executeAsync(WatchExecutionContext ctx, TriggeredWatch triggeredWatch) throws Exception {
        final WatchExecutionTask task = new WatchExecutionTask(ctx);
        try {
            this.executor.execute(task);
        }
        catch (EsRejectedExecutionException rejected) {
            final String reason = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
            this.logger.debug(reason, new Object[0]);
            final WatchRecord aborted = ctx.abortBeforeExecution(ExecutionState.FAILED, reason);
            final boolean overwrite = ctx.overrideRecordOnConflict();
            if (!overwrite) {
                this.historyStore.put(aborted);
            } else {
                this.historyStore.forcePut(aborted);
            }
            this.triggeredWatchStore.delete(triggeredWatch.id());
        }
    }

    /**
     * Runs the watch pipeline for the given context: input, condition,
     * optional watch-level transform, then actions.
     *
     * <p>Input and condition results already present on the context are
     * reused instead of being recomputed. A {@code FAILURE} status from the
     * input, condition or transform aborts the execution. The transform and
     * the actions only run when the condition is met, and the transform only
     * when the watch has at least one action and a transform.
     *
     * @param ctx the execution context
     * @return the record produced by aborting or finishing the context
     * @throws IOException declared by the pipeline steps
     */
    WatchRecord executeInner(WatchExecutionContext ctx) throws IOException {
        ctx.start();
        final Watch watch = ctx.watch();

        // Input phase.
        ctx.beforeInput();
        Input.Result input = ctx.inputResult();
        if (input == null) {
            input = watch.input().execute(ctx, ctx.payload());
            ctx.onInputResult(input);
        }
        if (input.status() == Input.Result.Status.FAILURE) {
            return ctx.abortFailedExecution("failed to execute watch input");
        }

        // Condition phase.
        ctx.beforeCondition();
        Condition.Result condition = ctx.conditionResult();
        if (condition == null) {
            condition = watch.condition().execute(ctx);
            ctx.onConditionResult(condition);
        }
        if (condition.status() == Condition.Result.Status.FAILURE) {
            return ctx.abortFailedExecution("failed to execute watch condition");
        }
        if (!condition.met()) {
            return ctx.finish();
        }

        // Watch-level transform, only when there is something to feed it to.
        final boolean hasActions = watch.actions().count() > 0;
        if (hasActions && watch.transform() != null) {
            ctx.beforeWatchTransform();
            final Transform.Result transformed = (Transform.Result)watch.transform().execute(ctx, ctx.payload());
            ctx.onWatchTransformResult(transformed);
            if (transformed.status() == Transform.Result.Status.FAILURE) {
                return ctx.abortFailedExecution("failed to execute watch transform");
            }
        }

        // Action phase.
        ctx.beforeActions();
        for (ActionWrapper wrapper : watch.actions()) {
            final ActionWrapper.Result outcome = wrapper.execute(ctx);
            ctx.onActionResult(outcome);
        }
        return ctx.finish();
    }

    /**
     * Submits previously stored triggered watches for execution.
     *
     * <p>For a triggered watch whose watch is no longer in the watch store, a
     * {@code NOT_EXECUTED_WATCH_MISSING} record is force-written to the
     * history store and the triggered watch is deleted. All others are
     * submitted with a {@code StartupExecutionContext}.
     *
     * @param triggeredWatches the triggered watches to process; must not be {@code null}
     * @throws Exception whatever the stores or the submission throw
     */
    void executeTriggeredWatches(Collection<TriggeredWatch> triggeredWatches) throws Exception {
        assert (triggeredWatches != null);
        int submitted = 0;
        for (TriggeredWatch triggered : triggeredWatches) {
            final Watch watch = this.watchStore.get(triggered.id().watchId());
            if (watch != null) {
                final DateTime executionTime = this.clock.now(DateTimeZone.UTC);
                this.executeAsync(new StartupExecutionContext(watch, executionTime, triggered.triggerEvent(), this.defaultThrottlePeriod), triggered);
                submitted++;
            } else {
                final String message = "unable to find watch for record [" + triggered.id().watchId() + "]/[" + triggered.id() + "], perhaps it has been deleted, ignoring...";
                final WatchRecord missing = new WatchRecord(triggered.id(), triggered.triggerEvent(), ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);
                this.historyStore.forcePut(missing);
                this.triggeredWatchStore.delete(triggered.id());
            }
        }
        this.logger.debug("executed [{}] watches from the watch history", new Object[]{submitted});
    }

    /**
     * Pairs an execution context with the thread that is running it.
     */
    public static class WatchExecution {
        private final Thread executionThread;
        private final WatchExecutionContext context;

        /**
         * @param context         the context being executed
         * @param executionThread the thread executing it
         */
        public WatchExecution(WatchExecutionContext context, Thread executionThread) {
            this.executionThread = executionThread;
            this.context = context;
        }

        /**
         * @return the snapshot the context creates for the stored execution thread
         */
        public WatchExecutionSnapshot createSnapshot() {
            final WatchExecutionSnapshot snapshot = context.createSnapshot(executionThread);
            return snapshot;
        }
    }

    private final class WatchExecutionTask
    implements Runnable {
        private final WatchExecutionContext ctx;

        private WatchExecutionTask(WatchExecutionContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            try {
                ExecutionService.this.execute(this.ctx);
            }
            catch (Exception e) {
                ExecutionService.this.logger.error("could not execute watch [{}]/[{}]", (Throwable)e, new Object[]{this.ctx.watch().id(), this.ctx.id()});
            }
        }
    }

    /**
     * Triggered execution context used for watches submitted by
     * {@code executeTriggeredWatches}; it always asks for the record to be
     * overridden on conflict.
     */
    private static final class StartupExecutionContext extends TriggeredExecutionContext {

        public StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
            super(watch, executionTime, triggerEvent, defaultThrottlePeriod);
        }

        /**
         * @return always {@code true}
         */
        @Override
        public boolean overrideRecordOnConflict() {
            return true;
        }
    }
}

