[ckan-changes] commit/ckan: kindly: [search] ticket 1149 change domain object modification to use session extension
Bitbucket
commits-noreply at bitbucket.org
Sat May 21 09:51:15 UTC 2011
1 new changeset in ckan:
http://bitbucket.org/okfn/ckan/changeset/b1634d405066/
changeset: r3103:b1634d405066
user: kindly
date: 2011-05-21 11:45:28
summary: [search] ticket 1149 change domain object modification to use session extension
affected #: 3 files (602 bytes)
--- a/ckan/lib/search/worker.py Sat May 21 10:19:01 2011 +0100
+++ b/ckan/lib/search/worker.py Sat May 21 10:45:28 2011 +0100
@@ -31,9 +31,13 @@
implements(IDomainObjectModification, inherit=True)
def notify(self, entity, operation):
- if hasattr(entity, 'as_dict'):
+
+ if hasattr(entity, 'as_dict') and operation != DomainObjectOperation.deleted:
dispatch_by_operation(entity.__class__.__name__,
entity.as_dict(), operation)
+ elif operation == DomainObjectOperation.deleted:
+ dispatch_by_operation(entity.__class__.__name__,
+ {'id': entity.id}, operation)
else:
log.warn("Discarded Sync. indexing for: %s" % entity)
--- a/ckan/model/extension.py Sat May 21 10:19:01 2011 +0100
+++ b/ckan/model/extension.py Sat May 21 10:45:28 2011 +0100
@@ -34,13 +34,8 @@
:param func: Any callable, which will be called for each observer
:returns: EXT_CONTINUE if no errors encountered, otherwise EXT_STOP
"""
- try:
- for observer in self.observers:
- func(observer)
- return EXT_CONTINUE
- except Exception, e:
- log.exception(e)
- return EXT_STOP
+ for observer in self.observers:
+ func(observer)
class PluginMapperExtension(MapperExtension, ObserverNotifier):
"""
@@ -95,7 +90,7 @@
def before_flush(self, session, flush_context, instances):
return self.notify_observers(
- methodcaller('after_begin', session, flush_context, instances)
+ methodcaller('before_flush', session, flush_context, instances)
)
def after_flush(self, session, flush_context):
--- a/ckan/model/modification.py Sat May 21 10:19:01 2011 +0100
+++ b/ckan/model/modification.py Sat May 21 10:45:28 2011 +0100
@@ -4,7 +4,7 @@
from sqlalchemy.orm.interfaces import EXT_CONTINUE
from ckan.plugins import SingletonPlugin, PluginImplementations, implements
-from ckan.plugins import IMapper, IDomainObjectModification
+from ckan.plugins import ISession, IDomainObjectModification
from ckan.model.extension import ObserverNotifier
from ckan.model.domain_object import DomainObjectOperation
@@ -23,55 +23,54 @@
out with check_real_change.
"""
- implements(IMapper, inherit=True)
+ implements(ISession, inherit=True)
observers = PluginImplementations(IDomainObjectModification)
-
- def check_real_change(self, instance):
- """
- Return True if the change concerns an object with revision information
- and has been modifed in the current SQLAlchemy session.
- """
- if not instance.revision:
- return False
- return object_session(instance).is_modified(
- instance, include_collections=False
- )
- def after_insert(self, mapper, connection, instance):
- return self.send_notifications(instance,
- DomainObjectOperation.new
- )
+ def before_flush(self, session, flush_context, instances):
- def after_update(self, mapper, connection, instance):
- return self.send_notifications(instance,
- DomainObjectOperation.changed
- )
-
- def before_delete(self, mapper, connection, instance):
- return self.send_notifications(instance,
- DomainObjectOperation.deleted
- )
+ if not hasattr(session, '_object_cache'):
+ session._object_cache= {'new': set(),
+ 'deleted': set(),
+ 'changed': set()}
- def send_notifications(self, instance, operation):
- """
- Called when a db object changes, this method works out what
- notifications need to be sent and calls send_notification to do it.
- """
- if not (operation == DomainObjectOperation.deleted or self.check_real_change(instance)):
- return EXT_CONTINUE
+ changed = [obj for obj in session.dirty if
+ session.is_modified(obj, include_collections=False)]
- if isinstance(instance, Package):
- self.notify(instance, operation)
- elif isinstance(instance, ResourceGroup):
- self.notify(instance.package, DomainObjectOperation.changed)
- elif isinstance(instance, Resource):
- self.notify(instance.resource_group.package, DomainObjectOperation.changed)
- elif isinstance(instance, (PackageExtra, PackageTag)):
- self.notify(instance.package, DomainObjectOperation.changed)
- else:
- raise NotImplementedError(instance)
+ session._object_cache['new'].update(session.new)
+ session._object_cache['deleted'].update(session.deleted)
+ session._object_cache['changed'].update(changed)
- return EXT_CONTINUE
+ def before_commit(self, session):
+
+ session.flush()
+ if not hasattr(session, '_object_cache'):
+ return
+
+ obj_cache = session._object_cache
+ new = obj_cache['new']
+ changed = obj_cache['changed']
+ deleted = obj_cache['deleted']
+
+ for obj in new:
+ if isinstance(obj, Package):
+ self.notify(obj, DomainObjectOperation.new)
+ for obj in deleted:
+ if isinstance(obj, Package):
+ self.notify(obj, DomainObjectOperation.deleted)
+
+ changed_pkgs = set(obj for obj in changed if isinstance(obj, Package))
+
+ for obj in new | changed | deleted:
+ if not isinstance(obj, Package):
+ try:
+ changed_pkgs.update(obj.related_packages())
+ except AttributeError:
+ continue
+
+ for obj in changed_pkgs:
+ self.notify(obj, DomainObjectOperation.changed)
+ del session._object_cache
+
def notify(self, entity, operation):
for observer in self.observers:
Repository URL: https://bitbucket.org/okfn/ckan/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
More information about the ckan-changes
mailing list