Friday, 21 July 2017

Cluster Counter

In Infinispan 9.1 we introduce clustered counters: counters that are distributed and shared among all nodes in the cluster. Today we are going to learn more about them.

Installation

To use a counter in your Infinispan cluster, first you have to add the Maven dependency to your project. As you can see, it is as simple as adding the module below (a sketch; the infinispan-clustered-counter artifact id should be checked against the 9.1 POMs):
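
    <dependency>
       <groupId>org.infinispan</groupId>
       <!-- assumed artifact id for the 9.1 counter module -->
       <artifactId>infinispan-clustered-counter</artifactId>
       <version>9.1.0.Final</version>
    </dependency>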

After adding the module, you can retrieve the CounterManager and start creating and using counters.

CounterManager

Each CounterManager is associated with a CacheManager. But before showing how to use it, there is some configuration to be done.

There are two attributes that you can configure: num-owners, which represents the number of copies of the counter's value in a cluster; and reliability, which represents the behaviour of the counters in case of partitions.

Below is a configuration example with the default values. The snippets are minimal sketches; verify the XML namespace, builder methods and defaults against the 9.1 documentation.
XML:
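    <cache-container>
       <!-- sketch: num-owners and reliability shown with their assumed default values -->
       <counters xmlns="urn:infinispan:config:counters:9.1" num-owners="2" reliability="AVAILABLE"/>
    </cache-container>
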
Programmatically:
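    // Sketch: the counter module is added through the global configuration
    GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
    CounterManagerConfigurationBuilder counters =
          global.addModule(CounterManagerConfigurationBuilder.class);
    counters.numOwner(2)                      // copies of each counter's value
            .reliability(Reliability.AVAILABLE);
    EmbeddedCacheManager cacheManager = new DefaultCacheManager(global.build());
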
Then you can retrieve the CounterManager from the CacheManager, as sketched below using the embedded factory from the counter module, and start using the counters!
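
    // Sketch: one CounterManager per CacheManager
    CounterManager counterManager = EmbeddedCounterManagerFactory.asCounterManager(cacheManager);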

Counter

A counter is identified by its name. Also, it is initialized with an initial value (by default 0) and it can be persisted, if the value needs to survive a cluster restart.

There are 2 types of counters: strong and weak counters.

Strong Counters

The strong counter provides higher consistency. Its value is known during the update and its updates are applied atomically. This allows you to set boundaries and provides conditional operations (such as compare-and-set).

Configuration

A strong counter can be configured in the configuration file or programmatically. It can also be created dynamically at runtime. The sketches below show how it can be done (the counter name and bound are illustrative):

XML:
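    <counters xmlns="urn:infinispan:config:counters:9.1" num-owners="2">
       <!-- sketch: an optional upper bound makes the counter bounded -->
       <strong-counter name="my-strong-counter" initial-value="0" storage="VOLATILE">
          <upper-bound value="100"/>
       </strong-counter>
    </counters>
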
Programmatically:
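    // Sketch: 'counters' is the CounterManagerConfigurationBuilder obtained above
    counters.addStrongCounter()
            .name("my-strong-counter")
            .initialValue(0)
            .upperBound(100)                  // optional; used by the rate limiter below
            .storage(Storage.VOLATILE);
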
Runtime:
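    // Sketch: counters can also be defined on the fly through the CounterManager
    counterManager.defineCounter("my-strong-counter",
          CounterConfiguration.builder(CounterType.BOUNDED_STRONG)
                              .initialValue(0)
                              .upperBound(100)
                              .build());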

Use Case

The strong counter fits the following use cases:
  • Global Id Generator
Due to its strong consistency, it can be used as a global identifier generator, as in the sketch after this list.

  • Rate Limiter
If bounded, it can be used as a simple rate limiter (also sketched after this list). Just don't forget to invoke reset()...

  • Simply count "stuff"
Well, it is a counter after all...
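
Below is a minimal sketch of the first two use cases, assuming the counters defined in the configuration section above (the names, bounds and reset scheduling are illustrative):

    // Global id generator: incrementAndGet() is atomic across the cluster
    StrongCounter ids = counterManager.getStrongCounter("id-generator");
    long newId = ids.incrementAndGet().join();   // blocks on the CompletableFuture<Long>

    // Rate limiter: incrementing a bounded counter past its upper bound fails
    StrongCounter limiter = counterManager.getStrongCounter("my-strong-counter");
    try {
       limiter.incrementAndGet().join();
       // ... perform the rate-limited work ...
    } catch (CompletionException e) {
       // bound reached (the cause is a CounterOutOfBoundsException); back off
    }
    limiter.reset();   // e.g. from a scheduled task, once per time window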

Weak Counters

The weak counter provides eventual consistency and its value is not known during updates. In exchange, it provides faster writes than the strong counter.

Configuration

As with the strong counter, the weak counter can be configured with a name and an initial value. In addition, a concurrency-level can be configured to set the number of concurrent updates that can be handled in parallel. The sketches below show how to configure it (again with illustrative names):

XML:
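    <counters xmlns="urn:infinispan:config:counters:9.1" num-owners="2">
       <weak-counter name="visits" initial-value="0" concurrency-level="16"/>
    </counters>
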
Programmatically:
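    // Sketch: reusing the 'counters' builder from the CounterManager section
    counters.addWeakCounter()
            .name("visits")
            .initialValue(0)
            .concurrencyLevel(16);
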
Runtime:
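    // Sketch: runtime definition of the same counter
    counterManager.defineCounter("visits",
          CounterConfiguration.builder(CounterType.WEAK)
                              .initialValue(0)
                              .concurrencyLevel(16)
                              .build());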

Use Case

The main use case for the weak counter covers all scenarios where its value isn't needed while updating the counter. For example, it can be used to count the number of visits to some resource, as in the sketch below:
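
    // Sketch, assuming the "visits" weak counter defined above
    WeakCounter visits = counterManager.getWeakCounter("visits");

    // On every hit: fire-and-forget, the new value is not returned
    visits.increment();

    // Later, e.g. in a report: getValue() returns the locally cached value
    long totalVisits = visits.getValue();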

For more information, take a look at the documentation. If you have any feedback, would like to request some new features, or have found some issue, let us know via the forum, the issue tracker or the #infinispan channel on Freenode.

Monday, 17 July 2017

Conflict Management and Partition Handling

In Infinispan 9.1.0.Final we have overhauled the behaviour and configuration of partition handling in distributed and replicated caches. Partition handling is no longer simply enabled/disabled; instead, a partition strategy is configured. This allows for more fine-grained control of a cache's behaviour when a split brain occurs. Furthermore, we have created the ConflictManager component so that conflicts on cache entries can be resolved on-demand by users and/or automatically during partition merges.


Conflict Manager


During a cache's lifecycle it is possible for inconsistencies to appear between replicas of a cache entry due to a variety of reasons (e.g. replication failures, incorrect use of flags, etc.). The conflict manager is a tool that allows users to retrieve all stored replica values for a cache entry, as well as to process a stream of cache entries whose stored replicas have conflicting values. Furthermore, by utilising implementations of the EntryMergePolicy interface it is possible for said conflicts to be resolved deterministically.

EntryMergePolicy


In the event of conflicts arising between two or more replicas of a given CacheEntry, it is necessary for a conflict resolution algorithm to be defined; therefore we provide the EntryMergePolicy interface. This interface consists of a single method, "merge", whose output is utilised as the "resolved" CacheEntry for a given key. A non-null return value is put to all replicas of the CacheEntry in question, whereas a null return value results in all replicas being removed from the cache.

The merge method takes two parameters: the "preferredEntry" and "otherEntries". In the context of a partition merge, the preferredEntry is the CacheEntry associated with the partition whose coordinator is conducting the merge (or if multiple entries exist in this partition, it’s the primary replica). However, in all other contexts, the preferredEntry is simply the primary replica. The second parameter, "otherEntries" is simply a list of all other entries associated with the key for which a conflict was detected.

Currently Infinispan provides the following implementations of EntryMergePolicy:


  • MergePolicies.PREFERRED_ALWAYS - always utilise the "preferredEntry".
  • MergePolicies.PREFERRED_NON_NULL - utilise the "preferredEntry" if it is non-null, otherwise utilise the first entry from "otherEntries".
  • MergePolicies.REMOVE_ALL - always remove a key from the cache when a conflict is detected.
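
A custom policy is just an implementation of this interface. As a sketch, a hypothetical policy that keeps whichever replica holds the highest Integer value (not one of the provided implementations) could look like:

    import java.util.List;
    import org.infinispan.conflict.EntryMergePolicy;
    import org.infinispan.container.entries.CacheEntry;

    // Hypothetical example policy, for illustration only
    public class HighestValuePolicy implements EntryMergePolicy<String, Integer> {
       @Override
       public CacheEntry<String, Integer> merge(CacheEntry<String, Integer> preferredEntry,
                                                List<CacheEntry<String, Integer>> otherEntries) {
          CacheEntry<String, Integer> winner = preferredEntry;
          for (CacheEntry<String, Integer> candidate : otherEntries) {
             if (candidate != null && (winner == null || candidate.getValue() > winner.getValue()))
                winner = candidate;
          }
          return winner; // a null return would remove the key from all replicas
       }
    }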

Application Usage


For conflict resolution during partition merges, once an EntryMergePolicy has been configured for the cache, no additional actions are required by the user. However, if an Infinispan user would like to utilise the ConflictManager explicitly in their application, it should be retrieved by passing an AdvancedCache instance to the ConflictManagerFactory, as in the sketch below:
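
    // Sketch; 'cache' is any clustered Cache<String, String>
    ConflictManager<String, String> conflictManager =
          ConflictManagerFactory.get(cache.getAdvancedCache());

    // Stream of all entries whose replicas disagree, keyed by the node holding each version
    conflictManager.getConflicts()
                   .forEach(versions -> System.out.println("Conflict: " + versions));

    // Resolve outstanding conflicts using the cache's configured merge policy
    conflictManager.resolveConflicts();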

Note that, depending on the number of entries in the cache, the getConflicts and resolveConflicts methods are expensive operations, as they both depend on a spliterator which lazily loads cache entries on a per-segment basis. Consequently, when operating in distributed mode, if many conflicts exist, it is possible for an OutOfMemoryError to occur on the node searching for conflicts.


Partition Handling Strategies


In 9.1.0.Final the partition handling enabled/disabled option has been deprecated and users must now configure an appropriate PartitionHandling strategy for their application. A partition handling strategy determines what operations can be performed on a cache when a split brain event has occurred. Ultimately, in terms of Brewer’s CAP theorem, the configured strategy determines whether the cache's availability or consistency is sacrificed in the presence of partition(s). Below is a table of the provided strategies and their characteristics:


  • DENY_READ_WRITES - if the partition does not have all owners for a given segment, both reads and writes are denied for all keys in that segment. This is equivalent to setting partition handling to true in Infinispan 9.0. (CAP: Consistency)
  • ALLOW_READS - allows reads for a given key if it exists in this partition, but only allows writes if this partition contains all owners of a segment. (CAP: Availability)
  • ALLOW_READ_WRITES - allows entries on each partition to diverge, with conflicts resolved during merge. This is equivalent to setting partition handling to false in Infinispan 9.0. (CAP: Availability)

Conflict Resolution on Partition Merge


When utilising the ALLOW_READ_WRITES partition strategy it is possible for the values of cache entries to diverge between competing partitions. Therefore, when the two partitions merge, it is necessary for these conflicts to be resolved. Internally Infinispan utilises a cache's ConflictManager to search for cache entry conflicts and then applies the configured EntryMergePolicy to automatically resolve said conflicts before rebalancing the cache. This conflict resolution is completely automatic and does not require any additional code or input from Infinispan users.

Note that if you do not want conflicts to be resolved automatically during a partition merge, i.e. the behaviour before 9.1.x, you can set the merge-policy to null (or NONE in xml).


Configuration

Programmatic
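
A minimal sketch, assuming a distributed cache with the ALLOW_READ_WRITES strategy and the provided PREFERRED_NON_NULL policy:

    ConfigurationBuilder builder = new ConfigurationBuilder();
    builder.clustering()
           .cacheMode(CacheMode.DIST_SYNC)          // any clustered mode works here
           .partitionHandling()
              .whenSplit(PartitionHandling.ALLOW_READ_WRITES)
              .mergePolicy(MergePolicies.PREFERRED_NON_NULL);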



XML
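
The equivalent declarative sketch (the cache name is illustrative):

    <distributed-cache name="myCache">
       <partition-handling when-split="ALLOW_READ_WRITES" merge-policy="PREFERRED_NON_NULL"/>
    </distributed-cache>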




Conclusion


Partition handling has been overhauled in Infinispan 9.1.0.Final to allow for increased control over a cache's behaviour. We have introduced the ConflictManager which enables users to inspect and manage the consistency of their cache entries via custom and provided merge policies.

If you have any feedback on the partition handling changes, or would like to request some new features/optimisations, let us know via the forum, the issue tracker or the #infinispan channel on Freenode.

Friday, 14 July 2017

Infinispan 9.1 "Bastille"

Dear Infinispan users,

after 3½ months, we are proud to present to you our latest stable release, Infinispan 9.1, codenamed "Bastille".

While minor releases are traditionally evolutionary instead of revolutionary, this release still comes loaded with a number of great features:

Scattered cache

A new clustered cache, similar to a distributed cache, but with a higher write throughput.

Consistency Checker, Conflict Resolution and Automatic merge policies

An overhaul to partition handling which allows much finer control over whether to allow reads and writes in split clusters and how data is reconciled when partitions are merged.

Clustered Counters

An implementation of clustered counters with both strong and weak semantics, threshold events, optional persistence and bounding. Currently these are only available in embedded mode, but they will be usable over Hot Rod in Infinispan 9.2.

Locked Streams

Locked streams allow you to run your stream processing operations knowing that another update cannot be performed while the Consumer is executed on an entry. Note this only works in non transactional and pessimistic transactional caches (optimistic transactional caches are not supported).

API improvements

The compute(), computeIfPresent() and computeIfAbsent() methods on the Cache interface are now implemented as proper distributed operations so that they run local to the entries.
The DeltaAware interface for supporting granular clustered operations has been deprecated in favour of functional commands.

Persistence improvements

The CacheStore SPI now supports write batching. The JDBC, JPA, RocksDB, Remote and File stores have been modified to take advantage of this. You should see great benefits when using write-behind or when using putAll operations.

Remote query with JBoss Marshalling

Remote query now also works with Java entities annotated with Hibernate Search annotations and JBoss Marshalling without requiring ProtoBuf.

HTTP/2 and ALPN support on the REST endpoint

The REST endpoint has been completely rewritten so that it now supports both HTTP/1.1 and HTTP/2 as well as ALPN (even on Java 8). The new endpoint is also 30% faster during reads and 6% faster during writes.

Hot Rod Java client improvements

The Java Hot Rod client now has proper entrySet(), keySet() and values() implementations which iterate over the remote data instead of pulling it all locally.
It is now also finally possible to create and remove caches directly from the client.

Server Administration console improvements

The console has received a number of updates for usability and consistency. It is also finally possible to configure and manage the remote endpoints.

Component upgrades

Hibernate Search 5.8, JGroups 4.0.4, KUBE Ping 1.0.0.Beta1

Bug fixes

We have also dropped the guillotine on a large number of bugs.

If all goes well, we plan to release Infinispan 9.2 at the end of October, with lots of great updates.

So, head over to our download page, consult the upgrading guide and let us know about how you use Infinispan.

Cheers !

The Infinispan team

Thursday, 6 July 2017

Reactive Big Data demo working with Infinispan 9.0.3.Final

A couple of months ago I did an extensive blog post on a reactive Big Data demo I did for Great Indian Developer Summit. At the time, the demo relied on a custom Infinispan build which fixed ISPN-7814 and ISPN-7710 issues.

These issues are now fixed in the main repository and the 9.0.x branch, and so you can now run the demo, as is, using Infinispan 9.0.3.Final. I've updated the demo so that it uses this version.

Cheers,
Galder

Monday, 3 July 2017

Scattered cache

Infinispan strives for high throughput and low latency. Version 9.1.0.CR1 comes with a new cache mode - scattered cache - that's our answer to low round-trip times for write operations. Through a smart routing algorithm it guarantees that a write operation will result in only a single RPC, where distributed caches with 2 owners would often use 2 RPCs. Scattered cache is resilient against single node failure (equivalent to a distributed cache with 2 owners) and does not support transactions.

Declarative configuration is straightforward, and programmatic is a piece of cake, too. Both are sketched below (the cache name is illustrative):
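
XML:

    <scattered-cache name="scatteredCache"/>

Programmatically:

    ConfigurationBuilder builder = new ConfigurationBuilder();
    builder.clustering().cacheMode(CacheMode.SCATTERED_SYNC);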

What does the routing algorithm do differently, then? In a distributed cache, one node is always designated as the primary owner and the other owners are backups. When a (non-owner) node does a write (invokes cache.put("k", "v")), it sends the command to the primary owner, the primary forwards it to the backups, and the operation is completed only when all owners confirm this to the first node (the originator). It's not possible to contact all owners in a single multinode RPC from the originator, as the primary has to decide upon ordering in case of concurrent writes.

In a scattered cache every node may be a backup. We don't designate backup owners in the routing table (also called consistent hash, for historical reasons); only primaries are set there. When a node does a write, it sends the command to the primary owner, and when the primary confirms the operation the originator stores the entry locally, effectively becoming a backup. That means that there can be more than 2 copies of the entry in the cluster, the others being outdated - but only temporarily, as the other copies are eventually invalidated through a background process that does not slow down the synchronous writes. As you can see, this algorithm cannot be easily extended to multiple owners (while keeping the performance characteristics) and therefore scattered cache does not support multiple backups.

In case of a primary crash, the reconciliation process is somewhat more complex, because there's no central record telling who the backups are. Instead, the new primary owner has to search all nodes and pick the last write - the previous primary owner assigned each entry a sequence number, which makes this possible. And there are more technical difficulties - you can read about these in the design document or in the documentation.

There is a con, of course. When reading an entry from a distributed cache, there is some chance that the entry will be located on the reading node (it being one of the owners), and local reads are very fast. In a scattered cache we always read from the primary node, as the local version of the entry might already be outdated. That halves the chance of a local read - and this is the price you pay for faster writes. Don't worry too much, though - we have plans for L1-like caching that will make reads great again!

Don't touch my data stream!

In Infinispan 8.0 we were very excited to announce Distributed Streams as we moved to Java 8. This feature allows applying any of the various java.util.stream.Stream operations on the data grid; these are performed in a distributed manner, providing the highest possible performance as data is processed on the node where it lives, only requiring the intermediate results of the terminal operation to be returned to the invoker.

One problem with distributed streams, though, is that data is processed without acquiring locks: great for performance, but there is no guarantee that some other process isn't concurrently modifying the cache entry you're working on. Consider the following sketch, which iterates through the entire contents of a cache, modifying each entry based on its existing value (the "-processed" transformation is just a placeholder):
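
    // Sketch, assuming a Cache<String, String>; the BiConsumer variant of
    // forEach hands each node its local Cache instance
    cache.entrySet().stream()
         .forEach((c, entry) -> c.put(entry.getKey(), entry.getValue() + "-processed"));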

This works great until you have another cache put() running concurrently that changes a value. In this case the only way to be sure that an update is applied properly is to perform an optional update in the forEach. In a transactional cache you could also lock the entry manually (pessimistic) or retry on a WriteSkewException (optimistic). For example, this is how the optional update could be performed (again a sketch, reusing the placeholder transformation):
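
    // Sketch: only apply the update if the value hasn't been changed concurrently
    cache.entrySet().stream()
         .forEach((c, entry) -> {
            String newValue = entry.getValue() + "-processed";
            c.replace(entry.getKey(), entry.getValue(), newValue);  // no-op if the value moved on
         });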

As you can see the code isn't as pretty as it was before, but is still pretty concise.

Infinispan 9.1 introduces locked streams, which allow you to run your operation knowing that another update cannot be performed while running the Consumer. Note this only works in non-transactional and pessimistic transactional caches (optimistic transactional caches are not supported). A minimal sketch, mirroring the first example:
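
    // Sketch: lockedStream() holds the entry's lock while the BiConsumer runs
    cache.getAdvancedCache().lockedStream()
         .forEach((c, entry) -> c.put(entry.getKey(), entry.getValue() + "-processed"));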

As you can see, the code looks very similar to the first example. You just have to invoke the lockedStream method on the AdvancedCache, and then you can use the stream knowing that the data for the given key won't change while performing your update.

This locked stream has a slightly limited API compared to the normal java.util.stream API. Only the filter method is allowed in addition to forEach. The CacheStream API is also supported, with a few exceptions. For more details on the API and what methods are supported you should check out the Javadoc.

The lock is only acquired for the given key while invoking the Consumer, allowing other updates on other keys to be performed concurrently, just like a normal put operation would behave. It is not recommended to perform operations on other keys in the Consumer, as this could cause deadlocks.

Now go forth and perform operations using the data stream knowing that the data underneath has not changed!