Friday, 17 March 2017

Event Listener with C++

Dear Infinispan community,

as announced in a previous post, starting from version 8.1.0 also the C++/C# clients can receive and process Infinispan events.

Here's an example of usage of C++ event listeners that, with a good dose of imagination, pretends to be a customer behavior tracking system for our store chain (don't take this too seriously, we're just trying to add some fiction).

As a first requirement our tracking system will record every single purchase made in our stores. How many stores we have? 1, 100, millions? It doesn't matter: we're backed with an Infinispan data grid.
This is version 0.x and hence the checker must use the keyboard to enter all the needed information.

void addPurchase(RemoteCache<std::string, std::string> &cache){
std::cout << ("Enter product: ");
std::string product;
std::cin.ignore();
std::getline(std::cin, product);
std::cout << ("Enter purchase details: ");
std::string details;
std::getline(std::cin, details);
std::chrono::milliseconds ms = std::chrono::duration_cast< std::chrono::milliseconds >(std::chrono::system_clock::now().time_since_epoch());
cache.put(product+":"+std::to_string(ms.count()),details, 30, TimeUnit::SECONDS);
}
view raw gistfile1.txt hosted with ❤ by GitHub
As you can see our entry key is a concatenation of the product name and the timestamp and the entry value is an unstructured string, maybe too simply but it could work for now.
Seems we are at a good point: we have the data and we can do analytics on it, so far so good but now our boss makes a new request: he wants a runtime monitor on how's the sales performance. That's a perfect request to be fulfilled with event listener: the monitor application will be an Hotrod C++ client that registers a client listener on the server and receives and show on the boss's laptop the data flow.
A client listener, once registered on the server, can receive events related to: creation, modification, deletion, expiration of cache entries; in our example only the creation and expiration events are processed (expired events can be useful to do some moving average statistics?). Following a snip of code that creates and registers a listener that writes the events key on the stdout.

void addListener(Marshaller<std::string>* marshaller, RemoteCache<std::string, std::string> &cache) {
if (clPtr) {
std::cout << "Already installed" << std::endl;
return;
}
auto cl = new CacheClientListener<std::string, std::string>(cache);
clPtr.reset(cl);
std::function<void(ClientCacheEntryCreatedEvent<std::string>)> listenerCreated =
[](ClientCacheEntryCreatedEvent<std::string> e) {std::cout << "New entry: " << e.getKey() << std::endl;};
std::function<void(ClientCacheEntryExpiredEvent<std::string>)> listenerExpired =
[](ClientCacheEntryExpiredEvent<std::string> e) {std::cout << "Expired entry: " << e.getKey() << std::endl;};
clPtr->add_listener(listenerCreated);
clPtr->add_listener(listenerExpired);
cache.addClientListener(*clPtr, std::vector<std::vector<char> >(), std::vector<std::vector<char> >());
}
view raw gistfile1.txt hosted with ❤ by GitHub
You can git this quickstart here [1]. On startup a multiple choice menu is shown with all the available operations. Running several instances you can act as the checker (data entry) or the boss (installing the listener and seeing the events flow).

Available actions:
0. Display available actions
1. Add purchase
2. Add listener
3. Remove listener
4. Add filtered listener
5. Remove filtered listener
6. Add custom listener
7. Remove custom listener
10. Quit
view raw gistfile1.txt hosted with ❤ by GitHub

Filters

Again so far so good, but then the marketing department ask support to do targeted advertising like: soliciting customers that bought product Y to buy product X.
Let's suppose that X="harmonica" and Y="hiking boots" (it's a well known fact of life that in the high mountains you feel the desire to play an harmonica).

To do that we register on the server another listener, but this time we're not interested in the whole flow of purchase data: to run our marketing campaign, we only interested in cache entries having the key starting with "hiking". The Infinispan server can filter out events for us, if we pass in the AddClientListener operation the name of the wanted filter along with any configuration arguments.

void addFilteredListener(Marshaller<std::string>* marshaller, RemoteCache<std::string, std::string> &cache) {
if (clFilteredPtr) {
std::cout << "Already installed" << std::endl;
return;
}
auto cl = new CacheClientListener<std::string, std::string>(cache);
clFilteredPtr.reset(cl);
std::function<void(ClientCacheEntryCreatedEvent<std::string>)> listenerCreated =
[](ClientCacheEntryCreatedEvent<std::string> e) {std::cout << "Filtered New entry: " << e.getKey() << std::endl;};
std::function<void(ClientCacheEntryExpiredEvent<std::string>)> listenerExpired =
[](ClientCacheEntryExpiredEvent<std::string> e) {std::cout << "Filtered Expired entry: " << e.getKey() << std::endl;};
clFilteredPtr->add_listener(listenerCreated);
clFilteredPtr->add_listener(listenerExpired);
char fName[] = "string-is-equal-filter-factory";
clFilteredPtr->filterFactoryName = std::vector<char>(fName, fName + std::strlen(fName));
std::vector<std::vector<char> > filterFactoryParams;
std::string strArgs("hiking");
std::vector<char> param;
marshaller->marshall(strArgs, param);
filterFactoryParams.push_back(param);
cache.addClientListener(*clFilteredPtr, filterFactoryParams, std::vector<std::vector<char> >());
}
view raw gistfile1.txt hosted with ❤ by GitHub
Filter are java classes that must be deployed into the Infinispan server (more here [2])

and converters

Predefined events contain very few information: basically the event type and the entry key, this to prevent to flood the network spreading around very long entry values. Users can override this limitation using a converter, that is a java class deployed into the server, that can create custom events containing every data needed by the application.
void addCustomListener(Marshaller<std::string>* marshaller, RemoteCache<std::string, std::string> &cache) {
if (clCustomPtr) {
std::cout << "Already installed" << std::endl;
return;
}
auto cl = new CacheClientListener<std::string, std::string>(cache);
clCustomPtr.reset(cl);
std::function<void(ClientCacheEntryCreatedEvent<std::string>)> listenerCreated =
[](ClientCacheEntryCreatedEvent<std::string> e) {std::cout << "Custom listener: "<< e.getKey() << std::endl;};
std::function<void(ClientCacheEntryCustomEvent)> listenerCustom = [] (ClientCacheEntryCustomEvent e)
{
std::string str= JBasicMarshallerHelper::unmarshall<std::string>(e.getEventData().data());
std::cout << str << std::endl;
};
clCustomPtr->add_listener(listenerCreated);
clCustomPtr->add_listener(listenerCustom);
char cName[] = "to-string-converter-factory";
char fName[] = "string-is-equal-filter-factory";
clCustomPtr->converterFactoryName = std::vector<char>(cName, cName + std::strlen(cName));
clFilteredPtr->filterFactoryName = std::vector<char>(fName, fName + std::strlen(fName));
std::vector<std::vector<char> > filterFactoryParams;
std::vector<std::vector<char> > converterFactoryParams;
std::string strArgs("hiking");
std::vector<char> param;
marshaller->marshall(strArgs, param);
filterFactoryParams.push_back(param);
cache.addClientListener(*clCustomPtr, filterFactoryParams, converterFactoryParams);
}
view raw gistfile1.txt hosted with ❤ by GitHub

As in the previous case, we pass into the add operation the name of the converter and the configuration arguments, any.

That's all guys, let us know your feedback: do you like it? Could be better? Tell us how it can be improved creating an issue [3], or fork and improve it yourself [4]!

Thanks for reading and enjoy!
The Infinispan Team
[1] https://github.com/rigazilla/infinispan-simple-tutorials/tree/new_event_tutorial/c%2B%2B/events
[2] http://blog.infinispan.org/2014/08/hot-rod-remote-events-1-getting-started.html
[2] http://blog.infinispan.org/2014/08/hot-rod-remote-events-2-filtering-events.html
[2] http://blog.infinispan.org/2014/09/hot-rod-remote-events-3-customizing.html
[3] https://issues.jboss.org/projects/HRCPP/issues
[4] https://github.com/infinispan/cpp-client

No comments:

Post a Comment