Cron.java
package jasper.component.cron;
import jakarta.annotation.PostConstruct;
import jasper.component.ConfigCache;
import jasper.component.ScriptExecutorFactory;
import jasper.component.Tagger;
import jasper.component.channel.Watch;
import jasper.domain.Ref;
import jasper.domain.proj.HasTags;
import jasper.repository.RefRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import static jasper.domain.proj.HasTags.hasMatchingTag;
import static jasper.plugin.Cron.getCron;
import static jasper.util.Logging.getMessage;
import static java.lang.Math.floor;
@Component
public class Cron {
private static final Logger logger = LoggerFactory.getLogger(Cron.class);
@Autowired
TaskScheduler taskScheduler;
@Autowired
ScriptExecutorFactory scriptExecutorFactory;
@Autowired
RefRepository refRepository;
@Autowired
ConfigCache configs;
@Autowired
Tagger tagger;
@Autowired
Watch watch;
Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
Map<String, CompletableFuture<?>> refs = new ConcurrentHashMap<>();
Map<String, CronRunner> tags = new ConcurrentHashMap<>();
/**
* Register a runner for a tag.
*/
public void addCronTag(String plugin, CronRunner r) {
tags.put(plugin, r);
}
@PostConstruct
public void init() {
configs.rootUpdate(root -> {
for (var origin : root.scriptOrigins("+plugin/cron")) {
watch.addWatch(origin, "+plugin/cron", this::schedule);
}
for (var origin : root.scriptOrigins("+plugin/user/run")) {
watch.addWatch(origin, "+plugin/user/run", this::run);
}
});
}
private void schedule(HasTags ref) {
var key = getKey(ref);
var existing = tasks.get(key);
var cancelled = false;
if (existing != null && !existing.isDone()) {
existing.cancel(true);
tasks.remove(key);
cancelled = true;
}
if (!hasMatchingTag(ref, "+plugin/cron")) {
if (cancelled) logger.info("{} Unscheduled {}: {}", ref.getOrigin(), ref.getTitle(), ref.getUrl());
return;
}
if (hasMatchingTag(ref, "+plugin/error")) {
if (cancelled) logger.info("{} Unscheduled due to error {}: {}", ref.getOrigin(), ref.getTitle(), ref.getUrl());
return;
}
var origin = ref.getOrigin();
if (!configs.root().script("+plugin/cron", origin)) return;
if (!hasScheduler(ref)) return;
var url = ref.getUrl();
var config = getCron(refRepository.findOneByUrlAndOrigin(url, origin).orElse(null));
if (config == null || config.getInterval() == null) return;
if (config.getInterval().toMinutes() < 1) {
tagger.attachError(url, origin, "Cron Error: Interval too small " + config.getInterval());
} else {
tasks.compute(key, (k, e) -> {
if (e != null && !e.isDone()) return e;
if (existing == null) logger.info("{} Scheduled every {} {}: {}", ref.getOrigin(), config.getInterval(), ref.getTitle(), ref.getUrl());
var jitter = config.getInterval().plusMillis((long) floor(config.getInterval().toMillis() * ThreadLocalRandom.current().nextDouble()));
return taskScheduler.scheduleWithFixedDelay(() -> runSchedule(url, origin),
Instant.now().plus(jitter),
config.getInterval());
});
}
}
private void run(HasTags target) {
var origin = target.getOrigin();
if (!configs.root().script("+plugin/user/run", origin)) return;
var url = refRepository.findOneByUrlAndOrigin(target.getUrl(), origin)
.map(Ref::getSources)
.map(List::getFirst)
.orElse(null);
if (url == null) {
logger.error("{} Error in run tag: No source", origin);
tagger.remove(target.getUrl(), origin, "+plugin/user/run");
return;
}
var ref = refRepository.findOneByUrlAndOrigin(url, origin).orElse(null);
try {
if (ref == null) {
logger.warn("{} Can't find Ref (Cannot run on remote origin): {}", origin, url);
throw new RuntimeException();
}
if (hasMatchingTag(ref, "+plugin/error")) {
logger.info("{} Cancelled running due to error {}: {}", origin, ref.getTitle(), url);
throw new RuntimeException();
}
if (!hasMatchingTag(target, "+plugin/user/run")) {
// Was cancelled
throw new RuntimeException();
}
var ran = new HashSet<CronRunner>();
tags.forEach((tag, v) -> {
if (ran.contains(v)) return;
if (!hasMatchingTag(ref, tag)) return;
if (!configs.root().script(tag, ref)) return;
refs.compute(getKey(ref), (s, existing) -> {
if (existing != null && !existing.isDone()) return existing;
logger.warn("{} Run Tag: {} {}", origin, tag, url);
return scriptExecutorFactory.run(tag, origin, url, () -> {
try {
v.run(refRepository.findOneByUrlAndOrigin(url, origin).orElseThrow());
ran.add(v);
tagger.removeAllResponses(url, origin, "+plugin/user/run");
} catch (Exception e) {
logger.error("{} Error in run tag {} ", origin, tag);
tagger.attachError(url, origin, "Error in run tag " + tag, getMessage(e));
throw new RuntimeException(e);
} finally {
refs.remove(s);
}
});
});
});
} catch (Exception e) {
refs.computeIfPresent(getKey(origin, url), (k, existing) -> {
if (!existing.isDone()) {
logger.info("{} Cancelled run {}: {}", origin, ref == null ? "" : ref.getTitle(), url);
existing.cancel(true);
}
return null;
});
tagger.removeAllResponses(url, origin, "+plugin/user/run");
}
}
private boolean hasScheduler(HasTags ref) {
for (var tag : tags.keySet()) if (hasMatchingTag(ref, tag)) return true;
return false;
}
private String getKey(HasTags ref) {
return ref.getOrigin() + ":" + ref.getUrl();
}
private String getKey(String origin, String url) {
return origin + ":" + url;
}
private void runSchedule(String url, String origin) {
var ref = refRepository.findOneByUrlAndOrigin(url, origin).orElse(null);
if (ref == null) {
var key = origin + ":" + url;
var existing = tasks.get(key);
if (existing != null && !existing.isDone()) {
existing.cancel(false);
tasks.remove(key);
}
return;
}
if (ref.hasPluginResponse("+plugin/user/run")) {
// Remove tag in case script had failed
logger.warn("{} Cancelled possibly stuck run {}:", origin, url);
tagger.removeAllResponses(url, origin, "+plugin/user/run");
tagger.attachLogs(url, origin, "Cancelled possibly stuck run");
// Skip scheduled run since we are running manually
return;
}
var ran = new HashSet<CronRunner>();
tags.forEach((tag, v) -> {
if (ran.contains(v)) return;
if (!hasMatchingTag(ref, tag)) return;
if (!configs.root().script(tag, ref)) return;
logger.debug("{} Cron Tag: {} {}", origin, tag, url);
refs.compute(getKey(ref), (s, existing) -> {
if (existing != null && !existing.isDone()) return existing;
return scriptExecutorFactory.run(tag, origin, url, () -> {
try {
v.run(ref);
ran.add(v);
} catch (Exception e) {
logger.error("{} Error in cron tag {} ", origin, tag);
tagger.attachError(url, origin, "Error in cron tag " + tag, getMessage(e));
}
});
});
});
}
public interface CronRunner {
void run(Ref ref) throws Exception;
}
}