From e4609b912b64cbb3686730e3018d54c966b799b6 Mon Sep 17 00:00:00 2001 From: Geir Horn Date: Thu, 8 Feb 2024 08:39:57 +0100 Subject: [PATCH] Support for new AMQP option handling to filter messages for the application identification. Change-Id: Ieb1d657fa4f7dcb89392a259700069f1b9088e5f --- AMPLSolver.cpp | 2 +- SolverComponent.cpp | 129 ++++++++++++++++++++++++++++++++------------ 2 files changed, 95 insertions(+), 36 deletions(-) diff --git a/AMPLSolver.cpp b/AMPLSolver.cpp index d3710d0..94681eb 100644 --- a/AMPLSolver.cpp +++ b/AMPLSolver.cpp @@ -363,7 +363,7 @@ AMPLSolver::AMPLSolver( const std::string & TheActorName, Solver( Actor::GetAddress().AsString() ), ProblemFileDirectory( ProblemPath ), ProblemDefinition( InstallationDirectory ), - DefaultObjectiveFunction() + DefaultObjectiveFunction(), VariablesToConstants() { RegisterHandler( this, &LSolver::DataFileUpdate ); diff --git a/SolverComponent.cpp b/SolverComponent.cpp index 46f98c9..50c0272 100644 --- a/SolverComponent.cpp +++ b/SolverComponent.cpp @@ -13,7 +13,7 @@ The command line arguments that can be givne to the Solver Component are -A or --AMPLDir for the AMPL model interpreter -B or --broker for the location of the AMQ broker --E or --endpoint The endpoint name +-E or --endpoint The endpoint name = application identifier -M ir --ModelDir for model and data files -N or --name The AMQ identity of the solver (see below) -P or --port the port to use on the AMQ broker URL @@ -93,6 +93,8 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/) #include "proton/symbol.hpp" // AMQ symbols #include "proton/connection_options.hpp" // Options for the Broker #include "proton/message.hpp" // AMQ messages definitions +#include "proton/source_options.hpp" // App ID filters +#include "proton/source.hpp" // The filter map #include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages #include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint #include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP @@ -141,8 +143,6 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) cxxopts::value()->default_value("5672") ) ("S,Solver", "Solver to use, devault Couenne", cxxopts::value()->default_value("couenne") ) - ("T,Tenant", "Tenant identifier for messages", - cxxopts::value()->default_value("TheTenant")) ("U,User", "The user name used for the AMQ Broker connection", cxxopts::value()->default_value("admin") ) ("Pw,Password", "The password for the AMQ Broker connection", @@ -189,6 +189,89 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) if( ModelDirectory.empty() || !std::filesystem::exists( ModelDirectory ) ) ModelDirectory = std::filesystem::temp_directory_path(); +// -------------------------------------------------------------------------- + // AMQ options + // -------------------------------------------------------------------------- + // + // In order to be general and flexible, the various AMQ options must be + // provided as a user specified class to allow the user full fexibility in + // deciding on the connection properties. This class should keep the user + // name, the password, and the application identifier, which is identical + // to the endpoint. + + class AMQOptions + : public Theron::AMQ::NetworkLayer::AMQProperties + { + private: + + const std::string User, Password, ApplicationID; + + protected: + + // The connection options just sets the user and the password to be used + // when the first connection is established with the AMQ broker + + virtual proton::connection_options ConnectionOptions(void) const override + { + proton::connection_options Options( + Theron::AMQ::NetworkLayer::AMQProperties::ConnectionOptions() ); + + Options.user( User ); + Options.password( Password ); + + return Options; + }; + + // Setting the application filter is slightly more complicated as it + // involves setting the filter map for the sender. + + virtual proton::receiver_options ReceiverOptions( void ) const override + { + proton::source::filter_map TheFilter; + proton::source_options TheSourceOptions; + proton::receiver_options TheOptions( + Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() ); + + std::ostringstream FilterValue; + + FilterValue << "application = '" << ApplicationID << "'"; + + TheFilter.put("apache.org:selector-filter:string", FilterValue.str() ); + TheSourceOptions.filters( TheFilter ); + TheOptions.source( TheSourceOptions ); + + return TheOptions; + } + + // The application identifier must also be provided in every message to + // allow other receivers to filter on this. + + virtual proton::message::property_map + MessageProperties( void ) const override + { + proton::message::property_map TheProperties( + Theron::AMQ::NetworkLayer::AMQProperties::MessageProperties() ); + + TheProperties.put( "application", ApplicationID ); + + return TheProperties; + } + + public: + + AMQOptions( const std::string & TheUser, const std::string & ThePassword, + const std::string & TheAppID ) + : User( TheUser ), Password( ThePassword ), ApplicationID( TheAppID ) + {} + + AMQOptions( const AMQOptions & Other ) + : User( Other.User ), Password( Other.Password ), + ApplicationID( Other.ApplicationID ) + {} + + virtual ~AMQOptions() = default; + }; + // -------------------------------------------------------------------------- // AMQ communication // -------------------------------------------------------------------------- @@ -196,51 +279,27 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings ) // The AMQ communication is managed by the standard communication actors of // the Theron++ Actor framewokr. Thus, it is just a matter of starting the // endpoint actors with the given command line parameters. - - // There are certain properties defined for the NebulOuS communication - // protocol where the endpoint name is taken as the application identifier - // and the tenant identifier and the protocol used. The same information - // has to be given as different maps for the connection options and for - // the message properties. - - std::map< proton::symbol, proton::value > NebulOuSProperties{ - {"x-tenant-id", CLIValues["Tenant"].as< std::string >() }, - {"x-application-id", CLIValues["Endpoint"].as< std::string >()}, - {"x-protocol", "AMQP"} - }; - - proton::message::property_map MessageProperties{ - {"x-tenant-id", CLIValues["Tenant"].as< std::string >() }, - {"x-application-id", CLIValues["Endpoint"].as< std::string >()}, - {"x-protocol", "AMQP"} - }; - - // The user name and the password are defined in the AMQ Qpid Proton - // connection options, and the values are therefore set for the connection - // options. - - proton::connection_options AMQOptions; - - AMQOptions.user( CLIValues["User"].as< std::string >() ); - AMQOptions.password( CLIValues["Password"].as< std::string >() ); - AMQOptions.properties( NebulOuSProperties ); - + // // The network endpoint takes the endpoint name as the first argument, then // the URL for the broker and the port number. Then the network endpoint can // be constructed using the default names for the Session Layer and the // Presentation layer servers, but calling the endpoint for "Solver" to make // it more visible at the AMQ broker listing of subscribers. The endpoint // will be a unique application identifier. The server names are followed - // by the defined connection options and the message options. + // by the defined AMQ options. Theron::AMQ::NetworkEndpoint AMQNetWork( CLIValues["Endpoint"].as< std::string >(), CLIValues["Broker"].as< std::string >(), CLIValues["Port"].as< unsigned int >(), - "Solver", + CLIValues["Name"].as< std::string >(), Theron::AMQ::Network::SessionLayerLabel, Theron::AMQ::Network::PresentationLayerLabel, - AMQOptions, MessageProperties + std::make_shared< AMQOptions >( + CLIValues["User"].as< std::string >(), + CLIValues["Password"].as< std::string >(), + CLIValues["Endpoint"].as< std::string >() + ) ); // --------------------------------------------------------------------------