Actors in Java

This is a reviewed version of an original post published at Java.Net.

Recently there has been a renewed interest into the Actor programming model. The Actor Model actually comes from the '70s, but as far as I'm aware it has been used only in a very limited subset of industrial projects outside the area of telecoms. Erlang (not by chance developed at Ericsson, a telecom industry), which is a language whose concurrency model is actor oriented, is getting some attention but it's definitely a niche kind of a language. More interesting is the fact that the Scala language developed its own actor-based platform, Akka, which is also available to Java (and, generally speaking, to JVM-based languages). While there might (a-hum) be a lot of hype about Scala and I'm pretty oriented to apply a big low-pass filter on trendy and cool stuff, actors have definitely their place in the world when you're involved with complex concurrency. Once upon a time complex concurrency was a matter only for specific industrial segments (e.g. the previously cited telecom industry), but since things such as multi-cores and "the cloud" are invading our battlefields it's better to get prepared.

BTW, cloud and multi-core apart, I've already seen a lot of production code with scattered synchronized sections that "perhaps shouldn't be there, but the thing works and it's better to let them stay". Note that this argument is the typical smell of lack of testing, but concurrency is such a complex thing to test that even when the customer is a good guy and achieves a very high coverage, often he can't be sure. The risk is to have some unprotected section that sooner or later will lead to some inconsistency or raise an exception, up to the scary possibility that a deadlock occurs. The better way to deal with concurrency is to pick a programming model that avoids problems by costruction, and actors can be a good choice.
 
I've started doing some coding in the area and while I'm still far from a conclusion it's high time I posted some considerations.

Actors and Akka

So, what are actors about? These are the basic principles:

  1. No shared (mutable) data. Each actor manages its own slice of data and there's nothing shared with others. Thus no need for synchronized.
  2. Lightweight processes. Threads are still good, since they scale well; but, as per the previous point, they must be isolated.
  3. Communicate through Asynchronous Messages. What is traditionally done by invoking a method from an object to another now is done by sending an asynchronous message. The sender is never blocked, and the receiver gets all inbound messages enqueued into a 'mailbox', from which they will be consumed one at a time per each actor instance.
  4. Messages are immutable. This should be inferred by the previous points (otherwise bye bye "no shared mutable data"), but it's a good advice to stress the point.

With actors the contract of a computational entity is no more represented by an interface (that is an enumeration of methods) and behaviour, but by the set of messages that the actor can receive and send, and behaviour. Copying the first example in the Java API for Akka, a simple actor can be:

import akka.actor.UntypedActor;
                    import akka.event.EventHandler;

                    public class SampleUntypedActor extends UntypedActor
                      {
                        public void onReceive (Object message) throws Exception
                          {
                            if (message instanceof String)
                              {
                                getContext().replyUnsafe("Hello " + message);
                              }
                            else
                              {
                                throw new IllegalArgumentException("Unknown message: " + message);
                              }
                          }    
                      }

Actors, being managed objects (and possibly distributed on a network), can't be directly accessed, but must be handled by references. For instance, to send a message to the previous actor:

import static akka.actor.Actors.*;

                    final ActorRef myActor = actorOf(SampleUntypedActor.class);
                    actor.start();
                    ...
                    actor.tell("world");

Now Akka has a considerable value in being tested and optimized - performance is one of the reasons for which you might want to use actors and it's not easy to achieve and Akka is known to scale to impressive numbers. But while messages can be delivered in different fashions, unfortunately Akka seems not to be supporting publish & subscribe which is my preferred way to go (I've seen references to extensions and possibly this feature could be introduced in future). This is a first showstopper for me. Furthermore, basic concepts are simple but, as it unfortunately often occurs, things get more complicated as you start digging into details. Also, you note the total lack of syntactic sugar, as the incoming message is a vanilla Object and you need a cascade of if (message instanceof ...) to deal with multiple messages. At least, for the things Akka calls untyped actors: there are typed actors too, which can be even defined again by an interface to mimic method calling, that Akka will manipulate through AOP to transform method invocation in message passing. An approach that I don't like, because I think that in a message-passing software the approach must be evident in source code, not masked under something that pretend to be method invocation; furthermore, method invocation semantics are incompatible with publish & subscribe. So, I'd say that with Akka either you have no sugar or you're going to sink into honey.

Anyway, just before publishing this blog post, I've learned that Akka 2.0 is going to have publish & subscribe. Excellent news. In the meantime?

Understanding what I want

In the meantime, I've started sketching some code on my own, to understand how I'd like to use actors. The idea is to learn things and then try to converge to Akka, eventually with a thin layer of syntactic sugar. There's enough work so far that's worth while a blog post, both for my personal need of writing down things and to discuss with others.

Since things must be given a real context I've started thinking of desktop applications, in particular two simple tools that I need: SolidBlue computes the fingerprint of the files contained in a directory (I use it for periodically checking the sanity of my photos and related backups) and blueShades provides a GUI for Argyll, a tool for color management, suited to my needs. Both are being implemented with the NetBeans Platform.

So let's focus on SolidBlue and its main feature. The scenario is:

  1. You select a directory.
  2. The application scans the directory recursively.
  3. For each discovered file, contents are loaded and the MD5 fingerprint is computed.
  4. Each MD5 computed fingerprint is stored into a flat file.
  5. A UI monitors the workflow displaying the current status on the screen.

The flow can be parallelised. For instance, directory recursive inspection can be performed by multiple threads, as well as the MD5 computation, which only involves the CPU. On the other hand, loading from the disk is something that it's better to serialize (that is, reading only one file at a time) in order to maximize the disk throughput (which it's the true bottleneck of the application). Indeed, things are a bit more complex than I've just said, but I'll be back on the issue later.

Actors in SolidBlue

Note: in the following code samples, I'm making use of Lombok annotations for some plumbing. This is not only great for everyday's work, but it also makes code samples much easier to read. Also, I'm only listing significant import statements, as well as skipping some non relevant code chunks.

Everything starts by sending to the system a message asking for a scan:

FileScanRequestMessage.forFolder("/Volume/Media/Photos")
                                          .withFilter(withExtensions("NEF", "JPG", "TIF"))
                                          .send();

As you can see, there's no target actor, but according to the "publish and subscribe" approach the message just sends itself. The listing for FileScanRequestMessage is below:

package it.tidalwave.integritychecker;

                    import it.tidalwave.actor.MessageSupport;
                    import it.tidalwave.actor.annotation.Message;

                    @Message @Immutable @RequiredArgsConstructor(access=PRIVATE) @EqualsAndHashCode @ToString(callSuper=false)
                    public class FileScanRequestMessage extends MessageSupport
                      {
                        public static interface Filter
                          {       
                            public boolean accepts (@Nonnull FileObject fileObject); 
                          }

                        @Nonnull @Getter
                        private final FileObject folder; 

                        @Nonnull @Getter
                        private final Filter filter; 

                        @Nonnull
                        public static FileScanRequestMessage forFolder (final @Nonnull FileObject fileObject)
                          {
                            return new FileScanRequestMessage(fileObject, Filter.ANY);
                          }   

                        @Nonnull
                        public FileScanRequestMessage withFilter (final @Nonnull Filter filter)
                          { 
                            return new FileScanRequestMessage(folder, filter); 
                          }
                      }

Somewhere there's a FileObjectDiscoveryActor that listens to that message:

package it.tidalwave.integritychecker.impl;

                    import it.tidalwave.actor.annotation.Actor;
                    import it.tidalwave.actor.annotation.ListensTo;

                    @Actor @ThreadSafe
                    public class FileObjectDiscoveryActor
                      {
                        public void onScanRequested (final @ListensTo FileScanRequestMessage message)
                          {
                            for (final FileObject child : message.getFolder().getChildren())
                              {
                                if (message.getFilter().accepts(child))
                                  {
                                    (child.isFolder() ? FileScanRequestMessage.forFile(child).withFilter(message.getFilter())
                                                      : FileDiscoveredMessage.forFile(child)).send();
                                  }
                              }
                          }
                      }

You can see here my syntactic sugar: the actor is a POJO and methods designated to receive messages are annotated by @ListensTo. Multiple methods can listen to multiple messages. The annotation is similar to @Observes by CDI (JSR-299) - actually I'd love to reuse it, but it seems to have different semantics (e.g. by default it can activate managed objects that are the target of an inbound message; furthermore there are multiple delivery options related to transactionality that at the moment I'm not interested into).

Recursive directory navigation is implemented by sending another FileScanRequestMessage. When a regular file is detected, a FileDiscoveredMessage is fired (there's no need to see its listing, as it's straightforward).

Inbound messages are first enqueued and then dispatched in a FIFO fashion (with exceptions explained later). If the actor is not thread safe, there's the guarantee that it can be engaged by a single thread at any time (thus, messages are processed one per time). In this way there's no need to have synchronized sections. If the actor is thread safe (the default), as FileDiscoveredMessage that's even stateless, it can be engaged by multiple threads at the same time. The thread safety property is specified as an attribute of @Actor (unfortunately @ThreadSafe can't help since its retention is compile time).

The second actor is the one that reads data from files: 

package it.tidalwave.integritychecker.impl;

                    import it.tidalwave.actor.annotation.Actor;
                    import it.tidalwave.actor.annotation.ListensTo;

                    @Actor(initialPriority=Thread.MAX_PRIORITY) @ThreadSafe
                    public class FileLoaderActor
                      {
                        public void onFileDiscovered (final @ListensTo FileDiscoveredMessage message)
                          {
                            final FileObject fileObject = message.getFileObject();

                            try
                              {
                                final File file = FileUtil.toFile(fileObject);
                                final @Cleanup RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
                                final MappedByteBuffer byteBuffer = randomAccessFile.getChannel().map(READ_ONLY, 0, file.length()); 
                                byteBuffer.load();
                                randomAccessFile.close();
                                new FileContentsAvailableMessage(fileObject, byteBuffer).send();
                              }
                            catch (Exception e)
                              {
                                FileDamageDetectedMessage.forFile(fileObject).withCause(e).send();
                              }
                          }
                      }	

It uses memory-mapped I/O for speed and fires a FileContentsAvailableMessage carrying the bytes, or a FileDamageDetectedMessage in case of error.

The next stage is the actor that computes the fingerprint:

package it.tidalwave.integritychecker.impl;

                    import it.tidalwave.actor.annotation.Actor;
                    import it.tidalwave.actor.annotation.ListensTo;

                    @Actor @ThreadSafe
                    public class FingerprintComputerActor
                      {
                        public void onFileContentsAvailable (final @ListensTo FileContentsAvailableMessage message)
                          throws NoSuchAlgorithmException
                          {
                            final String algorithm = "MD5";
                            final MessageDigest digestComputer = MessageDigest.getInstance(algorithm);
                            digestComputer.update(message.getContents());
                            final Fingerprint fingerprint = new Fingerprint(algorithm, toString(digestComputer.digest()));
                            FingerprintComputedMessage.forFile(message.getFileObject()).withFingerprint(fingerprint).send();
                          }
                      }

Again, a FingerprintComputedMessage message carrying the result is fired. The last stage is the persistence actor:

package it.tidalwave.integritychecker.impl;

                    import it.tidalwave.actor.CollaborationStartedMessage;
                    import it.tidalwave.actor.MessageSupport;
                    import it.tidalwave.actor.annotation.Actor;
                    import it.tidalwave.actor.annotation.ListensTo;
                    import it.tidalwave.actor.annotation.Message;
                    import it.tidalwave.actor.annotation.OriginatedBy;

                    @Actor(threadSafe=false) @NotThreadSafe
                    public class PersistenceManagerActor
                      {
                        @Message(outOfBand=true) @ToString

                        static class FlushRequestMessage extends MessageSupport
                          {
                          }

                        private final Map<String, Object> map = new TreeMap<String, Object>();

                        private File persistenceFile;

                        private boolean flushPending;

                        public void onScanStarted (final @ListensTo CollaborationStartedMessage cMessage,
                                                   final @OriginatedBy FileScanRequestMessage message)
                          {
                            final File folder = FileUtil.toFile(message.getFolder());               
                            final String fileName = new SimpleDateFormat("'fingerprints-'yyyyMMdd_HHmm'.txt'").format(new Date());
                            persistenceFile = new File(folder, fileName);
                            map.clear();
                          }

                        public void onFileDiscovered (final @ListensTo FileDiscoveredMessage message)
                          {
                            final String name = message.getFileObject().getNameExt();

                            if (!map.containsKey(name)) // message sequentiality is not guaranteed
                              {
                                map.put(name, "unavailable"); 
                                requestFlush();
                              }
                          }

                        public void onFingerprintComputed (final @ListensTo FingerprintComputedMessage message)
                          {
                            final String name = message.getFileObject().getNameExt();
                            final Fingerprint fingerprint = message.getFingerprint();
                            map.put(name, fingerprint);
                            requestFlush();
                          }

                        private void onFlushRequested (final @ListensTo FlushRequestMessage message)
                          throws InterruptedException, IOException
                          {
                            flushPending = false;
                            // writes data to file
                         }

                        private void requestFlush()
                          {
                            if (!flushPending)
                              {
                                new FlushRequestMessage().sendLater(5, TimeUnit.SECONDS);
                                flushPending = true;
                              }
                          }
                      }

The basic idea is not to write to disk every piece of data as it's generated, since it might be expensive (well, perhaps writing to a relational database wouldn't be, but for a simple tool such as SolidBlue it makes more sense to write to a flat file). So, data are first put into an in-memory map (a record is generated as soon as a file has been discovered and later overwritten with the final result) that is periodically flushed to disk. So, the actor is stateful, hence it's not thread safe (there must be only one instance). Whenever the map is changed, it's marked 'dirty' and a request to flush it to the disk is generated. The flush will happen within 5 seconds, but it's not a good idea to use a Java Timer as I'd run into the threading problems that I want to avoid. Instead, a private FlushRequestMessage is sent back to itself with some delay, triggering the write to the disk. This message is annotated as 'out-of-band', it means that must delivered with high priority (in practice, it's placed at the head of the incoming message queue rather than the tail). Since the runtime guarantees that only a single message can be processed by any actor instance at any time, there's no need to protect the map, or the  flushPending flag, by synchronized sections.

The onScanStarted() method is interesting, because it introduces the concept of collaborations.

Collaborations

Actually, asynchronicity is excellent for massively concurrent system (as well as, not to say, to model things that are asynchronous in nature, and there are many). But it's nice to know, in some way, that a sequence of things that happened in response to a message of yours has been completed. In my code I've called this concept “Collaboration”:

  1. Any message is always part of a Collaboration.
  2. If the thread creating a certain message is not bound to a Collaboration, a new Collaboration is created and the message will be considered its originator.
  3. The originator message will be delivered in threads bound to its Collaboration. This means that any further message, created as a consequence of the reception of the originator message, will share its Collaboration.
  4. A Collaboration keeps tracks of all the related messages and threads by means of reference counting.
  5. When there are no more pending messages or working threads for a Collaboration, it is considered completed.

Two special messages, CollaborationStartedMessage and CollaborationTerminatedMessage, are fired by the runtime to notify of the creation and completion of collaborations.

When a message is sent, you can get its Collaboration object:

final Collaboration collaboration = new MyMessage().send();

and any listening method can get the Collaboration as well:

public void onMyMessage (final @ListensTo MyMessage message)
                      {
                        final Collaboration collaboration = message.getCollaboration();
                        ...
                      }

By construction, thus, a Collaboration is attached to all the threads that are directly or indirectly triggered by the originating message. This could be used for managing regular transactions (for instance, attaching a javax.transaction.UserTransaction to the Collaboration). But actors seem to be better dealing with Software Transactional Memory, so I drop this argument for now.

It's even possible to synchronously waiting for a Collaboration to complete:

collaboration.waitForCompletion();

even though I think it's only useful for tests.  More interesting is the case in which a Collaboration includes bidirectional interactions with a user: for instance, making a dialog box to pop up and wait for a selection. In these circumstances, a Collaboration can be suspended (this means it is not considered to be terminated even though there are no pending messages or working threads) and later resumed. More details in a further post.

Collaborations is something that I don't expect to find in platforms such as Akka, since I'm not sure it's feasible to have them working in an efficient way in a distributed context (I mean: maybe yes, maybe not, I haven't studied the problem yet). But they are a nice feature to have if possible and perhaps it's possible to implement it on top of an existing actor platform.

Back to the method onScanStarted(), it is clear now that it listens to the CollaborationStartedMessage messages originated by a FileScanRequestMessage to initialize the storage of the fingerprints, which must be located in the top scanned directory.

In a similar way, the IntegrityCheckerPresentationControllerActor listens to the various messages being delivered, including those notifying the beginning and the end of the Collaboration, to update the user interface:

package it.tidalwave.integritychecker.ui.impl;

                    import it.tidalwave.actor.MessageSupport;
                    import it.tidalwave.actor.Collaboration;
                    import it.tidalwave.actor.CollaborationCompletedMessage;
                    import it.tidalwave.actor.CollaborationStartedMessage;
                    import it.tidalwave.actor.annotation.Actor;
                    import it.tidalwave.actor.annotation.ListensTo;
                    import it.tidalwave.actor.annotation.Message;
                    import it.tidalwave.actor.annotation.OriginatedBy;

                    @Actor(threadSafe=false) @NotThreadSafe
                    public class IntegrityCheckerPresentationControllerActor
                      {
                        private static final double K10 = 1000;

                        private static final double M10 = 1000000;

                        @Message(outOfBand=true, daemon=true) @ToString
                        private static class RefreshPresentationMessage extends MessageSupport
                          {
                          }

                        private final IntegrityCheckerPresentationBuilder presentationBuilder = Locator.find(IntegrityCheckerPresentationBuilder.class);

                        private final IntegrityCheckerPresentation presentation = presentationBuilder.createPresentation();

                        private final Statistics statistics = new Statistics();

                        private boolean refreshing = false;

                        private int totalFileCount;

                        private long totalDataSize;

                        private int processedFileCount;

                        private long processedDataSize;

                        public void onScanStarted (final @ListensTo CollaborationStartedMessage cMessage,
                                                   final @OriginatedBy FileScanRequestMessage message)
                          {
                            totalFileCount = 0;
                            totalDataSize = 0;
                            processedFileCount = 0;
                            processedDataSize = 0;
                            refreshing = true;
                            new RefreshPresentationMessage().send();
                          }

                        public void onScanCompleted (final @ListensTo CollaborationCompletedMessage cMessage,
                                                     final @OriginatedBy FileScanRequestMessage message)
                          {
                            refreshing = false;
                            presentation.setProgressLabel("done");
                            new RefreshPresentationMessage().send();
                          }

                        public void onFileDiscovered (final @ListensTo FileDiscoveredMessage message)
                          {
                            totalFileCount++;
                            totalDataSize += message.getFileObject().getSize();
                          }

                        public void onFingerprintComputed (final @ListensTo FingerprintComputedMessage message)
                          {
                            processedFileCount++;
                            processedDataSize += message.getFileObject().getSize();
                            final Collaboration collaboration = message.getCollaboration();
                            final long elapsedTime = collaboration.getDuration().getMillis();
                            final double speed = (processedDataSize / M10) / (elapsedTime / K10);
                            final int eta = (int)(((totalDataSize - processedDataSize) / M10) / speed);
                            ... // and update the statistics object
                          }

                        private void updatePresentation (final @ListensTo RefreshPresentationMessage message)
                          {
                            presentation.updateStatistics(statistics);

                            if (refreshing)
                              {
                                new RefreshPresentationMessage().sendLater(1, TimeUnit.SECONDS);
                              }
                          }
                      }

Actually another advantage of message-based designs is that a user interface can be easily updated without having to know a large number of listeners.

Activation

Actors are managed objects and must be never directly accessed. To accomplish this, they are declared by means of activators, which typically are grouped together such as in:

package it.tidalwave.integritychecker.impl;

                    import org.openide.util.lookup.ServiceProvider;
                    import it.tidalwave.actor.spi.ActorGroupActivator;
                    import static it.tidalwave.actor.spi.ActorActivator.*;

                    @ServiceProvider(service=IntegrityCheckerActivator.class)
                    public class IntegrityCheckerActivator extends ActorGroupActivator
                      {
                        public IntegrityCheckerActivator()
                          {
                            add(activatorFor(FileObjectDiscoveryActor.class).withPoolSize(8));
                            add(activatorFor(FileLoaderActor.class).withPoolSize(1));
                            add(activatorFor(FingerprintComputerActor.class).withPoolSize(4));
                            add(activatorFor(PersistenceManagerActor.class).withPoolSize(1));
                          }
                      }

Activators specify the pool size of each actor. Given that, it's possible to activate and deactivate the group:

Locator.find(IntegrityCheckerActivator.class).activate();
                    ...
                    Locator.find(IntegrityCheckerActivator.class).deactivate();

For people not used to the NetBeans Platform, @ServiceProvider is a compile-time annotations that generates a service declaration into META-INF/services, while Locator is a facility that retrieves the reference at runtime.

Actors can have methods annotated with @PostConstruct and @PreDestroy, which will be called just before activation and just after deactivation, of course in a thread-safe way. There's the guarantee that messages can't be delivered before activation is completed and after deactivation.

Simpler than Akka... for how long?

This works, I enjoy the fact that I can avoid synchronized blocks and so far I don't miss the fact that that thing is clearly much less performant than Akka. Everything fits into less than 40k of jar file and it's much simpler than Akka (whose complexity is justified by the fact that it offers many more features). So what? Distributing to a network is something that we can't ignore in a cloud oriented world. So it's likely that going on I'll see the need for more features: often you discover that you need new features as you go on. Smell of reinventing the wheel ahead!

For instance, consider the FileLoadActor. I've previously said that experimental evidence demonstrates that it makes sense (at least with large files) to have it instantiated in a single instance, thus serializing data access to the disk. This makes sense given the physical nature of a magnetic disk. But this happens with a single disk: if you have more than one, it probably makes sense to have one instance per disk. Having a Solid State Drive (SSD) changes the rules. This means that you must have some more complex way to dispatch messages, a form of routing. Akka offers it, and it would be unwise to rewrite this feature from scratch.

So now I'm going to start phase two, that is to reimplement this stuff upon Akka (using the 2.0 milestone that's already available). In a few weeks I'll let you know how the story goes on.

The code that I described in this blog post are available at the Mercurial repository http://bitbucket.org/tidalwave/solidblue-src, tag 1.0-ALPHA-4.

In a further post I'll talk about how Collaborations are used by blueShades, which features more complex interactions with the user. See you later.

Comments are managed by Disqus, which makes use of a few cookies. Please read their cookie policy for more details. If you agree to that policy, please click on the button below to accept it. If you don't, you can still enjoy this site without using Disqus cookies, but you won't be able to see and post comments. Thanks.