StorageImplLocal.java

package jasper.component;


import io.micrometer.core.annotation.Timed;
import jasper.config.Props;
import jasper.errors.AlreadyExistsException;
import jasper.errors.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.springframework.util.StreamUtils;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

@Profile("storage")
@Component
public class StorageImplLocal implements Storage {
	private final Logger logger = LoggerFactory.getLogger(StorageImplLocal.class);

	@Autowired
	Props props;

	@Timed(value = "jasper.storage", histogram = true)
	public byte[] get(String origin, String namespace, String id) {
		try {
			return Files.readAllBytes(path(origin, namespace, id));
		} catch (IOException e) {
			throw new NotFoundException("Cache " + id);
		}
	}

	@Timed(value = "jasper.storage", histogram = true)
	public long size(String origin, String namespace, String id) {
		return path(origin, namespace, id).toFile().length();
	}

	@Timed(value = "jasper.storage", histogram = true)
	public InputStream stream(String origin, String namespace, String id) {
		try {
			return new FileInputStream(path(origin, namespace, id).toFile());
		} catch (IOException e) {
			throw new NotFoundException("Storage file (" + origin + ", " + namespace + ") " + id);
		}
	}

	@Timed(value = "jasper.storage", histogram = true)
	public long stream(String origin, String namespace, String id, OutputStream os) {
		try {
			return Files.copy(path(origin, namespace, id), os);
		} catch (IOException e) {
			throw new NotFoundException("Storage file (" + origin + ", " + namespace + ") " + id);
		}
	}

	@Override
	public Zipped streamZip(String origin, String namespace, String id) throws IOException {
		return new ZippedLocal(origin, namespace, id, false);
	}

	@Timed(value = "jasper.storage", histogram = true)
	public void visitTenants(PathVisitor v) {
		var dir = tenants();
		if (!dir.toFile().exists()) return;
		try (var list = Files.list(dir)) {
			list.forEach(t -> {
				if (!t.toFile().isDirectory()) return;
				var tenant = t.getFileName().toString();
				v.visit(tenantOrigin(tenant));
			});
		} catch (IOException e) {
			logger.warn("Error reading tenant dir", e);
		}
	}

	@Override
	public List<String> listTenants() {
		try (var list = Files.list(tenants())) {
			return list
				.map(f -> f.getFileName().toString())
				.collect(Collectors.toList());
		} catch (IOException e) {
			return Collections.emptyList();
		}
	}

	@Timed(value = "jasper.storage", histogram = true)
	public void visitStorage(String origin, String namespace, PathVisitor v) {
		var dir = dir(origin, namespace);
		if (!dir.toFile().exists()) return;
		try (var list = Files.list(dir)) {
			list.forEach(p -> v.visit(p.getFileName().toString()));
		} catch (IOException e) {
			logger.warn("Error reading storage", e);
		}
	}

	@Override
	public List<StorageRef> listStorage(String origin, String namespace) {
		try (var list = Files.list(dir(origin, namespace))) {
			return list
				.map(f -> new StorageRef(f.getFileName().toString(), f.toFile().length()))
				.collect(Collectors.toList());
		} catch (IOException e) {
			return Collections.emptyList();
		}
	}

	@Timed(value = "jasper.storage", histogram = true)
	public boolean exists(String origin, String namespace, String id) {
		return path(origin, namespace, id).toFile().exists();
	}

	@Timed(value = "jasper.storage", histogram = true)
	public void overwrite(String origin, String namespace, String id, byte[] file) throws IOException {
		if (!exists(origin, namespace, id)) throw new NotFoundException("Cache " + id);
		var path = path(origin, namespace, id);
		Files.write(path, file, StandardOpenOption.TRUNCATE_EXISTING);
	}

	@Timed(value = "jasper.storage", histogram = true)
	public String store(String origin, String namespace, byte[] file) throws IOException {
		var id = UUID.randomUUID().toString();
		storeAt(origin, namespace, id, file);
		return id;
	}

	@Timed(value = "jasper.storage", histogram = true)
	public String store(String origin, String namespace, InputStream is) throws IOException {
		var id = UUID.randomUUID().toString();
		var path = path(origin, namespace, id);
		Files.createDirectories(path.getParent());
		try (var fos = new FileOutputStream(path.toFile())) {
			StreamUtils.copy(is, fos);
		}
		return id;
	}

	@Override
	public Zipped zipAt(String origin, String namespace, String id) throws IOException {
		if (path(origin, namespace, id).toFile().exists()) throw new AlreadyExistsException();
		Files.createDirectories(dir(origin, namespace));
		return new ZippedLocal(origin, namespace, id, true);
	}

	@Timed(value = "jasper.storage", histogram = true)
	public void storeAt(String origin, String namespace, String id, byte[] file) throws IOException {
		var path = path(origin, namespace, id);
		if (path.toFile().exists()) throw new AlreadyExistsException();
		Files.createDirectories(path.getParent());
		Files.write(path, file, StandardOpenOption.CREATE_NEW);
	}

	@Timed(value = "jasper.storage", histogram = true)
	public void storeAt(String origin, String namespace, String id, InputStream is) throws IOException {
		var path = path(origin, namespace, id);
		if (path.toFile().exists()) throw new AlreadyExistsException();
		Files.createDirectories(path.getParent());
		try (var fos = new FileOutputStream(path.toFile())) {
			StreamUtils.copy(is, fos);
		}
	}

	@Timed(value = "jasper.storage", histogram = true)
	public void delete(String origin, String namespace, String id) throws IOException {
		Files.delete(path(origin, namespace, id));
	}

	@Override
	public void backup(String origin, String namespace, Zipped backup, Instant modifiedAfter) throws IOException {
		if (!dir(origin, namespace).toFile().exists()) return;
		Files.createDirectories(backup.get(namespace));
		try (var w = Files.walk(dir(origin, namespace))) {
			w.forEach(f -> {
				if (Files.isRegularFile(f) && (modifiedAfter == null || f.toFile().lastModified() > modifiedAfter.toEpochMilli())) {
					try {
						Files.copy(f, backup.get(namespace, f.getFileName().toString()));
					} catch (IOException e) {
						throw new RuntimeException(e);
					}
				}
			});
		}
	}

	@Override
	public void restore(String origin, String namespace, Zipped backup) throws IOException {
		if (!Files.exists(backup.get(namespace))) return;
		Files.createDirectories(dir(origin, namespace));
		try (var w = Files.walk(backup.get(namespace))) {
			w.forEach(f -> {
				if (Files.isRegularFile(f)) {
					try {
						Files.copy(f, path(origin, namespace, f.getFileName().toString()));
					} catch (FileAlreadyExistsException e) {
						// TODO: overwrite option?
					} catch (IOException e) {
						throw new RuntimeException(e);
					}
				}
			});
		}
	}

	Path tenants() {
		return Paths.get(props.getStorage());
	}

	Path dir(String origin, String namespace) {
		sanitize(origin, namespace);
		return Paths.get(props.getStorage(), originTenant(origin), namespace);
	}

	Path path(String origin, String namespace, String id) {
		sanitize(origin, namespace, id);
		return Paths.get(props.getStorage(), originTenant(origin), namespace, id);
	}

	private class ZippedLocal implements Zipped {
		private final FileSystem zipfs;
		private final boolean create;
		private final String origin;
		private final String namespace;
		private final String id;

		public ZippedLocal(String origin, String namespace, String id, boolean create) throws IOException {
			this.origin = origin;
			this.namespace = namespace;
			this.id = id;
			this.create = create;
			zipfs = FileSystems.newFileSystem(path(origin, namespace, create ? "_" + id : id), Map.of("create", create ? "true" : "false"));
		}

		@Override
		public Path get(String first, String... more) {
			return zipfs.getPath(first, more);
		}

		@Override
		public InputStream in(String filename) {
			logger.debug("Reading zip file {}", filename);
            try {
                return Files.newInputStream(zipfs.getPath(filename));
            } catch (IOException e) {
                return null;
            }
        }

		@Override
		public OutputStream out(String filename) throws IOException {
			logger.debug("Zipping up {}", filename);
			return Files.newOutputStream(zipfs.getPath(filename));
        }

		@Override
		public Iterator<InputStream> list(String pattern) throws IOException {
			logger.debug("Listing files matching pattern {}", pattern);
			var root = zipfs.getPath("/");
			if (!Files.exists(root)) return Collections.emptyIterator();
			try (var stream = Files.walk(root)) {
				var files = stream
					.filter(Files::isRegularFile)
					.map(p -> p.toString().startsWith("/") ? p.toString().substring(1) : p.toString())
					.filter(name -> name.matches(pattern))
					.collect(Collectors.toList());
				return files.stream()
					.map(filename -> {
						try {
							return Files.newInputStream(zipfs.getPath(filename));
						} catch (IOException e) {
							logger.warn("Failed to open file {} in zip", filename, e);
							return null;
						}
					})
					.filter(is -> is != null)
					.iterator();
			}
		}

		@Override
		public void close() throws IOException {
			zipfs.close();
			if (create) {
				// Remove underscore to indicate writing has finished
				Files.move(path(origin, namespace, "_" + id), path(origin, namespace, id));
			}
		}
	}
}