This is a major update in terms of search speed and recall improvement:
In previous versions, only the top k1 matches overall were rescored to get the final k2 nearest neighbours. For example, if you requested k1 = 100 on an index with 6 shards, each shard would collect 100 matches (600 in total), then the client node would select 100 of those 600 based on the score returned by the bool query and rescore only that subset. The other 500 hits were left unchecked.
Now the plugin uses the built-in script_score query in a rescore context, which means that EACH shard scores k1 hits before the final k2 hits are selected.
So in the example above the plugin scans k1 * n_shards = 600 hits.
Also, thanks to the new dense_vector data type, exact distance computation is now much faster: you can set k1 = 10000 (which, for the example above, means the plugin computes distances for 60,000 vectors) and get a result in the same time as you would get for k1 = 500 in previous versions.
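The arithmetic behind the two behaviours can be written down as a tiny sketch. The function name and the `per_shard` flag are hypothetical and exist only for illustration; they are not part of the plugin.

```python
def hits_rescored(k1, n_shards, per_shard=True):
    """Number of candidate hits that get an exact distance computation.

    per_shard=True  models the new behaviour (each shard rescores k1 hits).
    per_shard=False models the old behaviour (only k1 hits overall are rescored).
    """
    return k1 * n_shards if per_shard else k1


print(hits_rescored(100, 6, per_shard=False))  # 100 (previous versions)
print(hits_rescored(100, 6))                   # 600
print(hits_rescored(10000, 6))                 # 60000
```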
However, this update has some drawbacks:
- You must store vectors using the dense_vector data type mapping, so you'll need to reindex your data.
- ES 7.3 script_score supports only dotProduct and cosineSimilarity for distance computation, so the l2 metric is currently unavailable (l2norm and l1norm should be added with the next update, though).
- The dense_vector data type and the vector distance metrics are experimental parts of the X-Pack plugin, which means two things:
  - they could be changed or removed in future releases (hope not);
  - they can't be used in Elasticsearch OSS (open-source) distributions, because they require the basic ES license (it's free, however).
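To make the first two points more concrete, here is a rough sketch of what a dense_vector mapping and a script_score rescore clause might look like when written as Python dicts. It assumes ES 7.3-style syntax (a `dims` setting in the mapping, `doc['field']` access in the script); the helper names, the `match_all` inner query and the weights are illustrative assumptions, and the query the plugin actually builds may differ.

```python
def dense_vector_mapping(field="_aknn_vector", dims=3):
    """Index body that stores `field` as a dense_vector (dims is illustrative)."""
    return {"mappings": {"properties": {field: {"type": "dense_vector", "dims": dims}}}}


def rescore_clause(query_vector, k1, field="_aknn_vector", metric="cosine"):
    """Rescore the top k1 hits of each shard with a vector similarity script."""
    functions = {"cosine": "cosineSimilarity", "dot": "dotProduct"}
    source = "{}(params.query_vector, doc['{}'])".format(functions[metric], field)
    if metric == "cosine":
        # Cosine similarity lies in [-1, 1]; shift it so the score is never negative.
        source += " + 1.0"
    return {
        "window_size": k1,  # applied per shard
        "query": {
            "rescore_query": {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {"source": source, "params": {"query_vector": query_vector}},
                }
            },
            # Assumption: ignore the bool query score and keep only the vector score.
            "query_weight": 0.0,
            "rescore_query_weight": 1.0,
        },
    }


mapping = dense_vector_mapping(dims=3)
rescore = rescore_clause([0.12, 0.432, 0.2342], k1=100)
```

Note that a raw dot product can be negative, so with the dot metric the vectors (or the script) would need to guarantee a non-negative score.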
The plugin registers a new Elasticsearch ingest processor, aknn, which can be used for indexing new documents through the default ES API.
Using it for indexing increases indexing speed and allows ES to automatically balance incoming index requests between nodes.
To register a new pipeline with the Python elasticsearch API:
import elasticsearch

# Client for the target cluster (connects to localhost:9200 by default)
es = elasticsearch.Elasticsearch()


def put_pipeline(model_name, models_index='aknn_models', vec_field='_aknn_vector'):
    # Get model created with _aknn_create
    model = es.get(index=models_index, id=model_name)['_source']
    description = model['_aknn_description']
    pipeline_name = model_name
    # Add 'aknn.' prefix to pipeline name for convenience
    if not model_name.startswith('aknn.'):
        pipeline_name = 'aknn.{}'.format(model_name)
    ingest_body = {
        "description": description,
        "processors": [
            {
                "aknn": {
                    "field": vec_field,
                    "target_field": "_aknn_hashes",
                    "model": model,
                }
            }
        ]
    }
    response = es.ingest.put_pipeline(id=pipeline_name, body=ingest_body, request_timeout=60)
    return response
To index documents:
# Register pipeline for model 'test_model'
put_pipeline('test_model')
# Create a document for the test:
doc = {
    '_aknn_vector': [0.12, 0.432, 0.2342]
}
# Add the document to the index:
es.index(index='test_index', body=doc, pipeline='aknn.test_model')
Or you can simulate the pipeline without actually indexing a document:
docs = {
    "docs": [
        {"_source": {'_aknn_vector': [0.12, 0.432, 0.2342]}},
        {"_source": {'_aknn_vector': [0.22, 0.231, 0.5321]}},
    ]
}
resp = es.ingest.simulate(id='aknn.test_model', body=docs)
You can also use the response of the pipeline simulation to get the vector hashes and use them to build a request to ES without using _aknn_search or _aknn_search_vec, e.g. for building complicated bool queries using other fields.
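A minimal sketch of that last idea follows. It relies on the standard shape of an ingest simulate response (`docs[i].doc._source`), but the layout of `_aknn_hashes` is an assumption: here it is taken to be a mapping from table index to hash value, queried with one `term` clause per table. The helper names and the hash values in the stand-in response are made up for illustration.

```python
def hashes_from_simulation(resp, hashes_field="_aknn_hashes"):
    """Pull the hashes computed by the pipeline out of a simulate response."""
    return [d["doc"]["_source"][hashes_field] for d in resp["docs"]]


def bool_query_from_hashes(hashes, extra_filter=None, minimum_should_match=1,
                           hashes_field="_aknn_hashes"):
    """Build a bool query with one `term` clause per hash table.

    Assumes `hashes` maps table index -> hash value.
    """
    should = [{"term": {"{}.{}".format(hashes_field, table): value}}
              for table, value in sorted(hashes.items())]
    bool_body = {"should": should, "minimum_should_match": minimum_should_match}
    if extra_filter is not None:
        bool_body["filter"] = extra_filter
    return {"query": {"bool": bool_body}}


# Stand-in for a real `es.ingest.simulate(...)` response; hash values are made up.
fake_resp = {"docs": [{"doc": {"_source": {
    "_aknn_vector": [0.12, 0.432, 0.2342],
    "_aknn_hashes": {"0": 5, "1": 12},
}}}]}

hashes = hashes_from_simulation(fake_resp)[0]
query = bool_query_from_hashes(hashes, extra_filter={"term": {"status": "published"}})
```

The resulting `query` dict could then be extended with any other clauses before being sent with the regular search API.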
- _aknn_search_vec - a hybrid of the original _aknn_search and _aknn_index endpoints: it accepts JSON with a query vector, builds hashes for _aknn_vector, uses them to build an ES query, and returns hits just like the original _aknn_search endpoint. Usage example:
POST <elasticsearch host>:9200/_aknn_search_vec?rescore=true&debug=false&minimum_should_match=3&clear_cache=false
{
    "_index": "twitter_images",
    "_type": "_doc",
    "_aknn_uri": "aknn_models/_doc/twitter_images",
    "query_aknn": {
        "_aknn_vector": [0.12, 0.23, ...],
        "k1": 1000,
        "k2": 100
    },
    "filter": {
        "range": {
            "post_date": {
                "gte": "now-1d"
            }
        }
    }
}
  The filter in the example above is used for filtering data before executing the similarity search.
- _aknn_clear_cache - clears the LSH model cache on the target node; useful if you are using an index name or other readable names as models_id.
- rescore (boolean) - switches the final L2 metric based scoring on or off. In some cases this improves search speed by 10x while maintaining acceptable recall (some tweaks to the index mapping are required to preserve high recall).
- debug (boolean) - keeps the original vectors and hashes if set to true; useful for tinkering with metrics and scoring, and might also be useful for clustering query results.
- minimum_should_match (integer) - sets the corresponding ES bool query argument; might improve search speed by lowering the number of hits ES has to score. Default value: 1.
- metric (string) - similarity metric. Available values: cosine, dot. Default value: dot.
- filter (string) - ES bool query filter as a string, i.e. whatever you would normally put inside a filter clause. For example, if your filter looks like this:
"filter":{ "term": { "status": "published" } }
you should put
{ "term": { "status": "published" }}
in the filter argument.
- n_probes - how many hash buckets should be used for search. Should be no greater than the _aknn_nb_tables value used for model creation. It's useful for selecting the optimal trade-off between response time and recall at query time. For example, you can use a larger number of hash tables for indexing if disk space is not a concern, and later select the optimal n_probes for acceptable recall vs. response time.
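As a rough illustration of how the endpoint and its parameters fit together, the sketch below assembles the URL and JSON body for a _aknn_search_vec request without sending it. The helper name is hypothetical, and it assumes that options such as rescore, minimum_should_match and n_probes are passed as URL query parameters (as in the usage example above) while the filter travels in the body.

```python
import json
from urllib.parse import urlencode


def build_search_vec_request(host, index, model_uri, vector, k1, k2,
                             es_filter=None, **params):
    """Return (url, body) for a POST to _aknn_search_vec."""
    # Booleans are lower-cased so they read as true/false in the URL.
    query = {k: str(v).lower() if isinstance(v, bool) else v
             for k, v in params.items()}
    url = "http://{}:9200/_aknn_search_vec".format(host)
    if query:
        url += "?" + urlencode(sorted(query.items()))
    body = {
        "_index": index,
        "_type": "_doc",
        "_aknn_uri": model_uri,
        "query_aknn": {"_aknn_vector": vector, "k1": k1, "k2": k2},
    }
    if es_filter is not None:
        body["filter"] = es_filter
    return url, json.dumps(body)


url, body = build_search_vec_request(
    "localhost", "twitter_images", "aknn_models/_doc/twitter_images",
    [0.12, 0.23, 0.34], k1=1000, k2=100,
    es_filter={"range": {"post_date": {"gte": "now-1d"}}},
    rescore=True, minimum_should_match=3, n_probes=8,
)
print(url)
```

The pair can then be sent with any HTTP client, with `Content-Type: application/json`.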
All original REST endpoints should work just like before; examples for the new endpoints will be added later.
This fork was originally compiled and tested for ES 6.5.1, but it should work for ES 6.4.1-6.5.4, so I have added precompiled plugins for all of these ES versions (6.4.0 should work too, but I just forgot to compile it).
Added a bash script for quick assembly using the official gradle image.
- Update README with examples of new endpoints.
- Add explanation of tweaks to mapping required to increase speed and recall.
- Add figures of benchmarks before and after tweaks.
- Figure out how to clear cached LSH model on all nodes at once.
- Try to refactor the _aknn_index and _aknn_create endpoints into an ingest plugin, to allow using default ES APIs for indexing.
- Implement cosine metric and dot product.
Original readme
Insight Data Engineering Project, Boston, April - June 2018
Image similarity search demo running searches on a cluster of 4 Elasticsearch nodes
I built Elasticsearch-Aknn (EsAknn), an Elasticsearch plugin which implements approximate K-nearest-neighbors search for dense, floating-point vectors in Elasticsearch. This allows data engineers to avoid rebuilding an infrastructure for large-scale KNN and instead leverage Elasticsearch's proven distributed infrastructure.
To demonstrate the plugin, I used it to implement an image similarity search engine for a corpus of 6.7 million Twitter images. I transformed each image into a 1000-dimensional floating-point feature vector using a convolutional neural network. I used EsAknn to store the vectors and search for nearest neighbors on an Elasticsearch cluster.
The repository is structured:
- demo directory: Twitter Image Similarity search pipeline and web-application
- elasticsearch-aknn directory: EsAknn implementation and benchmarks
- scratch directory: several smaller projects implemented while prototyping
- Web-application (taken down at the end of the Insight program)
- Presentation on Google Slides
EsAknn is useful for problems roughly characterized:
- Have a large corpus of feature vectors with dimensionality ~50-1000.
- Need to run similarity searches using K-Nearest-Neighbors.
- Need to scale horizontally to support many concurrent similarity searches.
- Need to support a growing corpus with near-real-time insertions. I.e., when a new vector is created/ingested, it should be available for searching in less than 10 minutes.
Tldr: If you need to quickly run KNN on an extremely large corpus in an offline job, use one of the libraries from Ann-Benchmarks. If you need KNN in an online setting with support for horizontally-scalable searching and indexing new vectors in near-real-time, consider EsAknn (especially if you already use Elasticsearch).
There are about a dozen high-quality open-source approximate-nearest-neighbors libraries. The Ann-Benchmarks project is a great place to compare them. Most of them take a large corpus of vectors, build an index, and expose an interface to run very fast nearest-neighbors search on that fixed corpus.
Unfortunately they offer very little infrastructure for deploying your nearest-neighbors search in an online setting. Specifically, you still have to consider:
- Where do you store millions of vectors and the index?
- How do you handle many concurrent searches?
- How do you handle a growing corpus? See this issue on the lack of support for adding to an index.
- How do you distribute the index and make searches fault tolerant?
- Who manages all the infrastructure you've built for such a simple algorithm?
Elasticsearch already solves the non-trivial infrastructure problems, and EsAknn implements approximate nearest-neighbors indexing and search atop this proven infrastructure.
EsAknn's LSH implementation is very simplistic in the grand scheme of approximate-nearest-neighbors approaches, but it maps well to Elasticsearch and still yields high recall. EsAknn's speed for serial queries is much slower than other approximate nearest-neighbor libraries, but it's also not designed for serial querying. Instead it's designed to serve many concurrent searches over a convenient HTTP endpoint, index new vectors in near-real-time, and scale horizontally with Elasticsearch. For specific performance numbers, see the performance section below and the slides linked in the demo section.
Given a sample of vectors, create a locality-sensitive-hashing (LSH) model and store it as an Elasticsearch document.
POST <elasticsearch host>:9200/_aknn_create
{
    "_index": "aknn_models",
    "_type": "aknn_model",
    "_id": "twitter_image_search",
    "_source": {
        "_aknn_description": "LSH model for Twitter image similarity search",
        "_aknn_nb_tables": 64,
        "_aknn_nb_bits_per_table": 18,
        "_aknn_nb_dimensions": 1000
    },
    "_aknn_vector_sample": [
        # Provide a sample of 2 * _aknn_nb_tables * _aknn_nb_bits_per_table vectors
        [0.11, 0.22, ...],
        [0.22, 0.33, ...],
        ...
        [0.88, 0.99, ...]
    ]
}
This returns:
{ "took": <number of milliseconds> }
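The sample size rule in the request above (2 * _aknn_nb_tables * _aknn_nb_bits_per_table vectors) is easy to get wrong by hand, so a small helper can assemble the body. This is only a sketch: the function name is hypothetical, and the toy numbers below are chosen to keep the example small (the settings shown above would need 2 * 64 * 18 = 2304 sample vectors).

```python
import random


def build_create_body(index, doc_id, description, nb_tables, nb_bits, nb_dims, vectors):
    """Return a JSON-serialisable body for POST /_aknn_create."""
    sample_size = 2 * nb_tables * nb_bits
    if len(vectors) < sample_size:
        raise ValueError("need at least {} vectors, got {}".format(sample_size, len(vectors)))
    return {
        "_index": index,
        "_type": "aknn_model",
        "_id": doc_id,
        "_source": {
            "_aknn_description": description,
            "_aknn_nb_tables": nb_tables,
            "_aknn_nb_bits_per_table": nb_bits,
            "_aknn_nb_dimensions": nb_dims,
        },
        # Random subset of the corpus, exactly as large as the model requires
        "_aknn_vector_sample": random.sample(vectors, sample_size),
    }


# Toy corpus: 50 random 3-dimensional vectors
corpus = [[random.random() for _ in range(3)] for _ in range(50)]
create_body = build_create_body("aknn_models", "toy_model", "toy LSH model",
                                nb_tables=4, nb_bits=2, nb_dims=3, vectors=corpus)
print(len(create_body["_aknn_vector_sample"]))  # 16
```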
Given a batch of new vectors, hash each vector using a pre-defined LSH model and store its raw and hashed values in an Elasticsearch document.
POST <elasticsearch host>:9200/_aknn_index
{
    "_index": "twitter_images",
    "_type": "twitter_image",
    "_aknn_uri": "aknn_models/aknn_model/twitter_image_search",
    "_aknn_docs": [
        {
            "_id": 1,
            "_source": {
                "_aknn_vector": [0.12, 0.23, ...],
                # Any other fields you want...
            }
        }, ...
    ]
}
This returns:
{ "took": <number of milliseconds>, "size": <number of documents indexed> }
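Because the endpoint takes a batch of documents per request, a client typically splits its vectors into chunks. The sketch below builds one request body per chunk; the helper name, the tuple layout of `docs` and the default batch size are assumptions made for illustration only.

```python
def index_bodies(index, doc_type, model_uri, docs, batch_size=1000):
    """Yield one _aknn_index request body per batch.

    `docs` is a list of (doc_id, vector, extra_fields) tuples.
    """
    for start in range(0, len(docs), batch_size):
        batch = docs[start:start + batch_size]
        yield {
            "_index": index,
            "_type": doc_type,
            "_aknn_uri": model_uri,
            "_aknn_docs": [
                # Extra fields are stored next to the vector in _source
                {"_id": doc_id, "_source": dict(extra, _aknn_vector=vector)}
                for doc_id, vector, extra in batch
            ],
        }


toy_docs = [(i, [0.1 * i, 0.2 * i], {"label": "img{}".format(i)}) for i in range(5)]
bodies = list(index_bodies("twitter_images", "twitter_image",
                           "aknn_models/aknn_model/twitter_image_search",
                           toy_docs, batch_size=2))
print([len(b["_aknn_docs"]) for b in bodies])  # [2, 2, 1]
```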
Given a vector in the index, search for and return its nearest neighbors.
GET <elasticsearch host>:9200/twitter_images/twitter_image/1/_aknn_search?k1=1000&k2=10
This returns:
{
    "took": <number of milliseconds>,
    "timed_out": false,
    "hits": {
        "max_score": 0,
        "total": <number of hits returned, up to k2>,
        "hits": [
            {
                "_id": "...",
                "_index": "twitter_images",
                "_score": <euclidean distance from query vector to this vector>,
                "_source": {
                    # All of the document fields except for the potentially
                    # large fields containing the vector and hashes.
                }
            }, ...
        ]
    }
}
The key things to know about the implementation are:
- EsAknn runs entirely in an existing Elasticsearch cluster/node. It operates effectively as a set of HTTP endpoint handlers and talks to Elasticsearch via the Java Client API.
- Searches can run in parallel. New vectors can be indexed on multiple nodes in parallel using a round-robin strategy. Parallel indexing on a single node has not been tested extensively.
- EsAknn uses Locality Sensitive Hashing to convert a floating-point vector into a discrete representation which can be efficiently indexed and retrieved in Elasticsearch.
- EsAknn stores the LSH models and the vectors as standard documents.
- EsAknn uses a Bool Query to find k1 approximate nearest neighbors based on discrete hashes. It then computes the exact distance to each of these approximate neighbors and returns the k2 closest. For example, you might set k1 = 1000 and k2 = 10.
- EsAknn currently only implements euclidean distance, but any distance function compatible with LSH can be added.
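The two-stage idea described above (hash, retrieve k1 candidates by hash overlap, then rank them by exact distance and keep k2) can be illustrated with a small in-memory sketch. This is not the plugin's Java code: it uses one common hyperplane LSH construction in which each bit is defined by the perpendicular bisector of two sampled vectors, and whether EsAknn builds its hyperplanes exactly this way is an assumption. All function names are hypothetical.

```python
import math
import random


def make_tables(sample, nb_tables, nb_bits, rng):
    """Each bit is a hyperplane: the perpendicular bisector of two sample vectors."""
    tables = []
    for _ in range(nb_tables):
        bits = []
        for _ in range(nb_bits):
            a, b = rng.sample(sample, 2)
            normal = [x - y for x, y in zip(a, b)]
            midpoint = [(x + y) / 2 for x, y in zip(a, b)]
            bits.append((normal, midpoint))
        tables.append(bits)
    return tables


def lsh_hash(vector, tables):
    """One integer hash per table; each bit records the side of a hyperplane."""
    hashes = []
    for bits in tables:
        h = 0
        for normal, midpoint in bits:
            side = sum(n * (v - m) for n, v, m in zip(normal, vector, midpoint)) > 0
            h = (h << 1) | int(side)
        hashes.append(h)
    return hashes


def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def search(query, corpus, corpus_hashes, tables, k1, k2):
    """Stage 1: k1 candidates by number of matching hashes. Stage 2: exact top k2."""
    q = lsh_hash(query, tables)
    overlap = [(sum(a == b for a, b in zip(q, h)), i) for i, h in enumerate(corpus_hashes)]
    candidates = [i for _, i in sorted(overlap, key=lambda t: -t[0])[:k1]]
    return sorted((euclidean(query, corpus[i]), i) for i in candidates)[:k2]


rng = random.Random(0)
vectors = [[rng.random() for _ in range(4)] for _ in range(200)]
tables = make_tables(vectors, nb_tables=8, nb_bits=6, rng=rng)
vector_hashes = [lsh_hash(v, tables) for v in vectors]
neighbors = search(vectors[7], vectors, vector_hashes, tables, k1=50, k2=5)
```

In the plugin, stage 1 corresponds to the bool query over stored hashes, which is why increasing k1 trades search time for recall.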
EsAknn's speed is generally characterized:
- Create a new LSH model: < 1 minute.
- Index new vectors: hundreds to low thousands per second.
- Search for a vector's neighbors: < 500 milliseconds. Search time scales sub-linearly with the size of the corpus.
The corpus vs. search time generally follows a sub-linear pattern like this:
Beyond that, speed is a function of:
- The vectors' dimensionality.
- The number of tables (a.k.a. hash functions or trees) in the LSH model.
- The number of bits in the LSH model's hashes.
- The number of approximate neighbors retrieved, k1.
- The number of exact neighbors returned, k2.
In the image similarity search engine, you can see that searches against an index of 6.7 million 1000-dimensional vectors rarely exceed 200 milliseconds.
Recall is defined as the proportion of true nearest neighbors returned for
a search and can be evaluated at various values of k2. For example, if
you know your application needs to retrieve the top ten most similar items,
you should evaluate recall at k2 = 10.
Similar to speed, recall depends on the LSH configuration. Increasing k1
is typically the easiest way to increase recall, but the number of tables and
bits also play an important role. Finding a configuration to maximize
recall and minimize search time can be considered a form of hyper-parameter
optimization.
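Recall as defined above can be measured with a few lines of Python, given the exact neighbors from a brute-force search and the ids returned by an approximate search. The function name and the toy ids are illustrative only.

```python
def recall_at_k(true_neighbors, returned, k):
    """Fraction of the k true nearest neighbors found among the top-k returned ids."""
    truth = set(true_neighbors[:k])
    return len(truth & set(returned[:k])) / k


exact = [1, 2, 3, 4]         # ids from an exhaustive (brute-force) search
approximate = [1, 9, 3, 8]   # ids returned by the approximate search
print(recall_at_k(exact, approximate, k=4))  # 0.5
```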
The figure below demonstrates that it is possible to find a configuration with high-recall and low search-time at various corpus sizes. The points plotted represent the "frontier" of recall/search-time. That is, I ran benchmarks on many configurations and chose the configurations with the lowest median search time for each median recall across three corpus sizes.
The table below shows the best configuration for each combination of corpus size, median recall, median search time with a median recall >= 0.5.
| | Corpus size | Med. recall | Med. search time | k1 | _aknn_nb_tables | _aknn_nb_bits_per_table |
|---|---|---|---|---|---|---|
| 0 | 1000000 | 1 | 191 | 500 | 200 | 12 |
| 1 | 1000000 | 0.9 | 100 | 500 | 100 | 14 |
| 2 | 1000000 | 0.8 | 62 | 1000 | 50 | 16 |
| 3 | 1000000 | 0.7 | 49 | 500 | 50 | 16 |
| 4 | 1000000 | 0.6 | 43 | 250 | 50 | 16 |
| 5 | 1000000 | 0.5 | 50 | 250 | 50 | 19 |
| 6 | 100000 | 1 | 26 | 250 | 100 | 12 |
| 7 | 100000 | 0.9 | 21 | 500 | 50 | 14 |
| 8 | 100000 | 0.8 | 14 | 250 | 50 | 18 |
| 9 | 100000 | 0.7 | 11 | 100 | 50 | 14 |
| 10 | 100000 | 0.6 | 11 | 100 | 50 | 19 |
| 11 | 100000 | 0.5 | 14 | 500 | 10 | 8 |
| 12 | 10000 | 1 | 8 | 100 | 100 | 8 |
| 13 | 10000 | 0.9 | 5 | 100 | 50 | 12 |
| 14 | 10000 | 0.8 | 5 | 100 | 50 | 18 |
| 15 | 10000 | 0.7 | 6 | 250 | 10 | 8 |
| 16 | 10000 | 0.6 | 6 | 15 | 100 | 18 |
| 17 | 10000 | 0.5 | 3 | 15 | 50 | 14 |
The image processing pipeline consists of the following components, shown in pink and green above:
- Python program ingests images from the Twitter public stream and stores in S3.
- Python program publishes batches of references to images stored in S3 to a Kafka topic.
- Python program consumes batches of image references, computes feature vectors from the images, stores them on S3, publishes references to Kafka. I use the conv_pred layer from Keras pre-trained MobileNet to compute the 1000-dimensional feature vectors.
- Python program consumes image features from Kafka/S3 and indexes them in Elasticsearch via EsAknn.
Image feature extraction is the main bottleneck in this pipeline. It's embarrassingly parallel but still requires thoughtful optimization. In the end I was able to compute:
- 40 images / node / second on EC2 P2.xlarge (K80 GPU, $0.3/hr spot instance).
- 33 images / node / second on EC2 C5.9xlarge (36-core CPU, $0.6/hr spot instance).
My first-pass plateaued at about 2 images / node / second. I was able to improve throughput with the following optimizations:
- Produce image references to Kafka instead of full images. This allows many workers to download the images in parallel from S3. If you send the full images through Kafka, it quickly becomes a bottleneck.
- Workers use thread pools to download images in parallel from S3.
- Workers use process pools to crop and resize images for use with Keras.
- Workers use the Lycon library for fast image resizing.
- Workers use Keras/Tensorflow to compute feature vectors on large batches of images instead of single images. This is a standard deep learning optimization.
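Two of the optimizations above (parallel downloads with a thread pool, and one model call per large batch) can be sketched in a few lines. The `download` and `featurize_batch` callables are hypothetical stand-ins for the S3 client and the Keras model, and the batch size and thread count are illustrative; the process pool used for cropping and resizing is left out for brevity.

```python
from concurrent.futures import ThreadPoolExecutor


def batched(items, batch_size):
    """Split a list into consecutive chunks of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def process_references(references, download, featurize_batch,
                       batch_size=64, n_threads=16):
    """Download images in parallel, then compute features one batch at a time."""
    features = []
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        for refs in batched(references, batch_size):
            images = list(pool.map(download, refs))   # I/O-bound: threads help
            features.extend(featurize_batch(images))  # one model call per batch
    return features


# Stand-ins so the sketch runs without S3 or Keras
fake_download = lambda ref: ref.upper()
fake_featurize = lambda images: [len(image) for image in images]
features = process_references(["a", "bb", "ccc"], fake_download, fake_featurize, batch_size=2)
print(features)  # [1, 2, 3]
```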
- The plugin was developed on Elasticsearch version 6.2.4.
- User mingruimingrui has a fork for version 5.6.6.
- User mattiasarro has a fork for version 6.3.
Here are a handful of resources I found particularly helpful for this project:
- Locality Sensitive Hashing lectures by Victor Lavrenko
- Elasticsearch Plugin Starter by Alexander Reelsen
- Elasticsearch OpenNLP Plugin by Alexander Reelsen
- Discussions about similarity search in Elasticsearch: one, two.