ThreadedSocketAcceptor.cpp
Go to the documentation of this file.
1/****************************************************************************
2** Copyright (c) 2001-2014
3**
4** This file is part of the QuickFIX FIX Engine
5**
6** This file may be distributed under the terms of the quickfixengine.org
7** license as defined by quickfixengine.org and appearing in the file
8** LICENSE included in the packaging of this file.
9**
10** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12**
13** See http://www.quickfixengine.org/LICENSE for licensing information.
14**
15** Contact ask@quickfixengine.org if any conditions of this licensing are
16** not clear to you.
17**
18****************************************************************************/
19
20#ifdef _MSC_VER
21#include "stdafx.h"
22#else
23#include "config.h"
24#endif
25
27#include "Settings.h"
28#include "Utility.h"
29
30namespace FIX
31{
33 Application& application,
34 MessageStoreFactory& factory,
35 const SessionSettings& settings ) throw( ConfigError )
36: Acceptor( application, factory, settings )
37{ socket_init(); }
38
40 Application& application,
41 MessageStoreFactory& factory,
42 const SessionSettings& settings,
43 LogFactory& logFactory ) throw( ConfigError )
44: Acceptor( application, factory, settings, logFactory )
45{
46 socket_init();
47}
48
53
55throw ( ConfigError )
56{
57 std::set<SessionID> sessions = s.getSessions();
58 std::set<SessionID>::iterator i;
59 for( i = sessions.begin(); i != sessions.end(); ++i )
60 {
61 const Dictionary& settings = s.get( *i );
62 settings.getInt( SOCKET_ACCEPT_PORT );
63 if( settings.has(SOCKET_REUSE_ADDRESS) )
64 settings.getBool( SOCKET_REUSE_ADDRESS );
65 if( settings.has(SOCKET_NODELAY) )
66 settings.getBool( SOCKET_NODELAY );
67 }
68}
69
71throw ( RuntimeError )
72{
73 short port = 0;
74 std::set<int> ports;
75
76 std::set<SessionID> sessions = s.getSessions();
77 std::set<SessionID>::iterator i = sessions.begin();
78 for( ; i != sessions.end(); ++i )
79 {
80 const Dictionary& settings = s.get( *i );
81 port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
82
83 m_portToSessions[port].insert( *i );
84
85 if( ports.find(port) != ports.end() )
86 continue;
87 ports.insert( port );
88
89 const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
90 settings.getBool( SOCKET_REUSE_ADDRESS ) : true;
91
92 const bool noDelay = settings.has( SOCKET_NODELAY ) ?
93 settings.getBool( SOCKET_NODELAY ) : false;
94
95 const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
96 settings.getInt( SOCKET_SEND_BUFFER_SIZE ) : 0;
97
98 const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
99 settings.getInt( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
100
101 int socket = socket_createAcceptor( port, reuseAddress );
102 if( socket < 0 )
103 {
105 socket_close( socket );
106 throw RuntimeError( "Unable to create, bind, or listen to port "
107 + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
108 }
109 if( noDelay )
110 socket_setsockopt( socket, TCP_NODELAY );
111 if( sendBufSize )
112 socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
113 if( rcvBufSize )
114 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
115
116 m_socketToPort[socket] = port;
117 m_sockets.insert( socket );
118 }
119}
120
122{
123 Sockets::iterator i;
124 for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
125 {
126 Locker l( m_mutex );
127 int port = m_socketToPort[*i];
128 AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
129 thread_id thread;
130 thread_spawn( &socketAcceptorThread, info, thread );
131 addThread( *i, thread );
132 }
133}
134
135bool ThreadedSocketAcceptor::onPoll( double timeout )
136{
137 return false;
138}
139
141{
142 SocketToThread threads;
143 SocketToThread::iterator i;
144
145 {
146 Locker l(m_mutex);
147
148 time_t start = 0;
149 time_t now = 0;
150
151 ::time( &start );
152 while ( isLoggedOn() )
153 {
154 if( ::time(&now) -5 >= start )
155 break;
156 }
157
158 threads = m_threads;
159 m_threads.clear();
160 }
161
162 for ( i = threads.begin(); i != threads.end(); ++i )
163 socket_close( i->first );
164 for ( i = threads.begin(); i != threads.end(); ++i )
165 thread_join( i->second );
166}
167
169{
170 Locker l(m_mutex);
171
172 m_threads[ s ] = t;
173}
174
176{
177 Locker l(m_mutex);
178 SocketToThread::iterator i = m_threads.find( s );
179 if ( i != m_threads.end() )
180 {
181 thread_detach( i->second );
182 m_threads.erase( i );
183 }
184}
185
187{
188 AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
189
190 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
191 int s = info->m_socket;
192 int port = info->m_port;
193 delete info;
194
195 int noDelay = 0;
196 int sendBufSize = 0;
197 int rcvBufSize = 0;
198 socket_getsockopt( s, TCP_NODELAY, noDelay );
199 socket_getsockopt( s, SO_SNDBUF, sendBufSize );
200 socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
201
202 int socket = 0;
203 while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
204 {
205 if( noDelay )
206 socket_setsockopt( socket, TCP_NODELAY );
207 if( sendBufSize )
208 socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
209 if( rcvBufSize )
210 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
211
212 Sessions sessions = pAcceptor->m_portToSessions[port];
213
214 ThreadedSocketConnection * pConnection =
216 ( socket, sessions, pAcceptor->getLog() );
217
218 ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
219
220 {
221 Locker l( pAcceptor->m_mutex );
222
223 std::stringstream stream;
224 stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
225
226 if( pAcceptor->getLog() )
227 pAcceptor->getLog()->onEvent( stream.str() );
228
229 thread_id thread;
230 if ( !thread_spawn( &socketConnectionThread, info, thread ) )
231 {
232 delete info;
233 delete pConnection;
234 }
235 else
236 pAcceptor->addThread( socket, thread );
237 }
238 }
239
240 if( !pAcceptor->isStopped() )
241 pAcceptor->removeThread( s );
242
243 return 0;
244}
245
247{
248 ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
249
250 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
251 ThreadedSocketConnection* pConnection = info->m_pConnection;
252 delete info;
253
254 int socket = pConnection->getSocket();
255
256 while ( pConnection->read() ) {}
257 delete pConnection;
258 if( !pAcceptor->isStopped() )
259 pAcceptor->removeThread( socket );
260 return 0;
261}
262}
#define THREAD_PROC
Definition Utility.h:184
Base for classes which act as an acceptor for incoming connections.
Definition Acceptor.h:50
void start()
Start acceptor.
Definition Acceptor.cpp:158
bool isStopped()
Definition Acceptor.h:87
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition Acceptor.cpp:230
Log * getLog()
Definition Acceptor.h:59
This interface must be implemented to define what your FIX application does.
Definition Application.h:44
For storage and retrieval of key/value pairs.
Definition Dictionary.h:37
int getInt(const std::string &) const
Get a value as a int.
bool getBool(const std::string &) const
Get a value as a bool.
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Locks/Unlocks a mutex using RAII.
Definition Mutex.h:96
This interface must be implemented to create a Log.
Definition Log.h:43
virtual void onEvent(const std::string &)=0
This interface must be implemented to create a MessageStore.
Container for setting dictionaries mapped to sessions.
Threaded Socket implementation of Acceptor.
bool onPoll(double timeout)
Implemented to connect and poll for events.
static THREAD_PROC socketConnectionThread(void *p)
static THREAD_PROC socketAcceptorThread(void *p)
ThreadedSocketAcceptor(Application &, MessageStoreFactory &, const SessionSettings &)
void onStop()
Implemented to stop a running acceptor.
void onStart()
Implemented to start listening for connections.
std::map< int, thread_id > SocketToThread
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
void onInitialize(const SessionSettings &)
Implemented to initialize acceptor.
Encapsulates a socket file descriptor (multi-threaded).
const char SOCKET_SEND_BUFFER_SIZE[]
int socket_accept(int s)
Definition Utility.cpp:164
const char SOCKET_ACCEPT_PORT[]
int socket_setsockopt(int s, int opt)
Definition Utility.cpp:208
void thread_detach(thread_id thread)
Definition Utility.cpp:447
const char SOCKET_REUSE_ADDRESS[]
pthread_t thread_id
Definition Utility.h:190
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition Utility.cpp:416
void socket_close(int s)
Definition Utility.cpp:180
void thread_join(thread_id thread)
Definition Utility.cpp:437
int socket_getsockopt(int s, int opt, int &optval)
Definition Utility.cpp:233
const char SOCKET_NODELAY[]
void socket_init()
Definition Utility.cpp:81
const char SOCKET_RECEIVE_BUFFER_SIZE[]
void socket_term()
Definition Utility.cpp:96
int socket_createAcceptor(int port, bool reuse)
Definition Utility.cpp:120
const char * socket_peername(int socket)
Definition Utility.cpp:353
Application is not configured correctly
Definition Exceptions.h:88
static std::string convert(signed_int value)
Application encountered serious error during runtime
Definition Exceptions.h:95
Socket Error.
Definition Exceptions.h:246

Generated on Sun Mar 31 2024 07:07:24 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001