def simple_server_demo():
# Create and start a TCP server
= TCPServer(port=8000)
server
server.start()
try:
# Keep the server running for 60 seconds
print("Server running. Press Ctrl+C to stop...")
60)
time.sleep(except KeyboardInterrupt:
print("Keyboard interrupt received, stopping server...")
finally:
# Stop the server
server.stop()
TCP Server Implementation
Building a TCP server with detailed explanations of each step
Introduction
In this notebook, we’ll implement a TCP server that: 1. Creates a socket 2. Binds to an address 3. Listens for incoming connections 4. Accepts connections and handles them 5. Sends and receives data 6. Closes connections gracefully
Let’s import the necessary modules:
Simple TCP Server
We’ll start with a basic TCP server implementation that will handle connections one at a time:
TCPServer
TCPServer (host:str='127.0.0.1', port:int=0, backlog:int=5, buffer_size:int=1024)
A simple TCP server that can handle multiple clients.
Enhanced TCP Server with Custom Message Handling
Now let’s create a more flexible server that allows custom message handling:
EnhancedTCPServer
EnhancedTCPServer (host:str='127.0.0.1', port:int=0, backlog:int=5, buffer_size:int=1024)
An enhanced TCP server that supports custom message handlers.
TCP Server with Connection Events
Finally, let’s create a version that provides event callbacks for connection events:
EventDrivenTCPServer
EventDrivenTCPServer (host:str='127.0.0.1', port:int=0, backlog:int=5, buffer_size:int=1024)
A TCP server that triggers events for connection lifecycle.
Example Usage
Here’s a simple example of how to use our TCP server:
# Uncomment to run the demo
# simple_server_demo()
def event_driven_server_demo():
# Create and start an event-driven TCP server
= EventDrivenTCPServer(port=8000)
server
# Set up event handlers
= lambda conn_id, addr: print(f"EVENT: Client connected: {addr[0]}:{addr[1]}")
server.on_connect = lambda conn_id: print(f"EVENT: Client disconnected: {conn_id}")
server.on_disconnect = lambda conn_id, data: print(f"EVENT: Received data from {conn_id}: {data.decode('utf-8')}")
server.on_data
# Set up a custom message handler
def message_handler(conn_id, data):
= data.decode('utf-8')
message # Echo the message back with a prefix
return f"Server received: {message}".encode('utf-8')
server.set_message_handler(message_handler)
# Start the server
server.start()
try:
# Keep the server running for 60 seconds
print("Event-driven server running. Press Ctrl+C to stop...")
60)
time.sleep(except KeyboardInterrupt:
print("Keyboard interrupt received, stopping server...")
finally:
# Stop the server
server.stop()
# Uncomment to run the demo
# event_driven_server_demo()