BulkheadConfiguration.java
package jasper.config;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import jasper.component.ConfigCache;
import jasper.service.dto.TemplateDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import static java.time.Duration.ofMinutes;
import static java.time.Duration.ofSeconds;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Configuration
public class BulkheadConfiguration {
private static final Logger logger = LoggerFactory.getLogger(BulkheadConfiguration.class);
@Autowired
ConfigCache configs;
@Autowired
BulkheadRegistry registry;
@Bean
public Bulkhead httpBulkhead() {
return registry.bulkhead("http", BulkheadConfig.custom()
.maxConcurrentCalls(configs.root().getMaxConcurrentRequests())
.maxWaitDuration(ofSeconds(0))
.build());
}
@Bean
public Bulkhead scriptBulkhead() {
return registry.bulkhead("script", BulkheadConfig.custom()
.maxConcurrentCalls(configs.root().getMaxConcurrentScripts())
.maxWaitDuration(ofSeconds(60))
.build());
}
@Bean
public Bulkhead replBulkhead() {
return registry.bulkhead("repl", BulkheadConfig.custom()
.maxConcurrentCalls(configs.root().getMaxConcurrentReplication())
.maxWaitDuration(ofMinutes(15))
.build());
}
@Bean
public Bulkhead fetchBulkhead() {
return registry.bulkhead("fetch", BulkheadConfig.custom()
.maxConcurrentCalls(configs.root().getMaxConcurrentFetch())
.maxWaitDuration(ofMinutes(5))
.build());
}
@Bean
public Bulkhead recyclerBulkhead() {
return registry.bulkhead("recycler", BulkheadConfig.custom()
.maxConcurrentCalls(1)
.maxWaitDuration(ofMinutes(0))
.build());
}
@ServiceActivator(inputChannel = "templateRxChannel")
public void handleTemplateUpdate(Message<TemplateDto> message) {
var template = message.getPayload();
if (isBlank(template.getTag())) return;
if (template.getTag().startsWith("_config/server")) {
logger.debug("Server config template updated, updating bulkhead configurations");
updateBulkheadConfig(httpBulkhead(), configs.root().getMaxConcurrentRequests());
updateBulkheadConfig(scriptBulkhead(), configs.root().getMaxConcurrentScripts());
updateBulkheadConfig(replBulkhead(), configs.root().getMaxConcurrentReplication());
updateBulkheadConfig(fetchBulkhead(), configs.root().getMaxConcurrentFetch());
}
}
public static void updateBulkheadConfig(Bulkhead bulkhead, int limit) {
bulkhead.changeConfig(BulkheadConfig
.from(bulkhead.getBulkheadConfig())
.maxConcurrentCalls(limit)
.build());
}
}