Saturday, July 6, 2013

Test your equals methods

I recently discovered one simple, clever and, in hindsight, obvious practice: testing equals methods. You may ask: what's the point? Why should I do it? An equals method contains simple code, which in most cases is generated by your IDE. And you are right.
But there is another side to it. In Java there are a few rules that every equals implementation must obey: it must be reflexive, symmetric and transitive.
Let's take a look at each of them.

 

Reflexivity

Any instance must be equal to itself:
assertTrue(x.equals(x));

Symmetry

If we have two instances and the first is equal to the second, the equality must also hold in the opposite direction:
assertTrue(x.equals(y));
assertTrue(y.equals(x)); 

Transitivity

If there are three instances where:
assertTrue(x.equals(y));
assertTrue(y.equals(z));
the last pair must also be equal:
assertTrue(x.equals(z));

And more

Also don't forget about:
  • the hashCode and equals relation, which says that if two instances of a class are equal, they must have the same hashCode value:
    assertTrue(x.equals(y));
    assertEquals(x.hashCode(), y.hashCode());
    The reverse does not hold: two instances with equal hash codes are not required to be equal.
  • consistency: equals must return the same value for repeated calls on the same two instances, as long as neither of them has been modified.
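To make these rules concrete, here is a minimal, self-contained sketch; the Point value class, its fields and its IDE-style equals/hashCode are invented here for illustration:

```java
import java.util.Objects;

// Hypothetical value class with an IDE-style equals/hashCode, invented for illustration.
class Point {
    private final int x;
    private final int y;

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Point)) return false;
        Point p = (Point) o;
        return x == p.x && y == p.y;
    }

    @Override
    public int hashCode() {
        return Objects.hash(x, y);
    }
}

class EqualsContractTest {
    public static void main(String[] args) {
        Point x = new Point(1, 2);
        Point y = new Point(1, 2);
        Point z = new Point(1, 2);

        // Reflexivity: every instance is equal to itself.
        check(x.equals(x));
        // Symmetry: x equals y implies y equals x.
        check(x.equals(y) && y.equals(x));
        // Transitivity: x equals y and y equals z implies x equals z.
        check(x.equals(y) && y.equals(z) && x.equals(z));
        // hashCode relation: equal instances must share a hash code.
        check(x.hashCode() == y.hashCode());

        System.out.println("all equals contract checks passed");
    }

    private static void check(boolean condition) {
        if (!condition) throw new AssertionError("equals contract violated");
    }
}
```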
This note is based on the presentation by Matt Stine: Effective Java Reloaded.

Friday, May 31, 2013

Accessing method of private member with reflection

Basically there is nothing hard about accessing methods with reflection. It lets you get just about everything from a class and its instances. But recently I ran into some behavior I didn't expect.
The problem is that you cannot invoke methods that belong to a public interface contract but are declared in a non-public class, unless you call the setAccessible method.

Lets look at one simple example.
We have the widely used interface java.util.List, whose contract guarantees that every class implementing it will have a public method size(). So whenever we have an instance of a class implementing List somewhere in the code, we can easily call size and get what we expect.
But everything changes when reflection comes into play. I used to think we could access every method or other class element with reflection whenever it is visible to static code in the same scope.
Suppose we have this declaration:
List<String> names = new ArrayList<>();
names.add("Anna");
names.add("Nick");
names.add("Pedro");
names.add("Michael");
So we can do this statically:
int size = names.size();
System.out.println(size);
And it will compile and run printing 4.
We can replace this code with reflection based one:
Method sizeMethod = names.getClass().getMethod("size");
int reflectedSize = (int) sizeMethod.invoke(names);
System.out.println(reflectedSize);
The result will be the same as before.
But there is another approach to creating names:
List<String> names2 = Arrays.asList("Anna", "Nick", "Pedro", "Michael");
At first glance nothing has changed. But if we try to get the size with reflection again, we get this:
java.lang.IllegalAccessException: Class reflect.PublicContractMethodAccessTest can not access a member of class java.util.Arrays$ArrayList with modifiers "public"
	at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:98)
	at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:285)
	at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:277)
	at java.lang.reflect.Method.invoke(Method.java:480)
Hey, we didn't change anything! Why does this happen?
The reason is that getClass() gives us the runtime class, java.util.Arrays$ArrayList, and that class is private, so we cannot invoke its methods from outside:
private static class ArrayList<E> extends AbstractList<E> implements RandomAccess, java.io.Serializable

Public class in Runtime

To get around this exception, we need to go through a class that is accessible to us at runtime. The most common choices are List and Collection.
This time our code will work:
Class<List> listClass = List.class;
Method sizeMethod = listClass.getMethod("size");
int reflectedSize = (int) sizeMethod.invoke(names2);
System.out.println(reflectedSize); 
But if we know the type, why not call the method directly?

Forcing Reflection to work

So we can make reflection work by forcing access to the class with setAccessible method:
Method sizeMethod = names2.getClass().getMethod("size");
sizeMethod.setAccessible(true);
int reflectedSize = (int) sizeMethod.invoke(names2);
System.out.println(reflectedSize);
Note! Be careful with this. If a security manager is enabled on the running JVM, this call can lead to a SecurityException. (On recent JDK versions, setAccessible on JDK-internal classes may also fail with an InaccessibleObjectException due to module encapsulation.)

Conclusion

When you work with reflection, keep such Java subtleties in mind.
Look methods up on public types that are accessible at runtime, or force access to them with setAccessible.
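Putting the pieces above together, the whole pitfall can be sketched in one self-contained program; the class name and printed messages are mine, the behavior is the one described in the post:

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Class name and printed messages are invented; the behavior is the one described above.
class PublicContractAccessDemo {
    public static void main(String[] args) throws Exception {
        List<String> names2 = Arrays.asList("Anna", "Nick", "Pedro", "Michael");

        // Looking the method up on the runtime class java.util.Arrays$ArrayList
        // fails on invoke, because that class is private.
        Method hidden = names2.getClass().getMethod("size");
        try {
            hidden.invoke(names2);
            System.out.println("unexpected: invocation succeeded");
        } catch (IllegalAccessException e) {
            System.out.println("IllegalAccessException, as described above");
        }

        // Looking the same method up on the public List interface works fine.
        Method visible = List.class.getMethod("size");
        int size = (int) visible.invoke(names2);
        System.out.println("size via List.class: " + size); // prints "size via List.class: 4"
    }
}
```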

All the code is available at GitHub.

Thursday, May 23, 2013

Aggregation and processing of data with Oracle Coherence

Oracle Coherence is a great product that helps developers write applications more easily. According to Wikipedia:
Oracle Coherence is a Java-based in-memory data grid designed to improve reliability, scalability and performance compared to traditional relational database management systems.
In other words, Coherence is one big cache made up of multiple JVMs.

Coherence has two different underlying schemes of storing data: distributed and replicated.
With the first one, data is partitioned between multiple nodes, so every instance holds its own portion of the data. With the second configuration we get a more reliable system, where every data entry is stored in multiple places, so the failure of a node won't lead to data loss.

But this post is not about it. It's about provided methods of working with data stored in data grid.
To cache clients, Coherence looks like an extended Map with packs of additional methods gathered under the NamedCache interface. Every method pack is combined in an appropriate interface extended by the root one: CacheMap, ConcurrentMap, InvocableMap, ObservableMap and QueryMap. Here we will look at the InvocableMap interface and the methods it provides.
Its two main pieces of functionality come with the aggregate and invokeAll methods.
Their main feature is that they make it possible to work with data at the place it is stored, on the cache nodes, without the need to transmit it to the client.

Aggregation

The first two methods we will consider are the aggregate overloads. They differ only in signature, which determines how the data that the passed aggregator executes against is selected.
With the aggregate method we can perform a calculation over a set of data in-place.

For example, we have client orders in the cache and want to calculate the average bill of a particular client. Of course we could do it by fetching every order of that client and making the calculation on the client side. But we can also use the aggregate method and pass a Filter along with an EntryAggregator, which will select only the data we are interested in. EqualsFilter fits perfectly here: it filters data by equality to the passed value.
We also use the predefined DoubleAverage aggregator, which does exactly what we need:
Filter selectByClientIdFilter = new EqualsFilter("getClient", client); 
EntryAggregator averageBillAggregator = new DoubleAverage("getPrize");
Double averageBillPrice = (Double) orders.aggregate(selectByClientIdFilter, averageBillAggregator);
EqualsFilter and DoubleAverage receive the name of the method used to access the data they need.

With these few lines we've solved a problem that usually needs more boilerplate code and in most cases works slower, because the data would be processed in a single thread on the client side.
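For comparison, here is a sketch of that slower, naive client-side calculation using plain Java streams; the Order stand-in class and its values are invented, and this is only an analogy to what DoubleAverage computes, not Coherence code:

```java
import java.util.Arrays;
import java.util.List;

// The Order stand-in and its values are invented for this sketch.
class ClientSideAverageDemo {
    static class Order {
        final String client;
        final double prize;

        Order(String client, double prize) {
            this.client = client;
            this.prize = prize;
        }
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("anna", 10.0),
                new Order("anna", 30.0),
                new Order("nick", 50.0));

        // Client-side analogy of EqualsFilter("getClient", client)
        // plus DoubleAverage("getPrize"): every entry must first
        // travel over the network to the client.
        double averageBill = orders.stream()
                .filter(o -> o.client.equals("anna"))
                .mapToDouble(o -> o.prize)
                .average()
                .orElse(0.0);

        System.out.println(averageBill); // prints 20.0
    }
}
```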

There is a big pack of existing aggregators, which can be found in the package com.tangosol.util.aggregator.

That's all cool, but what if we want to do a more complex operation?
All aggregators implement the common interface EntryAggregator, with a single method that receives a Set of cache entries (InvocableMap.Entry) and returns the aggregated result. All data manipulation should be done in this method.
So now we will write our own aggregator implementing EntryAggregator. This time we want to get all items that have ever been bought.
First we need an aggregator:
public class BoughtItemsAggregator implements InvocableMap.EntryAggregator {
    @Override
    public Set<String> aggregate(Set set) {
        Set<String> boughtItems = new HashSet<>();

        for (InvocableMap.Entry orderEntry : (Set<InvocableMap.Entry>)set) {
            Order order = (Order) orderEntry.getValue();
            boughtItems.addAll(order.getItems());
        }

        return boughtItems;
    }
}
BoughtItemsAggregator simply collects all items from the orders into a single set and returns it. As for the filter, there is a predefined one that does nothing but select all the entries: AlwaysFilter.
The code using the aggregator remains almost the same:
Filter allOrdersFilter = AlwaysFilter.INSTANCE;
InvocableMap.EntryAggregator boughtItemsAggregator = new BoughtItemsAggregator();
Set<String> boughtItems = (Set<String>) orders.aggregate(allOrdersFilter, boughtItemsAggregator);
Note! You shouldn't change the content of an entry in the aggregate method; such changes may not be applied to the existing cache entries.

This is all you need to know to start using aggregation.
But what if we want to make changes to the data in a cache?

Processing

Usually, when working with a Map, we fetch some data to the client, process it, and store it back. But while we are processing, somebody else can change the data in the cache. To prevent this we could add synchronization on the client, which only works if we are the only client. There is also a locking API in Coherence, but it can be inefficient, requires moderating access manually, and still leaves the problem of transferring data to the client and back. To make entry updates more efficient and guarantee that the data remains consistent, the invoke operations were introduced.
Let's consider what these methods can do. The invoke method gives the ability to process a single entry under the specified key. The invokeAll pair of methods has the same semantics as the aggregate ones, but they expect an EntryProcessor instead of an EntryAggregator.

For example, suppose we decided to start a discount program and give 5% off on all orders whose total price is at least $25. For this we need to go through the orders and update their prices.
DiscountProcessor does all this work:
public class DiscountProcessor implements InvocableMap.EntryProcessor {
    private static final double DISCOUNT_LIMIT = 25.0;
    private static final double DISCOUNT = 0.05; // 5%

    @Override
    public Double process(InvocableMap.Entry entry) {
        Order order = (Order) entry.getValue();
        if(order.getPrize() >= DISCOUNT_LIMIT) {
            double newPrize = order.getPrize() * (1 - DISCOUNT);
            order.setPrize(newPrize);
        }
        return order.getPrize();
    }

    @Override
    public Map processAll(Set setEntries) {
        Map mapResults = new HashMap();
        for (InvocableMap.Entry entry : (Set<InvocableMap.Entry>) setEntries) {
            mapResults.put(entry.getKey(), process(entry));
        }
        return mapResults;
    }
}
As the result of processing, we get a Map with the new prices corresponding to the order keys.
Call of our processor:
Filter allOrdersFilter = AlwaysFilter.INSTANCE;
InvocableMap.EntryProcessor discountProcessor = new DiscountProcessor();
Map<Long, Double> afterDiscountPrices = orders.invokeAll(allOrdersFilter, discountProcessor);
The benefit of using an EntryProcessor is that every change done within its methods is atomic and becomes visible to the whole cluster after the processor finishes execution.

As you've probably guessed, such a processor is already implemented for us: NumberMultiplier. But it unconditionally changes the data, so we need to filter the entries first with GreaterEqualsFilter.
Filter allOrdersFilter = new GreaterEqualsFilter("getPrize", 25.0);
InvocableMap.EntryProcessor discountProcessor = new NumberMultiplier("Prize", 1 - 0.05, false);
Map<Long, Double> afterDiscountPrices = orders.invokeAll(allOrdersFilter, discountProcessor);
The only difference is that in this case we receive only the prices that have been changed.
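The per-entry logic of this filter/processor pair can be sketched as a client-side analogy in plain Java; the map content is invented, and this is not Coherence code:

```java
import java.util.Map;
import java.util.TreeMap;

// The map content is invented; this is a plain-Java analogy, not Coherence code.
class DiscountDemo {
    public static void main(String[] args) {
        // Order id -> order price, standing in for the cache content.
        Map<Long, Double> prices = new TreeMap<>();
        prices.put(1L, 20.0);
        prices.put(2L, 40.0);
        prices.put(3L, 100.0);

        Map<Long, Double> afterDiscountPrices = new TreeMap<>();
        for (Map.Entry<Long, Double> entry : prices.entrySet()) {
            // GreaterEqualsFilter("getPrize", 25.0): cheap orders are skipped entirely.
            if (entry.getValue() >= 25.0) {
                // NumberMultiplier with factor (1 - 0.05): apply the 5% discount.
                double newPrice = entry.getValue() * (1 - 0.05);
                entry.setValue(newPrice);
                afterDiscountPrices.put(entry.getKey(), newPrice);
            }
        }

        System.out.println(afterDiscountPrices); // prints {2=38.0, 3=95.0}
    }
}
```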

As with aggregators, there are predefined processors, which can be found in the com.tangosol.util.processor package.

Summary

So, with the API provided by InvocableMap, we can easily manipulate huge amounts of data in-place, without paying for data transfer to the client and back. Aggregation provides efficient processing of data, and invocation is used for safe data manipulation.

All code from the post is available at my GitHub page.

Monday, February 11, 2013

Aggregation framework in MongoDB

The aggregation framework was added in MongoDB version 2.2 to provide aggregation and processing of the data in a collection.
Basic syntax looks like this:
  db.<collection name>.aggregate([...])
where <collection name> is a placeholder for your collection,
and [...] is a pipeline of stages, described below.

There are 7 different aggregation stages:
  • $project – select the keys we are interested in, reshape the document.
  • $match – filter documents.
  • $group – aggregate documents by certain keys.
  • $sort – sort documents.
  • $skip – skip a number of documents.
  • $limit – limit the result to a specific number of documents.
  • $unwind – unjoin data: produces a document for every element of the array being unwound.
The stages execute one after another: each stage receives the result of the previous stage as input and passes its own result on to the next stage.
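To see several stages working together before looking at each in detail, here is an end-to-end sketch in the same shell syntax; the orders collection and its field names are hypothetical:

```javascript
// Hypothetical collection: db.orders holds documents like { client: "anna", price: 30, ... }
db.orders.aggregate([
  { $match: { price: { $gte: 25 } } },            // keep only large orders
  { $group: {
      _id: "$client",                             // group by client
      avgPrice: { $avg: "$price" },               // average price per client
      orderCount: { $sum: 1 }                     // number of matching orders
  } },
  { $sort: { avgPrice: -1 } },                    // highest average first
  { $limit: 5 }                                   // top five clients
])
```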

$group

Base syntax:
$group: {
  _id: "$<group_by_field_name>",
  <operator_result_name>: {
    <operator>: "$<field name>"
  }
} 

_id specifies the key to group by. If we need to group by several fields, the value of _id should be a document containing all the needed fields:
_id: {
  <group_key1>: "$<group_by_field_name1>",
  <group_key2>: "$<group_by_field_name2>"
}

There are many operators available in the $group stage. The result of each of them is assigned to operator_result_name. Here are some of them:
  • $sum – sum the values of the specified key field, or a constant value.
    Sum the values of a field:
    $sum: "$<field>"
    
    Add 1 for every document in a group:
    $sum: 1
    
  • $avg – average value:
    $avg: "$<field>"
    
  • $min/$max – minimum/maximum value in a group:
    $min: "$<field>"
    
  • $push – add every value in a group to an array. Produces an array of values with duplicates. To get an array of unique values, use the next operator.
  • $addToSet – add every value in a group to a set of unique elements.
  • $first/$last – the first/last of the values in a group. Only make sense after a $sort stage.

$project

Reshapes the input document:
• remove keys
• add new keys
• reshape keys
• apply simple expression operators to keys, for example:
  • $toUpper – translate a field value to upper case:
    <field name>: { $toUpper: "$<field>" }
  • $toLower – translate a field value to lower case
  • $add – add to a field value. For example, add 10 to an existing field and write the result to a new field (the source and result field names can be the same):
    <field name>: { $add: ["$<field>", 10] }
  • ...
Base syntax:
db.<collection>.aggregate([
  { 
    $project: {
      <new_field>: <new_value>,
      <field>: { $toLower: "$<field>" },
      <field_to_exclude>: 0,
      <field_to_pass_through>: 1,
      <field_new_name>: <field_old_name>,
      <doc_field>: {
        <new_field>: "$<field>",
        <new_field>: { $multiply: ["$<field>", <value>] }
      },
      ...
    }
  }
])

To exclude a field from the projection, simply do:
<field_to_exclude>: 0

To include a field as is:
<field_to_pass>: 1

By default, all fields except _id are excluded; _id is passed through unless explicitly excluded.

$match

Returns only the documents which satisfy the matching expression.
Acts as a filter, reducing the number of documents.
{ $match: { <field>: <looking value> } }

$sort

Sorts documents by a set of fields in the specified order.
(!) Can be a real memory hog.
{ $sort: { <field>: <order> } }

The order element can have the values:
• 1 – ASC/ascending order,
• -1 – DESC/descending order.

$skip and $limit

Work like the usual skip and limit functions in MongoDB: skip the first n documents / limit the output to the first n documents.
Not very useful without a $sort stage.
{ $skip: <skip value> }

{ $limit: <limit value> }

$unwind

Unwinds an array in a document into several new documents, each with a single value in place of the whole array.
Example:
{ a:1, b:2, c:['c1', 'c2', 'c3'] }

Unwinding of field c:
{ $unwind: "$c" }

will result in three separate documents:
{ a:1, b:2, c: "c1" },
{ a:1, b:2, c: "c2" },
{ a:1, b:2, c: "c3" }

Conclusion

The Aggregation Framework is a powerful feature of MongoDB, but it has to be used wisely. It provides almost all the functionality SQL has, but with several limitations besides being bound to a single collection:
• the result set is limited to 16MB;
• it cannot use more than 10% of the memory on a machine;
• in a sharded environment, after the first $group or $sort the data has to be collected on mongos.
SQL to Aggregation Framework:

WHERE – $match
GROUP BY – $group
HAVING – $match
SELECT – $project
ORDER BY – $sort
LIMIT – $limit
SUM() – $sum
COUNT() – $sum (with a value of 1)
JOIN – no direct equivalent; embed documents and use $unwind

More at the Aggregation Framework Reference.