Monday, April 24, 2017

Apache Spark RDD and Java Streams

A few months ago, I was fortunate enough to participate in a few PoCs (proofs of concept) that used Apache Spark. There, I got the chance to use resilient distributed datasets (RDDs for short), transformations, and actions.

After a few days, I realized that while Apache Spark and the JDK are very different platforms, there are similarities between RDD transformations and actions, and stream intermediate and terminal operations. I think these similarities can help beginners (like me *grin*) get started with Apache Spark.

Java Stream               Apache Spark RDD
Intermediate operation    Transformation
Terminal operation        Action

Java Streams

Let's start with streams. Java 8 was released in March 2014. Arguably, the most significant feature it brought is the Streams API (or simply streams).

Once a Stream is created, it provides many operations that can be grouped into two categories:

  • intermediate,
  • and terminal.

Intermediate operations return a new stream derived from the previous one, and can be connected together to form a pipeline. Terminal operations, on the other hand, close the stream pipeline and return a result.

Here's an example.

Stream.of(1, 2, 3)
        .peek(n -> System.out.println("Peeked at: " + n))
        .map(n -> n*n)
        .forEach(System.out::println);

When the above example is run, it generates the following output:

Peeked at: 1
1
Peeked at: 2
4
Peeked at: 3
9

Intermediate operations are lazy. The actual execution does not start until a terminal operation is encountered; in this case, it is forEach(). That's why we do not see the following.

Peeked at: 1
Peeked at: 2
Peeked at: 3
1
4
9

Instead, what we see is that the operations peek(), map(), and forEach() have been joined to form a pipeline. In each pass, one element from the values specified in of() flows through the entire pipeline: peek() prints the string "Peeked at: 1", map() squares the value, and forEach() prints the resulting "1". Then, in another pass, the next element flows through peek(), map(), and forEach(), and so on.

Invoking an intermediate operation such as peek() does not actually perform any peeking. Instead, it creates a new stream that, when traversed, contains the same elements as the initial stream, but additionally performs the provided action on each element as it is consumed.
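
This laziness is easy to observe. If we drop the terminal operation from the earlier example, nothing is printed at all:

// No terminal operation: the pipeline is never traversed, so nothing is printed
Stream.of(1, 2, 3)
        .peek(n -> System.out.println("Peeked at: " + n))
        .map(n -> n * n);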

Apache Spark RDD

Now, let's turn to Spark. Its core abstraction for working with data is the resilient distributed dataset (RDD).

An RDD is simply a distributed collection of elements. In Spark all work is expressed as either creating new RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

Once created, RDDs offer two types of operations:

  • transformations,
  • and actions.

Transformations construct a new RDD from a previous one. Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

Here's an example with a rough equivalent using Java Streams.

SparkConf conf = new SparkConf().setAppName(...);
JavaSparkContext sc = new JavaSparkContext(conf);

List<Integer> squares = sc.parallelize(Arrays.asList(1, 2, 3))
        .map(n -> n*n)
        .collect();

System.out.println(squares.toString());

// Rough equivalent using Java Streams
List<Integer> squares2 = Stream.of(1, 2, 3)
        .map(n -> n*n)
        .collect(Collectors.toList());

System.out.println(squares2.toString());

After setting up the Spark context, we call parallelize(), which creates an RDD from the given list of elements. map() is a transformation, and collect() is an action. Transformations, like intermediate stream operations in Java, are lazily evaluated. In this example, Spark will not begin to execute the function provided in a call to map() until it sees an action. This approach might seem unusual at first, but it makes a lot of sense when dealing with huge amounts of data (big data, in other words). It allows Spark to split up the work and perform it in parallel.
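
To illustrate the laziness (a small sketch reusing the sc context from above):

JavaRDD<Integer> squared = sc.parallelize(Arrays.asList(1, 2, 3))
        .map(n -> n * n); // a transformation: returns immediately, computes nothing yet
List<Integer> result = squared.collect(); // an action: only now does Spark run map()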

Word Count Example

Let's use word count as an example. Here, we have two implementations: one uses Apache Spark, and the other uses Java Streams.

Here's the Java Stream version.

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class WordCountJava {

    private static final String REGEX = "\\s+";

    public Map<String, Long> count(URI uri) throws IOException {
        return Files.lines(Paths.get(uri))
                .map(line -> line.split(REGEX))
                .flatMap(Arrays::stream)
                .map(word -> word.toLowerCase())
                .collect(groupingBy(
                        identity(), TreeMap::new, counting()));
    }

}

Here, we read the source file line by line, transforming each line into an array of words (via the map() intermediate operation). Since we have an array of words for each line, and we have many lines, we flatten them into a single stream of words using flatMap(). In the end, we group the words by their identity() (i.e. the identity of a string is the string itself) and count them.
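
For instance, the method could be exercised like this (the file name here is hypothetical):

Map<String, Long> counts = new WordCountJava()
        .count(Paths.get("input.txt").toUri()); // hypothetical input file
System.out.println(counts);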

When tested against a text file that contains the two lines:

The quick brown fox jumps over the lazy dog
The quick brown fox jumps over the lazy dog

It outputs the following map:

{brown=2, dog=2, fox=2, jumps=2, lazy=2, over=2, quick=2, the=4}

And now, here's the Spark version.

import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountSpark {

    private static final String REGEX = "\\s+";

    public List<Tuple2<String, Long>> count(URI uri, JavaSparkContext sc) throws IOException {
        JavaRDD<String> input = sc.textFile(Paths.get(uri).toString());
        return input.flatMap(
                    line -> Arrays.asList(line.split(REGEX)).iterator())
                .map(word -> word.toLowerCase())
                .mapToPair(word -> new Tuple2<String, Long>(word, 1L))
                .reduceByKey((x, y) -> x + y)
                .sortByKey()
                .collect();
    }

}

When run against the same two-line text file, it outputs the following:

[(brown,2), (dog,2), (fox,2), (jumps,2), (lazy,2), (over,2), (quick,2), (the,4)]

The initial configuration of the JavaSparkContext has been excluded for brevity. We create a JavaRDD from a text file. It's worth mentioning that this initial RDD contains the contents of the text file line by line. That's why we split each line into a sequence of words and flatMap() them. Then we transform each word into a key-value tuple with a count of one (1) for incremental counting. Once we have done that, we group the key-value tuples by word and sum their counts (reduceByKey()), and in the end we sort them by key in natural order.
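
For completeness, here's roughly how this could be driven (a sketch; the app name, the local master, and the file name are all assumptions):

SparkConf conf = new SparkConf()
        .setAppName("word-count")
        .setMaster("local[*]"); // assumed: run locally using all cores
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, Long>> counts = new WordCountSpark()
        .count(Paths.get("input.txt").toUri(), sc); // hypothetical input file
System.out.println(counts);
sc.stop();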

In Closing

As shown, both implementations are similar. The Spark implementation requires more setup and configuration, but it is also more powerful. Learning about intermediate and terminal stream operations can help a Java developer get started with understanding Apache Spark.

Thanks to Krischelle, RB, and Juno, for letting me participate in the PoCs that used Apache Spark.

Friday, December 30, 2016

Isolating the Domain Logic

In one design patterns class, I had an interesting discussion about modelling domain logic. Specifically, it was about isolating the domain logic. An application would typically be divided into three parts:

  1. Presentation (e.g. desktop GUI, browser, web service)
  2. Domain logic
  3. Infrastructure (e.g. persistence storage, e-mail)

The class found it interesting that the dependency arrows were pointing towards the domain logic part. They asked, “Is the diagram intentionally made wrong? Shouldn’t the domain logic part be dependent on the persistence storage?” It was a great question, and I wanted to share the discussion and explanation here.

Often Misunderstood

Most developers would usually have this misunderstanding in mind.

[Diagram: the “Misunderstood” layering, where domain logic depends on infrastructure, vs. the “Proper” layering, where dependencies point towards the domain logic]

And this misunderstanding is largely due to the sequence of operations. It usually starts with a trigger (e.g. a user clicking a button or a link) in the presentation layer, which then calls something within the domain logic layer, which then calls something within the infrastructure layer (e.g. update a database table record).

While this is the correct sequence of operations, there’s something subtle in the way in which the domain logic layer can be implemented. This has something to do with dependency inversion.

Dependency Inversion Principle

The domain logic layer may need something from the infrastructure layer, like some form of access to retrieve objects from persistence storage. The usual patterns for this are DAO and repository. I won’t explain these two patterns here. Instead, I would point out that the interface definitions are placed within the domain logic layer, and their implementations are placed in a separate layer.

Placing the (DAO and repository) interface definitions inside the domain logic layer means that it is the domain logic layer that defines them. It is the one that dictates which methods are needed, and what return types are expected. This also marks the boundaries of the domain logic.

This separation between interface and implementation may be subtle, but key. Placing just the interface definitions allows the domain logic part to be free from infrastructure details, and allows it to be unit-tested without actual implementations. The interfaces can have mock implementations during unit testing. This subtle difference makes a big difference in rapid verification of (the development team’s understanding of) business rules.

This separation is the classic dependency inversion principle in action. Domain logic (higher-level modules) should not depend on DAO and repository implementations (low-level modules). Both should depend on abstractions. The domain logic defines the abstractions, and infrastructure implementations depend on these abstractions.

Most novice teams I’ve seen place the DAO and repository interfaces together with their infrastructure-specific implementations. For example, say we have a StudentRepository and its JPA-specific implementation StudentJpaRepository. I would usually find novice teams placing them in the same package. The application will still compile successfully, but the separation is gone, and the domain logic is no longer isolated.
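
Arranged properly, a rough sketch might look like this (the package names are illustrative, and the method signatures are assumptions):

package com.acme.school.domain.model;
...
/** Defined by the domain logic layer, which dictates the methods and return types. */
interface StudentRepository {
  Student findById(Long id);
  void save(Student student);
}

package com.acme.school.infrastructure.persistence.jpa;
...
/**
 * The JPA-specific implementation depends on the domain-defined interface,
 * not the other way around. During unit testing, a mock or in-memory
 * implementation of StudentRepository can take its place.
 */
class StudentJpaRepository implements StudentRepository {
  ...
}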

Now that I’ve explained why and how the domain logic part does not depend on the infrastructure part, I’d like to touch on how the presentation part is accidentally entangled with the domain logic.

Separated Presentation

Another thing I often see with novice teams is how they end up entangling their domain logic with their presentation. This results in a nasty cyclic dependency. This cyclic dependency is more logical than physical, which makes it all the more difficult to detect and prevent.

I won’t use a rich GUI presentation example here, since Martin Fowler has already written a great piece on it. Instead, I’ll use a web-browser-based presentation as an example.

Most web-based systems use a web framework for their presentation. These frameworks usually implement some form of MVC (model-view-controller). The model used is usually the model straight from the domain logic part. Unfortunately, most MVC frameworks require something of the model. In the Java world, most MVC frameworks require that the model follow JavaBean conventions: a public zero-arguments constructor, and getters and setters. The zero-arguments constructor and setters are used to automatically bind parameters (e.g. from an HTTP POST) to the model. The getters are used in rendering the model in a view.

Because of this implied requirement from the MVC frameworks used in the presentation, developers would add a public zero-arguments constructor, getters, and setters to all their domain entities, and justify this as being required. Unfortunately, this gets in the way of implementing domain logic. The domain logic gets entangled with the presentation. And worse, I’ve seen domain entities polluted with code that emits HTML-encoded strings (e.g. HTML code with less-than and greater-than signs encoded) and XML, just because of presentation.

If it is all right to have your domain entity implemented as a JavaBean, then it would be fine to have it used directly in your presentation. But if the domain logic gets a bit more complicated, and requires the domain entity to lose its JavaBean-ness (e.g. no more public zero-arguments constructor, no more setters), then it would be advisable for the domain logic part to implement domain logic, and have the presentation part adapt by creating another JavaBean object to satisfy its MVC needs.

An example I use often is a UserAccount that is used to authenticate a user. In most cases, when a user wishes to change the password, the old password is also needed. This helps prevent unauthorized changing of the password. This is clearly shown in the code below.

public class UserAccount {
  ...
  public void changePassword(
      String oldPassword, String newPassword) {…}
}

But this does not follow JavaBean conventions. And if the MVC presentation framework does not work well with the changePassword method, a naive approach would be to remove the erring method and add a setPassword method (shown below). This weakens the isolation of the domain logic, and causes the old-password check to be implemented all over the place.

public class UserAccount {
  ...
  public void setPassword(String password) {…}
}

It’s important for developers to understand that the presentation depends on the domain logic, and not the other way around. If the presentation has needs (e.g. JavaBean conventions), then it should not make the domain logic comply with them. Instead, the presentation should create additional classes (e.g. JavaBeans) that have knowledge of the corresponding domain entities. But unfortunately, I still see a lot of teams forcing their domain entities to look like JavaBeans just because of presentation, or worse, having domain entities create JavaBeans (e.g. DTOs) for presentation purposes.
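
For example, instead of weakening UserAccount, the presentation part can define its own JavaBean that adapts to the domain entity. Here’s a sketch (the class and package names are made up for illustration):

package com.acme.myapp.presentation.web;
...
/** A presentation-layer JavaBean that satisfies the MVC framework's needs. */
public class ChangePasswordForm {
  private String oldPassword;
  private String newPassword;
  public ChangePasswordForm() {} // public zero-arguments constructor for binding
  public String getOldPassword() { return oldPassword; }
  public void setOldPassword(String oldPassword) { this.oldPassword = oldPassword; }
  public String getNewPassword() { return newPassword; }
  public void setNewPassword(String newPassword) { this.newPassword = newPassword; }
  /** Delegates to the domain entity, keeping changePassword(...) intact. */
  public void applyTo(UserAccount userAccount) {
    userAccount.changePassword(oldPassword, newPassword);
  }
}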

Arrangement Tips

Here’s a tip for arranging your application. Keep your domain entities and repositories in one package. Keep your repository and other infrastructure implementations in a separate package. Keep your presentation-related classes in their own package. Be mindful of which package depends on which. The package that contains the domain logic is preferably at the center of it all. Everything else depends on it.

When using Java, the packages would look something like this:

  • com.acme.myapp.context1.domain.model
    • Keep your domain entities, value objects, and repositories (interface definitions only) here
  • com.acme.myapp.context1.infrastructure.persistence.jpa
    • Place your JPA-based repository and other JPA persistence-related implementations here
  • com.acme.myapp.context1.infrastructure.persistence.jdbc
    • Place your JDBC-based repository and other JDBC persistence-related implementations here
  • com.acme.myapp.context1.presentation.web
    • Place your web/MVC presentation components here. If the domain entities needed for presentation do not comply with MVC framework requirements, create additional classes here. These additional classes will adapt the domain entities for presentation purposes, and still keep the domain entities separated from presentation.

Note that I’ve used context1, since there could be several contexts (or sub-systems) in a given application (or system). I’ll discuss having multiple contexts and multiple models in a future post.

That’s all for now. I hope this short explanation sheds some light for those who wonder why their code is arranged and split in a certain way.

Thanks to Juno Aliento for helping me with the class during this interesting discussion.

Happy holidays!

Thursday, October 27, 2016

Architectural Layers and Modeling Domain Logic

As I was discussing the PoEAA patterns used to model domain logic (i.e. transaction script, table module, domain model), I noticed that people get the impression (albeit a wrong one) that the domain model pattern is best. So, they set out to apply it to everything.

Not Worthy of Domain Model Pattern

Let's get real. The majority of sub-systems are CRUD-based. Only a certain portion of the system requires the domain model pattern. Or, to put it another way, there are parts of the application that just need forms over data and some validation logic (e.g. required/mandatory fields, min/max values on numbers, min/max length on text). For these, the domain model is not worth the effort.

For these, perhaps an anemic domain model would fit nicely.

Anemic Domain Model Isn't As Bad As It Sounds

The anemic domain model isn't as bad as it sounds. There, I said it (at least here in my blog post).

But what does it look like?

package com.acme.bc.domain.model;
...
@Entity
class Person {
 @Id ... private Long id;
 private String firstName;
 private String lastName;
 // ...
 // getters and setters
}
...
interface PersonRepository /* extends CrudRepository<Person, Long> */ {
 // CRUD methods (e.g. find, find/pagination, update, delete)
}

package com.acme.bc.infrastructure.persistence;
...
class PersonRepositoryJpa implements PersonRepository {
 ...
}

In the presentation layer, the controllers can have access to the repository. The repository does its job of abstracting persistence details.

package com.acme.bc.interfaces.web;

@Controller
class PersonsController {
 private PersonRepository personRepository;
 public PersonsController(PersonRepository personRepository) {...}
 // ...
}

In this case, having the Person class exposed to the presentation layer is perfectly all right. The presentation layer can use it directly, since it has a public zero-arguments constructor, getters and setters, which are most likely needed by the view.

And there you have it. A simple CRUD-based application.

Do you still need a service layer? No. Do you still need DTOs (data transfer objects)? No. In this simple case of CRUD, you don't need additional services or DTOs.

Yes, the Person class looks like a domain entity. But it does not contain logic; it is simply used to transfer data. So, it's really just a DTO. But this is all right, since it does the job of holding the data stored to and retrieved from persistence.

Now, if the business logic starts to get more complicated, some entities in the initially anemic domain model can become richer with behavior. And if so, those entities can merit a domain model pattern.

Alternative to Anemic Domain Model

As an alternative to the anemic domain model (discussed above), the classes can be moved out of the domain logic layer and into the presentation layer. Instead of naming it PersonRepository, it is now named PersonDao.

package com.acme.bc.interfaces.web;

@Entity
class Person {...}

@Controller
class PersonsController {
 private PersonDao personDao;
 public PersonsController(PersonDao personDao) {...}
 // ...
}

interface PersonDao /* extends CrudRepository<Person, Long> */ {
 // CRUD methods (e.g. find, find/pagination, update, delete)
}

package com.acme.bc.infrastructure.persistence;

class PersonDaoJpa implements PersonDao {
 ...
}

Too Much Layering

I think it would be overkill if you have to go through a mandatory application service that does not add value.

package com.acme.bc.interfaces.web;
...
@Controller
class PersonsController {
 private PersonService personService;
 public PersonsController(PersonService personService) {...}
 // ...
}

package com.acme.bc.application;
...
@Service
class PersonService {
 private PersonRepository personRepository;
 public PersonService(PersonRepository personRepository) {...}
 // expose repository CRUD methods and pass to repository
 // no value add
}

Application Services for Transactions

So, when would application services be appropriate? The application services are responsible for driving workflow and coordinating transaction management (e.g. by use of the declarative transaction management support in Spring).

If you find your simple CRUD application needing to start transactions in the presentation-layer controller, then that might be a good sign to move them into an application service. This usually happens when the controller needs to update more than one entity, and the entities do not share a single root. The usual example here is transferring amounts between bank accounts. A transaction is needed to ensure that the debit and the credit both succeed, or both fail.

package sample.domain.model;
...
@Entity
class Account {...}
...
interface AccountRepository {...}

package sample.interfaces.web;
...
@Controller
class AccountsController {
 private AccountRepository accountRepository;
 ...
 @Transactional
 public ... transfer(...) {...}
}

If you see this, then it might be a good idea to move this (from the presentation layer) to an application-layer service.

package sample.interfaces.web;
...
@Controller
class AccountsController {
 private AccountRepository accountRepository;
 private TransferService transferService;
 ...
 public ... transfer(...) {...}
}

package sample.application;
...
@Service
@Transactional
class TransferService {
 private AccountRepository accountRepository;
 ...
 public ... transfer(...) {...}
}

package sample.domain.model;
...
@Entity
class Account {...}
...
interface AccountRepository {...}

Domain Model Pattern (only) for Complex Logic

I'll use double-entry accounting as an example. But I'm sure there is more complex logic that's better suited.

Let's say we model journal entries and accounts as domain entities. The account contains a balance (a monetary amount). But this amount is not something that one would simply set. A journal entry needs to be created. When the journal entry is posted, it will affect the specified accounts. The account will then update its balance.

package ….accounting.domain.model;
...
/** Immutable */
@Entity
class JournalEntry {
 // zero-sum items
 @ElementCollection
 private Collection<JournalEntryItem> items;
 ...
}
...
/** A value object */
@Embeddable
class JournalEntryItem {...}
...
interface JournalEntryRepository {...}
...
@Entity
class Account {...}
...
interface AccountRepository {...}
...
@Entity
class AccountTransaction {...}
...
interface AccountTransactionRepository {...}

Now, in this case, a naive implementation would have a presentation-layer controller create a journal entry object, and use a repository to save it. And at some point in time (or if auto-posting is used), the corresponding account transactions are created, with account balances updated. All this needs to be rolled into a transaction (i.e. all-or-nothing).

Again, this transaction is ideally moved to an application service.

package ….accounting.application;

@Service
@Transactional
class PostingService {...}
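
A minimal sketch of what this service might do (the post() method and its steps are assumptions for illustration, not a prescribed design):

package ….accounting.application;
...
@Service
@Transactional
class PostingService {
 private final JournalEntryRepository journalEntryRepository;
 private final AccountRepository accountRepository;
 ...
 /**
  * Saves the journal entry and applies its items to the affected accounts.
  * Everything happens in one transaction: if any step fails, the journal
  * entry and the account balances roll back together (all-or-nothing).
  */
 public void post(JournalEntry journalEntry) {
  journalEntryRepository.save(journalEntry);
  // for each journal entry item, record an account transaction
  // and update the corresponding account balance
  ...
 }
}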

If there's a need to allow the user to browse through journal entries and account transactions, the presentation-layer controller can directly use the corresponding repositories. If the domain entities are not suitable for the view technology (e.g. they don't follow JavaBean naming conventions), then the presentation layer can define DTOs that are suitable for the view. Be careful! Don't change the domain entities just to suit the needs of the presentation layer.

package ….interfaces.web;

@Controller
class AccountsController {
 private AccountRepository accountRepository;
 private AccountTransactionRepository accountTransactionRepository;
 private PostingService postingService;
  ...
}
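
If such a view DTO is needed, it could be as simple as this sketch (the class name and fields are assumptions for illustration):

package ….interfaces.web;
...
/** A read-only JavaBean for the view, populated from AccountTransaction. */
class AccountTransactionView {
 private String accountNumber;
 private BigDecimal amount;
 // public zero-arguments constructor, getters, and setters as the view requires
 ...
}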

In Closing...

So, there you have it. Hopefully, this post sheds some light on when (and when not) to use the domain model pattern.