diff --git a/app/entities.py b/app/entities.py index 535da02f6..9ee945fe6 100644 --- a/app/entities.py +++ b/app/entities.py @@ -372,9 +372,6 @@ class User: "homedirectory": "homeDirectory", } - def get_upn_prefix(self) -> str: - return self.user_principal_name.split("@")[0] - def is_expired(self) -> bool: if self.account_exp is None: return False diff --git a/app/extra/scripts/principal_block_user_sync.py b/app/extra/scripts/principal_block_user_sync.py index 858a65a7d..d4c483728 100644 --- a/app/extra/scripts/principal_block_user_sync.py +++ b/app/extra/scripts/principal_block_user_sync.py @@ -32,7 +32,7 @@ async def principal_block_sync( if "@" in user.user_principal_name: principal_postfix = user.user_principal_name.split("@")[1].upper() - principal_name = f"{user.get_upn_prefix()}@{principal_postfix}" + principal_name = f"{user.sam_account_name}@{principal_postfix}" else: continue diff --git a/app/extra/scripts/uac_sync.py b/app/extra/scripts/uac_sync.py index f0623e1b1..8f9ce4f46 100644 --- a/app/extra/scripts/uac_sync.py +++ b/app/extra/scripts/uac_sync.py @@ -71,7 +71,7 @@ async def disable_accounts( ) # fmt: skip async for user in users: - await kadmin.lock_principal(user.get_upn_prefix()) + await kadmin.lock_principal(user.sam_account_name) await add_lock_and_expire_attributes( session, diff --git a/app/ldap_protocol/auth/auth_manager.py b/app/ldap_protocol/auth/auth_manager.py index 61c336f56..d2e9073fd 100644 --- a/app/ldap_protocol/auth/auth_manager.py +++ b/app/ldap_protocol/auth/auth_manager.py @@ -232,7 +232,7 @@ async def _update_password( if include_krb: await self._kadmin.create_or_update_principal_pw( - user.get_upn_prefix(), + user.sam_account_name, new_password, ) diff --git a/app/ldap_protocol/ldap_requests/add.py b/app/ldap_protocol/ldap_requests/add.py index 75be3f6fc..a16c6b182 100644 --- a/app/ldap_protocol/ldap_requests/add.py +++ b/app/ldap_protocol/ldap_requests/add.py @@ -249,11 +249,7 @@ async def handle( # noqa: C901 # in the attributes if ( attr_name in Directory.ro_fields - or attr_name - in ( - "userpassword", - "unicodepwd", - ) + or attr_name in ("userpassword", "unicodepwd") or attr_name == new_dir.rdname ): continue @@ -342,11 +338,11 @@ async def handle( # noqa: C901 ), ) - for uattr, value in { - "loginShell": "/bin/bash", - "uidNumber": str(create_integer_hash(user.sam_account_name)), - "homeDirectory": f"/home/{user.sam_account_name}", - }.items(): + for uattr, value in ( + ("loginShell", "/bin/bash"), + ("uidNumber", str(create_integer_hash(user.sam_account_name))), + ("homeDirectory", f"/home/{user.sam_account_name}"), + ): if uattr in user_attributes: value = user_attributes[uattr] del user_attributes[uattr] @@ -421,6 +417,15 @@ async def handle( # noqa: C901 ), ) + if is_computer: + attributes.append( + Attribute( + name="sAMAccountName", + value=f"{new_dir.name}", + directory_id=new_dir.id, + ), + ) + if not ctx.attribute_value_validator.is_directory_attributes_valid( entity_type.name if entity_type else "", attributes, @@ -461,16 +466,14 @@ async def handle( # noqa: C901 KRBAPIDeletePrincipalError, KRBAPIPrincipalNotFoundError, ): - await ctx.kadmin.del_principal( - user.get_upn_prefix(), - ) + await ctx.kadmin.del_principal(user.sam_account_name) pw = ( self.password.get_secret_value() if self.password else None ) - await ctx.kadmin.add_principal(user.get_upn_prefix(), pw) + await ctx.kadmin.add_principal(user.sam_account_name, pw) elif is_computer: await ctx.kadmin.add_principal( diff --git a/app/ldap_protocol/ldap_requests/bind.py b/app/ldap_protocol/ldap_requests/bind.py index 445b2f25c..ad764649e 100644 --- a/app/ldap_protocol/ldap_requests/bind.py +++ b/app/ldap_protocol/ldap_requests/bind.py @@ -209,7 +209,7 @@ async def handle( KRBAPIConnectionError, ): await ctx.kadmin.add_principal( - user.get_upn_prefix(), + user.sam_account_name, self.authentication_choice.password.get_secret_value(), 0.1, ) diff --git a/app/ldap_protocol/ldap_requests/delete.py b/app/ldap_protocol/ldap_requests/delete.py index e2b127331..3bf89343b 100644 --- a/app/ldap_protocol/ldap_requests/delete.py +++ b/app/ldap_protocol/ldap_requests/delete.py @@ -154,7 +154,7 @@ async def handle( # noqa: C901 await ctx.session_storage.clear_user_sessions( directory.user.id, ) - await ctx.kadmin.del_principal(directory.user.get_upn_prefix()) + await ctx.kadmin.del_principal(directory.user.sam_account_name) if await is_computer(directory.id, ctx.session): await ctx.kadmin.del_principal(directory.host_principal) diff --git a/app/ldap_protocol/ldap_requests/extended.py b/app/ldap_protocol/ldap_requests/extended.py index c3967889e..1f8cca946 100644 --- a/app/ldap_protocol/ldap_requests/extended.py +++ b/app/ldap_protocol/ldap_requests/extended.py @@ -248,7 +248,7 @@ async def handle( ): try: await ctx.kadmin.create_or_update_principal_pw( - user.get_upn_prefix(), + user.sam_account_name, new_password, ) except ( diff --git a/app/ldap_protocol/ldap_requests/modify.py b/app/ldap_protocol/ldap_requests/modify.py index 676550e3e..7ab3333fb 100644 --- a/app/ldap_protocol/ldap_requests/modify.py +++ b/app/ldap_protocol/ldap_requests/modify.py @@ -8,7 +8,7 @@ from typing import AsyncGenerator, ClassVar from loguru import logger -from sqlalchemy import Select, and_, delete, or_, select, update +from sqlalchemy import Select, and_, delete, func, or_, select, update from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload @@ -25,6 +25,7 @@ KRBAPIForcePasswordChangeError, KRBAPILockPrincipalError, KRBAPIPrincipalNotFoundError, + KRBAPIRenamePrincipalError, ) from ldap_protocol.ldap_codes import LDAPCodes from ldap_protocol.ldap_responses import ModifyResponse, PartialAttribute @@ -37,11 +38,16 @@ from ldap_protocol.policies.password import PasswordPolicyUseCases from ldap_protocol.session_storage import SessionStorage from ldap_protocol.utils.cte import check_root_group_membership_intersection -from ldap_protocol.utils.helpers import ft_to_dt, validate_entry +from ldap_protocol.utils.helpers import ( + ft_to_dt, + is_dn_in_base_directory, + validate_entry, +) from ldap_protocol.utils.queries import ( add_lock_and_expire_attributes, clear_group_membership, extend_group_membership, + get_base_directories, get_directories, get_directory_by_rid, get_filter_from_path, @@ -71,6 +77,7 @@ class ModifyForbiddenError(Exception): PermissionError, ModifyForbiddenError, KRBAPIPrincipalNotFoundError, + KRBAPIRenamePrincipalError, KRBAPILockPrincipalError, KRBAPIForcePasswordChangeError, ) @@ -100,6 +107,11 @@ class ModifyRequest(BaseRequest): object: str changes: list[Changes] + # NOTE: If the old value was changed (for example, in _delete) + # in one method, then you need to have access to the old value + # from other methods (for example, from _add) + _old_vals: dict[str, str | None] = {} + @classmethod def from_data(cls, data: list[ASN1Row]) -> "ModifyRequest": entry, proto_changes = data @@ -184,7 +196,7 @@ async def handle( entity_type_id=directory.entity_type_id, ) - names = {change.get_name() for change in self.changes} + names = {change.l_type for change in self.changes} password_change_requested = self._is_password_change_requested(names) self_modify = directory.id == ctx.ldap_session.user.directory_id @@ -212,7 +224,7 @@ async def handle( return for change in self.changes: - if change.modification.type.lower() in Directory.ro_fields: + if change.l_type in Directory.ro_fields: continue if not ctx.attribute_value_validator.is_partial_attribute_valid( # noqa: E501 @@ -222,7 +234,7 @@ async def handle( await ctx.session.rollback() yield ModifyResponse( result_code=LDAPCodes.UNDEFINED_ATTRIBUTE_TYPE, - message="Invalid attribute value(s)", + error_message="Invalid attribute value(s)", ) return @@ -612,6 +624,18 @@ async def _validate_object_class_modification( if is_object_class_in_replaced or is_object_class_in_deleted: raise ModifyForbiddenError("ObjectClass can't be deleted.") + def _need_to_cache_samaccountname_old_value( + self, + change: Changes, + directory: Directory, + ) -> bool: + return bool( + directory.entity_type + and directory.entity_type.name == EntityTypeNames.COMPUTER + and change.modification.type == "sAMAccountName" + and not self._old_vals.get(change.modification.type), + ) + async def _delete( self, change: Changes, @@ -621,9 +645,8 @@ async def _delete( name_only: bool = False, ) -> None: attrs = [] - name = change.modification.type.lower() - if name == "memberof": + if change.l_type == "memberof": await self._delete_memberof( change=change, directory=directory, @@ -632,7 +655,7 @@ async def _delete( ) return - if name == "member": + if change.l_type == "member": await self._delete_member( change=change, directory=directory, @@ -641,14 +664,16 @@ async def _delete( ) return - if name == "objectclass": + if change.l_type == "objectclass": await self._validate_object_class_modification(change, directory) if name_only or not change.modification.vals: attrs.append(qa(Attribute.name) == change.modification.type) else: for value in change.modification.vals: - if name not in (Directory.search_fields | User.search_fields): + if change.l_type not in ( + Directory.search_fields | User.search_fields + ): if isinstance(value, str): condition = qa(Attribute.value) == value elif isinstance(value, bytes): @@ -656,10 +681,15 @@ async def _delete( attrs.append( and_( - qa(Attribute.name) == change.modification.type, + func.lower(qa(Attribute.name)) == change.l_type, condition, ), - ) + ) # fmt: skip + + if self._need_to_cache_samaccountname_old_value(change, directory): + vals = directory.attributes_dict.get(change.modification.type) + if vals: + self._old_vals[change.modification.type] = vals[0] if attrs: del_query = ( @@ -773,16 +803,15 @@ async def _add_group_attrs( directory: Directory, session: AsyncSession, ) -> None: - name = change.get_name() - if name == "primarygroupid": + if change.l_type == "primarygroupid": await self._add_primary_group_attribute( change, directory, session, ) - elif name == "memberof": + elif change.l_type == "memberof": await self._add_memberof(change, directory, session) - elif name == "member": + elif change.l_type == "member": await self._add_member(change, directory, session) async def _add( # noqa: C901 @@ -798,23 +827,22 @@ async def _add( # noqa: C901 password_utils: PasswordUtils, ) -> None: attrs = [] - name = change.get_name() - if name in {"memberof", "member", "primarygroupid"}: + if change.l_type in ("memberof", "member", "primarygroupid"): await self._add_group_attrs(change, directory, session) return + base_dir = await self._get_base_dir(directory, session) + for value in change.modification.vals: - if name == "useraccountcontrol": + if change.l_type == "useraccountcontrol": uac_val = int(value) if not UserAccountControlFlag.is_value_valid(uac_val): continue elif ( - bool( - uac_val & UserAccountControlFlag.ACCOUNTDISABLE, - ) + bool(uac_val & UserAccountControlFlag.ACCOUNTDISABLE) and directory.user ): if directory.path_dn == current_user.dn: @@ -823,7 +851,7 @@ async def _add( # noqa: C901 ) await kadmin.lock_principal( - directory.user.get_upn_prefix(), + directory.user.sam_account_name, ) await add_lock_and_expire_attributes( @@ -837,9 +865,7 @@ async def _add( # noqa: C901 ) elif ( - not bool( - uac_val & UserAccountControlFlag.ACCOUNTDISABLE, - ) + not bool(uac_val & UserAccountControlFlag.ACCOUNTDISABLE) and directory.user ): await unlock_principal( @@ -858,37 +884,87 @@ async def _add( # noqa: C901 ), ) # fmt: skip - if name == "pwdlastset" and value == "0" and directory.user: + if ( + change.l_type == "pwdlastset" + and value == "0" + and directory.user + ): await kadmin.force_princ_pw_change( - directory.user.get_upn_prefix(), + directory.user.sam_account_name, ) - if name == directory.rdname: + if change.l_type == directory.rdname: await session.execute( update(Directory) .filter(directory_table.c.id == directory.id) .values(name=value), ) - if name in Directory.search_fields: + if change.l_type in Directory.search_fields: await session.execute( update(Directory) .filter(directory_table.c.id == directory.id) - .values({name: value}), + .values({change.l_type: value}), ) - elif name in User.search_fields: - if name == "accountexpires": + elif ( + change.l_type in User.search_fields + and directory.entity_type + and directory.entity_type.name == EntityTypeNames.USER + and directory.user + ): + if change.l_type == "accountexpires": new_value = ft_to_dt(int(value)) if value != "0" else None else: new_value = value # type: ignore - await session.execute( - update(User) - .filter_by(directory=directory) - .values({name: new_value}), + if change.l_type in ("userprincipalname", "samaccountname"): + if change.l_type == "userprincipalname": + new_user_principal_name = str(new_value) + new_sam_account_name = new_user_principal_name.split("@")[0] # noqa: E501 # fmt: skip + elif change.l_type == "samaccountname": + new_sam_account_name = str(new_value) + new_user_principal_name = f"{new_sam_account_name}@{base_dir.name}" # noqa: E501 # fmt: skip + + if directory.user.sam_account_name != new_sam_account_name: + await kadmin.rename_princ( + directory.user.sam_account_name, + new_sam_account_name, + ) + + directory.user.user_principal_name = new_user_principal_name # noqa: E501 # fmt: skip + directory.user.sam_account_name = new_sam_account_name + else: + await session.execute( + update(User) + .filter_by(directory=directory) + .values({change.l_type: new_value}), + ) + + elif ( + change.l_type == "samaccountname" + and directory.entity_type + and directory.entity_type.name == EntityTypeNames.COMPUTER + ): + await self._modify_computer_samaccountname( + change, + kadmin, + base_dir, + value, ) - elif name in ("userpassword", "unicodepwd") and directory.user: + attrs.append( + Attribute( + name=change.modification.type, + value=value if isinstance(value, str) else None, + bvalue=value if isinstance(value, bytes) else None, + directory_id=directory.id, + ), + ) # fmt: skip + + elif ( + change.l_type in ("userpassword", "unicodepwd") + and directory.user + ): if not settings.USE_CORE_TLS: raise PermissionError("TLS required") @@ -918,7 +994,7 @@ async def _add( # noqa: C901 directory.user, ) await kadmin.create_or_update_principal_pw( - directory.user.get_upn_prefix(), + directory.user.sam_account_name, value, ) @@ -935,3 +1011,47 @@ async def _add( # noqa: C901 ) session.add_all(attrs) + + async def _modify_computer_samaccountname( + self, + change: Changes, + kadmin: AbstractKadmin, + base_dir: Directory, + new_sam_account_name: bytes | str, + ) -> None: + old_sam_account_name = self._old_vals.get(change.modification.type) + new_sam_account_name = str(new_sam_account_name) + + if not old_sam_account_name: + raise ModifyForbiddenError("Old sAMAccountName value not found.") + + if old_sam_account_name != new_sam_account_name: + await kadmin.rename_princ( + f"host/{old_sam_account_name}", + f"host/{new_sam_account_name}", + ) + await kadmin.rename_princ( + f"host/{old_sam_account_name}.{base_dir.name}", + f"host/{new_sam_account_name}.{base_dir.name}", + ) + + async def _get_base_dir( + self, + directory: Directory, + session: AsyncSession, + ) -> Directory: + base_dir = None + + for base_directory in await get_base_directories(session): + if is_dn_in_base_directory( + base_directory, + directory.path_dn, + ): + base_dir = base_directory + break + else: + raise ModifyForbiddenError( + "Base directory for computer not found.", + ) + + return base_dir diff --git a/app/ldap_protocol/objects.py b/app/ldap_protocol/objects.py index 75effb3f0..d69301c0e 100644 --- a/app/ldap_protocol/objects.py +++ b/app/ldap_protocol/objects.py @@ -5,6 +5,7 @@ """ from enum import IntEnum, IntFlag, StrEnum, unique +from functools import cached_property from typing import Annotated import annotated_types @@ -82,8 +83,9 @@ class Changes(BaseModel): operation: Operation modification: PartialAttribute - def get_name(self) -> str: - """Get mod name.""" + @cached_property + def l_type(self) -> str: + """Get modification type (it's attribute name) in lower case.""" return self.modification.type.lower() diff --git a/app/ldap_protocol/roles/access_manager.py b/app/ldap_protocol/roles/access_manager.py index 9e7a964f4..e651e5d62 100644 --- a/app/ldap_protocol/roles/access_manager.py +++ b/app/ldap_protocol/roles/access_manager.py @@ -123,17 +123,16 @@ def check_modify_access( return False for change in changes: - attr_name = change.get_name() if change.operation == Operation.DELETE: if not cls._check_modify_access( - attr_name, + change.l_type, filtered_aces, AceType.DELETE, ): return False elif change.operation == Operation.ADD: if not cls._check_modify_access( - attr_name, + change.l_type, filtered_aces, AceType.WRITE, ): @@ -141,12 +140,12 @@ def check_modify_access( else: if not ( cls._check_modify_access( - attr_name, + change.l_type, filtered_aces, AceType.WRITE, ) and cls._check_modify_access( - attr_name, + change.l_type, filtered_aces, AceType.DELETE, ) diff --git a/interface b/interface index f31962020..e1ca5656a 160000 --- a/interface +++ b/interface @@ -1 +1 @@ -Subproject commit f31962020a6689e6a4c61fb3349db5b5c7895f92 +Subproject commit e1ca5656aeabc20a1862aeaf11ded72feaa97403 diff --git a/tests/test_api/test_main/conftest.py b/tests/test_api/test_main/conftest.py index 8f1b58dea..5ad39a9c2 100644 --- a/tests/test_api/test_main/conftest.py +++ b/tests/test_api/test_main/conftest.py @@ -138,6 +138,30 @@ async def adding_test_user( assert auth.cookies.get("id") +@pytest_asyncio.fixture(scope="function") +async def adding_test_computer( + http_client: AsyncClient, +) -> None: + """Test api correct (name) add.""" + response = await http_client.post( + "/entry/add", + json={ + "entry": "cn=mycomputer,dc=md,dc=test", + "password": None, + "attributes": [ + {"type": "name", "vals": ["mycomputer name"]}, + {"type": "cn", "vals": ["mycomputer"]}, + {"type": "objectClass", "vals": ["computer", "top"]}, + ], + }, + ) + + data = response.json() + + assert isinstance(data, dict) + assert data.get("resultCode") == LDAPCodes.SUCCESS + + @pytest_asyncio.fixture(scope="function") async def add_dns_settings( session: AsyncSession, diff --git a/tests/test_api/test_main/test_router/test_add.py b/tests/test_api/test_main/test_router/test_add.py index 3050bedec..323775256 100644 --- a/tests/test_api/test_main/test_router/test_add.py +++ b/tests/test_api/test_main/test_router/test_add.py @@ -171,6 +171,13 @@ async def test_api_add_computer(http_client: AsyncClient) -> None: else: raise Exception("Computer without userAccountControl") + for attr in data["search_result"][0]["partial_attributes"]: + if attr["type"] == "sAMAccountName": + assert attr["vals"][0] == "PC" + break + else: + raise Exception("Computer without sAMAccountName") + @pytest.mark.asyncio @pytest.mark.usefixtures("session") diff --git a/tests/test_api/test_main/test_router/test_modify.py b/tests/test_api/test_main/test_router/test_modify.py index 3e46e879d..c1aa72535 100644 --- a/tests/test_api/test_main/test_router/test_modify.py +++ b/tests/test_api/test_main/test_router/test_modify.py @@ -8,6 +8,7 @@ from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession +from ldap_protocol.kerberos.base import AbstractKadmin from ldap_protocol.ldap_codes import LDAPCodes from ldap_protocol.ldap_requests.modify import Operation @@ -16,10 +17,13 @@ @pytest.mark.usefixtures("adding_test_user") @pytest.mark.usefixtures("setup_session") @pytest.mark.usefixtures("session") -async def test_api_correct_modify(http_client: AsyncClient) -> None: +async def test_api_correct_modify_user_accountexpires( + http_client: AsyncClient, +) -> None: """Test API for modify object attribute.""" entry_dn = "cn=test,dc=md,dc=test" new_value = "133632677730000000" + response = await http_client.patch( "/entry/update", json={ @@ -37,7 +41,6 @@ async def test_api_correct_modify(http_client: AsyncClient) -> None: ) data = response.json() - assert isinstance(data, dict) assert data.get("resultCode") == LDAPCodes.SUCCESS @@ -57,13 +60,232 @@ async def test_api_correct_modify(http_client: AsyncClient) -> None: ) data = response.json() - assert data["resultCode"] == LDAPCodes.SUCCESS assert data["search_result"][0]["object_name"] == entry_dn for attr in data["search_result"][0]["partial_attributes"]: if attr["type"] == "accountExpires": assert attr["vals"][0] == new_value + break + else: + raise Exception("User without accountExpires") + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("adding_test_user") +@pytest.mark.usefixtures("setup_session") +@pytest.mark.usefixtures("session") +async def test_api_correct_modify_user_samaccountname( + http_client: AsyncClient, + kadmin: AbstractKadmin, +) -> None: + """Test API for modify object attribute.""" + entry_dn = "cn=test,dc=md,dc=test" + + response = await http_client.patch( + "/entry/update", + json={ + "object": entry_dn, + "changes": [ + { + "operation": Operation.REPLACE, + "modification": { + "type": "sAMAccountName", + "vals": ["NEW user name"], + }, + }, + ], + }, + ) + + data = response.json() + assert isinstance(data, dict) + assert data.get("resultCode") == LDAPCodes.SUCCESS + assert kadmin.rename_princ.call_args.args == ("new_user", "NEW user name") # type: ignore + + response = await http_client.post( + "entry/search", + json={ + "base_object": entry_dn, + "scope": 0, + "deref_aliases": 0, + "size_limit": 1000, + "time_limit": 10, + "types_only": True, + "filter": "(objectClass=*)", + "attributes": [], + "page_number": 1, + }, + ) + + data = response.json() + assert data["resultCode"] == LDAPCodes.SUCCESS + assert data["search_result"][0]["object_name"] == entry_dn + + for attr in data["search_result"][0]["partial_attributes"]: + if attr["type"] == "sAMAccountName": + assert attr["vals"][0] == "NEW user name" + break + else: + raise Exception("User without sAMAccountName") + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("adding_test_user") +@pytest.mark.usefixtures("setup_session") +@pytest.mark.usefixtures("session") +async def test_api_correct_modify_user_userprincipalname( + http_client: AsyncClient, + kadmin: AbstractKadmin, +) -> None: + """Test API for modify object attribute.""" + entry_dn = "cn=test,dc=md,dc=test" + + response = await http_client.patch( + "/entry/update", + json={ + "object": entry_dn, + "changes": [ + { + "operation": Operation.REPLACE, + "modification": { + "type": "userPrincipalName", + "vals": ["newbiguser@md.test"], + }, + }, + ], + }, + ) + + data = response.json() + assert isinstance(data, dict) + assert data.get("resultCode") == LDAPCodes.SUCCESS + assert kadmin.rename_princ.call_args.args == ("new_user", "newbiguser") # type: ignore + + response = await http_client.post( + "entry/search", + json={ + "base_object": entry_dn, + "scope": 0, + "deref_aliases": 0, + "size_limit": 1000, + "time_limit": 10, + "types_only": True, + "filter": "(objectClass=*)", + "attributes": [], + "page_number": 1, + }, + ) + + data = response.json() + assert data["resultCode"] == LDAPCodes.SUCCESS + assert data["search_result"][0]["object_name"] == entry_dn + + for attr in data["search_result"][0]["partial_attributes"]: + if attr["type"] == "userPrincipalName": + assert attr["vals"][0] == "newbiguser@md.test" + break + else: + raise Exception("User without userPrincipalName") + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("adding_test_computer") +@pytest.mark.usefixtures("setup_session") +@pytest.mark.usefixtures("session") +async def test_api_correct_modify_computer_samaccountname_replace( + http_client: AsyncClient, + kadmin: AbstractKadmin, +) -> None: + """Test API for modify computer sAMAccountName.""" + entry_dn = "cn=mycomputer,dc=md,dc=test" + response = await http_client.patch( + "/entry/update", + json={ + "object": entry_dn, + "changes": [ + { + "operation": Operation.REPLACE, + "modification": { + "type": "sAMAccountName", + "vals": ["maincomputer"], + }, + }, + ], + }, + ) + + data = response.json() + + assert isinstance(data, dict) + assert data.get("resultCode") == LDAPCodes.SUCCESS + assert kadmin.rename_princ.call_count == 2 # type: ignore + assert kadmin.rename_princ.call_args_list[0].args == ( # type: ignore + "host/mycomputer", + "host/maincomputer", + ) + assert kadmin.rename_princ.call_args_list[1].args == ( # type: ignore + "host/mycomputer.md.test", + "host/maincomputer.md.test", + ) + + response = await http_client.post( + "entry/search", + json={ + "base_object": entry_dn, + "scope": 0, + "deref_aliases": 0, + "size_limit": 1000, + "time_limit": 10, + "types_only": True, + "filter": "(objectClass=*)", + "attributes": [], + "page_number": 1, + }, + ) + + data = response.json() + + assert data["resultCode"] == LDAPCodes.SUCCESS + assert data["search_result"][0]["object_name"] == entry_dn + + for attr in data["search_result"][0]["partial_attributes"]: + if attr["type"] == "sAMAccountName": + assert attr["vals"][0] == "maincomputer" + break + else: + raise Exception("Computer without sAMAccountName") + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("adding_test_computer") +@pytest.mark.usefixtures("setup_session") +@pytest.mark.usefixtures("session") +async def test_api_incorrect_modify_computer_samaccountname_add( + http_client: AsyncClient, +) -> None: + """Test API for modify computer sAMAccountName.""" + entry_dn = "cn=mycomputer,dc=md,dc=test" + response = await http_client.patch( + "/entry/update", + json={ + "object": entry_dn, + "changes": [ + { + "operation": Operation.ADD, + "modification": { + "type": "sAMAccountName", + "vals": ["maincomputer"], + }, + }, + ], + }, + ) + + data = response.json() + + assert isinstance(data, dict) + assert data.get("resultCode") == LDAPCodes.OPERATIONS_ERROR @pytest.mark.asyncio