Elasticsearch as a Framework

Or how Crate uses Elasticsearch.

Most people who know Elasticsearch think of it as a search engine, and they’re
probably correct. But we at Crate think about it a bit
differently and use it as a framework.

In this post I’ll try to explain how that works.

A short Elasticsearch intro

Elasticsearch is a clustered
search engine. One can put documents into it and then run queries to find these
documents and retrieve them. Being clustered means that it can run on one or
more machines and the documents that are stored in Elasticsearch will be
distributed among those machines.

There are two ways to communicate with it: via HTTP, or via a Java client
that uses something called the transport protocol.

This transport protocol is also used for the communication between the
machines within a cluster.

The indexing and search capabilities are powered by
Lucene, a “high performance, full-featured
Information Retrieval library”.

In a nutshell:

  • Elasticsearch does clustering (including all the tricky stuff that sane
    people don’t want to worry about: Discovery, master election, replication,
    dealing with net splits and race conditions)

  • Lucene does search and indexing (which Elasticsearch distributes among
    multiple machines)

A short Crate intro

Crate is a distributed SQL database that leverages Elasticsearch and Lucene.
In its infancy it parsed SQL statements and translated them into
Elasticsearch queries. It was basically a layer on top of Elasticsearch.

If you wrote something like

select * from users

It would be translated into (roughly)

POST /users/default/_search -d '{"query": {"match_all": {}}}'
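
That early translation layer can be sketched in a few lines. The function name and the supported grammar here are made up for illustration; Crate's real parser handled far more than this:

```python
def sql_to_es(statement):
    """Translate a trivial 'select * from <table>' statement into an
    Elasticsearch search request (method, path, body).
    Hypothetical sketch -- not Crate's actual translation code."""
    words = statement.strip().rstrip(";").split()
    if [w.lower() for w in words[:3]] != ["select", "*", "from"]:
        raise ValueError("only 'select * from <table>' is supported here")
    table = words[3]
    return ("POST", f"/{table}/default/_search",
            {"query": {"match_all": {}}})

# sql_to_es("select * from users")
# -> ("POST", "/users/default/_search", {"query": {"match_all": {}}})
```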

Those were the early days. Since then it has evolved a lot. It got its own
execution engine with its own DSL. Internally, the same statement from before
is now turned into something like this:

"fetchPhase": {
    "executionNodes": ["TLMh2zg0SRiC79mYxZj5uw"],
    "phaseType": "FETCH",
    "fetchRefs": ["doc.users._doc['name']"],
    "id": 1
},
"planType": "QueryThenFetch",
"subPlan": {
    "planType": "CollectAndMerge",
    "collectPhase": {
        "projections": [...],
        "phaseType": "COLLECT",
        "toCollect": ["doc.users._docid"],
        "id": 0,
        "distribution": {"distributedByColumn": 0, "type": "BROADCAST"},
        "routing": {
            "TLMh2zg0SRiC79mYxZj5uw": {
                "users": [ 0, 1, 2, 3, 4 ]
            }
        }
    }
},
"localMerge": {
    "projections": [...],
    "executionNodes": [],
    "phaseType": "MERGE",
    "id": 2
}

(Don’t worry if that doesn’t make sense to you. You may worry if it does)

This was done to implement features in Crate that do not exist in Elasticsearch.

But this whole SQL execution engine also uses Elasticsearch in some way. I’ll
try to explain how.

Elasticsearch as a web framework

Since many developers are familiar with web frameworks I’ll try to go with
that. It actually fits quite well with how parts of Elasticsearch work.

In a web framework one can usually register a route to a handler. Like, if a
browser points to /foo/bar, then FooBarHandler.get should be called.

The get implementation of the FooBarHandler class receives some sort of
request object, executes some business logic, and finally generates a response
to be returned by the get function.

Meanwhile the web framework does all the magic to produce the request object,
call the appropriate function and deliver the response object in a sensible
form back over the wire to the client.
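
The route-to-handler pattern is easy to sketch. The following toy router (all names made up for illustration) does roughly what both web frameworks and Elasticsearch’s REST layer do: match a templated path and hand the extracted parameters to a registered handler:

```python
import re

class Router:
    """A toy route registry -- illustrative only."""

    def __init__(self):
        self.routes = []  # list of (compiled pattern, handler)

    def register(self, pattern, handler):
        # turn "/{index}/{type}/_search" into a regex with named groups
        regex = re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", pattern)
        self.routes.append((re.compile(f"^{regex}$"), handler))

    def dispatch(self, path):
        for pattern, handler in self.routes:
            m = pattern.match(path)
            if m:
                return handler(m.groupdict())
        raise LookupError(f"no handler for {path}")

router = Router()
router.register("/{index}/{type}/_search",
                lambda params: f"search on {params['index']}")
# router.dispatch("/users/default/_search") -> "search on users"
```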

Elasticsearch includes its own HTTP server and is, in a sense, also a web
framework. Or rather: it does what web frameworks do.

Back to the example from earlier:

POST /users/default/_search -d '{"query": {"match_all": {}}}'

An HTTP request like that causes a handler registered to
/{index}/{type}/_search to be fired.
(RestSearchAction,
to be specific, in case you want to take a look at the source.)

In Elasticsearch HTTP requests are always translated into their transport
request equivalents. And the purpose of this handler is to do that.

In this case it becomes a SearchRequest. The same kind of SearchRequest a
user of the Java API would create to execute the same query.

This SearchRequest is then sent to a TransportRequestHandler that is
registered under a name. (The name is similar to a route like /foo/bar in
the example before.)

These TransportRequestHandlers are mostly part of a class called
TransportAction, which contains the actual logic for processing a
request.

To sum up:

  • Elasticsearch has two network services: HTTP (9200) and Transport (9300).
  • Internally routes/URLs are mapped to handlers just like in web frameworks
    (E.g. /url/x/y maps to handleRequest in class XY).
  • HTTP requests are converted to transport requests.
  • Transport requests are sent to RequestHandler classes registered under a
    name.
  • TransportAction does the actual work. (Like making more requests to
    other nodes to gather data)
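
The steps above can be sketched as a toy pipeline. Class and handler names here are heavily simplified, though the action name mirrors the one Elasticsearch actually registers for search:

```python
from dataclasses import dataclass

@dataclass
class SearchRequest:
    """The transport-level request object (simplified sketch)."""
    index: str
    query: dict

# transport handlers are registered under a name, not a URL
transport_handlers = {}

def register_transport(name, handler):
    transport_handlers[name] = handler

def send_transport(name, request):
    return transport_handlers[name](request)

def search_action(request):
    # the "TransportAction": this is where the real work would happen
    # (collecting data from shards, merging results, and so on)
    return {"index": request.index, "hits": []}

register_transport("indices:data/read/search", search_action)

def rest_search_handler(index, body):
    # the REST handler's only job: turn the HTTP input into the
    # equivalent transport request and dispatch it by name
    request = SearchRequest(index=index, query=body["query"])
    return send_transport("indices:data/read/search", request)
```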

Transport protocol

The transport protocol is the binary protocol used to send objects between
nodes in an Elasticsearch (or Crate) cluster.

Most web APIs nowadays accept requests containing JSON payloads. A client can
send any JSON document as long as it is valid JSON; it doesn’t matter what keys
or values it contains. The server will be able to read the whole request and
parse the JSON. Whether it can then do anything useful with that payload is
another matter.

With the transport protocol things are a bit different. Requests and responses
are kind of static. The fields and their types have to be pre-defined. A
TransportAction can only ever receive one type of request.

This has the advantage that it is faster because the requests and responses
don’t have to include type and length information or field names.

(If you want to see some real code, take a look at the readFrom and writeTo
implementations of the
SQLRequest. Notice how both methods match in what they do)

This transport infrastructure is one huge part of Elasticsearch that Crate uses
heavily.

But there is more. If it were just about the communication we could’ve rolled
with Netty and something like Google Protobuf. (The transport
service is based on Netty.)

A cluster: Routing, shard allocation, replication and recovery

Elasticsearch provides us with a cluster. And this isn’t just about discovering
other machines and connecting them to each other; it includes much
more.

Shard allocation and Routing

In Elasticsearch documents are stored within an index, or rather within a shard
that is part of an index.

One Elasticsearch index consists of one or more shards. How many is defined
when an index is created.

The terminology can be a bit confusing as there are two kinds of indices. It
might refer to an index that has multiple shards, or it might refer to a Lucene
index, a Lucene index being a shard. Yes, that’s right: an index consists of
(Lucene) indices.

A shard is the smallest unit that can be distributed among nodes. It cannot be
further split for distribution. If there are two nodes in a cluster and an
index has only 1 shard then one node will stay empty.
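
The way documents spread across shards can be sketched with a simple hash function. Real Elasticsearch hashes the routing key (the document id by default) with Murmur3; crc32 stands in here purely for illustration:

```python
import zlib

def shard_for(doc_id, num_shards):
    """Map a document id to a shard number deterministically.
    Illustrative stand-in for Elasticsearch's Murmur3 routing."""
    return zlib.crc32(doc_id.encode("utf-8")) % num_shards

# With a single shard every document lands on shard 0, which is why a
# one-shard index on a two-node cluster leaves one node empty.
```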

If you’re not confused enough already let’s make it a bit more complicated:

In Crate the terminology is tables and shards. A shard is the same thing as in
Elasticsearch, but a table can consist of one or more Elasticsearch indices:

table > (ES) index > shard (lucene index)

Why bring this up? Because Crate uses Elasticsearch to do the shard allocation.
Elasticsearch services decide which shard should reside on which node.

Crate uses the information provided by Elasticsearch in order to make requests
to the correct nodes.

Remember the Crate execution DSL from before? Especially this part:

"routing": {
    "TLMh2zg0SRiC79mYxZj5uw": {
        "users": [ 0, 1, 2, 3, 4 ]
    }
}

It tells us that shards 0-4 of the users table are on a node with the cryptic
id TLMh2zg0SRiC79mYxZj5uw.

Replication and Recovery

Elasticsearch has replication and recovery built in, and so does Crate.

Replication means that if a document is inserted into a table/index that has a
replica configured, the document will be put into one shard and a copy will be
put into a replica of that shard.

This replica is by default on another machine and cannot be allocated on the
same machine as the primary shard. If one machine goes poof, the data is still
available.
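
The “never on the same node as the primary” constraint can be sketched as a tiny allocator. (Illustrative only; Elasticsearch’s real allocation deciders weigh many more factors, like disk usage and awareness attributes.)

```python
def allocate_replica(primary_node, nodes):
    """Pick a node for the replica of a shard whose primary lives on
    primary_node. Returns None on a one-node cluster, because the
    replica may never share a node with its primary."""
    candidates = [n for n in nodes if n != primary_node]
    return candidates[0] if candidates else None

# allocate_replica("node1", ["node1", "node2"]) -> "node2"
# allocate_replica("node1", ["node1"])          -> None
```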

Crate hooks into the replication/recovery mechanism to also support replication
and recovery for its blob tables. (A special form of storage for binary objects
that isn’t based on Elasticsearch or Lucene)

