Watch.java
package jasper.component.channel;
import jasper.domain.Ref_;
import jasper.domain.proj.HasTags;
import jasper.repository.RefRepository;
import jasper.repository.filter.RefFilter;
import jasper.service.dto.RefDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.domain.PageRequest;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static jasper.domain.proj.HasOrigin.origin;
import static jasper.domain.proj.HasTags.hasMatchingTag;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.springframework.data.domain.Sort.by;
@Component()
public class Watch {
private static final Logger logger = LoggerFactory.getLogger(Watch.class);
@Autowired
RefRepository refRepository;
Map<String, Map<String, Watcher>> watchers = new ConcurrentHashMap<>();
Map<String, Map<String, Set<String>>> watching = new ConcurrentHashMap<>();
/**
* Register a watcher to watch all Refs in an origin.
*/
public void addWatch(String origin, Watcher w) {
watchers.computeIfAbsent(origin, o -> new ConcurrentHashMap<>()).put("", w);
}
/**
* Register a watcher for all tagged Refs in an origin.
*/
public void addWatch(String origin, String tag, Watcher w) {
watchers.computeIfAbsent(origin, o -> new ConcurrentHashMap<>()).put(tag, w);
}
@EventListener(ApplicationReadyEvent.class)
public void init() {
try {
for (var origin : watchers.keySet()) {
for (var tag : watchers.get(origin).keySet()) {
if (isBlank(tag)) continue;
Instant lastModified = null;
while (true) {
var maybeRef = refRepository.findAll(RefFilter.builder()
.origin(origin)
.query(tag)
.modifiedAfter(lastModified)
.build().spec(), PageRequest.of(0, 1, by(Ref_.MODIFIED)));
if (maybeRef.isEmpty()) break;
var ref = maybeRef.getContent().getFirst();
lastModified = ref.getModified();
try {
watchers.get(origin).get(tag).notify(ref);
} catch (Exception e) {
logger.warn("Error starting watcher", e);
}
}
}
}
} catch (Exception e) {
logger.error("Error serializing watchers", e);
}
}
@ServiceActivator(inputChannel = "refRxChannel")
public void handleRefUpdate(Message<RefDto> message) {
var origin = origin(message.getHeaders().get("origin").toString());
if (!watchers.containsKey(origin)) return;
var ref = message.getPayload();
for (var tag : watchers.get(origin).keySet()) {
var set = watching
.computeIfAbsent(origin, o -> new ConcurrentHashMap<>())
.computeIfAbsent(tag, t -> ConcurrentHashMap.newKeySet());
if (isNotBlank(tag)) {
if (!set.contains(ref.getUrl())) {
if (!hasMatchingTag(ref, tag)) continue;
set.add(ref.getUrl());
}
}
try {
watchers.get(origin).get(tag).notify(ref);
} finally {
if (isNotBlank(tag) && !hasMatchingTag(ref, tag)) {
set.remove(ref.getUrl());
}
}
}
}
public interface Watcher {
void notify(HasTags ref);
}
}