
Saturday, October 16, 2021

Add a multithreaded http stats service to an existing app

Quite often when we have a running app, we're curious about what's going on inside it right now: some stats, a way to trigger a test, a quick check that the system is healthy. Whatever it is, we can build on the previous multithreaded http server example.

The only difference is we'll run the request loop in its own thread. This lets the app carry on with whatever it needs to do and, in between heavy CPU operations, service the http requests. We are restricted by the GIL, so if the app has long CPU-bound operations this obviously isn't ideal.


Here's a very basic sample:

#!/usr/bin/env python3
'''    Add a threaded non-blocking http response to an app    '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import threading

HTPORT = 8000

class Handler(http.server.BaseHTTPRequestHandler):
    '''   use our own handler functions   '''

    def sendtextinfo(self, code, text):
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write((str(text) + "\n").encode())

    def do_GET(self):
        '''   handle get   '''
        tnow = time.time()
        gnow = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow))  # formatted UTC time
        parsed_data = urllib.parse.urlparse(self.path)
        if parsed_data.path.lower() == "/time":
            message = gnow
        else:
            message = "Secondary thread for http server does not block main loop<br/>{}".format(gnow)
        self.sendtextinfo(200, message)

class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    '''    Basic threaded server class    '''
    request_queue_size = 128


def backgroundfunction():
    HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)
    while True:
        sys.stdout.flush()
        HTSERVER.handle_request()

# daemon=True lets the process exit when the main loop is stopped
threading.Thread(target=backgroundfunction, daemon=True).start()

#substitute this loop with main loop of your app
while 1:
    time.sleep(1)
    print("Other stuff here!")


Now I've reworked the "get" method a little so the message handling is easier to expand, but the handler can really just be a single function. ThreadedHTTPServer is the heart of this, handling multiple requests in separate threads, and its request loop starts in a background thread before the main loop does.

How would we use this in a more practical example?

Let's have the app read files from one folder and manipulate them, dumping the result in another. We'll have the service respond with stats.



Example app to "encrypt" text files with rot13. It takes a file from c:\tmp\a, writes the result to c:\tmp\b, and moves the original to c:\tmp\processed.

#!/usr/bin/env python3
'''    Add a threaded non-blocking http response to an app    '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import threading
import glob
import codecs
import os
import shutil
import json

HTPORT = 8000
INPUT_FOLDER = "c:\\tmp\\a\\"
PROCESSED_FOLDER = "c:\\tmp\\processed\\"
OUTPUT_FOLDER = "c:\\tmp\\b\\"
timetoleave = False
stats = {}
stats["Errors"] = []
stats["RecentRequests"] = []

def gnow():
    '''formatted UTC time'''
    tnow = time.time()
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow))


class Handler(http.server.BaseHTTPRequestHandler):
    '''   use our own handler functions   '''

    def sendtextinfo(self, code, text):
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write((str(text) + "\n").encode())

    def do_GET(self):
        '''   handle get   '''
        global timetoleave
        parsed_data = urllib.parse.urlparse(self.path)
        if parsed_data.path.lower() == "/quit":
            message = "BYE!"
            timetoleave = True
        else:
            stats["time"] = gnow()
            tnow = time.time()
            if len(stats["RecentRequests"]) > 0:
                time_for_last_n = tnow - stats["RecentRequests"][0]["Time"]
                if time_for_last_n > 0:
                    rate = len(stats["RecentRequests"]) / time_for_last_n
                    stats["Rate"] = rate
            msgtext = json.dumps(stats, indent="  ")
            message = msgtext
            for chk in ["Chrome", "Mozilla", "Explorer", "Safari"]:
                if chk in str(self.headers):
                    # browsers get html-friendly line breaks and spacing
                    message = msgtext.replace("\n", "<br/>").replace(" ", "&nbsp;")
                    break
        self.sendtextinfo(200, message)

class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    '''    Basic threaded server class    '''
    request_queue_size = 128

def backgroundfunction():
    HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)
    HTSERVER.timeout = 10  # handle_request() wakes up to re-check timetoleave
    try:
        while not timetoleave:
            sys.stdout.flush()
            HTSERVER.handle_request()
    except KeyboardInterrupt:
        pass

threading.Thread(target=backgroundfunction).start()

def processfiles(filelist):
    for filename in filelist:
        reqnow = {}
        reqnow["Time"] = time.time()
        reqnow["Request"] = filename
        stats["RecentRequests"].append(reqnow)
        stats["RecentRequests"] = stats["RecentRequests"][-10:]
        try:
            with open(filename, "r") as fh:
                inputtext = fh.read()
            outputtext = codecs.encode(inputtext, 'rot_13')
            outfile = OUTPUT_FOLDER + os.path.basename(filename)
            with open(outfile, "w") as fh:
                fh.write(outputtext)
            processedfile = PROCESSED_FOLDER + os.path.basename(filename)
            shutil.move(filename, processedfile)
        except Exception as err:
            print("Unable to process {}: {}".format(filename, err))
            errnow = {}
            errnow["Time"] = time.time()
            errnow["Error"] = str(err)  # keep the stats dict json-serializable
            stats["Errors"].append(errnow)
            stats["Errors"] = stats["Errors"][-10:]

#substitute this loop with main loop of your app
while not timetoleave:
    allinputfiles=glob.glob(INPUT_FOLDER+"*")
    processfiles(allinputfiles)
    time.sleep(1)

Here the threaded server can serve stats while the app is running. Moving a few files into the input folder and pulling up the stats in a browser gives:

{
  "Errors": [],
  "RecentRequests": [
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\a.txt"
    },
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\q.txt"
    },
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\test.txt"
    },
    {
      "Time": 1634412207.3869205,
      "Request": "c:\\tmp\\a\\test1.txt"
    }
  ],
  "time": "2021-10-16 19:23:29",
  "Rate": 1.8133347398556916
}









Friday, January 29, 2021

A Simple Multithreaded REST Service

I've talked about threading a little already.

https://beomagi.blogspot.com/2019/09/python-threading.html

https://beomagi.blogspot.com/2019/09/threading-continued-timing-thread.html

Next up is a more complete application. Let's look at a simple multithreaded web service. It can be used for basic webpage serving, but I think it really shines for making a REST API for quick tools.


Source:

https://github.com/beomagi/BasicRestApp/blob/main/http_server_mt3.py

snapshot below:

#!/usr/bin/env python3
'''    Basic threaded http server implementation    '''
import socketserver
import http.server
import sys
import time
import urllib.parse
import json

#-------------sample functions--------------------
keyval={}
def kput(a,b):
    keyval[a]=b
    return json.dumps({a:b})

def kget(a):
    return keyval.get(a)

def add2numbers(a,b):
    return int(a)+int(b)

def handlejob(func_and_params):
    func = func_and_params[0]
    params = func_and_params[1:]
    if func == "add": return add2numbers(params[0], params[1])
    if func == "put": return kput(params[0], params[1])
    if func == "get": return kget(params[0])
    return "Unknown Command"
#-^-^--^-^-^-^sample functions-^-^-^-^-^-^-^-^-^-^-


class Handler(http.server.BaseHTTPRequestHandler):
    '''   use our own handler functions   '''

    def sendtextinfo(self, code, text):
        self.send_response(code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        if isinstance(text, list):
            for line in text:
                self.wfile.write((str(line) + "\n").encode())
        else:
            self.wfile.write((str(text) + "\n").encode())

    def do_GET(self):
        '''   handle get   '''
        tnow = time.time()
        gnow = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(tnow))  # formatted UTC time
        parsed_data = urllib.parse.urlparse(self.path)
        if parsed_data.path.lower() == "/time":
            message = gnow
        else:
            message = "this is a test of the multithreaded webservice"
        self.sendtextinfo(200, message)

    def do_POST(self):
        '''   handle post like a rest API   '''
        try:  # try getting the bytestream of the request
            content_length = int(self.headers['Content-Length'])
        except Exception as err:
            print("malformed headers")
            self.sendtextinfo(200, str(err))
            return

        jrequest = {}  # guard against an empty body leaving this undefined
        if content_length > 0:
            rawrequest = self.rfile.read(content_length).decode('utf-8')
            print("Received POST: {}".format(rawrequest))
            try:
                jrequest = json.loads(rawrequest)
            except BaseException as anError:
                self.sendtextinfo(200, "Error in JSON: {}".format(str(anError)))
                return

        if "cmd" in jrequest:
            commandandparams = jrequest['cmd']
            print("Command received: {}".format(commandandparams))
            result = handlejob(commandandparams)
            if result is not None:
                self.sendtextinfo(200, result)
            else:
                self.sendtextinfo(200, "No Output")
        else:
            self.sendtextinfo(200, "No Command")  # always answer, or the client hangs
        return


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    '''    Basic threaded server class    '''
    request_queue_size = 128

if sys.argv[1:]:
    HTPORT = int(sys.argv[1])
else:
    HTPORT = 8000

HTSERVER = ThreadedHTTPServer(('', HTPORT), Handler)

try:
    while 1:
        sys.stdout.flush()
        HTSERVER.handle_request()
except KeyboardInterrupt:
    print("Server Stopped")


Discussion:

On the blog I highlighted the different sections of the code with background colors; in plain text, read it bottom-up.

The bottom section is our main area. We could wrap it in an "if __name__ == '__main__':" construct, but it's not necessary for our purposes.

The main area checks for a passed parameter to override the default port the application is served on. Then an infinite loop keeps calling handle_request(). The "flush" before each call forces any buffered io to be written before the next request is handled, which helps when buffered output hasn't yet been written at the time of a crash.

The main section instantiates ThreadedHTTPServer, which dispatches each request to the handler class in its own thread. The handler implements GET and POST.

I've added some simple functions on POST calls: adding 2 numbers, and storing and fetching values in a key-value-store sort of setup.

Interfacing with POST can be done from any language. curl is also handy for testing REST interfaces.



e.g.

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:09

└─ $ ∙ curl 127.0.0.1:8000

this is a test of the multithreaded webservice

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ curl 127.0.0.1:8000/time

2021-01-29 18:35:21

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ curl 127.0.0.1:8000 -X POST --data '{"cmd":["add",51,15]}'

66


So this example shows how we can make a processing request to the server. Obviously this can be more complex. Maybe I'm sending a url to be parsed (get me ma comix!). Maybe I'm passing data I want to store. Numerous reasons for this. What's performance like?


beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ curl 127.0.0.1:8000 -X POST --data '{"cmd":["put","b","If A Technological Feat Is Possible, Man Will Do It. Almost As If It is Wired Into The Core Of Our Being."]}'

{"b": "If A Technological Feat Is Possible, Man Will Do It. Almost As If It is Wired Into The Core Of Our Being."}

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ curl 127.0.0.1:8000 -X POST --data '{"cmd":["get","b"]}'

If A Technological Feat Is Possible, Man Will Do It. Almost As If It is Wired Into The Core Of Our Being.



Ok, so that's the get and put app level functions working. Let's try a bunch of gets.


beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ time (for a in {1..1000}; do (curl -s 127.0.0.1:8000 -X POST --data '{"cmd":["get","b"]}' )  & done |sort | uniq ; wait )

If A Technological Feat Is Possible, Man Will Do It. Almost As If It is Wired Into The Core Of Our Being.


real    0m2.403s

user    0m4.234s

sys     0m20.688s

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙


The "real" value is the wall-clock time: 2.4 seconds to pull this data a thousand times. If you want to pull a lot of data faster, this basic server can be expanded with multi-put/multi-get commands.

This isn't a great test though. We're limited by relying on bash to loop through and spawn the requests, and bash is slow for this :)

So let's try apache bench.

Start by putting the request in a file.

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ cat postthis.txt

{"cmd":["get","b"]}


Now lets run apache bench to stress this.

beomagi@BeoBalthazar ~/gits/beomagi/BasicRestApp  (main) 2021-01-29 13:35:16

└─ $ ∙ ab -n 10000 -c 128  -p ./postthis.txt -T application/json http://127.0.0.1:8000/
This is ApacheBench, Version 2.3 <$Revision: 1807734 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests


Server Software:        BaseHTTP/0.6
Server Hostname:        127.0.0.1
Server Port:            8000

Document Path:          /
Document Length:        57 bytes

Concurrency Level:      128
Time taken for tests:   3.378 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1730000 bytes
Total body sent:        1570000
HTML transferred:       570000 bytes
Requests per second:    2960.29 [#/sec] (mean)
Time per request:       43.239 [ms] (mean)
Time per request:       0.338 [ms] (mean, across all concurrent requests)
Transfer rate:          500.13 [Kbytes/sec] received
                        453.87 kb/s sent
                        954.00 kb/s total

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    2   5.1      0      47
Processing:     5   40  10.6     41      85
Waiting:        5   38   9.8     40      67
Total:         12   43   9.5     41     100

Percentage of the requests served within a certain time (ms)
  50%     41
  66%     44
  75%     47
  80%     48
  90%     55
  95%     61
  98%     68
  99%     73
 100%    100 (longest request)

_____________________________________________________________


Using apachebench (ab) and loading the queue to its maximum of 128 concurrent requests, we handle 10k requests in 3.378 seconds.


I've used this pattern in another system where getting outside software approved was quite restrictive. It was quicker to write something like the above to act as a cache for a monitoring system: it stored various database counts and system checks, where previously the monitoring system made the DB queries directly, and wow did our DB hate that! Granted, it was far more fleshed out: get/put commands for single and multiple values, a default page showing all stored data and its age, and a timing thread for stats on how heavily the system was being hit. While that system (cachewho) was written for that company, I do plan to rewrite it in python3 (instead of 2).

Wednesday, September 18, 2019

Threading continued - timing thread

A thread dedicated to timing is useful:

The last post discussed parallelism; threads are also handy for timing stats. In this example, a single thread is spawned for the timing function, but you can spawn purpose-specific threads to do whatever you need. Communication with threads is simple because you can just use shared variables. In CPython, individual operations on built-in types like dict and list are made atomic by the GIL, so for simple updates like these there's no need for threading.Lock().


import random
import threading
import json
import time

timingstats={"starttime":0,"processtime":0,"rate":0,"requests":0,"lastreq":0}
exitflag=0

def timingfunction():
    a=0
    while True:
        time.sleep(0.01)
        a+=1
        if a%500==0 or exitflag==1:
            print("## WORD RATE ## : {}".format(timingstats["rate"]))
        if exitflag==1: return

def req(n):
    global exitflag
    entry=""
    while entry != n:
        entry=input("Type: {} :".format(n))
        if entry=="exit":
            exitflag=1
            exit()
    timingstats["requests"]+=1
    now=time.time()
    timingstats["lastreq"]=now
    timingstats["rate"]=float(timingstats["requests"])/(now-timingstats["starttime"])



timingthread=threading.Thread(target=timingfunction)
timingthread.start()
timingstats["starttime"]=time.time()

words="hairy,scary,mary,carbon,chief,hat,blah,lunchbox,cloud,crap"
wordlist=words.split(",")
while True:
    chk=random.randint(0,len(wordlist)-1)
    req(wordlist[chk])

sample output:
python3 tst3.py
Type: chief :chief
Type: blah :blah
Type: chief :chief
Type: blah :## WORD RATE ## : 0.6276005416066457
blah
Type: blah :blah
Type: lunchbox :lunchbox
Type: hat :## WORD RATE ## : 0.5847226421265149
hat
Type: blah :blah
Type: crap :crap
Type: lunchbox :## WORD RATE ## : 0.6115188345845941
exit
## WORD RATE ## : 0.6115188345845941


The timing thread runs on its own, interrupting me mid-word. This is useful if you're making a network app and want statistics, checks, monitoring, etc. Maybe the output goes to a file, or over the network. For terminal apps, it works well if you specify the text position.
e.g.
print("\033[6;53H###Hello")
will output ###Hello at row 6, column 53 in the terminal (not on plain windows consoles).
e.g. you can run several different checks on different threads, with each one writing its status to a different area of the screen. Alternatively (and generally a better idea for bigger things), have different threads for different checks write status to a shared status variable, and a screen-updater thread use that status to draw the screen.