// Ingest.java

package jasper.component;

import io.micrometer.core.annotation.Timed;
import jakarta.persistence.EntityExistsException;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceException;
import jakarta.persistence.RollbackException;
import jasper.config.Props;
import jasper.domain.Metadata;
import jasper.domain.Ref;
import jasper.errors.AlreadyExistsException;
import jasper.errors.DuplicateModifiedDateException;
import jasper.errors.InvalidPushException;
import jasper.errors.ModifiedException;
import jasper.errors.NotFoundException;
import jasper.repository.RefRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.orm.jpa.JpaSystemException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static jasper.component.Meta.expandTags;
import static jasper.util.DbConstraint.isPkViolation;
import static jasper.util.DbConstraint.isUniqueModifiedOriginViolation;

/**
 * Writes {@link Ref} entities to the database and runs the surrounding
 * pipeline steps (validation, rng, metadata generation, messaging) in a
 * fixed order for each kind of write.
 *
 * <p>The {@code ensure*UniqueModified} helpers deal with collisions reported by
 * {@code isUniqueModifiedOriginViolation}: depending on the operation they
 * either retry with a fresh timestamp, or move the supplied timestamp
 * backwards until the write succeeds.
 */
@Component
public class Ingest {
	private static final Logger logger = LoggerFactory.getLogger(Ingest.class);

	@Autowired
	Props props;

	@Autowired
	RefRepository refRepository;

	@Autowired
	EntityManager em;

	@Autowired
	Validate validate;

	@Autowired
	Meta meta;

	@Autowired
	Rng rng;

	@Autowired
	Messages messages;

	@Autowired
	PlatformTransactionManager transactionManager;

	// Exposed for testing
	Clock ensureUniqueModifiedClock = Clock.systemUTC();

	/**
	 * Inserts a new Ref.
	 *
	 * <p>Sets {@code created} to the current instant, then runs
	 * {@code validate.ref}, {@code rng.update} (with no previous Ref),
	 * {@code meta.ref}, the insert itself, {@code meta.sources} and finally
	 * {@code messages.updateRef}.
	 *
	 * @param rootOrigin origin passed through to the validation, rng and metadata steps
	 * @param ref the Ref to insert; its {@code created} and {@code modified} fields are overwritten
	 * @throws AlreadyExistsException if the insert hits an existing entity or a primary key violation
	 * @throws DuplicateModifiedDateException if no unique modified timestamp was found within the retry budget
	 */
	@Timed(value = "jasper.ref", histogram = true)
	public void create(String rootOrigin, Ref ref) {
		ref.setCreated(Instant.now());
		validate.ref(rootOrigin, ref);
		rng.update(rootOrigin, ref, null);
		meta.ref(rootOrigin, ref);
		ensureCreateUniqueModified(ref);
		meta.sources(rootOrigin, ref, null);
		messages.updateRef(ref);
	}

	/**
	 * Updates an existing Ref, using the incoming {@code modified} value as the
	 * expected version for an optimistic update.
	 *
	 * @param rootOrigin origin passed through to the validation, rng and metadata steps
	 * @param ref the new state; its {@code modified} field is overwritten with the new timestamp
	 * @throws NotFoundException if no Ref with the same url and origin exists
	 * @throws ModifiedException if the optimistic update matched no row
	 * @throws DuplicateModifiedDateException if no unique modified timestamp was found within the retry budget
	 */
	@Timed(value = "jasper.ref", histogram = true)
	public void update(String rootOrigin, Ref ref) {
		var existing = requireExisting(ref);
		validate.ref(rootOrigin, ref);
		rng.update(rootOrigin, ref, existing);
		meta.ref(rootOrigin, ref);
		ensureUpdateUniqueModified(ref);
		meta.sources(rootOrigin, ref, existing);
		messages.updateRef(ref);
	}

	/**
	 * Same flow as {@link #update(String, Ref)} but uses the response-specific
	 * steps: {@code validate.response}, {@code meta.response} and
	 * {@code meta.responseSource}.
	 *
	 * @param rootOrigin origin passed through to the validation, rng and metadata steps
	 * @param ref the new state; its {@code modified} field is overwritten with the new timestamp
	 * @throws NotFoundException if no Ref with the same url and origin exists
	 * @throws ModifiedException if the optimistic update matched no row
	 * @throws DuplicateModifiedDateException if no unique modified timestamp was found within the retry budget
	 */
	@Timed(value = "jasper.ref", histogram = true)
	public void updateResponse(String rootOrigin, Ref ref) {
		var existing = requireExisting(ref);
		validate.response(rootOrigin, ref);
		rng.update(rootOrigin, ref, existing);
		meta.response(rootOrigin, ref);
		ensureUpdateUniqueModified(ref);
		meta.responseSource(rootOrigin, ref, existing);
		messages.updateRef(ref);
	}

	/**
	 * Saves a Ref without validation and without the rng step, keeping the
	 * supplied {@code modified} value unless it collides (see
	 * {@link #ensureSilentUniqueModified(Ref)}). Publishes through
	 * {@code messages.updateSilentRef} instead of {@code messages.updateRef}.
	 *
	 * @param rootOrigin origin passed through to the metadata steps
	 * @param ref the Ref to save; a previous version is looked up but is not required to exist
	 */
	@Timed(value = "jasper.ref", histogram = true)
	public void silent(String rootOrigin, Ref ref) {
		// The previous version (if any) is read before the save so it can be handed to meta.sources.
		var previous = refRepository.findOneByUrlAndOrigin(ref.getUrl(), ref.getOrigin()).orElse(null);
		meta.ref(rootOrigin, ref);
		ensureSilentUniqueModified(ref);
		meta.sources(rootOrigin, ref, previous);
		messages.updateSilentRef(ref);
	}

	/**
	 * Writes a pushed Ref, keeping its supplied {@code modified} value.
	 *
	 * <p>Metadata is generated inline (rng, {@code meta.ref}, {@code meta.sources})
	 * only when {@code modified} is null or falls within the last five minutes.
	 * Otherwise a placeholder {@link Metadata} is attached, built with
	 * {@code regen(true)} and the expanded tags (presumably so it can be
	 * regenerated later; the consumer of that flag is not visible here).
	 *
	 * @param rootOrigin origin passed through to the validation, rng and metadata steps
	 * @param ref the Ref to write
	 * @param validation whether to run {@code validate.ref} first
	 * @param stripInvalidPlugins forwarded to {@code validate.ref} when validation is enabled
	 * @throws AlreadyExistsException on an existing entity or primary key violation
	 * @throws DuplicateModifiedDateException on a unique modified/origin violation
	 * @throws InvalidPushException if the transaction rolled back due to a bean validation constraint violation
	 */
	@Timed(value = "jasper.ref", histogram = true)
	public void push(String rootOrigin, Ref ref, boolean validation, boolean stripInvalidPlugins) {
		// Decided before validation runs, based on the modified value as received.
		var recent = ref.getModified() == null
			|| ref.getModified().isAfter(Instant.now().minus(5, ChronoUnit.MINUTES));
		if (validation) {
			validate.ref(rootOrigin, ref, stripInvalidPlugins);
		}
		Ref previous = null;
		if (!recent) {
			var placeholder = Metadata
				.builder()
				.modified(null)
				.regen(true)
				.expandedTags(expandTags(ref.getTags()))
				.build();
			ref.setMetadata(placeholder);
		} else {
			previous = refRepository.findOneByUrlAndOrigin(ref.getUrl(), ref.getOrigin()).orElse(null);
			rng.update(rootOrigin, ref, previous);
			meta.ref(rootOrigin, ref);
		}
		pushUniqueModified(ref);
		if (recent) {
			meta.sources(rootOrigin, ref, previous);
		}
		messages.updateRef(ref);
	}

	/**
	 * Deletes the Ref with the given url and origin, if it exists. Does nothing
	 * when no such Ref is found.
	 *
	 * @param rootOrigin origin passed through to {@code meta.sources}
	 * @param url url of the Ref to delete
	 * @param origin origin of the Ref to delete
	 */
	@Transactional
	@Timed(value = "jasper.ref", histogram = true)
	public void delete(String rootOrigin, String url, String origin) {
		refRepository.findOneByUrlAndOrigin(url, origin).ifPresent(existing -> {
			// The delete message is sent before the row is removed.
			messages.deleteRef(existing);
			refRepository.deleteByUrlAndOrigin(url, origin);
			meta.sources(rootOrigin, null, existing);
		});
	}

	/**
	 * Persists a new Ref, stamping {@code modified} from
	 * {@code ensureUniqueModifiedClock} on every attempt. Each attempt runs in
	 * its own transaction; a unique modified/origin violation triggers another
	 * attempt until the attempt count exceeds {@code props.getIngestMaxRetry()}.
	 *
	 * @param ref the Ref to persist
	 * @throws AlreadyExistsException on an existing entity or primary key violation
	 * @throws DuplicateModifiedDateException once the retry budget is exhausted
	 */
	void ensureCreateUniqueModified(Ref ref) {
		for (var attempt = 1; ; attempt++) {
			try {
				inTransaction(() -> {
					ref.setModified(Instant.now(ensureUniqueModifiedClock));
					em.persist(ref);
					em.flush();
				});
				return;
			} catch (DataIntegrityViolationException | PersistenceException | JpaSystemException e) {
				if (e instanceof EntityExistsException) throw new AlreadyExistsException();
				if (isPkViolation(e, "ref")) throw new AlreadyExistsException();
				if (!isUniqueModifiedOriginViolation(e, "ref")) throw e;
				if (attempt > props.getIngestMaxRetry()) throw new DuplicateModifiedDateException();
				// Otherwise loop around and try again with a newer timestamp.
			}
		}
	}

	/**
	 * Saves a Ref with its supplied {@code modified} value, moving that value
	 * backwards until the save no longer reports a unique modified/origin
	 * violation. This loop has no upper bound and never gives up on such
	 * violations; any other exception is rethrown.
	 *
	 * <p>After each collision the timestamp is moved back by one millisecond.
	 * Once the attempt count exceeds {@code props.getIngestMaxRetry()}, the
	 * count is reset and the search restarts from the starting cursor shifted
	 * back by a random 0-999 nanoseconds.
	 *
	 * @param ref the Ref to save; its {@code modified} field may be changed
	 */
	void ensureSilentUniqueModified(Ref ref) {
		var base = ref.getModified();
		var attempt = 0;
		for (;;) {
			attempt++;
			try {
				inTransaction(() -> refRepository.saveAndFlush(ref));
				return;
			} catch (DataIntegrityViolationException | PersistenceException | JpaSystemException e) {
				if (!isUniqueModifiedOriginViolation(e, "ref")) throw e;
				if (attempt <= props.getIngestMaxRetry()) {
					// Still within the current round: step back one millisecond.
					ref.setModified(ref.getModified().minusMillis(1));
					continue;
				}
				// Round exhausted: start a new round from a slightly earlier base.
				attempt = 0;
				base = base.minusNanos((long) (1000 * Math.random()));
				ref.setModified(base);
			}
		}
	}

	/**
	 * Runs {@code refRepository.optimisticUpdate} with the incoming
	 * {@code modified} value as the expected version and a fresh timestamp
	 * from {@code ensureUniqueModifiedClock} as the new one. Each attempt runs
	 * in its own transaction; a unique modified/origin violation triggers
	 * another attempt until the attempt count exceeds
	 * {@code props.getIngestMaxRetry()}.
	 *
	 * @param ref the new state; its {@code modified} field is overwritten on every attempt
	 * @throws ModifiedException if the update reports zero affected rows
	 * @throws DuplicateModifiedDateException once the retry budget is exhausted
	 */
	void ensureUpdateUniqueModified(Ref ref) {
		// Captured once: ref.modified is overwritten inside every attempt.
		var expected = ref.getModified();
		for (var attempt = 1; ; attempt++) {
			try {
				inTransaction(() -> {
					ref.setModified(Instant.now(ensureUniqueModifiedClock));
					var rows = refRepository.optimisticUpdate(
						expected,
						ref.getUrl(),
						ref.getOrigin(),
						ref.getTitle(),
						ref.getComment(),
						ref.getTags(),
						ref.getSources(),
						ref.getAlternateUrls(),
						ref.getPlugins(),
						ref.getMetadata(),
						ref.getPublished(),
						ref.getModified());
					if (rows == 0) {
						throw new ModifiedException("Ref");
					}
				});
				return;
			} catch (DataIntegrityViolationException | PersistenceException | JpaSystemException e) {
				if (!isUniqueModifiedOriginViolation(e, "ref")) throw e;
				if (attempt > props.getIngestMaxRetry()) throw new DuplicateModifiedDateException();
				// Otherwise loop around and try again with a newer timestamp.
			}
		}
	}

	/**
	 * Writes a pushed Ref exactly once, without retrying. Calls
	 * {@code refRepository.pushAsyncMetadata} first and falls back to
	 * {@code refRepository.save} when that call returns 0.
	 *
	 * @param ref the Ref to write; its {@code modified} value is used as-is
	 * @throws AlreadyExistsException on an existing entity or primary key violation
	 * @throws DuplicateModifiedDateException on a unique modified/origin violation
	 * @throws InvalidPushException if the transaction rolled back due to a bean validation constraint violation
	 */
	void pushUniqueModified(Ref ref) {
		try {
			var rows = refRepository.pushAsyncMetadata(
				ref.getUrl(),
				ref.getOrigin(),
				ref.getTitle(),
				ref.getComment(),
				ref.getTags(),
				ref.getSources(),
				ref.getAlternateUrls(),
				ref.getPlugins(),
				ref.getMetadata(),
				ref.getPublished(),
				ref.getModified());
			if (rows == 0) {
				refRepository.save(ref);
			}
		} catch (DataIntegrityViolationException | PersistenceException | JpaSystemException e) {
			if (e instanceof EntityExistsException || isPkViolation(e, "ref")) {
				throw new AlreadyExistsException();
			}
			if (isUniqueModifiedOriginViolation(e, "ref")) {
				throw new DuplicateModifiedDateException();
			}
			throw e;
		} catch (TransactionSystemException e) {
			// Only a rollback caused by a bean validation failure is translated.
			if (e.getCause() instanceof RollbackException r
				&& r.getCause() instanceof jakarta.validation.ConstraintViolationException) {
				throw new InvalidPushException();
			}
			throw e;
		}
	}

	/** Looks up the stored Ref with the same url and origin, or throws {@link NotFoundException}. */
	private Ref requireExisting(Ref ref) {
		return refRepository.findOneByUrlAndOrigin(ref.getUrl(), ref.getOrigin())
			.orElseThrow(() -> new NotFoundException("Ref"));
	}

	/** Runs {@code work} inside a new {@link TransactionTemplate} built on {@code transactionManager}. */
	private void inTransaction(Runnable work) {
		new TransactionTemplate(transactionManager).execute(status -> {
			work.run();
			return null;
		});
	}

}