Thursday, May 23, 2013

Aggregation and processing of data with Oracle Coherence

Oracle Coherence is a great product that helps developers write applications more easily. According to Wikipedia:
Oracle Coherence is a Java-based in-memory data grid designed to improve reliability, scalability and performance compared to traditional relational database management systems.
In other words, Coherence is one big cache made up of multiple JVMs.

Coherence has two main underlying schemes for storing data: distributed and replicated.
In the distributed (partitioned) scheme the data set is split between the nodes, so every instance holds its own portion of the data. The replicated scheme gives a more reliable system: every entry is stored on multiple nodes, so the failure of a single node does not lead to data loss.
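The difference can be sketched in plain Java as a toy model that ignores backups, serialization and failover; a "node" here is just a local map standing in for a cache member:

```java
import java.util.*;

// Toy model of the two storage topologies: a "node" is just a local map.
public class TopologySketch {
    // Distributed (partitioned): each key lives on exactly one node,
    // chosen here by hashing the key.
    public static void putDistributed(List<Map<String, String>> nodes,
                                      String key, String value) {
        int owner = Math.abs(key.hashCode() % nodes.size());
        nodes.get(owner).put(key, value);
    }

    // Replicated: every node holds a full copy of every entry.
    public static void putReplicated(List<Map<String, String>> nodes,
                                     String key, String value) {
        for (Map<String, String> node : nodes) {
            node.put(key, value);
        }
    }
}
```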

But this post is not about that. It is about the methods Coherence provides for working with the data stored in the grid.
To cache clients, Coherence looks like an extended Map with packs of additional methods gathered under the NamedCache interface. Each pack of methods is collected in its own interface, all of which the root interface extends: CacheMap, ConcurrentMap, InvocableMap, ObservableMap and QueryMap. Here we will look at the InvocableMap interface and the methods it provides.
It offers two main pieces of functionality through the aggregate and invoke/invokeAll methods.
Their main feature is that they make it possible to work with data at the place where it is stored, on the cache nodes, without the need to transmit it to the client.

Aggregation

The first two methods we will consider are the aggregate pair. They differ only in signature, which determines how the data that the passed aggregator will run against is selected.
With the aggregate method we can perform a calculation over a set of data in place.

For example, suppose we have client orders in the cache and want to calculate the average bill price of a particular client. Of course, we could do it by fetching every order of that client and making the calculation on the client side. But we can also use the aggregate method and pass a Filter together with an EntryAggregator, so that only the data we are interested in is selected. The EqualsFilter fits perfectly here: it filters data by equality to the passed value.
We also use the predefined DoubleAverage aggregator, which does exactly what we need:
Filter selectByClientIdFilter = new EqualsFilter("getClient", client); 
EntryAggregator averageBillAggregator = new DoubleAverage("getPrize");
Double averageBillPrice = (Double) orders.aggregate(selectByClientIdFilter, averageBillAggregator);
EqualsFilter and DoubleAverage receive the name of the method used to access the data they need.
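All the snippets in this post assume an Order class with the accessors the extractors name (getClient, getPrize, and so on). A minimal sketch of such a class might look like this; the exact fields and the use of plain Serializable are assumptions (a real Coherence value class would often implement PortableObject instead, and the full version is in the GitHub repository):

```java
import java.io.Serializable;
import java.util.Set;

// Minimal sketch of the cache value class the examples assume.
public class Order implements Serializable {
    private final Long client;       // id of the client who placed the order
    private double prize;            // total order price
    private final Set<String> items; // names of the ordered items

    public Order(Long client, double prize, Set<String> items) {
        this.client = client;
        this.prize = prize;
        this.items = items;
    }

    public Long getClient() { return client; }
    public double getPrize() { return prize; }
    public void setPrize(double prize) { this.prize = prize; }
    public Set<String> getItems() { return items; }
}
```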

With these few lines we have solved a problem that usually requires more boilerplate code and in most cases runs slower, because the data would be processed in a single thread on the client side.
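For comparison, the client-side version would look roughly like this once all orders have been fetched from the grid (OrderView is a hypothetical name for an order reduced to the two fields the calculation needs):

```java
import java.util.*;

public class ClientSideAverage {
    // A fetched order reduced to the two fields we need.
    public static class OrderView {
        final long client;
        final double price;
        public OrderView(long client, double price) {
            this.client = client;
            this.price = price;
        }
    }

    // What EqualsFilter + DoubleAverage do on the grid, done by hand:
    // keep the matching client's prices and average them.
    public static double averageBillPrice(Collection<OrderView> fetchedOrders,
                                          long client) {
        double sum = 0;
        int count = 0;
        for (OrderView order : fetchedOrders) {
            if (order.client == client) { // EqualsFilter("getClient", client)
                sum += order.price;       // DoubleAverage("getPrize")
                count++;
            }
        }
        return count == 0 ? 0.0 : sum / count;
    }
}
```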

There is a large set of predefined aggregators; they can be found in the com.tangosol.util.aggregator package.

That is all nice, but what if we want to perform a more complex operation?
All aggregators implement the common interface EntryAggregator, which has a single method receiving a Set of cache entries (InvocableMap.Entry) and returning the aggregated result. All data manipulation should be done in this method.
So now we will write our own aggregator implementing EntryAggregator. This time we want to get the set of all items that have ever been bought.
First we need an aggregator:
public class BoughtItemsAggregator implements InvocableMap.EntryAggregator {
    @Override
    public Set<String> aggregate(Set set) {
        Set<String> boughtItems = new HashSet<>();

        // Collect the items of every order entry into a single set.
        for (InvocableMap.Entry orderEntry : (Set<InvocableMap.Entry>) set) {
            Order order = (Order) orderEntry.getValue();
            boughtItems.addAll(order.getItems());
        }

        return boughtItems;
    }
}
BoughtItemsAggregator simply collects the items of every order into a single set and returns it. As for the filter, there is a predefined one which does nothing but select all entries: AlwaysFilter.
The code using the aggregator stays almost the same:
Filter allOrdersFilter = AlwaysFilter.INSTANCE;
InvocableMap.EntryAggregator boughtItemsAggregator = new BoughtItemsAggregator();
Set<String> boughtItems = (Set<String>) orders.aggregate(allOrdersFilter, boughtItemsAggregator);
Note: you should not change the content of an entry inside the aggregate method; such changes may not be applied to the actual cache entries.

This is all you need to know to start using aggregation.
But what if we want to change the data in the cache?

Processing

Usually, when working with a Map, we fetch some data to the client, process it and store it back. But during the processing somebody else can change the data in the cache. To prevent this we could write synchronization on the client, which only works if we are the only client. Coherence also has a locking API, but it is not always efficient, it requires managing access manually, and the problem of transferring the data to the client and back still remains. To make entry updates more efficient and guarantee that the data stays in a consistent state, the invoke operations were introduced.
Let's consider what these methods do. The invoke method makes it possible to process a single entry under the specified key. The invokeAll pair of methods has the same semantics as the aggregate ones, but instead of an EntryAggregator they expect an EntryProcessor.
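Ignoring locking, backups and serialization, the invokeAll contract can be sketched in plain Java (the names here are illustrative, not the Coherence API): apply the processor to every matching entry in place and collect the per-key results.

```java
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;

public class InvokeAllSketch {
    // For each entry whose value matches the filter, replace the value
    // with process(value) and record the result under the entry's key.
    public static <K, V> Map<K, V> invokeAll(Map<K, V> cache,
                                             Predicate<V> filter,
                                             Function<V, V> process) {
        Map<K, V> results = new HashMap<>();
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            if (filter.test(entry.getValue())) {
                V updated = process.apply(entry.getValue());
                entry.setValue(updated); // the in-place update
                results.put(entry.getKey(), updated);
            }
        }
        return results;
    }
}
```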

For example, suppose we decided to introduce a discount program and give 5% off all orders with a total price of $25 or more. For this we need to go through the orders and update their prices.
DiscountProcessor does all this work:
public class DiscountProcessor implements InvocableMap.EntryProcessor {
    private static final double DISCOUNT_LIMIT = 25.0;
    private static final double DISCOUNT = 0.05; // 5%

    @Override
    @Override
    public Double process(InvocableMap.Entry entry) {
        Order order = (Order) entry.getValue();
        if (order.getPrize() >= DISCOUNT_LIMIT) {
            double newPrize = order.getPrize() * (1 - DISCOUNT);
            order.setPrize(newPrize);
            // setValue is required: mutating the deserialized value alone
            // does not guarantee the change is written back to the cache.
            entry.setValue(order);
        }
        return order.getPrize();
    }

    @Override
    public Map processAll(Set setEntries) {
        Map mapResults = new HashMap();
        for (InvocableMap.Entry entry : (Set<InvocableMap.Entry>) setEntries) {
            mapResults.put(entry.getKey(), process(entry));
        }
        return mapResults;
    }
}
As a result of the processing we get a Map of the new prices keyed by the order keys.
Calling our processor:
Filter allOrdersFilter = AlwaysFilter.INSTANCE;
InvocableMap.EntryProcessor discountProcessor = new DiscountProcessor();
Map<Long, Double> afterDiscountPrices = orders.invokeAll(allOrdersFilter, discountProcessor);
The benefit of using an EntryProcessor is that every change done within its methods is atomic and becomes visible to the whole cluster after the processor finishes execution.

But, as you have probably asked yourself, such a processor is already implemented for us: NumberMultiplier. It simply changes the data, though, so we need to filter the entries first with a GreaterEqualsFilter.
Filter allOrdersFilter = new GreaterEqualsFilter("getPrize", 25.0);
InvocableMap.EntryProcessor discountProcessor = new NumberMultiplier("Prize", 1 - 0.05, false);
Map<Long, Double> afterDiscountPrices = orders.invokeAll(allOrdersFilter, discountProcessor);
The only difference is that in this case we receive only the prices that have actually been changed.

As with aggregators, there are predefined processors, which can be found in the com.tangosol.util.processor package.

Summary

So, with the API provided by InvocableMap, we can easily manipulate huge amounts of data in place without paying for transferring the data to the client and back. Aggregation provides efficient read-only processing of the data, and invocation is used for safe data manipulation.

All code from the post is available at my GitHub page.
