tokio.connectors.es module
Retrieve data stored in Elasticsearch
This module provides a wrapper around the Elasticsearch connection handler and methods to query, scroll, and process pages of scrolling data.
-
class tokio.connectors.es.EsConnection(host, port, index=None, scroll_size='1m', page_size=10000, timeout=30, **kwargs)
Bases: object
Elasticsearch connection handler.
Wrapper around an Elasticsearch connection context that provides simpler scrolling functionality for very long documents and callback functions to be run after each page is retrieved.
-
__init__(host, port, index=None, scroll_size='1m', page_size=10000, timeout=30, **kwargs)
Configure and connect to an Elasticsearch endpoint.
Parameters:
- host (str) – hostname for the Elasticsearch REST endpoint
- port (int) – port where Elasticsearch REST endpoint is bound
- index (str) – name of index against which queries will be issued
- scroll_size (str) – how long to keep the scroll search context open (e.g., “1m” for 1 minute)
- page_size (int) – how many documents should be returned per scrolled page (e.g., 10000 for 10k docs per scroll)
- timeout (int) – how many seconds to wait for a response from Elasticsearch before the query should time out
Variables:
- client – Elasticsearch connection handler
- page (dict) – last page retrieved by a query
- scroll_pages (list) – list of pages retrieved by query
- index (str) – name of index against which queries will be issued
- connect_host (str) – hostname for Elasticsearch REST endpoint
- connect_port (int) – port where Elasticsearch REST endpoint is bound
- connect_timeout (int) – seconds before query should time out
- page_size (int) – max number of documents returned per page
- scroll_size (str) – duration to keep scroll search context open
- scroll_id – identifier for the scroll search context currently in use
- sort_by (str) – field by which Elasticsearch should sort results before returning them as query results
- fake_pages (list) – A list of page structures that should be returned by self.scroll() when the elasticsearch module is not actually available. Used only for debugging.
- local_mode (bool) – If True, retrieve query results from self.fake_pages instead of attempting to contact an Elasticsearch server
- kwargs (dict) – Passed to elasticsearch.Elasticsearch.__init__ if host and port are defined
-
_process_page()
Remove a page from the incoming queue and append it to scroll_pages.
Takes the last received page (self.page), updates the internal state of the scroll operation, updates some internal counters, calls the flush function if applicable, and applies the filter function. Then appends the results to self.scroll_pages.
Returns: True if hits were appended; False otherwise.
Return type: bool
-
connect(**kwargs)
Instantiate a connection and retain the connection context.
Parameters: kwargs (dict) – Passed to elasticsearch.Elasticsearch.__init__
-
classmethod from_cache(cache_file)
Initializes an EsConnection object from a cache file.
This path is designed to be used for testing.
Parameters: cache_file (str) – Path to the JSON formatted list of pages
-
query(query)
Issue an Elasticsearch query.
Issues a query and returns the resulting page. If the query included a scrolling request, the scroll_id attribute is set so that scrolling can continue.
Parameters: query (dict) – Dictionary representing the query to issue
Returns: The page resulting from the issued query.
Return type: dict
-
query_and_scroll(query, source_filter=True, filter_function=None, flush_every=None, flush_function=None)
Issue a query and retain all results.
Issues a query and scrolls through every resulting page, optionally applying in situ logic for filtering and flushing. All resulting pages are appended to the scroll_pages attribute of this object.
The scroll_pages attribute must be wiped by whatever is consuming it; if this does not happen, query_and_scroll() will continue appending results to the results of previous queries.
Parameters:
- query (dict) – Dictionary representing the query to issue
- source_filter (bool or list) – Return all fields contained in each document’s _source field if True; otherwise, only return source fields contained in the provided list of str.
- filter_function (function, optional) – Function to call before each set of results is appended to the scroll_pages attribute; if specified, return value of this function is what is appended.
- flush_every (int or None) – trigger the flush function once the number of docs contained across all scroll_pages reaches this value. If None, do not apply flush_function.
- flush_function (function, optional) – function to call when flush_every docs are retrieved.
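The interaction between filter_function, flush_every, and flush_function can be illustrated with a standalone sketch. This is not the EsConnection implementation: scroll_all, make_page, and the convention that flush_function receives the buffered list are hypothetical, chosen only so the example runs without an Elasticsearch server.

```python
def scroll_all(pages, filter_function=None, flush_every=None, flush_function=None):
    """Hypothetical stand-in for a scroll loop over an iterable of page dicts."""
    scroll_pages = []
    docs_buffered = 0
    for page in pages:
        num_hits = len(page["hits"]["hits"])
        if num_hits == 0:
            break  # an empty page marks the end of the scroll
        # Flush first if enough documents have accumulated (assumed ordering:
        # flush, then filter, then append).
        if flush_every and flush_function and docs_buffered >= flush_every:
            flush_function(scroll_pages)
            scroll_pages = []  # the sketch wipes the buffer itself after a flush
            docs_buffered = 0
        # If a filter is given, its return value is what gets appended.
        scroll_pages.append(filter_function(page) if filter_function else page)
        docs_buffered += num_hits
    return scroll_pages


def make_page(values):
    """Build a minimal page shaped like an Elasticsearch search response."""
    return {"hits": {"hits": [{"_source": {"v": v}} for v in values]}}


fake_pages = [make_page([1, 2]), make_page([3, 4]), make_page([5, 6]), make_page([])]
flushed = []
remaining = scroll_all(
    fake_pages,
    filter_function=lambda page: [hit["_source"]["v"] for hit in page["hits"]["hits"]],
    flush_every=4,
    flush_function=flushed.append,
)
```

With flush_every=4, the first two pages are handed to the flush callback together and only the third page is left in the returned buffer.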
-
query_timeseries(query_template, start, end, source_filter=True, filter_function=None, flush_every=None, flush_function=None)
Craft and issue a query bounded by time.
Parameters:
- query_template (dict) – a query object containing at least one @timestamp field
- start (datetime.datetime) – lower bound for query (inclusive)
- end (datetime.datetime) – upper bound for query (exclusive)
- source_filter (bool or list) – Return all fields contained in each document’s _source field if True; otherwise, only return source fields contained in the provided list of str.
- filter_function (function, optional) – Function to call before each set of results is appended to the scroll_pages attribute; if specified, return value of this function is what is appended.
- flush_every (int or None) – trigger the flush function once the number of docs contained across all scroll_pages reaches this value. If None, do not apply flush_function.
- flush_function (function, optional) – function to call when flush_every docs are retrieved.
-
save_cache(output_file=None)
Persist the response of the last query to a file.
This differs from other connectors’ save_cache() methods in that it only saves the state of the last query’s results. It does not save any connection information and does not restore the state of a previous EsConnection object.
It is intended primarily for testing.
Parameters: output_file (str or None) – Path to file to which JSON should be written. If None, write to stdout. Default is None.
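A minimal sketch of the cache round trip, assuming the cache is simply a JSON-encoded list of pages as described for from_cache(). The helpers save_pages and load_pages are hypothetical stand-ins, not the methods of EsConnection.

```python
import json
import os
import sys
import tempfile


def save_pages(pages, output_file=None):
    """Write a list of pages as JSON to output_file, or to stdout if None."""
    if output_file is None:
        json.dump(pages, sys.stdout)
    else:
        with open(output_file, "w") as handle:
            json.dump(pages, handle)


def load_pages(cache_file):
    """Read back a JSON-formatted list of pages."""
    with open(cache_file) as handle:
        return json.load(handle)


pages = [{"hits": {"hits": [{"_source": {"@timestamp": "2019-01-01T00:00:00"}}]}}]
with tempfile.TemporaryDirectory() as tmpdir:
    cache_path = os.path.join(tmpdir, "pages.json")
    save_pages(pages, cache_path)
    restored = load_pages(cache_path)
```

Only query results survive the round trip; nothing about the connection (host, port, index) is stored.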
-
-
tokio.connectors.es.build_timeseries_query(orig_query, start, end, start_key='@timestamp', end_key=None)
Create a query object with time ranges bounded.
Given a query dict and a start/end datetime object, return a new query object with the correct time ranges bounded. Relies on orig_query containing at least one @timestamp field to indicate where the time ranges should be inserted.
If orig_query is querying records that contain both a “start” and “end” time (e.g., a job) rather than a discrete point in time (e.g., a sampled metric), start_key and end_key can be used to modify the query to return all records that overlapped with the interval specified by start and end.
Parameters:
- orig_query (dict) – A query object containing at least one @timestamp field.
- start (datetime.datetime) – lower bound for query (inclusive)
- end (datetime.datetime) – upper bound for query (exclusive)
- start_key (str) – The key containing a timestamp against which a time range query should be applied.
- end_key (str) – The key containing a timestamp against which the upper bound of the time range should be applied. If None, treat start_key as a single point in time rather than the start of a recorded process.
Returns: A query object with all instances of @timestamp bounded by start and end.
Return type: dict
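The single-point-in-time case (end_key=None) can be sketched as a recursive walk that fills in every range clause on the timestamp key. This is an illustration only, not the module's code: bound_timestamps is a hypothetical name, and the use of gte/lt with the epoch_second format is an assumption about how the bounds might be encoded.

```python
import copy
import datetime


def bound_timestamps(orig_query, start, end, start_key="@timestamp"):
    """Return a copy of orig_query with each range on start_key set to [start, end)."""
    query = copy.deepcopy(orig_query)  # leave the caller's template untouched

    def walk(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == "range" and isinstance(value, dict) and start_key in value:
                    # Inclusive lower bound, exclusive upper bound.
                    value[start_key] = {
                        "gte": int(start.timestamp()),
                        "lt": int(end.timestamp()),
                        "format": "epoch_second",
                    }
                else:
                    walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(query)
    return query


template = {"query": {"bool": {"filter": {"range": {"@timestamp": {}}}}}}
utc = datetime.timezone.utc
bounded = bound_timestamps(
    template,
    datetime.datetime(2019, 1, 1, tzinfo=utc),
    datetime.datetime(2019, 1, 2, tzinfo=utc),
)
```

Timezone-aware datetimes are used here so the epoch values do not depend on the local timezone of the machine running the example.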
-
tokio.connectors.es.mutate_query(mutable_query, field, value, term='term')
Inserts a new condition into a query object.
See https://www.elastic.co/guide/en/elasticsearch/reference/current/term-level-queries.html for complete documentation.
Returns: Nothing. mutable_query is updated in place.
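As a rough illustration of an in-place update, the sketch below appends a term-level condition to a bool query. Where the real function inserts the condition is not stated here, so the placement under query.bool.must is an assumption, and add_condition is a hypothetical name.

```python
def add_condition(mutable_query, field, value, term="term"):
    """Append {term: {field: value}} to the query's bool/must list, in place.

    Assumes mutable_query has the shape {"query": {"bool": {...}}} and that
    "must", if present, is a list.
    """
    must = mutable_query["query"]["bool"].setdefault("must", [])
    must.append({term: {field: value}})


query = {"query": {"bool": {"must": [{"term": {"hostname": "nid00001"}}]}}}
result = add_condition(query, "jobid", "12345")
add_condition(query, "user", "a*", term="wildcard")
```

The function returns None, and the caller observes the change through the same dict object it passed in.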