SocketInitiator.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "SocketInitiator.h"
27 #include "Session.h"
28 #include "Settings.h"
29 
30 namespace FIX
31 {
33  MessageStoreFactory& factory,
34  const SessionSettings& settings )
35 throw( ConfigError )
36 : Initiator( application, factory, settings ),
37  m_connector( 1 ), m_lastConnect( 0 ),
38  m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
39  m_rcvBufSize( 0 )
40 {
41 }
42 
44  MessageStoreFactory& factory,
45  const SessionSettings& settings,
46  LogFactory& logFactory )
47 throw( ConfigError )
48 : Initiator( application, factory, settings, logFactory ),
49  m_connector( 1 ), m_lastConnect( 0 ),
50  m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
51  m_rcvBufSize( 0 )
52 {
53 }
54 
56 {
57  SocketConnections::iterator i;
58  for (i = m_connections.begin();
59  i != m_connections.end(); ++i)
60  delete i->second;
61 
62  for (i = m_pendingConnections.begin();
63  i != m_pendingConnections.end(); ++i)
64  delete i->second;
65 }
66 
68 throw ( ConfigError )
69 {
70  const Dictionary& dict = s.get();
71 
72  if( dict.has( RECONNECT_INTERVAL ) )
73  m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
74  if( dict.has( SOCKET_NODELAY ) )
75  m_noDelay = dict.getBool( SOCKET_NODELAY );
76  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
77  m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
78  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
79  m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
80 }
81 
83 throw ( RuntimeError )
84 {
85 }
86 
88 {
89  connect();
90 
91  while ( !isStopped() ) {
92  m_connector.block( *this, false, 1.0 );
94  }
95 
96  time_t start = 0;
97  time_t now = 0;
98 
99  ::time( &start );
100  while ( isLoggedOn() )
101  {
102  m_connector.block( *this );
103  if( ::time(&now) -5 >= start )
104  break;
105  }
106 }
107 
108 bool SocketInitiator::onPoll( double timeout )
109 {
110  time_t start = 0;
111  time_t now = 0;
112 
113  if( isStopped() )
114  {
115  if( start == 0 )
116  ::time( &start );
117  if( !isLoggedOn() )
118  return false;
119  if( ::time(&now) - 5 >= start )
120  return false;
121  }
122 
123  m_connector.block( *this, true, timeout );
124  return true;
125 }
126 
128 {
129 }
130 
132 {
133  try
134  {
135  std::string address;
136  short port = 0;
137  std::string sourceAddress;
138  short sourcePort = 0;
139 
140  Session* session = Session::lookupSession( s );
141  if( !session->isSessionTime(UtcTimeStamp()) ) return;
142 
143  Log* log = session->getLog();
144 
145  getHost( s, d, address, port, sourceAddress, sourcePort );
146 
147  log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");
148  int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize, sourceAddress, sourcePort );
149  setPending( s );
150 
151  m_pendingConnections[ result ]
152  = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
153  }
154  catch ( std::exception& ) {}
155 }
156 
158 {
159  SocketConnections::iterator i = m_pendingConnections.find( s );
160  if( i == m_pendingConnections.end() ) return;
161  SocketConnection* pSocketConnection = i->second;
162 
163  m_connections[s] = pSocketConnection;
164  m_pendingConnections.erase( i );
165  setConnected( pSocketConnection->getSession()->getSessionID() );
166  pSocketConnection->onTimeout();
167 }
168 
169 void SocketInitiator::onWrite( SocketConnector& connector, int s )
170 {
171  SocketConnections::iterator i = m_connections.find( s );
172  if ( i == m_connections.end() ) return ;
173  SocketConnection* pSocketConnection = i->second;
174  if( pSocketConnection->processQueue() )
175  pSocketConnection->unsignal();
176 }
177 
178 bool SocketInitiator::onData( SocketConnector& connector, int s )
179 {
180  SocketConnections::iterator i = m_connections.find( s );
181  if ( i == m_connections.end() ) return false;
182  SocketConnection* pSocketConnection = i->second;
183  return pSocketConnection->read( connector );
184 }
185 
187 {
188  SocketConnections::iterator i = m_connections.find( s );
189  SocketConnections::iterator j = m_pendingConnections.find( s );
190 
191  SocketConnection* pSocketConnection = 0;
192  if( i != m_connections.end() )
193  pSocketConnection = i->second;
194  if( j != m_pendingConnections.end() )
195  pSocketConnection = j->second;
196  if( !pSocketConnection )
197  return;
198 
199  setDisconnected( pSocketConnection->getSession()->getSessionID() );
200 
201  Session* pSession = pSocketConnection->getSession();
202  if ( pSession )
203  {
204  pSession->disconnect();
205  setDisconnected( pSession->getSessionID() );
206  }
207 
208  delete pSocketConnection;
209  m_connections.erase( s );
210  m_pendingConnections.erase( s );
211 }
212 
214 {
215  onTimeout( connector );
216 }
217 
219 {
220  time_t now;
221  ::time( &now );
222 
223  if ( (now - m_lastConnect) >= m_reconnectInterval )
224  {
225  connect();
226  m_lastConnect = now;
227  }
228 
229  SocketConnections::iterator i;
230  for ( i = m_connections.begin(); i != m_connections.end(); ++i )
231  i->second->onTimeout();
232 }
233 
235  std::string& address, short& port,
236  std::string& sourceAddress, short& sourcePort)
237 {
238  int num = 0;
239  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
240  if ( i != m_sessionToHostNum.end() ) num = i->second;
241 
242  std::stringstream hostStream;
243  hostStream << SOCKET_CONNECT_HOST << num;
244  std::string hostString = hostStream.str();
245 
246  std::stringstream portStream;
247  portStream << SOCKET_CONNECT_PORT << num;
248  std::string portString = portStream.str();
249 
250  sourcePort = 0;
251  sourceAddress.empty();
252 
253  if( d.has(hostString) && d.has(portString) )
254  {
255  address = d.getString( hostString );
256  port = ( short ) d.getInt( portString );
257 
258  std::stringstream sourceHostStream;
259  sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
260  hostString = sourceHostStream.str();
261  if( d.has(hostString) )
262  sourceAddress = d.getString( hostString );
263 
264  std::stringstream sourcePortStream;
265  sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
266  portString = sourcePortStream.str();
267  if( d.has(portString) )
268  sourcePort = ( short ) d.getInt( portString );
269  }
270  else
271  {
272  num = 0;
273  address = d.getString( SOCKET_CONNECT_HOST );
274  port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
275 
277  sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
279  sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
280  }
281 
282  m_sessionToHostNum[ s ] = ++num;
283 }
284 }
This interface must be implemented to define what your FIX application does.
Definition: Application.h:44
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:37
int getInt(const std::string &) const
Get a value as an int.
Definition: Dictionary.cpp:45
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:71
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:149
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Definition: Dictionary.cpp:32
Base for classes which act as an initiator for establishing connections.
Definition: Initiator.h:52
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:275
void setConnected(const SessionID &)
Definition: Initiator.cpp:154
void setPending(const SessionID &)
Definition: Initiator.cpp:145
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:163
void start()
Start initiator.
Definition: Initiator.cpp:190
bool isStopped()
Definition: Initiator.h:83
This interface must be implemented to create a Log.
Definition: Log.h:43
This interface must be implemented to log messages and events.
Definition: Log.h:82
virtual void onEvent(const std::string &)=0
This interface must be implemented to create a MessageStore.
Definition: MessageStore.h:42
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:46
const SessionID & getSessionID() const
Definition: Session.h:75
bool isSessionTime(const UtcTimeStamp &time)
Definition: Session.h:108
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1496
void disconnect()
Definition: Session.cpp:613
Log * getLog()
Definition: Session.h:227
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:31
Container for setting dictionaries mapped to sessions.
Encapsulates a socket file descriptor (single-threaded).
Session * getSession() const
bool read(SocketConnector &s)
Connects sockets to remote ports and addresses.
int connect(const std::string &address, int port, bool noDelay, int sendBufSize, int rcvBufSize, const std::string &sourceAddress="", int sourcePort=0)
void block(Strategy &strategy, bool poll=0, double timeout=0.0)
SocketMonitor & getMonitor()
bool onData(SocketConnector &, int)
void onWrite(SocketConnector &, int)
void onConnect(SocketConnector &, int)
SocketConnections m_pendingConnections
bool onPoll(double timeout)
Implemented to connect and poll for events.
void onError(SocketConnector &)
void onDisconnect(SocketConnector &, int)
SocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
void doConnect(const SessionID &, const Dictionary &d)
Implemented to connect a session to its target.
void onConfigure(const SessionSettings &)
Implemented to configure initiator.
SocketConnections m_connections
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
void onStart()
Implemented to start connecting to targets.
SocketConnector m_connector
SessionToHostNum m_sessionToHostNum
void onTimeout(SocketConnector &)
void getHost(const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
void onStop()
Implemented to stop a running initiator.
Date and Time represented in UTC.
Definition: FieldTypes.h:583
Definition: Acceptor.cpp:35
const char SOCKET_CONNECT_PORT[]
const char SOCKET_SEND_BUFFER_SIZE[]
const char SOCKET_NODELAY[]
const char SOCKET_RECEIVE_BUFFER_SIZE[]
const char RECONNECT_INTERVAL[]
const char SOCKET_CONNECT_HOST[]
const char SOCKET_CONNECT_SOURCE_PORT[]
const char SOCKET_CONNECT_SOURCE_HOST[]
Application is not configured correctly
Definition: Exceptions.h:88
static std::string convert(signed_int value)
Application encountered serious error during runtime
Definition: Exceptions.h:95

Generated on Wed Nov 24 2021 09:55:53 for QuickFIX by doxygen 1.9.1 written by Dimitri van Heesch, © 1997-2001