diff --git a/ExecutionControl.cpp b/ExecutionControl.cpp new file mode 100644 index 0000000..fb358f6 --- /dev/null +++ b/ExecutionControl.cpp @@ -0,0 +1,80 @@ +/*============================================================================== +Execution control + +The source file implements the static variables and functions of the Execution +control actor. + +Author and Copyright: Geir Horn, University of Oslo +Contact: Geir.Horn@mn.uio.no +License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) +==============================================================================*/ + +#include "Actor.hpp" +#include "Communication/NetworkEndpoint.hpp" +#include "ExecutionControl.hpp" + +namespace NebulOuS +{ + +// ----------------------------------------------------------------------------- +// Static variables +// ----------------------------------------------------------------------------- + +bool ExecutionControl::Running = true; +std::mutex ExecutionControl::TerminationLock; +std::condition_variable ExecutionControl::ReadyToTerminate; + +// ----------------------------------------------------------------------------- +// Waiting function +// ----------------------------------------------------------------------------- +// +// The function used to wait for the termination message simply waits on the +// condition variable until it is signalled by the message handler. As there +// could be spurious wake-ups it is necessary to check if the actor is still +// running when the condition variable is signalled, and if so the calling +// thread will just block again in another wait. + +void ExecutionControl::WaitForTermination( void ) +{ + while( Running ) + { + std::unique_lock< std::mutex > Lock( TerminationLock ); + ReadyToTerminate.wait( Lock ); + } +} + +// ----------------------------------------------------------------------------- +// Stop message handler +// ----------------------------------------------------------------------------- +// +// The stop message handler will first send the network stop message to the +// session layer requesting it to coordinate the network shutdown and close all +// externally communicating actors. + +void ExecutionControl::StopMessageHandler( const StopMessage & Command, + const Address Sender ) +{ + std::lock_guard< std::mutex > Lock( TerminationLock ); + + Send( Theron::Network::ShutDown(), + Theron::Network::GetAddress( Theron::Network::Layer::Session ) ); + + Running = false; + ReadyToTerminate.notify_all(); +} + +// ----------------------------------------------------------------------------- +// Constructor +// ----------------------------------------------------------------------------- +// +// The only action taken by the constructor is to register the handler for the +// stop message. + +ExecutionControl::ExecutionControl( const std::string & TheActorName ) +: Actor( TheActorName ), + StandardFallbackHandler( Actor::GetAddress().AsString() ) +{ + RegisterHandler( this, &ExecutionControl::StopMessageHandler ); +} + +} // namespace NebulOuS \ No newline at end of file diff --git a/ExecutionControl.hpp b/ExecutionControl.hpp new file mode 100644 index 0000000..2986579 --- /dev/null +++ b/ExecutionControl.hpp @@ -0,0 +1,108 @@ +/*============================================================================== +Execution control + +The Solver Component should run as long as the application being optimised is +running. This requires an external message to the Solver Component about when +the Solver Component should shut down, and a way to stop other threads from +progressing until the shut down message has been processed. + +The following Actor may run on its own, but it may also be included with +another Actor to avoid running a separate thread just waiting for a single shut +down message. This Actor will therefore be base class for the Solver Manager +actor, but the implementation cannot be done there since the Solver Manager is +a templated actor, and knowlege about the template parameter would be necessary +to call the function to wait for termination. + +The threads calling the function to wait for termination will block until the +required message is received. + +Author and Copyright: Geir Horn, University of Oslo +Contact: Geir.Horn@mn.uio.no +License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) +==============================================================================*/ + +#ifndef NEBULOUS_EXECUTION_CONTROL +#define NEBULOUS_EXECUTION_CONTROL + +// Standard headers + +#include // Execution stop management +#include // Lock the condtion variable + +// Theron++ headers + +#include "Actor.hpp" // Actor base class +#include "Utility/StandardFallbackHandler.hpp" // Exception unhanded messages + +namespace NebulOuS +{ + +/*============================================================================== + + Execution control + +==============================================================================*/ + +class ExecutionControl +: virtual public Theron::Actor, + virtual public Theron::StandardFallbackHandler +{ + // The mechanism used for blocking other threads will be to make them wait + // for a condition variable until the message handler for the exit message + // will trigger and notifiy this variable. + +private: + + static bool Running; + static std::mutex TerminationLock; + static std::condition_variable ReadyToTerminate; + +public: + + // The function used to wait for the termination message simply waits on the + // condition variable until it is signalled by the message handler. As there + // could be spurious wake-ups it is necessary to check if the actor is still + // running when the condition variable is signalled, and if so the calling + // thread will just block again in another wait. + // + // Note that returning from this function does not imply that all actors have + // closed and finished processing. One should wait for the local actor system + // to close before deleting the local actors, see the normal function + // Actor::WaitForGlobalTermination() + + static void WaitForTermination( void ); + + // The stop message has not yet been defined and it is defined as an empty + // class here as a named placeholder for a better future definition. + + class StopMessage + { + public: + + StopMessage() = default; + StopMessage( const StopMessage & Other ) = default; + ~StopMessage() = default; + }; + +protected: + + // The message handler will change the value of the flag indicating that the + // Actor is running, and signalling the condition variable to indicate that + // the termination has started. + + virtual void StopMessageHandler( const StopMessage & Command, + const Address Sender ); + + // The constructor is simply taking the name of the actor as parameter and + // initialises the base classes. + +public: + + ExecutionControl( const std::string & TheActorName ); + + ExecutionControl() = delete; + virtual ~ExecutionControl() = default; +}; + +} // namespace NebulOuS +#endif // NEBULOUS_EXECUTION_CONTROL diff --git a/MetricUpdater.cpp b/MetricUpdater.cpp index 560acc3..5d5fd46 100644 --- a/MetricUpdater.cpp +++ b/MetricUpdater.cpp @@ -151,7 +151,7 @@ void MetricUpdater::SLOViolationHandler( SeverityMessage[ NebulOuS::TimePoint ].get< Solver::TimePointType >(), SeverityMessage[ NebulOuS::ObjectiveFunctionName ], TheApplicationExecutionContext - ), TheSolutionManger ); + ), TheSolverManager ); } // -------------------------------------------------------------------------- @@ -166,11 +166,11 @@ void MetricUpdater::SLOViolationHandler( // to for their values, and the second for receiving the SLO violation message. MetricUpdater::MetricUpdater( const std::string UpdaterName, - const Address ManagerForSolutions ) + const Address ManagerOfSolvers ) : Actor( UpdaterName ), StandardFallbackHandler( Actor::GetAddress().AsString() ), NetworkingActor( Actor::GetAddress().AsString() ), - MetricValues(), ValidityTime(0), TheSolutionManger( ManagerForSolutions ) + MetricValues(), ValidityTime(0), TheSolverManager( ManagerOfSolvers ) { RegisterHandler( this, &MetricUpdater::AddMetricSubscription ); RegisterHandler( this, &MetricUpdater::UpdateMetricValue ); diff --git a/MetricUpdater.hpp b/MetricUpdater.hpp index e6f0b49..85d0215 100644 --- a/MetricUpdater.hpp +++ b/MetricUpdater.hpp @@ -196,10 +196,10 @@ private: // values should be sent as an application execution context (message) to the // Solution Manager actor that will invoke a solver to find the optimal // configuration for this configuration. The Metric Updater must therefore - // know the address of the Solution Manager, and this must be passed to + // know the address of the Soler Manager, and this must be passed to // the constructor. - const Address TheSolutionManger; + const Address TheSolverManager; // -------------------------------------------------------------------------- // Subscribing to metric prediction values @@ -315,7 +315,7 @@ private: public: MetricUpdater( const std::string UpdaterName, - const Address ManagerForSolutions ); + const Address ManagerOfSolvers ); // The destructor will unsubscribe from the control channels for the // message defining metrics, and the channel for receiving SLO violation diff --git a/SolverComponent.cpp b/SolverComponent.cpp index 564770d..e63e34f 100644 --- a/SolverComponent.cpp +++ b/SolverComponent.cpp @@ -2,9 +2,228 @@ Solver Component This is the main file for the Solver Component executable including the parsing -of command line arguments and the AMQ network interface. +of command line arguments and the AMQ network interface. It first starts the +AMQ interface actors of the Network Endpoint, then creates the actors of the +solver component: The Metric Updater and the Solution Manager, which in turn +will start the solver actor(s). All actors are executing on proper operating +system threads, and they are scheduled for execution whenever they have a +pending message. + +The command line arguments that can be givne to the Solver Component are + +-A or --AMPLDir for the AMPL model interpreter +-B or --broker for the location of the AMQ broker +-E or --endpoint The endpoint name +-M ir --ModelDir for model and data files +-N or --name The AMQ identity of the solver (see below) +-P or --port the port to use on the AMQ broker URL +-U or --user the user to authenticate for the AMQ broker +-Pw or --password the AMQ broker password for the user +-? or --Help prints a help message for the options + +Default values: + +-A taken from the standard AMPL environment variables if omitted +-B localhost +-E +-M +-N "NebulOuS::Solver" +-P 5672 +-U admin +-Pw admin + +A note on the mandatory endpoint name defining the extension used for the +solver component when connecting to the AMQ server. Typically the connection +will be established as "name@endpoint" and so if there are several +solver components running, the endpoint is the only way for the AMQ solvers to +distinguish the different solver component subscriptions. Author and Copyright: Geir Horn, University of Oslo Contact: Geir.Horn@mn.uio.no License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) ==============================================================================*/ + +// Standard headers + +#include // For standard strings +// #include // For smart pointers +#include // Making informative error messages +#include // To format error messages +#include // standard exceptions +#include // Access to the file system +// #include // To unpack variable arguments +// #include // To constrain types +// #include // To store subscribed topics +// #include // To sleep while waiting for termination +// #include // To have a concept of fime + +// Theron++ headers + +#include "Actor.hpp" +#include "Utility/StandardFallbackHandler.hpp" +#include "Utility/ConsolePrint.hpp" + +#include "Communication/PolymorphicMessage.hpp" +#include "Communication/NetworkingActor.hpp" + +// AMQ protocol related headers + +#include "proton/connection_options.hpp" // Options for the Broker +#include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages +#include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint +#include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP + +// The cxxopts command line options parser + +#include "cxxopts.hpp" + +// AMPL Application Programmer Interface (API) + +#include "ampl/ampl.h" + +// NegulOuS related headers + +#include "MetricUpdater.hpp" +#include "SolverManager.hpp" +#include "AMPLSolver.hpp" + +/*============================================================================== + + Main file + +==============================================================================*/ +// + +int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) +{ + // -------------------------------------------------------------------------- + // Defining and parsing the Command Line Interface (CLI) options + // -------------------------------------------------------------------------- + + cxxopts::Options CLIOptions("./SolverComponent", + "The NebulOuS Solver component"); + + CLIOptions.add_options() + ("A,AMPLDir", "The AMPL installation path", + cxxopts::value()->default_value("") ) + ("B,broker", "The URL of the AMQ broker", + cxxopts::value()->default_value("localhost") ) + ("E,endpoint", "The endpoint name", cxxopts::value() ) + ("M,ModelDir", "Directory to store the model and its data", + cxxopts::value()->default_value("") ) + ("N,name", "The name of the Solver Component", + cxxopts::value()->default_value("NebulOuS::Solver") ) + ("P,port", "TCP port on AMQ Broker", + cxxopts::value()->default_value("5672") ) + ("U,user", "The user name used for the AMQ Broker connection", + cxxopts::value()->default_value("admin") ) + ("Pw,password", "The password for the AMQ Broker connection", + cxxopts::value()->default_value("admin") ) + ("?,help", "Print help information"); + + CLIOptions.allow_unrecognised_options(); + + auto CLIValues = CLIOptions.parse( NumberOfCLIOptions, CLIOptionStrings ); + + if( CLIValues.count("help") ) + { + std::cout << CLIOptions.help() << std::endl; + exit( EXIT_SUCCESS ); + } + + // -------------------------------------------------------------------------- + // Validating directories + // -------------------------------------------------------------------------- + // + // The directories are given as strings and they must be validated to see if + // the provided values correspond to an existing directory in the case of the + // AMPL directory. The model directory will be created if it is not an empty + // string, for which a temparary directory will be created. + + std::filesystem::path TheAMPLDirectory( CLIValues["AMPLDir"].as() ); + + if( !std::filesystem::exists( TheAMPLDirectory ) ) + { + std::source_location Location = std::source_location::current(); + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " << Location.line() + << "in function " << Location.function_name() <<"] " + << "The AMPL installation driectory is given as [" + << CLIValues["AMPLDir"].as() + << "] but this directory does not ezist!"; + + throw std::invalid_argument( ErrorMessage.str() ); + } + + std::filesystem::path ModelDirectory( CLIValues["ModelDir"].as() ); + + if( ModelDirectory.empty() || !std::filesystem::exists( ModelDirectory ) ) + ModelDirectory = std::filesystem::temp_directory_path(); + + // -------------------------------------------------------------------------- + // AMQ communication + // -------------------------------------------------------------------------- + // + // The AMQ communication is managed by the standard communication actors of + // the Theron++ Actor framewokr. Thus, it is just a matter of starting the + // endpoint actors with the given command line parameters. + // + // The network endpoint takes the endpoint name as the first argument, then + // the URL for the broker and the port number. The user name and the password + // are defined in the AMQ Qpid Proton connection options, and the values are + // therefore set for the connection options. + + proton::connection_options AMQOptions; + + AMQOptions.user( CLIValues["user"].as< std::string >() ); + AMQOptions.password( CLIValues["password"].as< std::string >() ); + + // Then the network endpoint cna be constructed using the default names for + // the various network endpoint servers in order to pass the defined + // connection options. + + Theron::AMQ::NetworkEndpoint AMQNetWork( + CLIValues["endpoint"].as< std::string >(), + CLIValues["broker"].as< std::string >(), + CLIValues["port"].as< unsigned int >(), + Theron::AMQ::Network::NetworkLayerLabel, + Theron::AMQ::Network::SessionLayerLabel, + Theron::AMQ::Network::PresentationLayerLabel, + AMQOptions + ); + + // -------------------------------------------------------------------------- + // Solver component actors + // -------------------------------------------------------------------------- + // + // The solver managager must be started first since its address should be + // a parameter to the constructor of the Metric Updater so the latter actor + // knows where to send application execution contexts whenever a new solution + // is requested by the SLO Violation Detector through the Optimzer Controller. + + NebulOuS::SolverManager< NebulOuS::AMPLSolver > + WorkloadMabager( "WorkloadManager", + std::string( NebulOuS::Solver::Solution::MessageIdentifier ), + std::string( NebulOuS::Solver::ApplicationExecutionContext::MessageIdentifier ), + "AMPLSolver", ampl::Environment( TheAMPLDirectory.native() ), ModelDirectory ); + + NebulOuS::MetricUpdater + ContextMabager( "MetricUpdater", WorkloadMabager.GetAddress() ); + + // -------------------------------------------------------------------------- + // Termination management + // -------------------------------------------------------------------------- + // + // The critical part is to wait for the global shut down message from the + // Optimiser controller. That message will trigger the network to shut down + // and the Solver Component may terminate when the actor system has finished. + // Thus, the actors can still be running for some time after the global shut + // down message has been received, and it is therefore necessary to also wait + // for the actors to terminate. + + NebulOuS::ExecutionControl::WaitForTermination(); + Theron::Actor::WaitForGlobalTermination(); + + return EXIT_SUCCESS; +} \ No newline at end of file diff --git a/SolutionManager.hpp b/SolverManager.hpp similarity index 55% rename from SolutionManager.hpp rename to SolverManager.hpp index be3a49c..7f91358 100644 --- a/SolutionManager.hpp +++ b/SolverManager.hpp @@ -1,5 +1,5 @@ /*============================================================================== -Solution Manager +Solver Manager This class handles the Execution Context mssage containing a time stamp and a set of variable value assignments.It manages a time sorted queue and dispatches @@ -7,7 +7,7 @@ the first application execution context to the solver when the solver is ready. The solution returned for a given execution context will be published together with the execution context and the maximal utility value found by the solver. -The solver actor class is given as a template argument to the solution manager, +The solver actor class is given as a template argument to the solver manager, and at least one solver actor is instantiated at start up. This to allow multiple solvers to run in parallel should this be necessary to serve properly the queue of waiting application execution contexts. If there are multiple @@ -52,6 +52,8 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) #include // For nice error messages #include // Standard exceptions #include // Error location reporting +#include // Execution stop management +#include // Lock the condtion variable // Other packages @@ -72,6 +74,7 @@ using JSON = nlohmann::json; // Short form name space // NebulOuS headers +#include "ExecutionControl.hpp" // Shut down messages #include "Solver.hpp" // The basic solver class namespace NebulOuS @@ -87,7 +90,8 @@ class SolverManager : virtual public Theron::Actor, virtual public Theron::StandardFallbackHandler, virtual public Theron::NetworkingActor< - typename Theron::AMQ::Message::PayloadType > + typename Theron::AMQ::Message::PayloadType >, + virtual public ExecutionControl { // There is a topic name used to publish solutions found by the solvers. This // topic is given to the constructor and kept as a constant during the class @@ -104,8 +108,6 @@ private: // The solution manager dispatches the application execution contexts as // requests for solutions to a pool of solvers. -private: - std::list< SolverType > SolverPool; std::unordered_set< Address > ActiveSolvers, PassiveSolvers; @@ -161,137 +163,140 @@ private: ContextExecutionQueue.erase( ContextExecutionQueue.begin(), ContextExecutionQueue.begin() + DispatchedContexts ); + } } -} -// The handler function simply enqueues the received context, records its -// timesamp and dispatch as many contexts as possible to the solvers. Note -// that the context identifiers must be unique and there is a logic error -// if there is already a context with the same identifier. Then an invalid -// arguemtn exception will be thrown. This strategy should be reconsidered -// if there will be multiple entities firing execution contexts. + // The handler function simply enqueues the received context, records its + // timesamp and dispatch as many contexts as possible to the solvers. Note + // that the context identifiers must be unique and there is a logic error + // if there is already a context with the same identifier. Then an invalid + // arguemtn exception will be thrown. This strategy should be reconsidered + // if there will be multiple entities firing execution contexts. -void HandleApplicationExecutionContext( - const Solver:: ApplicationExecutionContext & TheContext, - const Address TheRequester ) -{ - auto [_, Success] = Contexts.try_emplace( - TheContext[ Solver::ContextIdentifier.data() ], TheContext ); - - if( Success ) + void HandleApplicationExecutionContext( + const Solver:: ApplicationExecutionContext & TheContext, + const Address TheRequester ) { - ContextExecutionQueue.emplace( - TheContext[ Solver::TimeStamp.data() ], - TheContext[ Solver::ContextIdentifier.data() ] ); + auto [_, Success] = Contexts.try_emplace( + TheContext[ Solver::ContextIdentifier.data() ], TheContext ); + if( Success ) + { + ContextExecutionQueue.emplace( + TheContext[ Solver::TimeStamp.data() ], + TheContext[ Solver::ContextIdentifier.data() ] ); + + DispatchToSolvers(); + } + else + { + std::source_location Location = std::source_location::current(); + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " + << Location.line() + << "in function " << Location.function_name() <<"] " + << "An Application Execution Context with identifier " + << TheContext[ Solver::ContextIdentifier.data() ] + << " was received while there is already one with the same " + << "identifer. The identifiers must be unique!"; + + throw std::invalid_argument( ErrorMessage.str() ); + } + } + + // -------------------------------------------------------------------------- + // Solutions + // -------------------------------------------------------------------------- + // + // When a solution is received from a solver, it will be dispatched to all + // entities subscribing to the solution topic, and the solver will be returned + // to the pool of passive solvers. The dispatch function will be called at the + // end to ensure that the solver starts working on queued application execution + // contexts, if any. + + void PublishSolution( const Solver::Solution & TheSolution, + const Addres TheSolver ) + { + Send( TheSolution, SolutionReceiver ); + PassiveSolvers.insert( ActiveSolvers.extract( TheSolver ) ); DispatchToSolvers(); } - else - { - std::source_location Location = std::source_location::current(); - std::ostringstream ErrorMessage; - ErrorMessage << "[" << Location.file_name() << " at line " - << Location.line() - << "in function " << Location.function_name() <<"] " - << "An Application Execution Context with identifier " - << TheContext[ Solver::ContextIdentifier.data() ] - << " was received while there is already one with the same " - << "identifer. The identifiers must be unique!"; + // -------------------------------------------------------------------------- + // Constructor and destructor + // -------------------------------------------------------------------------- + // + // The constructor takes the name of the Solution Mnager Actor, the name of + // the topic where the solutions should be published, and the topic where the + // application execution contexts will be published. If the latter is empty, + // the manager will not listen to any externally generated requests, only those + // being sent from the Metric Updater supposed to exist on the same Actor + // system node as the manager.The final arguments to the constructor is a + // set of arguments to the solver type in the order expected by the solver + // type and repeated for the number of (local) solvers that should be created. + // + // Currently this manager does not support dispatching configurations to + // remote solvers and collect responses from these. However, this can be + // circumvented by creating a local "solver" transferring the requests to + // a remote solvers and collecting results from the remote solver. + +public: + + SolverManager( const std::string & TheActorName, + const Theron::AMQ::TopicName & SolutionTopic, + const Theron::AMQ::TopicName & ContextPublisherTopic, + const auto & ...SolverArguments ) + : Actor( TheActorName ), + StandardFallbackHandler( Actor::GetAddress().AsString() ), + NetworkingActor( Actor::GetAddress().AsString() ), + ExecutionControl( Actor::GetAddress().AsString() ), + SolutionReceiver( SolutionTopic ), + SolverPool(), ActiveSolvers(), PassiveSolvers(), + Contexts(), ContextExecutionQueue() + { + // The solvers are created by expanding the arguments for the solvers + // one by one creating new elements in the solver pool + + ( SolverPool.emplace_back( std::forward( SolverArguments ) ), ... ); + + // If the solvers were successfully created, their addresses are recorded as + // passive servers, and a publisher is made for the solution channel, and + // optionally, a subscritpion is made for the alternative context publisher + // topic. If the solvers could not be created, then an invalid argument + // exception will be thrown. + + if( !SolverPool.empty() ) + { + std::ranges::transform( ServerPool, std::inserter( PassiveSolvers ), + [](const SolverType & TheSolver){ return TheSolver.GetAddress(); } ); + + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Publisher, + SolutionTopic ), GetSessionLayerAddress() ); + + if( !ContextPublisherTopic.empty() ) + Send( Theron::AMQ::NetworkLayer::TopicSubscription( + Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, + ContextPublisherTopic ), GetSessionLayerAddress() ); + } + else + { + std::source_location Location = std::source_location::current(); + std::ostringstream ErrorMessage; + + ErrorMessage << "[" << Location.file_name() << " at line " + << Location.line() + << "in function " << Location.function_name() <<"] " + << "It was not possible to construct any solver of type " + << boost::core::demangle( typeid( SolverType ).name() ) + << " from the given constructor argument types: "; + + (( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... ); throw std::invalid_argument( ErrorMessage.str() ); + } } -} - -// -------------------------------------------------------------------------- -// Solutions -// -------------------------------------------------------------------------- -// -// When a solution is received from a solver, it will be dispatched to all -// entities subscribing to the solution topic, and the solver will be returned -// to the pool of passive solvers. The dispatch function will be called at the -// end to ensure that the solver starts working on queued application execution -// contexts, if any. - -void PublishSolution( const Solver::Solution & TheSolution, - const Addres TheSolver ) -{ - Send( TheSolution, SolutionReceiver ); - PassiveSolvers.insert( ActiveSolvers.extract( TheSolver ) ); - DispatchToSolvers(); -} - -// -------------------------------------------------------------------------- -// Constructor and destructor -// -------------------------------------------------------------------------- -// -// The constructor takes the name of the Solution Mnager Actor, the name of -// the topic where the solutions should be published, and the topic where the -// application execution contexts will be published. If the latter is empty, -// the manager will not listen to any externally generated requests, only those -// being sent from the Metric Updater supposed to exist on the same Actor -// system node as the manager.The final arguments to the constructor is a -// set of arguments to the solver type in the order expected by the solver -// type and repeated for the number of (local) solvers that should be created. -// -// Currently this manager does not support dispatching configurations to -// remote solvers and collect responses from these. However, this can be -// circumvented by creating a local "solver" transferring the requests to -// a remote solvers and collecting results from the remote solver. - -SolverManager( const std::string & TheActorName, - const Theron::AMQ::TopicName & SolutionTopic, - const Theron::AMQ::TopicName & ContextPublisherTopic, - const auto & ...SolverArguments ) -: Actor( TheActorName ), - StandardFallbackHandler( Actor::GetAddress().AsString() ), - NetworkingActor( Actor::GetAddress().AsString() ), - SolutionReceiver( SolutionTopic ), - SolverPool(), ActiveSolvers(), PassiveSolvers(), - Contexts(), ContextExecutionQueue() -{ - // The solvers are created by expanding the arguments for the solvers - // one by one creating new elements in the solver pool - - ( SolverPool.emplace_back( SolverArguments ), ... ); - - // If the solvers were successfully created, their addresses are recorded as - // passive servers, and a publisher is made for the solution channel, and - // optionally, a subscritpion is made for the alternative context publisher - // topic. If the solvers could not be created, then an invalid argument - // exception will be thrown. - - if( !SolverPool.empty() ) - { - std::ranges::transform( ServerPool, std::inserter( PassiveSolvers ), - [](const SolverType & TheSolver){ return TheSolver.GetAddress(); } ); - - Send( Theron::AMQ::NetworkLayer::TopicSubscription( - Theron::AMQ::NetworkLayer::TopicSubscription::Action::Publisher, - SolutionTopic ), GetSessionLayerAddress() ); - - if( !ContextPublisherTopic.empty() ) - Send( Theron::AMQ::NetworkLayer::TopicSubscription( - Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription, - ContextPublisherTopic ), GetSessionLayerAddress() ); - } - else - { - std::source_location Location = std::source_location::current(); - std::ostringstream ErrorMessage; - - ErrorMessage << "[" << Location.file_name() << " at line " - << Location.line() - << "in function " << Location.function_name() <<"] " - << "It was not possible to construct any solver of type " - << boost::core::demangle( typeid( SolverType ).name() ) - << " from the given constructor argument types: "; - - (( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... ); - - throw std::invalid_argument( ErrorMessage.str() ); - } -} };