Java 8 parallelism exercise - part III

In the third part of this exercise I'm going to focus on an issue that involves just a few lines of code but is quite interesting: partly because it exposes a minor quirk in Java 8, partly because understanding it and its work-around is a good way to deepen one's comprehension of lambdas.

Before moving to the main topic, there are a few things to say. First, I have to mention that the source code underwent a slight refactoring, in which:

  1. I encapsulated everything related to progress tracking into a separate ProgressTracker class.
  2. I turned some methods in the Main class from static to regular instance methods - this means that now Main is instantiated before being executed.
  3. I also renamed the package, but this is just a cosmetic thing.

There could be many things to say about this transformation, which converts the original, functional “look” into something traditionally object-oriented. But the scope of SolidBlue2, so far, is so simple that there is not enough context to make serious considerations about this topic.

Second, the scanning of the filesystem has been improved with a try-with-resources:

try (final Stream<Path> s = Files.walk(targetPath, FOLLOW_LINKS))
  {
    final Map<String, String> storage = s.filter(Main::matchesExtension)
                                         .peek(progressTracker::notifyDiscoveredFile)
                                         .collect(toList())
                                         .stream()
                                         .collect(toMap(p -> p.getFileName().toString(),
                                                        p -> computeFingerprint(p, "MD5"),
                                                        (v1, v2) -> v2));
    store(targetPath, storage);
  }

Stream, actually, has a close() method. When the Stream source is in memory there's no need to close it, as the memory resources are handled by the Garbage Collector as usual. But when the source opens some system resources (such as sockets or files), the Stream should be explicitly closed. SolidBlue2 is currently a tiny command line application that terminates after the scan, so this aspect is not relevant now; but since it is going to become a full-fledged application with its own user interface, it's better to handle this properly before I forget. The best way to do that is to use try-with-resources, since Stream implements AutoCloseable.
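To make the point about close() concrete, here is a minimal sketch that is not taken from SolidBlue2: the class name StreamCloseDemo and the in-memory source are just assumptions for illustration. It registers a handler with onClose() and checks that try-with-resources triggers it, which is the same mechanism Files.walk() relies on to release its directory handles.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the SolidBlue2 sources.
class StreamCloseDemo
  {
    /** Returns true if the close handler ran when leaving the try-with-resources block. */
    static boolean closeHandlerRuns()
      {
        final AtomicBoolean closed = new AtomicBoolean();

        // onClose() registers a handler that is invoked by Stream.close()
        try (final Stream<String> s = Stream.of("a", "b", "c").onClose(() -> closed.set(true)))
          {
            s.forEach(x -> { }); // the terminal operation alone does not close the stream
          }

        return closed.get();
      }

    public static void main (final String ... args)
      {
        System.out.println("closed: " + closeHandlerRuns());
      }
  }
```

Note that the handler is run by close(), not by the terminal operation: without the try-with-resources (or an explicit close()) it would never be called.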

Ok. Now we can focus on the main theme of this post. Let's look at the method that stores data into a file:

private void store (final Path targetPath, final Map<String, String> storage)
  throws IOException
  {
    final Path folder = targetPath.resolve(".it.tidalwave.solidblue2");
    final Path file = folder.resolve("fingerprints-j8.txt");
    Files.createDirectories(folder);
    log.info("Storing results into {} ...", file);

    try (final PrintWriter w = new PrintWriter(Files.newBufferedWriter(file, Charset.forName("UTF-8"))))
      {
        storage.entrySet().stream()
                          .sorted(comparing(Entry::getKey))
                          .forEach(e -> w.printf("MD5(%s)=%s\n", e.getKey(), e.getValue()));
      }
  }

While the code is fine and it's also robust, as it uses try-with-resources, one might ask whether Java 8 offers a less verbose way to do a simple operation such as writing some strings into a file. The verbosity related to BufferedWriter and PrintWriter is what I'm aiming at. In Java 8, actually, the Files class has been augmented with a couple of methods (I'm omitting some irrelevant details):

public final class Files
  {
    public static Stream<String> lines (Path path)
      throws IOException;

    public static Path write (Path path, Iterable<? extends CharSequence> iterable)
      throws IOException;

    ...
  }

They are pretty useful as they encapsulate all the technicalities about opening and closing a file and provide a good abstraction that only focuses on the data to be read and written. Thus one might be tempted to write something such as:

Files.write(file, 
            storage.entrySet().stream()
                              .sorted(comparing(Entry::getKey))
                              .map(e -> String.format("MD5(%s)=%s", e.getKey(), e.getValue())), 
            Charset.forName("UTF-8"));
    

Unfortunately this fails to compile, because Stream<T> doesn't extend Iterable<T>. This is pretty curious, since Stream<T> does indeed provide a public Iterator<T> iterator() method (as well as a couple of other methods defined in Iterable). This has already been debated at StackOverflow, where one of the answers was more or less “the thing is deliberate, because programmers are used to the fact that they can call iterator() on a collection multiple times, while Stream can be used only once”. Actually, one should recall that a Stream is not a collection, but rather a manipulator of collections of data. And Streams have been designed with the internal iteration concept in mind, while iterators are made for external iteration.

To have an authoritative answer I've asked about this issue at the core-libs-dev mailing list and got a very exhaustive reply from Remi Forax, including this confirmation:

iterator() is not a 'real' method of Stream, it's a kind of escape hatch that you can use if you have an API that takes an Iterator as parameter but it will be slow.

Given that anyway Stream<T> provides all the methods in Iterable<T>, one might be tempted to try a cast (I've slightly rewritten the code chunk for better readability):

final Stream<String> s = storage.entrySet().stream()
                                           .sorted(comparing(Entry::getKey))
                                           .map(e -> String.format("MD5(%s)=%s", e.getKey(), e.getValue()));
Files.write(file, (Iterable<String>)s, Charset.forName("UTF-8"));

But this is clearly a naïve approach: you can fool the compiler, nevertheless you will get a ClassCastException at runtime. After all, Java is still a strongly typed language, both statically and dynamically, and the JVM cannot but tell you that Stream<T> doesn't extend Iterable<T>. A safe and quick way to “convert” a Stream<T> into an Iterable<T> would be to just add collect(toList()), but this would accumulate all the products into a memory buffer. No problem with smaller scenarios, but it could be an issue for larger amounts of data. And it would be a pity to waste memory for an operation that might be inherently pipeline-oriented.
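The two alternatives just described can be sketched as follows. This is only an illustration under my own assumptions: the class StreamToIterableDemo, its method names and the temporary file are hypothetical and not part of the project. The first method shows that the cast compiles but fails at runtime; the second shows the collect(toList()) route, which works at the price of buffering everything in memory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the SolidBlue2 sources.
class StreamToIterableDemo
  {
    /** Returns true if casting a Stream to an Iterable fails at runtime. */
    @SuppressWarnings("unchecked")
    static boolean castFails()
      {
        final Stream<String> s = Stream.of("a", "b");

        try
          {
            final Iterable<String> i = (Iterable<String>)s; // accepted by the compiler
            return i == null;                               // never reached
          }
        catch (ClassCastException e)
          {
            return true; // the stream implementation is not an Iterable
          }
      }

    /** Writes a sorted stream through an intermediate List and reads the file back. */
    static List<String> writeViaList()
      {
        try
          {
            final Path file = Files.createTempFile("fingerprints", ".txt");

            try
              {
                // the whole content is buffered in memory before being written
                final List<String> lines = Stream.of("b", "a").sorted().collect(Collectors.toList());
                Files.write(file, lines, StandardCharsets.UTF_8);
                return Files.readAllLines(file, StandardCharsets.UTF_8);
              }
            finally
              {
                Files.deleteIfExists(file);
              }
          }
        catch (IOException e)
          {
            throw new UncheckedIOException(e);
          }
      }
  }
```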

Stream.sorted(), as in the code example above, is already buffering all data into a list, because it's required by the sorting algorithm. Anyway I'd like to discuss the general case, in which there might be no sorting.

But there's a trick:

final Stream<String> s = storage.entrySet().stream()
                                           .sorted(comparing(Entry::getKey))
                                           .map(e -> String.format("MD5(%s)=%s", e.getKey(), e.getValue()));
Files.write(file, (Iterable<String>)s::iterator, Charset.forName("UTF-8"));

This compiles and works. What's happening here? s::iterator is a method reference which matches a Supplier<Iterator<T>>. By matching, in Java 8 we mean that the two things are compatible with the same “function descriptor”: Java 8 doesn't care about the method names, only about the fact that they have the same parameters and return type (and possibly the throws clause). Since Iterable<T> is also a functional interface, whose single abstract method has the same descriptor as Supplier<Iterator<T>>, s::iterator can be assigned to an Iterable<String>.
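A small sketch may help to see the matching at work. The class MethodRefDemo below is made up for this purpose: it assigns the same kind of method reference to both a Supplier<Iterator<String>> and an Iterable<String>, and then consumes them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the SolidBlue2 sources.
class MethodRefDemo
  {
    static List<String> drain()
      {
        final Stream<String> s1 = Stream.of("x", "y");
        final Stream<String> s2 = Stream.of("x", "y");

        // The same shape of method reference, with two different target types:
        // both interfaces declare a single abstract method "() -> Iterator<String>".
        final Supplier<Iterator<String>> supplier = s1::iterator;
        final Iterable<String> iterable = s2::iterator;

        final List<String> result = new ArrayList<>();
        supplier.get().forEachRemaining(result::add);

        for (final String item : iterable) // for-each calls iterable.iterator() once
          {
            result.add(item);
          }

        return result;
      }
  }
```

Keep in mind that such an Iterable is a one-shot object: a second call to iterator() would hit the already consumed Stream and fail with an IllegalStateException. This is precisely the difference with collections mentioned earlier.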

So, why the explicit cast (Iterable<String>)? It is required indeed, otherwise the compiler would complain again:

error: no suitable method found for write(Path,s::iterator,Charset)

The reason for this fact is more obscure. Again, Remi Forax explains that:

The conversion from a method reference to a functional interface doesn't work if the target type is parameterized by wildcard. This is specified like this in the Java spec. I consider that this is a bug but this is actually what the spec says. I hope we will be able to fix that in Java 10, but that just my opinion.

The problem is the wildcard, since Files.write() accepts an Iterable<? extends CharSequence>. So we have to live with that cast, which is just a minor annoyance after all. Unless this RFE is accepted for Java 10...

A better approach

Note the first comment of the RFE: it suggests writing a specific Collector to do the job as an alternative solution - it is a smart idea IMHO, as what we're actually doing is collecting the Stream products into a file. I've already posted about how to write a custom Collector. A possible solution is:

public final class FileCollector implements Collector<String, PrintWriter, Void>
  {
    private final PrintWriter pw;

    private final AtomicBoolean parallelChecker = new AtomicBoolean();

    public static Collector<String, ?, ?> toFile (final Path file,
                                                  final Charset charset,
                                                  final OpenOption ... openOptions)
      throws IOException
      {
        return new FileCollector(file, charset, openOptions);
      }

    private FileCollector()
      {
        throw new UnsupportedOperationException();
      }

    private FileCollector (final Path file, final Charset charset, final OpenOption ... openOptions)
      throws IOException
      {
        pw = new PrintWriter(Files.newBufferedWriter(file, charset, openOptions));
      }

    @Override
    public Supplier<PrintWriter> supplier()
      {
        return this::oneShotPrintWriterSupplier;
      }

    @Override
    public BiConsumer<PrintWriter, String> accumulator()
      {
        return PrintWriter::println;
      }

    @Override
    public BinaryOperator<PrintWriter> combiner()
      {
        return (a, b) -> { fail(); return null; }; // never called
      }

    @Override
    public Function<PrintWriter, Void> finisher()
      {
        return pw -> { pw.close(); return null; };
      }

    @Override
    public Set<Characteristics> characteristics()
      {
        return Collections.emptySet();
      }

    private PrintWriter oneShotPrintWriterSupplier()
      {
        if (parallelChecker.getAndSet(true))
          {
            fail();
          }

        return pw;
      }

    private void fail()
      {
        pw.close();
        throw new IllegalStateException("Can't be used with a parallel Stream!");
      }
  }

Basically, we're saying that this collector can be applied to a Stream of Strings, it uses a PrintWriter as the accumulator and doesn't produce a Java object as the result (that's why I used Void as the third type parameter). The implementation is straightforward, but there is a point that must be dealt with: parallelism. Collectors in fact are designed to work in parallel if the incoming Stream is parallel; but writing objects to a file can't be a parallel operation. Ultimately, the incoming Stream must be turned into a sequential one before calling collect(); and our Collector should perform a safety check. Unfortunately the Collector itself never sees the Stream it is applied to; but we can assume that it is being run in parallel if the Supplier of the accumulator is called more than once (note that the check might be triggered after some items have already been processed and written to the file). At this point, it is safe to assume (at least with the current implementation of the Java 8 runtime...) that the combiner is never called.
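The assumption about the Supplier can be observed with a throw-away experiment. The sketch below is not the FileCollector: SupplierCountDemo and its methods are hypothetical names, and the collector just concatenates Strings while counting how many times its supplier is invoked. The exact number of invocations for a parallel Stream depends on the runtime and on the machine, so I only rely on it being greater than one.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the SolidBlue2 sources.
class SupplierCountDemo
  {
    /** Creates a list large enough to be split by a parallel Stream. */
    static List<String> items()
      {
        return IntStream.range(0, 10000).mapToObj(n -> "String #" + n).collect(Collectors.toList());
      }

    /** Collects the stream with a dummy collector and returns how many times its supplier ran. */
    static int countSupplierCalls (final Stream<String> stream)
      {
        final AtomicInteger calls = new AtomicInteger();
        final Collector<String, StringBuilder, String> collector =
            Collector.of(() -> { calls.incrementAndGet(); return new StringBuilder(); },
                         (sb, s) -> sb.append(s),
                         (a, b) -> a.append(b),
                         sb -> sb.toString());
        stream.collect(collector);
        return calls.get();
      }

    public static void main (final String ... args)
      {
        System.out.println("sequential:            " + countSupplierCalls(items().stream()));
        System.out.println("parallel:              " + countSupplierCalls(items().parallelStream()));
        System.out.println("parallel + sequential: " + countSupplierCalls(items().parallelStream().sequential()));
      }
  }
```

The last case shows the way out for the caller: invoking sequential() just before collect() switches the whole pipeline back to sequential execution, so the supplier is called only once.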

The code can be tested with TestNG:

private Collector<String, ?, ?> underTest;

...

@BeforeMethod
public void setup()
  throws IOException
  {
    underTest = FileCollector.toFile(file, Charset.forName("UTF-8"));
  }

...

@Test(invocationCount = 100,
      expectedExceptions = IllegalStateException.class,
      expectedExceptionsMessageRegExp = ".*Can't be used with a parallel Stream!")
public void must_fail_with_a_parallel_stream()
  {
    IntStream.range(0, 10000)
             .mapToObj(n -> "String #" + n)
             .collect(toList())
             .parallelStream()
             .collect(underTest);
  }

Note that not specifying CONCURRENT in the characteristics does not ensure that the Collector is run in a sequential fashion: it only means that a single accumulator cannot be manipulated concurrently, but with a parallel Stream the runtime might still use multiple accumulators in parallel (synchronizing the calls to the Collector that manipulate them). As of Java 8, there is no reliable way to force a Collector to run in a sequential fashion with a parallel Stream.

It's possible to write the Collector in a radically shorter fashion. Given that in the end it's just an aggregate of four lambdas, the factory method Collector.of() can be used by providing them inline. Note that this is possible even when there are state variables, as they can be turned into local variables in the scope of the factory method. Since lambdas can usually be written in a single line, the result is shorter. The shorter code follows, even though I'm unsure which version is more readable, especially for a reader having a first experience with Java 8.

Lambdas can access variables declared in the enclosing scope, but they must be final, as it happens with inner classes. The compiler doesn't require them to be declared final, it suffices that they are not re-assigned (“effectively final”). Nevertheless I prefer to always use the final keyword every time something is final, for better readability.
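As a side illustration of this note, here is a minimal sketch (EffectivelyFinalDemo is a made-up name) in which a lambda captures a local variable that is not declared final, together with a mutable object playing the same role as the AtomicBoolean in the factory method.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical demo class, not part of the SolidBlue2 sources.
class EffectivelyFinalDemo
  {
    static int[] demo()
      {
        int base = 10;                                     // not declared final, but never re-assigned
        final AtomicInteger counter = new AtomicInteger(); // the reference is final, the state inside is mutable

        final IntSupplier next = () -> base + counter.incrementAndGet();

        // base++; // if uncommented, 'base' is no longer effectively final and the lambda doesn't compile

        return new int[] { next.getAsInt(), next.getAsInt() }; // 11, then 12
      }
  }
```

This is why the state of the collector can live in local variables of the factory method: the references are never re-assigned, while the objects they point to can still change.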

public final class FileCollector
  {
    public static Collector<String, ?, ?> toFile (final Path file,
                                                  final Charset charset,
                                                  final OpenOption ... openOptions)
      throws IOException
      {
        final PrintWriter pw = new PrintWriter(Files.newBufferedWriter(file, charset, openOptions));
        final AtomicBoolean parallelChecker = new AtomicBoolean();

        return Collector.of(() -> safeGetPrintWriter(parallelChecker.getAndSet(true), pw),
                            PrintWriter::println,
                            (a, b) -> safeGetPrintWriter(true, pw), // never called
                            pw2 -> { pw2.close(); return null; });
      }

    private FileCollector()
      {
        throw new UnsupportedOperationException();
      }

    private static PrintWriter safeGetPrintWriter (final boolean condition, final PrintWriter pw)
      {
        if (condition)
          {
            pw.close();
            throw new IllegalStateException("Can't be used with a parallel Stream!");
          }

        return pw;
      }
  }

With the new Collector, at this point the original code can be refactored into:

private void store (final Path targetPath, final Map<String, String> storage)
  throws IOException
  {
    final Path folder = targetPath.resolve(".it.tidalwave.solidblue2");
    final Path file = folder.resolve("fingerprints-j8.txt");
    Files.createDirectories(folder);
    log.info("Storing results into {} ...", file);
    storage.entrySet().stream()
                      .sorted(comparing(Entry::getKey))
                      .map(e -> String.format("MD5(%s)=%s", e.getKey(), e.getValue()))
                      .collect(toFile(file, Charset.forName("UTF-8")));
  }

One relevant part of Remi's comment was about performance. Perhaps in most cases we can safely ignore the performance hit (for instance, when we have just a few lines to write). Since I've started experimenting with the Java Microbenchmark Harness (JMH), it could be a useful exercise to measure the performance of the alternative solutions to the problem I presented here. Stuff for a further post.

The whole project, made with Maven, is available at BitBucket; the source illustrated in this post is part of 1.0-ALPHA-3.
