Support for new AMQP option handling to filter messages for the application identification.
Change-Id: Ieb1d657fa4f7dcb89392a259700069f1b9088e5f
This commit is contained in:
parent
d77292357f
commit
e4609b912b
@ -363,7 +363,7 @@ AMPLSolver::AMPLSolver( const std::string & TheActorName,
|
||||
Solver( Actor::GetAddress().AsString() ),
|
||||
ProblemFileDirectory( ProblemPath ),
|
||||
ProblemDefinition( InstallationDirectory ),
|
||||
DefaultObjectiveFunction()
|
||||
DefaultObjectiveFunction(), VariablesToConstants()
|
||||
{
|
||||
RegisterHandler( this, &LSolver::DataFileUpdate );
|
||||
|
||||
|
@ -13,7 +13,7 @@ The command line arguments that can be givne to the Solver Component are
|
||||
|
||||
-A or --AMPLDir <installation directory> for the AMPL model interpreter
|
||||
-B or --broker <URL> for the location of the AMQ broker
|
||||
-E or --endpoint <name> The endpoint name
|
||||
-E or --endpoint <name> The endpoint name = application identifier
|
||||
-M ir --ModelDir <directory> for model and data files
|
||||
-N or --name The AMQ identity of the solver (see below)
|
||||
-P or --port <n> the port to use on the AMQ broker URL
|
||||
@ -93,6 +93,8 @@ License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
|
||||
#include "proton/symbol.hpp" // AMQ symbols
|
||||
#include "proton/connection_options.hpp" // Options for the Broker
|
||||
#include "proton/message.hpp" // AMQ messages definitions
|
||||
#include "proton/source_options.hpp" // App ID filters
|
||||
#include "proton/source.hpp" // The filter map
|
||||
#include "Communication/AMQ/AMQMessage.hpp" // The AMQP messages
|
||||
#include "Communication/AMQ/AMQEndpoint.hpp" // The AMP endpoint
|
||||
#include "Communication/AMQ/AMQjson.hpp" // Transparent JSON-AMQP
|
||||
@ -141,8 +143,6 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
|
||||
cxxopts::value<unsigned int>()->default_value("5672") )
|
||||
("S,Solver", "Solver to use, devault Couenne",
|
||||
cxxopts::value<std::string>()->default_value("couenne") )
|
||||
("T,Tenant", "Tenant identifier for messages",
|
||||
cxxopts::value<std::string>()->default_value("TheTenant"))
|
||||
("U,User", "The user name used for the AMQ Broker connection",
|
||||
cxxopts::value<std::string>()->default_value("admin") )
|
||||
("Pw,Password", "The password for the AMQ Broker connection",
|
||||
@ -189,6 +189,89 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
|
||||
if( ModelDirectory.empty() || !std::filesystem::exists( ModelDirectory ) )
|
||||
ModelDirectory = std::filesystem::temp_directory_path();
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// AMQ options
|
||||
// --------------------------------------------------------------------------
|
||||
//
|
||||
// In order to be general and flexible, the various AMQ options must be
|
||||
// provided as a user specified class to allow the user full fexibility in
|
||||
// deciding on the connection properties. This class should keep the user
|
||||
// name, the password, and the application identifier, which is identical
|
||||
// to the endpoint.
|
||||
|
||||
class AMQOptions
|
||||
: public Theron::AMQ::NetworkLayer::AMQProperties
|
||||
{
|
||||
private:
|
||||
|
||||
const std::string User, Password, ApplicationID;
|
||||
|
||||
protected:
|
||||
|
||||
// The connection options just sets the user and the password to be used
|
||||
// when the first connection is established with the AMQ broker
|
||||
|
||||
virtual proton::connection_options ConnectionOptions(void) const override
|
||||
{
|
||||
proton::connection_options Options(
|
||||
Theron::AMQ::NetworkLayer::AMQProperties::ConnectionOptions() );
|
||||
|
||||
Options.user( User );
|
||||
Options.password( Password );
|
||||
|
||||
return Options;
|
||||
};
|
||||
|
||||
// Setting the application filter is slightly more complicated as it
|
||||
// involves setting the filter map for the sender.
|
||||
|
||||
virtual proton::receiver_options ReceiverOptions( void ) const override
|
||||
{
|
||||
proton::source::filter_map TheFilter;
|
||||
proton::source_options TheSourceOptions;
|
||||
proton::receiver_options TheOptions(
|
||||
Theron::AMQ::NetworkLayer::AMQProperties::ReceiverOptions() );
|
||||
|
||||
std::ostringstream FilterValue;
|
||||
|
||||
FilterValue << "application = '" << ApplicationID << "'";
|
||||
|
||||
TheFilter.put("apache.org:selector-filter:string", FilterValue.str() );
|
||||
TheSourceOptions.filters( TheFilter );
|
||||
TheOptions.source( TheSourceOptions );
|
||||
|
||||
return TheOptions;
|
||||
}
|
||||
|
||||
// The application identifier must also be provided in every message to
|
||||
// allow other receivers to filter on this.
|
||||
|
||||
virtual proton::message::property_map
|
||||
MessageProperties( void ) const override
|
||||
{
|
||||
proton::message::property_map TheProperties(
|
||||
Theron::AMQ::NetworkLayer::AMQProperties::MessageProperties() );
|
||||
|
||||
TheProperties.put( "application", ApplicationID );
|
||||
|
||||
return TheProperties;
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
AMQOptions( const std::string & TheUser, const std::string & ThePassword,
|
||||
const std::string & TheAppID )
|
||||
: User( TheUser ), Password( ThePassword ), ApplicationID( TheAppID )
|
||||
{}
|
||||
|
||||
AMQOptions( const AMQOptions & Other )
|
||||
: User( Other.User ), Password( Other.Password ),
|
||||
ApplicationID( Other.ApplicationID )
|
||||
{}
|
||||
|
||||
virtual ~AMQOptions() = default;
|
||||
};
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// AMQ communication
|
||||
// --------------------------------------------------------------------------
|
||||
@ -196,51 +279,27 @@ int main( int NumberOfCLIOptions, char ** CLIOptionStrings )
|
||||
// The AMQ communication is managed by the standard communication actors of
|
||||
// the Theron++ Actor framewokr. Thus, it is just a matter of starting the
|
||||
// endpoint actors with the given command line parameters.
|
||||
|
||||
// There are certain properties defined for the NebulOuS communication
|
||||
// protocol where the endpoint name is taken as the application identifier
|
||||
// and the tenant identifier and the protocol used. The same information
|
||||
// has to be given as different maps for the connection options and for
|
||||
// the message properties.
|
||||
|
||||
std::map< proton::symbol, proton::value > NebulOuSProperties{
|
||||
{"x-tenant-id", CLIValues["Tenant"].as< std::string >() },
|
||||
{"x-application-id", CLIValues["Endpoint"].as< std::string >()},
|
||||
{"x-protocol", "AMQP"}
|
||||
};
|
||||
|
||||
proton::message::property_map MessageProperties{
|
||||
{"x-tenant-id", CLIValues["Tenant"].as< std::string >() },
|
||||
{"x-application-id", CLIValues["Endpoint"].as< std::string >()},
|
||||
{"x-protocol", "AMQP"}
|
||||
};
|
||||
|
||||
// The user name and the password are defined in the AMQ Qpid Proton
|
||||
// connection options, and the values are therefore set for the connection
|
||||
// options.
|
||||
|
||||
proton::connection_options AMQOptions;
|
||||
|
||||
AMQOptions.user( CLIValues["User"].as< std::string >() );
|
||||
AMQOptions.password( CLIValues["Password"].as< std::string >() );
|
||||
AMQOptions.properties( NebulOuSProperties );
|
||||
|
||||
//
|
||||
// The network endpoint takes the endpoint name as the first argument, then
|
||||
// the URL for the broker and the port number. Then the network endpoint can
|
||||
// be constructed using the default names for the Session Layer and the
|
||||
// Presentation layer servers, but calling the endpoint for "Solver" to make
|
||||
// it more visible at the AMQ broker listing of subscribers. The endpoint
|
||||
// will be a unique application identifier. The server names are followed
|
||||
// by the defined connection options and the message options.
|
||||
// by the defined AMQ options.
|
||||
|
||||
Theron::AMQ::NetworkEndpoint AMQNetWork(
|
||||
CLIValues["Endpoint"].as< std::string >(),
|
||||
CLIValues["Broker"].as< std::string >(),
|
||||
CLIValues["Port"].as< unsigned int >(),
|
||||
"Solver",
|
||||
CLIValues["Name"].as< std::string >(),
|
||||
Theron::AMQ::Network::SessionLayerLabel,
|
||||
Theron::AMQ::Network::PresentationLayerLabel,
|
||||
AMQOptions, MessageProperties
|
||||
std::make_shared< AMQOptions >(
|
||||
CLIValues["User"].as< std::string >(),
|
||||
CLIValues["Password"].as< std::string >(),
|
||||
CLIValues["Endpoint"].as< std::string >()
|
||||
)
|
||||
);
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
|
Loading…
x
Reference in New Issue
Block a user