Java 8 parallelism exercise - part I

A few years ago I started exploring a parallel computing design style based on Actors. That post referenced SolidBlue, the simplest project I've developed with that style. It's a tiny application which recursively scans a folder and computes the MD5 fingerprint of the contents, for the sake of verifying integrity. I'm routinely using it for checking that my photo files are ok, as in the past I've sadly experienced silent failure of the HFS+ filesystem of my MacBook Pro and, unfortunately, after so many years Apple didn't release a new filesystem with embedded integrity checking, à la ZFS or BTRFS. Of course I do have several backups, but verifying that everything is fine on the laptop still has a meaning.

This is a sort of perfect example for studying real-world parallelism, as the specifications are simple, but it not only manipulates data structures in memory: it also performs disk I/O - massively, as the total size of the photo folder exceeds 500GB - and it has some interactions with a GUI. SolidBlue is currently made with the NetBeans Platform as the overall graphic infrastructure, and JavaFX for the details.

After a few years I feel the need to rewrite it from scratch because a number of reasons:

  1. In the end, it has to stay a very simple tool, with a very simple GUI - nothing more than a set of progress bars during the scan. The NetBeans Platform - while it can be shrunk to some bare-bones - is definitely too much and a simple thing with JavaFX would suffice. For my recent UI-based projects I've ever abandoned the NetBeans Platform and moved to a Spring + JavaFX solution, even using some complex technique such as AspectJ. In this very case, Spring and other tools are unnecessary.
  2. Even the Actor-based design itself looks like it's too complex for this simple application. Given also the fact that I've started giving classes on Java 8, and I think I'll soon be mentoring some customer on that technology, I've decided to self-assign to myself the exercise of re-designing SolidBlue parallelism over the bare Java 8 library, that is the one of Streams. Also for keeping the thing as simple as it can, I'm initially restraining myself from using Lombok - which is very powerful and stable in my experience - just not to have too many things on the table at the same moment.

The exercise will be incremental and iterative and in this post I'm just illustrating the very first version, which supports a very simple user story: run from the command line, get the folder path to scan as an argument, and perform the scan in a sequential fashion.

This post assumes that the basic concepts and syntax of Java 8 lambdas and Streams are known.

Let's start with the main():

                    public static void main (final String ... args)
                      throws IOException
                      {
                        final Path targetPath = Paths.get(args[0]);
                        log.info("Scanning {}...", targetPath);
                        final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS)
                                                                 .filter(Main::matchesExtension)
                                                                 .peek(p -> log.info("Discovered {}", p.getFileName()))
                                                                 .collect(toMap(p -> p.getFileName().toString(),
                                                                                p -> computeFingerprint(p, "MD5"),
                                                                                (v1, v2) -> v2));
                        store(targetPath, storage);
                      }
                    

It uses the Files.walk() method, introduced with Java 8, which recursively scans a directory and supports lambdas: the result, in fact, is a Stream<Path>. First it filters files with the required extensions, then produces a log message, and in the end collects everything to a Map<String, String> whose key is the file name and the associated value the MD5 fingerprint. Note that the toMap() method is the overloaded version with a third parameter, which is a BinaryOperator<String>, whose implementing lambda just returns the second operand: (v1, v2) -> v2. This version must be used when the incoming Stream has duplicated values. Indeed, my photos are organised so that they have unique file names (in the form 20150111-0354.ARW), so it shouldn't happen. But there is an old directory from my very first years in photography which uses a different approach and it has a few duplicates. It should really be fixed, but this is not my priority at the moment: so I'm just working around the problem in a way that it doesn't break the program. If I didn't use the third parameter, the code would have raised a runtime exception at the first duplicate key.

The filter to extract files with the required extension is straightforward:

                    private static boolean matchesExtension (final Path file)
                      {
                        final String extension = file.getFileName().toString().replaceAll("^.*\\.", "").toLowerCase();
                        return Files.isRegularFile(file) && FILE_EXTENSIONS.contains(extension);
                      }
                    

The code to compute the fingerprint uses memory-mapped I/O to speed up the operation, even though many things should be said about this part, especially in the perspective of parallelisation. But it's not in the scope of the current post, so the only important thing is that it works.

It's worth mentioning that Streams don't accept checked exceptions in their lambdas, not because of a language limitation, but because the library was purportedly designed in this way. It's a thing that I don't like, but in this case it's not a problem: any error during the scan should be recorded, so it makes sense that the result is always a String, which in case of failure describes the error.

                    private static String computeFingerprint (final Path file, final String algorithm)
                      {
                        try
                          {
                            log.info("computeFingerprint({}, {})", file, algorithm);
                            final MessageDigest digestComputer = MessageDigest.getInstance(algorithm);

                            try (final RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r"))
                              {
                                final MappedByteBuffer byteBuffer = raf.getChannel().map(READ_ONLY, 0, Files.size(file));
                                digestComputer.update(byteBuffer);
                              }

                            return toString(digestComputer.digest());
                          }
                        catch (NoSuchAlgorithmException | IOException e)
                          {
                            return e.getMessage();
                          }
                      }

                    private static String toString (final byte[] bytes)
                      {
                        final StringBuilder builder = new StringBuilder();

                        for (final byte b : bytes)
                          {
                            final int value = b & 0xff;
                            builder.append(Integer.toHexString(value >>> 4)).append(Integer.toHexString(value & 0x0f));
                          }

                        return builder.toString();
                      }
                    

At last there's the code that writes the data to a text file:

                    private static void store (final Path targetPath, final Map<String, String> storage)
                      throws IOException
                      {
                        final Path folder = targetPath.resolve(".it.tidalwave.solidblue2");
                        final Path file = folder.resolve("fingerprints-j8.txt");
                        Files.createDirectories(folder);
                        log.info("Storing results into {} ...", file);

                        try (final PrintWriter w = new PrintWriter(Files.newBufferedWriter(file, Charset.forName("UTF-8"))))
                          {
                            storage.entrySet().stream()
                                              .sorted(comparing(Entry::getKey))
                                              .forEach(e -> w.printf("MD5(%s)=%s\n", e.getKey(), e.getValue()));
                          }
                      }
                    

The written file is something like that:

                    MD5(20150828-0006.NEF)=20cbd8dafe1d02dfc305b18177bdc873
                    MD5(20150829-0004.ARW)=e2f90c58489bd12d89da121b24aab2e5
                    MD5(20150830-0016.ARW)=563cb64b228862b6335905d990e4878c
                    MD5(20150830-0021.ARW)=6919c81409ec43c7c7ce6b0b5d8eb2b7
                    MD5(20150830-0026.ARW)=b2e9175509ffe2d50154e6127cc4ad73
                    MD5(20150830-0028.ARW)=d77469d294130d8cf362312a699dc35d
                    MD5(20150830-0031.ARW)=02937ba68642a1fbc8063dcf0c05ad5c
                    MD5(20150830-0034.ARW)=bd415547599ed168028c2cdb1234b65d
                    MD5(20150830-0046.ARW)=f17e34f24f1b28f6c94b53b98b34ea41
                    MD5(20150830-0052.ARW)=7cba4d7421ccf42605a5f196022e00b6
                    MD5(20150830-0056.ARW)=f056088310ce59b247948b08141d00b7
                    

So, everything is very simple, it fits in a single class and you can read the whole source here; and it is written in functional style, that is, no extra classes, only static methods. It's a good starter.

What is it missing? Well:

  1. Parallelism. While some could be tempted to say that I could just add a parallel() call in the Stream pipeline, it wouldn't work at all - I mean, the computation would stay absolutely sequential. This is because the parallelism in a Stream is an optional operation, that is not always available. In particular, the data source must be designed so it supports parallelism. In this case the problem is the Files.walk() method that works only in a sequential fashion.
  2. Notifications. In the perspective of having a GUI I need to have notifications of the progress in real time. While the notification itself would be just a matter of inserting some stages in the pipeline, the major issue is that in order do compute a percentage of the progress I need to know how many files have been scanned. But since the Stream is sequential, I'll know that value only when the processing has been completed.
  3. The last problem is a subtle requirement of the original SolidBlue. Since the elapsed time of the run is long, sometimes I have to early kill it. In this case I'd like to just have the partial results. First problem: now the result file is only written at the end, so if I kill the process I wouldn't get anything. It would be simple to have store() been called by a shutdown hook, but there is a secondary problem. I keep the fingerprints for every run and I compare them with the diff command to see whether something went wrong, because a file changed its MD5. Of course, from session to session, I could have added some files and deleted others. If I interrupt a run before it's complete, there will be many missing entries (files that have not been discovered yet) that would be misinterpreted as deleted files. So, the original SolidBlue prepares an entry such as MD5(20150111-0354.AWR)=unavailable as soon as a file is discovered, and the results are periodically written to the disk since the beginning. This means that after the file discovery has been completed - in less than a minute - I already have a result file which at least contains all the file names. From now on, it will be periodically updated changing unavailable with the MD5 value.

The whole project, made with Maven, is available at BitBucket; the source illustrated in this post is part of 1.0-ALPHA-1.

Comments are managed by Disqus, which makes use of a few cookies. Please read their cookie policy for more details. If you agree to that policy, please click on the button below to accept it. If you don't, you can still enjoy this site without using Disqus cookies, but you won't be able to see and post comments. Thanks.