Source code for virne.solver.meta_heuristic.simulated_annealing_solver

# ==============================================================================
# Copyright 2023 GeminiLight (wtfly2018@gmail.com). All Rights Reserved.
# ==============================================================================


import copy
import random
import threading
import numpy as np

from virne.core.environment import SolutionStepEnvironment
from virne.network import VirtualNetwork, PhysicalNetwork
from virne.core import Controller, Recorder, Counter, Solution, Logger
from virne.solver.meta_heuristic.base_meta_heuristic_solver import Individual, BaseMetaHeuristicSolver
from virne.solver.base_solver import Solver, SolverRegistry


[docs] @SolverRegistry.register(solver_name='sa_meta', solver_type='meta_heuristic') class SimulatedAnnealingSolver(BaseMetaHeuristicSolver): """ Simulated Annealing Algorithm (SA) for VNE References: - Sheng Zhang et al. "FELL: A Flexible Virtual Network Embedding Algorithm with Guaranteed Load Balancing". In ICC, 2011. Attributes: num_individuals: number of individuals max_iteration: max iteration max_attempt_times: max attempt times initial_temperature: initial temperature attenuation_factor: attenuation factor """ def __init__(self, controller: Controller, recorder: Recorder, counter: Counter, logger: Logger, config, **kwargs): super(SimulatedAnnealingSolver, self).__init__(controller, recorder, counter, logger, config, **kwargs) """ """ # super parameters self.num_individuals: int = 8 self.max_iteration: int = 12 self.max_attempt_times: int = 1 self.initial_temperature: float = 2. self.attenuation_factor: float = 0.95
[docs] def ready(self, v_net, p_net): return super().ready(v_net, p_net)
def meta_run(self, v_net, p_net): # initialization self.initialize(v_net, p_net) # start iterating individual_runners = [] for individual in self.individuals: individual_runners.append(threading.Thread(target=self.evolve, args=(individual, ))) for individual_runner in individual_runners: individual_runner.start() for individual_runner in individual_runners: individual_runner.join() self.update_best_individual(self.individuals) return self.best_individual.best_solution def initialize(self, v_net, p_net): # individuals self.individuals = [Individual(i, v_net, p_net) for i in range(self.num_individuals)] # initialize individuals best experience for individual in self.individuals: node_slots = self.generate_initial_node_slots(v_net, p_net, select_method='random') individual.update_solution(node_slots=node_slots) self.controller.deploy_with_node_slots(v_net, p_net, node_slots, individual.solution, inplace=False, shortest_method=self.shortest_method, k_shortest=self.k_shortest) self.counter.count_solution(v_net, individual.solution) individual.update_best_solution() # initialize global best experience self.update_best_individual(self.individuals) return self.best_individual def generate_neighor(self, individual): for i in range(self.max_attempt_times): changed_v_node_id = random.randint(0, individual.v_net.num_nodes-1) current_p_node = individual.solution['node_slots'][changed_v_node_id] p_candidate = self.select_p_candidate(changed_v_node_id, individual.selected_p_nodes, p_node_weights=None, method='random') if p_candidate != -1 and p_candidate != current_p_node: individual.update_solution(node_slots={changed_v_node_id: p_candidate}) self.controller.deploy_with_node_slots(individual.v_net, individual.p_net, individual.solution['node_slots'], individual.solution, inplace=False) self.counter.count_solution(individual.v_net, individual.solution) break def evolve(self, individual): iter_id = 0 temperature = self.initial_temperature individual.last_solution = copy.deepcopy(individual.solution) while iter_id < self.max_iteration: self.generate_neighor(individual) last_fitness = individual.calc_fitness(individual.last_solution) curr_fitness = individual.calc_fitness(individual.solution) diff_fitness = curr_fitness - last_fitness if diff_fitness < 0: individual.last_solution = copy.deepcopy(individual.solution) individual.update_best_solution() else: # acceptance criterion prob = np.exp(- diff_fitness / temperature) if random.random() < prob: individual.last_solution = copy.deepcopy(individual.solution) else: individual.solution = copy.deepcopy(individual.last_solution) iter_id += 1 temperature *= self.attenuation_factor