Jan-09-2020, 07:22 PM
Hello everyone,
This script below is sort of a peer to peer local network communication system. I tried studying asyncio which i believed to be suitable for my purposes, however, it goes over my head right now. I used threads instead. The problem is my CPU usage really spikes up whenever the script runs like constantly 40% to 50%. I tried adding time.sleeps() where ever i felt the CPU might get clogged but to no avail. The script is as below,
Thanks
This script below is sort of a peer to peer local network communication system. I tried studying asyncio which i believed to be suitable for my purposes, however, it goes over my head right now. I used threads instead. The problem is my CPU usage really spikes up whenever the script runs like constantly 40% to 50%. I tried adding time.sleeps() where ever i felt the CPU might get clogged but to no avail. The script is as below,
import queue import struct import threading import sys import socket import time from PyQt5 import QtCore, QtWidgets import mysql.connector class NonBlockingInputTemplate: def __init__(self): self.lock = threading.Lock() self.__alive = False self.input_queue = queue.Queue() self.user_input = '' self.__input_cache = [] def __str__(self): if self.__alive: return "%s %r" % (self.__class__, 'Status : Running') else: return "%s %r" % (self.__class__, 'Status : Stopped') def start(self): """Start Input Thread""" print('Starting Non Blocking I/O') self.__alive = True self.start_threads() def stop(self): """Stop Input Thread""" if self.__alive: print('Stopping Non Blocking I/O') self.__alive = False def start_threads(self): non_blocking_input_thread = threading.Thread(target=self.non_blocking_input, args=(), name='InputThread') read_input_thread = threading.Thread(target=self.read_input, args=(), name='ReadThread') self_defined_thread = threading.Thread(target=self.self_defined, args=(), name='SelfThread') non_blocking_input_thread.start() read_input_thread.start() self_defined_thread.start() def non_blocking_input(self): print('>', end='') while True: if not self.__alive: break self.input_queue.put(sys.stdin.read(1)) def read_input(self): while True: if not self.__alive: break if not self.input_queue.empty(): self.lock.acquire() while not self.input_queue.empty(): self.user_input = self.user_input + self.input_queue.get() self.user_input = self.user_input[:-1] if self.user_input: self.__input_cache.append(self.user_input.replace('\n', '')) self.lock.release() self.user_input = '' print('>', end='') def self_defined(self): pass def get(self): """Retrieve the first item from the input queue""" if self.__input_cache: return self.__input_cache.pop(0) def cached(self): """Return the input queue""" return self.__input_cache def flush(self): """Flush the input queue""" self.__input_cache = [] class NonBlockingInput(NonBlockingInputTemplate): def __init__(self): super().__init__() # Additional functionality implemented here as per requirement, left out for now class CollabTemplate: def __init__(self): self.receiver = socket.socket() self.connector = socket.socket() self.broadcast_s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.broadcast_r = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.multi_cast = '224.7.7.7' self.port = 25000 self.m_port = 26000 self.header_size = 6 self.peers = [] self.peers_details = {} self.peers_outgoing = {} self.lock = threading.Lock() self.data_cache = [] def initialize(self): print('Collab Initialized...') accept_connections_thread = threading.Thread(target=self.accept_connections, args=(), name='AcceptConnectionsThread') broadcast_self_thread = threading.Thread(target=self.broadcast_self, args=(), name='BroadcastSelfThread') search_peers_thread = threading.Thread(target=self.search_peers, args=(), name='SearchPeersThread') ping_routine_thread = threading.Thread(target=self.ping_routine, args=(), name='PingRoutineThread') pull_data_thread = threading.Thread(target=self.pull_data, args=(), name='PullDataThread') accept_connections_thread.start() broadcast_self_thread.start() search_peers_thread.start() ping_routine_thread.start() pull_data_thread.start() def accept_connections(self): self.receiver.bind((socket.gethostbyname(socket.gethostname()), self.port)) self.lock.acquire() self.peers.append('self') self.peers_details['self'] = tuple((socket.gethostbyname(socket.gethostname()), '')) self.lock.release() self.receiver.listen() while True: conn, address = self.receiver.accept() if address[0] not in [value[0] for key, value in self.peers_details.items()]: self.lock.acquire() self.peers.append(conn) self.peers_details[conn] = address self.lock.release() try: connector = socket.socket() connector.connect((address[0], self.port)) # Experimental code starts here threading.Thread(target=self.peer_thread, args=(connector,), name='PeerThread-' + str(address[0])).start() # Experimental code ends here except ConnectionError as e: print('Error while completing handshake') print('Connection received from {}'.format(self.peers_details[conn])) def ping_routine(self): while True: time.sleep(1) if self.peers: for conn in self.peers: try: if conn != 'self': conn.sendall(bytes([0])) except ConnectionResetError as e: conn.shutdown(socket.SHUT_RDWR) conn.close() print(str(self.peers_details[conn][0]) + ' has disconnected') self.lock.acquire() self.peers.remove(conn) self.peers_details.pop(conn) self.lock.release() def peer_thread(self, conn): """Communication Protocol Header : 1-byte message identification appended with 5 digit message size /x0200005 : 5 char message /x00 --> Ping ; No response required as if unable to send byte connection will be closed /x01 --> Peer Discovery ; /x01 --> Response to peer discovery /x02 --> Communication ; Appended with the size of the message : Use int(base36encode, 36) Content : recv() will be called until the data received does not match the size of the data""" while True: try: peer_data = conn.recv(1) if peer_data: if peer_data[0] == 0: pass elif peer_data[0] == 1: pass elif peer_data[0] == 2: while len(peer_data) < self.header_size: peer_data = peer_data + conn.recv(self.header_size - len(peer_data)) f_size = int(peer_data[1:self.header_size].decode(), 36) stream_data = bytearray() while len(peer_data) + len(stream_data) - self.header_size < f_size: d_size = f_size - len(peer_data) + self.header_size + len(stream_data) if d_size > 16777216: stream_data.extend(conn.recv(16777216)) elif d_size > 4194304: stream_data.extend(conn.recv(4194304)) elif d_size > 1048576: stream_data.extend(conn.recv(1048576)) elif d_size > 262144: stream_data.extend(conn.recv(262144)) elif d_size > 65536: stream_data.extend(conn.recv(65536)) elif d_size > 16384: stream_data.extend(conn.recv(16384)) elif d_size > 4096: stream_data.extend(conn.recv(4096)) else: stream_data.extend(conn.recv(d_size)) peer_data = peer_data + stream_data if len(peer_data) - self.header_size > int(peer_data[1:self.header_size].decode(), 36): peer_data = peer_data[0:f_size + self.header_size] self.parser(conn.getpeername()[0], peer_data.decode()[self.header_size:]) except ConnectionResetError as e: break def base36encode(self, number): alphabet, base36 = ['0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', ''] number = abs(number) while number: number, i = divmod(number, 36) base36 = alphabet[i] + base36 if not base36: base36 = 0 return base36 def broadcast_self(self): self.broadcast_s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, bytes([1])) self.broadcast_s.bind((socket.gethostbyname(socket.gethostname()), self.m_port)) while True: time.sleep(5) self.broadcast_s.sendto(bytes([1]), (self.multi_cast, self.m_port)) time.sleep(60) def search_peers(self): self.broadcast_r.bind(('', self.m_port)) group = socket.inet_aton(self.multi_cast) mreq = struct.pack('4sL', group, socket.INADDR_ANY) self.broadcast_r.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) while True: data, address = self.broadcast_r.recvfrom(1) if data == bytes([1]) and address[0] not in [value[0] for key, value in self.peers_details.items()]: print('Sending connection to', end='') print((address[0], str(self.port))) connector = socket.socket() connector.connect((address[0], self.port)) threading.Thread(target=self.peer_thread, args=(connector,), name='PeerThread-' + str(address[0])).start() def push(self, data, data_type=None): '''data_type (Not Implemented Yet/ Will Implement If Required) : b --> Broadcast Message p --> Private Message''' if len(str(data)) > 0: data_size = str(self.base36encode(len(str(data)))) if len(data_size) <= self.header_size: data_size = data_size.rjust(self.header_size - 1, '0') self.data_cache.append(bytes([2]) + data_size.encode() + str(data).encode()) else: raise Exception(BufferError) def pull_data(self): while True: time.sleep(0.1) if self.data_cache: self.msg_broadcast(self.data_cache.pop(0)) def msg_broadcast(self, data): for conn in self.peers: if conn != 'self': conn.sendall(data) def parser(self, peer_details, peer_data): # Only prints the string received in the buffer # Implement additional functionality in child class print(str(peer_details) + ' : ' + peer_data) class Collab(CollabTemplate): def __init__(self): super().__init__() # Additional functionality implemented here but left our for now def input_thread(): while True: time.sleep(0.1) data = user_input.get() if data: collab.push(data) if __name__ == '__main__': user_input = NonBlockingInput() user_input.start() collab = Collab() collab.initialize() threading.Thread(target=input_thread, args=()).start()The code is not really all that good but I'm learning. Apart from the original issue any additional advice or issues in the current script are also appreciated.
Thanks