Friday, May 31, 2013

Accessing method of private member with reflection

Basically there is nothing hard to access methods with reflection. It gives opportunities to get from class and its instances everything. But recently I found out some strange behavior which I didn't expect.
The problem is that you cannot access methods introduced with public interface contract but declared in private scope, unless you call setAccessible method.

Lets look at one simple example.
We have widely used interface java.util.List and it describes contract that every class implementing this interface with have public method size(). So when in somewhere in the code we have an instance of a class implementing List interface we can easily call to size and get what we expect.
But everything changes when reflection comes. I thought that usually we can access every method or other class element with reflection in case if it is visible for static code from the same scope.
Suppose we have such decleration:
List<String> names = new ArrayList<>();
names.add("Anna");
names.add("Nick");
names.add("Pedro");
names.add("Michael");
So we can do statically this:
int size = names.size();
System.out.println(size);
And it will compile and run printing 4.
We can replace this code with reflection based one:
Method sizeMethod = names.getClass().getMethod("size");
int reflectedSize = (int) sizeMethod.invoke(names);
System.out.println(reflectedSize);
Result will be the same as previously.
But there is another approach to create names.
List<String> names2 = Arrays.asList("Anna", "Nick", "Pedro", "Michael");
From the first view nothing changed. But if we try to get size with reflection again we will get this:
java.lang.IllegalAccessException: Class reflect.PublicContractMethodAccessTest can not access a member of class java.util.Arrays$ArrayList with modifiers "public"
	at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:98)
	at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:285)
	at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:277)
	at java.lang.reflect.Method.invoke(Method.java:480)
Hey, we didn't change anything! Why does this happen?
The reason is that we access to class in Runtime with getClass() method, but it is private, so we cannot get it's methods:
private static class ArrayList<E> extends AbstractList<E> implements RandomAccess, java.io.Serializable

Public class in Runtime

To get away from this exception, we need to access class which is accessible for us in Runtime. The most common are List and Collection.
This time our code will work:
Class<List> listClass = List.class;
Method sizeMethod = listClass.getMethod("size");
int reflectedSize = (int) sizeMethod.invoke(names2);
System.out.println(reflectedSize); 
But if we know the type, why don't call method directly?

Forcing Reflection to work

So we can make reflection work by forcing access to the class with setAccessible method:
Method sizeMethod = names2.getClass().getMethod("size");
sizeMethod.setAccessible(true);
int reflectedSize = (int) sizeMethod.invoke(names2);
System.out.println(reflectedSize);
Note! Be careful with this. In case if security enabled on running JVM this method can lead to SecurityException.

Conclusion

In case if you work with reflection be careful with such specifics in Java.
Use public classes to access methods which will be accessible in Runtime or force reflection to get to method with setAccessible.

All the code is available at GitHub.

Thursday, May 23, 2013

Aggregation and processing of data with Oracle Coherence

There is a great product as Oracle Coherence which helps developers write applications easier. According to Wikipedia:
Oracle Coherence is a Java-based in-memory data grid designed to improve reliability, scalability and performance compared to traditional relational database management systems.
In other words Coherence is such a big cache which consists of multiple JVMs.

Coherence has two different underlying schemas of data storing: distributed and replicated.
It is obvious that for the first one data is separated between multiple nodes so every instance has its own pack of data to hold. In case of second configuration we will receive more reliable system where every data entry is stored in multiple places and won't lead to data loose in case of some node fails.

But this post is not about it. It's about provided methods of working with data stored in data grid.
For cache clients Coherence looks like extended Map with a pack of additional methods gathered under interface NamedMap. Every method pack is combined in the appropriate interface extended by the root one: CacheMap, ConcurrentMap, InvocableMap, ObservableMap and QueryMap. Here we will look at InvocableMap interface and methods it provides.
It comes with two main functionalities with methods aggregate and invokeAll.
Main feature of them are that they make possible to work with data at place it is stored: on the cache nodes and without need to transmit them to client. 

Aggregation

First two methods we will consider are aggregate. They differs only in signature which determines the way of selection data against which passed aggregator will be executed.
With aggregate method we can process some calculation based on a set of data in-place.

I.e. We have client orders in the cache and want to calculate average price of a bill of a particular client. Of course we can do it by fetching every client order and making calculations on the client side. But also we can use aggregate method and pass Filter with EntryAggregator, which will select only data we are interested in. For this EqualsFilter perfectly feats. It filters data by equality to passed value.
Also we use predefined DoubleAverage aggregator which do exactly we need:
Filter selectByClientIdFilter = new EqualsFilter("getClient", client); 
EntryAggregator averageBillAggregator = new DoubleAverage("getPrize");
Double averageBillPrice = (Double) orders.aggregate(selectByClientIdFilter, averageBillAggregator);
EqualsFilter and DoubleAverage receive method to access data they need.

With these few lines we've managed to solve problem which usually needs more boilerplate code and in most cases will works slower because of processing data in a single thread on a client side.

There is a big pack of already existing aggregators and they can be found in package com.tangosol.util.aggregator.

It's all cool but what if we want to do some more difficult operation?
All aggregators implement common interface EntryAggregator with a single method receiving Set of cache entries(InvocableMap.Entry) and returning aggregated result. All data manipulation should be done in this method.
So now we will write our aggregator which will implement EntryAggregator. This time we want to get all items ever been bought.
First we need an aggregator:
public class BoughtItemsAggregator implements InvocableMap.EntryAggregator {
    @Override
    public Set<String> aggregate(Set set) {
        Set<String> boughtItems = new HashSet<>();

        for (InvocableMap.Entry orderEntry : (Set<InvocableMap.Entry>)set) {
            Order order = (Order) orderEntry.getValue();
            boughtItems.addAll(order.getItems());
        }

        return boughtItems;
    }
}
BoughtItemsAggregator simply collects all items in the order in a single set and returns it. As for the filter we need there is a predefined filter which does nothing but selects all the entries: AlwaysFilter.
The code using aggregator remains almost the same:
Filter allOrdersFilter = AlwaysFilter.INSTANCE;
InvocableMap.EntryAggregator boughtItemsAggregator = new BoughtItemsAggregator();
Set<String> boughtItems = (Set<String>) orders.aggregate(allOrdersFilter, boughtItemsAggregator);
Note! You shouldn't change content of the entry in aggregate method, these changes can be not applied to the existing cache entries.

This is all you should know to start using aggregation for your need.
But what if we want to make some changes on data in a cache.

Processing

Usually when working with Map we ask for some data on the client and process it and store back. But during processing period somebody else can change data in the cache. To prevent this we can write synchronization on client, which will work if we are the only client. Also there is a locking API in Coherence. But it can be not so efficient and needs to manually moderate access, plus there is still remains problem of transferring data to client and back. To make entry update operations more efficient and guarantee that data will remain in consistent state invoke operations where introduced.
Lets consider what cat do these methods. Method invoke gives ability to process single entry under specified key. For invokeAll pair of methods semantic is the same as for aggregate ones, but instead of EntryAggregator they expect to receive EntryProcessor.

I.e. Suppose we decided to make discount program and give 5% free for all orders, total prize of which is bigger then $25. For this we need to go through orders and update their prices.
DiscountProcessor does all this staff:
public class DiscountProcessor implements InvocableMap.EntryProcessor {
    private static final double DISCOUNT_LIMIT = 25.0;
    private static final double DISCOUNT = 0.05; // 5%

    @Override
    public Double process(InvocableMap.Entry entry) {
        Order order = (Order) entry.getValue();
        if(order.getPrize() >= DISCOUNT_LIMIT) {
            double newPrize = order.getPrize() * (1 - DISCOUNT);
            order.setPrize(newPrize);
        }
        return order.getPrize();
    }

    @Override
    public Map processAll(Set setEntries) {
        Map mapResults = new HashMap();
        for (InvocableMap.Entry entry : (Set<InvocableMap.Entry>) setEntries) {
            mapResults.put(entry.getKey(), process(entry));
        }
        return mapResults;
    }
}
As the result of processing we will get Map with new prices corresponded to order keys.
Call of our processor:
Filter allOrdersFilter = AlwaysFilter.INSTANCE;
InvocableMap.EntryProcessor discountProcessor = new DiscountProcessor();
Map<Long, Double> afterDiscountPrices = orders.invokeAll(allOrdersFilter, discountProcessor);
Benefit of using EntryProcessor is that every change done within its methods is atomic and be visible to all the cluster after processor finishes execution.

But as you've probably asked yourself there is already such processor implemented for us: NumberMultiplier. But it simply changes data, so we need to filter it before with GreaterEqualsFilter.
Filter allOrdersFilter = new GreaterEqualsFilter("getPrize", 25.0);
InvocableMap.EntryProcessor discountProcessor = new NumberMultiplier("Prize", 1 - 0.1, false);
Map<Long, Double> afterDiscountPrices = orders.invokeAll(allOrdersFilter, discountProcessor);
The only difference is that in this case we will receive only prices that have been changed.

As for aggregators there are predefined processors which can be found in com.tangosol.util.processor package.

Summary

So with API provided with InvocableMap we can easily manipulate huge amount of data in-place without paying for data transfer to client and back. Aggregation provides efficient processing of data and invocation used for safe data manipulation.

All code from the post is available at my GitHub page.