In this further step of my Java 8 exercise I'm going to introduce a better persistence facility to SolidBlue II, also addressing one of the issues that I mentioned about the previous implementation.
Before getting to the focus of this post, let me mention that a bit of housekeeping has also been performed on the project:
- SolidBlue II now has a multi-module structure, so that it is ready to accommodate new features that I'm developing in a branch.
- The SCM has been switched from Mercurial to Git. Mercurial is still my favourite DSCM, but I understand that most people are more familiar with Git, and these examples are also being used for mentoring my customers... so Git makes things easier.
Then a better separation of responsibilities has been implemented. The class `FileAndFingerprint` has been introduced as a holder of file+fingerprint data, including the method that computes the MD5:
```java
package it.tidalwave.integritychecker2;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

public class FileAndFingerprint {
    private final Path file;

    private final String fingerPrint;

    public FileAndFingerprint (final Path file) {
        this.file = file;
        this.fingerPrint = computeFingerprint("MD5");
    }

    public Path getFile() {
        return file;
    }

    public String getFingerPrint() {
        return fingerPrint;
    }

    @Override
    public String toString() {
        return String.format("FileAndFingerprint(path=%s, fingerPrint=%s)",
                             file.getFileName().toString(), fingerPrint);
    }

    private String computeFingerprint (final String algorithm) {
        try {
            ...
        } catch (NoSuchAlgorithmException | IOException e) {
            return e.getMessage();
        }
    }

    private static String toString (final byte[] bytes) {
        ...
    }
}
```
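The body of `computeFingerprint()` is elided above. Purely as an illustration, and not as the actual SolidBlue II code, a memory-mapped digest computation could look like the following sketch. The class name `FingerprintSketch` and its methods are hypothetical; unlike the class above, the sketch opens the file through `FileChannel.open()` and throws unchecked exceptions instead of returning the error message.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

// Hypothetical helper, not part of SolidBlue II.
final class FingerprintSketch {
    private FingerprintSketch() {
    }

    // Maps the whole file read-only into memory and digests it.
    // A single mapping cannot exceed Integer.MAX_VALUE bytes, so this only suits files below 2 GB.
    static String fingerprintOf (final Path file, final String algorithm) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            return fingerprintOf(channel.map(READ_ONLY, 0, channel.size()), algorithm);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Digests the remaining bytes of the buffer and returns them as a hex string.
    static String fingerprintOf (final ByteBuffer buffer, final String algorithm) {
        try {
            final MessageDigest digest = MessageDigest.getInstance(algorithm);
            digest.update(buffer);
            return toHex(digest.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unknown algorithm: " + algorithm, e);
        }
    }

    // Renders each byte as two lowercase hex digits.
    static String toHex (final byte[] bytes) {
        final StringBuilder builder = new StringBuilder(bytes.length * 2);
        for (final byte b : bytes) {
            builder.append(String.format("%02x", b & 0xff));
        }
        return builder.toString();
    }
}
```

Whether the real implementation formats the digest in lowercase hex is an assumption of this sketch.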
Ok, so let's get to the main point of this post: addressing the problem of saving the intermediate data (e.g. the list of discovered files) even in case of premature termination. A `Storage` facility has been introduced which encapsulates the persistence semantics:
```java
package it.tidalwave.integritychecker2;

import java.nio.file.Path;
import java.util.stream.Collector;
import java.util.stream.Stream;

public interface Storage extends AutoCloseable {
    public Collector<Path, ?, ? extends Storage> getIntermediateCollector();

    public Collector<FileAndFingerprint, ?, ? extends Storage> getFinalCollector();

    public Stream<Path> stream();
}
```
The idea is that `Storage` provides the `Collector` for the pipeline... indeed, two different instances of `Collector` that make it possible to split the original pipeline in two:
```java
try (final Stream<Path> stream = Files.walk(targetPath, FOLLOW_LINKS);
     final Storage storage = new FileStorage(targetPath)) {
    stream.filter(Main::matchesExtension)
          .peek(progressTracker::notifyDiscoveredFile)
          .collect(storage.getIntermediateCollector())
          .stream()
          .map(FileAndFingerprint::new)
          .peek(progressTracker::notifyScannedFile)
          .collect(storage.getFinalCollector());
}
```
The split can be noticed by the fact that `collect()` is first invoked with an intermediate `Collector`, thus terminating the former pipeline as soon as possible: this makes it possible to make the full list of discovered files persistent. The intermediate `Collector` returns the `Storage` itself, whose `stream()` method starts a latter pipeline, which is the one performing the fingerprint computation. At last, data are collected by a final `Collector`.
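To see the two-stage shape in isolation, here is a minimal in-memory sketch that needs no file system. The class `InMemoryStorageSketch`, the use of plain strings instead of `Path`, and the fake "fingerprint" (just the length of the name) are all assumptions made for illustration.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical in-memory stand-in for Storage; file names are plain strings.
final class InMemoryStorageSketch {
    private final Map<String, String> map = new ConcurrentHashMap<>();

    // Ends the first pipeline: every discovered name gets a placeholder value.
    Collector<String, ?, InMemoryStorageSketch> intermediateCollector() {
        return Collector.of(() -> this, (s, name) -> s.map.put(name, "unavailable"), (a, b) -> a);
    }

    // Ends the second pipeline: placeholders are overwritten with the computed values.
    Collector<Map.Entry<String, String>, ?, InMemoryStorageSketch> finalCollector() {
        return Collector.of(() -> this, (s, e) -> s.map.put(e.getKey(), e.getValue()), (a, b) -> a);
    }

    // Starts the second pipeline from the names collected so far.
    Stream<String> stream() {
        return map.keySet().stream();
    }

    static Map<String, String> run (final List<String> names) {
        final InMemoryStorageSketch storage = new InMemoryStorageSketch();
        names.stream()
             .filter(n -> n.endsWith(".jpg"))
             .collect(storage.intermediateCollector())   // first pipeline fully terminates here
             .stream()                                   // second pipeline starts here
             .map(n -> new AbstractMap.SimpleEntry<>(n, "length=" + n.length()))
             .collect(storage.finalCollector());
        return storage.map;
    }
}
```

Since the first `collect()` is a terminal operation, all the names are already in the map before any value is computed, which is the property the split relies on.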
The most interesting part of this post is the implementation of `FileStorage`:
```java
package it.tidalwave.integritychecker2;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.Stream;
import static it.tidalwave.util.TimerTaskAdapterFactory.toTimerTask;
import static it.tidalwave.util.stream.FileCollector.toFile;
import static java.util.Comparator.comparing;

public class FileStorage implements Storage {
    private static final int STORE_INTERVAL = 1000;

    private final Path storageFile;

    private final Map<Path, String> map = new ConcurrentHashMap<>();

    private final Timer timer = new Timer();

    public FileStorage (final Path folder) throws IOException {
        final Path storageFolder = folder.resolve(".it.tidalwave.solidblue2");
        storageFile = storageFolder.resolve("fingerprints-j8.txt");
        // Create the storage folder (not just the scanned folder) so the file can be written.
        Files.createDirectories(storageFolder);
        timer.scheduleAtFixedRate(toTimerTask(this::store), STORE_INTERVAL, STORE_INTERVAL);
    }

    @Override
    public Collector<Path, ?, FileStorage> getIntermediateCollector() {
        return Collector.of(() -> this, FileStorage::storeItem, (a, b) -> a);
    }

    @Override
    public Collector<FileAndFingerprint, ?, FileStorage> getFinalCollector() {
        return Collector.of(() -> this, FileStorage::storeItem, (a, b) -> a);
    }

    @Override
    public Stream<Path> stream() {
        return map.keySet().stream();
    }

    @Override
    public void close() throws IOException {
        timer.cancel();
        store();
    }

    private void storeItem (final Path file) {
        map.put(file, "unavailable");
    }

    private void storeItem (final FileAndFingerprint faf) {
        map.put(faf.getFile(), faf.getFingerPrint());
    }

    private void store() throws IOException {
        map.entrySet().stream()
                      .sorted(comparing(Map.Entry::getKey))
                      .map(e -> String.format("MD5(%s)=%s", e.getKey().getFileName().toString(), e.getValue()))
                      .collect(toFile(storageFile, Charset.forName("UTF-8")));
    }
}
```
`toTimerTask()` is a quick and simple utility that wraps a functional method into a `TimerTask`, simplifying the syntax.
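The source of `toTimerTask()` is not shown in this post. As a rough idea of what such an adapter can look like, here is a sketch under my own assumptions: the class `TimerTaskAdapterSketch` and the `ThrowingRunnable` interface are hypothetical names, and the choice of logging and swallowing exceptions is just one possible policy.

```java
import java.util.TimerTask;

// Hypothetical adapter, not the actual it.tidalwave.util.TimerTaskAdapterFactory.
final class TimerTaskAdapterSketch {
    // A Runnable-like functional interface whose method may throw checked exceptions,
    // so that method references such as this::store (which throws IOException) fit.
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    private TimerTaskAdapterSketch() {
    }

    static TimerTask toTimerTask (final ThrowingRunnable action) {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    action.run();
                } catch (Exception e) {
                    // An exception escaping run() would kill the Timer thread,
                    // so it is reported here and the next scheduled run still happens.
                    System.err.println("Timer task failed: " + e);
                }
            }
        };
    }
}
```

With this in place, a call such as `timer.scheduleAtFixedRate(toTimerTask(this::store), 1000, 1000)` compiles without an anonymous `TimerTask` subclass at the call site.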
The core of that class is a `ConcurrentHashMap`, which can be updated by multiple threads while it is being read, without the need for `synchronized` blocks. This avoids turning the map into a bottleneck for the parallel pipeline. A timer periodically runs a method that iterates through the map and writes its contents to a file.
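The `toFile()` collector used by `store()` comes from a utility class that is not listed here. For readers who want to try the periodic snapshot with plain JDK calls, the following sketch does a similar job with `Files.write()`. The class `SnapshotWriterSketch` is hypothetical and uses strings as keys for brevity.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that dumps a map snapshot, one "MD5(name)=value" line per entry.
final class SnapshotWriterSketch {
    private SnapshotWriterSketch() {
    }

    // Iterating a ConcurrentHashMap while other threads update it is safe:
    // the snapshot simply reflects some recent state of each entry.
    static List<String> toLines (final Map<String, String> map) {
        return map.entrySet().stream()
                  .sorted(Map.Entry.comparingByKey())
                  .map(e -> String.format("MD5(%s)=%s", e.getKey(), e.getValue()))
                  .collect(Collectors.toList());
    }

    // Overwrites the target file with the current snapshot.
    static void store (final Map<String, String> map, final Path file) {
        try {
            Files.write(file, toLines(map), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```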
The intermediate `Collector` accepts the incoming `Path`s and creates an entry for each of them, associated with the value `"unavailable"`. The final `Collector` overwrites the entries by putting the actual fingerprint.
It's worth analysing the implementation of the two `Collector`s. The accumulator is the `FileStorage` object itself, in a single instance, since it supports parallel access. So the `Supplier` always returns `this`. The combiner function is actually useless, since at each invocation it receives the same reference twice, but it is mandatory, so it has been implemented to return either of the two parameters.
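The same pattern can be tried on a parallel stream with a few lines of code. In this sketch (the class `SharedAccumulatorSketch` and the `"item-"` values are made up for the purpose) the supplier hands out one shared `ConcurrentHashMap` to every worker, and the combiner just returns its first argument.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical demo of a Collector whose accumulator is a single shared, thread-safe object.
final class SharedAccumulatorSketch {
    private SharedAccumulatorSketch() {
    }

    static Map<Integer, String> collectInParallel (final int count) {
        final Map<Integer, String> shared = new ConcurrentHashMap<>();
        final Collector<Integer, ?, Map<Integer, String>> collector = Collector.of(
                () -> shared,                          // every worker gets the same instance
                (map, i) -> map.put(i, "item-" + i),   // safe because the map is concurrent
                (a, b) -> a);                          // both arguments are the same reference
        return IntStream.range(0, count).boxed().parallel().collect(collector);
    }
}
```

This only works because the shared container is thread-safe; with a plain `HashMap` the concurrent `put()` calls could corrupt it.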
Last but not least, `FileStorage` implements the `AutoCloseable` interface, so it can be used with try-with-resources.
It's worth noting that `FileStorage` is a mutable object, which is not what the functional approach suggests. I could have implemented it in an immutable way, e.g. by creating a further copy of the `Map` that holds the data at each change. But, given that the final purpose of `FileStorage` is to write data to the disk, I wouldn't have gained any practical advantage. The gain from using immutable data structures is the simplification and optimisation of parallel code, in particular by avoiding `synchronized` blocks. But in this case `ConcurrentHashMap` is already taking care of both issues.
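For comparison, an immutable variant might look like the following sketch, where each update returns a new object holding a fresh copy of the map. `ImmutableStorageSketch` and its `with()` method are hypothetical names, not something that exists in SolidBlue II.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable alternative: no instance is ever modified after construction.
final class ImmutableStorageSketch {
    private final Map<String, String> map;

    ImmutableStorageSketch() {
        this(Collections.emptyMap());
    }

    private ImmutableStorageSketch (final Map<String, String> map) {
        this.map = Collections.unmodifiableMap(map);
    }

    // Returns a new storage with the extra entry; the whole map is copied at each call.
    ImmutableStorageSketch with (final String file, final String fingerprint) {
        final Map<String, String> copy = new HashMap<>(map);
        copy.put(file, fingerprint);
        return new ImmutableStorageSketch(copy);
    }

    Map<String, String> asMap() {
        return map;
    }
}
```

The copy at each `with()` call is the price of immutability in this naive form, and it buys nothing here, since the data must end up in a single file anyway.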
The code described in this post can be found at BitBucket in version 1.0-ALPHA-7, together with some other improvements (for instance, a very simple GUI has been added).