You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

170 lines
6.2 KiB

"""
This file comes from https://github.com/sfermigier/WhooshAlchemy.
WhooshAlchemy
~~~~~~~~~~~~~
Adds Whoosh indexing capabilities to SQLAlchemy models.
Based on Flask-whooshalchemy by Karl Gyllstrom (Flask is still supported, but not mandatory).
:copyright: (c) 2012 by Stefane Fermigier
:copyright: (c) 2012 by Karl Gyllstrom
:license: BSD (see LICENSE.txt)
"""
# pylint: skip-file
from __future__ import absolute_import, print_function, unicode_literals
import os
from six import text_type
import sqlalchemy
import whoosh.index
from sqlalchemy import event
from sqlalchemy.orm.session import Session
from whoosh.analysis import StemmingAnalyzer
from whoosh.fields import Schema
from whoosh.qparser import MultifieldParser
class IndexService(object):
def __init__(self, config=None, whoosh_base=None):
if not whoosh_base and config:
whoosh_base = config.get("WHOOSH_BASE")
if not whoosh_base:
whoosh_base = "whoosh_indexes" # Default value
self.whoosh_base = whoosh_base
self.indexes = {}
event.listen(Session, "before_commit", self.before_commit)
event.listen(Session, "after_commit", self.after_commit)
def register_class(self, model_class):
"""
Registers a model class, by creating the necessary Whoosh index if needed.
"""
index_path = os.path.join(self.whoosh_base, model_class.__name__)
schema, primary = self._get_whoosh_schema_and_primary(model_class)
if whoosh.index.exists_in(index_path):
index = whoosh.index.open_dir(index_path)
else:
if not os.path.exists(index_path):
os.makedirs(index_path)
index = whoosh.index.create_in(index_path, schema)
self.indexes[model_class.__name__] = index
model_class.search_query = Searcher(model_class, primary, index)
return index
def index_for_model_class(self, model_class):
"""
Gets the whoosh index for this model, creating one if it does not exist.
in creating one, a schema is created based on the fields of the model.
Currently we only support primary key -> whoosh.ID, and sqlalchemy.TEXT
-> whoosh.TEXT, but can add more later. A dict of model -> whoosh index
is added to the ``app`` variable.
"""
index = self.indexes.get(model_class.__name__)
if index is None:
index = self.register_class(model_class)
return index
def _get_whoosh_schema_and_primary(self, model_class):
schema = {}
primary = None
for field in model_class.__table__.columns:
if field.primary_key:
schema[field.name] = whoosh.fields.ID(stored=True, unique=True)
primary = field.name
continue
if field.name in model_class.__searchable__:
schema[field.name] = whoosh.fields.TEXT(
analyzer=StemmingAnalyzer())
return Schema(**schema), primary
def before_commit(self, session):
self.to_update = {}
for model in session.new:
model_class = model.__class__
if hasattr(model_class, '__searchable__'):
self.to_update.setdefault(model_class.__name__, []).append(
("new", model))
for model in session.deleted:
model_class = model.__class__
if hasattr(model_class, '__searchable__'):
self.to_update.setdefault(model_class.__name__, []).append(
("deleted", model))
for model in session.dirty:
model_class = model.__class__
if hasattr(model_class, '__searchable__'):
self.to_update.setdefault(model_class.__name__, []).append(
("changed", model))
def after_commit(self, session):
"""
Any db updates go through here. We check if any of these models have
``__searchable__`` fields, indicating they need to be indexed. With these
we update the whoosh index for the model. If no index exists, it will be
created here; this could impose a penalty on the initial commit of a model.
"""
for typ, values in self.to_update.items():
model_class = values[0][1].__class__
index = self.index_for_model_class(model_class)
with index.writer() as writer:
primary_field = model_class.search_query.primary
searchable = model_class.__searchable__
for change_type, model in values:
# delete everything. stuff that's updated or inserted will get
# added as a new doc. Could probably replace this with a whoosh
# update.
writer.delete_by_term(
primary_field, text_type(getattr(model, primary_field)))
if change_type in ("new", "changed"):
attrs = dict((key, getattr(model, key))
for key in searchable)
attrs = {
attr: text_type(getattr(model, attr))
for attr in attrs.keys()
}
attrs[primary_field] = text_type(getattr(model, primary_field))
writer.add_document(**attrs)
self.to_update = {}
class Searcher(object):
"""
Assigned to a Model class as ``search_query``, which enables text-querying.
"""
def __init__(self, model_class, primary, index):
self.model_class = model_class
self.primary = primary
self.index = index
self.searcher = index.searcher()
fields = set(index.schema._fields.keys()) - set([self.primary])
self.parser = MultifieldParser(list(fields), index.schema)
def __call__(self, session, query, limit=None):
results = self.index.searcher().search(
self.parser.parse(query), limit=limit)
keys = [x[self.primary] for x in results]
primary_column = getattr(self.model_class, self.primary)
db_query = session.query(self.model_class)
if keys:
return db_query.filter(primary_column.in_(keys))
return db_query.filter(sqlalchemy.sql.false())