The client-server model with multiple connections

If we are working with Python version 3.4+, there is a module called selectors, which provides an API for quickly building an object-oriented server based on the I/O primitives. The documentation and an example of this is available at https://docs.python.org/3.7/library/selectors.html.

In this example, we are going to implement a server that controls several connections using the selectors package.

You can find the following code in the tcp_server_selectors.py file:

#!/usr/bin/env python3

import selectors
import types
import socket

selector = selectors.DefaultSelector()

def accept_connection(sock):
connection, address = sock.accept()
print('Connection accepted in {}'.format(address))
# We put the socket in non-blocking mode
connection.setblocking(False)
data = types.SimpleNamespace(addr=address, inb=b'', outb=b'')
events = selectors.EVENT_READ | selectors.EVENT_WRITE
selector.register(connection, events, data=data)

In the previous code block, we defined the accept_connection() method for accepting connections from the clients, put the socket in non-blocking mode, and registered a selector for capturing read and write events. In the following code block, we are defining the service_connection() method for differentiating messages marked as event read selector and messages marked as event write selector:

def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(BUFFER_SIZE)
if recv_data:
data.outb += recv_data
else:
print('Closing connection in {}'.format(data.addr))
selector.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
print('Echo from {} to {}'.format(repr(data.outb), data.addr))
sent = sock.send(data.outb)
data.outb = data.outb[sent:]

In the following block of code, we can see our main program for establishing the host, port, and BUFFER_SIZE constants, and configuring our socket in non-blocking mode. We will also register the socket to be monitored by the selector functions:

if __name__ == '__main__':
host = 'localhost'
port = 12345
BUFFER_SIZE = 1024
# We create a TCP socket
socket_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# We configure the socket in non-blocking mode
socket_tcp.setblocking(False)
socket_tcp.bind((host, port))
socket_tcp.listen()
print('Openned socket for listening connections on {} {}'.format(host, port))
socket_tcp.setblocking(False)
# We register the socket to be monitored by the selector functions
selector.register(socket_tcp, selectors.EVENT_READ, data=None)
while socket_tcp:
events = selector.select(timeout=None)
for key, mask in events:
if key.data is None:
accept_connection(key.fileobj)
else:
service_connection(key, mask)
socket_tcp.close()
print('Connection finished.')

Let's explore our implementation a bit more:

Now, let's look at an implementation of a client. It is quite similar to the implementation of the server but instead of waiting for connections, the client starts to initiate connections with the start_connections() function.

You can find the following code in the tcp_client_selectors.py file:

#!/usr/bin/env python3

import socket
import selectors
import types

selector = selectors.DefaultSelector()
messages = ['This is the first message', 'This is the second message']
BUFFER_SIZE = 1024

def start_connections(host, port, num_conns):
server_address = (host, port)
for i in range(0, num_conns):
connid = i + 1
print('Starting connection {} towards {}'.format(connid, server_address))
socket_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# We connect using connect_ex () instead of connect
socket_tcp.connect_ex(server_address)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(connid=connid,
msg_total=sum(len(m) for m in messages), recv_total=0,
messages=list(messages),outb=b'')
selector.register(socket_tcp, events, data=data)
events = selector.select()
for key, mask in events:
service_connection(key, mask)

In the previous code block, we defined the start_connections() method to connect with the server and register a selector for capturing read and write events. In the following code block, we define the service_connection() method for differentiating messages marked as event read selector and event write selector:

def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(BUFFER_SIZE)
if recv_data:
print('Received {} from connection {}'.format(repr(recv_data), data.connid))
data.recv_total += len(recv_data)
if not recv_data or data.recv_total == data.msg_total:
print('Closing connection', data.connid)
selector.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0).encode()
if data.outb:
print('Sending {} to connection {}'.format(repr(data.outb), data.connid))
sent = sock.send(data.outb)
sock.shutdown(socket.SHUT_WR)
data.outb = data.outb[sent:]

if __name__ == '__main__':
host = 'localhost'
port = 12345
BUFFER_SIZE = 1024
start_connections(host, port, 2)

Now, we execute our new server and client implementation for multiple connections:

$ python tcp_server_selectors.py &
Openned socket for listening connections on localhost 12345

$ python tcp_server_selectors.py &
$ python tcp_client_selectors.py
Starting connection 1 towards ('localhost', 12345)
Starting connection 2 towards ('localhost', 12345)
Connection accepted in ('127.0.0.1', 7107)
Connection accepted in ('127.0.0.1', 7109)
Sending 'This is the first message' to connection 1
Sending 'This is the first message' to connection 2
Closing connection in ('127.0.0.1', 7107)
Closing connection in ('127.0.0.1', 7109)

As we can see, our clients communicate with our server and it echoes to verify that the messages were received.

In this section, we looked at non-blocking I/O with the socket and selectors modules to build an object-oriented server based on the I/O primitives.