Push.java
package jasper.component.channel;
import jasper.component.ConfigCache;
import jasper.component.Replicator;
import jasper.component.Tagger;
import jasper.config.Props;
import jasper.domain.proj.HasTags;
import jasper.repository.RefRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static jasper.domain.proj.HasOrigin.formatOrigin;
import static jasper.domain.proj.HasOrigin.origin;
import static jasper.domain.proj.HasOrigin.subOrigin;
import static jasper.plugin.Origin.getOrigin;
import static jasper.plugin.Push.getPush;
import static jasper.util.Logging.getMessage;
@Component
public class Push {
private static final Logger logger = LoggerFactory.getLogger(Push.class);
@Autowired
Props props;
@Autowired
TaskScheduler taskScheduler;
@Autowired
ConfigCache configs;
@Autowired
RefRepository refRepository;
@Autowired
Replicator replicator;
@Autowired
Tagger tagger;
@Autowired
Watch watch;
record Remote(String url, String origin) {}
private Map<String, Instant> lastSent = new ConcurrentHashMap<>();
private Map<String, Instant> queued = new ConcurrentHashMap<>();
private Map<String, Set<Remote>> pushes = new ConcurrentHashMap<>();
@EventListener(ApplicationReadyEvent.class)
public void init() {
for (var origin : configs.root().scriptOrigins("+plugin/origin/push")) {
// TODO: redo on template change
watch.addWatch(origin, "+plugin/origin/push", this::watch);
}
}
private void watch(HasTags update) {
if (!configs.root().script("+plugin/origin/push", update.getOrigin())) return;
var remote = refRepository.findOneByUrlAndOrigin(update.getUrl(), update.getOrigin())
.orElseThrow();
var config = getOrigin(remote);
var localOrigin = subOrigin(remote.getOrigin(), config.getLocal());
var push = getPush(remote);
var target = new Remote(remote.getUrl(), remote.getOrigin());
pushes.values().forEach(set -> set.remove(target));
if (remote.hasTag("+plugin/origin/push") && !remote.hasTag("+plugin/error") && remote.hasTag("+plugin/cron") && push.isPushOnChange()) {
pushes
.computeIfAbsent(localOrigin, o -> ConcurrentHashMap.newKeySet())
.add(target);
}
}
@ServiceActivator(inputChannel = "cursorRxChannel")
public void handleCursorUpdate(Message<Instant> message) {
var root = configs.root();
var origin = origin(message.getHeaders().get("origin").toString());
if (!root.script("+plugin/origin/push", origin)) return;
var cursor = message.getPayload();
if (cursor.equals(lastSent.computeIfAbsent(origin, o -> cursor))) {
taskScheduler.schedule(() -> push(origin), Instant.now());
} else {
queued.put(origin, cursor);
}
}
private void push(String origin) {
try {
if (pushes.containsKey(origin)) {
var deleted = new HashSet<Remote>();
pushes.get(origin).forEach(target -> {
var remote = refRepository.findOneByUrlAndOrigin(target.url, target.origin).orElse(null);
if (remote != null && !remote.hasTag("+plugin/error")) {
logger.info("{} Pushing origin ({}) on change {}: {}", remote.getOrigin(), formatOrigin(origin), remote.getTitle(), remote.getUrl());
try {
replicator.push(remote);
logger.info("{} Finished pushing origin ({}) on change {}: {}", remote.getOrigin(), formatOrigin(origin), remote.getTitle(), remote.getUrl());
} catch (Exception e) {
logger.error("{} Error pushing origin ({}) on change {}: {}", remote.getOrigin(), formatOrigin(origin), remote.getTitle(), remote.getUrl());
tagger.attachError(remote.getUrl(), origin, "Error pushing", getMessage(e));
}
} else {
deleted.add(target);
}
});
deleted.forEach(remote -> pushes.values().forEach(set -> set.remove(remote)));
}
} finally {
taskScheduler.schedule(() -> checkIfQueued(origin), Instant.now().plus(props.getPushCooldownSec(), ChronoUnit.SECONDS));
}
}
private void checkIfQueued(String origin) {
if (!lastSent.containsKey(origin)) return;
var next = queued.remove(origin);
if (next != null) {
lastSent.put(origin, next);
push(origin);
} else {
lastSent.remove(origin);
}
}
}