The problem with the API up to now was that, if you wanted to use lambdas, it was quite an ugly scene. Take for example the following code snippet:
8.0 Distributed Streams Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Map<String, Integer> wordCountMap = c1.entrySet().parallelStream() | |
.map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s")) | |
.flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream) | |
.collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()))); |
This allows for the code to be much cleaner as we can see here:
9.0 Distributed Streams Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Map<String, Integer> wordCountMap = c1.entrySet().parallelStream()
.map(e -> e.getValue().split("\\s"))
.flatMap(Arrays::stream)
.collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())));
Extra Methods
This is not the only benefit of providing the CacheStream interface: we can also provide new methods that aren't available on the standard Stream interface. One example is the forEach method which allows the user to more easily provide a Cache that is injected on each node as required. This way you don't have to use the clumsy CacheAware interface and can directly use lambdas as desired.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Map<String, Integer> wordCountMap = c1.entrySet().parallelStream() | |
.map(e -> e.getValue().split("\\s")) | |
.flatMap(Arrays::stream) | |
.collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()))); |
Here is an example of the new forEach method in action:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cache.entrySet().stream() | |
.filter(e -> e.getKey().startsWith("RedHat")) | |
.forEach((c, e) -> { | |
c.getCacheManager().getCache("RedHat").put(e.getKey(), e.getValue()); | |
}); |
All in all these improvements should make using Distributed Streams with Infinispan much easier. The extra methods could be extended further if users have use cases they would love to suggest. Just let us know, and I hope you enjoy using Infinispan!
No comments:
Post a Comment