Apache Ignite 1.7.0 was released recently, and among its new features is a killer one that many Apache Ignite users and customers had been awaiting for a long time: support for non-collocated distributed joins in SQL queries. This post is fully dedicated to that feature. I'll try to shed some light on how non-collocated distributed joins work and how they differ from the traditional, affinity-collocation-based joins available in Apache Ignite.
Historically, Apache Ignite allowed executing SQL queries with joins across different tables, but it required the data of the caches being joined in a query to be collocated. In Ignite, collocation is easy to enable with the affinity key concept, where the data of one business entity is stored on the same node as the related data of another business entity.
For example, let's say you have two business entities, Organization and Person, and an Organization ID is used as the affinity key for the Persons who belong to that Organization. Ignite will then make sure that all of those Persons are stored on the same node as their Organization. This simple concept makes it possible to execute the whole range of ANSI-99 compliant SQL queries, including joins between multiple caches.
In fact, the execution flow of a SQL query with a join is exactly the same as that of a query without one.
Let's have a look at the flow of a basic query over the Organization and Person business entities, defined as follows:
- Organization(id, address) entity: id is the Organization ID, and its value is used as the cache key when an Organization is put into the cache. The cache key is treated as the primary key at the Ignite SQL engine's layer. Keep this in mind until you get to the end of the blog post!
- Person(name, salary) entity: Persons are stored in the Persons cache, with AffinityKey(id, orgId) as the cache key. AffinityKey is a special kind of object in Ignite that defines both a Person's unique ID (the first parameter) and its affinity key (the second parameter). Here, the Organization ID (orgId) has been chosen as the Person's affinity key, which means that Persons will be stored on the same node as their Organizations.
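To make the placement rule concrete, here is a toy Python model of it. It is not Ignite's API: the AffinityKey dataclass only mirrors the idea behind Ignite's AffinityKey, and node_for, org_node, person_node and NUM_NODES are hypothetical names. Real Ignite maps keys to partitions through its affinity function; plain modulo arithmetic stands in for that here.

```python
from dataclasses import dataclass

NUM_NODES = 3  # toy cluster size (assumption)


@dataclass(frozen=True)
class AffinityKey:
    """Toy stand-in for Ignite's AffinityKey: a unique id plus an affinity key."""
    id: int
    aff_key: int


def node_for(affinity_key: int) -> int:
    # Hypothetical affinity function: affinity key -> node index.
    # Ignite really maps keys to partitions; modulo is a simplification.
    return affinity_key % NUM_NODES


def org_node(org_id: int) -> int:
    # An Organization's cache key (its id) also acts as its affinity key.
    return node_for(org_id)


def person_node(key: AffinityKey) -> int:
    # A Person is placed by its affinity key (orgId), not by its own id.
    return node_for(key.aff_key)


# Persons 10, 11 and 12 all belong to Organization 5.
persons = [AffinityKey(id=pid, aff_key=5) for pid in (10, 11, 12)]
print(org_node(5), [person_node(k) for k in persons])  # 2 [2, 2, 2]
```

Because placement depends only on the affinity key, every Person of Organization 5 ends up on the same node as the Organization itself, whatever the Person's own id is.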
After defining these business entities and preloading caches with data, we are free to execute an SQL query like the one below. Since the Persons are affinity collocated with their Organization, we're guaranteed to receive a complete result set.
SELECT * FROM Organization as org JOIN Person as p ON org.id = p.orgId
The execution flow of this query, depicted in Picture 1 below, is the following:
- The query initiating node (mapper & reducer) sends the query to all the nodes where cached data resides (Phase Q).
- All the nodes that receive the query from the reducer will execute it locally, performing the join using local data only (Phase E(Q)).
- The nodes respond to the reducer with their portion of the result set (Phases R1, R2 and R3).
- The reducer will eventually reduce the result sets received from all the remote nodes and provide a final aggregated result to your code (Phase R).
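The flow above can be sketched as a small map/reduce simulation. This is a toy model under simplifying assumptions (three in-process "nodes", modulo in place of Ignite's affinity function, tuples in place of cached objects), not how Ignite is implemented; node_for, execute_locally and run_query are hypothetical names.

```python
from collections import defaultdict

NUM_NODES = 3  # toy cluster size (assumption)

orgs = [(1, "Main St"), (2, "Oak Ave"), (3, "Elm Rd")]                      # (id, address)
persons = [(10, 1, "Ann"), (11, 2, "Bob"), (12, 1, "Cid"), (13, 3, "Dan")]  # (id, orgId, name)


def node_for(affinity_key):
    # Stand-in for Ignite's affinity function: affinity key -> node index.
    return affinity_key % NUM_NODES


# Preload: Organizations are placed by id, Persons by orgId, so they are collocated.
local_orgs = defaultdict(list)
local_persons = defaultdict(list)
for org in orgs:
    local_orgs[node_for(org[0])].append(org)
for person in persons:
    local_persons[node_for(person[1])].append(person)


def execute_locally(node):
    # Phase E(Q): join org.id = p.orgId using this node's data only.
    return [(o, p) for o in local_orgs[node] for p in local_persons[node] if o[0] == p[1]]


def run_query():
    # Phase Q: the reducer sends the query to every node.
    # Phases R1..R3: each node returns its partial result.
    # Phase R: the reducer merges the partial results.
    result = []
    for node in range(NUM_NODES):
        result.extend(execute_locally(node))
    return result


print(len(run_query()), "of", len(persons), "matching rows found")  # 4 of 4 matching rows found
```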
If the same query were executed on data that is not affinity-collocated, you would get an incomplete and inconsistent result. The reason is that Apache Ignite versions earlier than 1.7.0 execute the query on local data only (as described in the second step of the flow above).
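A variation of the same kind of toy model shows why. Here Persons are spread across nodes by their own id instead of by orgId, and every node joins only the rows it stores, as in the pre-1.7 flow. The data, NUM_NODES and local_only_join are illustrative assumptions, not Ignite code.

```python
from collections import defaultdict

NUM_NODES = 3  # toy cluster size (assumption)

orgs = [(1, "Main St"), (2, "Oak Ave"), (3, "Elm Rd")]                      # (id, address)
persons = [(10, 1, "Ann"), (11, 2, "Bob"), (12, 1, "Cid"), (13, 3, "Dan")]  # (id, orgId, name)

local_orgs = defaultdict(list)
local_persons = defaultdict(list)
for org in orgs:
    local_orgs[org[0] % NUM_NODES].append(org)
for person in persons:
    # No affinity key: each Person is placed by its own id, not by orgId.
    local_persons[person[0] % NUM_NODES].append(person)


def local_only_join():
    # Pre-1.7 behavior: every node joins only the rows it stores itself.
    rows = []
    for node in range(NUM_NODES):
        rows += [(o, p) for o in local_orgs[node] for p in local_persons[node] if o[0] == p[1]]
    return rows


print(len(local_only_join()), "of", len(persons), "matching rows found")  # 2 of 4 matching rows found
```

Persons 12 and 13 happen to land on nodes that do not hold their Organizations, so their rows silently disappear from the result.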
However, this is no longer the case in Apache Ignite 1.7.0 and later, which support non-collocated distributed joins. These joins no longer force you to collocate your data, and with them the execution flow of a query becomes the following:
- The query initiating node (mapper & reducer) sends the query to all the nodes where cached data resides (Phase Q).
- All the nodes that receive the query from the reducer will execute it locally (Phase E(Q)), performing the join on their local data together with any missing data requested from remote nodes (Phase D(Q)).
- The nodes respond to the reducer with their portion of the result set (Phases R1, R2 and R3).
- The reducer will eventually reduce the result sets received from all the remote nodes and provide a final aggregated result to your code (Phase R).
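Extending the toy model, each node now resolves the Organizations it lacks by asking the other nodes, which plays the role of Phase D(Q). This is a conceptual sketch, not Ignite's implementation; fetch_org and distributed_join are hypothetical names. In Ignite itself, non-collocated joins are enabled per query, for example with setDistributedJoins(true) on SqlFieldsQuery or SqlQuery.

```python
from collections import defaultdict

NUM_NODES = 3  # toy cluster size (assumption)

orgs = [(1, "Main St"), (2, "Oak Ave"), (3, "Elm Rd")]                      # (id, address)
persons = [(10, 1, "Ann"), (11, 2, "Bob"), (12, 1, "Cid"), (13, 3, "Dan")]  # (id, orgId, name)

local_orgs = defaultdict(dict)     # node -> {org id: Organization}
local_persons = defaultdict(list)  # node -> [Person]
for org in orgs:
    local_orgs[org[0] % NUM_NODES][org[0]] = org
for person in persons:
    local_persons[person[0] % NUM_NODES].append(person)  # not collocated


def fetch_org(node, org_id):
    # Use local data when possible ...
    if org_id in local_orgs[node]:
        return local_orgs[node][org_id]
    # ... otherwise request the row from the other nodes (Phase D(Q)).
    for other in range(NUM_NODES):
        if other != node and org_id in local_orgs[other]:
            return local_orgs[other][org_id]
    return None


def distributed_join():
    rows = []
    for node in range(NUM_NODES):        # Phase Q: the query reaches every node
        for p in local_persons[node]:    # Phase E(Q): local execution ...
            org = fetch_org(node, p[1])  # ... plus remote lookups when needed
            if org is not None:
                rows.append((org, p))
    return rows                          # Phase R: the reducer merges all rows


print(len(distributed_join()), "of", len(persons), "matching rows found")  # 4 of 4 matching rows found
```

Driving the join from the Persons side means each Person is visited on exactly one node, so no joined row is produced twice, and remote lookups fill in exactly the rows the local-only join was missing.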
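Finally, remember that the cache key is treated as the primary key by the Ignite SQL engine, where it is exposed as the _key field. The join condition can therefore reference an Organization's primary key directly:
SELECT * FROM Organization as org JOIN Person as p ON org._key = p.orgId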
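The reason the primary key matters, to the best of our understanding of Ignite's distributed join implementation, is request routing. When the join condition is on the primary key (_key), as in the query above, or on the affinity key, a node can compute which node owns the row it needs and send a unicast request. For any other column it has to broadcast the request to all nodes. The toy Python model below only counts such requests; owner, requests_for and total_requests are hypothetical names, modulo stands in for Ignite's affinity function, and one request is counted per looked-up row, whereas a real engine may batch them.

```python
NUM_NODES = 3  # toy cluster size (assumption)

persons = [(10, 1), (11, 2), (12, 1), (13, 3)]  # (id, orgId); Persons placed by id


def owner(org_id):
    # Hypothetical affinity function: the node that stores Organization org_id.
    return org_id % NUM_NODES


def requests_for(person_node, org_id, join_on_primary_key):
    """Remote requests one node sends to resolve the Organization of one Person."""
    if join_on_primary_key:
        # The key identifies exactly one row whose owner can be computed:
        # no request if it is local, otherwise one unicast request.
        return 0 if owner(org_id) == person_node else 1
    # On any other column, matching rows may live anywhere: ask all other nodes.
    return NUM_NODES - 1


def total_requests(join_on_primary_key):
    return sum(
        requests_for(person_id % NUM_NODES, org_id, join_on_primary_key)
        for person_id, org_id in persons
    )


print("unicast:", total_requests(True), "broadcast:", total_requests(False))  # unicast: 2 broadcast: 8
```

Even on this tiny data set, joining on the key cuts the remote traffic from 8 requests to 2, and the gap grows with the number of nodes.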