[ckan-changes] commit/ckanext-harvest: pudo: [harvesters] factor out a base harvester for use in generic harvesting apps
Bitbucket
commits-noreply at bitbucket.org
Thu Jun 2 10:09:29 UTC 2011
1 new changeset in ckanext-harvest:
http://bitbucket.org/okfn/ckanext-harvest/changeset/ef5990259611/
changeset: ef5990259611
branches:
user: pudo
date: 2011-06-02 12:07:07
summary: [harvesters] factor out a base harvester for use in generic harvesting apps
affected #: 5 files (11.6 KB)
--- a/ckanext/harvest/harvesters.py Tue May 31 18:06:26 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,275 +0,0 @@
-import urllib2
-
-from ckan.logic.action.create import package_create_rest
-from ckan.logic.action.update import package_update_rest
-from ckan.logic.action.get import package_show
-from ckan.logic.schema import default_package_schema
-from ckan.logic import ValidationError,NotFound
-from ckan import model
-from ckan.model import Session
-from ckan.lib.navl.validators import ignore_missing
-
-from ckan.lib.helpers import json
-
-from ckan.plugins.core import SingletonPlugin, implements
-
-from ckanext.harvest.interfaces import IHarvester
-from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
- HarvestObjectError
-
-import logging
-log = logging.getLogger(__name__)
-
-class MockTranslator(object):
- def ugettext(self, value):
- return value
-
- def ungettext(self, singular, plural, n):
- if n > 1:
- return plural
- return singular
-
-class CKANHarvester(SingletonPlugin):
- '''
- A Harvester for CKAN instances
- '''
-
- implements(IHarvester)
-
- #TODO: check different API versions
- api_version = '2'
-
-
- def __init__(self,**kw):
- from paste.registry import Registry
- import pylons
- self.registry=Registry()
- self.registry.prepare()
-
- self.translator_obj=MockTranslator()
- self.registry.register(pylons.translator, self.translator_obj)
-
-
- def _get_rest_api_offset(self):
- return '/api/%s/rest' % self.api_version
-
- def _get_search_api_offset(self):
- return '/api/%s/search' % self.api_version
-
- def _get_content(self, url):
- http_request = urllib2.Request(
- url = url,
- )
-
- try:
- http_response = urllib2.urlopen(http_request)
-
- return http_response.read()
- except Exception, e:
- raise e
-
- def _save_gather_error(self,message,job):
- err = HarvestGatherError(message=message,job=job)
- err.save()
- log.error(message)
-
- def _save_object_error(self,message,obj,stage=u'Fetch'):
- err = HarvestObjectError(message=message,object=obj,stage=stage)
- err.save()
- log.error(message)
-
- def info(self):
- return {
- 'name': 'ckan',
- 'title': 'CKAN',
- 'description': 'Harvests remote CKAN instances'
- }
-
- def gather_stage(self,harvest_job):
- log.debug('In CKANHarvester gather_stage (%s)' % harvest_job.source.url)
-
- get_all_packages = True
- package_ids = []
-
- # Check if this source has been harvested before
- previous_job = Session.query(HarvestJob) \
- .filter(HarvestJob.source==harvest_job.source) \
- .filter(HarvestJob.gather_finished!=None) \
- .filter(HarvestJob.id!=harvest_job.id) \
- .order_by(HarvestJob.gather_finished.desc()) \
- .limit(1).first()
-
- # Get source URL
- base_url = harvest_job.source.url.rstrip('/')
- base_rest_url = base_url + self._get_rest_api_offset()
- base_search_url = base_url + self._get_search_api_offset()
-
- if previous_job and not previous_job.gather_errors:
- get_all_packages = False
-
- # Request only the packages modified since last harvest job
- last_time = harvest_job.gather_started.isoformat()
- url = base_search_url + '/revision?since_time=%s' % last_time
-
- try:
- content = self._get_content(url)
-
- revision_ids = json.loads(content)
- if len(revision_ids):
- for revision_id in revision_ids:
- url = base_rest_url + '/revision/%s' % revision_id
- try:
- content = self._get_content(url)
- except Exception,e:
- self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
- continue
-
- revision = json.loads(content)
- for package_id in revision.packages:
- if not package_id in package_ids:
- package_ids.append(package_id)
- else:
- log.info('No packages have been updated on the remote CKAN instance since the last harvest job')
- return None
-
- except urllib2.HTTPError,e:
- if e.getcode() == 400:
- log.info('CKAN instance %s does not suport revision filtering' % base_url)
- get_all_packages = True
- else:
- self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
- return None
-
-
-
- if get_all_packages:
- # Request all remote packages
- url = base_rest_url + '/package'
- try:
- content = self._get_content(url)
- except Exception,e:
- self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
- return None
-
- package_ids = json.loads(content)
-
- try:
- object_ids = []
- if len(package_ids):
- for package_id in package_ids:
- # Create a new HarvestObject for this identifier
- obj = HarvestObject(guid = package_id, job = harvest_job)
- obj.save()
- object_ids.append(obj.id)
-
- return object_ids
-
- else:
- self._save_gather_error('No packages received for URL: %s' % url,harvest_job)
- return None
- except Exception, e:
- self._save_gather_error('%r'%e.message,harvest_job)
-
-
- def fetch_stage(self,harvest_object):
- log.debug('In CKANHarvester fetch_stage')
- # Get source URL
- url = harvest_object.source.url.rstrip('/')
- url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid
-
- # Get contents
- try:
- content = self._get_content(url)
- except Exception,e:
- self._save_object_error('Unable to get content for package: %s: %r' % \
- (url, e),harvest_object)
- return None
-
- # Save the fetched contents in the HarvestObject
- harvest_object.content = content
- harvest_object.save()
-
- return True
-
- def import_stage(self,harvest_object):
-
- log.debug('In CKANHarvester import_stage')
- if not harvest_object:
- log.error('No harvest object received')
- return False
-
- if harvest_object.content is None:
- self._save_object_error('Empty content for object %s' % harvest_object.id,harvest_object,'Import')
- return False
- try:
-
- # harvest_object.content is the result of a package REST API call
- package_dict = json.loads(harvest_object.content)
-
- # Save metadata modified date in Harvest Object
- if not 'metadata_modified' in package_dict:
- # Get the date from the revision
- url = harvest_object.job.source.url.rstrip('/')
- url = url + self._get_rest_api_offset() + '/revision/%s' % package_dict['revision_id']
-
- try:
- content = self._get_content(url)
- revision_dict = json.loads(content)
- package_dict['metadata_modified'] = revision_dict['timestamp']
- except Exception,e:
- self._save_gather_error('Unable to get revision %s info : %r' % \
- (package_dict['revision_id'], e),harvest_job)
-
- harvest_object.metadata_modified_date = package_dict['metadata_modified']
- harvest_object.save()
-
- ## change default schema
- schema = default_package_schema()
- schema["id"] = [ignore_missing, unicode]
-
- context = {
- 'model': model,
- 'session':Session,
- 'user': u'harvest',
- 'api_version':'2',
- 'schema': schema,
- }
-
- # Ugly Hack: tags in DGU are created with Upper case and spaces,
- # and the validator does not like them
- if 'tags' in package_dict:
- new_tags = []
- for tag in package_dict['tags']:
- new_tags.append(tag.lower().replace(' ','_'))
- package_dict['tags'] = new_tags
-
- # Check if package exists
- context.update({'id':package_dict['id']})
- try:
- existing_package_dict = package_show(context)
- # Check modified date
- if package_dict['metadata_modified'] > existing_package_dict['metadata_modified']:
- log.info('Package with GUID %s exists and needs to be updated' % harvest_object.guid)
- # Update package
- updated_package = package_update_rest(package_dict,context)
-
- harvest_object.package_id = updated_package['id']
- harvest_object.save()
- else:
- log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid)
-
- except NotFound:
- # Package needs to be created
- #del package_dict['id']
- del context['id']
- log.info('Package with GUID %s does not exist, let\'s create it' % harvest_object.guid)
- new_package = package_create_rest(package_dict,context)
- harvest_object.package_id = new_package['id']
- harvest_object.save()
-
- return True
- except ValidationError,e:
- self._save_object_error('Invalid package with GUID %s: %r'%(harvest_object.guid,e.error_dict),harvest_object,'Import')
- except Exception, e:
- self._save_object_error('%r'%e,harvest_object,'Import')
-
--- a/pip-requirements.txt Tue May 31 18:06:26 2011 +0100
+++ b/pip-requirements.txt Thu Jun 02 12:07:07 2011 +0200
@@ -3,3 +3,4 @@
# to suit the packaging system.
carrot==0.10.1
+ckanclient>=0.7
Repository URL: https://bitbucket.org/okfn/ckanext-harvest/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the commit notification service enabled, with
this email address set as the recipient.
More information about the ckan-changes mailing list