Messages.java
package jasper.component;
import com.fasterxml.jackson.databind.ObjectMapper;
import jasper.component.dto.ComponentDtoMapper;
import jasper.domain.Ext;
import jasper.domain.Plugin;
import jasper.domain.Ref;
import jasper.domain.Template;
import jasper.domain.User;
import jasper.domain.proj.HasTags;
import jasper.service.dto.PluginDto;
import jasper.service.dto.TemplateDto;
import jasper.service.dto.UserDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static jasper.component.Replicator.deletorTag;
import static jasper.domain.proj.HasOrigin.formatOrigin;
import static jasper.domain.proj.HasTags.formatTag;
import static jasper.domain.proj.Tag.localTag;
import static jasper.domain.proj.Tag.matchesTag;
import static jasper.domain.proj.Tag.tagOrigin;
import static org.springframework.messaging.support.MessageBuilder.createMessage;
@Component
public class Messages {
private final Logger logger = LoggerFactory.getLogger(Messages.class);
@Autowired
ExecutorService taskExecutor;
@Autowired
MessageChannel cursorTxChannel;
@Autowired
MessageChannel refTxChannel;
@Autowired
MessageChannel tagTxChannel;
@Autowired
MessageChannel responseTxChannel;
@Autowired
MessageChannel userTxChannel;
@Autowired
MessageChannel extTxChannel;
@Autowired
MessageChannel pluginTxChannel;
@Autowired
MessageChannel templateTxChannel;
@Autowired
ComponentDtoMapper mapper;
@Autowired
ObjectMapper objectMapper;
boolean ready = false;
@EventListener(ApplicationReadyEvent.class)
public void init() {
ready = true;
}
@Async
public void updateRef(Ref ref) {
// TODO: Debounce
var update = mapper.domainToDto(ref);
sendAndRetry(() -> refTxChannel.send(createMessage(update, refHeaders(ref.getOrigin(), update))));
if (update.getTags() != null) {
for (var path : ref.getExpandedTags()) {
var headers = tagHeaders(ref.getOrigin(), path);
for (var tag : update.getTags()) {
if (matchesTag(path, tag)) {
sendAndRetry(() -> tagTxChannel.send(createMessage(tag, headers)));
}
}
}
}
if (ref.getSources() != null) {
for (var source : ref.getSources()) {
if (source.equals(ref.getUrl())) continue;
sendAndRetry(() -> responseTxChannel.send(createMessage(ref.getUrl(), responseHeaders(ref.getOrigin(), source))));
}
}
sendAndRetry(() -> cursorTxChannel.send(createMessage(ref.getModified(), originHeaders(ref.getOrigin()))));
}
@Async
public void updateSilentRef(Ref ref) {
// TODO: Debounce
var update = mapper.domainToDto(ref);
sendAndRetry(() -> refTxChannel.send(createMessage(update, refHeaders(ref.getOrigin(), update))));
}
@Async
public void updateMetadata(Ref ref) {
// TODO: Debounce
var update = mapper.domainToDto(ref);
sendAndRetry(() -> refTxChannel.send(createMessage(update, refHeaders(ref.getOrigin(), update))));
}
@Async
public void deleteRef(Ref ref) {
updateRef(deleteNotice(ref));
}
@Async
public void updateExt(Ext ext) {
var update = mapper.domainToDto(ext);
sendAndRetry(() -> extTxChannel.send(createMessage(update, tagHeaders(ext.getOrigin(), ext.getTag()))));
sendAndRetry(() -> cursorTxChannel.send(createMessage(ext.getModified(), originHeaders(ext.getOrigin()))));
}
@Async
public void updateUser(User user) {
var update = mapper.domainToDto(user);
sendAndRetry(() -> userTxChannel.send(createMessage(update, tagHeaders(user.getOrigin(), user.getTag()))));
sendAndRetry(() -> cursorTxChannel.send(createMessage(user.getModified(), originHeaders(user.getOrigin()))));
}
@Async
public void deleteUser(String qualifiedTag) {
var tag = localTag(qualifiedTag);
var origin = tagOrigin(qualifiedTag);
sendAndRetry(() -> userTxChannel.send(createMessage(deleteNotice(tag, origin, UserDto.class), tagHeaders(origin, tag))));
}
@Async
public void updatePlugin(Plugin plugin) {
var update = mapper.domainToDto(plugin);
sendAndRetry(() -> pluginTxChannel.send(createMessage(update, tagHeaders(plugin.getOrigin(), plugin.getTag()))));
sendAndRetry(() -> cursorTxChannel.send(createMessage(plugin.getModified(), originHeaders(plugin.getOrigin()))));
}
@Async
public void deletePlugin(String qualifiedTag) {
var tag = localTag(qualifiedTag);
var origin = tagOrigin(qualifiedTag);
sendAndRetry(() -> pluginTxChannel.send(createMessage(deleteNotice(tag, origin, PluginDto.class), tagHeaders(origin, tag))));
}
@Async
public void updateTemplate(Template template) {
var update = mapper.domainToDto(template);
sendAndRetry(() -> templateTxChannel.send(createMessage(update, tagHeaders(template.getOrigin(), template.getTag()))));
sendAndRetry(() -> cursorTxChannel.send(createMessage(template.getModified(), originHeaders(template.getOrigin()))));
}
@Async
public void deleteTemplate(String qualifiedTag) {
var tag = localTag(qualifiedTag);
var origin = tagOrigin(qualifiedTag);
sendAndRetry(() -> templateTxChannel.send(createMessage(deleteNotice(tag, origin, TemplateDto.class), tagHeaders(origin, tag))));
}
private <T> T deleteNotice(String tag, String origin, Class<T> type) {
return objectMapper.convertValue(Map.of(
"tag", deletorTag(tag),
"origin", origin
), type);
}
private Ref deleteNotice(Ref ref) {
return objectMapper.convertValue(Map.of(
"url", ref.getUrl(),
"origin", ref.getOrigin(),
"tags", List.of("internal", "plugin/delete")
), Ref.class);
}
private void sendAndRetry(Runnable fn) {
sendAndRetry(fn, 5);
}
private void sendAndRetry(Runnable fn, int tries) {
if (!ready) return;
try {
fn.run();
} catch (MessageDeliveryException e) {
if (tries > 0) {
logger.debug("Retrying message delivery", e);
taskExecutor.execute(() -> sendAndRetry(fn, tries - 1));
} else {
logger.error("Message delivery failed", e);
}
}
}
public static MessageHeaders originHeaders(String origin) {
return new MessageHeaders(Map.of(
"origin", formatOrigin(origin)
));
}
public static MessageHeaders tagHeaders(String origin, String tag) {
return new MessageHeaders(Map.of(
"origin", formatOrigin(origin),
"tag", formatTag(tag)
));
}
public static MessageHeaders refHeaders(String origin, HasTags ref) {
return new MessageHeaders(Map.of(
"origin", formatOrigin(origin),
"url", ref.getUrl()
));
}
public static MessageHeaders responseHeaders(String origin, String source) {
return new MessageHeaders(Map.of(
"origin", formatOrigin(origin),
"response", source
));
}
}