Case study

Zkroť si své saranče aneb zátěžové testy s Locust.io

Je s podivem, že platforma pro zátěžové testy Locust není v IT komunitě více rozšířena. Přitom open-source projekt Locust.io existuje již 9 let a na Internetu lze nalézt řadu návodů, jak s ním rychle a efektivně začít. Oproti tomu množina případových studií o komplexnějších realizacích “zátěžáků” pomocí Locustu je poměrně skromná. Tento článek má ambice ji rozšířit a přinést inspiraci v oblastech:

  • Locust jako základ robustního frameworku pro zátěžové testy,
  • správa datových setů během exekuce testů,
  • integrace na BI řešení pro reporting,
  • dynamické řízení testovacích scénářů.

Charakter projektu

Zadání

Úkol zněl jasně: je potřeba zátěžově otestovat webové stránky generované CMS systémem. Pro ověření optimálního nastavení sdílené infrastruktury byly vytipovány 3 testovací scénáře:

  1. základní: klasická zátěž generovaná běžnými návštěvníky webových stránek,
  2. rozšířený základní: zátěž od běžných návštěvníků kombinovaná se situací, kdy CMS systém na základě zeditovaného obsahu pro stránky sestavuje finální web,
  3. editační: zátěž generovaná editory při měnění obsahu, kdy CMS systém připravuje náhled, jak bude stránka s danými úpravami vypadat.

Použité technologie a nástroje

Frontend aplikace je vytvořen v javascriptovém frameworku React, aplikační vrstva nad .NET platformou v Azure cloud. Vývoj je řízen přes TFS-GIT, což platilo i pro kód zátěžových testů.

Jako základní kámen řešení pro zátěžové testy byl odzkoušen a vybrán nástroj Locust díky těmto klíčovým vlastnostem:

  • vysoký výkon umožňující simulovat 1k–10k virtuálních uživatelů z běžného PC,
  • flexibilita a komunitní podpora jazyka Python pro utilizaci existujících, resp. psaní nových, doplňkových knihoven a řešení méně standardních až specifických úloh v rámci implementace testovacích scénářů,
  • připravenost na integraci do CI/CD pipeline,
  • připravenost frameworku pro integraci na vybrané řešení pro zpracování, analýzu a vizualizaci dat zachycených při běhu testu (v tomto případě Elastic + Kibana).

Pro další výhody Locust platformy lze nahlédnout přímo do dokumentace na Locust.io, kde jsou k dispozici i příklady základních principů a použití. Nadto na Internetu najdete např. tento a řadu dalších tutoriálů. Pokud se s Locustem setkáváte poprvé, doporučuji před pokračováním v článku zabrousit alespoň na uvedené odkazy.

Locust tak utvořil jádro frameworku, jehož celkovou architekturu zachycuje následující obrázek:

Zkroť si své saranče aneb zátěžové testy s Locust.io

Jednotlivé moduly si blíže popíšeme.

Moduly frameworku

Konfigurace testů a testovacích dat

Protože jedním z cílů bylo umožnit vývojářům spouštění testů kdykoli v případě potřeby, tj. obejít se při denním používání testů bez specializovaného performance inženýra, nastavení parametru běhu testů a testovacích dat byla vytažena do konfiguračního souboru:

## ----------------------------------------- ##
## Load test scenario and data configuration ##
## ----------------------------------------- ##

LOAD_SCENARIO = 2
## 1 .. web users visiting sites (test case: WebTest)
## 2 .. web users visiting sites + content server processing workers output (test cases: WebTest + ContentServerTest)
## 3 .. editors requesting pages previews (test case: PreviewTest). Note: amount of virtuals users should fit amount of data records. 

## Load shape (i.e. amount and timing of virtuals users during test execution) can be predefined by stages with following parameters:
##  - duration .. When the duration period elapses (measured from the start of the whole test), the test enters in the next stage.
##  - users .. Total user count
##  - spawn_rate .. Number of users to start/stop per second
LOAD_STAGES = [
        {"duration": 100, "users": 10, "spawn_rate": 1},
        {"duration": 300, "users": 20, "spawn_rate": 1},
        {"duration": 600, "users": 30, "spawn_rate": 1},
    ]
## Uncomment line below to disable load stages configuration. Load shape is then controlled through web UI.
#LOAD_STAGES = False

## WebTest data source
DATA_SOURCE_PATH = "data/contentserver-de-ch_named.csv"

## PreviewTest data source
DATA_PREVIEWS_SOURCE_PATH = "data/previews.csv"

## ContentServerTest data source
SITES = {
    "dech-v2":"empty"
    ,"itch-v2":"empty"
    ,"frch-v2":"empty"
}

Jedná se o běžný Python soubor, což umožňuje svobodu při definici datových struktur parametrů testu. Např. LOAD_STAGESSITES jsou proměnné nativních datových Python typů, přesto však pro uživatele testu velmi čitelné a srozumitelné (samozřejmě i díky patřičným komentářům).

Run-time data, tj. data utilizovaná při běhu testu, je vhodné zapouzdřit do dedikovaných tříd:

import csv
from random import randint

from shared import log

class SourceDataReader:
    def __init__(self, file_path):
        self.file_path = file_path
    def read(self):
        reader = csv.DictReader(open(self.file_path, "r"))
        data = []
        for element in reader:
            data.append(element)
        return data

class Endpoints:
    def __init__(self, path_to_csv):
        log("Reading input data...")
        sdr = SourceDataReader(path_to_csv)
        self.INPUT_DATA = sdr.read()
        self.INPUT_DATA_LENGTH = len(self.INPUT_DATA)
        log(f"{self.INPUT_DATA_LENGTH} endpoints records read")
    def getRecord(self):
        row = self.INPUT_DATA [ randint(0, self.INPUT_DATA_LENGTH-1) ]
        return row

class Previews:
    def __init__(self, path_to_csv):
        log("Reading input data...")
        sdr = SourceDataReader(path_to_csv)
        self.INPUT_DATA = sdr.read()
        self.INPUT_DATA_LENGTH = len(self.INPUT_DATA)
        log(f"{self.INPUT_DATA_LENGTH} previews records read")
    def __operateRecord(self, void):
        rowId = 0
        retVal = ''
        found = False
        for row in self.INPUT_DATA:
            #print( row )
            if (row['status'] == 'ready'):
                if (void != True):
                    self.setLocked ( rowId )
                retVal = {
                    'rowId': rowId
                    ,'row': row
                }
                found = True
                break
            rowId = rowId + 1
        if (found == True):
            return ( retVal )
        else:
            return ( False )
    def getRecord(self):
        return self.__operateRecord(False)
    def testRecord(self):
        return self.__operateRecord(True)
    def __setStatus (self, rowId, status):
        self.INPUT_DATA[rowId]['status'] = status
    def setReady (self, rowId):
        self.__setStatus(rowId, 'ready')
    def setLocked (self, rowId):
        self.__setStatus(rowId, 'locked')

Ukázka kódu výše demonstruje, jak snadno lze nad datovými objekty vystavět jednoduchou či složitější logiku pro operaci s daty. To je užitečné např. v případě, že se datové věty používají opakovaně a zároveň jednu datovou větu nelze použít pod více paralelně běžícími virtuálními uživateli. Příklady použití si ukážeme dále v kapitole o implementaci testovacích scénářů.

Elastic — ukládání dat o běhu testu

Byť webové UI Locustu umožňuje sledovat několik statistik, které postačí na základní přehled o běhu zátěžového testu, Locust nedisponuje vlastní robustní datově analytickou ani reportovací vrstvou. V dnešní době, kdy je k tomuto účelu dostupná řada specializovaných velmi efektivních nástrojů, je tento na první pohled nedostatek několikanásobně vynahrazen přímočarostí, se kterou se Locust dokáže na jakékoli BI řešení integrovat. Ať už jde v našem případě o integraci na Elastic nebo na nějaké jiné datové řešení, vždy stojíme před úkolem ukládání dvou kategorií dat:

  1. Sledované výkonnostní veličiny zatěžované aplikace, typicky doba odezvy, doba doběhnutí jobu, úroveň čerpání infrastrukturních zdrojů (vytížení CPU, zaplnění paměti, počet aktivních nodů, atd.), apod.
  2. Parametry prostředí aktuálního běhu testu. Např. aktuální počet virtuálních uživatelů, počet paralelně zpracovávaných požadavků.

Pokud jste se již seznámili se základy Locustu, víte, že o každém requestu a navazujícím response se zachycuje základní sada statistik vizualizovaná na webovém UI. Jak ale údaje, na základě kterých se statistiky počítají, odesílat do nějaké externí databáze? Také jste si možná všimli, že Locust vizualizuje v grafu počet virtuálních uživatelů, což je údaj, který by se nám určitě též hodil. Návodem budiž následující kód přidávající listenery nad vybrané Locust události:

from datetime import datetime
import pytz
import json
import gevent

from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner

from shared import forwarder, log
from common import loggers

OK_TEMPLATE = '{"request_type":"%s", "name":"%s", "result":"%s", "response_time":%s, "response_length":%s, "other":%s}'

ERR_TEMPLATE = '{"request_type":"%s", "name":"%s", "result":"%s", "response_time":%s, "exception":"%s", "other":%s}'

log_VUs = loggers.VUs(forwarder)
log_memory = loggers.Memory(forwarder)

@events.request_success.add_listener
def additional_success_handler(request_type, name, response_time, response_length, **kwargs):
    """ additional request success handler to log statistics """
    json_string = OK_TEMPLATE % (request_type, name, "OK", response_time, response_length, json.dumps(kwargs))
    #log (json_string)
    message = {"type": "success", "@timestamp": datetime.now( pytz.timezone('Europe/Prague') ) ,"payload": json.loads(json_string)}
    forwarder.add(message)

@events.request_failure.add_listener
def additional_failure_handler(request_type, name, response_time, exception, **kwargs):
    """ additional request failure handler to log statistics """
    json_string = ERR_TEMPLATE % (request_type, name, "ERR", response_time, exception, json.dumps(kwargs))
    log (json_string)
    message = {"type": "failure", "@timestamp": datetime.now( pytz.timezone('Europe/Prague') ) ,"payload": json.loads(json_string)}
    forwarder.add(message)

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    # only run this on master & standalone
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(log_VUs.run, environment)
        log_VUs.quit = False
        gevent.spawn(log_memory.run, environment)
        log_memory.quit = False

@events.test_stop.add_listener
def on_test_stop(environment, **_kwargs):
    # only run this on master & standalone
    if not isinstance(environment.runner, WorkerRunner):
        log_VUs.quit = True
        log_memory.quit = True

První dva additional_success_handleradditional_failure_handler v podstatě jen dodatečně přeposílají nativně zachycované statistiky v případě ne/úspěšného requestu “někam” dál pomocí forwarder.add(message).

Implementaci objektu forwarder, který zapouzdřuje integraci na Elastic, zde nebudeme rozebírat. Vše potřebné naleznete v článku Karola Brejny Locust.io experiments — Emitting results to external DB.

Druhé dva on_test_starton_test_stop, které se spustí při zahájení a ukončení testu, obsahují příklady custom loggerů implementovaných nad rámec těch Locust nativních: log_VUslog_memory. Všimněte si, že loggery se startují příkazem gevent.spawn(), který způsobí, že logger běží v tzv. Greenletu. To umožňuje, že loggery běží paralelně s běžícím testem, avšak neblokují pro sebe celý CPU proces.

Greenlet architektura je zajímavá sama o sobě. Na její implementaci pro Python v podobě knihovny gevent je vystavěn i Locust. A není tajemstvím, že právě díky ní poráží Locust ve výkonnosti rozdílem třídy některé tradiční nástroje pro zátěžové testy, jako např. jMeter. Klíčová myšlenka Greenletů tkví v předpokladu, že větší úlohy lze vždy rozdělit na menší sub-úlohy, které lze vykonávat “na přeskáčku” (tzv. context switching). Úlohy pak mohou být vypořádávány současně v rámci jednoho CPU procesu na rozdíl od patternu zpracování v paralelních vláknech, kdy každá úloha spotřebovává právě jeden proces, tj. jedno CPU vlákno. Více se můžete dozvědět na domovských stránkách gevent projektu nebo třeba v hezkém tutoriálu gevent For the Working Python Developer.

Loggery log_VUslog_memory jsou instancemi tříd VUsMemory z modulu loggers.py:

import time
from datetime import datetime
import pytz
import gevent
import json
import requests

from shared import log

class VUs:
    def __init__(self, forwarder):
        self.forwarder = forwarder
        self.quit = False
    def run(self, environment):
        while not self.quit:
            self.forwarder.add({
                "type": "environment"
                ,"@timestamp": datetime.now( pytz.timezone('Europe/Prague') )
                ,"payload": { "VUs": environment.runner.user_count }
            })
            log("VUs:" + str (environment.runner.user_count) )
            gevent.sleep(1)

class Memory:
    def __init__(self, forwarder):
        self.forwarder = forwarder
        self.quit = False
    def run(self, environment):
        while not self.quit:
            response = requests.get('https://customazureinfrastrucurebox.com/health/memory')
            rjson = response.json()
            self.forwarder.add({
                "type": "environment"
                ,"@timestamp": datetime.now( pytz.timezone('Europe/Prague') )
                ,"payload": {
                    "MemoryLoadBytes": rjson['MemoryLoadBytes']
                    ,"HeapSizeBytes": rjson['HeapSizeBytes']
                    ,"FragmentedBytes": rjson['FragmentedBytes']
                 }
            })
            log("Memory monitoring: MemoryLoadBytes = " + str (rjson['MemoryLoadBytes']) )
            gevent.sleep(1)

V kódu opět vidíme objekt forwarder, který obstarává směrování logovacích záznamů do externí databáze. Z pohledu Greenlet architektury je klíčový příkaz gevent.sleep(1), který říká: aktuální greenlet (úlohu) uspi na 1s a uvolni zdroje (tzv. yielding) ke zpracování jiného greenletu (blíže viz odkazy o gevent knihovně výše). Z toho vyplývá, že logovací údaje jsou zasílány do databáze každou vteřinu.

Ve třídě Memory si povšimněme volání služby, která vrací aktuální stav zaplnění paměti na daném Azure boxu (serveru). Takto vystavený monitorovací endpoint umožňuje orchestrovat logování přímo Locustem. V případě, že takový endpoint nemáme k dispozici, nezbývá než zařídit, aby se tyto údaje posílali do databáze, kam ukládáme data o běhu testu, přímo z interního infrastrukturního monitoringu. Má to samozřejmě tu nevýhodu, že při exekuci testu musíme často spolupracovat s pracovníky správy infrastruktury.

Možná jste zvědaví, jak konkrétně jsou data zasílaná do Elasticu vizualizována. Práce s Kibanou nad správně strukturovanými zdrojovými daty z běhu zátěžového testu si však zaslouží samostatný článek, který bude v brzké následovat.

Externí moduly

Ač je Python téměř všeho schopný, najdeme určitě řadu případů, kdy je optimální použít jiné prostředky. V našem případě šlo o PowerShell joby, které v Azure prostředí simulovaly změny obsahu stránek. Integrace na externí modul tak byla triviální, stačilo spustit vybraný PS job a zparsovat výstup (zkontrolovat, že nedošlo k chybě, a případně zjistit požadovaný údaj):


def process_file(self, *args, **kwargs):
        params = ""
        isPreview = False
        for param in args:
            params = params + " " + param
        if not kwargs:
            cmd = 'scripts/PublishWorkerContent.ps1' + params
        elif kwargs['type'] == 'Preview':
            cmd = 'scripts/GetPreview.ps1' + params
            isPreview = True
        print (cmd)
        proc = subprocess.run(["powershell.exe",cmd], stdin=None, stdout=subprocess.PIPE)
        URL = ""
        if (proc.stdout.decode('utf-8').find('Exception') > -1):
            returncode = 1
        else:
            returncode = 0
            if isPreview == True:
                inStr = proc.stdout.decode('utf-8')
                tmpStr = inStr.split("https://", 1)
                URL = "https://" + tmpStr[1].rstrip()
        return {
            "returncode": returncode
            ,"stdout": proc.stdout.decode('utf-8')
            ,"previewURL": URL
        }

Implementace testovacích scénářů

Základní scénář

Při troše programátorské zručnosti je Locust schopen zvládnout prakticky kterýkoli komunikační protokol. Nicméně pro klasickou komunikaci přes http ho lze použít takříkajíc “out-of-the-box”. Test case, který volá vybranou URL a provádí základní kontrolu správnosti response (návratový kód 200), vypadá následovně jednoduše:

class WebTest(TaskSet):

    @task(1)
    def web_load(self):
        row = endpoints.getRecord()
        URL = row['protocol'] + row['domain'] + row['path'] + '?_host=' + row['parameter']
        request_name = row['name'] if ( row['name']>"" ) else URL
        with self.client.get(
            URL
            ,name=request_name
            ) as r:
                assert r.status_code is 200, "Unexpected response code: " + str(r.status_code)

Objekt endpoints drží datový set načtený do paměti, konkrétně sadu URL z csv souboru. Metoda getRecord() vrací náhodně vybrané URL z datového setu. Dále stojí za povšimnutí, že každé volání lze pojmenovat. Volání se stejným jménem totiž ve statistikách vystupují jako různé instance jednoho volání i v případě, že ve skutečnosti je pokaždé cíleno odlišné URL. To umožňuje agregovat data již při běhu testu.

Rozšířený základní scénář a editační scénář

Jednoduchý základní scénář bylo třeba zkombinovat s běžícími procesy v CMS systému simulovanými v Azure aplikační vrstvě pomocí PowerShell skriptů. Spuštění skriptu simulovalo událost aktivující proces (job), jehož výsledek se projevil na prezentační vrstvě. Délka běhu procesu byla jednou ze sledovaných veličin. Za tímto účelem byl naimplementován test case, který měl za úkol spustit Azure job, na daném URL zachytit, že došlo ke kýžené změně, a změřit časovou periodu. Kód tohoto test casu je velmi podobný tomu z editačního scénáře, který si ukážeme a popíšeme:

class PreviewTest(TaskSet):

    @task(1)
    def preview_upload(self):
        preview = previews.getRecord()
        if ( preview == False ):
            log( 'PreviewTest: No data available. User Taskset killed.' )
            time.sleep(1)
            self.interrupt()
        site_name = preview['row']['site']
        preview_name = preview['row']['preview']

        log( 'Trying site preview: ' + site_name + ', '+ preview_name)
        now = datetime.now()
        job = utils.process_file( site_name, preview_name, type="Preview")
        log("Preview job returns:" + str(job['returncode']) + job['stdout'])
        retVal = "OK" if job['returncode']==0 else "ERR"
        batchId = uuid.uuid1()
        later = datetime.now()
        forwarder.add({
            "type": "batch"
            ,"@timestamp": datetime.now( pytz.timezone('Europe/Prague') )
            ,"payload": {
                "id": f"{batchId}"
                ,"step": "preview_load: " + site_name + ' ' + preview_name
                ,"event": "running"
                ,"return": retVal
                ,"response_time": utils.getTimePeriod(now, later)
            }
        })
        if ( retVal == "ERR" ):
            log( 'Site preview '+ site_name +' job failed.')
            assert retVal != "ERR", 'Site preview ' + site_name + ' ' + preview_name + ' job failed.'
            previews.setReady( preview['rowId'] )
            self.interrupt()
        now = datetime.now()
        proceeding = True
        while proceeding:
            with self.client.get(
                job['previewURL']
                ,catch_response=True
                ,name = "preview: " + site_name + ' ' + preview_name
            ) as r:
                if ( r.status_code == 200 ):
                    proceeding = False
                    r.success()
                else:
                    r.failure( 'Site preview ' + site_name + ' ' + preview_name + ' not returning 200, but ' + str(r.status_code) )
                    previews.setReady( preview['rowId'] )
                    self.interrupt()
        later = datetime.now()
        forwarder.add({
            "type": "batch"
            ,"@timestamp": datetime.now( pytz.timezone('Europe/Prague') )
            ,"payload": {
                "id": f"{batchId}"
                ,"step": "preview_load: " + site_name + ' ' + preview_name
                ,"event": "finished"
                ,"return": "OK"
                ,"response_time": utils.getTimePeriod(now, later)
            }
        })
        previews.setReady( preview['rowId'] )

Test case používá data uložená v objektu previews. Datové věty lze utilizovat opakovaně, ale jedna datová věta nesmí být zpracovávaná víc než jedním jobem. K tomu slouží metody

previews.getRecord()
previews.setReady( preview[‘rowId’] )

První metoda vrací volnou datovou větu a zamyká ji proti použití jinde, druhá pak větu uvolňuje k další utilizaci (implementace viz modul dataReader.py výše).

Na řádcích 13–35 exekujeme script spuštějící job job = utils.process_file( site_name, preview_name, type=”Preview”), zalogujeme výsledek s dobou běhu scriptu:

forwarder.add({
    "type": "batch"
    ,...

a po kontrole, zda exekuce scriptu neskončila s chybou if ( retVal == “ERR” ):…, zahájíme cyklus while proceeding:, který kontroluje zda nedošlo ke změně obsahu na stránkách. Když je změna zaznamenána, opět zalogujeme vč. doby běhu jobu.

V kódu se opakovaně vyskytuje příkaz self.interrupt(), který v případě chyb neslučitelných s dalším pokračováním test casu běh přeruší, a daný virtuální uživatel, pod kterým instance test casu běžela, je vrácen do poolu.

Řízení mixu testovacích případů v rámci scénáře

V úvodu článku jsme si ukázali konfigurační soubor, kterým lze mimo jiné volit scénář pro daný běh zátěžového testu:

LOAD_SCENARIO = 2
## 1 .. web users visiting sites (test case: WebTest)                       ## 2 .. web users visiting sites + content server processing workers output (test cases: WebTest + ContentServerTest)
## 3 .. editors requesting pages previews (test case: PreviewTest). Note: amount of virtuals users should fit amount of data records.

Při zvolení scénáře 2 se v rámci testu spouštějí dva testovací případy, které se zásadně liší ve způsobu pracování s daty:

  1. WebTest vybírá z datového setu náhodně jednu větu (URL), přičemž nevadí, je-li vybráno pro více současně běžících virtuálních uživatelů stejné URL,
  2. ContentServerTest vybírá větu, která není ve stavu procesování jiným virtuálním uživatelem.

Před zapojením dalšího virtuálního uživatele do běhu testu je tedy potřeba logika, která na základě stavu datového setu pro ContentServerTest určí, zda si daný virtuální uživatel může vybírat z obou test casů nebo na něj zbyde jen ten první. Ukažme si následující kód:

class LoadScenario(HttpUser):
    def on_start(self):
        if LOAD_SCENARIO == 1:
            self.tasks = [WebTest]
        elif LOAD_SCENARIO == 2:
            endpoint = endpoints.testRecord()
            if ( endpoint == False ):
                log( 'No site empty. BatchTest taskset skipped.' )
                self.tasks = [WebTest]
            else:
                self.tasks = [WebTest, ContentServerTest]
        elif LOAD_SCENARIO == 3:
            preview = previews.testRecord()
            if ( preview == False ):
                log( 'No site empty. Empty Taskset fired.' )
                self.tasks = [EmptyTest]
            else:
                self.tasks = [PreviewTest]

Metoda testRecord() má za účel zjistit, zda v daném datovém objektu je volná věta ke zpracování (k jejímu zamknutí dojde až při skutečné aktivaci virtuálního uživatele pomocí getRecord(), viz výše).

Proměnná self.tasks se pak plní seznamem test casů, které připadají pro spouštěného virtuálního uživatele v úvahu. Vzhledem k tomu, že u test casů v seznamu neuvádíme váhy (viz tasks attribute), Locust vybere z dvojice [WebTest, ContentServerTest] zcela náhodně.

Závěrem

V repositáři kódu pro Locust si můžete ověřit, jak projekt žije, i za poslední rok přibyla řada klíčových features. To svědčí o tom, že platforma je stále více používána na seriozních projektech zátěžových testů.

Hlavním cílem tohoto článku bylo přivést vás k zamyšlení použít Locust na dalším projektu. Pokud se tak stalo, směle do toho, nebudete litovat.

Do you also want to be part of Tesena?

Do you also want to be part of Tesena?

Check out our open positions and join us!
I'm in!