Java 8 parallelism exercise - part IV

In this further step of my Java 8 exercise I'm going to introduce a better persistence facility to SolidBlue II, also addressing one of the issues that I mentioned about the previous implementation.

Before getting to the main topic of this post, let me mention that a bit of housekeeping has also been done on the project:

  1. SolidBlue II now has a multi-module structure, so that it is ready to accommodate new features that I'm developing in a branch.
  2. The SCM has been switched from Mercurial to Git. Mercurial is still my favourite DSCM, but I understand that most people are more familiar with Git, and these examples are also being used for mentoring my customers... so Git makes things easier.

Then a better separation of responsibilities has been implemented. The class FileAndFingerprint has been introduced as a holder of file+fingerprint data, including the method to compute the MD5:

    package it.tidalwave.integritychecker2;

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

    // Holds a file together with its fingerprint, which is computed at construction time.
    public class FileAndFingerprint
      {
        private final Path file;

        private final String fingerPrint;

        public FileAndFingerprint (final Path file)
          {
            this.file = file;
            this.fingerPrint = computeFingerprint("MD5");
          }

        public Path getFile()
          {
            return file;
          }

        public String getFingerPrint()
          {
            return fingerPrint;
          }

        @Override
        public String toString()
          {
            return String.format("FileAndFingerprint(path=%s, fingerPrint=%s)", file.getFileName().toString(), fingerPrint);
          }

        private String computeFingerprint (final String algorithm)
          {
            try
              {
                ...
              }
            catch (NoSuchAlgorithmException | IOException e)
              {
                // In case of failure the error message is stored in place of the fingerprint
                return e.getMessage();
              }
          }

        // Converts the raw digest bytes to their textual representation
        private static String toString (final byte[] bytes)
          {
            ...
          }
      }
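
The body of computeFingerprint() is omitted above. Just to give an idea of what the imports suggest (a digest computed over a memory-mapped file), here is a minimal sketch. It is not the actual SolidBlue II code: the class FingerprintSketch and the methods fingerprint() and toHex() are names made up for this illustration, and the approach assumes files smaller than 2 GB, which is the limit of a single mapping.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

// Hypothetical illustration, not part of SolidBlue II
class FingerprintSketch
  {
    // Computes the digest of a whole file, reading it through a memory-mapped buffer.
    // Assumption: the file is smaller than 2 GB (a single mapping can't be larger).
    static String fingerprint (final Path file, final String algorithm)
      throws IOException, NoSuchAlgorithmException
      {
        final MessageDigest digest = MessageDigest.getInstance(algorithm);

        try (final RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r"))
          {
            final long size = raf.length();

            if (size > 0) // nothing to map for an empty file
              {
                final MappedByteBuffer buffer = raf.getChannel().map(READ_ONLY, 0, size);
                digest.update(buffer);
              }
          }

        return toHex(digest.digest());
      }

    // Renders each byte as two lowercase hexadecimal digits.
    static String toHex (final byte[] bytes)
      {
        final StringBuilder builder = new StringBuilder();

        for (final byte b : bytes)
          {
            builder.append(String.format("%02x", b & 0xff));
          }

        return builder.toString();
      }

    public static void main (final String... args)
      throws Exception
      {
        final Path file = Files.createTempFile("fingerprint", ".txt");
        file.toFile().deleteOnExit();
        Files.write(file, "abc".getBytes("UTF-8"));
        System.out.println(fingerprint(file, "MD5"));
      }
  }
```

For a file containing the three bytes "abc" this prints 900150983cd24fb0d6963f7d28e17f72, the well-known MD5 test vector.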
                    

OK, so let's get to the main point of this post: saving the intermediate data (e.g. the list of discovered files) even in case of premature termination. A Storage facility has been introduced that encapsulates the persistence semantics:

    package it.tidalwave.integritychecker2;

    import java.nio.file.Path;
    import java.util.stream.Collector;
    import java.util.stream.Stream;

    public interface Storage extends AutoCloseable
      {
        public Collector<Path, ?, ? extends Storage> getIntermediateCollector();

        public Collector<FileAndFingerprint, ?, ? extends Storage> getFinalCollector();

        public Stream<Path> stream();
      }
                    

The idea is that Storage provides the Collector for the pipeline... or rather, two different Collector instances that make it possible to split the original pipeline in two:

    try (final Stream<Path> stream = Files.walk(targetPath, FOLLOW_LINKS);
         final Storage storage = new FileStorage(targetPath))
      {
        stream.filter(Main::matchesExtension)
              .peek(progressTracker::notifyDiscoveredFile)
              .collect(storage.getIntermediateCollector())
              .stream()
              .map(FileAndFingerprint::new)
              .peek(progressTracker::notifyScannedFile)
              .collect(storage.getFinalCollector());
      }
                    

The split is visible in the fact that collect() is first invoked with an intermediate Collector, which terminates the first pipeline as early as possible: this makes it possible to persist the full list of discovered files. The intermediate Collector returns the Storage itself, whose stream() method starts a second pipeline, the one that computes the fingerprints. Finally, the data is collected by a final Collector.
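
To see the effect of the split in isolation, here is a toy sketch that records the order of events with and without an intermediate collect(). It has nothing to do with files: the class SplitPipelineSketch and its methods are invented for the illustration, Collectors.toList() stands in for the intermediate Collector and toUpperCase() for the fingerprint computation. The streams are sequential, so that the order is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration, not part of SolidBlue II
class SplitPipelineSketch
  {
    // Two pipelines: the intermediate collect() completes the discovery before the scan starts.
    static List<String> split (final List<String> names)
      {
        final List<String> events = new ArrayList<>();

        names.stream()
             .peek(name -> events.add("discovered " + name))
             .collect(Collectors.toList())   // terminal operation: the first pipeline ends here
             .stream()                       // the second pipeline starts from the collected data
             .map(String::toUpperCase)       // stand-in for the fingerprint computation
             .peek(name -> events.add("scanned " + name))
             .collect(Collectors.toList());

        return events;
      }

    // A single pipeline, for comparison: each element goes through all the steps in turn.
    static List<String> single (final List<String> names)
      {
        final List<String> events = new ArrayList<>();

        names.stream()
             .peek(name -> events.add("discovered " + name))
             .map(String::toUpperCase)
             .peek(name -> events.add("scanned " + name))
             .collect(Collectors.toList());

        return events;
      }

    public static void main (final String... args)
      {
        final List<String> names = Arrays.asList("a.jpg", "b.jpg");
        System.out.println("split:  " + split(names));
        System.out.println("single: " + single(names));
      }
  }
```

With the intermediate collect() both "discovered" events come before any "scanned" event; with a single pipeline, discovery and scanning interleave element by element.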

The most interesting part of this post is the implementation of FileStorage:

    package it.tidalwave.integritychecker2;

    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collector;
    import java.util.stream.Stream;
    import static it.tidalwave.util.TimerTaskAdapterFactory.toTimerTask;
    import static it.tidalwave.util.stream.FileCollector.toFile;
    import static java.util.Comparator.comparing;

    public class FileStorage implements Storage
      {
        // Interval between two consecutive writes to the disk, in milliseconds
        private static final int STORE_INTERVAL = 1000;

        private final Path storageFile;

        private final Map<Path, String> map = new ConcurrentHashMap<>();

        private final Timer timer = new Timer();

        public FileStorage (final Path folder)
          throws IOException
          {
            final Path storageFolder = folder.resolve(".it.tidalwave.solidblue2");
            storageFile = storageFolder.resolve("fingerprints-j8.txt");
            // Create the folder that will contain the storage file, not just its parent
            Files.createDirectories(storageFolder);
            timer.scheduleAtFixedRate(toTimerTask(this::store), STORE_INTERVAL, STORE_INTERVAL);
          }

        // Collects the discovered files; the Supplier always returns this very object
        @Override
        public Collector<Path, ?, FileStorage> getIntermediateCollector()
          {
            return Collector.of(() -> this,
                                FileStorage::storeItem,
                                (a, b) -> a);
          }

        // Collects the computed fingerprints, again into this very object
        @Override
        public Collector<FileAndFingerprint, ?, FileStorage> getFinalCollector()
          {
            return Collector.of(() -> this,
                                FileStorage::storeItem,
                                (a, b) -> a);
          }

        @Override
        public Stream<Path> stream()
          {
            return map.keySet().stream();
          }

        // Stops the periodic writes and performs a last one
        @Override
        public void close()
          throws IOException
          {
            timer.cancel();
            store();
          }

        private void storeItem (final Path file)
          {
            map.put(file, "unavailable");
          }

        private void storeItem (final FileAndFingerprint faf)
          {
            map.put(faf.getFile(), faf.getFingerPrint());
          }

        // Writes the current contents of the map to the storage file, sorted by path
        private void store()
          throws IOException
          {
            map.entrySet().stream()
                          .sorted(comparing(Map.Entry::getKey))
                          .map(e -> String.format("MD5(%s)=%s", e.getKey().getFileName().toString(), e.getValue()))
                          .collect(toFile(storageFile, Charset.forName("UTF-8")));
          }
      }
                    

toTimerTask() is a small utility that wraps a lambda or method reference into a TimerTask, which simplifies the syntax.
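
The source of toTimerTask() isn't shown in this post, so the following is only a guess at what such an adapter could look like. The interface ThrowingRunnable and the class TimerTaskAdapterSketch are hypothetical names; a functional interface that allows checked exceptions is assumed because store() declares IOException. The exception is caught inside the task since an exception escaping from TimerTask.run() terminates the Timer thread.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not the actual TimerTaskAdapterFactory
class TimerTaskAdapterSketch
  {
    // Like Runnable, but the method is allowed to throw checked exceptions
    @FunctionalInterface
    interface ThrowingRunnable
      {
        void run()
          throws Exception;
      }

    static TimerTask toTimerTask (final ThrowingRunnable runnable)
      {
        return new TimerTask()
          {
            @Override
            public void run()
              {
                try
                  {
                    runnable.run();
                  }
                catch (Exception e)
                  {
                    // Report and go on, so the Timer can still run the next execution
                    System.err.println("Timer task failed: " + e);
                  }
              }
          };
      }

    public static void main (final String... args)
      throws InterruptedException
      {
        final CountDownLatch latch = new CountDownLatch(3);
        final Timer timer = new Timer(true); // daemon thread, it doesn't keep the JVM alive
        timer.scheduleAtFixedRate(toTimerTask(latch::countDown), 10, 10);
        final boolean completed = latch.await(5, TimeUnit.SECONDS);
        timer.cancel();
        System.out.println("three runs completed: " + completed);
      }
  }
```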

The core of that class is a ConcurrentHashMap, which can be read and updated by multiple threads at the same time without the need for synchronized blocks. This avoids turning the storage into a locking bottleneck for the parallel pipeline. A timer periodically runs a method that iterates through the map and writes its contents to a file. The intermediate Collector accepts the incoming Paths and creates an entry for each of them, associated with the value "unavailable". The final Collector overwrites the entries with the actual fingerprints.
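
The two-pass use of the map can be tried out with a small self-contained sketch. Everything in it is made up for the illustration (the class ConcurrentMapSketch, the fake file names, the "fp-" prefix that plays the role of a fingerprint); only the pattern is the one described above: parallel puts of a placeholder, parallel overwrites, and a sorted snapshot.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration, not part of SolidBlue II
class ConcurrentMapSketch
  {
    static Map<String, String> populate (final int count)
      {
        final Map<String, String> map = new ConcurrentHashMap<>();

        // First pass: many threads register the items with a placeholder value
        IntStream.range(0, count)
                 .parallel()
                 .forEach(i -> map.put(String.format("file%03d.jpg", i), "unavailable"));

        // Second pass: many threads overwrite the placeholder with the "fingerprint"
        map.keySet()
           .parallelStream()
           .forEach(name -> map.put(name, "fp-" + name));

        return map;
      }

    // Sorted, formatted view of the map; iterating a ConcurrentHashMap never throws
    // ConcurrentModificationException, so this could also run while the map is being updated
    static List<String> snapshot (final Map<String, String> map)
      {
        return map.entrySet().stream()
                  .sorted(Map.Entry.comparingByKey())
                  .map(e -> String.format("MD5(%s)=%s", e.getKey(), e.getValue()))
                  .collect(Collectors.toList());
      }

    public static void main (final String... args)
      {
        snapshot(populate(5)).forEach(System.out::println);
      }
  }
```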

It's worthwhile analysing the implementation of the two Collectors. The accumulation container (the object that the Supplier provides and that the accumulator function fills) is the FileStorage object itself, as a single instance, since it supports parallel access; hence the Supplier always returns this. The combiner function is actually useless, since at each invocation it receives the same reference twice, but it is mandatory, so it has been implemented to return the first of the two parameters.
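
The claim about the combiner can be checked with a toy Collector built in the same way. In this sketch (the class SharedContainerSketch and all its members are hypothetical) the container is a concurrent Set of integers, and the combiner counts how many times it receives two different references, which is expected never to happen.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical illustration, not part of SolidBlue II
class SharedContainerSketch
  {
    private final Set<Integer> items = ConcurrentHashMap.newKeySet();

    // Counts the combiner invocations that received two different references
    private final AtomicInteger mismatches = new AtomicInteger();

    Collector<Integer, ?, SharedContainerSketch> collector()
      {
        return Collector.of(() -> this,                   // always the same container
                            SharedContainerSketch::add,   // thread-safe accumulation
                            (a, b) ->
                              {
                                if (a != b)
                                  {
                                    mismatches.incrementAndGet();
                                  }

                                return a;
                              });
      }

    private void add (final Integer item)
      {
        items.add(item);
      }

    int size()
      {
        return items.size();
      }

    int getMismatches()
      {
        return mismatches.get();
      }

    static SharedContainerSketch collect (final int count)
      {
        return IntStream.range(0, count)
                        .boxed()
                        .parallel()
                        .collect(new SharedContainerSketch().collector());
      }

    public static void main (final String... args)
      {
        final SharedContainerSketch result = collect(1000);
        System.out.println("items: " + result.size() + ", mismatches: " + result.getMismatches());
      }
  }
```

The Collector API also offers the CONCURRENT and UNORDERED characteristics for containers that tolerate concurrent accumulation; whether they would bring any benefit here is not something this sketch tries to measure.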

Last but not least, FileStorage implements the AutoCloseable interface, so it can be used with try-with-resources.

It's worth noting that FileStorage is a mutable object, which is not what the functional approach suggests. Given that the final purpose of FileStorage is to write data to the disk, I could have implemented it in an immutable way, e.g. by creating further copies of the Map that holds the data, but I wouldn't have got any practical advantage. The gain from using immutable data structures is the simplification and optimisation of parallel code, in particular by avoiding synchronized blocks; in this case ConcurrentHashMap already takes care of both issues.

The code described in this post can be found at BitBucket in version 1.0-ALPHA-7, together with some other improvements (for instance, a very simple GUI has been added).
