Skip to content

Commit 229d841

Browse files
committed
Support custom analysers, add es.create_models
To support custom analysers, it seems best to abandon model.create_all in favour of an index-wide method (es.create_models) that sets all mappings (and custom analysis settings) at once. Code is largely transplanted from hypothesis/h#1825 Details: Custom analysers(&filters&tokenisers) are shared in Elasticsearch among all document types (models). To update a model's mappings one needs to make sure the custom analysers are defined first, but updating those just for one model is inelegant; The index would have to be closed for each update, and one cannot check for duplicate definitions of analysers as it is not visible which model defined a particular analyser. Treating index-wide settings as per-model settings creates leaky abstractions.
1 parent 2d97f29 commit 229d841

File tree

4 files changed

+93
-24
lines changed

4 files changed

+93
-24
lines changed

annotator/annotation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@
3535
}
3636
}
3737

38+
ANALYSIS = {}
3839

3940
class Annotation(es.Model):
4041

4142
__type__ = TYPE
4243
__mapping__ = MAPPING
44+
__analysis__ = ANALYSIS
4345

4446
def save(self, *args, **kwargs):
4547
_add_default_permissions(self)

annotator/elasticsearch.py

Lines changed: 89 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,63 @@ def conn(self):
6666
self._connection = self._connect()
6767
return self._connection
6868

69+
def create_models(self, models):
70+
mappings = _compile_mappings(models)
71+
analysis = _compile_analysis(models)
72+
73+
# If it does not yet exist, simply create the index
74+
try:
75+
response = self.conn.indices.create(self.index, ignore=400, body={
76+
'mappings': mappings,
77+
'settings': {'analysis': analysis},
78+
})
79+
return
80+
except elasticsearch.exceptions.ConnectionError as e:
81+
msg = ('Can not access ElasticSearch at {0}! '
82+
'Check to ensure it is running.').format(self.host)
83+
raise elasticsearch.exceptions.ConnectionError('N/A', msg, e)
84+
85+
# Bad request (400) is ignored above, to prevent warnings in the log
86+
# when the index already exists, but the failure could be for other
87+
# reasons. If so, raise the error here.
88+
if 'error' in response and 'IndexAlreadyExists' not in response['error']:
89+
raise elasticsearch.exceptions.RequestError(400, response['error'])
90+
91+
# Update the mappings of the existing index
92+
self._update_analysis(analysis)
93+
self._update_mappings(mappings)
94+
95+
def _update_analysis(self, analysis):
96+
"""Update analyzers and filters"""
97+
settings = self.conn.indices.get_settings(index=self.index)
98+
existing = settings[self.index]['settings']['index']['analysis']
99+
# Only bother if new settings would differ from existing settings
100+
if not _analysis_up_to_date(existing, analysis):
101+
try:
102+
self.conn.indices.close(index=self.index)
103+
self.conn.indices.put_settings(index=self.index,
104+
body={'analysis': analysis})
105+
finally:
106+
self.conn.indices.open(index=self.index)
107+
108+
def _update_mappings(self, mappings):
109+
"""Update mappings.
110+
111+
Warning: can explode because of a a MergeMappingError when mappings are incompatible"""
112+
for doc_type, body in mappings.items():
113+
self.conn.indices.put_mapping(
114+
index=self.index,
115+
doc_type=doc_type,
116+
body=body
117+
)
118+
119+
def _analysis_up_to_date(existing, analysis):
120+
"""Tell whether existing settings are up to date"""
121+
new_analysis = existing.copy()
122+
for section, items in analysis.items():
123+
new_analysis.setdefault(section,{}).update(items)
124+
return new_analysis == existing
125+
69126

70127
class _Model(dict):
71128
"""Base class that represents a document type in an ElasticSearch index.
@@ -74,7 +131,7 @@ class _Model(dict):
74131
__type__ -- The name of the document type
75132
__mapping__ -- A mapping of the document's fields
76133
77-
Mapping: Calling create_all() will create the mapping in the index.
134+
Mapping:
78135
One field, 'id', is treated specially. Its value will not be stored,
79136
but be used as the _id identifier of the document in Elasticsearch. If
80137
an item is indexed without providing an id, the _id is automatically
@@ -87,25 +144,6 @@ class _Model(dict):
87144
with 'analyzer':'standard'.
88145
"""
89146

90-
@classmethod
91-
def create_all(cls):
92-
log.info("Creating index '%s'." % cls.es.index)
93-
conn = cls.es.conn
94-
try:
95-
conn.indices.create(cls.es.index)
96-
except elasticsearch.exceptions.RequestError as e:
97-
# Reraise anything that isn't just a notification that the index
98-
# already exists (either as index or as an alias).
99-
if not (e.error.startswith('IndexAlreadyExistsException')
100-
or e.error.startswith('InvalidIndexNameException')):
101-
log.fatal("Failed to create an Elasticsearch index")
102-
raise
103-
log.warn("Index creation failed as index appears to already exist.")
104-
mapping = cls.get_mapping()
105-
conn.indices.put_mapping(index=cls.es.index,
106-
doc_type=cls.__type__,
107-
body=mapping)
108-
109147
@classmethod
110148
def get_mapping(cls):
111149
return {
@@ -121,6 +159,10 @@ def get_mapping(cls):
121159
}
122160
}
123161

162+
@classmethod
163+
def get_analysis(cls):
164+
return getattr(cls, '__analysis__', {})
165+
124166
@classmethod
125167
def drop_all(cls):
126168
if cls.es.conn.indices.exists(cls.es.index):
@@ -215,6 +257,33 @@ def make_model(es):
215257
return type('Model', (_Model,), {'es': es})
216258

217259

260+
def _compile_mappings(models):
261+
"""Collect the mappings from the models"""
262+
mappings = {}
263+
for model in models:
264+
mappings.update(model.get_mapping())
265+
return mappings
266+
267+
268+
def _compile_analysis(models):
269+
"""Merge the custom analyzers and such from the models"""
270+
analysis = {}
271+
for model in models:
272+
for section, items in model.get_analysis().items():
273+
existing_items = analysis.setdefault(section, {})
274+
for name in items:
275+
if name in existing_items:
276+
fmt = "Duplicate definition of 'index.analysis.{}.{}'."
277+
msg = fmt.format(section, name)
278+
raise RuntimeError(msg)
279+
existing_items.update(items)
280+
return analysis
281+
282+
283+
def _csv_split(s, delimiter=','):
284+
return [r for r in csv.reader([s], delimiter=delimiter)][0]
285+
286+
218287
def _build_query(query, offset, limit):
219288
# Create a match query for each keyword
220289
match_clauses = [{'match': {k: v}} for k, v in iteritems(query)]

run.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ def main():
6161
es.authorization_enabled = app.config['AUTHZ_ON']
6262

6363
try:
64-
annotation.Annotation.create_all()
65-
document.Document.create_all()
64+
es.create_models([annotation.Annotation, document.Document])
6665
except elasticsearch.exceptions.RequestError as e:
6766
if e.error.startswith('MergeMappingException'):
6867
date = time.strftime('%Y-%m-%d')

tests/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ def setup_class(cls):
3434
document.Document.drop_all()
3535

3636
def setup(self):
37-
annotation.Annotation.create_all()
38-
document.Document.create_all()
37+
es.create_models([annotation.Annotation, document.Document])
3938
es.conn.cluster.health(wait_for_status='yellow')
4039
self.cli = self.app.test_client()
4140

0 commit comments

Comments
 (0)