#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jan 26 17:50:25 2020

@author: suwako

Multiprocess TCP server exchanging gzip-compressed JSON messages.

Wire format (see ``send`` and ``handle``): a message is serialised to JSON,
gzip-compressed and cut into frames of exactly ``BUFFER_SIZE`` bytes.  The
first byte of each frame is a marker (``CONTINUE`` or ``END``); the other
``BUFFER_SIZE - 1`` bytes are payload, zero-padded in the final frame.
"""

import gzip
import json
import multiprocessing
import socket
import sys
import time

if sys.platform == 'win32':
    import multiprocessing.reduction

import numpy as np

try:
    from colorama import Fore, Style
except ImportError:
    # Colour is purely cosmetic: fall back to plain text instead of failing.
    class _NoColour(object):
        """Stand-in for colorama's ``Fore``/``Style``; every code is ''."""

        def __getattr__(self, name):
            if name.startswith("_"):
                raise AttributeError(name)
            return ""

    Fore = Style = _NoColour()


def _join(parts):
    """Render log arguments as one comma-separated string."""
    return ', '.join(map(str, parts))


def debug(*s):
    """Print a debug-level message built from the given values."""
    print("[D ] : " + _join(s) + Style.RESET_ALL)


def info(*s):
    """Print an info-level message (green tag) built from the given values."""
    print(Fore.GREEN + Fore.LIGHTGREEN_EX + "[I -] : " + Style.RESET_ALL
          + _join(s) + Style.RESET_ALL)


def warn(*s):
    """Print a warning-level message (magenta tag) built from the values."""
    print(Fore.MAGENTA + "[W ~] : " + Style.RESET_ALL
          + _join(s) + Style.RESET_ALL)


def error(*s):
    """Print an error-level message (red tag) built from the given values."""
    print(Fore.RED + Fore.LIGHTRED_EX + "[E !] : " + Style.RESET_ALL
          + _join(s) + Style.RESET_ALL)


# Verbosity threshold: the first `level` loggers of (debug, info, warn, error)
# are replaced by no-ops below.  0 keeps every logger active.
level = 0
BUFFER_SIZE = 4096   # size in bytes of one frame on the wire, marker included
CONTINUE = b'\x00'   # marker: more frames of the same message follow
END = b'\xFF'        # marker: this frame completes the message


def _silenced(*_args):
    """No-op replacement for a muted logger."""
    return None


# Rebind the muted loggers directly instead of exec()-ing generated source;
# slicing also tolerates a level larger than the number of loggers.
for _name in ("debug", "info", "warn", "error")[:level]:
    globals()[_name] = _silenced


def lookahead(iterable):
    """Yield ``(item, marker)`` pairs, flagging the final item.

    Every item but the last is paired with ``CONTINUE``; the last one is
    paired with ``END``.  An empty iterable yields nothing.
    """
    it = iter(iterable)
    try:
        last = next(it)
    except StopIteration:
        # A StopIteration escaping a generator becomes RuntimeError (PEP 479).
        return
    for val in it:
        yield last, CONTINUE
        last = val
    yield last, END


def chunked(size, source):
    """Yield consecutive slices of ``source`` that are at most ``size`` long."""
    for start in range(0, len(source), size):
        yield source[start:start + size]


def _recv_frame(conn):
    """Read one frame of ``BUFFER_SIZE`` bytes from ``conn``.

    ``recv`` may return fewer bytes than requested, so keep reading until the
    frame is complete.  A shorter (possibly empty) result means the peer
    closed the connection first.
    """
    frame = bytearray()
    while len(frame) < BUFFER_SIZE:
        piece = conn.recv(BUFFER_SIZE - len(frame))
        if not piece:
            break
        frame += piece
    return bytes(frame)


def handle(conn, address, queues):
    """Serve one client: decode incoming messages into ``queues['in']``.

    Frames are expected in the fixed-size layout produced by ``send``.
    Payloads of ``CONTINUE`` frames are accumulated; an ``END`` frame
    completes the message, which is gunzipped, parsed as JSON and put on
    ``queues['in']``.  Any other marker makes the server answer with an
    ``INV_CHUNK`` error object and stop.  The socket is always closed on exit.

    Parameters:
        conn: connected socket-like object (``recv``, ``sendall``, ``close``).
        address: peer address; its parts are joined with ':' for logging.
        queues: mapping whose ``'in'`` entry provides ``put``.
    """
    peer = ':'.join(map(str, address))
    try:
        info("Inbound Connection at " + peer)
        payload = bytearray()
        while True:
            frame = _recv_frame(conn)
            if len(frame) < BUFFER_SIZE:
                if frame:
                    warn("Truncated frame dropped at " + peer)
                info("Socket closed remotely at " + peer)
                break
            marker = frame[0]
            if marker == CONTINUE[0]:
                payload += frame[1:]
            elif marker == END[0]:
                payload += frame[1:]
                # Trailing zero padding after the gzip member is ignored by
                # gzip.decompress.
                decompressed = gzip.decompress(bytes(payload)).decode('utf8')
                # Start the next message from scratch; without this a second
                # message would be decoded together with the previous one.
                payload.clear()
                queues['in'].put(json.loads(decompressed))
            else:
                warn("Invalid chunk received !!")
                send(conn, {'error': True, 'content': "INV_CHUNK"})
                break
    finally:
        info("Closing socket at " + peer)
        conn.close()


def send(conn, data_obj):
    """Send ``data_obj`` over ``conn`` as gzip-compressed JSON frames.

    Each frame is one marker byte followed by ``BUFFER_SIZE - 1`` payload
    bytes; the last frame is padded with zero bytes to the full frame size.
    """
    data = gzip.compress(json.dumps(data_obj).encode('utf8'))
    # `marker` rather than `info`, so the module-level logger is not shadowed.
    for chunk, marker in lookahead(chunked(BUFFER_SIZE - 1, data)):
        conn.sendall(marker + chunk.ljust(BUFFER_SIZE - 1, b'\x00'))


class Server(object):
    """TCP listener that hands every accepted connection to a new process."""

    def __init__(self, hostname, port):
        """Remember the bind address; nothing is opened until ``start``."""
        self.hostname = hostname
        self.port = port
        self.run = False
        # One dict per accepted client: conn, address, queues, process.
        self.connections = []

    def start(self):
        """Bind, listen and accept clients until ``self.run`` turns false.

        Blocks the calling thread.  On exit (normal or through an exception)
        every active child process is terminated and the listening socket is
        closed.
        """
        self.run = True
        info(f"Listening on {self.hostname}:{self.port}.")
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind((self.hostname, self.port))
            self.socket.listen(1)
            while self.run:
                conn, address = self.socket.accept()
                queues = {'in': multiprocessing.Queue(),
                          'out': multiprocessing.Queue()}
                p = multiprocessing.Process(target=handle,
                                            args=(conn, address, queues))
                p.start()
                self.connections.append({'conn': conn, 'address': address,
                                         'queues': queues, 'process': p})
        finally:
            for process in multiprocessing.active_children():
                # The f-prefix was missing, so "{process}" was logged verbatim.
                info(f"Shutting down process {process}")
                process.terminate()
                process.join()
            self.socket.close()

    def stop(self):
        """Ask the accept loop to finish.

        The flag is only re-checked after ``accept`` returns, so the loop
        ends once the next connection has been accepted.
        """
        info("Shutting down the listenner.")
        self.run = False


if __name__ == "__main__":
    server = Server("0.0.0.0", 9001)
    server.start()


# Disabled legacy echo-server prototype.  Guarded by `if False`, so none of it
# is ever executed or defined; kept for reference only.
if False:
    def handle(connection, address):
        try:
            info("Connected at "+ ':'.join(map(str,address)))
            while True:
                data = connection.recv(1024)
                if data == b"":
                    info("Socket closed remotely at " + ':'.join(map(str,address)))
                    break
                debug("Received data %r" % data)
                connection.sendall(data)
                debug("Sent data")
        except:
            warn("Problem handling request")
            connection.close()
            raise
        finally:
            debug("Closing socket with " + ':'.join(map(str,address)))
            connection.close()

    class Server(object):
        def __init__(self, hostname, port):
            self.hostname = hostname
            self.port = port
            self.connections = []

        def start(self):
            debug("listening")
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.bind((self.hostname, self.port))
            self.socket.listen(1)
            while True:
                conn, address = self.socket.accept()
                process = multiprocessing.Process(target=handle, args=(conn, address))
                process.daemon = True
                process.start()

    if __name__ == "__main__":
        server = Server("0.0.0.0", 9000)
        try:
            info("Listening")
            server.start()
        except:
            error("Unexpected exception")
            for process in multiprocessing.active_children():
                info("Shutting down process %r", process)
                process.terminate()
                process.join()
            raise
        finally:
            info("Shutting down")
            for process in multiprocessing.active_children():
                info("Shutting down process %r", process)
                process.terminate()
                process.join()
        info("All done")