aboutsummaryrefslogtreecommitdiffstats
path: root/lib/hashserv
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:38 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:21:15 +0000
commit0e945d3dec02479df1157f48fd44223c2bfb34a3 (patch)
treede8c12c494743ebf12409d1ca5a959df2383c402 /lib/hashserv
parent07eb9176f8a7449c1d2cbfff072fa0873e97a336 (diff)
downloadbitbake-0e945d3dec02479df1157f48fd44223c2bfb34a3.tar.gz
hashserv: tests: Allow authentication for external server tests
If BB_TEST_HASHSERV_USERNAME and BB_TEST_HASHSERV_PASSWORD are provided for a server admin user, the authentication tests for the external hashserver will run. In addition, any users that get created will now be deleted when the test finishes. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'lib/hashserv')
-rw-r--r--lib/hashserv/tests.py109
1 files changed, 74 insertions, 35 deletions
diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index 2d78f9e97..5d209ffb8 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -84,17 +84,13 @@ class HashEquivalenceTestSetup(object):
return self.server.address
def start_auth_server(self):
- self.auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password")
- self.admin_client = self.start_client(self.auth_server.address, username="admin", password="password")
+ auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password")
+ self.auth_server_address = auth_server.address
+ self.admin_client = self.start_client(auth_server.address, username="admin", password="password")
return self.admin_client
def auth_client(self, user):
- return self.start_client(self.auth_server.address, user["username"], user["token"])
-
- def auth_perms(self, *permissions):
- self.client_index += 1
- user = self.admin_client.new_user(f"user-{self.client_index}", permissions)
- return self.auth_client(user)
+ return self.start_client(self.auth_server_address, user["username"], user["token"])
def setUp(self):
if sys.version_info < (3, 5, 0):
@@ -120,11 +116,11 @@ class HashEquivalenceTestSetup(object):
})
def assertUserCanAuth(self, user):
- with self.start_client(self.auth_server.address) as client:
+ with self.start_client(self.auth_server_address) as client:
client.auth(user["username"], user["token"])
def assertUserCannotAuth(self, user):
- with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
+ with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
client.auth(user["username"], user["token"])
def create_test_hash(self, client):
@@ -157,6 +153,26 @@ class HashEquivalenceTestSetup(object):
class HashEquivalenceCommonTests(object):
+ def auth_perms(self, *permissions):
+ self.client_index += 1
+ user = self.create_user(f"user-{self.client_index}", permissions)
+ return self.auth_client(user)
+
+ def create_user(self, username, permissions, *, client=None):
+ def remove_user(username):
+ try:
+ self.admin_client.delete_user(username)
+ except bb.asyncrpc.InvokeError:
+ pass
+
+ if client is None:
+ client = self.admin_client
+
+ user = client.new_user(username, permissions)
+ self.addCleanup(remove_user, username)
+
+ return user
+
def test_create_hash(self):
return self.create_test_hash(self.client)
@@ -571,14 +587,14 @@ class HashEquivalenceCommonTests(object):
def test_auth_no_token_refresh_from_anon_user(self):
self.start_auth_server()
- with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
+ with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
client.refresh_token()
def test_auth_self_token_refresh(self):
admin_client = self.start_auth_server()
# Create a new user with no permissions
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
with self.auth_client(user) as client:
new_user = client.refresh_token()
@@ -601,7 +617,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_token_refresh(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
with self.auth_perms() as client, self.assertRaises(InvokeError):
client.refresh_token(user["username"])
@@ -617,7 +633,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_self_get_user(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
user_info = user.copy()
del user_info["token"]
@@ -632,7 +648,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_get_user(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
user_info = user.copy()
del user_info["token"]
@@ -649,7 +665,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_reconnect(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
user_info = user.copy()
del user_info["token"]
@@ -665,7 +681,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_delete_user(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
# No self service
with self.auth_client(user) as client, self.assertRaises(InvokeError):
@@ -685,7 +701,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_set_user_perms(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
self.assertUserPerms(user, [])
@@ -710,7 +726,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_get_all_users(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", [])
+ user = self.create_user("test-user", [])
with self.auth_client(user) as client, self.assertRaises(InvokeError):
client.get_all_users()
@@ -744,10 +760,10 @@ class HashEquivalenceCommonTests(object):
permissions.sort()
with self.auth_perms() as client, self.assertRaises(InvokeError):
- client.new_user("test-user", permissions)
+ self.create_user("test-user", permissions, client=client)
with self.auth_perms("@user-admin") as client:
- user = client.new_user("test-user", permissions)
+ user = self.create_user("test-user", permissions, client=client)
self.assertIn("token", user)
self.assertEqual(user["username"], "test-user")
self.assertEqual(user["permissions"], permissions)
@@ -755,7 +771,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_become_user(self):
admin_client = self.start_auth_server()
- user = admin_client.new_user("test-user", ["@read", "@report"])
+ user = self.create_user("test-user", ["@read", "@report"])
user_info = user.copy()
del user_info["token"]
@@ -898,7 +914,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read", "@report"])
p = self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", user["username"],
"--password", user["token"],
"refresh-token"
@@ -916,7 +932,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
print("New token is %r" % new_token)
self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", user["username"],
"--password", new_token,
"get-user"
@@ -928,7 +944,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read"])
self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", admin_client.username,
"--password", admin_client.password,
"set-user-perms",
@@ -946,7 +962,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read"])
p = self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", admin_client.username,
"--password", admin_client.password,
"get-user",
@@ -957,7 +973,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
self.assertIn("Permissions:", p.stdout)
p = self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", user["username"],
"--password", user["token"],
"get-user",
@@ -973,7 +989,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
admin_client.new_user("test-user2", ["@read"])
p = self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", admin_client.username,
"--password", admin_client.password,
"get-all-users",
@@ -987,7 +1003,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
admin_client = self.start_auth_server()
p = self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", admin_client.username,
"--password", admin_client.password,
"new-user",
@@ -1017,14 +1033,13 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read"])
p = self.run_hashclient([
- "--address", self.auth_server.address,
+ "--address", self.auth_server_address,
"--login", admin_client.username,
"--password", admin_client.password,
"delete-user",
"-u", user["username"],
], check=True)
-
self.assertIsNone(admin_client.get_user(user["username"]))
def test_get_db_usage(self):
@@ -1104,19 +1119,43 @@ class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocket
class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
- def start_test_server(self):
- if 'BB_TEST_HASHSERV' not in os.environ:
- self.skipTest('BB_TEST_HASHSERV not defined to test an external server')
+ def get_env(self, name):
+ v = os.environ.get(name)
+ if not v:
+ self.skipTest(f'{name} not defined to test an external server')
+ return v
- return os.environ['BB_TEST_HASHSERV']
+ def start_test_server(self):
+ return self.get_env('BB_TEST_HASHSERV')
def start_server(self, *args, **kwargs):
self.skipTest('Cannot start local server when testing external servers')
+ def start_auth_server(self):
+
+ self.auth_server_address = self.server_address
+ self.admin_client = self.start_client(
+ self.server_address,
+ username=self.get_env('BB_TEST_HASHSERV_USERNAME'),
+ password=self.get_env('BB_TEST_HASHSERV_PASSWORD'),
+ )
+ return self.admin_client
+
def setUp(self):
super().setUp()
+ if "BB_TEST_HASHSERV_USERNAME" in os.environ:
+ self.client = self.start_client(
+ self.server_address,
+ username=os.environ["BB_TEST_HASHSERV_USERNAME"],
+ password=os.environ["BB_TEST_HASHSERV_PASSWORD"],
+ )
self.client.remove({"method": self.METHOD})
def tearDown(self):
self.client.remove({"method": self.METHOD})
super().tearDown()
+
+
+ def test_auth_get_all_users(self):
+ self.skipTest("Cannot test all users with external server")
+