/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.core.watcher.trigger.Trigger;
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;

public class TickerScheduleTriggerEngine
extends ScheduleTriggerEngine {
    public static final Setting<TimeValue> TICKER_INTERVAL_SETTING = Setting.positiveTimeSetting((String)"xpack.watcher.trigger.schedule.ticker.tick_interval", (TimeValue)TimeValue.timeValueMillis((long)500L), (Setting.Property[])new Setting.Property[]{Setting.Property.NodeScope});
    private static final Logger logger = LogManager.getLogger(TickerScheduleTriggerEngine.class);
    private final TimeValue tickInterval;
    private final Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<String, ActiveSchedule>();
    private final Ticker ticker;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
        super(scheduleRegistry, clock);
        this.tickInterval = (TimeValue)TICKER_INTERVAL_SETTING.get(settings);
        this.ticker = new Ticker(DiscoveryNode.canContainData((Settings)settings));
    }

    @Override
    public synchronized void start(Collection<Watch> jobs) {
        long startTime = this.clock.millis();
        this.isRunning.set(true);
        logger.info("Starting watcher engine at {}", (Object)WatcherDateTimeUtils.dateTimeFormatter.formatMillis(startTime));
        Map startingSchedules = Maps.newMapWithExpectedSize((int)jobs.size());
        for (Watch job : jobs) {
            Trigger trigger = job.trigger();
            if (!(trigger instanceof ScheduleTrigger)) continue;
            ScheduleTrigger trigger2 = (ScheduleTrigger)trigger;
            startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger2.getSchedule(), startTime));
        }
        this.schedules.putAll(startingSchedules);
    }

    @Override
    public void stop() {
        logger.info("Stopping watcher engine");
        this.isRunning.set(false);
        this.schedules.clear();
        this.ticker.close();
    }

    @Override
    public void pauseExecution() {
        logger.info("Pausing watcher engine");
        this.isRunning.set(false);
        this.schedules.clear();
    }

    @Override
    public void add(Watch watch) {
        logger.trace("Adding watch [{}] to engine (engine is running: {})", (Object)watch.id(), (Object)this.isRunning.get());
        assert (watch.trigger() instanceof ScheduleTrigger);
        ScheduleTrigger trigger = (ScheduleTrigger)watch.trigger();
        ActiveSchedule currentSchedule = this.schedules.get(watch.id());
        if (currentSchedule == null || !currentSchedule.schedule.equals(trigger.getSchedule())) {
            this.schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), this.clock.millis()));
        }
    }

    @Override
    public boolean remove(String jobId) {
        logger.debug("Removing watch [{}] from engine (engine is running: {})", (Object)jobId, (Object)this.isRunning.get());
        return this.schedules.remove(jobId) != null;
    }

    void checkJobs() {
        if (!this.isRunning.get()) {
            logger.debug("Watcher not running because the engine is paused. Currently scheduled watches being skipped: {}", (Object)this.schedules.size());
            return;
        }
        long triggeredTime = this.clock.millis();
        ArrayList<TriggerEvent> events = new ArrayList<TriggerEvent>();
        for (ActiveSchedule schedule : this.schedules.values()) {
            if (!this.isRunning.get()) {
                logger.debug("Watcher paused while running [{}]", (Object)schedule.name);
                break;
            }
            long scheduledTime = schedule.check(triggeredTime);
            if (scheduledTime <= 0L) continue;
            ZonedDateTime triggeredDateTime = TickerScheduleTriggerEngine.utcDateTimeAtEpochMillis(triggeredTime);
            ZonedDateTime scheduledDateTime = TickerScheduleTriggerEngine.utcDateTimeAtEpochMillis(scheduledTime);
            logger.debug("triggered job [{}] at [{}] (scheduled time was [{}])", (Object)schedule.name, (Object)triggeredDateTime, (Object)scheduledDateTime);
            events.add(new ScheduleTriggerEvent(schedule.name, triggeredDateTime, scheduledDateTime));
            if (events.size() < 1000) continue;
            this.notifyListeners(events);
            events.clear();
        }
        if (!events.isEmpty()) {
            this.notifyListeners(events);
        }
    }

    private static ZonedDateTime utcDateTimeAtEpochMillis(long triggeredTime) {
        return Instant.ofEpochMilli(triggeredTime).atZone(ZoneOffset.UTC);
    }

    Map<String, ActiveSchedule> getSchedules() {
        return Collections.unmodifiableMap(this.schedules);
    }

    protected void notifyListeners(List<TriggerEvent> events) {
        this.consumers.forEach(consumer -> consumer.accept(events));
    }

    class Ticker
    extends Thread {
        private volatile boolean active;
        private final CountDownLatch closeLatch;
        private boolean isDataNode;

        Ticker(boolean isDataNode) {
            super("ticker-schedule-trigger-engine");
            this.active = true;
            this.closeLatch = new CountDownLatch(1);
            this.isDataNode = isDataNode;
            this.setDaemon(true);
            if (isDataNode) {
                this.start();
            }
        }

        @Override
        public void run() {
            while (this.active) {
                logger.trace("checking jobs [{}]", (Object)TickerScheduleTriggerEngine.this.clock.instant().atZone(ZoneOffset.UTC));
                TickerScheduleTriggerEngine.this.checkJobs();
                try {
                    Ticker.sleep(TickerScheduleTriggerEngine.this.tickInterval.millis());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            this.closeLatch.countDown();
        }

        public void close() {
            if (this.isDataNode) {
                logger.trace("stopping ticker thread");
                this.active = false;
                try {
                    this.closeLatch.await();
                }
                catch (InterruptedException e) {
                    logger.warn("caught an interrupted exception when waiting while closing ticker thread", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
                logger.trace("ticker thread stopped");
            }
        }
    }

    static class ActiveSchedule {
        private final String name;
        private final Schedule schedule;
        private final long startTime;
        private volatile long scheduledTime;

        ActiveSchedule(String name, Schedule schedule, long startTime) {
            this.name = name;
            this.schedule = schedule;
            this.startTime = startTime;
            this.scheduledTime = schedule.nextScheduledTimeAfter(startTime, startTime);
            logger.debug("Watcher: activating schedule for watch '{}', first run at {}", (Object)name, (Object)WatcherDateTimeUtils.dateTimeFormatter.formatMillis(this.scheduledTime));
        }

        public long check(long time) {
            if (time < this.scheduledTime) {
                return -1L;
            }
            long prevScheduledTime = this.scheduledTime == 0L ? time : this.scheduledTime;
            this.scheduledTime = this.schedule.nextScheduledTimeAfter(this.startTime, time);
            return prevScheduledTime;
        }
    }
}

