[ckan-changes] [okfn/ckan] 517945: [#2374] Made tag search pageable. Moved logic tag ...
GitHub
noreply at github.com
Fri May 4 17:55:23 UTC 2012
Branch: refs/heads/master
Home: https://github.com/okfn/ckan
Commit: 517945524247721b3b7e25dcbc559e2c079d16ba
https://github.com/okfn/ckan/commit/517945524247721b3b7e25dcbc559e2c079d16ba
Author: David Read <david.read at hackneyworkshop.com>
Date: 2012-05-04 (Fri, 04 May 2012)
Changed paths:
M ckan/lib/create_test_data.py
M ckan/logic/action/get.py
M ckan/tests/__init__.py
M ckan/tests/logic/test_action.py
A ckan/tests/logic/test_tag.py
Log Message:
-----------
[#2374] Made tag search pageable. Moved logic tag tests into its own file. Centralised related testing fixtures.
diff --git a/ckan/lib/create_test_data.py b/ckan/lib/create_test_data.py
index 44d5c64..d091ddb 100644
--- a/ckan/lib/create_test_data.py
+++ b/ckan/lib/create_test_data.py
@@ -628,6 +628,68 @@ def reset(cls):
def get_all_data(cls):
return cls.pkg_names + list(cls.group_names) + cls.tag_names + cls.user_refs
+ @classmethod
+ def make_some_vocab_tags(cls):
+ import ckan.model as model
+ model.repo.new_revision()
+
+ # Create a couple of vocabularies.
+ genre_vocab = model.Vocabulary(u'genre')
+ model.Session.add(genre_vocab)
+ composers_vocab = model.Vocabulary(u'composers')
+ model.Session.add(composers_vocab)
+
+ # Create some additional free tags for tag search tests.
+ tolkien_tag = model.Tag(name="tolkien")
+ model.Session.add(tolkien_tag)
+ toledo_tag = model.Tag(name="toledo")
+ model.Session.add(toledo_tag)
+ tolerance_tag = model.Tag(name="tolerance")
+ model.Session.add(tolerance_tag)
+ tollbooth_tag = model.Tag(name="tollbooth")
+ model.Session.add(tollbooth_tag)
+ # We have to add free tags to a package or they won't show up in tag results.
+ model.Package.get('warandpeace').add_tags((tolkien_tag, toledo_tag,
+ tolerance_tag, tollbooth_tag))
+
+ # Create some tags that belong to vocabularies.
+ sonata_tag = model.Tag(name=u'sonata', vocabulary_id=genre_vocab.id)
+ model.Session.add(sonata_tag)
+
+ bach_tag = model.Tag(name=u'Bach', vocabulary_id=composers_vocab.id)
+ model.Session.add(bach_tag)
+
+ neoclassical_tag = model.Tag(name='neoclassical',
+ vocabulary_id=genre_vocab.id)
+ model.Session.add(neoclassical_tag)
+
+ neofolk_tag = model.Tag(name='neofolk', vocabulary_id=genre_vocab.id)
+ model.Session.add(neofolk_tag)
+
+ neomedieval_tag = model.Tag(name='neomedieval',
+ vocabulary_id=genre_vocab.id)
+ model.Session.add(neomedieval_tag)
+
+ neoprog_tag = model.Tag(name='neoprog',
+ vocabulary_id=genre_vocab.id)
+ model.Session.add(neoprog_tag)
+
+ neopsychedelia_tag = model.Tag(name='neopsychedelia',
+ vocabulary_id=genre_vocab.id)
+ model.Session.add(neopsychedelia_tag)
+
+ neosoul_tag = model.Tag(name='neosoul', vocabulary_id=genre_vocab.id)
+ model.Session.add(neosoul_tag)
+
+ nerdcore_tag = model.Tag(name='nerdcore', vocabulary_id=genre_vocab.id)
+ model.Session.add(nerdcore_tag)
+
+ model.Package.get('warandpeace').add_tag(bach_tag)
+ model.Package.get('annakarenina').add_tag(sonata_tag)
+
+ model.Session.commit()
+
+
search_items = [{'name':'gils',
'title':'Government Information Locator Service',
diff --git a/ckan/logic/action/get.py b/ckan/logic/action/get.py
index b0cb640..d9ba90f 100644
--- a/ckan/logic/action/get.py
+++ b/ckan/logic/action/get.py
@@ -321,7 +321,7 @@ def tag_list(context, data_dict):
check_access('tag_list', context, data_dict)
if query:
- tags = _tag_search(context, data_dict)
+ tags, count = _tag_search(context, data_dict)
else:
tags = model.Tag.all(vocab_id_or_name)
@@ -921,7 +921,8 @@ def resource_search(context, data_dict):
return {'count': count, 'results': results}
def _tag_search(context, data_dict):
- '''Return a list of tag objects that contain the given string.
+ '''Return a list of tag objects that contain the given string and
+ the full count (for paging).
The query string should be provided in the data_dict with key 'query' or
'q'.
@@ -930,6 +931,7 @@ def _tag_search(context, data_dict):
searched. If a 'vocabulary_id' is provided in the data_dict then tags
belonging to the given vocabulary (id or name) will be searched instead.
+ Use 'offset' and 'limit' parameters to page through results.
'''
model = context['model']
@@ -963,15 +965,16 @@ def _tag_search(context, data_dict):
terms.append(value)
if not len(terms):
- return []
+ return [], 0
for term in terms:
escaped_term = misc.escape_sql_like_special_characters(term, escape='\\')
q = q.filter(model.Tag.name.ilike('%' + escaped_term + '%'))
+ count = q.count()
q = q.offset(offset)
q = q.limit(limit)
- return q.all()
+ return q.all(), count
def tag_search(context, data_dict):
'''Return a list of tag dictionaries that contain the given string.
@@ -987,8 +990,8 @@ def tag_search(context, data_dict):
and 'results' (the list of tag dicts).
'''
- tags = _tag_search(context, data_dict)
- return {'count': len(tags),
+ tags, count = _tag_search(context, data_dict)
+ return {'count': count,
'results': [table_dictize(tag, context) for tag in tags]}
def tag_autocomplete(context, data_dict):
@@ -1003,7 +1006,7 @@ def tag_autocomplete(context, data_dict):
'''
check_access('tag_autocomplete', context, data_dict)
- matching_tags = _tag_search(context, data_dict)
+ matching_tags, count = _tag_search(context, data_dict)
if matching_tags:
return [tag.name for tag in matching_tags]
else:
diff --git a/ckan/tests/__init__.py b/ckan/tests/__init__.py
index 94f94fe..d0ae513 100644
--- a/ckan/tests/__init__.py
+++ b/ckan/tests/__init__.py
@@ -349,3 +349,12 @@ def assert_in(a, b):
assert a in b, '%r was not in %r' % (a, b)
def assert_not_in(a, b):
assert a not in b, '%r was in %r' % (a, b)
+
+class StatusCodes:
+ STATUS_200_OK = 200
+ STATUS_201_CREATED = 201
+ STATUS_400_BAD_REQUEST = 400
+ STATUS_403_ACCESS_DENIED = 403
+ STATUS_404_NOT_FOUND = 404
+ STATUS_409_CONFLICT = 409
+
diff --git a/ckan/tests/logic/test_action.py b/ckan/tests/logic/test_action.py
index a3294f1..38dbf23 100644
--- a/ckan/tests/logic/test_action.py
+++ b/ckan/tests/logic/test_action.py
@@ -11,22 +11,15 @@
from ckan.tests import WsgiAppCase
from ckan.tests.functional.api import assert_dicts_equal_ignoring_ordering
from ckan.tests import setup_test_search_index, search_related
+from ckan.tests import StatusCodes
from ckan.logic import get_action, NotAuthorized
from ckan.logic.action import get_domain_object
from ckan import plugins
from ckan.plugins import SingletonPlugin, implements, IPackageController
-
class TestAction(WsgiAppCase):
- STATUS_200_OK = 200
- STATUS_201_CREATED = 201
- STATUS_400_BAD_REQUEST = 400
- STATUS_403_ACCESS_DENIED = 403
- STATUS_404_NOT_FOUND = 404
- STATUS_409_CONFLICT = 409
-
sysadmin_user = None
normal_user = None
@@ -36,72 +29,12 @@ def setup_class(cls):
CreateTestData.create()
cls.sysadmin_user = model.User.get('testsysadmin')
cls.normal_user = model.User.get('annafan')
- cls.make_some_vocab_tags()
+ CreateTestData.make_some_vocab_tags()
@classmethod
def teardown_class(cls):
model.repo.rebuild_db()
- @classmethod
- def make_some_vocab_tags(cls):
- model.repo.new_revision()
-
- # Create a couple of vocabularies.
- genre_vocab = model.Vocabulary(u'genre')
- model.Session.add(genre_vocab)
- composers_vocab = model.Vocabulary(u'composers')
- model.Session.add(composers_vocab)
-
- # Create some additional free tags for tag search tests.
- tolkien_tag = model.Tag(name="tolkien")
- model.Session.add(tolkien_tag)
- toledo_tag = model.Tag(name="toledo")
- model.Session.add(toledo_tag)
- tolerance_tag = model.Tag(name="tolerance")
- model.Session.add(tolerance_tag)
- tollbooth_tag = model.Tag(name="tollbooth")
- model.Session.add(tollbooth_tag)
- # We have to add free tags to a package or they won't show up in tag results.
- model.Package.get('warandpeace').add_tags((tolkien_tag, toledo_tag,
- tolerance_tag, tollbooth_tag))
-
- # Create some tags that belong to vocabularies.
- sonata_tag = model.Tag(name=u'sonata', vocabulary_id=genre_vocab.id)
- model.Session.add(sonata_tag)
-
- bach_tag = model.Tag(name=u'Bach', vocabulary_id=composers_vocab.id)
- model.Session.add(bach_tag)
-
- neoclassical_tag = model.Tag(name='neoclassical',
- vocabulary_id=genre_vocab.id)
- model.Session.add(neoclassical_tag)
-
- neofolk_tag = model.Tag(name='neofolk', vocabulary_id=genre_vocab.id)
- model.Session.add(neofolk_tag)
-
- neomedieval_tag = model.Tag(name='neomedieval',
- vocabulary_id=genre_vocab.id)
- model.Session.add(neomedieval_tag)
-
- neoprog_tag = model.Tag(name='neoprog',
- vocabulary_id=genre_vocab.id)
- model.Session.add(neoprog_tag)
-
- neopsychedelia_tag = model.Tag(name='neopsychedelia',
- vocabulary_id=genre_vocab.id)
- model.Session.add(neopsychedelia_tag)
-
- neosoul_tag = model.Tag(name='neosoul', vocabulary_id=genre_vocab.id)
- model.Session.add(neosoul_tag)
-
- nerdcore_tag = model.Tag(name='nerdcore', vocabulary_id=genre_vocab.id)
- model.Session.add(nerdcore_tag)
-
- model.Package.get('warandpeace').add_tag(bach_tag)
- model.Package.get('annakarenina').add_tag(sonata_tag)
-
- model.Session.commit()
-
def _add_basic_package(self, package_name=u'test_package', **kwargs):
package = {
'name': package_name,
@@ -249,7 +182,7 @@ def test_18_create_package_not_authorized(self):
wee = json.dumps(package)
postparams = '%s=1' % json.dumps(package)
res = self.app.post('/api/action/package_create', params=postparams,
- status=self.STATUS_403_ACCESS_DENIED)
+ status=StatusCodes.STATUS_403_ACCESS_DENIED)
def test_04_user_list(self):
postparams = '%s=1' % json.dumps({})
@@ -331,169 +264,6 @@ def test_05b_user_show_datasets(self):
dataset = result['datasets'][0]
assert_equal(dataset['name'], u'annakarenina')
- def test_06a_tag_list(self):
- postparams = '%s=1' % json.dumps({})
- res = self.app.post('/api/action/tag_list', params=postparams)
- resbody = json.loads(res.body)
- assert resbody['success'] is True
- assert sorted(resbody['result']) == sorted(['russian', 'tolstoy',
- u'Flexible \u30a1', 'tollbooth', 'tolkien', 'toledo',
- 'tolerance'])
- assert resbody['help'].startswith(
- 'Return a list of tag dictionaries.')
- #Get all fields
- postparams = '%s=1' % json.dumps({'all_fields':True})
- res = self.app.post('/api/action/tag_list', params=postparams)
- res_obj = json.loads(res.body)
- pprint(res_obj)
- assert res_obj['success'] == True
-
- names = [ res_obj['result'][i]['name'] for i in xrange(len(res_obj['result'])) ]
- russian_index = names.index('russian')
- tolstoy_index = names.index('tolstoy')
- flexible_index = names.index(u'Flexible \u30a1')
-
- assert res_obj['result'][russian_index]['name'] == 'russian'
- assert res_obj['result'][tolstoy_index]['name'] == 'tolstoy'
-
- # The "moo" package may part of the retrieved packages, depending
- # upon whether this test is run in isolation from the rest of the
- # test suite or not.
- number_of_russian_packages = len(res_obj['result'][russian_index]['packages']) # warandpeace, annakarenina (moo?)
- number_of_tolstoy_packages = len(res_obj['result'][tolstoy_index]['packages']) # annakarenina
- number_of_flexible_packages = len(res_obj['result'][flexible_index]['packages']) # warandpeace, annakarenina (moo?)
-
- # Assert we have the correct number of packages, independantly of
- # whether the "moo" package may exist or not.
- assert number_of_russian_packages - number_of_tolstoy_packages == 1
- assert number_of_flexible_packages == (number_of_russian_packages - number_of_tolstoy_packages) + 1
-
- assert 'id' in res_obj['result'][0]
- assert 'id' in res_obj['result'][1]
- assert 'id' in res_obj['result'][2]
-
- def test_06b_tag_list_vocab(self):
- vocab_name = 'test-vocab'
- tag_name = 'test-vocab-tag'
-
- # create vocab
- params = json.dumps({'name': vocab_name})
- extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)}
- response = self.app.post('/api/action/vocabulary_create', params=params,
- extra_environ=extra_environ)
- assert response.json['success']
- vocab_id = response.json['result']['id']
-
- # create new tag with vocab
- params = json.dumps({'name': tag_name, 'vocabulary_id': vocab_id})
- extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)}
- response = self.app.post('/api/action/tag_create', params=params,
- extra_environ=extra_environ)
- assert response.json['success'] == True
-
- # check that tag shows up in list
- params = '%s=1' % json.dumps({'vocabulary_id': vocab_name})
- res = self.app.post('/api/action/tag_list', params=params)
- body = json.loads(res.body)
- assert body['success'] is True
- assert body['result'] == [tag_name]
- assert body['help'].startswith('Return a list of tag dictionaries.')
-
- # check that invalid vocab name results in a 404
- params = '%s=1' % json.dumps({'vocabulary_id': 'invalid-vocab-name'})
- res = self.app.post('/api/action/tag_list', params=params, status=404)
-
- def test_07_tag_show(self):
- postparams = '%s=1' % json.dumps({'id':'russian'})
- res = self.app.post('/api/action/tag_show', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['help'] == 'Shows tag details'
- assert res_obj['success'] == True
- result = res_obj['result']
- assert result['name'] == 'russian'
- assert 'id' in result
- assert 'packages' in result
-
- packages = [package['name'] for package in result['packages']]
-
- # the "moo" package may be part of the retrieved packages, depending
- # upon whether or not this test is run in isolation from the other tests
- # in the suite.
- expected_packages = ['annakarenina', 'warandpeace'] + (
- ['moo'] if 'moo' in packages else [])
-
- assert sorted(packages) == sorted(expected_packages), "%s != %s" %(packages, expected_packages)
-
- def test_07_flexible_tag_show(self):
- """
- Asserts that the api can be used to retrieve the details of the flexible tag.
-
- The flexible tag is the tag with spaces, punctuation and foreign
- characters in its name, that's created in `ckan/lib/create_test_data.py`.
- """
- postparams = '%s=1' % json.dumps({'id':u'Flexible \u30a1'})
- res = self.app.post('/api/action/tag_show', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['help'] == 'Shows tag details'
- assert res_obj['success'] == True
- result = res_obj['result']
- assert result['name'] == u'Flexible \u30a1'
- assert 'id' in result
- assert 'packages' in result and len(result['packages']) == 2
-
- assert sorted([package['name'] for package in result['packages']]) == \
- sorted(['annakarenina', 'warandpeace'])
-
- def test_07_tag_show_unknown_license(self):
- # create a tagged package which has an invalid license
- CreateTestData.create_arbitrary([{
- 'name': u'tag_test',
- 'tags': u'tolstoy',
- 'license': 'never_heard_of_it',
- }])
- postparams = '%s=1' % json.dumps({'id':'tolstoy'})
- res = self.app.post('/api/action/tag_show', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success'] == True
- result = res_obj['result']
- for pkg in result['packages']:
- if pkg['name'] == 'tag_test':
- break
- else:
- assert 0, 'tag_test not among packages'
- assert_equal(pkg['license_id'], 'never_heard_of_it')
- assert_equal(pkg['isopen'], False)
-
- def test_08_user_create_not_authorized(self):
- postparams = '%s=1' % json.dumps({'name':'test_create_from_action_api', 'password':'testpass'})
- res = self.app.post('/api/action/user_create', params=postparams,
- status=self.STATUS_403_ACCESS_DENIED)
- res_obj = json.loads(res.body)
- assert res_obj == {'help': 'Creates a new user',
- 'success': False,
- 'error': {'message': 'Access denied', '__type': 'Authorization Error'}}
-
- def test_09_user_create(self):
- user_dict = {'name':'test_create_from_action_api',
- 'about': 'Just a test user',
- 'email': 'me at test.org',
- 'password':'testpass'}
-
- postparams = '%s=1' % json.dumps(user_dict)
- res = self.app.post('/api/action/user_create', params=postparams,
- extra_environ={'Authorization': str(self.sysadmin_user.apikey)})
- res_obj = json.loads(res.body)
- assert res_obj['help'] == 'Creates a new user'
- assert res_obj['success'] == True
- result = res_obj['result']
- assert result['name'] == user_dict['name']
- assert result['about'] == user_dict['about']
- assert 'apikey' in result
- assert 'created' in result
- assert 'display_name' in result
- assert 'number_administered_packages' in result
- assert 'number_of_edits' in result
- assert not 'password' in result
def test_10_user_create_parameters_missing(self):
user_dict = {}
@@ -501,7 +271,7 @@ def test_10_user_create_parameters_missing(self):
postparams = '%s=1' % json.dumps(user_dict)
res = self.app.post('/api/action/user_create', params=postparams,
extra_environ={'Authorization': str(self.sysadmin_user.apikey)},
- status=self.STATUS_409_CONFLICT)
+ status=StatusCodes.STATUS_409_CONFLICT)
res_obj = json.loads(res.body)
assert res_obj == {
'error': {
@@ -522,7 +292,7 @@ def test_11_user_create_wrong_password(self):
postparams = '%s=1' % json.dumps(user_dict)
res = self.app.post('/api/action/user_create', params=postparams,
extra_environ={'Authorization': str(self.sysadmin_user.apikey)},
- status=self.STATUS_409_CONFLICT)
+ status=StatusCodes.STATUS_409_CONFLICT)
res_obj = json.loads(res.body)
assert res_obj == {
@@ -598,7 +368,7 @@ def test_12_user_update(self):
postparams = '%s=1' % json.dumps(sysadmin_user_dict)
res = self.app.post('/api/action/user_update', params=postparams,
extra_environ={'Authorization': str(self.normal_user.apikey)},
- status=self.STATUS_403_ACCESS_DENIED)
+ status=StatusCodes.STATUS_403_ACCESS_DENIED)
res_obj = json.loads(res.body)
assert res_obj == {
@@ -638,7 +408,7 @@ def test_12_user_update_errors(self):
postparams = '%s=1' % json.dumps(test_call['user_dict'])
res = self.app.post('/api/action/user_update', params=postparams,
extra_environ={'Authorization': str(self.normal_user.apikey)},
- status=self.STATUS_409_CONFLICT)
+ status=StatusCodes.STATUS_409_CONFLICT)
res_obj = json.loads(res.body)
for expected_message in test_call['messages']:
assert expected_message[1] in ''.join(res_obj['error'][expected_message[0]])
@@ -710,7 +480,7 @@ def test_14_group_show(self):
#Group not found
postparams = '%s=1' % json.dumps({'id':'not_present_in_the_db'})
res = self.app.post('/api/action/group_show', params=postparams,
- status=self.STATUS_404_NOT_FOUND)
+ status=StatusCodes.STATUS_404_NOT_FOUND)
res_obj = json.loads(res.body)
pprint(res_obj)
@@ -723,306 +493,6 @@ def test_14_group_show(self):
'success': False
}
- def test_15a_tag_search_with_empty_query(self):
- for q in ('missing', None, '', ' '):
- paramd = {}
- if q != 'missing':
- paramd['q'] = q
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 0
- assert res.json['result']['results'] == []
-
- def test_15a_tag_search_with_no_matches(self):
- paramd = {'q': 'no matches' }
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 0
- assert res.json['result']['results'] == []
-
- def test_15a_tag_search_with_one_match(self):
- paramd = {'q': 'russ' }
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 1
- tag_dicts = res.json['result']['results']
- assert len(tag_dicts) == 1
- assert tag_dicts[0]['name'] == 'russian'
-
- def test_15a_tag_search_with_many_matches(self):
- paramd = {'q': 'tol' }
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 5
- tag_dicts = res.json['result']['results']
- assert ([tag['name'] for tag in tag_dicts] ==
- sorted(['tolkien', 'toledo', 'tolerance', 'tollbooth', 'tolstoy']))
-
- def test_15a_tag_search_with_vocab_and_empty_query(self):
- for q in ('missing', None, '', ' '):
- paramd = {'vocabulary_id': 'genre'}
- if q != 'missing':
- paramd['q'] = q
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 0
- assert res.json['result']['results'] == []
-
- def test_15a_tag_search_with_vocab_and_one_match(self):
- paramd = {'q': 'son', 'vocabulary_id': 'genre' }
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 1
- tag_dicts = res.json['result']['results']
- assert len(tag_dicts) == 1
- assert tag_dicts[0]['name'] == 'sonata'
-
- def test_15a_tag_search_with_vocab_and_multiple_matches(self):
- paramd = {'q': 'neo', 'vocabulary_id': 'genre' }
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 6
- tag_dicts = res.json['result']['results']
- assert [tag['name'] for tag in tag_dicts] == sorted(('neoclassical',
- 'neofolk', 'neomedieval', 'neoprog', 'neopsychedelia', 'neosoul'))
-
- def test_15a_tag_search_with_vocab_and_no_matches(self):
- paramd = {'q': 'xxxxxxx', 'vocabulary_id': 'genre' }
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_search', params=params)
- assert res.json['success'] is True
- assert res.json['result']['count'] == 0
- tag_dicts = res.json['result']['results']
- assert tag_dicts == []
-
- def test_15a_tag_search_with_vocab_that_does_not_exist(self):
- paramd = {'q': 'neo', 'vocabulary_id': 'xxxxxx' }
- params = json.dumps(paramd)
- self.app.post('/api/action/tag_search', params=params, status=404)
-
- def test_15a_tag_search_with_invalid_vocab(self):
- for vocab_name in (None, '', 'a', 'e'*200):
- paramd = {'q': 'neo', 'vocabulary_id': vocab_name }
- params = json.dumps(paramd)
- self.app.post('/api/action/tag_search', params=params, status=404)
-
- def test_15_tag_autocomplete(self):
- #Empty query
- postparams = '%s=1' % json.dumps({})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success'] == True
- assert res_obj['result'] == []
- assert res_obj['help'].startswith(
- 'Return a list of tag names that contain the given string.')
-
- #Normal query
- postparams = '%s=1' % json.dumps({'q':'r'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success'] == True
- assert res_obj['result'] == ['russian', 'tolerance']
- assert res_obj['help'].startswith(
- 'Return a list of tag names that contain the given string.')
-
- def test_15_tag_autocomplete_tag_with_spaces(self):
- """Asserts autocomplete finds tags that contain spaces"""
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-space-1',
- 'tags': [u'with space'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':'w'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert 'with space' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_tag_with_foreign_characters(self):
- """Asserts autocomplete finds tags that contain foreign characters"""
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-foreign-character-1',
- 'tags': [u'greek beta \u03b2'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':'greek'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert u'greek beta \u03b2' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_tag_with_punctuation(self):
- """Asserts autocomplete finds tags that contain punctuation"""
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-fullstop-1',
- 'tags': [u'fullstop.'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':'fullstop'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert u'fullstop.' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_tag_with_capital_letters(self):
- """
- Asserts autocomplete finds tags that contain capital letters
- """
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-capital-letter-1',
- 'tags': [u'CAPITAL idea old chap'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':'idea'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert u'CAPITAL idea old chap' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_search_with_space(self):
- """
- Asserts that a search term containing a space works correctly
- """
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-space-2',
- 'tags': [u'with space'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':'th sp'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert 'with space' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_search_with_foreign_character(self):
- """
- Asserts that a search term containing a foreign character works correctly
- """
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-foreign-character-2',
- 'tags': [u'greek beta \u03b2'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':u'\u03b2'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert u'greek beta \u03b2' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_search_with_punctuation(self):
- """
- Asserts that a search term containing punctuation works correctly
- """
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-fullstop-2',
- 'tags': [u'fullstop.'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':u'stop.'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert 'fullstop.' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_search_with_capital_letters(self):
- """
- Asserts that a search term containing capital letters works correctly
- """
-
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-capital-letter-2',
- 'tags': [u'CAPITAL idea old chap'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':u'CAPITAL'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert 'CAPITAL idea old chap' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_is_case_insensitive(self):
- CreateTestData.create_arbitrary([{
- 'name': u'package-with-tag-that-has-a-capital-letter-3',
- 'tags': [u'MIX of CAPITALS and LOWER case'],
- 'license': 'never_heard_of_it',
- }])
-
- postparams = '%s=1' % json.dumps({'q':u'lower case'})
- res = self.app.post('/api/action/tag_autocomplete', params=postparams)
- res_obj = json.loads(res.body)
- assert res_obj['success']
- assert 'MIX of CAPITALS and LOWER case' in res_obj['result'], res_obj['result']
-
- def test_15_tag_autocomplete_with_vocab_and_empty_query(self):
- for q in ('missing', None, '', ' '):
- paramd = {'vocabulary_id': u'genre'}
- if q != 'missing':
- paramd['q'] = q
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_autocomplete', params=params)
- assert res.json['success'] is True
- assert res.json['result'] == []
-
- def test_15_tag_autocomplete_with_vocab_and_single_match(self):
- paramd = {'vocabulary_id': u'genre', 'q': 'son'}
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_autocomplete', params=params)
- assert res.json['success'] is True
- assert res.json['result'] == ['sonata'], res.json['result']
-
- def test_15_tag_autocomplete_with_vocab_and_multiple_matches(self):
- paramd = {'vocabulary_id': 'genre', 'q': 'neo'}
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_autocomplete', params=params)
- assert res.json['success'] is True
- assert res.json['result'] == sorted(('neoclassical', 'neofolk',
- 'neomedieval', 'neoprog', 'neopsychedelia', 'neosoul'))
-
- def test_15_tag_autocomplete_with_vocab_and_no_matches(self):
- paramd = {'vocabulary_id': 'composers', 'q': 'Jonny Greenwood'}
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_autocomplete', params=params)
- assert res.json['success'] is True
- assert res.json['result'] == []
-
- def test_15_tag_autocomplete_with_vocab_that_does_not_exist(self):
- for q in ('', 'neo'):
- paramd = {'vocabulary_id': 'does_not_exist', 'q': q}
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_autocomplete', params=params,
- status=404)
- assert res.json['success'] is False
-
- def test_15_tag_autocomplete_with_invalid_vocab(self):
- for vocab_name in (None, '', 'a', 'e'*200):
- for q in (None, '', 'son'):
- paramd = {'vocabulary_id': vocab_name, 'q': q}
- params = json.dumps(paramd)
- res = self.app.post('/api/action/tag_autocomplete', params=params,
- status=404)
- assert res.json['success'] is False
def test_16_user_autocomplete(self):
#Empty query
@@ -1165,7 +635,7 @@ def test_22_task_status_normal_user_not_authorized(self):
res = self.app.post(
'/api/action/task_status_update', params=postparams,
extra_environ={'Authorization': str(self.normal_user.apikey)},
- status=self.STATUS_403_ACCESS_DENIED
+ status=StatusCodes.STATUS_403_ACCESS_DENIED
)
res_obj = json.loads(res.body)
expected_res_obj = {
@@ -1181,7 +651,7 @@ def test_23_task_status_validation(self):
res = self.app.post(
'/api/action/task_status_update', params=postparams,
extra_environ={'Authorization': str(self.sysadmin_user.apikey)},
- status=self.STATUS_409_CONFLICT
+ status=StatusCodes.STATUS_409_CONFLICT
)
def test_24_task_status_show(self):
diff --git a/ckan/tests/logic/test_tag.py b/ckan/tests/logic/test_tag.py
new file mode 100644
index 0000000..7349cfb
--- /dev/null
+++ b/ckan/tests/logic/test_tag.py
@@ -0,0 +1,495 @@
+import json
+from pprint import pprint
+from nose.tools import assert_equal, assert_raises
+
+import ckan.model as model
+from ckan.lib.create_test_data import CreateTestData
+from ckan.tests import WsgiAppCase
+from ckan.tests import StatusCodes
+
+class TestAction(WsgiAppCase):
+ @classmethod
+ def setup_class(cls):
+ CreateTestData.create()
+ cls.sysadmin_user = model.User.get('testsysadmin')
+ cls.normal_user = model.User.get('annafan')
+ CreateTestData.make_some_vocab_tags()
+
+ @classmethod
+ def teardown_class(cls):
+ model.repo.rebuild_db()
+
+ def test_06a_tag_list(self):
+ postparams = '%s=1' % json.dumps({})
+ res = self.app.post('/api/action/tag_list', params=postparams)
+ resbody = json.loads(res.body)
+ assert resbody['success'] is True
+ assert sorted(resbody['result']) == sorted(['russian', 'tolstoy',
+ u'Flexible \u30a1', 'tollbooth', 'tolkien', 'toledo',
+ 'tolerance'])
+ assert resbody['help'].startswith(
+ 'Return a list of tag dictionaries.')
+ #Get all fields
+ postparams = '%s=1' % json.dumps({'all_fields':True})
+ res = self.app.post('/api/action/tag_list', params=postparams)
+ res_obj = json.loads(res.body)
+ pprint(res_obj)
+ assert res_obj['success'] == True
+
+ names = [ res_obj['result'][i]['name'] for i in xrange(len(res_obj['result'])) ]
+ russian_index = names.index('russian')
+ tolstoy_index = names.index('tolstoy')
+ flexible_index = names.index(u'Flexible \u30a1')
+
+ assert res_obj['result'][russian_index]['name'] == 'russian'
+ assert res_obj['result'][tolstoy_index]['name'] == 'tolstoy'
+
+ # The "moo" package may part of the retrieved packages, depending
+ # upon whether this test is run in isolation from the rest of the
+ # test suite or not.
+ number_of_russian_packages = len(res_obj['result'][russian_index]['packages']) # warandpeace, annakarenina (moo?)
+ number_of_tolstoy_packages = len(res_obj['result'][tolstoy_index]['packages']) # annakarenina
+ number_of_flexible_packages = len(res_obj['result'][flexible_index]['packages']) # warandpeace, annakarenina (moo?)
+
+ # Assert we have the correct number of packages, independantly of
+ # whether the "moo" package may exist or not.
+ assert number_of_russian_packages - number_of_tolstoy_packages == 1
+ assert number_of_flexible_packages == (number_of_russian_packages - number_of_tolstoy_packages) + 1
+
+ assert 'id' in res_obj['result'][0]
+ assert 'id' in res_obj['result'][1]
+ assert 'id' in res_obj['result'][2]
+
+ def test_06b_tag_list_vocab(self):
+ vocab_name = 'test-vocab'
+ tag_name = 'test-vocab-tag'
+
+ # create vocab
+ params = json.dumps({'name': vocab_name})
+ extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)}
+ response = self.app.post('/api/action/vocabulary_create', params=params,
+ extra_environ=extra_environ)
+ assert response.json['success']
+ vocab_id = response.json['result']['id']
+
+ # create new tag with vocab
+ params = json.dumps({'name': tag_name, 'vocabulary_id': vocab_id})
+ extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)}
+ response = self.app.post('/api/action/tag_create', params=params,
+ extra_environ=extra_environ)
+ assert response.json['success'] == True
+
+ # check that tag shows up in list
+ params = '%s=1' % json.dumps({'vocabulary_id': vocab_name})
+ res = self.app.post('/api/action/tag_list', params=params)
+ body = json.loads(res.body)
+ assert body['success'] is True
+ assert body['result'] == [tag_name]
+ assert body['help'].startswith('Return a list of tag dictionaries.')
+
+ # check that invalid vocab name results in a 404
+ params = '%s=1' % json.dumps({'vocabulary_id': 'invalid-vocab-name'})
+ res = self.app.post('/api/action/tag_list', params=params, status=404)
+
+ def test_07_tag_show(self):
+ postparams = '%s=1' % json.dumps({'id':'russian'})
+ res = self.app.post('/api/action/tag_show', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['help'] == 'Shows tag details'
+ assert res_obj['success'] == True
+ result = res_obj['result']
+ assert result['name'] == 'russian'
+ assert 'id' in result
+ assert 'packages' in result
+
+ packages = [package['name'] for package in result['packages']]
+
+ # the "moo" package may be part of the retrieved packages, depending
+ # upon whether or not this test is run in isolation from the other tests
+ # in the suite.
+ expected_packages = ['annakarenina', 'warandpeace'] + (
+ ['moo'] if 'moo' in packages else [])
+
+ assert sorted(packages) == sorted(expected_packages), "%s != %s" %(packages, expected_packages)
+
+ def test_07_flexible_tag_show(self):
+ """
+ Asserts that the api can be used to retrieve the details of the flexible tag.
+
+ The flexible tag is the tag with spaces, punctuation and foreign
+ characters in its name, that's created in `ckan/lib/create_test_data.py`.
+ """
+ postparams = '%s=1' % json.dumps({'id':u'Flexible \u30a1'})
+ res = self.app.post('/api/action/tag_show', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['help'] == 'Shows tag details'
+ assert res_obj['success'] == True
+ result = res_obj['result']
+ assert result['name'] == u'Flexible \u30a1'
+ assert 'id' in result
+ assert 'packages' in result and len(result['packages']) == 2
+
+ assert sorted([package['name'] for package in result['packages']]) == \
+ sorted(['annakarenina', 'warandpeace'])
+
+ def test_07_tag_show_unknown_license(self):
+ # create a tagged package which has an invalid license
+ CreateTestData.create_arbitrary([{
+ 'name': u'tag_test',
+ 'tags': u'tolstoy',
+ 'license': 'never_heard_of_it',
+ }])
+ postparams = '%s=1' % json.dumps({'id':'tolstoy'})
+ res = self.app.post('/api/action/tag_show', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success'] == True
+ result = res_obj['result']
+ for pkg in result['packages']:
+ if pkg['name'] == 'tag_test':
+ break
+ else:
+ assert 0, 'tag_test not among packages'
+ assert_equal(pkg['license_id'], 'never_heard_of_it')
+ assert_equal(pkg['isopen'], False)
+
+ def test_08_user_create_not_authorized(self):
+ postparams = '%s=1' % json.dumps({'name':'test_create_from_action_api', 'password':'testpass'})
+ res = self.app.post('/api/action/user_create', params=postparams,
+ status=StatusCodes.STATUS_403_ACCESS_DENIED)
+ res_obj = json.loads(res.body)
+ assert res_obj == {'help': 'Creates a new user',
+ 'success': False,
+ 'error': {'message': 'Access denied', '__type': 'Authorization Error'}}
+
+ def test_09_user_create(self):
+ user_dict = {'name':'test_create_from_action_api',
+ 'about': 'Just a test user',
+ 'email': 'me at test.org',
+ 'password':'testpass'}
+
+ postparams = '%s=1' % json.dumps(user_dict)
+ res = self.app.post('/api/action/user_create', params=postparams,
+ extra_environ={'Authorization': str(self.sysadmin_user.apikey)})
+ res_obj = json.loads(res.body)
+ assert res_obj['help'] == 'Creates a new user'
+ assert res_obj['success'] == True
+ result = res_obj['result']
+ assert result['name'] == user_dict['name']
+ assert result['about'] == user_dict['about']
+ assert 'apikey' in result
+ assert 'created' in result
+ assert 'display_name' in result
+ assert 'number_administered_packages' in result
+ assert 'number_of_edits' in result
+ assert not 'password' in result
+
+ def test_15a_tag_search_with_empty_query(self):
+ for q in ('missing', None, '', ' '):
+ paramd = {}
+ if q != 'missing':
+ paramd['q'] = q
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 0
+ assert res.json['result']['results'] == []
+
+ def test_15a_tag_search_with_no_matches(self):
+ paramd = {'q': 'no matches' }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 0
+ assert res.json['result']['results'] == []
+
+ def test_15a_tag_search_with_one_match(self):
+ paramd = {'q': 'russ' }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 1
+ tag_dicts = res.json['result']['results']
+ assert len(tag_dicts) == 1
+ assert tag_dicts[0]['name'] == 'russian'
+
+ def test_15a_tag_search_with_many_matches(self):
+ paramd = {'q': 'tol' }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 5
+ tag_dicts = res.json['result']['results']
+ assert ([tag['name'] for tag in tag_dicts] ==
+ sorted(['tolkien', 'toledo', 'tolerance', 'tollbooth', 'tolstoy']))
+
+ def test_15a_tag_search_with_many_matches_paged(self):
+ paramd = {'q': 'tol', 'limit': 2, 'offset': 2 }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 5
+ tag_dicts = res.json['result']['results']
+ assert_equal ([tag['name'] for tag in tag_dicts],
+ [u'tolkien', u'tollbooth'])
+
+ def test_15a_tag_search_with_vocab_and_empty_query(self):
+ for q in ('missing', None, '', ' '):
+ paramd = {'vocabulary_id': 'genre'}
+ if q != 'missing':
+ paramd['q'] = q
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 0
+ assert res.json['result']['results'] == []
+
+ def test_15a_tag_search_with_vocab_and_one_match(self):
+ paramd = {'q': 'son', 'vocabulary_id': 'genre' }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 1
+ tag_dicts = res.json['result']['results']
+ assert len(tag_dicts) == 1
+ assert tag_dicts[0]['name'] == 'sonata'
+
+ def test_15a_tag_search_with_vocab_and_multiple_matches(self):
+ paramd = {'q': 'neo', 'vocabulary_id': 'genre' }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 6
+ tag_dicts = res.json['result']['results']
+ assert [tag['name'] for tag in tag_dicts] == sorted(('neoclassical',
+ 'neofolk', 'neomedieval', 'neoprog', 'neopsychedelia', 'neosoul'))
+
+ def test_15a_tag_search_with_vocab_and_no_matches(self):
+ paramd = {'q': 'xxxxxxx', 'vocabulary_id': 'genre' }
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_search', params=params)
+ assert res.json['success'] is True
+ assert res.json['result']['count'] == 0
+ tag_dicts = res.json['result']['results']
+ assert tag_dicts == []
+
+ def test_15a_tag_search_with_vocab_that_does_not_exist(self):
+ paramd = {'q': 'neo', 'vocabulary_id': 'xxxxxx' }
+ params = json.dumps(paramd)
+ self.app.post('/api/action/tag_search', params=params, status=404)
+
+ def test_15a_tag_search_with_invalid_vocab(self):
+ for vocab_name in (None, '', 'a', 'e'*200):
+ paramd = {'q': 'neo', 'vocabulary_id': vocab_name }
+ params = json.dumps(paramd)
+ self.app.post('/api/action/tag_search', params=params, status=404)
+
+ def test_15_tag_autocomplete(self):
+ #Empty query
+ postparams = '%s=1' % json.dumps({})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success'] == True
+ assert res_obj['result'] == []
+ assert res_obj['help'].startswith(
+ 'Return a list of tag names that contain the given string.')
+
+ #Normal query
+ postparams = '%s=1' % json.dumps({'q':'r'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success'] == True
+ assert res_obj['result'] == ['russian', 'tolerance']
+ assert res_obj['help'].startswith(
+ 'Return a list of tag names that contain the given string.')
+
+ def test_15_tag_autocomplete_tag_with_spaces(self):
+ """Asserts autocomplete finds tags that contain spaces"""
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-space-1',
+ 'tags': [u'with space'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':'w'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert 'with space' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_tag_with_foreign_characters(self):
+ """Asserts autocomplete finds tags that contain foreign characters"""
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-foreign-character-1',
+ 'tags': [u'greek beta \u03b2'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':'greek'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert u'greek beta \u03b2' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_tag_with_punctuation(self):
+ """Asserts autocomplete finds tags that contain punctuation"""
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-fullstop-1',
+ 'tags': [u'fullstop.'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':'fullstop'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert u'fullstop.' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_tag_with_capital_letters(self):
+ """
+ Asserts autocomplete finds tags that contain capital letters
+ """
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-capital-letter-1',
+ 'tags': [u'CAPITAL idea old chap'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':'idea'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert u'CAPITAL idea old chap' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_search_with_space(self):
+ """
+ Asserts that a search term containing a space works correctly
+ """
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-space-2',
+ 'tags': [u'with space'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':'th sp'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert 'with space' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_search_with_foreign_character(self):
+ """
+ Asserts that a search term containing a foreign character works correctly
+ """
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-foreign-character-2',
+ 'tags': [u'greek beta \u03b2'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':u'\u03b2'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert u'greek beta \u03b2' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_search_with_punctuation(self):
+ """
+ Asserts that a search term containing punctuation works correctly
+ """
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-fullstop-2',
+ 'tags': [u'fullstop.'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':u'stop.'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert 'fullstop.' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_search_with_capital_letters(self):
+ """
+ Asserts that a search term containing capital letters works correctly
+ """
+
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-capital-letter-2',
+ 'tags': [u'CAPITAL idea old chap'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':u'CAPITAL'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert 'CAPITAL idea old chap' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_is_case_insensitive(self):
+ CreateTestData.create_arbitrary([{
+ 'name': u'package-with-tag-that-has-a-capital-letter-3',
+ 'tags': [u'MIX of CAPITALS and LOWER case'],
+ 'license': 'never_heard_of_it',
+ }])
+
+ postparams = '%s=1' % json.dumps({'q':u'lower case'})
+ res = self.app.post('/api/action/tag_autocomplete', params=postparams)
+ res_obj = json.loads(res.body)
+ assert res_obj['success']
+ assert 'MIX of CAPITALS and LOWER case' in res_obj['result'], res_obj['result']
+
+ def test_15_tag_autocomplete_with_vocab_and_empty_query(self):
+ for q in ('missing', None, '', ' '):
+ paramd = {'vocabulary_id': u'genre'}
+ if q != 'missing':
+ paramd['q'] = q
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_autocomplete', params=params)
+ assert res.json['success'] is True
+ assert res.json['result'] == []
+
+ def test_15_tag_autocomplete_with_vocab_and_single_match(self):
+ paramd = {'vocabulary_id': u'genre', 'q': 'son'}
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_autocomplete', params=params)
+ assert res.json['success'] is True
+ assert res.json['result'] == ['sonata'], res.json['result']
+
+ def test_15_tag_autocomplete_with_vocab_and_multiple_matches(self):
+ paramd = {'vocabulary_id': 'genre', 'q': 'neo'}
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_autocomplete', params=params)
+ assert res.json['success'] is True
+ assert res.json['result'] == sorted(('neoclassical', 'neofolk',
+ 'neomedieval', 'neoprog', 'neopsychedelia', 'neosoul'))
+
+ def test_15_tag_autocomplete_with_vocab_and_no_matches(self):
+ paramd = {'vocabulary_id': 'composers', 'q': 'Jonny Greenwood'}
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_autocomplete', params=params)
+ assert res.json['success'] is True
+ assert res.json['result'] == []
+
+ def test_15_tag_autocomplete_with_vocab_that_does_not_exist(self):
+ for q in ('', 'neo'):
+ paramd = {'vocabulary_id': 'does_not_exist', 'q': q}
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_autocomplete', params=params,
+ status=404)
+ assert res.json['success'] is False
+
+ def test_15_tag_autocomplete_with_invalid_vocab(self):
+ for vocab_name in (None, '', 'a', 'e'*200):
+ for q in (None, '', 'son'):
+ paramd = {'vocabulary_id': vocab_name, 'q': q}
+ params = json.dumps(paramd)
+ res = self.app.post('/api/action/tag_autocomplete', params=params,
+ status=404)
+ assert res.json['success'] is False
================================================================
More information about the ckan-changes
mailing list