// RedisConfig.java

package jasper.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jasper.service.dto.ExtDto;
import jasper.service.dto.PluginDto;
import jasper.service.dto.RefDto;
import jasper.service.dto.TemplateDto;
import jasper.service.dto.UserDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeParseException;

import static jasper.component.Messages.originHeaders;
import static jasper.component.Messages.refHeaders;
import static jasper.component.Messages.responseHeaders;
import static jasper.component.Messages.tagHeaders;
import static jasper.domain.proj.HasOrigin.formatOrigin;
import static jasper.domain.proj.HasTags.formatTag;
import static java.util.Arrays.copyOfRange;
import static org.springframework.data.redis.listener.PatternTopic.of;

/**
 * Redis pub/sub bridge, active only under the {@code redis} profile.
 *
 * <p>For each message kind (cursor, ref, tag, response, user, ext, plugin, template) this
 * configuration declares:
 * <ul>
 *   <li>a local {@code *RedisChannel} bean,</li>
 *   <li>a publish flow that forwards the injected {@code *TxChannel} to a Redis topic,</li>
 *   <li>a listener container subscribed to the matching topic pattern, which rebuilds the
 *       message and sends it to the local {@code *RedisChannel},</li>
 *   <li>a subscribe flow that forwards the {@code *RedisChannel} to the injected
 *       {@code *RxChannel}.</li>
 * </ul>
 *
 * <p>Topics have the form {@code <kind>/<origin>[/<rest>]}. All text crossing Redis is encoded
 * and decoded as UTF-8.
 */
@Profile("redis")
@Import(DataRedisAutoConfiguration.class)
@Configuration
public class RedisConfig {
	private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);

	@Autowired
	ObjectMapper objectMapper;

	@Autowired
	RedisConnectionFactory redisConnectionFactory;

	@Autowired
	MessageChannel cursorTxChannel;

	@Autowired
	MessageChannel cursorRxChannel;

	@Autowired
	MessageChannel refTxChannel;

	@Autowired
	MessageChannel refRxChannel;

	@Autowired
	MessageChannel tagTxChannel;

	@Autowired
	MessageChannel tagRxChannel;

	@Autowired
	MessageChannel responseTxChannel;

	@Autowired
	MessageChannel responseRxChannel;

	@Autowired
	MessageChannel userTxChannel;

	@Autowired
	MessageChannel userRxChannel;

	@Autowired
	MessageChannel extTxChannel;

	@Autowired
	MessageChannel extRxChannel;

	@Autowired
	MessageChannel pluginTxChannel;

	@Autowired
	MessageChannel pluginRxChannel;

	@Autowired
	MessageChannel templateTxChannel;

	@Autowired
	MessageChannel templateRxChannel;

	// ---------------------------------------------------------------- helpers

	/**
	 * Creates a listener container bound to the given connection factory.
	 *
	 * @param redisConnectionFactory factory the container uses for its subscription
	 * @return a new container with no listeners registered yet
	 */
	private static RedisMessageListenerContainer newContainer(RedisConnectionFactory redisConnectionFactory) {
		var container = new RedisMessageListenerContainer();
		container.setConnectionFactory(redisConnectionFactory);
		return container;
	}

	/**
	 * Decodes a Redis channel name as UTF-8 and splits it on {@code /}.
	 *
	 * @param channel raw channel name bytes
	 * @return the segments; index 0 is the kind, index 1 the origin, the rest is kind specific
	 * @throws IllegalArgumentException if the channel has no origin segment, instead of the
	 *         bare {@link ArrayIndexOutOfBoundsException} an unchecked {@code parts[1]} would raise
	 */
	private static String[] topicParts(byte[] channel) {
		var topic = new String(channel, StandardCharsets.UTF_8);
		var parts = topic.split("/");
		if (parts.length < 2) {
			throw new IllegalArgumentException("Redis channel has no origin segment: " + topic);
		}
		return parts;
	}

	/**
	 * Re-joins every segment after the origin with {@code /}.
	 *
	 * @param parts segments as returned by {@link #topicParts(byte[])}
	 * @return the joined remainder, or the empty string when there is none
	 */
	private static String topicTail(String[] parts) {
		return String.join("/", copyOfRange(parts, 2, parts.length));
	}

	/**
	 * Serializes a payload to JSON bytes with the injected {@link ObjectMapper}.
	 *
	 * @param payload the DTO to serialize
	 * @param type    DTO name used in the log message
	 * @return the JSON bytes
	 * @throws RuntimeException wrapping the {@link JsonProcessingException} on failure
	 */
	private byte[] writeJson(Object payload, String type) {
		try {
			return objectMapper.writeValueAsBytes(payload);
		} catch (JsonProcessingException e) {
			// Pass the exception to the logger so the cause is not lost.
			logger.error("Cannot serialize {}.", type, e);
			throw new RuntimeException(e);
		}
	}

	// --------------------------------------------------------- local channels

	/** Local channel that receives cursor messages arriving from Redis. */
	@Bean
	public MessageChannel cursorRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives ref messages arriving from Redis. */
	@Bean
	public MessageChannel refRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives tag messages arriving from Redis. */
	@Bean
	public MessageChannel tagRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives response messages arriving from Redis. */
	@Bean
	public MessageChannel responseRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives user messages arriving from Redis. */
	@Bean
	public MessageChannel userRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives ext messages arriving from Redis. */
	@Bean
	public MessageChannel extRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives plugin messages arriving from Redis. */
	@Bean
	public MessageChannel pluginRedisChannel() {
		return new DirectChannel();
	}

	/** Local channel that receives template messages arriving from Redis. */
	@Bean
	public MessageChannel templateRedisChannel() {
		return new DirectChannel();
	}

	// ----------------------------------------------------------------- cursor

	/** Publishes cursor timestamps from {@code cursorTxChannel} to {@code cursor/<origin>}. */
	@Bean
	public IntegrationFlow redisPublishCursorFlow() {
		return IntegrationFlow
			.from(cursorTxChannel)
			.handle(new CustomPublishingMessageHandler<Instant>() {
				@Override
				protected String getTopic(Message<Instant> message) {
					return "cursor/" + formatOrigin(message.getHeaders().get("origin"));
				}

				@Override
				protected byte[] getMessage(Message<Instant> message) {
					// Explicit charset: the listener side decodes as UTF-8.
					return message.getPayload().toString().getBytes(StandardCharsets.UTF_8);
				}
			})
			.get();
	}

	/** Forwards cursor messages received from Redis to {@code cursorRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeCursorFlow() {
		return IntegrationFlow
			.from(cursorRedisChannel())
			.channel(cursorRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code cursor/*}, parses the body as an {@link Instant} and sends it to
	 * {@link #cursorRedisChannel()} with headers built by {@code originHeaders}.
	 * Bodies that are not valid instants are logged and dropped.
	 */
	@Bean
	public RedisMessageListenerContainer redisCursorRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			Instant cursor;
			try {
				cursor = Instant.parse(new String(message.getBody(), StandardCharsets.UTF_8));
			} catch (DateTimeParseException e) {
				logger.error("Error parsing cursor from redis.", e);
				return;
			}
			var origin = topicParts(message.getChannel())[1];
			cursorRedisChannel().send(MessageBuilder.createMessage(cursor, originHeaders(origin)));
		}, of("cursor/*"));
		return container;
	}

	// -------------------------------------------------------------------- ref

	/** Publishes {@link RefDto} payloads from {@code refTxChannel} to {@code ref/<origin>/<url>}. */
	@Bean
	public IntegrationFlow redisPublishRefFlow() {
		return IntegrationFlow
			.from(refTxChannel)
			.handle(new CustomPublishingMessageHandler<RefDto>() {
				@Override
				protected String getTopic(Message<RefDto> message) {
					return "ref/" + formatOrigin(message.getHeaders().get("origin")) + "/" + message.getHeaders().get("url");
				}

				@Override
				protected byte[] getMessage(Message<RefDto> message) {
					return writeJson(message.getPayload(), "RefDto");
				}
			})
			.get();
	}

	/** Forwards ref messages received from Redis to {@code refRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeRefFlow() {
		return IntegrationFlow
			.from(refRedisChannel())
			.channel(refRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code ref/*}, deserializes the body as a {@link RefDto} and sends it to
	 * {@link #refRedisChannel()} with headers built by {@code refHeaders}.
	 * Bodies that cannot be parsed are logged and dropped.
	 */
	@Bean
	public RedisMessageListenerContainer redisRefRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			try {
				var ref = objectMapper.readValue(message.getBody(), RefDto.class);
				// Only the origin segment is read; the url part of the topic is not used here.
				var origin = topicParts(message.getChannel())[1];
				refRedisChannel().send(MessageBuilder.createMessage(ref, refHeaders(origin, ref)));
			} catch (IOException e) {
				logger.error("Error parsing RefDto from redis.", e);
			}
		}, of("ref/*"));
		return container;
	}

	// -------------------------------------------------------------------- tag

	/** Publishes tag strings from {@code tagTxChannel} to {@code tag/<origin>/<tag>}. */
	@Bean
	public IntegrationFlow redisPublishTagFlow() {
		return IntegrationFlow
			.from(tagTxChannel)
			.handle(new CustomPublishingMessageHandler<String>() {
				@Override
				protected String getTopic(Message<String> message) {
					return "tag/" + formatOrigin(message.getHeaders().get("origin")) + "/" + formatTag(message.getHeaders().get("tag"));
				}

				@Override
				protected byte[] getMessage(Message<String> message) {
					// Explicit charset: the listener side decodes as UTF-8.
					return message.getPayload().getBytes(StandardCharsets.UTF_8);
				}
			})
			.get();
	}

	/** Forwards tag messages received from Redis to {@code tagRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeTagFlow() {
		return IntegrationFlow
			.from(tagRedisChannel())
			.channel(tagRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code tag/*} and sends the UTF-8 body to {@link #tagRedisChannel()} with
	 * headers built by {@code tagHeaders} from the origin and the remaining topic segments.
	 */
	@Bean
	public RedisMessageListenerContainer redisTagRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			var fullTag = new String(message.getBody(), StandardCharsets.UTF_8);
			var parts = topicParts(message.getChannel());
			tagRedisChannel().send(MessageBuilder.createMessage(fullTag, tagHeaders(parts[1], topicTail(parts))));
		}, of("tag/*"));
		return container;
	}

	// --------------------------------------------------------------- response

	/** Publishes response strings from {@code responseTxChannel} to {@code response/<origin>/<response>}. */
	@Bean
	public IntegrationFlow redisPublishResponseFlow() {
		return IntegrationFlow
			.from(responseTxChannel)
			.handle(new CustomPublishingMessageHandler<String>() {
				@Override
				protected String getTopic(Message<String> message) {
					return "response/" + formatOrigin(message.getHeaders().get("origin")) + "/" + message.getHeaders().get("response");
				}

				@Override
				protected byte[] getMessage(Message<String> message) {
					// Explicit charset: the listener side decodes as UTF-8.
					return message.getPayload().getBytes(StandardCharsets.UTF_8);
				}
			})
			.get();
	}

	/** Forwards response messages received from Redis to {@code responseRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeResponseFlow() {
		return IntegrationFlow
			.from(responseRedisChannel())
			.channel(responseRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code response/*} and sends the UTF-8 body to
	 * {@link #responseRedisChannel()} with headers built by {@code responseHeaders} from the
	 * origin and the remaining topic segments.
	 */
	@Bean
	public RedisMessageListenerContainer redisResponseRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			var response = new String(message.getBody(), StandardCharsets.UTF_8);
			var parts = topicParts(message.getChannel());
			responseRedisChannel().send(MessageBuilder.createMessage(response, responseHeaders(parts[1], topicTail(parts))));
		}, of("response/*"));
		return container;
	}

	// ------------------------------------------------------------------- user

	/** Publishes {@link UserDto} payloads from {@code userTxChannel} to {@code user/<origin>/<tag>}. */
	@Bean
	public IntegrationFlow redisPublishUserFlow() {
		return IntegrationFlow
			.from(userTxChannel)
			.handle(new CustomPublishingMessageHandler<UserDto>() {
				@Override
				protected String getTopic(Message<UserDto> message) {
					// NOTE(review): unlike ext/plugin/template, the tag header is not passed
					// through formatTag here - confirm this is intentional.
					return "user/" + formatOrigin(message.getHeaders().get("origin")) + "/" + message.getHeaders().get("tag");
				}

				@Override
				protected byte[] getMessage(Message<UserDto> message) {
					return writeJson(message.getPayload(), "UserDto");
				}
			})
			.get();
	}

	/** Forwards user messages received from Redis to {@code userRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeUserFlow() {
		return IntegrationFlow
			.from(userRedisChannel())
			.channel(userRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code user/*}, deserializes the body as a {@link UserDto} and sends it to
	 * {@link #userRedisChannel()} with headers built by {@code tagHeaders}.
	 * Bodies that cannot be parsed are logged and dropped.
	 */
	@Bean
	public RedisMessageListenerContainer redisUserRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			try {
				var user = objectMapper.readValue(message.getBody(), UserDto.class);
				var parts = topicParts(message.getChannel());
				userRedisChannel().send(MessageBuilder.createMessage(user, tagHeaders(parts[1], topicTail(parts))));
			} catch (IOException e) {
				logger.error("Error parsing UserDto from redis.", e);
			}
		}, of("user/*"));
		return container;
	}

	// -------------------------------------------------------------------- ext

	/** Publishes {@link ExtDto} payloads from {@code extTxChannel} to {@code ext/<origin>/<tag>}. */
	@Bean
	public IntegrationFlow redisPublishExtFlow() {
		return IntegrationFlow
			.from(extTxChannel)
			.handle(new CustomPublishingMessageHandler<ExtDto>() {
				@Override
				protected String getTopic(Message<ExtDto> message) {
					return "ext/" + formatOrigin(message.getHeaders().get("origin")) + "/" + formatTag(message.getHeaders().get("tag"));
				}

				@Override
				protected byte[] getMessage(Message<ExtDto> message) {
					return writeJson(message.getPayload(), "ExtDto");
				}
			})
			.get();
	}

	/** Forwards ext messages received from Redis to {@code extRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeExtFlow() {
		return IntegrationFlow
			.from(extRedisChannel())
			.channel(extRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code ext/*}, deserializes the body as an {@link ExtDto} and sends it to
	 * {@link #extRedisChannel()} with headers built by {@code tagHeaders}.
	 * Bodies that cannot be parsed are logged and dropped.
	 */
	@Bean
	public RedisMessageListenerContainer redisExtRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			try {
				var ext = objectMapper.readValue(message.getBody(), ExtDto.class);
				var parts = topicParts(message.getChannel());
				extRedisChannel().send(MessageBuilder.createMessage(ext, tagHeaders(parts[1], topicTail(parts))));
			} catch (IOException e) {
				logger.error("Error parsing ExtDto from redis.", e);
			}
		}, of("ext/*"));
		return container;
	}

	// ----------------------------------------------------------------- plugin

	/** Publishes {@link PluginDto} payloads from {@code pluginTxChannel} to {@code plugin/<origin>/<tag>}. */
	@Bean
	public IntegrationFlow redisPublishPluginFlow() {
		return IntegrationFlow
			.from(pluginTxChannel)
			.handle(new CustomPublishingMessageHandler<PluginDto>() {
				@Override
				protected String getTopic(Message<PluginDto> message) {
					return "plugin/" + formatOrigin(message.getHeaders().get("origin")) + "/" + formatTag(message.getHeaders().get("tag"));
				}

				@Override
				protected byte[] getMessage(Message<PluginDto> message) {
					return writeJson(message.getPayload(), "PluginDto");
				}
			})
			.get();
	}

	/** Forwards plugin messages received from Redis to {@code pluginRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribePluginFlow() {
		return IntegrationFlow
			.from(pluginRedisChannel())
			.channel(pluginRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code plugin/*}, deserializes the body as a {@link PluginDto} and sends it
	 * to {@link #pluginRedisChannel()} with headers built by {@code tagHeaders}.
	 * Bodies that cannot be parsed are logged and dropped.
	 */
	@Bean
	public RedisMessageListenerContainer redisPluginRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			try {
				var plugin = objectMapper.readValue(message.getBody(), PluginDto.class);
				var parts = topicParts(message.getChannel());
				pluginRedisChannel().send(MessageBuilder.createMessage(plugin, tagHeaders(parts[1], topicTail(parts))));
			} catch (IOException e) {
				logger.error("Error parsing PluginDto from redis.", e);
			}
		}, of("plugin/*"));
		return container;
	}

	// --------------------------------------------------------------- template

	/** Publishes {@link TemplateDto} payloads from {@code templateTxChannel} to {@code template/<origin>/<tag>}. */
	@Bean
	public IntegrationFlow redisPublishTemplateFlow() {
		return IntegrationFlow
			.from(templateTxChannel)
			.handle(new CustomPublishingMessageHandler<TemplateDto>() {
				@Override
				protected String getTopic(Message<TemplateDto> message) {
					return "template/" + formatOrigin(message.getHeaders().get("origin")) + "/" + formatTag(message.getHeaders().get("tag"));
				}

				@Override
				protected byte[] getMessage(Message<TemplateDto> message) {
					return writeJson(message.getPayload(), "TemplateDto");
				}
			})
			.get();
	}

	/** Forwards template messages received from Redis to {@code templateRxChannel}. */
	@Bean
	public IntegrationFlow redisSubscribeTemplateFlow() {
		return IntegrationFlow
			.from(templateRedisChannel())
			.channel(templateRxChannel)
			.get();
	}

	/**
	 * Subscribes to {@code template/*}, deserializes the body as a {@link TemplateDto} and
	 * sends it to {@link #templateRedisChannel()} with headers built by {@code tagHeaders}.
	 * Bodies that cannot be parsed are logged and dropped.
	 */
	@Bean
	public RedisMessageListenerContainer redisTemplateRxAdapter(RedisConnectionFactory redisConnectionFactory) {
		var container = newContainer(redisConnectionFactory);
		container.addMessageListener((message, pattern) -> {
			try {
				var template = objectMapper.readValue(message.getBody(), TemplateDto.class);
				var parts = topicParts(message.getChannel());
				templateRedisChannel().send(MessageBuilder.createMessage(template, tagHeaders(parts[1], topicTail(parts))));
			} catch (IOException e) {
				logger.error("Error parsing TemplateDto from redis.", e);
			}
		}, of("template/*"));
		return container;
	}

	// ------------------------------------------------------- outbound handler

	/**
	 * Message handler that publishes each incoming message to a Redis channel.
	 * Subclasses decide the topic name and the raw bytes to publish.
	 *
	 * <p>This is an inner (non-static) class because it reads the enclosing configuration's
	 * injected {@code redisConnectionFactory}.
	 *
	 * @param <T> payload type of the messages this handler accepts
	 */
	private abstract class CustomPublishingMessageHandler<T> extends AbstractMessageHandler {

		private final RedisTemplate<?, ?> template;

		public CustomPublishingMessageHandler() {
			template = new RedisTemplate<>();
			template.setConnectionFactory(redisConnectionFactory);
			// Payloads are already byte[]; presumably the default serializer is disabled so
			// they are published unmodified - confirm against the RedisTemplate docs.
			template.setEnableDefaultSerializer(false);
			template.afterPropertiesSet();
		}

		@Override
		public String getComponentType() {
			return "redis:outbound-channel-adapter";
		}

		@Override
		@SuppressWarnings("unchecked")
		protected void handleMessageInternal(Message<?> message) {
			// Unchecked cast: each flow wires a handler typed for that channel's payload.
			var typed = (Message<T>) message;
			template.convertAndSend(getTopic(typed), getMessage(typed));
		}

		/**
		 * @param message the outbound message
		 * @return the Redis channel name to publish on
		 */
		protected abstract String getTopic(Message<T> message);

		/**
		 * @param message the outbound message
		 * @return the raw bytes to publish
		 */
		protected abstract byte[] getMessage(Message<T> message);
	}
}