Saturday, October 16, 2021

Add a multithreaded http stats service to an existing app

Quite often when we have a running app, we're curious about what's going on in it right now. We might want to see some stats, trigger a test, or check that the system is healthy. Whatever the need, we can build on the previous multithreaded http server example.

The only difference is that we'll run the request loop in its own thread. This lets the app carry on with whatever it needs to do, with http requests being served in between its heavy CPU operations. We are still restricted by the GIL, so if the app has long CPU-bound operations, responses may be delayed and this approach isn't ideal.


Here's a very basic sample:

#!/usr/bin/env python3
'''    Add a threaded non blocking http response to an app    '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import threading
HTPORT = 8000

class Handler(http.server.BaseHTTPRequestHandler):
    '''   use our own handlers functions '''

    def sendtextinfo(self, code, text):
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write((str(text)+"\n").encode())

    def do_GET(self):
        '''   handle get   '''
        tnow = time.time()
        gnow = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow)) #Formatted UTC time
        parsed_data = urllib.parse.urlparse(self.path)
        # Compare only the path, so a query string such as /time?x=1 still matches
        if parsed_data.path.lower() == "/time":
            message = gnow
        else:
            message = "Secondary thread for http server does not block main loop<br/>{}".format(gnow)
        self.sendtextinfo(200, message)

class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    '''    Basic threaded server class    '''
    # Set the listen backlog on this subclass rather than on HTTPServer itself
    request_queue_size = 128


def backgroundfunction():
    HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)
    while 1:
        sys.stdout.flush()
        HTSERVER.handle_request()

threading.Thread(target=backgroundfunction).start()

#substitute this loop with main loop of your app
while 1:
    time.sleep(1)
    print("Other stuff here!")


I've mucked about with the "get" method a little so the message return is easy to expand, but the handler can really just be a single function. ThreadedHTTPServer is the heart of this: it handles each request in a separate thread, and the server itself is started in its own thread before the main loop begins.
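For comparison, here's a minimal sketch of the same idea using http.server.ThreadingHTTPServer (in the standard library since Python 3.7) and serve_forever in a daemon thread, instead of the handle_request loop above. TimeHandler, start_stats_server and fetch are names made up for this sketch, and it binds to 127.0.0.1 on any free port rather than port 8000, so treat it as an illustration and not a drop-in replacement.

```python
import http.client
import http.server
import threading
import time


class TimeHandler(http.server.BaseHTTPRequestHandler):
    """Hypothetical minimal handler: every GET returns the current UTC time."""

    def do_GET(self):
        body = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()).encode()
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the console quiet for this sketch


def start_stats_server(port=0):
    """Start the server in a daemon thread and return it (port 0 = any free port)."""
    server = http.server.ThreadingHTTPServer(('127.0.0.1', port), TimeHandler)
    # daemon=True means the thread will not keep the process alive at exit
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def fetch(server, path='/'):
    """Issue one GET against the running server and return (status, text)."""
    host, port = server.server_address[:2]
    conn = http.client.HTTPConnection(host, port, timeout=5)
    try:
        conn.request('GET', path)
        resp = conn.getresponse()
        return resp.status, resp.read().decode()
    finally:
        conn.close()
```

Calling start_stats_server() before the main loop and then fetch(server) from anywhere should give back a 200 and the formatted time. server.shutdown() stops the request loop when the app is done.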

How would we use this in a more practical example?

Let's have the app read files in one folder, manipulate them, and dump the result in another. We'll have the service respond with stats.



Example app to "encrypt" text files with rot 13. It takes each file from c:\tmp\a and writes the result to c:\tmp\b. The original file is then moved to c:\tmp\processed.
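As a quick look at the rot 13 step on its own, here's a small sketch using the standard library codecs module. The rot13 wrapper name is just for illustration. Since rot 13 shifts each letter by 13 places, applying it twice gets you back the original text, which is why "encrypt" is in quotes.

```python
import codecs


def rot13(text):
    """Shift each ASCII letter by 13 places; other characters are left alone."""
    return codecs.encode(text, 'rot_13')


scrambled = rot13("Hello, World!")
print(scrambled)         # Uryyb, Jbeyq!
print(rot13(scrambled))  # Hello, World!
```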

#!/usr/bin/env python3
'''    Add a threaded non blocking http response to an app    '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import threading
import glob
import codecs
import os
import shutil
import json
HTPORT = 8000
INPUT_FOLDER="c:\\tmp\\a\\"
PROCESSED_FOLDER="c:\\tmp\\processed\\"
OUTPUT_FOLDER="c:\\tmp\\b\\"
timetoleave=False
stats={}
stats["Errors"]=[]
stats["RecentRequests"]=[]

def gnow():
    tnow = time.time()
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow)) #Formatted UTC time


class Handler(http.server.BaseHTTPRequestHandler):
    '''   use our own handlers functions '''

    def sendtextinfo(self, code, text):
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write((str(text)+"\n").encode())

    def do_GET(self):
        '''   handle get   '''
        global timetoleave
        global stats
        parsed_data = urllib.parse.urlparse(self.path)
        # Compare only the path, so a query string does not prevent a match
        if parsed_data.path.lower() == "/quit":
            message = "BYE!"
            timetoleave=True
        else:
            stats["time"]=gnow()
            tnow=time.time()
            if len(stats["RecentRequests"])>0:
                time_for_last_n=tnow-stats["RecentRequests"][0]["Time"]
                if time_for_last_n>0:
                    rate=len(stats["RecentRequests"])/time_for_last_n
                    stats["Rate"]=rate
            msgtext=json.dumps(stats,indent="  ")
            message=msgtext
            for chk in ["Chrome","Mozilla","Explorer","Safari"]:
                if chk in str(self.headers):
                    message=msgtext.replace("\n","<br/>").replace(" ","&nbsp;")
                    break
        self.sendtextinfo(200,message)

class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    # Set the listen backlog on this subclass rather than on HTTPServer itself
    request_queue_size = 128

def backgroundfunction():
    HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)
    HTSERVER.socket.settimeout(10)
    try:
        while not timetoleave:
            sys.stdout.flush()
            HTSERVER.handle_request()
    except KeyboardInterrupt:
        pass

threading.Thread(target=backgroundfunction).start()

def processfiles(filelist):
    for filename in filelist:
        reqnow={}
        reqnow["Time"]=time.time()
        reqnow["Request"]=filename
        stats["RecentRequests"].append(reqnow)
        stats["RecentRequests"]=stats["RecentRequests"][-10:]
        try:
            with open(filename,"r") as fh:
                inputtext=fh.read()
            outputtext=codecs.encode(inputtext,'rot_13')
            outfile=OUTPUT_FOLDER+os.path.basename(filename)
            with open(outfile,"w") as fout:
                fout.write(outputtext)
            processedfile=PROCESSED_FOLDER+os.path.basename(filename)
            shutil.move(filename,processedfile)
        except Exception as err:
            print("Unable to open {}: {}".format(filename,err))
            errnow={}
            errnow["Time"]=time.time()
            errnow["Error"]=str(err)  # store the text, an Exception object breaks json.dumps
            stats["Errors"].append(errnow)
            stats["Errors"]=stats["Errors"][-10:]

#substitute this loop with main loop of your app
while not timetoleave:
    allinputfiles=glob.glob(INPUT_FOLDER+"*")
    processfiles(allinputfiles)
    time.sleep(1)

Here the threaded server can give stats while this app is running. Moving a few files to the input folder and pulling up the stats in a browser gives the result:

{
  "Errors": [],
  "RecentRequests": [
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\a.txt"
    },
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\q.txt"
    },
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\test.txt"
    },
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\test1.txt"
    }
  ],
  "time": "2021-10-16 19:23:29",
  "Rate": 1.8133347398556916
}
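The "Rate" figure is simply the number of entries in RecentRequests divided by the seconds elapsed since the oldest one. Pulled out into a helper it might look like the sketch below. recent_rate is a made-up name and the now parameter is only there to make it easy to try with fixed numbers.

```python
import time


def recent_rate(recent_requests, now=None):
    """Return requests per second since the oldest entry, or None if undefined."""
    if not recent_requests:
        return None
    if now is None:
        now = time.time()
    elapsed = now - recent_requests[0]["Time"]
    if elapsed <= 0:
        return None  # avoid dividing by zero when the first entry is brand new
    return len(recent_requests) / elapsed


# Four files picked up at t=100.0 and the stats page viewed at t=102.0
sample = [{"Time": 100.0, "Request": "file{}.txt".format(i)} for i in range(4)]
print(recent_rate(sample, now=102.0))  # 2.0
```

Because the list is trimmed to the last 10 entries, the rate only ever describes the most recent activity and not the whole run.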