# # gvacommon - common parts of gnuviechadmin # Copyright (C) 2014-2018 Jan Dittberner # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # """ Tests for gvacommon celery router code. """ import unittest from gvacommon.celeryrouters import GvaRouter class GvaRouteTest(unittest.TestCase): def test_route_for_task_ldap(self): router = GvaRouter() route = router.route_for_task('ldap') self.assertEqual( route, {'exchange': 'ldap', 'exchange_type': 'direct', 'queue': 'ldap'}) def test_route_for_task_file(self): router = GvaRouter() route = router.route_for_task('file') self.assertEqual( route, {'exchange': 'file', 'exchange_type': 'direct', 'queue': 'file'}) def test_route_for_task_mysql(self): router = GvaRouter() route = router.route_for_task('mysql') self.assertEqual( route, {'exchange': 'mysql', 'exchange_type': 'direct', 'queue': 'mysql'}) def test_route_for_task_pgsql(self): router = GvaRouter() route = router.route_for_task('pgsql') self.assertEqual( route, {'exchange': 'pgsql', 'exchange_type': 'direct', 'queue': 'pgsql'}) def test_route_for_task_web(self): router = GvaRouter() route = router.route_for_task('web') self.assertEqual( route, {'exchange': 'web', 'exchange_type': 'direct', 'queue': 'web'}) def test_route_for_task_other(self): router = GvaRouter() route = router.route_for_task('unknown') self.assertIsNone(route)