One problem with distributed streams though is that data is processed without acquiring locks: great for performance, but there is no guarantee that some other process isn't concurrently modifying the cache entry you're working on. Consider the following example which iterates through the entire contents of a cache, modifying each entry based on its existing value:
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Cache<String, String> cache = ... | |
cache.entrySet().stream().forEach((c, e) -> c.put(e.getKey(), e.getValue() + " has been updated!"); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Cache<String, String> cache = ... | |
cache.entrySet().stream().forEach((c, e) -> { | |
while (!c.replace(e.getKey(), e.getValue(), e.getValue() + " has been updated!") { } | |
}); |
Infinispan 9.1 introduces locked streams, which allow you to run your operation knowing that another update cannot be performed while running the Consumer. Note this only works in non transactional and pessimistic transactional caches (optimistic transactional caches are not supported).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Cache<String, String> cache = ... | |
cache.getAdvancedCache().lockedStream().forEach((c, e) -> c.put(e.getKey(), e.getValue() + " has been updated!")); |
This locked stream has a slightly limited API compared to the normal java.util.stream API. Only the filter method is allowed in addition to forEach. The CacheStream API is also supported, with a few exceptions. For more details on the API and what methods are supported you should check out the Javadoc.
The lock is only acquired for the given key while invoking the Consumer, allowing other updates on other keys to be performed concurrently, just like a normal put operation would behave. It is not suggested to perform operations on other keys in the Consumer, as this could cause possible deadlocks.
Now go forth and perform operations using the data stream knowing that the data underneath has not changed!
No comments:
Post a Comment