# Source: python-socket-examples / socket_memory.py

# Using a socket to save data
import socket
import threading
import hashlib
from random import random

# Author's measurements: with default options about 373248 x 1 bytes can be
# transferred; setsockopt can push this to 520000.
num_packets, bytes_per_packet = 250, 2048
host = 'localhost'
# Pick a port in the range [50000, 60000).
random_port = int(random() * 10000) + 50000

# Shared signal between the threads.  The "consumer_server" thread sets it
# once its socket has been created successfully; the consumer then waits on
# it until the producer has finished, so that the entire payload is known to
# have been sitting in the socket at once before any of it is consumed.
e = threading.Event()

def grab_sock():
    """Create a stream socket for the demo and resolve the address to use.

    Resolves ``host``/``random_port`` with ``getaddrinfo`` (taking the last
    entry returned), creates a matching socket, enables SO_REUSEADDR and
    requests a 600000-byte receive buffer.

    Returns:
        tuple: ``(sock, sockaddr)`` -- the new socket (neither bound nor
        connected) and the address to hand to ``bind`` or ``connect``.

    Raises:
        SystemExit: if the socket cannot be created.
        OSError: if setting a socket option fails (the socket is closed
            before the error propagates).
    """
    af, socktype, proto, _, sa = socket.getaddrinfo(host,
                                                random_port,
                                                socket.AF_UNSPEC,
                                                socket.SOCK_STREAM)[-1]
    try:
        s = socket.socket(af, socktype, proto)
    except socket.error:
        # No socket object exists at this point, so there is nothing to close.
        raise SystemExit('Error: Could not open socket :-/')
    try:
        # Tell the OS it can reuse a socket if it wants
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Tell the OS to use a very large buffer
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 600000)
    except socket.error:
        # Do not leak the descriptor if configuring the socket fails.
        s.close()
        raise
    return s, sa

def stream_data(num_bytes_required):
    """Producer: fill the socket with random packets and print their SHA256.

    Waits for the shared event (set by the server once it is listening),
    clears it, connects, and sends ``num_bytes_required`` packets of
    ``bytes_per_packet`` bytes read from ``/dev/urandom``.  The event is set
    again when the function finishes so the consumer may start reading.

    Nothing is read from the connection until this function sets the event,
    so the whole payload has to fit in the socket buffers; otherwise the
    send blocks.

    Args:
        num_bytes_required: number of packets to send.  Despite the name
            this is a packet count, not a byte count.
    """
    e.wait()
    e.clear()
    try:
        s, sa = grab_sock()
        m = hashlib.sha256()
        # The context manager closes the socket even if connect/send fails.
        with s:
            s.connect(sa)
            with open('/dev/urandom', 'rb') as random_source:
                for _ in range(num_bytes_required):
                    outgoing_data = random_source.read(bytes_per_packet)
                    m.update(outgoing_data)
                    # send() may transmit only part of the buffer; sendall()
                    # guarantees that every hashed byte is actually sent.
                    s.sendall(outgoing_data)

        print('"Saved" {} blocks of {} random bytes'.format(num_bytes_required, bytes_per_packet))
        print("SHA256 of sent: {}".format(m.hexdigest()))
    finally:
        # Always release the consumer, otherwise it would wait forever if
        # the producer failed part-way through.
        e.set()

# Server setup
def worker(conn, addr):
    """Consumer: wait for the producer, then drain the socket and hash it.

    Blocks on the shared event until the producer signals completion, reads
    up to ``num_packets`` blocks of ``bytes_per_packet`` bytes from ``conn``,
    prints how much was received and the SHA256 of the received data, and
    closes the connection.

    Args:
        conn: connected socket obtained from ``accept``.
        addr: peer address; unused, kept so the thread call signature
            stays the same.
    """
    print('waiting for producer to finish filling the socket')
    e.wait()
    print('okay lets have your data')
    received_bytes = []
    with conn:
        for _ in range(num_packets):
            block = _recv_exact(conn, bytes_per_packet)
            if not block:
                # Peer closed the connection before sending everything.
                break
            received_bytes.append(block)
    first_block_size = len(received_bytes[0]) if received_bytes else 0
    print("Received {} x {} bytes of data".format(len(received_bytes), first_block_size))
    digest = hashlib.sha256()
    for received_block in received_bytes:
        digest.update(received_block)

    print("SHA256 of recv: {}".format(digest.hexdigest()))


def _recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``, or fewer if the peer closes.

    A single ``recv`` on a stream socket may return fewer bytes than
    requested, so keep reading until the block is complete.
    """
    pieces = []
    remaining = size
    while remaining > 0:
        piece = conn.recv(remaining)
        if not piece:
            break
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)


def consumer_server():
    """Listen on the demo address, accept one connection and start a worker.

    Binds and listens on the socket from ``grab_sock``, sets the shared
    event to tell the producer it may connect, accepts a single connection
    and hands it to ``worker`` on a new thread.  The listening socket is
    closed as soon as that one connection has been accepted.

    Raises:
        SystemExit: if bind/listen fails, or if ``accept`` is interrupted by
            KeyboardInterrupt.
    """
    s, sa = grab_sock()
    # Only one connection is ever accepted, so the listener is closed on
    # every path out of this block (success or failure).
    with s:
        try:
            s.bind(sa)
            # Queue up to 1 connect requests
            s.listen(1)
        except OSError:
            # NOTE: the event is never set on this path, so a producer
            # waiting on it will keep blocking.
            raise SystemExit('Socket binding/listening failed...')
        e.set()
        try:
            conn, addr = s.accept()
        except KeyboardInterrupt:
            raise SystemExit()
    print('Connected by', addr)
    threading.Thread(target=worker, args=(conn, addr)).start()

# Start the consumer side on its own thread, then run the producer on the
# main thread.
_server_thread = threading.Thread(target=consumer_server)
_server_thread.start()
stream_data(num_packets)