24 #include <activemq/core/ActiveMQConnection.h>
26 #include "../TransportHelper.h"
36 cms::MessageConsumer*
m_Consumer, *m_AutoAcknowledgeConsumer;
38 cms::MessageProducer*
m_Producer, *m_AutoAcknowledgeProducer;
53 void resetUsePassedIds();
54 void updateSelector(
string& selector );
58 void writeToBuffer( ManagedBuffer& buffer,
const unsigned char* rawBuffer );
59 void doConnect(
const string& brokerURIName,
bool force );
60 bool send(
const string& queueName, cms::Message& msg,
bool syncpoint =
true );
61 inline void setConnectionBrokerURI();
62 void setLastMessageIds(
const cms::Message& msg );
63 void setLastMessageReplyData(
const cms::Message& msg );
64 int setLastMessageInfo( cms::Message* msg, ManagedBuffer* buffer,
bool browse=
false,
bool get=
true,
bool syncpoint=
true );
79 explicit AmqHelper(
const std::string& connectionstring =
"" );
83 void connect(
const string& queueManagerName =
"",
const string& transportUri =
"",
bool force =
false );
84 void connect(
const string& queueManagerName,
const string& transportUri,
const string& keyRepository ,
const string& sslCypherSpec ,
const string& sslPeerName ,
bool force =
false );
86 void openQueue(
bool syncpoint,
const std::string& queueName =
"" );
87 void openQueue(
const std::string& queueName =
"");
88 void openBackoutQueue(
const std::string& queueName );
89 void setBackupQueue(
const std::string& queueName );
91 long getQueueDepth(
const std::string& queueName =
"" );
93 long peek(
const std::string& queueName =
"",
bool first =
true );
95 void setTimeout(
int millisecs );
97 long getOne(
bool getForClean =
false );
98 long doGetOne( ManagedBuffer* buffer,
bool getForClean,
bool syncpoint =
true );
99 long getOne(
unsigned char* buffer,
size_t maxSize,
bool syncpoint =
true );
100 long getGroupMessage( ManagedBuffer* groupMessageBuffer,
const string& groupId,
bool& isCleaningUp);
101 void putGroupMessage( ManagedBuffer* buffer,
const string& batchId,
long messageSequence,
bool isLast );
104 void putOne(
const ManagedBuffer& buffer,
bool syncpoint =
true );
105 void putOne(
unsigned char* buffer,
size_t bufferSize,
bool syncpoint =
true );
106 void putOne(
unsigned char* buffer,
size_t bufferSize, cms::Message& msg );
107 void putOne( cms::Message& msg,
bool syncpoint =
true );
109 void putOneRequest(
unsigned char* buffer,
size_t bufferSize,
const std::string& rtqName,
const std::string& rtbName,
TransportReplyOptions& replyOptions );
110 void putOneReply(
unsigned char* buffer,
size_t bufferSize,
long feedback,
TRANSPORT_MESSAGE_TYPE messageType = TMT_REPLY );
111 void putOneReply(
const ManagedBuffer& buffer,
long feedback );
113 void putToDeadLetterQueue( cms::Message& msg,
bool syncpoint =
true );
115 cms::Message* createMsg();
117 void clearMessages();
119 void setApplicationName(
const std::string& applicationName );
120 void setMessageId(
const std::string& messageId );
121 void setCorrelationId(
const std::string& correlId );
122 void setGroupId(
const std::string& groupId );
124 void setMessageFormat(
const string& format );
125 string getLastMessageFormat()
const;
127 time_t getMessagePutTime();
133 void setSequenceId(
int sequenceId );
135 void clearSSLOptions();
149 long getOne( ManagedBuffer* buffer,
bool syncpoint =
true,
bool keepJMSHeader =
true ) {
return doGetOne(buffer,
false, syncpoint); }