# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from datetime import date

from django.conf import settings
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.test.utils import override_settings
from django.utils import timezone
from django.contrib.auth import get_user_model

from passlib.hash import sha512_crypt

from osusers.models import (
    CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL,
    AdditionalGroup,
    Group,
    Shadow,
    SshPublicKey,
    User,
)
from taskresults.models import TaskResult


EXAMPLE_KEY_1_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: "1024-bit RSA, converted from OpenSSH by me@example.com"
x-command: /home/me/bin/lock-in-guest.sh
AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb
YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ
5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=
---- END SSH2 PUBLIC KEY ----"""

EXAMPLE_KEY_2_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: This is my public key for use on \
servers which I don't like.
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

EXAMPLE_KEY_3_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key for use with MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

EXAMPLE_KEY_4_OPENSSH = "".join((
    "ssh-rsa ",
    "AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb",
    "YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ",
    "5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE="
))

EXAMPLE_KEY_5_RFC4716_MULTILINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key \\
for use with \\
MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

EXAMPLE_KEY_6_RFC4716_EMPTY_LINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key for use with MyIsp

AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

EXAMPLE_KEY_7_NO_COMMENT = """---- BEGIN SSH2 PUBLIC KEY ----
AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb
YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ
5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=
---- END SSH2 PUBLIC KEY ----"""

EXAMPLE_KEY_8_OPENSSH_BROKEN = "".join((
    "ssh-rsa ",
    "AschrÖdderöd"
))

EXAMPLE_KEY_9_RFC4716_ONLY_HEADER = "---- BEGIN SSH2 PUBLIC KEY ----"

Customer = get_user_model()


@override_settings(
    CELERY_ALWAYS_EAGER=True,
    CELERY_CACHE_BACKEND='memory',
    BROKER_BACKEND='memory'
)
class TestCaseWithCeleryTasks(TestCase):
    pass


class AdditionalGroupTest(TestCaseWithCeleryTasks):
    def setUp(self):
        customer = Customer.objects.create(username='test')
        self.group1 = Group.objects.create(groupname='test1', gid=1000)
        self.user = User.objects.create(
            customer=customer, username='test', uid=1000, group=self.group1,
            homedir='/home/test', shell='/bin/bash')

    def test_clean_primary_group(self):
        testsubj = AdditionalGroup(user=self.user, group=self.group1)
        with self.assertRaises(ValidationError) as cm:
            testsubj.clean()
        self.assertEqual(
            cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)

    def test_clean_other_group(self):
        group2 = Group.objects.create(groupname='test2', gid=1001)
        testsubj = AdditionalGroup(user=self.user, group=group2)
        testsubj.clean()

    def test_save(self):
        group2 = Group.objects.create(groupname='test2', gid=1001)
        addgroup = AdditionalGroup(user=self.user, group=group2)
        addgroup.save()
        taskres = TaskResult.objects.all()
        self.assertTrue(len(taskres), 4)
        creators = [r.creator for r in taskres]
        for tcount, tcreator in [
                (2, 'handle_group_created'), (1, 'handle_user_created'),
                (1, 'handle_user_added_to_group')]:
            self.assertEqual(creators.count(tcreator), tcount)

    def test_save_again(self):
        group2 = Group.objects.create(groupname='test2', gid=1001)
        TaskResult.objects.all().delete()
        addgroup = AdditionalGroup(user=self.user, group=group2)
        addgroup.save()
        TaskResult.objects.all().delete()
        addgroup.save()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 0)

    def test_delete(self):
        group2 = Group.objects.create(groupname='test2', gid=1001)
        addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
        addgroup.delete()
        self.assertEqual(len(AdditionalGroup.objects.all()), 0)

    def test___str__(self):
        group2 = Group.objects.create(groupname='test2', gid=1001)
        addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
        self.assertEqual(str(addgroup), 'test (1000) in test2 (1001)')


@override_settings(OSUSER_MINGID=10000)
class GroupManagerTest(TestCaseWithCeleryTasks):
    def test_get_next_gid_first(self):
        self.assertEqual(Group.objects.get_next_gid(), 10000)

    def test_get_next_gid_second(self):
        Group.objects.create(gid=10010, groupname='test')
        self.assertEqual(Group.objects.get_next_gid(), 10011)


class GroupTest(TestCaseWithCeleryTasks):
    def test___str__(self):
        group = Group.objects.create(gid=10000, groupname='test')
        self.assertEqual(str(group), 'test (10000)')

    def test_save(self):
        group = Group(gid=10000, groupname='test')
        self.assertIs(group.save(), group)
        taskres = TaskResult.objects.all()
        self.assertTrue(len(taskres), 1)
        creators = [r.creator for r in taskres]
        for tcount, tcreator in [
                (1, 'handle_group_created')]:
            self.assertEqual(creators.count(tcreator), tcount)

    def test_save_again(self):
        group = Group.objects.create(gid=10000, groupname='test')
        taskres = TaskResult.objects.all().delete()
        group.save()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 0)

    def test_delete(self):
        group = Group.objects.create(gid=10000, groupname='test')
        self.assertEqual(len(Group.objects.all()), 1)
        group.delete()
        self.assertEqual(len(Group.objects.all()), 0)
        self.assertEqual(len(TaskResult.objects.all()), 2)
        tr = TaskResult.objects.first()
        self.assertEqual(tr.creator, 'handle_group_created')


class ShadowManagerTest(TestCaseWithCeleryTasks):
    def setUp(self):
        self.customer = Customer.objects.create(username='test')
        super(ShadowManagerTest, self).setUp()

    def test_create_shadow(self):
        user = User(
            customer=self.customer, username='test', uid=1000,
            group=Group(gid=1000, groupname='test'), homedir='/home/test',
            shell='/bin/fooshell')
        shadow = Shadow.objects.create_shadow(user, 'test')
        self.assertTrue(sha512_crypt.verify('test', shadow.passwd))
        self.assertEqual(shadow.changedays,
                         (timezone.now().date() - date(1970, 1, 1)).days)
        self.assertEqual(shadow.user, user)
        self.assertEqual(shadow.minage, 0)
        self.assertIsNone(shadow.maxage)
        self.assertEqual(shadow.gracedays, 7)
        self.assertEqual(shadow.inactdays, 30)
        self.assertIsNone(shadow.expiredays)


class ShadowTest(TestCaseWithCeleryTasks):
    def setUp(self):
        self.customer = Customer.objects.create(username='test')
        super(ShadowTest, self).setUp()

    def test___str__(self):
        group = Group.objects.create(
            groupname='test', gid=1000)
        user = User.objects.create(
            customer=self.customer, username='test', uid=1000, group=group,
            homedir='/home/test', shell='/bin/bash')
        shadow = Shadow(user=user)
        self.assertEqual(str(shadow), 'for user test (1000)')

    def test_set_password(self):
        group = Group.objects.create(
            groupname='test', gid=1000)
        user = User.objects.create(
            customer=self.customer, username='test', uid=1000, group=group,
            homedir='/home/test', shell='/bin/bash')
        shadow = Shadow(user=user)
        shadow.set_password('test')
        self.assertTrue(sha512_crypt.verify('test', shadow.passwd))


@override_settings(
    OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
    OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
)
class UserManagerTest(TestCaseWithCeleryTasks):
    def _create_group(self):
        return Group.objects.create(gid=10000, groupname='foo')

    def setUp(self):
        self.customer = Customer.objects.create(username='test')
        super(UserManagerTest, self).setUp()

    def test_get_next_uid_first(self):
        self.assertEqual(User.objects.get_next_uid(), 10000)

    def test_get_next_uid_second(self):
        User.objects.create(
            customer=self.customer, uid=10010, username='foo',
            group=self._create_group(), homedir='/home/foo',
            shell='/bin/fooshell')
        self.assertEqual(User.objects.get_next_uid(), 10011)

    def test_get_next_username_first(self):
        self.assertEqual(User.objects.get_next_username(), 'test01')

    def test_get_next_username_second(self):
        User.objects.create(
            customer=self.customer, uid=10000, username='test01',
            group=self._create_group(), homedir='/home/foo',
            shell='/bin/fooshell')
        self.assertEqual(User.objects.get_next_username(), 'test02')

    def test_get_next_username_gaps(self):
        group = self._create_group()
        User.objects.create(
            customer=self.customer, uid=10000, username='test01', group=group,
            homedir='/home/foo', shell='/bin/fooshell')
        User.objects.create(
            customer=self.customer, uid=10002, username='test03', group=group,
            homedir='/home/foo', shell='/bin/fooshell')
        self.assertEqual(User.objects.get_next_username(), 'test02')

    def test_create_user_first(self):
        user = User.objects.create_user(customer=self.customer)
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, 10000)
        self.assertEqual(user.group.gid, 10000)
        self.assertEqual(user.group.groupname, 'test01')
        self.assertEqual(user.username, 'test01')
        self.assertEqual(user.homedir, '/home/test01')
        self.assertEqual(user.shell, '/bin/fooshell')
        self.assertIsNotNone(user.shadow)

    def test_create_user_tasks(self):
        User.objects.create_user(customer=self.customer)
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 3)
        creators = [r.creator for r in taskres]
        for creator in [
                'handle_group_created', 'handle_user_created',
                'handle_user_password_set']:
            self.assertIn(creator, creators)

    def test_create_user_second(self):
        User.objects.create_user(customer=self.customer)
        user = User.objects.create_user(customer=self.customer)
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, 10001)
        self.assertEqual(user.group.gid, 10001)
        self.assertEqual(user.group.groupname, 'test02')
        self.assertEqual(user.username, 'test02')
        self.assertEqual(user.homedir, '/home/test02')
        self.assertEqual(user.shell, '/bin/fooshell')
        self.assertIsNotNone(user.shadow)
        self.assertEqual(len(User.objects.all()), 2)

    def test_create_user_known_password(self):
        user = User.objects.create_user(
            customer=self.customer, password='foobar')
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, 10000)
        self.assertEqual(user.group.gid, 10000)
        self.assertEqual(user.group.groupname, 'test01')
        self.assertEqual(user.username, 'test01')
        self.assertEqual(user.homedir, '/home/test01')
        self.assertEqual(user.shell, '/bin/fooshell')
        self.assertIsNotNone(user.shadow)
        self.assertTrue(sha512_crypt.verify('foobar', user.shadow.passwd))

    def test_create_user_predefined_username(self):
        user = User.objects.create_user(
            customer=self.customer, username='tester')
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, 10000)
        self.assertEqual(user.group.gid, 10000)
        self.assertEqual(user.group.groupname, 'tester')
        self.assertEqual(user.username, 'tester')
        self.assertEqual(user.homedir, '/home/tester')
        self.assertEqual(user.shell, '/bin/fooshell')
        self.assertIsNotNone(user.shadow)

    def test_create_user_commit(self):
        user = User.objects.create_user(customer=self.customer, commit=True)
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, 10000)
        self.assertEqual(user.group.gid, 10000)
        self.assertEqual(user.group.groupname, 'test01')
        self.assertEqual(user.username, 'test01')
        self.assertEqual(user.homedir, '/home/test01')
        self.assertEqual(user.shell, '/bin/fooshell')
        self.assertIsNotNone(user.shadow)


@override_settings(
    OSUSER_MINUID=10000, OSUSER_MINGID=10000, OSUSER_USERNAME_PREFIX='test',
    OSUSER_HOME_BASEPATH='/home', OSUSER_DEFAULT_SHELL='/bin/fooshell'
)
class UserTest(TestCaseWithCeleryTasks):
    def setUp(self):
        self.customer = Customer.objects.create_user('test')
        super(UserTest, self).setUp()

    def test___str__(self):
        user = User.objects.create_user(self.customer)
        self.assertEqual(str(user), 'test01 (10000)')

    def test_set_password(self):
        user = User.objects.create_user(self.customer)
        self.assertFalse(sha512_crypt.verify('test', user.shadow.passwd))
        user.set_password('test')
        self.assertTrue(sha512_crypt.verify('test', user.shadow.passwd))

    def test_save(self):
        user = User.objects.create_user(self.customer)
        user.save()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 3)
        creators = [r.creator for r in taskres]
        for task in [
                'handle_group_created', 'handle_user_created',
                'handle_user_password_set']:
            self.assertIn(task, creators)

    def test_delete_only_user(self):
        user = User.objects.create_user(self.customer)
        user.delete()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 6)
        creators = [r.creator for r in taskres]
        for task in [
                'handle_group_created', 'handle_user_created',
                'handle_user_password_set', 'handle_user_deleted',
                'handle_group_deleted', 'handle_user_deleted']:
            self.assertIn(task, creators)
        self.assertEqual(len(User.objects.all()), 0)

    def test_delete_additional_groups(self):
        group1 = Group.objects.create(gid=2000, groupname='group1')
        group2 = Group.objects.create(gid=2001, groupname='group2')
        user = User.objects.create_user(self.customer)
        for group in [group1, group2]:
            user.additionalgroup_set.add(
                AdditionalGroup.objects.create(user=user, group=group))
        TaskResult.objects.all().delete()
        user.delete()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 5)
        creators = [t.creator for t in taskres]
        for tcount, tcreator in [
                (2, 'handle_user_removed_from_group'),
                (2, 'handle_user_deleted'),
                (1, 'handle_group_deleted')]:
            self.assertEqual(creators.count(tcreator), tcount)
        self.assertEqual(len(User.objects.all()), 0)
        self.assertEqual(len(AdditionalGroup.objects.all()), 0)

    def test_is_sftp_user(self):
        user = User.objects.create_user(self.customer)
        self.assertFalse(user.is_sftp_user())

        sftp_group = Group.objects.create(
            gid=2000, groupname=settings.OSUSER_SFTP_GROUP)
        user.additionalgroup_set.add(
            AdditionalGroup.objects.create(user=user, group=sftp_group))
        self.assertTrue(user.is_sftp_user())


class SshPublicKeyManagerTest(TestCaseWithCeleryTasks):
    def test_parse_keytext_rfc4716_1(self):
        res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_1_RFC4716)
        self.assertEqual(len(res), 3)
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(res[0], 'ssh-rsa')
        self.assertEqual(
            res[2], '"1024-bit RSA, converted from OpenSSH by me@example.com"')

    def test_parse_keytext_rfc4716_2(self):
        res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_2_RFC4716)
        self.assertEqual(len(res), 3)
        self.assertEqual(res[0], 'ssh-dss')
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(
            res[2],
            "This is my public key for use on servers which I don't like.")

    def test_parse_keytext_rfc4716_3(self):
        res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_3_RFC4716)
        self.assertEqual(len(res), 3)
        self.assertEqual(res[0], 'ssh-dss')
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(res[2], "DSA Public Key for use with MyIsp")

    def test_parse_keytext_openssh(self):
        res = SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_4_OPENSSH)
        self.assertEquals(len(res), 3)
        self.assertEqual(res[0], 'ssh-rsa')
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(res[2], '')

    def test_parse_keytext_invalid_multiline(self):
        with self.assertRaises(ValueError):
            SshPublicKey.objects.parse_keytext("\r\n".join(["xx"]*10))

    def test_parse_keytext_empty_line(self):
        res = SshPublicKey.objects.parse_keytext(
            EXAMPLE_KEY_6_RFC4716_EMPTY_LINE)
        self.assertEqual(len(res), 3)
        self.assertEqual(res[0], 'ssh-dss')
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(res[2], "DSA Public Key for use with MyIsp")

    def test_parse_keytext_invalid_empty_rfc4716_header(self):
        with self.assertRaises(ValueError):
            SshPublicKey.objects.parse_keytext(
                EXAMPLE_KEY_9_RFC4716_ONLY_HEADER)

    def test_parse_keytext_no_comment(self):
        res = SshPublicKey.objects.parse_keytext(
            EXAMPLE_KEY_7_NO_COMMENT)
        self.assertEqual(len(res), 3)
        self.assertEqual(res[0], 'ssh-rsa')
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(res[2], '')

    def test_parse_keytext_multiline_comment(self):
        res = SshPublicKey.objects.parse_keytext(
            EXAMPLE_KEY_5_RFC4716_MULTILINE)
        self.assertEqual(len(res), 3)
        self.assertEqual(res[0], 'ssh-dss')
        self.assertGreater(len(res[1]), 40)
        self.assertEqual(res[2], "DSA Public Key for use with MyIsp")

    def test_parse_keytext_invalid(self):
        with self.assertRaises(ValueError):
            SshPublicKey.objects.parse_keytext('invalid')

    def test_parse_keytext_invalid_openssh(self):
        with self.assertRaises(ValueError):
            SshPublicKey.objects.parse_keytext(EXAMPLE_KEY_8_OPENSSH_BROKEN)

    def test_create_ssh_public_key(self):
        customer = Customer.objects.create_user('test')
        user = User.objects.create_user(customer)
        key = SshPublicKey.objects.create_ssh_public_key(
            user, EXAMPLE_KEY_4_OPENSSH)
        self.assertIsInstance(key, SshPublicKey)
        self.assertEqual(key.user, user)
        self.assertEqual(key.algorithm, 'ssh-rsa')
        self.assertEqual(key.data, EXAMPLE_KEY_4_OPENSSH.split()[1])
        self.assertEqual(key.comment, '')


class SshPublicKeyTest(TestCaseWithCeleryTasks):
    def setUp(self):
        super(SshPublicKeyTest, self).setUp()
        customer = Customer.objects.create_user('test')
        self.user = User.objects.create_user(customer)
        TaskResult.objects.all().delete()

    def test__str__rfc4716(self):
        res = SshPublicKey.objects.create_ssh_public_key(
            self.user, EXAMPLE_KEY_3_RFC4716)
        self.assertEqual(
            str(res), 'ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae'
            'hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St'
            'xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g'
            'Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4'
            'KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F'
            'XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc'
            'GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA'
            'o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK'
            'jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP'
            '+CDqzCM4loWgV DSA Public Key for use with MyIsp')

    def test__str__openssh(self):
        res = SshPublicKey.objects.create_ssh_public_key(
            self.user, EXAMPLE_KEY_4_OPENSSH)
        self.assertEqual(str(res), EXAMPLE_KEY_4_OPENSSH)

    def test_call_tasks_on_save(self):
        SshPublicKey.objects.create_ssh_public_key(
            self.user, EXAMPLE_KEY_4_OPENSSH)
        taskresults = TaskResult.objects.all()
        self.assertEqual(len(taskresults), 1)
        self.assertEqual(
            taskresults[0].creator, 'handle_ssh_keys_changed')

    def test_call_tasks_on_delete(self):
        key = SshPublicKey.objects.create_ssh_public_key(
            self.user, EXAMPLE_KEY_4_OPENSSH)
        TaskResult.objects.all().delete()
        key.delete()
        taskresults = TaskResult.objects.all()
        self.assertEqual(len(taskresults), 1)
        self.assertEqual(
            taskresults[0].creator, 'handle_ssh_keys_changed')