In this second part of the exercise, I'm first going to demonstrate that the existing code has a problem with parallelism, as I explained in the final notes of the previous post. A good way do to that is to introduce a facility for tracking the progress, by means of four counters:
- the number of discovered files;
- the number of scanned files;
- the total size of discovered files;
- the total size of scanned files.
private final static AtomicInteger discoveryCount = new AtomicInteger(); private final static AtomicInteger scanCount = new AtomicInteger(); private final static AtomicLong discoverySize = new AtomicLong(); private final static AtomicLong scanSize = new AtomicLong();
Even though there's no parallelism so far, I'm anticipating the future parallel refactoring so I used some Java 8
objects that are thread-safe: atomic classes. They can be safely manipulated, in a very efficient way, without the
need of putting them into a synchronized
block:
private static void notifyDiscoveredFile (final Path file) { try { log.info("Discovered {}", file.getFileName()); discoveryCount.incrementAndGet(); discoverySize.accumulateAndGet(Files.size(file), Long::sum); logProgress(); } catch (IOException e) { log.warn("", e); } } private static void notifyScannedFile (final Path file) { try { scanCount.incrementAndGet(); scanSize.accumulateAndGet(Files.size(file), Long::sum); logProgress(); } catch (IOException e) { log.warn("", e); } } private static void logProgress() { final int sc = scanCount.get(); final int dc = discoveryCount.get(); final long ss = scanSize.get(); final long ds = discoverySize.get(); log.info("{}", String.format("Processed files: %d/%d (%d%%) - size: %dMB/%dMB (%d%%)", sc, dc, (100 * sc / dc), ss / 1_000_000, ds / 1_000_000, (100 * ss / ds))); }
While there's clearly no big bottleneck here, in general it's important to avoid synchronized
blocks as they reduce parallelism and hence they may degrade the performance. That's why atomic classes are
useful.
One could point out that logProgress()
is likely to read values that are not entirely consistent,
because the four values are not read in a single atomic step; in other words, scanCount
might be
updated just before discoveryCount
is read, and so we're not taking a proper snapshot of the
system. I'm ignoring this problem at the moment, since it's not clear yet how these data will be used in the
GUI. Anyway, a progress indication is not intended to give a precise readout to the user, rather the impression
that the system is moving on, so this might not be really important.
The call to notifyDiscoveredFile()
can be inserted into the pipeline by means of a peek()
:
final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS) .filter(Main::matchesExtension) .peek(Main::notifyDiscoveredFile) ...
while the call to notifyScannedFile()
can be conveniently put into a finally
block of
the computeFingerprint()
method:
private static String computeFingerprint (final Path file, final String algorithm) { try { ... } catch (NoSuchAlgorithmException | IOException e) { ... } finally { notifyScannedFile(file); } }
The complete changeset for this iteration can be seen as a diff at BitBucket (try the side-by-side view for a really readable view).
Now, running the application and taking a peek at the log, it's clear that the thing is not working as intended (note: the log has been snipped to remove clutter):
23:11:44.640 [main()] INFO Scanning .../FG-2012-0001... 23:11:44.811 [main()] INFO Discovered 20120103-0006.NEF 23:11:44.814 [main()] INFO Processed files: 0/1 (0%) - size: 0MB/18MB (0%) 23:11:44.814 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0006.NEF, MD5) 23:11:45.622 [main()] INFO Processed files: 1/1 (100%) - size: 18MB/18MB (100%) 23:11:45.622 [main()] INFO Discovered 20120103-0009.NEF 23:11:45.622 [main()] INFO Processed files: 1/2 (50%) - size: 18MB/36MB (50%) 23:11:45.622 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0009.NEF, MD5) 23:11:46.039 [main()] INFO Processed files: 2/2 (100%) - size: 36MB/36MB (100%) 23:11:46.039 [main()] INFO Discovered 20120103-0013.NEF 23:11:46.039 [main()] INFO Processed files: 2/3 (66%) - size: 36MB/53MB (66%) 23:11:46.039 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0013.NEF, MD5) 23:11:46.450 [main()] INFO Processed files: 3/3 (100%) - size: 53MB/53MB (100%) 23:11:46.450 [main()] INFO Discovered 20120103-0024.NEF 23:11:46.450 [main()] INFO Processed files: 3/4 (75%) - size: 53MB/71MB (75%) 23:11:46.450 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0024.NEF, MD5) 23:11:46.855 [main()] INFO Processed files: 4/4 (100%) - size: 71MB/71MB (100%) 23:11:46.855 [main()] INFO Discovered 20120103-0032.NEF 23:11:46.855 [main()] INFO Processed files: 4/5 (80%) - size: 71MB/89MB (80%) 23:11:46.855 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0032.NEF, MD5) 23:11:47.251 [main()] INFO Processed files: 5/5 (100%) - size: 89MB/89MB (100%) 23:11:47.252 [main()] INFO Discovered 20120107-0002.NEF 23:11:47.252 [main()] INFO Processed files: 5/6 (83%) - size: 89MB/107MB (83%)
Since there is a single pipeline and File.walk()
is a sequential source, what we see is that a
single file is discovered, then it is scanned before the next one is discovered, and so on; so discoveryCount
can't be but only a single step ahead of scanCount
.
We have to “break” the pipeline into two separated pipelines. This can be done with a simple change made of two
lines just before the final collect()
:
final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS) .filter(Main::matchesExtension) .peek(Main::notifyDiscoveredFile) .collect(toList()) .stream() .collect(...);
Now the new collect(toList())
starts the first Stream
and collects the results into
an intermediate List<Path>
; the subsequent call to stream()
creates a new Stream
out of it and goes on. The complete changeset for this iteration can be seen as a diff at BitBucket.
The new behaviour can be seen in the log:
23:20:50.137 [main()] INFO Scanning .../FG-2012-0001... 23:20:50.252 [main()] INFO Discovered 20120103-0006.NEF 23:20:50.254 [main()] INFO Processed files: 0/1 (0%) - size: 0MB/18MB (0%) 23:20:50.254 [main()] INFO Discovered 20120103-0009.NEF 23:20:50.255 [main()] INFO Processed files: 0/2 (0%) - size: 0MB/36MB (0%) 23:20:50.255 [main()] INFO Discovered 20120103-0013.NEF 23:20:50.255 [main()] INFO Processed files: 0/3 (0%) - size: 0MB/53MB (0%) 23:20:50.255 [main()] INFO Discovered 20120103-0024.NEF ... 23:20:51.058 [main()] INFO Processed files: 0/216 (0%) - size: 0MB/4160MB (0%) 23:20:51.059 [main()] INFO Discovered 20120429-0034.NEF 23:20:51.145 [main()] INFO Processed files: 1/217 (0%) - size: 18MB/4179MB (0%) 23:20:51.145 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0009.NEF, MD5) 23:20:51.543 [main()] INFO Processed files: 2/217 (0%) - size: 36MB/4179MB (0%) 23:20:51.544 [main()] INFO computeFingerprint(.../FG-2012-0001/20120103-0013.NEF, MD5) 23:20:51.970 [main()] INFO Processed files: 3/217 (1%) - size: 53MB/4179MB (1%) ...
Now the discovery Stream
is completed before starting the scanning Stream
, so we have
meaningful progress values.
The behaviour in the original SolidBlue was slightly different: in fact the discovery phase itself was based on
Actors, so it executed in parallel, and the scan phase started as soon as there were the first discovered files.
In other words, there was not a barrier separating the execution of the two phases. This doesn't seem really
important now, since it looks like that Java 8's File.walk()
has an excellent performance, and
scanning the filesystem in parallel might be not worth while. In any case, having the scan Stream
starting as soon as there are the first discovered files might be an interesting exercise for the next steps.
At this point one might easily try to go parallel with the second Stream
:
final Map<String, String> storage = Files.walk(targetPath, FOLLOW_LINKS) .filter(Main::matchesExtension) .peek(Main::notifyDiscoveredFile) .collect(toList()) .parallelStream() .collect(...);
Let's look at the log now:
... 23:39:18.852 [tidalwave.solidblue2.Main.main()] INFO Discovered 20120429-0034.NEF 23:39:18.853 [tidalwave.solidblue2.Main.main()] INFO Processed files: 0/217 (0%) - size: 0MB/4179MB (0%) 23:39:18.864 [tidalwave.solidblue2.Main.main()] INFO computeFingerprint(.../FG-2012-0001/20120317-0029.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-4] INFO computeFingerprint(.../FG-2012-0001/20120428-0056.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-5] INFO computeFingerprint(.../FG-2012-0001/20120429-0022.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-2] INFO computeFingerprint(.../FG-2012-0001/20120428-0138.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-7] INFO computeFingerprint(.../FG-2012-0001/20120428-0119.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-1] INFO computeFingerprint(.../FG-2012-0001/20120216-0096.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-3] INFO computeFingerprint(.../FG-2012-0001/20120216-0012.NEF, MD5) 23:39:18.864 [ForkJoinPool.commonPool-worker-6] INFO computeFingerprint(.../FG-2012-0001/20120107-0035.NEF, MD5) 23:39:23.033 [tidalwave.solidblue2.Main.main()] INFO Processed files: 1/217 (0%) - size: 18MB/4179MB (0%) 23:39:23.033 [tidalwave.solidblue2.Main.main()] INFO computeFingerprint(.../FG-2012-0001/20120317-0030.NEF, MD5) 23:39:23.186 [ForkJoinPool.commonPool-worker-1] INFO Processed files: 2/217 (0%) - size: 37MB/4179MB (0%) 23:39:23.187 [ForkJoinPool.commonPool-worker-1] INFO computeFingerprint(.../FG-2012-0001/20120216-0097.NEF, MD5) 23:39:23.247 [ForkJoinPool.commonPool-worker-2] INFO Processed files: 3/217 (1%) - size: 55MB/4179MB (1%) 23:39:23.247 [ForkJoinPool.commonPool-worker-2] INFO computeFingerprint(.../FG-2012-0001/20120428-0142.NEF, MD5) 23:39:23.415 [ForkJoinPool.commonPool-worker-3] INFO Processed files: 4/217 (1%) - size: 75MB/4179MB (1%) 23:39:23.415 [ForkJoinPool.commonPool-worker-3] INFO computeFingerprint(.../FG-2012-0001/20120216-0019.NEF, MD5) 23:39:23.426 [ForkJoinPool.commonPool-worker-4] INFO Processed files: 5/217 (2%) - size: 95MB/4179MB (2%) 23:39:23.426 [ForkJoinPool.commonPool-worker-4] INFO computeFingerprint(.../FG-2012-0001/20120428-0060.NEF, MD5) 23:39:23.501 [ForkJoinPool.commonPool-worker-7] INFO Processed files: 6/217 (2%) - size: 114MB/4179MB (2%) 23:39:23.501 [ForkJoinPool.commonPool-worker-7] INFO computeFingerprint(.../FG-2012-0001/20120428-0123.NEF, MD5) 23:39:23.593 [ForkJoinPool.commonPool-worker-6] INFO Processed files: 7/217 (3%) - size: 134MB/4179MB (3%) 23:39:23.594 [ForkJoinPool.commonPool-worker-6] INFO computeFingerprint(.../FG-2012-0001/20120107-0037.NEF, MD5) 23:39:24.040 [ForkJoinPool.commonPool-worker-5] INFO Processed files: 8/217 (3%) - size: 153MB/4179MB (3%) 23:39:24.040 [ForkJoinPool.commonPool-worker-5] INFO computeFingerprint(.../FG-2012-0001/20120429-0026.NEF, MD5) ...
As it can be seen, the scan phase runs in different threads, taken from the Java 8 ForkJoinPool - that's how
parallel Stream
s run.
The thing works, but the performance isn't really any better - indeed it's even slightly worse. The problem lies
in the way in which computeFingerprint()
works: actually, disk I/O on a magnetic storage can't be
easily parallelised. Some investigation is needed, but that's some stuff for the next part.
The whole project, made with Maven, is available at BitBucket; the source illustrated in this post is part of 1.0-ALPHA-2.