fintp_transport
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
AmqHelper.h
Go to the documentation of this file.
1 /*
2 * FinTP - Financial Transactions Processing Application
3 * Copyright (C) 2013 Business Information Systems (Allevo) S.R.L.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>
17 * or contact Allevo at : 031281 Bucuresti, 23C Calea Vitan, Romania,
18 * phone +40212554577, office@allevo.ro <mailto:office@allevo.ro>, www.allevo.ro.
19 */
20 
21 #ifndef AMQHELPER_H
22 #define AMQHELPER_H
23 
24 #include <activemq/core/ActiveMQConnection.h>
25 
26 #include "../TransportHelper.h"
27 
28 namespace FinTP
29 {
31  {
32  private :
33 
34  activemq::core::ActiveMQConnection* m_Connection;
35  cms::Session* m_Session, *m_AutoAcknowledgeSession;
36  cms::MessageConsumer* m_Consumer, *m_AutoAcknowledgeConsumer;
37  cms::QueueBrowser* m_QueueBrowser;
38  cms::MessageProducer* m_Producer, *m_AutoAcknowledgeProducer;
39 
42  string m_ConnectionString, m_Selector;
43 
45 
47 
48  int m_Timeout;
51 
52  int noMessage();
53  void resetUsePassedIds();
54  void updateSelector( string& selector );
55 
56  const string* m_MessageFormatPointer;
57 
58  void writeToBuffer( ManagedBuffer& buffer, const unsigned char* rawBuffer );
59  void doConnect( const string& brokerURIName, bool force );
60  bool send( const string& queueName, cms::Message& msg, bool syncpoint = true );
61  inline void setConnectionBrokerURI();
62  void setLastMessageIds( const cms::Message& msg );
63  void setLastMessageReplyData( const cms::Message& msg );
64  int setLastMessageInfo( cms::Message* msg, ManagedBuffer* buffer, bool browse=false, bool get=true, bool syncpoint=true );
65 
66  TRANSPORT_MESSAGE_TYPE ToTransportMessageType( const string& messageType );
67  std::string ToString( TRANSPORT_MESSAGE_TYPE messageType );
68 
69  void closeQueue();
70 
71  static const string FINTPGROUPID;
72  static const string FINTPGROUPSEQ;
73  static const string FINTPLASTINGROUP;
74 
75  map<string, int> m_GroupLogicOrder;
76 
77  public :
78 
79  explicit AmqHelper( const std::string& connectionstring = "" );
80 
81  ~AmqHelper();
82 
83  void connect( const string& queueManagerName = "", const string& transportUri = "", bool force = false );
84  void connect( const string& queueManagerName, const string& transportUri, const string& keyRepository , const string& sslCypherSpec , const string& sslPeerName , bool force = false );
85 
86  void openQueue( bool syncpoint, const std::string& queueName = "" );
87  void openQueue( const std::string& queueName = "");
88  void openBackoutQueue( const std::string& queueName );
89  void setBackupQueue( const std::string& queueName );
90 
91  long getQueueDepth( const std::string& queueName = "" );
92 
93  long peek( const std::string& queueName = "", bool first = true );
94 
95  void setTimeout( int millisecs );
96 
97  long getOne( bool getForClean = false );
98  long doGetOne( ManagedBuffer* buffer, bool getForClean, bool syncpoint = true );
99  long getOne( unsigned char* buffer, size_t maxSize, bool syncpoint = true );
100  long getGroupMessage( ManagedBuffer* groupMessageBuffer, const string& groupId, bool& isCleaningUp);
101  void putGroupMessage( ManagedBuffer* buffer, const string& batchId, long messageSequence, bool isLast );
102 
103 
104  void putOne( const ManagedBuffer& buffer, bool syncpoint = true );
105  void putOne( unsigned char* buffer, size_t bufferSize, bool syncpoint = true );
106  void putOne( unsigned char* buffer, size_t bufferSize, cms::Message& msg );
107  void putOne( cms::Message& msg, bool syncpoint = true );
108 
109  void putOneRequest( unsigned char* buffer, size_t bufferSize, const std::string& rtqName, const std::string& rtbName, TransportReplyOptions& replyOptions );
110  void putOneReply( unsigned char* buffer, size_t bufferSize, long feedback, TRANSPORT_MESSAGE_TYPE messageType = TMT_REPLY );
111  void putOneReply( const ManagedBuffer& buffer, long feedback );
112 
113  void putToDeadLetterQueue( cms::Message& msg, bool syncpoint = true );
114 
115  cms::Message* createMsg();
116 
117  void clearMessages();
118 
119  void setApplicationName( const std::string& applicationName );
120  void setMessageId( const std::string& messageId );
121  void setCorrelationId( const std::string& correlId );
122  void setGroupId( const std::string& groupId );
123 
124  void setMessageFormat( const string& format );
125  string getLastMessageFormat() const;
126 
127  time_t getMessagePutTime();
128  std::string getLastReplyBrokerURI() const { return m_ReplyBrokerURI; }
129 
130  TRANSPORT_MESSAGE_TYPE getLastMessageType() { return m_MessageType; };
131 
132  int getLastSequenceId() const { return m_GroupSequence; }
133  void setSequenceId( int sequenceId );
134 
135  void clearSSLOptions();
136 
137  void reconnect();
138  void disconnect();
139  bool commit();
140  bool rollback();
141 
142 // void setAutoAbandon( const int retries );
143 
144  /* FIXME */
145  TransportReplyOptions getLastReplyOptions() const;
146  string getApplicationName() const { return m_ApplicationName; }
147  std::string getLastReplyQueueManager() const { return m_ConnectionString; }
148 
149  long getOne( ManagedBuffer* buffer, bool syncpoint = true, bool keepJMSHeader = true ) { return doGetOne(buffer, false, syncpoint); }
150  /* FIXME */
151 
152  };
153 }
154 
155 #endif //AMQHELPER_H