ThreadedSocketConnection.cpp
Go to the documentation of this file.
1/****************************************************************************
2** Copyright (c) 2001-2014
3**
4** This file is part of the QuickFIX FIX Engine
5**
6** This file may be distributed under the terms of the quickfixengine.org
7** license as defined by quickfixengine.org and appearing in the file
8** LICENSE included in the packaging of this file.
9**
10** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12**
13** See http://www.quickfixengine.org/LICENSE for licensing information.
14**
15** Contact ask@quickfixengine.org if any conditions of this licensing are
16** not clear to you.
17**
18****************************************************************************/
19
20#ifdef _MSC_VER
21#include "stdafx.h"
22#else
23#include "config.h"
24#endif
25
29#include "Session.h"
30#include "Utility.h"
31
32namespace FIX
33{
// Constructor taking an already-open socket, a collection of sessions and an
// optional log. No session is attached yet: m_pSession starts null and the
// disconnect flag starts false.
// NOTE(review): the line that names this constructor is absent from this
// listing (embedded numbering skips 34) - confirm against the original source.
35( int s, Sessions sessions, Log* pLog )
36: m_socket( s ), m_pLog( pLog ),
37 m_sessions( sessions ), m_pSession( 0 ),
38 m_disconnect( false )
39{
 // Build an fd_set that contains only this connection's socket.
40 FD_ZERO( &m_fds );
41 FD_SET( m_socket, &m_fds );
42}
43
// Constructor taking a session id, a socket, a peer address/port, an optional
// log and a source address/port. The session is resolved immediately through
// Session::lookupSession( sessionID ); m_pSession is null if none is found.
// NOTE(review): the line that names this constructor is absent from this
// listing (embedded numbering skips 44) - confirm against the original source.
45( const SessionID& sessionID, int s,
46 const std::string& address, short port,
47 Log* pLog,
48 const std::string& sourceAddress, short sourcePort )
49 : m_socket( s ), m_address( address ), m_port( port ),
50 m_sourceAddress( sourceAddress ), m_sourcePort( sourcePort ),
51 m_pLog( pLog ),
52 m_pSession( Session::lookupSession( sessionID ) ),
53 m_disconnect( false )
54{
 // Build an fd_set that contains only this connection's socket.
55 FD_ZERO( &m_fds );
56 FD_SET( m_socket, &m_fds );
 // When a session was found, make this connection its responder.
57 if ( m_pSession ) m_pSession->setResponder( this );
58}
59
68
69bool ThreadedSocketConnection::send( const std::string& msg )
70{
71 int totalSent = 0;
72 while(totalSent < (int)msg.length())
73 {
74 ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
75 if(sent < 0) return false;
76 totalSent += sent;
77 }
78
79 return true;
80}
81
// Body of the routine that connects the socket to m_address:m_port and reports
// success as socket_connect(...) >= 0.
// NOTE(review): the signature line and the statement guarded by the `if` below
// are absent from this listing (embedded numbering skips 82 and 86). Presumably
// the missing statement binds the socket to m_sourceAddress/m_sourcePort -
// confirm against the original source.
83{
84 // do the bind in the thread as name resolution may block
85 if ( !m_sourceAddress.empty() || m_sourcePort )
87
88 return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
89}
90
96
// Body of the routine that waits up to one second for input on the socket.
// Returns true when the wait completed without a receive failure, and false
// when a SocketRecvFailed was raised.
// NOTE(review): the signature line and some statements are absent from this
// listing (embedded numbering skips 97, 112, 123 and 134) - confirm against
// the original source before relying on these notes.
98{
99 struct timeval timeout = { 1, 0 };
 // Work on a copy of m_fds: select() modifies the set it is handed.
100 fd_set readset = m_fds;
101
102 try
103 {
104 // Wait for input (1 second timeout)
105 int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
106
107 if( result > 0 ) // Something to read
108 {
109 // We can read without blocking
110 ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
 // A zero or negative size is treated as a receive failure.
111 if ( size <= 0 ) { throw SocketRecvFailed( size ); }
 // NOTE(review): the statement that consumes the received bytes is missing
 // here; presumably they are handed to the parser - confirm.
113 }
 // On timeout, drive the attached session (if any) with no new input.
114 else if( result == 0 && m_pSession ) // Timeout
115 {
116 m_pSession->next();
117 }
118 else if( result < 0 ) // Error
119 {
120 throw SocketRecvFailed( result );
121 }
122
124 return true;
125 }
126 catch ( SocketRecvFailed& e )
127 {
 // A failure after the disconnect flag was set is reported without logging.
128 if( m_disconnect )
129 return false;
130
131 if( m_pSession )
132 {
 // Record the failure in the session's own log.
133 m_pSession->getLog()->onEvent( e.what() );
135 }
136 else
137 {
 // No session attached: drop the connection.
138 disconnect();
139 }
140
141 return false;
142 }
143}
144
// Tail of a definition that asks m_parser for the next message, storing it in
// msg, and returns the parser's result. A MessageParseError is swallowed and
// true is returned in that case.
// NOTE(review): the signature line is absent from this listing (embedded
// numbering skips 145) - confirm the name and parameters against the original.
146throw( SocketRecvFailed )
147{
148 try
149 {
150 return m_parser.readFixMessage( msg );
151 }
 // Parse errors are deliberately ignored; the caller still sees true.
152 catch ( MessageParseError& ) {}
153 return true;
154}
155
// Body of the routine that drains messages via readMessage() and feeds each
// one to the attached session.
// NOTE(review): the signature line is absent from this listing (embedded
// numbering skips 156) - confirm against the original source.
157{
158 std::string msg;
159 while( readMessage(msg) )
160 {
 // The first message on a connection without a session selects the session.
161 if ( !m_pSession )
162 {
163 if ( !setSession( msg ) )
 // No usable session: disconnect and move on to the next readMessage() call.
164 { disconnect(); continue; }
165 }
166 try
167 {
168 m_pSession->next( msg, UtcTimeStamp() );
169 }
170 catch( InvalidMessage& )
171 {
 // An invalid message ends the connection only when the session is not
 // logged on; otherwise it is ignored and processing continues.
172 if( !m_pSession->isLoggedOn() )
173 {
174 disconnect();
175 return;
176 }
177 }
178 }
179}
180
181bool ThreadedSocketConnection::setSession( const std::string& msg )
182{
183 m_pSession = Session::lookupSession( msg, true );
184 if ( !m_pSession )
185 {
186 if( m_pLog )
187 {
188 m_pLog->onEvent( "Session not found for incoming message: " + msg );
189 m_pLog->onIncoming( msg );
190 }
191 return false;
192 }
193
194 SessionID sessionID = m_pSession->getSessionID();
195 m_pSession = 0;
196
197 // see if the session frees up within 5 seconds
198 for( int i = 1; i <= 5; i++ )
199 {
200 if( !Session::isSessionRegistered( sessionID ) )
202 if( m_pSession ) break;
203 process_sleep( 1 );
204 }
205
206 if ( !m_pSession )
207 return false;
208 if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
209 return false;
210
211 m_pSession->setResponder( this );
212 return true;
213}
214
215} // namespace FIX
This interface must be implemented to log messages and events.
Definition Log.h:82
virtual void onIncoming(const std::string &)=0
virtual void onEvent(const std::string &)=0
void addToStream(const char *str, size_t len)
Definition Parser.h:48
Maintains the state and implements the logic of a FIX session.
Definition Session.h:46
static Session * registerSession(const SessionID &)
Definition Session.cpp:1537
static void unregisterSession(const SessionID &)
Definition Session.cpp:1547
void setResponder(Responder *pR)
Definition Session.h:210
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496
const SessionID & getSessionID() const
Definition Session.h:75
Log * getLog()
Definition Session.h:227
void next()
Definition Session.cpp:125
void disconnect()
Definition Session.cpp:613
static bool isSessionRegistered(const SessionID &)
Definition Session.cpp:1531
bool isLoggedOn()
Definition Session.h:65
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition SessionID.h:31
bool setSession(const std::string &msg)
ThreadedSocketConnection(int s, Sessions sessions, Log *pLog)
Date and Time represented in UTC.
Definition FieldTypes.h:583
ssize_t socket_recv(int s, char *buf, size_t length)
Definition Utility.cpp:170
void socket_close(int s)
Definition Utility.cpp:180
void process_sleep(double s)
Definition Utility.cpp:466
int socket_connect(int socket, const char *address, int port)
Definition Utility.cpp:148
int socket_bind(int socket, const char *hostname, int port)
Definition Utility.cpp:103
ssize_t socket_send(int s, const char *msg, size_t length)
Definition Utility.cpp:175
Not a recognizable message.
Definition Exceptions.h:81
Unable to parse message.
Definition Exceptions.h:74
Socket recv operation failed.
Definition Exceptions.h:279

Generated on Sun Mar 31 2024 07:07:24 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001