Source code for common.optim.ne.fit

""":func:`.fit`, :func:`.evolve` and :func:`.fit.test`."""

import logging
import pickle
from collections.abc import Callable
from functools import partial
from pathlib import Path
from typing import Any

import numpy as np

from common.optim.ne.agent import BaseAgent
from common.optim.ne.space import BaseSpace
from common.optim.ne.utils.compute import (
    compute_generation_results,
    compute_save_points,
    compute_start_time_and_seeds,
    compute_total_num_env_steps_and_process_fitnesses,
)
from common.optim.ne.utils.evolve import (
    evaluate_on_cpu,
    evaluate_on_gpu,
    mutate,
)
from common.optim.ne.utils.exchange import (
    exchange_agents,
    update_exchange_and_mutate_info,
)
from common.optim.ne.utils.initialize import (
    initialize_agents,
    initialize_common_variables,
    initialize_gpu_comm,
)
from common.optim.ne.utils.readwrite import (
    find_existing_save_points,
    load_state,
    save_state,
)
from common.optim.ne.utils.type import Generation_results_type
from common.optim.ne.utils.validate import validate_space
from common.optim.ne.utils.wandb import setup_wandb, terminate_wandb
from common.utils.misc import seed_all
from common.utils.mpi4py import get_mpi_variables

from .config import (
    NeuroevolutionSubtaskConfig,
    NeuroevolutionSubtaskTestConfig,
)

MAX_INT = 2**31 - 1

log = logging.getLogger(__name__)


[docs] def fit( space: BaseSpace, agent: partial[BaseAgent], logger: Callable[..., Any], config: NeuroevolutionSubtaskConfig, ) -> None: """Neuroevolution + testing. Note that this function and all of its sub-functions will be called by ``num_nodes * tasks_per_node`` MPI processes/tasks. These two variables are set in the Hydra launcher configuration. Args: space agent logger: See :func:`~.utils.wandb.setup_wandb`. config """ if not isinstance(config, NeuroevolutionSubtaskTestConfig): evolve(space=space, agent=agent, logger=logger, config=config) else: test(space=space, config=config)
[docs] def evolve( space: BaseSpace, agent: partial[BaseAgent], logger: Callable[..., Any], config: NeuroevolutionSubtaskConfig, ) -> None: """Neuroevolution. Args: space agent logger: See :func:`~.utils.wandb.setup_wandb`. config """ seed_all(config.seed) comm, _, _ = get_mpi_variables() validate_space(space=space, pop_merge=config.pop_merge) prev_num_gens, save_points = compute_save_points( output_dir=config.output_dir, total_num_gens=config.total_num_gens, save_interval=config.save_interval, save_first_gen=config.save_first_gen, ) ( pop_size, len_agents_batch, exchange_and_mutate_info, exchange_and_mutate_info_batch, seeds_batch, generation_results, generation_results_batch, total_num_env_steps, ) = initialize_common_variables( agents_per_task=config.agents_per_task, num_pops=space.num_pops, ) if space.evaluates_on_gpu: ith_gpu_comm = initialize_gpu_comm() if prev_num_gens > 0: ( agents_batch, generation_results, total_num_env_steps, ) = load_state( prev_num_gens=prev_num_gens, len_agents_batch=len_agents_batch, output_dir=config.output_dir, ) else: agents_batch = initialize_agents( agent=agent, len_agents_batch=len_agents_batch, num_pops=space.num_pops, pop_merge=config.pop_merge, ) setup_wandb(logger=logger, output_dir=config.output_dir) for curr_gen in range(prev_num_gens + 1, config.total_num_gens + 1): start_time, seeds = compute_start_time_and_seeds( generation_results=generation_results, curr_gen=curr_gen, num_pops=space.num_pops, pop_size=pop_size, pop_merge=config.pop_merge, ) if curr_gen == 1: # See https://github.com/MaximilienLC/ai_repo/blob/main/docs/genetic.pdf # for a full example execution of the genetic algorithm. # The following block is examplified in section 3. comm.Scatter( sendbuf=seeds, recvbuf=seeds_batch, ) exchange_and_mutate_info_batch[:, :, 3] = seeds_batch else: update_exchange_and_mutate_info( num_pops=space.num_pops, pop_size=pop_size, exchange_and_mutate_info=exchange_and_mutate_info, generation_results=generation_results, seeds=seeds, ) # See https://github.com/MaximilienLC/ai_repo/blob/main/docs/genetic.pdf # for a full example execution of the genetic algorithm. # The following block is examplified in section 13. comm.Scatter( sendbuf=exchange_and_mutate_info, recvbuf=exchange_and_mutate_info_batch, ) exchange_agents( num_pops=space.num_pops, pop_size=pop_size, agents_batch=agents_batch, exchange_and_mutate_info_batch=exchange_and_mutate_info_batch, ) mutate( agents_batch=agents_batch, exchange_and_mutate_info_batch=exchange_and_mutate_info_batch, num_pops=space.num_pops, ) fitnesses_and_num_env_steps_batch = ( ( evaluate_on_gpu( ith_gpu_comm=ith_gpu_comm, agents_batch=agents_batch, space=space, curr_gen=curr_gen, transfer=config.env_transfer or config.fit_transfer or config.mem_transfer, ) ) if space.evaluates_on_gpu else evaluate_on_cpu( agents_batch=agents_batch, space=space, curr_gen=curr_gen, ) ) compute_generation_results( generation_results=generation_results, generation_results_batch=generation_results_batch, fitnesses_and_num_env_steps_batch=fitnesses_and_num_env_steps_batch, agents_batch=agents_batch, num_pops=space.num_pops, ) # Primary process gathers fitnesses, number of environment steps # and pickled agent sizes comm.Gather( sendbuf=generation_results_batch, recvbuf=generation_results, ) total_num_env_steps = ( compute_total_num_env_steps_and_process_fitnesses( generation_results=generation_results, total_num_env_steps=total_num_env_steps, curr_gen=curr_gen, start_time=start_time, pop_merge=config.pop_merge, ) ) # State saving. if curr_gen in save_points: save_state( agents_batch=agents_batch, generation_results=generation_results, total_num_env_steps=total_num_env_steps, curr_gen=curr_gen, output_dir=config.output_dir, ) terminate_wandb()
[docs] def test( space: BaseSpace, config: NeuroevolutionSubtaskTestConfig, ) -> None: """Neuroevolution testing. Args: space config """ # Get MPI info comm, rank, size = get_mpi_variables() # Setup work distribution across MPI processes save_points = find_existing_save_points(output_dir=config.output_dir) assigned_save_points = [ save_points[i] for i in range(len(save_points)) if i % size == rank ] # Loop over work assigned to this MPI process for gen in assigned_save_points: # Validate path & load state path = Path(f"{config.output_dir}/{gen}/") if not (path / "state.pkl").is_file(): log.info(f"No saved state found at {path}.") continue if (path / "evaluation.pkl").is_file(): log.info(f"Already evaluated generation {gen}.") continue with (path / "state.pkl").open(mode="rb") as f: state = pickle.load(f) # Extract state information for testing agents: list[list[BaseAgent]] = state[0] pop_size: int = len(agents) generation_results: Generation_results_type = state[1] total_num_env_steps: int = state[2] fitnesses = generation_results[:, :, 0] fitnesses_sorting_indices = fitnesses.argsort(axis=0) fitnesses_index_ranking = fitnesses_sorting_indices.argsort(axis=0) selected = np.greater_equal(fitnesses_index_ranking, pop_size // 2) selected_indices = np.where(selected[:, 0])[0] # Setup evaluation scores = np.empty((pop_size // 2, config.num_tests)) # Setting `eval_num_steps` to `test_num_steps` to have # later evaluate the agent for this potentially different # number of steps. space.config.eval_num_steps = config.test_num_steps # Loop over selected agents for i in range(pop_size // 2): agent: BaseAgent = agents[selected_indices[i]][0] for j in range(config.num_tests): log.info(f"Test #{j}, agent #{i}, generation #{gen}.") seed_all(MAX_INT - j) # env,fit,env+fit,env+fit+mem: reset # mem,mem+fit: no reset if not ( agent.config.mem_transfer or ( agent.config.mem_transfer and agent.config.fit_transfer ) ): agent.reset() # Setting `fit_transfer` to `False` to have # `space.evaluate` return the evaluation score rather # than the continual fitness. agent.config.fit_transfer = False # Setting `env_transfer` to `False` to not have # `space.evaluate` loop forever. agent.config.env_transfer = False fitnesses_and_num_env_steps = space.evaluate( agents=[[agent]], curr_gen=MAX_INT - j, ) scores[i][j] = fitnesses_and_num_env_steps[0] with (path / "evaluation.pkl").open(mode="wb") as f: pickle.dump([scores, total_num_env_steps], f)