[ckan-changes] commit/ckanext-harvest: amercader: [ckan harvester] Request only packages modified since last harvest job. Also support older versions which do not include 'metadata_modified'

Bitbucket commits-noreply at bitbucket.org
Tue May 17 16:27:05 UTC 2011


1 new changeset in ckanext-harvest:

http://bitbucket.org/okfn/ckanext-harvest/changeset/55b7fd966c21/
changeset:   r97:55b7fd966c21
user:        amercader
date:        2011-05-17 18:26:42
summary:     [ckan harvester] Request only packages modified since last harvest job. Also support older versions which do not include 'metadata_modified'
affected #:  1 file (2.8 KB)

--- a/ckanext/harvest/harvesters.py	Wed May 11 17:07:05 2011 +0100
+++ b/ckanext/harvest/harvesters.py	Tue May 17 17:26:42 2011 +0100
@@ -14,15 +14,15 @@
 from ckan.plugins.core import SingletonPlugin, implements
 
 from ckanext.harvest.interfaces import IHarvester
-from ckanext.harvest.model import HarvestObject, HarvestGatherError, \
+from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
                                     HarvestObjectError
 
 import logging
 log = logging.getLogger(__name__)
 
-class MockTranslator(object): 
-    def ugettext(self, value): 
-        return value 
+class MockTranslator(object):
+    def ugettext(self, value):
+        return value
 
     def ungettext(self, singular, plural, n):
         if n > 1:
@@ -33,32 +33,35 @@
     '''
     A Harvester for CKAN instances
     '''
-    
+
     implements(IHarvester)
 
+    #TODO: check different API versions
+    api_version = '2'
+
+
     def __init__(self,**kw):
         from paste.registry import Registry
         import pylons
-        self.registry=Registry() 
-        self.registry.prepare() 
+        self.registry=Registry()
+        self.registry.prepare()
 
         self.translator_obj=MockTranslator()
         self.registry.register(pylons.translator, self.translator_obj)
 
 
-    def _get_api_offset(self):
-        #TODO: check different API versions?
-        return '/api/2/rest'
+    def _get_rest_api_offset(self):
+        return '/api/%s/rest' % self.api_version
+
+    def _get_search_api_offset(self):
+        return '/api/%s/search' % self.api_version
 
     def _get_content(self, url):
-        #TODO: configure
         http_request = urllib2.Request(
             url = url,
-            headers = {'Authorization' : 'fcff821f-1f92-42ef-8c52-7c38d74a7291'}
         )
 
         try:
-            #http_response = urllib2.urlopen(url)
             http_response = urllib2.urlopen(http_request)
 
             return http_response.read()
@@ -79,22 +82,74 @@
         return 'CKAN'
 
     def gather_stage(self,harvest_job):
-        log.debug('In CKANHarvester gather_stage')
+        log.debug('In CKANHarvester gather_stage (%s)' % harvest_job.source.url)
+
+        get_all_packages = True
+        package_ids = []
+
+        # Check if this source has been harvested before
+        previous_job = Session.query(HarvestJob) \
+                        .filter(HarvestJob.source==harvest_job.source) \
+                        .filter(HarvestJob.gather_finished!=None) \
+                        .filter(HarvestJob.id!=harvest_job.id) \
+                        .order_by(HarvestJob.gather_finished.desc()) \
+                        .limit(1).first()
 
         # Get source URL
-        url = harvest_job.source.url.rstrip('/')
-        url = url + self._get_api_offset() + '/package'
+        base_url = harvest_job.source.url.rstrip('/')
+        base_rest_url = base_url + self._get_rest_api_offset()
+        base_search_url = base_url + self._get_search_api_offset()
+        
+        if previous_job and not previous_job.gather_errors:
+            get_all_packages = False
 
-        # Get contents
-        try:
-            content = self._get_content(url)
-        except Exception,e:
-            self._save_gather_error('Unable to get content for URL: %s: %r' % \
-                                        (url, e),harvest_job)
-            return None
+            # Request only the packages modified since last harvest job
+            last_time = harvest_job.gather_started.isoformat()
+            url = base_search_url + '/revision?since_time=%s' % last_time
+
+            try:
+                content = self._get_content(url)
+
+                revision_ids = json.loads(content)
+                if len(revision_ids):
+                    for revision_id in revision_ids:
+                        url = base_rest_url + '/revision/%s' % revision_id
+                        try:
+                            content = self._get_content(url)
+                        except Exception,e:
+                            self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
+                            continue
+
+                        revision = json.loads(content)
+                        for package_id in revision.packages:
+                            if not package_id in package_ids:
+                                package_ids.append(package_id)
+                else:
+                    log.info('No packages have been updated on the remote CKAN instance since the last harvest job')
+                    return None
+
+            except urllib2.HTTPError,e:
+                if e.getcode() == 400:
+                    log.info('CKAN instance %s does not suport revision filtering' % base_url)
+                    get_all_packages = True
+                else:
+                    self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
+                    return None
+
+
+
+        if get_all_packages:
+            # Request all remote packages
+            url = base_rest_url + '/package'
+            try:
+                content = self._get_content(url)
+            except Exception,e:
+                self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
+                return None
+
+            package_ids = json.loads(content)
 
         try:
-            package_ids = json.loads(content)
             object_ids = []
             if len(package_ids):
                 for package_id in package_ids:
@@ -116,7 +171,7 @@
         log.debug('In CKANHarvester fetch_stage')
         # Get source URL
         url = harvest_object.source.url.rstrip('/')
-        url = url + self._get_api_offset() + '/package/' + harvest_object.guid
+        url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid
 
         # Get contents
         try:
@@ -144,11 +199,23 @@
             return False
         try:
 
-            # harvest_object.content is the result of an API call like
-            # http://ec2-46-51-149-132.eu-west-1.compute.amazonaws.com:8081/api/2/rest/package/77d93608-3a3e-42e5-baab-3521afb504f1
+            # harvest_object.content is the result of a package REST API call
             package_dict = json.loads(harvest_object.content)
 
             # Save metadata modified date in Harvest Object
+            if not 'metadata_modified' in package_dict:
+                # Get the date from the revision
+                url = harvest_object.job.source.url.rstrip('/')
+                url = url + self._get_rest_api_offset() + '/revision/%s' % package_dict['revision_id']
+
+                try:
+                    content = self._get_content(url)
+                    revision_dict = json.loads(content)
+                    package_dict['metadata_modified'] = revision_dict['timestamp']
+                except Exception,e:
+                    self._save_gather_error('Unable to get revision %s info : %r' % \
+                                                (package_dict['revision_id'], e),harvest_job)
+
             harvest_object.metadata_modified_date = package_dict['metadata_modified']
             harvest_object.save()
 
@@ -163,7 +230,7 @@
                 'api_version':'2',
                 'schema': schema,
             }
-            
+
             # Ugly Hack: tags in DGU are created with Upper case and spaces,
             # and the validator does not like them
             if 'tags' in package_dict:
@@ -174,10 +241,8 @@
 
             # Check if package exists
             context.update({'id':package_dict['id']})
-            
             try:
                 existing_package_dict = package_show(context)
-
                 # Check modified date
                 if package_dict['metadata_modified'] > existing_package_dict['metadata_modified']:
                     log.info('Package with GUID %s exists and needs to be updated' % harvest_object.guid)

Repository URL: https://bitbucket.org/okfn/ckanext-harvest/

--

This is a commit notification from bitbucket.org. You are receiving
this because the notification service is enabled and addressed to the
recipient of this email.




More information about the ckan-changes mailing list