From 4bf9516e0c002a66feb4d220cd20f97cb8738da5 Mon Sep 17 00:00:00 2001 From: Dalton Bohning Date: Wed, 28 May 2025 18:26:41 +0000 Subject: [PATCH] DAOS-17420 test: verify auto recovery policy Test-tag: RbldAutoRecoveryPolicy Test-repeat: 3 Skip-unit-tests: true Skip-fault-injection-test: true Signed-off-by: Dalton Bohning --- .../ftest/rebuild/auto_recovery_policy.py | 389 ++++++++++++++++++ .../ftest/rebuild/auto_recovery_policy.yaml | 30 ++ src/tests/ftest/util/dmg_utils.py | 42 ++ src/tests/ftest/util/dmg_utils_base.py | 44 ++ src/tests/ftest/util/test_utils_pool.py | 34 +- 5 files changed, 535 insertions(+), 4 deletions(-) create mode 100644 src/tests/ftest/rebuild/auto_recovery_policy.py create mode 100644 src/tests/ftest/rebuild/auto_recovery_policy.yaml diff --git a/src/tests/ftest/rebuild/auto_recovery_policy.py b/src/tests/ftest/rebuild/auto_recovery_policy.py new file mode 100644 index 00000000000..bee90b1592e --- /dev/null +++ b/src/tests/ftest/rebuild/auto_recovery_policy.py @@ -0,0 +1,389 @@ +""" + (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import re +import time +from functools import partial + +from apricot import TestWithServers +from data_utils import assert_val_in_list +from exception_utils import CommandFailure +from general_utils import list_to_str + + +class RbldAutoRecoveryPolicy(TestWithServers): + """Rebuild test cases featuring IOR. + + This class contains tests for pool rebuild that feature I/O going on + during the rebuild using IOR. + + :avocado: recursive + """ + + def test_rebuild_auto_recovery_policy(self): + """Jira ID: DAOS-17420. + + Test Description: Verify Rebuild Auto Recovery Policy + + :avocado: tags=all,full_regression + :avocado: tags=vm + :avocado: tags=pool,rebuild + :avocado: tags=RbldAutoRecoveryPolicy,test_rebuild_auto_recovery_policy + """ + # Can run only a subset of scenarios for debugging + scenarios_to_verify = set(self.params.get('scenarios_to_verify', '/run/test/*', ['all'])) + + self.log_step('Setup pool') + pool = self.get_pool(connect=False) + dmg = self.get_dmg_command() + + # TODO calculate this. hard-coded for now + # The detection delay shall be a couple of SWIM periods (1s) + SWIM suspicion timeout (20s) + # + CRT_EVENT_DELAY (1s) + some margin of error (?) + detection_delay = 30 + + # Get 2 distinct sets of ranks to stop + all_ranks = list(self.server_managers[0].ranks.keys()) + ranks_x = self.random.sample(all_ranks, k=1) + ranks_y = self.random.sample(list(set(all_ranks) - set(ranks_x)), k=1) + + # + # Scenario 1: System Creation and default self_heal + # + if not set([1, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 1') + else: + self.log_step('Scenario 1 - Verify default system self_heal policy') + response = dmg.system_get_prop(properties='self_heal')['response'] + actual_value = response[0]['value'] + expected_value = 'exclude;pool_exclude;pool_rebuild' + if actual_value != expected_value: + self.fail( + f'Expected system self_heal policy to be {expected_value}, ' + f'but got {actual_value}') + + self.log_step('Scenario 1 - Verify default pool self_heal policy') + response = pool.get_prop(name='self_heal')['response'] + actual_value = response[0]['value'] + expected_value = 'exclude;rebuild' + if actual_value != expected_value: + self.fail( + f'Expected pool self_heal policy to be {expected_value}, ' + f'but got {actual_value}') + + # + # Scenario 2: Disabling and Enabling Self-Heal + # + if not set([2, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 2') + else: + self.log_step('Scenario 2 - Disable system self_heal') + dmg.system_set_prop('self_heal:none') + + self.log_step('Scenario 2 - Stop a rank and verify it is not excluded') + dmg.system_stop(ranks=ranks_x) + self.server_managers[0].update_expected_states(ranks_x, 'stopped') + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + self.__verify_rank_state(ranks_x, 'stopped') + # TODO we cannot query the pool, which means we can't check rebuild status + + self.log_step( + 'Scenario 2 - Enable system self_heal and invoke dmg system self-heal eval') + dmg.system_set_prop('self_heal:exclude;pool_exclude;pool_rebuild') + try: + dmg.system_self_heal_eval() + except CommandFailure: + # TODO workaround for now + self.log.error('dmg system self-heal eval failed - retrying') + time.sleep(1) + dmg.system_self_heal_eval() + self.server_managers[0].update_expected_states(ranks_x, ['stopped', 'excluded']) + + self.log_step('Scenario 2 - Verify ranks are excluded and rebuilt in the pool') + self.__verify_rank_state(ranks_x, 'excluded') + pool.wait_for_rebuild_to_start(interval=1) + pool.wait_for_rebuild_to_end(interval=3) + + self.log_step( + 'Scenario 2 - Stop another rank and verify it is excluded and rebuilt in the pool') + dmg.system_stop(ranks=ranks_y) + self.server_managers[0].update_expected_states(ranks_y, ['stopped', 'excluded']) + pool.wait_for_rebuild_to_start(interval=1) + pool.wait_for_rebuild_to_end(interval=3) + self.__verify_rank_state(ranks_y, 'excluded') + + self.log_step( + 'Scenario 2 - Reintegrate stopped ranks to bring system back to original state') + stopped_ranks_str = list_to_str(ranks_x + ranks_y) + dmg.system_start(stopped_ranks_str) + dmg.system_reintegrate(stopped_ranks_str) + self.server_managers[0].update_expected_states(ranks_x + ranks_y, ['joined']) + pool.wait_for_rebuild_to_start(interval=1) + pool.wait_for_rebuild_to_end(interval=3) + self.__verify_rank_state(all_ranks, 'joined') + + # + # Scenario 3: Online System Maintenance + # + if not set([3, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 3') + else: + self.log_step('Scenario 3 - Set system.self_heal.pool_rebuild = disabled') + dmg.system_set_prop('self_heal:exclude;pool_exclude') + dmg.system_get_prop(properties='self_heal') + + self.log_step('Scenario 3 - Stop a rank and verify it is excluded without rebuild') + dmg.system_stop(ranks=ranks_x) + self.server_managers[0].update_expected_states(ranks_x, ['stopped', 'excluded']) + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + self.__verify_rank_state(ranks_x, 'excluded') + pool.verify_query({ + 'disabled_ranks': ranks_x, + 'rebuild': { + 'state': partial(assert_val_in_list, allowed_list=['done', 'idle'])}}) + # Targets should be down but not down_out + pool.verify_query_targets_state(ranks_x, 'down') + + self.log_step('Scenario 3 - Restart the rank and make sure it rejoins') + dmg.system_start(ranks=ranks_x) + self.server_managers[0].update_expected_states(ranks_x, ['joined']) + self.__verify_rank_state(all_ranks, 'joined', tries=5, delay=3) + + self.log_step('Scenario 3 - Reintegrate the rank and wait for rebuild') + dmg.system_reintegrate(list_to_str(ranks_x)) + self.server_managers[0].update_expected_states(ranks_x, ['joined']) + pool.wait_for_rebuild_to_start(interval=1) + pool.wait_for_rebuild_to_end(interval=3) + + # The pool version changes after exclusion, + # but should not changed after resetting self_heal + self.log.info('Save current pool version') + pool_version = pool.query()['response']['version'] + + self.log_step('Scenario 3 - Reset system self_heal to default') + dmg.system_set_prop('self_heal:exclude;pool_exclude;pool_rebuild') + + self.log_step('Scenario 3 - Verify dmg system self-heal eval does not trigger rebuild') + dmg.system_self_heal_eval() + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + pool.verify_query({ + 'disabled_ranks': [], + 'rebuild': { + 'state': partial(assert_val_in_list, allowed_list=['done', 'idle'])}, + 'version': pool_version}) + + # + # Scenario 4: Offline System Maintenance + # + if not set([4, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 4') + else: + self.log_step('Scenario 4 - Disable system self_heal') + dmg.system_set_prop('self_heal:none') + + # We expect the pool version to stay the same through this scenario since + # there are no exclusions or rebuilds + self.log.info('Save current pool version') + pool_version = pool.query()['response']['version'] + + self.log_step( + 'Scenario 4 - Stop more ranks than the pool RF and verify there are no exclusions') + pool_rf = int(re.findall(r'rd_fac:([0-9]+)', pool.properties.value)[0]) + self.assertGreater( + len(all_ranks), pool_rf, 'Not enough ranks to stop more than pool RF') + ranks_over_rf = self.random.sample(all_ranks, k=pool_rf + 1) + dmg.system_stop(ranks=list_to_str(ranks_over_rf)) + self.server_managers[0].update_expected_states(ranks_over_rf, ['stopped']) + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + self.__verify_rank_state(ranks_over_rf, 'stopped') + + self.log_step('Scenario 4 - Restart the stopped ranks and make sure they rejoin') + dmg.system_start(ranks=list_to_str(ranks_over_rf)) + self.server_managers[0].update_expected_states(ranks_over_rf, ['joined']) + self.__verify_rank_state(all_ranks, 'joined', tries=5, delay=3) + + self.log_step('Scenario 4 - Reset system self_heal to default') + dmg.system_set_prop('self_heal:exclude;pool_exclude;pool_rebuild') + + self.log_step('Scenario 4 - Verify dmg system self-heal eval does not trigger rebuild') + dmg.system_self_heal_eval() + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + pool.verify_query({ + 'disabled_ranks': [], + 'rebuild': { + 'state': partial(assert_val_in_list, allowed_list=['done', 'idle'])}, + 'version': pool_version}) + + # + # Scenario 5: Normal System Restart + # + if not set([5, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 5') + else: + self.log_step('Scenario 5 - Disable system self_heal') + dmg.system_set_prop('self_heal:none') + + # We expect the pool version to stay the same through this scenario since + # there are no exclusions or rebuilds + self.log.info('Save current pool version') + pool_version = pool.query()['response']['version'] + + self.log_step('Scenario 5 - Stop the system and verify no ranks are excluded') + dmg.system_stop() + self.server_managers[0].update_expected_states(all_ranks, ['stopped']) + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + self.__verify_rank_state(all_ranks, 'stopped') + + self.log_step('Scenario 5 - Restart the system and make sure all ranks rejoin') + dmg.system_start() + self.server_managers[0].update_expected_states(all_ranks, ['joined']) + self.__verify_rank_state(all_ranks, 'joined', tries=5, delay=3) + + self.log_step('Scenario 5 - Reset system self_heal to default') + dmg.system_set_prop('self_heal:exclude;pool_exclude;pool_rebuild') + + self.log_step('Scenario 5 - Verify dmg system self-heal eval does not trigger rebuild') + dmg.system_self_heal_eval() + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + pool.verify_query({ + 'disabled_ranks': [], + 'rebuild': { + 'state': partial(assert_val_in_list, allowed_list=['done', 'idle'])}, + 'version': pool_version}) + + # + # Scenario 6: Unexpected System Restart + # + if not set([6, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 6') + else: + self.log_step('Scenario 6 - Simulate restart with dmg system stop') + + # We expect the pool version to stay the same through this scenario since + # there are no exclusions or rebuilds + self.log.info('Save current pool version') + pool_version = pool.query()['response']['version'] + dmg.system_stop() + self.server_managers[0].update_expected_states(all_ranks, ['stopped']) + + self.log_step('Scenario 6 - Start all but 1 rank and immediately disable self-heal') + all_ranks_minus_1 = self.random.sample(all_ranks, k=len(all_ranks) - 1) + dmg.system_start(ranks=list_to_str(all_ranks_minus_1)) + self.server_managers[0].update_expected_states(all_ranks_minus_1, ['joined']) + dmg.system_set_prop('self_heal:none') + + self.log_step('Scenario 6 - Verify all but 1 rank rejoins') + self.__verify_rank_state(all_ranks_minus_1, 'joined', tries=5, delay=3) + + self.log_step('Scenario 6 - Restart the last rank and make sure it rejoins') + dmg.system_start() + self.server_managers[0].update_expected_states(all_ranks, ['joined']) + self.__verify_rank_state(all_ranks, 'joined', tries=5, delay=3) + + self.log_step('Scenario 6 - Reset system self_heal to default') + dmg.system_set_prop('self_heal:exclude;pool_exclude;pool_rebuild') + + self.log_step('Scenario 6 - Verify dmg system self-heal eval does not trigger rebuild') + dmg.system_self_heal_eval() + self.log.info('Waiting for detection delay of %s seconds', detection_delay) + time.sleep(detection_delay) + pool.verify_query({ + 'disabled_ranks': [], + 'rebuild': { + 'state': partial(assert_val_in_list, allowed_list=['done', 'idle'])}, + 'version': pool_version}) + + # + # Scenario 7: Problematic Pools + # + if not set([7, 'all']) & scenarios_to_verify: + self.log.warning('Skipping Scenario 7') + else: + self.log_step('Scenario 7 - Create a second pool') + pool2 = self.get_pool(connect=False) + + self.log_step('Scenario 7 - Disable self_heal rebuild on just the second pool') + pool2.set_prop('self_heal:exclude') + pool2.query() + + self.log_step('Scenario 7 - Stop a rank and wait for the detection delay') + dmg.system_stop(ranks=ranks_x) + self.server_managers[0].update_expected_states(ranks_x, ['stopped', 'excluded']) + time.sleep(detection_delay) + + self.log_step( + 'Scenario 7 - Verify the rank is excluded and rebuilds in first pool only') + self.__verify_rank_state(ranks_x, 'excluded') + pool.wait_for_rebuild_to_start(interval=1) + pool.wait_for_rebuild_to_end(interval=3) + pool.verify_query({ + 'disabled_ranks': ranks_x, + 'rebuild': { + 'state': 'done'}}) + self.log_step( + 'Scenario 7 - Verify the rank is excluded and does not rebuild in second pool') + pool2.verify_query({ + 'disabled_ranks': ranks_x, + 'rebuild': { + 'state': partial(assert_val_in_list, allowed_list=['done', 'idle'])}}) + # Targets should be down but not down_out + pool2.verify_query_targets_state(ranks_x, 'down') + + self.log_step( + 'Scenario 7 - Reintegrate stopped ranks to bring system back to original state') + stopped_ranks_str = list_to_str(ranks_x) + dmg.system_start(stopped_ranks_str) + dmg.system_reintegrate(stopped_ranks_str) + self.server_managers[0].update_expected_states(ranks_x, ['joined']) + pool.wait_for_rebuild_to_start(interval=1) + pool.wait_for_rebuild_to_end(interval=3) + self.__verify_rank_state(all_ranks, 'joined') + + self.log_step('Scenario 7 - Destroy second pool') + pool2.destroy() + del pool2 + + self.log_step('Destroy pool') + pool.destroy() + + self.log_step('Test passed') + + def __verify_rank_state(self, ranks, state, tries=1, delay=3): + """Verify the state of the given ranks. + + Args: + ranks (list): The list of ranks to verify. + state (str): The expected state of the ranks. + tries (int, optional): Number of attempts to verify the state. Defaults to 1. + delay (int, optional): Delay between attempts in seconds. Defaults to 3. + """ + for current_try in range(tries): + current_state = self.server_managers[0].get_current_state() + + # All ranks are in expected state + if set(current_state[rank]['state'] for rank in ranks) == {state}: + return + + # Retry + if current_try < tries - 1: + self.log.info( + 'Not all ranks are in expected state %s. Retrying in %s seconds...', + state, delay) + time.sleep(delay) + continue + + # Final attempt failed + for rank in ranks: + if current_state[rank]['state'] != state: + self.fail( + f'Expected rank {rank} to be in state {state}, ' + f'but current state is {current_state[rank]["state"]}') diff --git a/src/tests/ftest/rebuild/auto_recovery_policy.yaml b/src/tests/ftest/rebuild/auto_recovery_policy.yaml new file mode 100644 index 00000000000..5ac5748e625 --- /dev/null +++ b/src/tests/ftest/rebuild/auto_recovery_policy.yaml @@ -0,0 +1,30 @@ +hosts: + test_servers: 7 + test_clients: 1 + +timeout: 800 + +skip_add_log_msg: true + +server_config: + name: daos_server + engines_per_host: 1 + engines: + 0: + targets: 4 + nr_xs_helpers: 0 + storage: + 0: + class: ram + scm_mount: /mnt/daos + system_ram_reserved: 1 + +pool: + size: 10G + properties: rd_fac:2 + pool_query_timeout: 30 + register_cleanup: False # if something goes wrong, this will likely fail + +test: + scenarios_to_verify: + - all diff --git a/src/tests/ftest/util/dmg_utils.py b/src/tests/ftest/util/dmg_utils.py index 1579d1cd144..bb77c8b4c03 100644 --- a/src/tests/ftest/util/dmg_utils.py +++ b/src/tests/ftest/util/dmg_utils.py @@ -1157,6 +1157,21 @@ def system_exclude(self, ranks=None, rank_hosts=None): return self._get_json_result( ("system", "exclude"), ranks=ranks, rank_hosts=rank_hosts) + def system_get_prop(self, properties=None): + """Call dmg system get-prop. + + Args: + properties (str, optional): Comma separated properties to get. + + Raises: + CommandFailure: if the command fails. + + Returns: + dict: the dmg json command output converted to a python dictionary + + """ + return self._get_json_result(("system", "get-prop"), properties=properties) + def system_leader_query(self): """Call dmg system leader-query. @@ -1268,6 +1283,33 @@ def system_rebuild_stop(self, verbose=False, force=False): return self._get_json_result( ("system", "rebuild", "stop"), verbose=verbose, force=force) + def system_self_heal_eval(self): + """Call dmg system self-heal eval. + + Raises: + CommandFailure: if the command fails. + + Returns: + dict: the dmg json command output converted to a python dictionary + + """ + return self._get_json_result(("system", "self-heal", "eval")) + + def system_set_prop(self, properties=None): + """Call dmg system set-prop. + + Args: + properties (str): properties in the form of key:val[,key:val...] + + Raises: + CommandFailure: if the command fails. + + Returns: + dict: the dmg json command output converted to a python dictionary + + """ + return self._get_json_result(("system", "set-prop"), properties=properties) + def system_start(self, ranks=None, ignore_admin_excluded=False): """Start the system. diff --git a/src/tests/ftest/util/dmg_utils_base.py b/src/tests/ftest/util/dmg_utils_base.py index be13a06c107..78c05870579 100644 --- a/src/tests/ftest/util/dmg_utils_base.py +++ b/src/tests/ftest/util/dmg_utils_base.py @@ -885,6 +885,8 @@ def get_sub_command_class(self): self.sub_command_class = self.EraseSubCommand() elif self.sub_command.value == "exclude": self.sub_command_class = self.ExcludeSubCommand() + elif self.sub_command.value == "get-prop": + self.sub_command_class = self.GetPropSubCommand() elif self.sub_command.value == "leader-query": self.sub_command_class = self.LeaderQuerySubCommand() elif self.sub_command.value == "list-pools": @@ -895,6 +897,10 @@ def get_sub_command_class(self): self.sub_command_class = self.RebuildSubCommand() elif self.sub_command.value == "reintegrate": self.sub_command_class = self.ReintegrateSubCommand() + elif self.sub_command.value == "self-heal": + self.sub_command_class = self.SelfHealSubCommand() + elif self.sub_command.value == "set-prop": + self.sub_command_class = self.SetPropSubCommand() elif self.sub_command.value == "start": self.sub_command_class = self.StartSubCommand() elif self.sub_command.value == "stop": @@ -945,6 +951,14 @@ def __init__(self): self.ranks = FormattedParameter("--ranks={}") self.rank_hosts = FormattedParameter("--rank-hosts={}") + class GetPropSubCommand(CommandWithParameters): + """Defines an object for the dmg system get-prop command.""" + + def __init__(self): + """Create a dmg system get-prop command object.""" + super().__init__("/run/dmg/system/get-prop/*", "get-prop") + self.properties = BasicParameter(None, position=1) + class LeaderQuerySubCommand(CommandWithParameters): """Defines an object for the dmg system leader-query command.""" @@ -1011,6 +1025,36 @@ def __init__(self): self.verbose = FormattedParameter("--verbose", False) self.force = FormattedParameter("--force", False) + class SelfHealSubCommand(CommandWithSubCommand): + """Defines an object for the dmg system self-heal command.""" + + def __init__(self): + """Create a dmg system self-heal command object.""" + super().__init__("/run/dmg/system/self-heal/*", "self-heal") + + def get_sub_command_class(self): + # pylint: disable=redefined-variable-type + """Get the dmg system sub command object.""" + if self.sub_command.value == "eval": + self.sub_command_class = self.EvalSubCommand() + else: + self.sub_command_class = None + + class EvalSubCommand(CommandWithParameters): + """Defines an object for the dmg system self-heal eval command.""" + + def __init__(self): + """Create a dmg system self-heal eval command object.""" + super().__init__("/run/dmg/system/self-heal/eval/*", "eval") + + class SetPropSubCommand(CommandWithParameters): + """Defines an object for the dmg system set-prop command.""" + + def __init__(self): + """Create a dmg system set-prop command object.""" + super().__init__("/run/dmg/system/set-prop/*", "set-prop") + self.properties = BasicParameter(None, position=1) + class StartSubCommand(CommandWithParameters): """Defines an object for the dmg system start command.""" diff --git a/src/tests/ftest/util/test_utils_pool.py b/src/tests/ftest/util/test_utils_pool.py index a863c38cfdd..02a3c425772 100644 --- a/src/tests/ftest/util/test_utils_pool.py +++ b/src/tests/ftest/util/test_utils_pool.py @@ -1,6 +1,6 @@ """ (C) Copyright 2018-2024 Intel Corporation. - (C) Copyright 2025 Hewlett Packard Enterprise Development LP + (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -96,8 +96,9 @@ def add_pool(test, namespace=POOL_NAMESPACE, create=True, connect=True, dmg=None # Add a step to remove this pool when the test completes and ensure their is enough time for the # pool destroy to be attempted - accounting for a possible dmg command timeout - test.increment_timeout(POOL_TIMEOUT_INCREMENT) - test.register_cleanup(remove_pool, test=test, pool=pool) + if pool.register_cleanup.value is True: + test.increment_timeout(POOL_TIMEOUT_INCREMENT) + test.register_cleanup(remove_pool, test=test, pool=pool) return pool @@ -306,6 +307,8 @@ def __init__(self, context, dmg_command, label_generator=None, namespace=POOL_NA # Parameter to control running 'dmg storage query usage --show_usable' if pool create fails self.query_on_create_error = BasicParameter(None, False) + self.register_cleanup = BasicParameter(True, True) # call register_cleanup by default + self.pool = None self.info = None self.svc_ranks = None @@ -724,7 +727,7 @@ def set_prop(self, *args, **kwargs): dict: json output of dmg pool set-prop command """ - return self.dmg.pool_set_prop(pool=self.identifier, *args, **kwargs) + return self.dmg.pool_set_prop(self.identifier, *args, **kwargs) @fail_on(CommandFailure) def get_prop(self, *args, **kwargs): @@ -1623,3 +1626,26 @@ def verify_query(self, expected_response, use_cached_query=False): response = self.query_data['response'] assert_dict_subset(expected_response, response) + + def verify_query_targets_state(self, ranks, expected_target_state): + """Verify all targets are in the expected state with dmg pool query-targets. + + Args: + ranks (list): The list of ranks to verify. + expected_target_state (str): The expected target state. + + Raises: + AssertionError: if the pool query-targets response does not match expected values + + """ + def _all_targets_in_state(infos): + for target, info in enumerate(infos): + if info['target_state'] != expected_target_state: + raise AssertionError( + f'Expected target {target} to be in state {expected_target_state}, ' + f'but current state is {info["target_state"]}') + return True + + for rank in ranks: + response = self.query_targets(rank=rank)['response'] + assert_dict_subset({'Infos': _all_targets_in_state}, response)