Source code for common.optim.ne.utils.initialize

"""Variable initialization for Neuroevolution fitting."""

from functools import partial
from typing import Annotated as An

import numpy as np
from mpi4py import MPI

from common.optim.ne.agent import BaseAgent
from common.optim.ne.utils.type import (
    Exchange_and_mutate_info_batch_type,
    Exchange_and_mutate_info_type,
    Generation_results_batch_type,
    Generation_results_type,
    Seeds_batch_type,
)
from common.optim.utils.hydra import get_launcher_config
from common.utils.beartype import ge, le
from common.utils.misc import seed_all
from common.utils.mpi4py import get_mpi_variables


def initialize_common_variables(
    agents_per_task: An[int, ge(1)],
    num_pops: An[int, ge(1), le(2)],
) -> tuple[
    An[int, ge(1)],  # pop_size
    An[int, ge(1)],  # len_agents_batch
    Exchange_and_mutate_info_type | None,  # exchange_and_mutate_info
    Exchange_and_mutate_info_batch_type,  # exchange_and_mutate_info_batch
    Seeds_batch_type,  # seeds_batch
    Generation_results_type | None,  # generation_results
    Generation_results_batch_type,  # generation_results_batch
    An[int, ge(0)] | None,  # total_num_env_steps
]:
    """Initializes variables common to all execution modes.

    Args:
        agents_per_task: See
            :paramref:`~.NeuroevolutionSubtaskConfig.agents_per_task`.
        num_pops: See :meth:`~.BaseSpace.num_pops`.

    Returns:
        :paramref:`~.compute_start_time_and_seeds.pop_size`,
        :paramref:`~.initialize_agents.len_agents_batch`,
        :paramref:`~.update_exchange_and_mutate_info.exchange_and_mutate_info`,
        :paramref:`~.mutate.exchange_and_mutate_info_batch`,
        An array used as a buffer by all processes to receive the seeds
        from the primary process during the first generation only.
        :paramref:`~.compute_generation_results.generation_results`,
        :paramref:`~.compute_generation_results.generation_results_batch`,
        :paramref:`~.compute_total_num_env_steps_and_process_fitnesses.total_num_env_steps`.
    """
    comm, rank, size = get_mpi_variables()
    launcher_config = get_launcher_config()
    pop_size = (
        launcher_config.nodes * launcher_config.tasks_per_node * agents_per_task
    )
    len_agents_batch = pop_size // size
    exchange_and_mutate_info = (
        None
        if rank != 0
        else np.empty(
            shape=(pop_size, num_pops, 4),
            dtype=np.uint32,
        )
    )
    exchange_and_mutate_info_batch = np.empty(
        shape=(len_agents_batch, num_pops, 4),
        dtype=np.uint32,
    )
    seeds_batch = np.empty(
        shape=(len_agents_batch, num_pops),
        dtype=np.uint32,
    )
    generation_results_batch = np.empty(
        shape=(len_agents_batch, num_pops, 3),
        dtype=np.float32,
    )
    generation_results = (
        None
        if rank != 0
        else np.empty(
            shape=(pop_size, num_pops, 3),
            dtype=np.float32,
        )
    )
    total_num_env_steps = None if rank != 0 else 0
    return (
        pop_size,
        len_agents_batch,
        exchange_and_mutate_info,
        exchange_and_mutate_info_batch,
        seeds_batch,
        generation_results,
        generation_results_batch,
        total_num_env_steps,
    )
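
# NOTE: Illustrative usage sketch (not part of the original module). Assuming a
# launcher config with ``nodes=1`` and ``tasks_per_node=2``, run under 2 MPI
# processes with ``agents_per_task=1`` and ``num_pops=2``, each process would
# receive one agent per population:
#
#     (
#         pop_size,                        # 1 * 2 * 1 == 2
#         len_agents_batch,                # pop_size // comm size == 1
#         exchange_and_mutate_info,        # (pop_size, num_pops, 4) on rank 0, else None
#         exchange_and_mutate_info_batch,  # (len_agents_batch, num_pops, 4) on every rank
#         seeds_batch,                     # (len_agents_batch, num_pops) on every rank
#         generation_results,              # (pop_size, num_pops, 3) on rank 0, else None
#         generation_results_batch,        # (len_agents_batch, num_pops, 3) on every rank
#         total_num_env_steps,             # 0 on rank 0, else None
#     ) = initialize_common_variables(agents_per_task=1, num_pops=2)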
def initialize_gpu_comm() -> MPI.Comm:
    """Initializes a communicator for GPU work queueing.

    Assuming the experiment is run with ``N`` MPI processes & ``M``
    GPUs, this function will create ``M`` communicators, each
    containing ``N/M`` processes. Each communicator will be used to
    gather mutated agents onto one process, which will then evaluate
    them on the GPU.

    Returns:
        See :paramref:`~.evaluate_on_gpu.ith_gpu_comm`.
    """
    comm, rank, size = get_mpi_variables()
    launcher_config = get_launcher_config()
    if not launcher_config.gpus_per_node:
        error_msg = (
            "The number of GPUs per node must be a positive integer "
            "in order to set up GPU work queueing."
        )
        raise ValueError(error_msg)
    tasks_per_gpu = size // launcher_config.gpus_per_node
    gpu_idx = rank // tasks_per_gpu
    ith_gpu_comm_task_list = np.arange(
        start=gpu_idx * tasks_per_gpu,
        stop=(gpu_idx + 1) * tasks_per_gpu,
    ).tolist()
    return comm.Create(comm.group.Incl(ith_gpu_comm_task_list))
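
# NOTE: Illustrative usage sketch (assumed setup, not part of the original
# module). With 8 MPI processes and ``gpus_per_node=2``, ``tasks_per_gpu`` is
# 4, so ranks 0-3 end up in one communicator and ranks 4-7 in another. Each
# group can then gather its agents onto its lowest global rank (rank 0 of the
# sub-communicator) for GPU evaluation:
#
#     ith_gpu_comm = initialize_gpu_comm()
#     gathered_agents = ith_gpu_comm.gather(agents_batch, root=0)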
def initialize_agents(
    agent: partial[BaseAgent],
    len_agents_batch: An[int, ge(1)],
    num_pops: An[int, ge(1), le(2)],
    *,
    pop_merge: bool,
) -> list[list[BaseAgent]]:  # agents_batch
    """Initializes a batch of agents.

    Args:
        agent: A partially-instantiated :class:`~.BaseAgent` constructor.
        len_agents_batch: The number of agents per population maintained
            in :paramref:`~.compute_generation_results.agents_batch` by
            the process calling this function during a given generation.
        num_pops: See :meth:`~.BaseSpace.num_pops`.
        pop_merge: See
            :paramref:`~.NeuroevolutionSubtaskConfig.pop_merge`.

    Returns:
        A 2D list of agents maintained by this process.
    """
    agents_batch: list[list[BaseAgent]] = []
    for _ in range(len_agents_batch):
        agents_batch.append([])
        for j in range(num_pops):
            seed_all(0)
            agents_batch[-1].append(
                agent(pop_idx=j, pops_are_merged=pop_merge),
            )
    return agents_batch
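
# NOTE: Illustrative usage sketch (``MyAgent`` and ``agent_config`` are
# hypothetical, not part of the original module). ``agent`` is expected to be a
# :func:`functools.partial` wrapping a :class:`~.BaseAgent` subclass with every
# constructor argument bound except ``pop_idx`` and ``pops_are_merged``:
#
#     agent = partial(MyAgent, config=agent_config)
#     agents_batch = initialize_agents(
#         agent=agent,
#         len_agents_batch=len_agents_batch,
#         num_pops=2,
#         pop_merge=True,
#     )
#     # agents_batch[i][j] is the i-th agent of population j on this process.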