631 lines
24 KiB
Python
631 lines
24 KiB
Python
# -*- coding: utf-8 -*-
|
|
from datetime import date
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.exceptions import ValidationError
|
|
from django.test import TestCase
|
|
from django.test.utils import override_settings
|
|
from django.utils import timezone
|
|
from passlib.handlers.sha2_crypt import sha512_crypt
|
|
|
|
from osusers.models import (
|
|
CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL,
|
|
AdditionalGroup,
|
|
Group,
|
|
Shadow,
|
|
SshPublicKey,
|
|
User,
|
|
)
|
|
from taskresults.models import TaskResult
|
|
|
|
# ---------------------------------------------------------------------------
# SSH public key fixtures used by the SshPublicKey tests in this module.
# ---------------------------------------------------------------------------

# RSA key in an RFC 4716 style block with a quoted "Comment" header and an
# additional private "x-command" header in front of the key data.
EXAMPLE_KEY_1_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: "1024-bit RSA, converted from OpenSSH by me@example.com"
x-command: /home/me/bin/lock-in-guest.sh
AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb
YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ
5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=
---- END SSH2 PUBLIC KEY ----"""

# DSA key with an unquoted comment.  The single backslash at the end of the
# "Comment" line is a Python line continuation inside this non-raw literal,
# so the resulting string holds the whole comment on one line.
EXAMPLE_KEY_2_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: This is my public key for use on \
servers which I don't like.
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

# The same DSA key data with a plain single-line comment.
EXAMPLE_KEY_3_RFC4716 = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key for use with MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

# The RSA key data of EXAMPLE_KEY_1_RFC4716 as a one-line
# "<algorithm> <data>" text without a trailing comment.
EXAMPLE_KEY_4_OPENSSH = (
    "ssh-rsa "
    "AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb"
    "YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ"
    "5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE="
)

# DSA key whose comment is spread over three lines.  The escaped backslashes
# leave a literal "\" at the end of the first two comment lines of the string.
EXAMPLE_KEY_5_RFC4716_MULTILINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key \\
for use with \\
MyIsp
AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

# Like EXAMPLE_KEY_3_RFC4716, but with an empty line between the comment
# header and the key data.
EXAMPLE_KEY_6_RFC4716_EMPTY_LINE = """---- BEGIN SSH2 PUBLIC KEY ----
Comment: DSA Public Key for use with MyIsp

AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxaehvx5wOJ0rzZdzoSOXxbET
W6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7StxyltHnXF1YLfKD1G4T6JYrdH
YI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3gJq2e7Yisk/gF+1VAAAAFQDb8D5c
vwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4KLYk3IwRbXblwXdkPggA4pfdtW9vGf
J0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/FXPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAA
vioUPkmdMc0zuWoSOEsSNhVDtX3WdvVcGcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACB
AN7CY+KKv1gHpRzFwdQm7HK9bb1LAo2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HS
n24VYtYtsMu74qXviYjziVucWKjjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5
sY29ouezv4Xz2PuMch5VGPP+CDqzCM4loWgV
---- END SSH2 PUBLIC KEY ----"""

# RSA key block that carries no header lines at all.
EXAMPLE_KEY_7_NO_COMMENT = """---- BEGIN SSH2 PUBLIC KEY ----
AAAAB3NzaC1yc2EAAAABIwAAAIEA1on8gxCGJJWSRT4uOrR13mUaUk0hRf4RzxSZ1zRb
YYFw8pfGesIFoEuVth4HKyF8k1y4mRUnYHP1XNMNMJl1JcEArC2asV8sHf6zSPVffozZ
5TT4SfsUu/iKy9lUcCfXzwre4WWZSXXcPff+EHtWshahu3WzBdnGxm5Xoi89zcE=
---- END SSH2 PUBLIC KEY ----"""

# One-line key text whose data part is not valid key material; the parser
# test expects a ValueError for it.
EXAMPLE_KEY_8_OPENSSH_BROKEN = "ssh-rsa AschrÖdderöd"

# Only the begin marker, without data or end marker; the parser test expects
# a ValueError for it.
EXAMPLE_KEY_9_RFC4716_ONLY_HEADER = "---- BEGIN SSH2 PUBLIC KEY ----"
|
|
|
|
# The model class returned by Django's get_user_model(); the tests below
# create instances of it and pass them as the ``customer`` of OS users.
Customer = get_user_model()
|
|
|
|
|
|
@override_settings(
    CELERY_ALWAYS_EAGER=True,
    CELERY_CACHE_BACKEND="memory",
    BROKER_BACKEND="memory",
)
class TestCaseWithCeleryTasks(TestCase):
    """Common base class of the test cases in this module.

    It adds no behaviour of its own; it only applies the Celery related
    settings overrides above (eager execution, in-memory cache and broker
    backends) to every test case derived from it.
    """
|
|
|
|
|
|
class AdditionalGroupTest(TestCaseWithCeleryTasks):
    """Tests for the ``AdditionalGroup`` model.

    Covers validation in ``clean()``, the task results recorded when an
    additional group membership is saved, deletion and ``__str__``.
    """

    def setUp(self):
        # Fixture: one customer, one group and one OS user that has this
        # group as its primary group.
        customer = Customer.objects.create(username="test")
        self.group1 = Group.objects.create(groupname="test1", gid=1000)
        self.user = User.objects.create(
            customer=customer,
            username="test",
            uid=1000,
            group=self.group1,
            homedir="/home/test",
            shell="/bin/bash",
        )

    def test_clean_primary_group(self):
        # The user's primary group must be rejected as additional group.
        testsubj = AdditionalGroup(user=self.user, group=self.group1)
        with self.assertRaises(ValidationError) as cm:
            testsubj.clean()
        self.assertEqual(cm.exception.message, CANNOT_USE_PRIMARY_GROUP_AS_ADDITIONAL)

    def test_clean_other_group(self):
        # Any group other than the primary group passes validation.
        group2 = Group.objects.create(groupname="test2", gid=1001)
        testsubj = AdditionalGroup(user=self.user, group=group2)
        testsubj.clean()

    def test_save(self):
        # Saving a new membership records one "added to group" task result on
        # top of those recorded for the two groups and the user.
        group2 = Group.objects.create(groupname="test2", gid=1001)
        addgroup = AdditionalGroup(user=self.user, group=group2)
        addgroup.save()
        taskres = TaskResult.objects.all()
        # assertEqual instead of assertTrue: assertTrue(len(taskres), 4) used
        # 4 as the failure message and never compared the count.
        self.assertEqual(len(taskres), 4)
        creators = [r.creator for r in taskres]
        for tcount, tcreator in [
            (2, "handle_group_created"),
            (1, "handle_user_created"),
            (1, "handle_user_added_to_group"),
        ]:
            self.assertEqual(creators.count(tcreator), tcount)

    def test_save_again(self):
        # Saving an already saved membership again records no task results.
        group2 = Group.objects.create(groupname="test2", gid=1001)
        TaskResult.objects.all().delete()
        addgroup = AdditionalGroup(user=self.user, group=group2)
        addgroup.save()
        TaskResult.objects.all().delete()
        addgroup.save()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 0)

    def test_delete(self):
        # Deleting the only membership leaves no AdditionalGroup rows behind.
        group2 = Group.objects.create(groupname="test2", gid=1001)
        # noinspection PyUnresolvedReferences
        addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
        addgroup.delete()
        # noinspection PyUnresolvedReferences
        self.assertEqual(len(AdditionalGroup.objects.all()), 0)

    def test___str__(self):
        # The string form combines the user and the group representation.
        group2 = Group.objects.create(groupname="test2", gid=1001)
        # noinspection PyUnresolvedReferences
        addgroup = AdditionalGroup.objects.create(user=self.user, group=group2)
        self.assertEqual(str(addgroup), "test (1000) in test2 (1001)")
|
|
|
|
|
|
@override_settings(OSUSER_MINGID=10000)
class GroupManagerTest(TestCaseWithCeleryTasks):
    """Tests for ``Group.objects.get_next_gid`` with ``OSUSER_MINGID=10000``."""

    def test_get_next_gid_first(self):
        # Without any group the configured minimum gid is handed out.
        next_gid = Group.objects.get_next_gid()
        self.assertEqual(next_gid, 10000)

    def test_get_next_gid_second(self):
        # With gid 10010 taken the next gid is 10011.
        Group.objects.create(groupname="test", gid=10010)
        next_gid = Group.objects.get_next_gid()
        self.assertEqual(next_gid, 10011)
|
|
|
|
|
|
class GroupTest(TestCaseWithCeleryTasks):
    """Tests for the ``Group`` model.

    Covers ``__str__`` and the task results recorded when a group is saved
    or deleted.
    """

    def test___str__(self):
        # The string form is "<groupname> (<gid>)".
        group = Group.objects.create(gid=10000, groupname="test")
        self.assertEqual(str(group), "test (10000)")

    def test_save(self):
        # save() returns the group itself and records one task result
        # created by "handle_group_created".
        group = Group(gid=10000, groupname="test")
        self.assertIs(group.save(), group)
        taskres = TaskResult.objects.all()
        # assertEqual instead of assertTrue: assertTrue(len(taskres), 1) used
        # 1 as the failure message and never compared the count.
        self.assertEqual(len(taskres), 1)
        creators = [r.creator for r in taskres]
        for tcount, tcreator in [(1, "handle_group_created")]:
            self.assertEqual(creators.count(tcreator), tcount)

    def test_save_again(self):
        # Saving an existing group again records no task results.
        group = Group.objects.create(gid=10000, groupname="test")
        TaskResult.objects.all().delete()
        group.save()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 0)

    def test_delete(self):
        # Creating and deleting a group leaves two task results; the oldest
        # one was created by "handle_group_created".
        group = Group.objects.create(gid=10000, groupname="test")
        self.assertEqual(len(Group.objects.all()), 1)
        group.delete()
        self.assertEqual(len(Group.objects.all()), 0)
        self.assertEqual(len(TaskResult.objects.all()), 2)
        tr = TaskResult.objects.order_by("created").first()
        self.assertEqual(tr.creator, "handle_group_created")
|
|
|
|
|
|
class ShadowManagerTest(TestCaseWithCeleryTasks):
    """Tests for ``Shadow.objects.create_shadow``."""

    def setUp(self):
        # The customer that owns the OS user created in the test.
        self.customer = Customer.objects.create(username="test")
        super(ShadowManagerTest, self).setUp()

    def test_create_shadow(self):
        # create_shadow() stores a sha512-crypt hash of the password, today's
        # day number since 1970-01-01 and the expected aging values.
        primary_group = Group.objects.create(gid=1000, groupname="test")
        user_fields = {
            "customer": self.customer,
            "username": "test",
            "uid": 1000,
            "group": primary_group,
            "homedir": "/home/test",
            "shell": "/bin/fooshell",
        }
        os_user = User.objects.create(**user_fields)
        entry = Shadow.objects.create_shadow(os_user, "test")

        self.assertTrue(sha512_crypt.verify("test", entry.passwd))

        unix_epoch = date(1970, 1, 1)
        days_since_epoch = (timezone.now().date() - unix_epoch).days
        self.assertEqual(entry.changedays, days_since_epoch)

        self.assertEqual(entry.user, os_user)
        self.assertEqual(entry.minage, 0)
        self.assertIsNone(entry.maxage)
        self.assertEqual(entry.gracedays, 7)
        self.assertEqual(entry.inactdays, 30)
        self.assertIsNone(entry.expiredays)
|
|
|
|
|
|
class ShadowTest(TestCaseWithCeleryTasks):
    """Tests for ``Shadow`` instances: ``__str__`` and ``set_password``."""

    def setUp(self):
        # The customer that owns the OS users created in the tests.
        self.customer = Customer.objects.create(username="test")
        super(ShadowTest, self).setUp()

    def _make_user(self):
        # Create the group "test" (gid 1000) and a user "test" (uid 1000)
        # that has it as primary group.
        primary_group = Group.objects.create(groupname="test", gid=1000)
        return User.objects.create(
            customer=self.customer,
            username="test",
            uid=1000,
            group=primary_group,
            homedir="/home/test",
            shell="/bin/bash",
        )

    def test___str__(self):
        # The string form names the user the shadow entry belongs to.
        entry = Shadow(user=self._make_user())
        self.assertEqual(str(entry), "for user test (1000)")

    def test_set_password(self):
        # set_password() stores a hash that sha512_crypt verifies.
        entry = Shadow(user=self._make_user())
        entry.set_password("test")
        password_matches = sha512_crypt.verify("test", entry.passwd)
        self.assertTrue(password_matches)
|
|
|
|
|
|
@override_settings(
    OSUSER_MINUID=10000,
    OSUSER_MINGID=10000,
    OSUSER_USERNAME_PREFIX="test",
    OSUSER_HOME_BASEPATH="/home",
    OSUSER_DEFAULT_SHELL="/bin/fooshell",
)
class UserManagerTest(TestCaseWithCeleryTasks):
    """Tests for the manager behind ``User.objects``.

    Exercises ``get_next_uid``, ``get_next_username`` and ``create_user``
    under the settings overridden by the class decorator.
    """

    def _create_group(self):
        # Group "foo" with gid 10000, used as primary group of the users
        # that are inserted directly with User.objects.create().
        return Group.objects.create(gid=10000, groupname="foo")

    def _insert_user(self, uid, username, group):
        # Insert a user directly (not through create_user) with the fixed
        # home directory and shell shared by the tests below.
        return User.objects.create(
            customer=self.customer,
            uid=uid,
            username=username,
            group=group,
            homedir="/home/foo",
            shell="/bin/fooshell",
        )

    def _check_created_user(self, user, uid, gid, name):
        # Shared assertions for the result of create_user(): ids, names,
        # home directory, shell and the presence of a shadow entry.
        self.assertIsInstance(user, User)
        self.assertEqual(user.uid, uid)
        self.assertEqual(user.group.gid, gid)
        self.assertEqual(user.group.groupname, name)
        self.assertEqual(user.username, name)
        self.assertEqual(user.homedir, "/home/" + name)
        self.assertEqual(user.shell, "/bin/fooshell")
        self.assertIsNotNone(user.shadow)

    def setUp(self):
        # The customer that owns every user created in the tests.
        self.customer = Customer.objects.create(username="test")
        super(UserManagerTest, self).setUp()

    def test_get_next_uid_first(self):
        # Without any user the configured minimum uid is handed out.
        next_uid = User.objects.get_next_uid()
        self.assertEqual(next_uid, 10000)

    def test_get_next_uid_second(self):
        # With uid 10010 taken the next uid is 10011.
        self._insert_user(10010, "foo", self._create_group())
        next_uid = User.objects.get_next_uid()
        self.assertEqual(next_uid, 10011)

    def test_get_next_username_first(self):
        # The first generated name is the prefix followed by "01".
        next_name = User.objects.get_next_username()
        self.assertEqual(next_name, "test01")

    def test_get_next_username_second(self):
        # With "test01" taken the next name is "test02".
        self._insert_user(10000, "test01", self._create_group())
        next_name = User.objects.get_next_username()
        self.assertEqual(next_name, "test02")

    def test_get_next_username_gaps(self):
        # A gap in the numbering ("test01", "test03") is filled first.
        shared_group = self._create_group()
        self._insert_user(10000, "test01", shared_group)
        self._insert_user(10002, "test03", shared_group)
        next_name = User.objects.get_next_username()
        self.assertEqual(next_name, "test02")

    def test_create_user_first(self):
        # The first created user gets the minimum ids and the first name.
        created = User.objects.create_user(customer=self.customer)
        self._check_created_user(created, 10000, 10000, "test01")

    def test_create_user_tasks(self):
        # create_user() records exactly three task results, one for each
        # of the creators listed below.
        User.objects.create_user(customer=self.customer)
        results = TaskResult.objects.all()
        self.assertEqual(len(results), 3)
        seen_creators = [result.creator for result in results]
        expected_creators = (
            "handle_group_created",
            "handle_user_created",
            "handle_user_password_set",
        )
        for expected in expected_creators:
            self.assertIn(expected, seen_creators)

    def test_create_user_second(self):
        # The second created user gets the following ids and name.
        User.objects.create_user(customer=self.customer)
        created = User.objects.create_user(customer=self.customer)
        self._check_created_user(created, 10001, 10001, "test02")
        self.assertEqual(User.objects.count(), 2)

    def test_create_user_known_password(self):
        # A password handed to create_user() ends up in the shadow entry.
        created = User.objects.create_user(customer=self.customer, password="foobar")
        self._check_created_user(created, 10000, 10000, "test01")
        password_matches = sha512_crypt.verify("foobar", created.shadow.passwd)
        self.assertTrue(password_matches)

    def test_create_user_predefined_username(self):
        # An explicit username is used for user, group and home directory.
        created = User.objects.create_user(customer=self.customer, username="tester")
        self._check_created_user(created, 10000, 10000, "tester")

    def test_create_user_commit(self):
        # Passing commit=True yields the same result as the default call.
        created = User.objects.create_user(customer=self.customer, commit=True)
        self._check_created_user(created, 10000, 10000, "test01")
|
|
|
|
|
|
@override_settings(
    OSUSER_MINUID=10000,
    OSUSER_MINGID=10000,
    OSUSER_USERNAME_PREFIX="test",
    OSUSER_HOME_BASEPATH="/home",
    OSUSER_DEFAULT_SHELL="/bin/fooshell",
)
class UserTest(TestCaseWithCeleryTasks):
    """Tests for ``User`` instances.

    Covers ``__str__``, ``set_password``, the task results recorded on save
    and delete, and ``is_sftp_user``.
    """

    def setUp(self):
        # The customer that owns every OS user created in the tests.
        self.customer = Customer.objects.create_user("test")
        super(UserTest, self).setUp()

    def test___str__(self):
        # The string form is "<username> (<uid>)".
        user = User.objects.create_user(self.customer)
        self.assertEqual(str(user), "test01 (10000)")

    def test_set_password(self):
        # The password of a new user is not "test" until set_password().
        user = User.objects.create_user(self.customer)
        self.assertFalse(sha512_crypt.verify("test", user.shadow.passwd))
        user.set_password("test")
        self.assertTrue(sha512_crypt.verify("test", user.shadow.passwd))

    def test_save(self):
        # Saving an already created user again adds no task results to the
        # three recorded by create_user().
        user = User.objects.create_user(self.customer)
        user.save()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 3)
        creators = [r.creator for r in taskres]
        for task in [
            "handle_group_created",
            "handle_user_created",
            "handle_user_password_set",
        ]:
            self.assertIn(task, creators)

    def test_delete_only_user(self):
        # Creating and deleting a user records six task results in total.
        user = User.objects.create_user(self.customer)
        user.delete()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 6)
        creators = [r.creator for r in taskres]
        # Compare per-creator counts: the original list named
        # "handle_user_deleted" twice, which assertIn cannot verify.
        for tcount, tcreator in [
            (1, "handle_group_created"),
            (1, "handle_user_created"),
            (1, "handle_user_password_set"),
            (2, "handle_user_deleted"),
            (1, "handle_group_deleted"),
        ]:
            self.assertEqual(creators.count(tcreator), tcount)
        self.assertEqual(len(User.objects.all()), 0)

    def test_delete_additional_groups(self):
        # Deleting a user with two additional groups also removes both
        # memberships and records a task result for each removal.
        group1 = Group.objects.create(gid=2000, groupname="group1")
        group2 = Group.objects.create(gid=2001, groupname="group2")
        user = User.objects.create_user(self.customer)
        for group in [group1, group2]:
            # noinspection PyUnresolvedReferences
            user.additionalgroup_set.add(
                AdditionalGroup.objects.create(user=user, group=group)
            )
        # Only the task results caused by the deletion are of interest.
        TaskResult.objects.all().delete()
        user.delete()
        taskres = TaskResult.objects.all()
        self.assertEqual(len(taskres), 5)
        creators = [t.creator for t in taskres]
        for tcount, tcreator in [
            (2, "handle_user_removed_from_group"),
            (2, "handle_user_deleted"),
            (1, "handle_group_deleted"),
        ]:
            self.assertEqual(creators.count(tcreator), tcount)
        self.assertEqual(len(User.objects.all()), 0)
        # noinspection PyUnresolvedReferences
        self.assertEqual(len(AdditionalGroup.objects.all()), 0)

    def test_is_sftp_user(self):
        # A user counts as SFTP user once it is a member of the group named
        # by settings.OSUSER_SFTP_GROUP.
        user = User.objects.create_user(self.customer)
        self.assertFalse(user.is_sftp_user())

        sftp_group = Group.objects.create(
            gid=2000, groupname=settings.OSUSER_SFTP_GROUP
        )
        # noinspection PyUnresolvedReferences
        user.additionalgroup_set.add(
            AdditionalGroup.objects.create(user=user, group=sftp_group)
        )
        self.assertTrue(user.is_sftp_user())
|
|
|
|
|
|
class SshPublicKeyManagerTest(TestCaseWithCeleryTasks):
    """Tests for ``SshPublicKey.objects``.

    Covers ``parse_key_text`` with valid and invalid key texts and
    ``create_ssh_public_key``.
    """

    def _check_parsed(self, keytext, algorithm, comment):
        # Parse ``keytext`` and verify the three-part result: algorithm name,
        # key data longer than 40 characters and the comment.
        parsed = SshPublicKey.objects.parse_key_text(keytext)
        self.assertEqual(len(parsed), 3)
        self.assertEqual(parsed[0], algorithm)
        self.assertGreater(len(parsed[1]), 40)
        self.assertEqual(parsed[2], comment)

    def _check_rejected(self, keytext):
        # parse_key_text() must raise ValueError for ``keytext``.
        self.assertRaises(ValueError, SshPublicKey.objects.parse_key_text, keytext)

    def test_parse_keytext_rfc4716_1(self):
        # The quoted comment is returned including its quotes.
        self._check_parsed(
            EXAMPLE_KEY_1_RFC4716,
            "ssh-rsa",
            '"1024-bit RSA, converted from OpenSSH by me@example.com"',
        )

    def test_parse_keytext_rfc4716_2(self):
        self._check_parsed(
            EXAMPLE_KEY_2_RFC4716,
            "ssh-dss",
            "This is my public key for use on servers which I don't like.",
        )

    def test_parse_keytext_rfc4716_3(self):
        self._check_parsed(
            EXAMPLE_KEY_3_RFC4716, "ssh-dss", "DSA Public Key for use with MyIsp"
        )

    def test_parse_keytext_openssh(self):
        # A one-line key without comment yields an empty comment.
        self._check_parsed(EXAMPLE_KEY_4_OPENSSH, "ssh-rsa", "")

    def test_parse_keytext_invalid_multiline(self):
        # Ten "xx" lines joined with CRLF are not a key.
        self._check_rejected("\r\n".join(["xx"] * 10))

    def test_parse_keytext_empty_line(self):
        # An empty line after the comment header does not disturb parsing.
        self._check_parsed(
            EXAMPLE_KEY_6_RFC4716_EMPTY_LINE,
            "ssh-dss",
            "DSA Public Key for use with MyIsp",
        )

    def test_parse_keytext_invalid_empty_rfc4716_header(self):
        # A begin marker without anything after it is rejected.
        self._check_rejected(EXAMPLE_KEY_9_RFC4716_ONLY_HEADER)

    def test_parse_keytext_no_comment(self):
        # A block without headers yields an empty comment.
        self._check_parsed(EXAMPLE_KEY_7_NO_COMMENT, "ssh-rsa", "")

    def test_parse_keytext_multiline_comment(self):
        # A comment continued over several lines is joined into one string.
        self._check_parsed(
            EXAMPLE_KEY_5_RFC4716_MULTILINE,
            "ssh-dss",
            "DSA Public Key for use with MyIsp",
        )

    def test_parse_keytext_invalid(self):
        self._check_rejected("invalid")

    def test_parse_keytext_invalid_openssh(self):
        self._check_rejected(EXAMPLE_KEY_8_OPENSSH_BROKEN)

    def test_create_ssh_public_key(self):
        # create_ssh_public_key() returns a key bound to the user, carrying
        # the parsed algorithm, data and (empty) comment.
        owner = Customer.objects.create_user("test")
        os_user = User.objects.create_user(owner)
        created = SshPublicKey.objects.create_ssh_public_key(
            os_user, EXAMPLE_KEY_4_OPENSSH
        )
        expected_data = EXAMPLE_KEY_4_OPENSSH.split()[1]
        self.assertIsInstance(created, SshPublicKey)
        self.assertEqual(created.user, os_user)
        self.assertEqual(created.algorithm, "ssh-rsa")
        self.assertEqual(created.data, expected_data)
        self.assertEqual(created.comment, "")
|
|
|
|
|
|
class SshPublicKeyTest(TestCaseWithCeleryTasks):
    """Tests for ``SshPublicKey`` instances.

    Covers ``__str__`` and the task results recorded when a key is saved or
    deleted.
    """

    def setUp(self):
        super(SshPublicKeyTest, self).setUp()
        owner = Customer.objects.create_user("test")
        self.user = User.objects.create_user(owner)
        # Drop the task results recorded while creating the user so that
        # each test only sees the results it causes itself.
        TaskResult.objects.all().delete()

    def _add_key(self, keytext):
        # Store ``keytext`` as a public key of the fixture user.
        return SshPublicKey.objects.create_ssh_public_key(self.user, keytext)

    def test__str__rfc4716(self):
        # A key given as multi-line block is rendered on one line as
        # "<algorithm> <data> <comment>".
        key = self._add_key(EXAMPLE_KEY_3_RFC4716)
        self.maxDiff = None
        expected = (
            "ssh-dss AAAAB3NzaC1kc3MAAACBAPY8ZOHY2yFSJA6XYC9HRwNHxae"
            "hvx5wOJ0rzZdzoSOXxbETW6ToHv8D1UJ/z+zHo9Fiko5XybZnDIaBDHtblQ+Yp7St"
            "xyltHnXF1YLfKD1G4T6JYrdHYI14Om1eg9e4NnCRleaqoZPF3UGfZia6bXrGTQf3g"
            "Jq2e7Yisk/gF+1VAAAAFQDb8D5cvwHWTZDPfX0D2s9Rd7NBvQAAAIEAlN92+Bb7D4"
            "KLYk3IwRbXblwXdkPggA4pfdtW9vGfJ0/RHd+NjB4eo1D+0dix6tXwYGN7PKS5R/F"
            "XPNwxHPapcj9uL1Jn2AWQ2dsknf+i/FAAvioUPkmdMc0zuWoSOEsSNhVDtX3WdvVc"
            "GcBq9cetzrtOKWOocJmJ80qadxTRHtUAAACBAN7CY+KKv1gHpRzFwdQm7HK9bb1LA"
            "o2KwaoXnadFgeptNBQeSXG1vO+JsvphVMBJc9HSn24VYtYtsMu74qXviYjziVucWK"
            "jjKEb11juqnF0GDlB3VVmxHLmxnAz643WK42Z7dLM5sY29ouezv4Xz2PuMch5VGPP"
            "+CDqzCM4loWgV DSA Public Key for use with MyIsp"
        )
        self.assertEqual(str(key), expected)

    def test__str__openssh(self):
        # A one-line key without comment is rendered unchanged.
        key = self._add_key(EXAMPLE_KEY_4_OPENSSH)
        self.assertEqual(str(key), EXAMPLE_KEY_4_OPENSSH)

    def test_call_tasks_on_save(self):
        # Storing a key records a single "handle_ssh_keys_changed" result.
        self._add_key(EXAMPLE_KEY_4_OPENSSH)
        recorded = TaskResult.objects.all()
        self.assertEqual(len(recorded), 1)
        first_result = recorded[0]
        self.assertEqual(first_result.creator, "handle_ssh_keys_changed")

    def test_call_tasks_on_delete(self):
        # Deleting a key records a single "handle_ssh_keys_changed" result.
        key = self._add_key(EXAMPLE_KEY_4_OPENSSH)
        # Forget the result recorded by the creation of the key.
        TaskResult.objects.all().delete()
        key.delete()
        recorded = TaskResult.objects.all()
        self.assertEqual(len(recorded), 1)
        first_result = recorded[0]
        self.assertEqual(first_result.creator, "handle_ssh_keys_changed")
|