Quite often when we have a running app, we're curious about what's going on inside it right now. We might want to see some stats, trigger a test, or check whether the system is healthy. Whatever it is, we can build on the previous multithreaded HTTP server example.
The only difference is that we'll run the request loop in its own thread. This lets the app carry on with whatever it needs to do, while HTTP requests get serviced in between its CPU operations. We are still restricted by the GIL, so if the app has long CPU-bound operations, responses can be slow and this approach isn't ideal.
Here's a very basic sample:
#!/usr/bin/env python3
''' Add a threaded non blocking http response to an app '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import threading

HTPORT = 8000

class Handler(http.server.BaseHTTPRequestHandler):
    ''' use our own handler functions '''
    def sendtextinfo(self, code, text):
        ''' send a status code, headers and a text body '''
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write((str(text)+"\n").encode())

    def do_GET(self):
        ''' handle get '''
        tnow = time.time()
        gnow = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow)) #Formatted UTC time
        parsed_data = urllib.parse.urlparse(self.path)
        if parsed_data.path.lower() == "/time":
            message = gnow
        else:
            message = "Secondary thread for http server does not block main loop<br/>{}".format(gnow)
        self.sendtextinfo(200, message)

class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    ''' Basic threaded server class '''
    request_queue_size = 128

def backgroundfunction():
    ''' serve http requests, one handle_request call at a time '''
    HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)
    while True:
        sys.stdout.flush()
        HTSERVER.handle_request()

# daemon thread, so Ctrl-C in the main loop can still end the process
threading.Thread(target=backgroundfunction, daemon=True).start()

#substitute this loop with main loop of your app
while True:
    time.sleep(1)
    print("Other stuff here!")
I've mucked about with the "get" method a little, moving the message sending into sendtextinfo so it's easier to expand later, but the handler can really just be a single function. ThreadedHTTPServer is the heart of this: it handles multiple requests in separate threads, and it runs in its own thread, which is started before the main loop.
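To illustrate the "single function" point, here is a minimal sketch, not a replacement for the sample above. The names OneFunctionHandler and fetch_once are made up for this illustration, and it binds to port 0 on 127.0.0.1 (so the OS picks a free port) and uses serve_forever with shutdown instead of the handle_request loop, purely so the demo can start, answer one request and stop cleanly:

```python
import http.server
import socketserver
import threading
import urllib.request


class OneFunctionHandler(http.server.BaseHTTPRequestHandler):
    """Hypothetical handler: everything happens inside do_GET."""

    def do_GET(self):
        body = "path was {}\n".format(self.path).encode()
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Silence per-request logging to keep the demo output clean."""


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    daemon_threads = True  # request threads must not block interpreter exit


def fetch_once(path="/time"):
    """Start a server on a free port, fetch one URL from it, then shut down."""
    server = ThreadedHTTPServer(('127.0.0.1', 0), OneFunctionHandler)
    port = server.server_address[1]
    worker = threading.Thread(target=server.serve_forever, daemon=True)
    worker.start()
    # Ignore any proxy settings in the environment for this local request.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        url = "http://127.0.0.1:{}{}".format(port, path)
        with opener.open(url, timeout=5) as response:
            return response.status, response.read().decode()
    finally:
        server.shutdown()      # stops serve_forever in the worker thread
        server.server_close()  # releases the listening socket


if __name__ == "__main__":
    print(fetch_once("/time"))
```

The calling thread is free to do other work between starting the worker and making the request, which is the same idea as the main loop in the sample.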
How would we use this in a more practical example?
Let's have the app read files in one folder and manipulate them, dumping the result in another. We'll have the service respond with stats.
Example app to "encrypt" text files with ROT13. It takes each file from c:\tmp\a and writes the result to c:\tmp\b. The original file is then moved to c:\tmp\processed.
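The transformation itself is tiny. As a minimal sketch (the helper name rot13 is mine, not part of the app), codecs.encode with the 'rot_13' codec does all the work, and applying it twice gets the original text back:

```python
import codecs


def rot13(text):
    """Rotate each ASCII letter 13 places; other characters pass through."""
    return codecs.encode(text, 'rot_13')


sample = "Hello, World!"
scrambled = rot13(sample)
print(scrambled)         # Uryyb, Jbeyq!
print(rot13(scrambled))  # Hello, World!
```

The full app wraps that call with the file handling and the stats server: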
#!/usr/bin/env python3
''' Add a threaded non blocking http response to an app '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import threading
import glob
import codecs
import os
import shutil
import json

HTPORT = 8000
INPUT_FOLDER="c:\\tmp\\a\\"
PROCESSED_FOLDER="c:\\tmp\\processed\\"
OUTPUT_FOLDER="c:\\tmp\\b\\"
timetoleave=False
stats={}
stats["Errors"]=[]
stats["RecentRequests"]=[]

def gnow():
    ''' current UTC time as a formatted string '''
    tnow = time.time()
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow)) #Formatted UTC time

class Handler(http.server.BaseHTTPRequestHandler):
    ''' use our own handler functions '''
    def sendtextinfo(self, code, text):
        ''' send a status code, headers and a text body '''
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write((str(text)+"\n").encode())

    def do_GET(self):
        ''' handle get '''
        global timetoleave
        parsed_data = urllib.parse.urlparse(self.path)
        if parsed_data.path.lower() == "/quit":
            message = "BYE!"
            timetoleave=True
        else:
            stats["time"]=gnow()
            tnow=time.time()
            if len(stats["RecentRequests"])>0:
                # rate = files seen / seconds since the oldest one we still remember
                time_for_last_n=tnow-stats["RecentRequests"][0]["Time"]
                if time_for_last_n>0:
                    rate=len(stats["RecentRequests"])/time_for_last_n
                    stats["Rate"]=rate
            msgtext=json.dumps(stats,indent=" ")
            message=msgtext
            # browsers get html line breaks and spaces, other clients get plain json
            for chk in ["Chrome","Mozilla","Explorer","Safari"]:
                if chk in str(self.headers):
                    message=msgtext.replace("\n","<br/>").replace(" ","&nbsp;")
                    break
        self.sendtextinfo(200,message)

class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    ''' Basic threaded server class '''
    request_queue_size = 128

def backgroundfunction():
    ''' serve http requests until /quit is requested '''
    HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)
    # time out every 10 seconds so the loop can re-check timetoleave
    HTSERVER.socket.settimeout(10)
    try:
        while not timetoleave:
            sys.stdout.flush()
            HTSERVER.handle_request()
    except KeyboardInterrupt:
        pass

threading.Thread(target=backgroundfunction).start()

def processfiles(filelist):
    ''' rot13 each file into the output folder, then move the original '''
    for filename in filelist:
        reqnow={}
        reqnow["Time"]=time.time()
        reqnow["Request"]=filename
        stats["RecentRequests"].append(reqnow)
        stats["RecentRequests"]=stats["RecentRequests"][-10:] # keep the last 10
        try:
            with open(filename,"r") as fh:
                inputtext=fh.read()
            outputtext=codecs.encode(inputtext,'rot_13')
            outfile=OUTPUT_FOLDER+os.path.basename(filename)
            with open(outfile,"w") as fh:
                fh.write(outputtext)
            processedfile=PROCESSED_FOLDER+os.path.basename(filename)
            shutil.move(filename,processedfile)
        except Exception as err:
            print("Unable to process {}, {}".format(filename,err))
            errnow={}
            errnow["Time"]=time.time()
            errnow["Error"]=str(err) # exception objects are not JSON serializable
            stats["Errors"].append(errnow)
            stats["Errors"]=stats["Errors"][-10:] # keep the last 10

#substitute this loop with main loop of your app
while not timetoleave:
    allinputfiles=glob.glob(INPUT_FOLDER+"*")
    processfiles(allinputfiles)
    time.sleep(1)
Here the threaded server can give stats while this app is running. Moving a few files to the input folder and pulling up the stats in a browser gives the result:
{
 "Errors": [],
 "RecentRequests": [
  {
   "Time": 1634412207.3869205,
   "Request": "c:\\tmp\\a\\a.txt"
  },
  {
   "Time": 1634412207.3869205,
   "Request": "c:\\tmp\\a\\q.txt"
  },
  {
   "Time": 1634412207.3869205,
   "Request": "c:\\tmp\\a\\test.txt"
  },
  {
   "Time": 1634412207.3869205,
   "Request": "c:\\tmp\\a\\test1.txt"
  }
 ],
 "time": "2021-10-16 19:23:29",
 "Rate": 1.8133347398556916
}
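The "Rate" figure is the number of remembered requests divided by the seconds elapsed since the oldest of them. In the output above that is 4 files over roughly 2.2 seconds, or about 1.81 per second. The same arithmetic, pulled out into a standalone function as a sketch (request_rate is a hypothetical name; the app does this inline in do_GET):

```python
def request_rate(recent_requests, now):
    """Requests per second since the oldest remembered request.

    Returns None when there is nothing to measure yet.
    """
    if not recent_requests:
        return None
    elapsed = now - recent_requests[0]["Time"]
    if elapsed <= 0:
        return None
    return len(recent_requests) / elapsed


# Four files picked up at the same instant, stats requested 2 seconds later.
recent = [{"Time": 100.0, "Request": name} for name in ("a", "q", "test", "test1")]
print(request_rate(recent, now=102.0))  # 2.0
```

Since the elapsed time is measured up to the moment of the stats request, the reported rate keeps dropping while the app sits idle with no new files.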