Java 8 parallelism exercise - part II

In this second part of the exercise, I'm first going to demonstrate that the existing code has a problem with parallelism, as I explained in the final notes of the previous post. A good way to do that is to introduce a facility for tracking progress, by means of four counters:

  1. the number of discovered files;
  2. the number of scanned files;
  3. the total size of discovered files;
  4. the total size of scanned files.
                    private final static AtomicInteger discoveryCount = new AtomicInteger();
                    private final static AtomicInteger scanCount = new AtomicInteger();
                    private final static AtomicLong discoverySize = new AtomicLong();
                    private final static AtomicLong scanSize = new AtomicLong();
                    

Even though there's no parallelism so far, I'm anticipating the future parallel refactoring, so I used thread-safe objects: the atomic classes from java.util.concurrent.atomic (accumulateAndGet() in particular is new in Java 8). They can be manipulated safely, and very efficiently, without the need to put them into a synchronized block:

                    private static void notifyDiscoveredFile (final Path file)
                      {
                        try
                          {
                            log.info("Discovered {}", file.getFileName());
                            discoveryCount.incrementAndGet();
                            discoverySize.accumulateAndGet(Files.size(file), Long::sum);
                            logProgress();
                          }
                        catch (IOException e)
                          {
                            log.warn("", e);
                          }
                      }

                    private static void notifyScannedFile (final Path file)
                      {
                        try
                          {
                            scanCount.incrementAndGet();
                            scanSize.accumulateAndGet(Files.size(file), Long::sum);
                            logProgress();
                          }
                        catch (IOException e)
                          {
                            log.warn("", e);
                          }
                      }

                    private static void logProgress()
                      {
                        final int sc = scanCount.get();
                        final int dc = discoveryCount.get();
                        final long ss = scanSize.get();
                        final long ds = discoverySize.get();
                        log.info("{}", String.format("Processed files: %d/%d (%d%%) - size: %dMB/%dMB (%d%%)",
                                                     sc, dc, (100 * sc / dc),
                                                     ss / 1_000_000, ds / 1_000_000, (100 * ss / ds)));
                      }
                    

While there's clearly no big bottleneck here, in general it's important to avoid synchronized blocks: they reduce parallelism and hence may degrade performance. That's why the atomic classes are useful.
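To make the difference concrete, here's a minimal sketch (with hypothetical names, not part of the project) contrasting an atomic accumulator with its synchronized equivalent:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class CounterSketch
  {
    private static final AtomicLong atomicTotal = new AtomicLong();
    private static long lockedTotal;

    // Lock-free: the compare-and-set loop inside accumulateAndGet() needs
    // no monitor, so concurrent callers don't block each other.
    public static void addAtomic (final long size)
      {
        atomicTotal.accumulateAndGet(size, Long::sum);
      }

    // The synchronized equivalent: correct, but every call must acquire the
    // monitor, serialising the callers.
    public static synchronized void addLocked (final long size)
      {
        lockedTotal += size;
      }

    public static long total()
      {
        return atomicTotal.get();
      }

    public static void main (final String ... args)
      {
        // 1000 parallel increments still yield an exact total.
        IntStream.range(0, 1_000).parallel().forEach(i -> addAtomic(1));
        System.out.println(total()); // prints 1000
      }
  }
```

Both methods are thread-safe, but the atomic version spins on a compare-and-set instead of taking a lock, so contended updates stay cheap.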

One could point out that logProgress() is likely to read values that are not entirely consistent, because the four values are not read in a single atomic step: for instance, scanCount might be updated just before discoveryCount is read, so we're not taking a proper snapshot of the system. I'm ignoring this problem for the moment, since it's not yet clear how these data will be used in the GUI. In any case, a progress indication is not meant to give the user a precise readout, but rather the impression that the system is moving on, so this might not really matter.
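Should a consistent snapshot become necessary later, one possible approach (just a sketch with hypothetical names, not something the project uses) is to keep all four values in a single immutable object behind an AtomicReference, so every update produces, and every read observes, one coherent state:

```java
import java.util.concurrent.atomic.AtomicReference;

public class Progress
  {
    public final int discoveryCount, scanCount;
    public final long discoverySize, scanSize;

    public Progress (final int dc, final int sc, final long ds, final long ss)
      {
        discoveryCount = dc; scanCount = sc; discoverySize = ds; scanSize = ss;
      }

    private static final AtomicReference<Progress> current =
            new AtomicReference<>(new Progress(0, 0, 0, 0));

    // Each update atomically replaces the whole snapshot and returns it, so
    // the four values it carries are always mutually consistent.
    public static Progress discovered (final long size)
      {
        return current.updateAndGet(p ->
            new Progress(p.discoveryCount + 1, p.scanCount,
                         p.discoverySize + size, p.scanSize));
      }

    public static Progress scanned (final long size)
      {
        return current.updateAndGet(p ->
            new Progress(p.discoveryCount, p.scanCount + 1,
                         p.discoverySize, p.scanSize + size));
      }
  }
```

The trade-off is an object allocation per update, which is usually negligible next to disk I/O.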

The call to notifyDiscoveredFile() can be inserted into the pipeline by means of a peek():

                        final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS)
                                                                 .filter(Main::matchesExtension)
                                                                 .peek(Main::notifyDiscoveredFile)
                                                                 ...
                    

while the call to notifyScannedFile() can be conveniently put into a finally block of the computeFingerprint() method:

                    private static String computeFingerprint (final Path file, final String algorithm)
                      {
                        try
                          {
                            ...
                          }
                        catch (NoSuchAlgorithmException | IOException e)
                          {
                            ...
                          }
                        finally
                          {
                            notifyScannedFile(file);
                          }
                      }
                    

The complete changeset for this iteration can be seen as a diff at BitBucket (try the side-by-side view for a really readable view).

Now, running the application and taking a peek at the log, it's clear that things are not working as intended (note: the log has been snipped to remove clutter):

                    23:11:44.640 [main()] INFO  Scanning .../FG-2012-0001...
                    23:11:44.811 [main()] INFO  Discovered 20120103-0006.NEF
                    23:11:44.814 [main()] INFO  Processed files: 0/1 (0%) - size: 0MB/18MB (0%)
                    23:11:44.814 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0006.NEF, MD5)
                    23:11:45.622 [main()] INFO  Processed files: 1/1 (100%) - size: 18MB/18MB (100%)
                    23:11:45.622 [main()] INFO  Discovered 20120103-0009.NEF
                    23:11:45.622 [main()] INFO  Processed files: 1/2 (50%) - size: 18MB/36MB (50%)
                    23:11:45.622 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0009.NEF, MD5)
                    23:11:46.039 [main()] INFO  Processed files: 2/2 (100%) - size: 36MB/36MB (100%)
                    23:11:46.039 [main()] INFO  Discovered 20120103-0013.NEF
                    23:11:46.039 [main()] INFO  Processed files: 2/3 (66%) - size: 36MB/53MB (66%)
                    23:11:46.039 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0013.NEF, MD5)
                    23:11:46.450 [main()] INFO  Processed files: 3/3 (100%) - size: 53MB/53MB (100%)
                    23:11:46.450 [main()] INFO  Discovered 20120103-0024.NEF
                    23:11:46.450 [main()] INFO  Processed files: 3/4 (75%) - size: 53MB/71MB (75%)
                    23:11:46.450 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0024.NEF, MD5)
                    23:11:46.855 [main()] INFO  Processed files: 4/4 (100%) - size: 71MB/71MB (100%)
                    23:11:46.855 [main()] INFO  Discovered 20120103-0032.NEF
                    23:11:46.855 [main()] INFO  Processed files: 4/5 (80%) - size: 71MB/89MB (80%)
                    23:11:46.855 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0032.NEF, MD5)
                    23:11:47.251 [main()] INFO  Processed files: 5/5 (100%) - size: 89MB/89MB (100%)
                    23:11:47.252 [main()] INFO  Discovered 20120107-0002.NEF
                    23:11:47.252 [main()] INFO  Processed files: 5/6 (83%) - size: 89MB/107MB (83%)
                    

Since there is a single pipeline and Files.walk() is a sequential source, what we see is that a single file is discovered, then scanned before the next one is discovered, and so on; thus discoveryCount can never be more than a single step ahead of scanCount.

We have to “break” the pipeline into two separate pipelines. This can be done with a simple two-line change just before the final collect():

                        final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS)
                                                                 .filter(Main::matchesExtension)
                                                                 .peek(Main::notifyDiscoveredFile)
                                                                 .collect(toList())
                                                                 .stream()
                                                                 .collect(...);
                    

Now the new collect(toList()) starts the first Stream and collects the results into an intermediate List<Path>; the subsequent call to stream() creates a new Stream out of it and goes on. The complete changeset for this iteration can be seen as a diff at BitBucket.
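The barrier effect of the intermediate collect(toList()) can be demonstrated with a tiny self-contained example (toy data, not the project code): with a single pipeline the two peeks interleave per element, while inserting collect(toList()) forces all the “discoveries” to happen before any “scan”.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BarrierDemo
  {
    public static List<String> singlePipeline()
      {
        final List<String> events = new ArrayList<>();
        Stream.of("a", "b")
              .peek(s -> events.add("discovered " + s))
              .peek(s -> events.add("scanned " + s))
              .collect(Collectors.toList());
        // each element flows through the whole pipeline before the next one:
        return events; // [discovered a, scanned a, discovered b, scanned b]
      }

    public static List<String> twoPipelines()
      {
        final List<String> events = new ArrayList<>();
        Stream.of("a", "b")
              .peek(s -> events.add("discovered " + s))
              .collect(Collectors.toList())   // barrier: first pipeline completes here
              .stream()
              .peek(s -> events.add("scanned " + s))
              .collect(Collectors.toList());
        return events; // [discovered a, discovered b, scanned a, scanned b]
      }
  }
```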

The new behaviour can be seen in the log:

                    23:20:50.137 [main()] INFO  Scanning .../FG-2012-0001...
                    23:20:50.252 [main()] INFO  Discovered 20120103-0006.NEF
                    23:20:50.254 [main()] INFO  Processed files: 0/1 (0%) - size: 0MB/18MB (0%)
                    23:20:50.254 [main()] INFO  Discovered 20120103-0009.NEF
                    23:20:50.255 [main()] INFO  Processed files: 0/2 (0%) - size: 0MB/36MB (0%)
                    23:20:50.255 [main()] INFO  Discovered 20120103-0013.NEF
                    23:20:50.255 [main()] INFO  Processed files: 0/3 (0%) - size: 0MB/53MB (0%)
                    23:20:50.255 [main()] INFO  Discovered 20120103-0024.NEF
                    ...
                    23:20:51.058 [main()] INFO  Processed files: 0/216 (0%) - size: 0MB/4160MB (0%)
                    23:20:51.059 [main()] INFO  Discovered 20120429-0034.NEF
                    23:20:51.145 [main()] INFO  Processed files: 1/217 (0%) - size: 18MB/4179MB (0%)
                    23:20:51.145 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0009.NEF, MD5)
                    23:20:51.543 [main()] INFO  Processed files: 2/217 (0%) - size: 36MB/4179MB (0%)
                    23:20:51.544 [main()] INFO  computeFingerprint(.../FG-2012-0001/20120103-0013.NEF, MD5)
                    23:20:51.970 [main()] INFO  Processed files: 3/217 (1%) - size: 53MB/4179MB (1%)
                    ...
                    

Now the discovery Stream is completed before starting the scanning Stream, so we have meaningful progress values.

The behaviour of the original SolidBlue was slightly different: the discovery phase itself was based on Actors, so it executed in parallel, and the scan phase started as soon as the first files were discovered. In other words, there was no barrier separating the execution of the two phases. This doesn't seem really important now, since Java 8's Files.walk() appears to perform excellently, and scanning the filesystem in parallel might not be worthwhile. In any case, having the scan Stream start as soon as the first files are discovered might be an interesting exercise for the next steps.
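For the record, a barrier-free design could look like the following rough sketch (hypothetical code, not the approach used in this series), where a producer thread “discovers” items into a BlockingQueue and the consumer “scans” them as soon as they arrive, Actor-style:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelinedScan
  {
    private static final String POISON = "EOF"; // end-of-stream marker

    public static List<String> run (final List<String> files)
      {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // the "discovery" phase runs on its own thread and feeds the queue
        final Thread producer = new Thread(() ->
          {
            files.forEach(queue::add);
            queue.add(POISON);
          });
        producer.start();

        // the "scan" phase consumes items as soon as they arrive,
        // overlapping with discovery instead of waiting for a barrier
        final List<String> scanned = new ArrayList<>();
        try
          {
            for (String file; !(file = queue.take()).equals(POISON); )
              {
                scanned.add("scanned " + file);
              }
            producer.join();
          }
        catch (InterruptedException e)
          {
            throw new RuntimeException(e);
          }

        return scanned;
      }
  }
```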

At this point one might easily try to go parallel with the second Stream:

                        final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS)
                                                                 .filter(Main::matchesExtension)
                                                                 .peek(Main::notifyDiscoveredFile)
                                                                 .collect(toList())
                                                                 .parallelStream()
                                                                 .collect(...);
                    

Let's look at the log now:

                    ...
                    23:39:18.852 [tidalwave.solidblue2.Main.main()] INFO  Discovered 20120429-0034.NEF
                    23:39:18.853 [tidalwave.solidblue2.Main.main()] INFO  Processed files: 0/217 (0%) - size: 0MB/4179MB (0%)
                    23:39:18.864 [tidalwave.solidblue2.Main.main()] INFO  computeFingerprint(.../FG-2012-0001/20120317-0029.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-4] INFO  computeFingerprint(.../FG-2012-0001/20120428-0056.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-5] INFO  computeFingerprint(.../FG-2012-0001/20120429-0022.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-2] INFO  computeFingerprint(.../FG-2012-0001/20120428-0138.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-7] INFO  computeFingerprint(.../FG-2012-0001/20120428-0119.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-1] INFO  computeFingerprint(.../FG-2012-0001/20120216-0096.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-3] INFO  computeFingerprint(.../FG-2012-0001/20120216-0012.NEF, MD5)
                    23:39:18.864 [ForkJoinPool.commonPool-worker-6] INFO  computeFingerprint(.../FG-2012-0001/20120107-0035.NEF, MD5)
                    23:39:23.033 [tidalwave.solidblue2.Main.main()] INFO  Processed files: 1/217 (0%) - size: 18MB/4179MB (0%)
                    23:39:23.033 [tidalwave.solidblue2.Main.main()] INFO  computeFingerprint(.../FG-2012-0001/20120317-0030.NEF, MD5)
                    23:39:23.186 [ForkJoinPool.commonPool-worker-1] INFO  Processed files: 2/217 (0%) - size: 37MB/4179MB (0%)
                    23:39:23.187 [ForkJoinPool.commonPool-worker-1] INFO  computeFingerprint(.../FG-2012-0001/20120216-0097.NEF, MD5)
                    23:39:23.247 [ForkJoinPool.commonPool-worker-2] INFO  Processed files: 3/217 (1%) - size: 55MB/4179MB (1%)
                    23:39:23.247 [ForkJoinPool.commonPool-worker-2] INFO  computeFingerprint(.../FG-2012-0001/20120428-0142.NEF, MD5)
                    23:39:23.415 [ForkJoinPool.commonPool-worker-3] INFO  Processed files: 4/217 (1%) - size: 75MB/4179MB (1%)
                    23:39:23.415 [ForkJoinPool.commonPool-worker-3] INFO  computeFingerprint(.../FG-2012-0001/20120216-0019.NEF, MD5)
                    23:39:23.426 [ForkJoinPool.commonPool-worker-4] INFO  Processed files: 5/217 (2%) - size: 95MB/4179MB (2%)
                    23:39:23.426 [ForkJoinPool.commonPool-worker-4] INFO  computeFingerprint(.../FG-2012-0001/20120428-0060.NEF, MD5)
                    23:39:23.501 [ForkJoinPool.commonPool-worker-7] INFO  Processed files: 6/217 (2%) - size: 114MB/4179MB (2%)
                    23:39:23.501 [ForkJoinPool.commonPool-worker-7] INFO  computeFingerprint(.../FG-2012-0001/20120428-0123.NEF, MD5)
                    23:39:23.593 [ForkJoinPool.commonPool-worker-6] INFO  Processed files: 7/217 (3%) - size: 134MB/4179MB (3%)
                    23:39:23.594 [ForkJoinPool.commonPool-worker-6] INFO  computeFingerprint(.../FG-2012-0001/20120107-0037.NEF, MD5)
                    23:39:24.040 [ForkJoinPool.commonPool-worker-5] INFO  Processed files: 8/217 (3%) - size: 153MB/4179MB (3%)
                    23:39:24.040 [ForkJoinPool.commonPool-worker-5] INFO  computeFingerprint(.../FG-2012-0001/20120429-0026.NEF, MD5)
                    ...
                    

As can be seen, the scan phase runs in different threads, taken from the ForkJoinPool common pool - that's how parallel Streams work.

The thing works, but performance isn't really any better - indeed it's even slightly worse. The problem lies in the way computeFingerprint() works: disk I/O on magnetic storage can't easily be parallelised. Some investigation is needed, but that's material for the next part.
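One avenue worth exploring is capping the parallelism: a widely used trick (it relies on an implementation detail of parallel Streams rather than a documented guarantee, so treat this as a sketch) is to submit the terminal operation to a dedicated ForkJoinPool, whose size can then be tuned for I/O-bound work instead of defaulting to the number of CPU cores:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BoundedParallelism
  {
    public static List<Integer> process (final int parallelism)
      {
        // a parallel stream executed from inside a ForkJoinPool task runs
        // on that pool's workers, so its size caps the parallelism
        final ForkJoinPool pool = new ForkJoinPool(parallelism);
        try
          {
            return pool.submit(() ->
                IntStream.rangeClosed(1, 8)
                         .parallel()
                         .boxed()
                         .map(i -> i * i)   // stand-in for the expensive per-file work
                         .collect(Collectors.toList()))
                      .get();
          }
        catch (InterruptedException | ExecutionException e)
          {
            throw new RuntimeException(e);
          }
        finally
          {
            pool.shutdown();
          }
      }
  }
```

For disk-bound work, a pool of two or three threads might keep the drive busy without thrashing the heads; only measurement can tell.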

The whole project, made with Maven, is available at BitBucket; the source illustrated in this post is part of 1.0-ALPHA-2.
