[ckan-changes] commit/ckan: kindly: [search] ticket 1149 change domain object modification to use session extension

Bitbucket commits-noreply at bitbucket.org
Sat May 21 09:51:15 UTC 2011


1 new changeset in ckan:

http://bitbucket.org/okfn/ckan/changeset/b1634d405066/
changeset:   r3103:b1634d405066
user:        kindly
date:        2011-05-21 11:45:28
summary:     [search] ticket 1149 change domain object modification to use session extension
affected #:  3 files (602 bytes)

--- a/ckan/lib/search/worker.py	Sat May 21 10:19:01 2011 +0100
+++ b/ckan/lib/search/worker.py	Sat May 21 10:45:28 2011 +0100
@@ -31,9 +31,13 @@
     implements(IDomainObjectModification, inherit=True)
 
     def notify(self, entity, operation):
-        if hasattr(entity, 'as_dict'):
+        
+        if hasattr(entity, 'as_dict') and operation != DomainObjectOperation.deleted:
             dispatch_by_operation(entity.__class__.__name__, 
                                   entity.as_dict(), operation)
+        elif operation == DomainObjectOperation.deleted:
+            dispatch_by_operation(entity.__class__.__name__, 
+                                  {'id': entity.id}, operation)
         else:
             log.warn("Discarded Sync. indexing for: %s" % entity)
             


--- a/ckan/model/extension.py	Sat May 21 10:19:01 2011 +0100
+++ b/ckan/model/extension.py	Sat May 21 10:45:28 2011 +0100
@@ -34,13 +34,8 @@
         :param func: Any callable, which will be called for each observer
         :returns: EXT_CONTINUE if no errors encountered, otherwise EXT_STOP
         """
-        try:
-            for observer in self.observers:
-                func(observer)
-            return EXT_CONTINUE
-        except Exception, e:
-            log.exception(e)
-            return EXT_STOP
+        for observer in self.observers:
+            func(observer)
 
 class PluginMapperExtension(MapperExtension, ObserverNotifier):
     """
@@ -95,7 +90,7 @@
     
     def before_flush(self, session, flush_context, instances):
         return self.notify_observers(
-            methodcaller('after_begin', session, flush_context, instances)
+            methodcaller('before_flush', session, flush_context, instances)
         )
                                   
     def after_flush(self, session, flush_context):


--- a/ckan/model/modification.py	Sat May 21 10:19:01 2011 +0100
+++ b/ckan/model/modification.py	Sat May 21 10:45:28 2011 +0100
@@ -4,7 +4,7 @@
 from sqlalchemy.orm.interfaces import EXT_CONTINUE
 
 from ckan.plugins import SingletonPlugin, PluginImplementations, implements
-from ckan.plugins import IMapper, IDomainObjectModification
+from ckan.plugins import ISession, IDomainObjectModification
 
 from ckan.model.extension import ObserverNotifier
 from ckan.model.domain_object import DomainObjectOperation
@@ -23,55 +23,54 @@
     out with check_real_change.
     """
 
-    implements(IMapper, inherit=True)
+    implements(ISession, inherit=True)
     observers = PluginImplementations(IDomainObjectModification)
-    
-    def check_real_change(self, instance):
-        """
-        Return True if the change concerns an object with revision information
-        and has been modifed in the current SQLAlchemy session.
-        """
-        if not instance.revision:
-            return False
-        return object_session(instance).is_modified(
-            instance, include_collections=False
-        )
 
-    def after_insert(self, mapper, connection, instance):
-        return self.send_notifications(instance,
-            DomainObjectOperation.new
-        )
+    def before_flush(self, session, flush_context, instances):
 
-    def after_update(self, mapper, connection, instance):
-        return self.send_notifications(instance,
-            DomainObjectOperation.changed
-        )
-        
-    def before_delete(self, mapper, connection, instance):
-        return self.send_notifications(instance,
-            DomainObjectOperation.deleted
-        )
+        if not hasattr(session, '_object_cache'):
+            session._object_cache= {'new': set(),
+                                    'deleted': set(),
+                                    'changed': set()}
 
-    def send_notifications(self, instance, operation):
-        """
-        Called when a db object changes, this method works out what
-        notifications need to be sent and calls send_notification to do it.
-        """
-        if not (operation == DomainObjectOperation.deleted or self.check_real_change(instance)):
-            return EXT_CONTINUE
+        changed = [obj for obj in session.dirty if 
+            session.is_modified(obj, include_collections=False)]
 
-        if isinstance(instance, Package):
-            self.notify(instance, operation)
-        elif isinstance(instance, ResourceGroup):
-            self.notify(instance.package, DomainObjectOperation.changed)
-        elif isinstance(instance, Resource):
-            self.notify(instance.resource_group.package, DomainObjectOperation.changed)
-        elif isinstance(instance, (PackageExtra, PackageTag)):
-            self.notify(instance.package, DomainObjectOperation.changed)
-        else:
-            raise NotImplementedError(instance)
+        session._object_cache['new'].update(session.new)
+        session._object_cache['deleted'].update(session.deleted)
+        session._object_cache['changed'].update(changed)
 
-        return EXT_CONTINUE
+    def before_commit(self, session):
+
+        session.flush()
+        if not hasattr(session, '_object_cache'):
+            return
+
+        obj_cache = session._object_cache
+        new = obj_cache['new']
+        changed = obj_cache['changed']
+        deleted = obj_cache['deleted']
+
+        for obj in new:
+            if isinstance(obj, Package):
+                self.notify(obj, DomainObjectOperation.new)
+        for obj in deleted:
+            if isinstance(obj, Package):
+                self.notify(obj, DomainObjectOperation.deleted)
+
+        changed_pkgs = set(obj for obj in changed if isinstance(obj, Package))
+
+        for obj in new | changed | deleted:
+            if not isinstance(obj, Package):
+                try:
+                    changed_pkgs.update(obj.related_packages())
+                except AttributeError:
+                    continue
+
+        for obj in changed_pkgs:
+            self.notify(obj, DomainObjectOperation.changed)
+        del session._object_cache
+
 
     def notify(self, entity, operation):
         for observer in self.observers:

Repository URL: https://bitbucket.org/okfn/ckan/

--

This is a commit notification from bitbucket.org. You are receiving
this because the commit notification service is enabled for this
repository, with this email address set as the recipient.




More information about the ckan-changes mailing list