Backup.java

package jasper.component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import jakarta.persistence.EntityManager;
import jasper.component.Storage.Zipped;
import jasper.config.Props;
import jasper.domain.Ext;
import jasper.domain.Plugin;
import jasper.domain.Ref;
import jasper.domain.Template;
import jasper.domain.User;
import jasper.domain.proj.Cursor;
import jasper.repository.BackfillRepository;
import jasper.repository.ExtRepository;
import jasper.repository.PluginRepository;
import jasper.repository.RefRepository;
import jasper.repository.StreamMixin;
import jasper.repository.TemplateRepository;
import jasper.repository.UserRepository;
import jasper.service.dto.BackupOptionsDto;
import jasper.util.JsonArrayStreamDataSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import static jasper.component.FileCache.CACHE;

@Component
public class Backup {
	private final Logger logger = LoggerFactory.getLogger(Backup.class);
	private static final String BACKUPS = "backups";

	@Autowired
	Props props;

	@Autowired
	RefRepository refRepository;

	@Autowired
	BackfillRepository backfillRepository;

	@Autowired
	ExtRepository extRepository;

	@Autowired
	UserRepository userRepository;

	@Autowired
	PluginRepository pluginRepository;

	@Autowired
	TemplateRepository templateRepository;

	@Autowired
	ObjectMapper objectMapper;

	@Autowired
	EntityManager entityManager;

	@Autowired
	PlatformTransactionManager transactionManager;

	@Autowired
	Optional<Storage> storage;

	public record BackupStream(InputStream inputStream, long size) {}

	@Async
	@Transactional(readOnly = true)
	@Counted(value = "jasper.backup")
	public void createBackup(String origin, String id, BackupOptionsDto options) throws IOException {
		if (storage.isEmpty()) {
			logger.error("{} Backup create failed: No storage present.", origin);
			return;
		}
		var start = Instant.now();
		logger.info("{} Creating Backup", origin);
		try (var zipped = storage.get().zipAt(origin, BACKUPS, id + ".zip")) {
			if (options.isRef()) {
				backupRepo(refRepository, origin, options.getNewerThan(), zipped.out("ref.json"), false);
			}
			if (options.isExt()) {
				backupRepo(extRepository, origin, options.getNewerThan(), zipped.out("ext.json"));
			}
			if (options.isUser()) {
				backupRepo(userRepository, origin, options.getNewerThan(), zipped.out("user.json"));
			}
			if (options.isPlugin()) {
				backupRepo(pluginRepository, origin, options.getNewerThan(), zipped.out("plugin.json"));
			}
			if (options.isTemplate()) {
				backupRepo(templateRepository, origin, options.getNewerThan(), zipped.out("template.json"));
			}
			if (options.isCache()) {
				backupCache(origin, options.getNewerThan(), zipped);
			}
		}
		logger.info("{} Finished Backup in {}", origin, Duration.between(start, Instant.now()));
	}

	void backupRepo(StreamMixin<?> repo, String origin, Instant newerThan, OutputStream out) throws IOException {
		backupRepo(repo, origin, newerThan, out, true);
	}

	void backupRepo(StreamMixin<?> repo, String origin, Instant newerThan, OutputStream out, boolean evict) throws IOException {
		try (out) {
			var firstElementProcessed = new AtomicBoolean(false);
			var buf = new StringBuilder();
			buf.append("[");
			var buffSize = props.getBackupBufferSize();
			Stream<?> stream;
			if (newerThan != null) {
				stream = repo.streamAllByOriginAndModifiedGreaterThanEqualOrderByModifiedDesc(origin, newerThan);
			} else {
				stream = repo.streamAllByOriginOrderByModifiedDesc(origin);
			}
			stream.forEach(entity -> {
				try {
					if (firstElementProcessed.getAndSet(true)) {
						buf.append(",\n");
					}
					buf.append(objectMapper.writeValueAsString(entity));
					if (buf.length() > buffSize) {
						logger.debug("Flushing buffer {} bytes", buf.length());
						StreamUtils.copy(buf.toString().getBytes(), out);
						buf.setLength(0);
						entityManager.clear();
					}
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
				if (evict) {
					entityManager.detach(entity);
				}
			});
			buf.append("]");
			logger.debug("Flushing buffer {} bytes", buf.length());
			StreamUtils.copy(buf.toString().getBytes(), out);
		}
	}

	void backupCache(String origin, Instant newerThan, Zipped backup) {
		try {
			storage.get().backup(origin, CACHE, backup, newerThan);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	@Timed(value = "jasper.backup", histogram = true)
	public BackupStream get(String origin, String id) {
		if (storage.isEmpty()) {
			logger.error("Backup get failed: No storage present.");
			return null;
		}
		return new BackupStream(
			storage.get().stream(origin, BACKUPS, id + ".zip"),
			storage.get().size(origin, BACKUPS, id + ".zip")
		);
	}

	public boolean exists(String origin, String id) {
		if (storage.isEmpty()) {
			logger.error("Backup exist check failed: No storage present.");
			return false;
		}
		return storage.get().exists(origin, BACKUPS, id + ".zip");
	}

	public List<Storage.StorageRef> listBackups(String origin) {
		if (storage.isEmpty()) {
			logger.error("Backup list failed: No storage present.");
			return null;
		}
		return storage.get().listStorage(origin, BACKUPS).stream()
			.filter(n -> n.id().endsWith(".zip")).toList();
	}

	@Async
	@Counted(value = "jasper.backup")
	public void restore(String origin, String id, BackupOptionsDto options) {
		if (storage.isEmpty()) {
			logger.error("{} Backup restore failed: No storage present.", origin);
			return;
		}
		var start = Instant.now();
		logger.info("{} Restoring Backup", origin);
		try (var zipped = storage.get().streamZip(origin, BACKUPS, id + ".zip")) {
			if (options == null || options.isRef()) {
				restoreRepo(refRepository, origin, zipped.list("ref.*\\.json"), Ref.class);
			}
			if (options == null || options.isExt()) {
				restoreRepo(extRepository, origin, zipped.list("ext.*\\.json"), Ext.class);
			}
			if (options == null || options.isUser()) {
				restoreRepo(userRepository, origin, zipped.list("user.*\\.json"), User.class);
			}
			if (options == null || options.isPlugin()) {
				restoreRepo(pluginRepository, origin, zipped.list("plugin.*\\.json"), Plugin.class);
			}
			if (options == null || options.isTemplate()) {
				restoreRepo(templateRepository, origin, zipped.list("template.*\\.json"), Template.class);
			}
			if (options == null || options.isCache()) {
				restoreCache(origin, zipped);
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		logger.info("{} Finished Restore in {}", origin, Duration.between(start, Instant.now()));
	}

	<T extends Cursor> void restoreRepo(JpaRepository<T, ?> repo, String origin, Iterator<InputStream> files, Class<T> type) {
		files.forEachRemaining(file -> {
			restoreRepo(repo, origin, file, type);
		});
    }

	<T extends Cursor> void restoreRepo(JpaRepository<T, ?> repo, String origin, InputStream file, Class<T> type) {
		if (file == null) return; // Silently ignore missing files
		var done = new AtomicBoolean(false);
		var it = new JsonArrayStreamDataSupplier<>(file, type, objectMapper);
		int count = 0;
		try {
			while (!done.get()) {
				if (count > 0) logger.info("{} {} {} restored...", origin, type.getSimpleName(), count * props.getRestoreBatchSize());
				var lastBatchCount = count * props.getRestoreBatchSize();
				TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
				transactionTemplate.execute(status -> {
					for (var i = 0; i < props.getRestoreBatchSize(); i++) {
						if (!it.hasNext()) {
							done.set(true);
							logger.info("{} {} {} restored...", origin, type.getSimpleName(), lastBatchCount + i);
							return null;
						}
						var t = it.next();
						try {
							t.setOrigin(origin);
							repo.save(t);
						} catch (Exception e) {
							try {
								logger.error("{} Skipping {} {} due to constraint violation", origin, type.getSimpleName(), objectMapper.writeValueAsString(t), e);
							} catch (JsonProcessingException ex) {
								logger.error("{} Skipping {} {} due to constraint violation", origin, type.getSimpleName(), type, e);
							}
						}
					}
					return null;
				});
				count++;
			}
		} catch (Exception e) {
			logger.error("Failed to restore", e);
		}
    }

	private void restoreCache(String origin, Zipped backup) {
		try {
			storage.get().restore(origin, CACHE, backup);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	@Timed(value = "jasper.backup", histogram = true)
	public void store(String origin, String id, InputStream zipFile) throws IOException {
		if (storage.isEmpty()) {
			logger.error("Backup store failed: No storage present.");
			return;
		}
		storage.get().storeAt(origin, BACKUPS, id+ ".zip", zipFile);
	}

	/**
	 * Will first drop and then regenerate all metadata
	 * for the given origin and all sub-origins.
	 */
	@Async
	@Counted(value = "jasper.backup")
	public void regen(String origin) {
		var start = Instant.now();
		logger.info("{} Starting Backfill", origin);
		refRepository.dropMetadata(origin);
		logger.info("{} Cleared old metadata", origin);
		int count = 0;
		while (props.getBackfillBatchSize() == backfillRepository.backfillMetadata(origin, props.getBackfillBatchSize())) {
			count += props.getBackfillBatchSize();
			logger.info("{} Generating metadata... {} done", origin, count);
		}
		logger.info("{} Finished Backfill in {}", origin, Duration.between(start, Instant.now()));
	}

	@Timed(value = "jasper.backup", histogram = true)
	public void delete(String origin, String id) throws IOException {
		if (storage.isEmpty()) {
			logger.error("Backup delete failed: No storage present.");
			return;
		}
		storage.get().delete(origin, BACKUPS, id + ".zip");
	}
}