In [None]:
import numpy as np
import pandas as pd

# Notebook-wide pandas display settings: show up to 100 rows and never truncate columns.
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)

In [None]:
from datetime import datetime

# Print the current local date and time (day.month.year; "Uhr" is German for "o'clock").
print(datetime.now().strftime("%d.%m.%Y, %H:%M:%S Uhr"))

In [None]:
from bs4 import BeautifulSoup
import requests
import re
from datetime import datetime

class DateProvider:
    """Provides the "last updated" dates of the published batch code table and of the VAERS data source.

    Each date is scraped from its web page on first access and then cached on the
    instance, so every page is requested at most once per DateProvider.
    """

    DATE_FORMAT = "%B %d, %Y"

    # Seconds to wait for a web page before giving up instead of blocking forever.
    REQUEST_TIMEOUT_SECONDS = 60

    # Captures exactly one date written in DATE_FORMAT after "Last updated:", so that
    # trailing punctuation or text following the date never ends up in the date string.
    _LAST_UPDATED_PATTERN = re.compile(r"Last updated:\s*([A-Za-z]+ \d{1,2}, \d{4})")

    def __init__(self):
        # Both values are filled lazily by the corresponding getters.
        self.lastUpdated = None
        self.lastUpdatedDataSource = None

    def needsUpdate(self):
        """Return True if the data source is newer than the published batch code table."""
        return self.getLastUpdated() < self.getLastUpdatedDataSource()

    def getLastUpdated(self):
        """Return the date (datetime) shown in the element with id "last_updated" of the batch code table page."""
        if self.lastUpdated is None:
            self.lastUpdated = self.__getLastUpdated(
                url = "https://knollfrank.github.io/HowBadIsMyBatch/batchCodeTable.html",
                getDateStr = lambda soup: soup.find(id = "last_updated").text)

        return self.lastUpdated

    def getLastUpdatedDataSource(self):
        """Return the date (datetime) found in the "Last updated: ..." text of the VAERS data sets page.

        Raises:
            ValueError: if the page contains no "Last updated: <date>" text.
        """
        if self.lastUpdatedDataSource is None:
            def getDateStr(soup):
                lastUpdated = soup.find(string = DateProvider._LAST_UPDATED_PATTERN)
                if lastUpdated is None:
                    raise ValueError('No "Last updated: <date>" text found on the data source page')
                return DateProvider._LAST_UPDATED_PATTERN.search(lastUpdated).group(1)

            self.lastUpdatedDataSource = self.__getLastUpdated(
                url = "https://vaers.hhs.gov/data/datasets.html",
                getDateStr = getDateStr)

        return self.lastUpdatedDataSource

    def __getLastUpdated(self, url, getDateStr):
        """Download url, extract a date string with getDateStr(soup) and parse it using DATE_FORMAT."""
        response = requests.get(url, timeout = DateProvider.REQUEST_TIMEOUT_SECONDS)
        # Fail on HTTP errors here instead of trying to parse an error page.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "lxml")
        dateStr = getDateStr(soup)
        return datetime.strptime(dateStr, DateProvider.DATE_FORMAT)

In [None]:
# Fetch both "last updated" dates and decide whether the VAERS data must be downloaded again.
# needsUpdate is a notebook-level flag that a later cell reads to trigger the download.
dateProvider = DateProvider()
print('          lastUpdated:', dateProvider.getLastUpdated())
print('lastUpdatedDataSource:', dateProvider.getLastUpdatedDataSource())
needsUpdate = dateProvider.needsUpdate()
print('needsUpdate:', needsUpdate)

In [None]:
def getWorkingDirectory():
    """Return the current working directory of the running process as a string.

    Implemented with os.getcwd() instead of the IPython shell magic `! "pwd"`, so the
    function is plain Python and does not depend on a `pwd` executable being available.
    """
    # Local import: this cell is defined before the notebook's first `import os` cell.
    import os
    return os.getcwd()

In [None]:
# Store the working directory in a notebook-level variable.
pwd = getWorkingDirectory()

In [None]:
# Display the working directory as the cell output.
pwd

## Download VAERS data

In [None]:
import os
import time

class AndroidEmulator:
    """Runs a callable while a windowless Android emulator is up and shuts the emulator down afterwards.

    The adb calls are made with subprocess instead of IPython `!` magics, so the class
    is plain Python. adb must be on the PATH.
    """

    @staticmethod
    def run(runnable):
        """Start the emulator, call runnable() and return its result.

        The emulator is always asked to stop, even if starting it or runnable() raises,
        so a failure does not leave an emulator process behind.
        """
        try:
            AndroidEmulator._start()
            return runnable()
        finally:
            AndroidEmulator._stop()

    @staticmethod
    def _start():
        """Launch the emulator in the background and block until Android has booted."""
        os.system("/home/frankknoll/Android/Sdk/emulator/emulator -avd Pixel_2_API_30 -no-window &")
        AndroidEmulator._waitUntilStarted()

    @staticmethod
    def _waitUntilStarted():
        """Poll once per second until the emulator reports a completed boot (no upper time limit)."""
        while not AndroidEmulator._isStarted():
            time.sleep(1)

    @staticmethod
    def _isStarted():
        """Return True if the first output line of `adb shell getprop sys.boot_completed` is '1'."""
        import subprocess
        completedProcess = subprocess.run(
            ["adb", "shell", "getprop", "sys.boot_completed"],
            stdout = subprocess.PIPE,
            # adb error messages count as "not started yet" rather than being lost
            stderr = subprocess.STDOUT,
            universal_newlines = True)
        outputLines = completedProcess.stdout.splitlines()
        # adb may print nothing at all while no device is attached yet
        return len(outputLines) > 0 and outputLines[0].strip() == '1'

    @staticmethod
    def _stop():
        """Ask the running emulator to shut down."""
        import subprocess
        subprocess.run(["adb", "emu", "kill"])

In [None]:
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

def _getOptions(downloadDir, isHeadless):
    """Build the Chrome options: headless flag plus downloadDir as the default download directory."""
    chromeOptions = Options()
    chromeOptions.headless = isHeadless
    preferences = {"download.default_directory" : downloadDir}
    chromeOptions.add_experimental_option("prefs", preferences)
    return chromeOptions

def getWebDriver(downloadDir, isHeadless):
    """Create a Chrome WebDriver that saves downloads into downloadDir.

    The chromedriver executable is obtained through ChromeDriverManager().install().
    """
    driverExecutable = ChromeDriverManager().install()
    chromeService = ChromeService(executable_path = driverExecutable)
    chromeOptions = _getOptions(downloadDir, isHeadless)
    return webdriver.Chrome(service = chromeService, options = chromeOptions)

def saveCaptchaImageAs(driver, captchaImageFile):
    """Write a PNG screenshot of the page's captcha image element to captchaImageFile."""
    captchaElement = driver.find_element(By.CSS_SELECTOR, "img[src='captchaImage']")
    with open(captchaImageFile, 'wb') as imageFile:
        pngBytes = captchaElement.screenshot_as_png
        imageFile.write(pngBytes)

def existsElementWithId(driver, id):
    """Return True if the current page contains at least one element with the given id."""
    matchingElements = driver.find_elements(By.ID, id)
    return len(matchingElements) != 0

def isCaptchaSolved(driver):
    """Return True if the page no longer contains an element with id "wordverify"."""
    captchaElementPresent = existsElementWithId(driver, "wordverify")
    return not captchaElementPresent


In [None]:
import time
import os

def waitUntilDownloadHasFinished(file, pollIntervalSeconds = 2, timeoutSeconds = None):
    """Block until file exists.

    Args:
        file: path of the file whose existence marks the end of the download.
        pollIntervalSeconds: pause between two existence checks.
        timeoutSeconds: maximum time to wait; None (the default) waits indefinitely,
            which is the original behavior.

    Raises:
        TimeoutError: if timeoutSeconds is given and file does not exist in time.
    """
    deadline = None if timeoutSeconds is None else time.monotonic() + timeoutSeconds
    while not os.path.exists(file):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError('Download of {file} did not finish within {timeout} seconds'.format(file = file, timeout = timeoutSeconds))
        time.sleep(pollIntervalSeconds)

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from PIL import Image
import numpy as np
import io

# Vocabulary of the captcha OCR model.
# copied from value of characters variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb
characters = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f']

# Batch size used by asDataset() when a captcha image is fed to the model.
batch_size = 1

# Size every captcha image is resized to in encode_single_sample().
img_width = 241
img_height = 62

# NOTE(review): not referenced by any code in this part of the notebook; presumably kept
# from the training notebook - confirm before removing.
downsample_factor = 4

# Upper bound on the number of characters kept per decoded captcha.
# copied from value of max_length variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb
max_length = 6

# Lookup layer built from the vocabulary above; here it only supplies the vocabulary for num_to_char.
char_to_num = layers.StringLookup(
    vocabulary=list(characters),
    mask_token=None)

# Inverted lookup (invert=True): turns the integer labels decoded from the model output back into characters.
num_to_char = layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    mask_token=None, invert=True)


def encode_single_sample(img_path):
    """Load the PNG file at img_path and return it as a float32 image tensor for the OCR model.

    The image is decoded as a single-channel (grayscale) image, resized to
    img_height x img_width and transposed so that the first axis runs along the
    image width.
    """
    raw_bytes = tf.io.read_file(img_path)
    grayscale = tf.io.decode_png(raw_bytes, channels=1)
    # float32 values in the [0, 1] range
    as_float = tf.image.convert_image_dtype(grayscale, tf.float32)
    resized = tf.image.resize(as_float, [img_height, img_width])
    # Swap height and width: the time dimension must correspond to the image width.
    return tf.transpose(resized, perm=[1, 0, 2])


def decode_batch_predictions(pred):
    """Decode a batch of model outputs into a list of captcha texts, one string per sample.

    Uses greedy CTC decoding and keeps at most max_length labels per sample.
    """
    sample_count, time_steps = pred.shape[0], pred.shape[1]
    sequence_lengths = np.ones(sample_count) * time_steps
    decoded = keras.backend.ctc_decode(pred, input_length=sequence_lengths, greedy=True)
    label_sequences = decoded[0][0][:, :max_length]
    return [
        tf.strings.reduce_join(num_to_char(labels)).numpy().decode("utf-8")
        for labels in label_sequences
    ]


def load_model():
    """Load the saved Keras model from 'model' and return a prediction-only model.

    The returned model maps the input of the layer named "image" to the output of
    the layer named "dense2".
    """
    trained_model = keras.models.load_model('model')
    image_input = trained_model.get_layer(name="image").input
    dense_output = trained_model.get_layer(name="dense2").output
    return keras.models.Model(image_input, dense_output)


def getTextInCaptchaImage(captchaImageFile):
    """Return the text that the notebook-level OCR `model` recognizes in the given captcha image file."""
    predictions = model.predict(getBatchImagesFromFile(captchaImageFile))
    decodedTexts = decode_batch_predictions(predictions)
    return decodedTexts[0]


def getBatchImagesFromFile(imageFile):
    """Return the first batch of the dataset built from imageFile, as produced by as_numpy_iterator()."""
    batches = list(asDataset(imageFile).as_numpy_iterator())
    return batches[0]


def asDataset(imageFile):
    """Wrap a single image file into a batched tf.data.Dataset of encoded images."""
    file_names = tf.data.Dataset.from_tensor_slices([imageFile])
    encoded_images = file_names.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    batched_images = encoded_images.batch(batch_size)
    return batched_images.prefetch(buffer_size=tf.data.AUTOTUNE)


# Load the OCR model once into the notebook-level variable `model`, which
# getTextInCaptchaImage() reads, and print its layer summary.
print("loading model...")
model = load_model()
model.summary()

In [None]:
import os

#def getTextInCaptchaImage(captchaImageFile):
#    baseDir = "~/AndroidStudioProjects/TextRecognizer"
#    ! cp $captchaImageFile $baseDir/app/src/main/assets/captchas/captcha_image.jpeg
#    ! cd $baseDir;./gradlew connectedAndroidTest
#    textInCaptchaImage = ! adb shell "run-as org.textrecognizer cat /data/data/org.textrecognizer/files/captcha_image.txt"
#    return textInCaptchaImage[0]
    
def solveCaptchaAndStartFileDownload(driver, captchaImageFile):
    """Recognize the captcha on the current page, type it into the form and click the download button.

    The captcha screenshot is stored in captchaImageFile and the recognized text is
    displayed in the notebook.
    """
    saveCaptchaImageAs(driver, captchaImageFile)
    recognizedText = getTextInCaptchaImage(captchaImageFile)
    display('textInCaptchaImage: ', recognizedText)
    verificationCodeInput = driver.find_element(By.ID, "verificationCode")
    verificationCodeInput.send_keys(recognizedText)
    downloadButton = driver.find_element(By.CSS_SELECTOR, '[name="downloadbut"]')
    downloadButton.click()

def downloadFile(absoluteFile, driver, maxTries):
    """Download the VAERS file named like absoluteFile, retrying while the captcha is not solved.

    Args:
        absoluteFile: path the browser saves the download to; its base name selects the file to download.
        driver: web driver used to open the download page.
        maxTries: maximum number of attempts, or None to retry without limit.

    Returns:
        absoluteFile once the download has finished, or None if the captcha was
        still unsolved after maxTries attempts.
    """
    downloadUrl = 'https://vaers.hhs.gov/eSubDownload/index.jsp?fn=' + os.path.basename(absoluteFile)

    def attemptDownload():
        driver.get(downloadUrl)
        solveCaptchaAndStartFileDownload(driver, 'captchaImage.jpeg')

    attemptDownload()
    triesSoFar = 1
    while True:
        if isCaptchaSolved(driver):
            break
        if maxTries is not None and triesSoFar >= maxTries:
            break
        attemptDownload()
        triesSoFar += 1

    if not isCaptchaSolved(driver):
        return None

    waitUntilDownloadHasFinished(absoluteFile)
    return absoluteFile

def downloadVAERSFile(file, downloadDir):
    """Download the VAERS file named file into downloadDir using a visible (non-headless) Chrome.

    Retries without limit until the captcha is solved (maxTries = None).

    Returns:
        Whatever downloadFile() returns for downloadDir + "/" + file.
    """
    driver = getWebDriver(downloadDir, isHeadless = False)
    try:
        return downloadFile(
            absoluteFile = downloadDir + "/" + file,
            driver = driver,
            maxTries = None)
    finally:
        # Close the browser even if the download fails, so no Chrome process is left behind.
        driver.quit()

In [None]:
import zipfile
import os

def unzip(zipFile, dstDir):
    """Extract every member of the zip archive zipFile into the directory dstDir."""
    with zipfile.ZipFile(zipFile, mode = 'r') as archive:
        archive.extractall(path = dstDir)

def unzipAndRemove(zipFile, dstDir):
    """Extract the zip archive zipFile into dstDir, then delete the archive."""
    with zipfile.ZipFile(zipFile, 'r') as archive:
        archive.extractall(dstDir)
    # Reached only if the extraction succeeded.
    os.remove(zipFile)

def downloadVAERSFileAndUnzip(file):
    """Download the VAERS zip file into <working directory>/VAERS/tmp and unpack it into <working directory>/VAERS/.

    The downloaded archive is deleted after it has been unpacked.
    """
    temporaryDir = getWorkingDirectory() + "/VAERS/tmp"
    zipFile = downloadVAERSFile(file, temporaryDir)
    unzipAndRemove(
        zipFile = zipFile,
        dstDir = getWorkingDirectory() + '/VAERS/')


In [None]:
# Download and unpack fresh VAERS data only when the needsUpdate flag computed earlier in the notebook is set.
if needsUpdate:
    downloadVAERSFileAndUnzip('2022VAERSData.zip')
    downloadVAERSFileAndUnzip('NonDomesticVAERSData.zip')

In [None]:
import pandas as pd

class VaersDescrReader:
    """Reads VAERS CSV files from a data directory.

    A "VAERS descr" is a dict with the keys 'VAERSDATA' and 'VAERSVAX', each holding
    the DataFrame read from the corresponding CSV file, indexed by VAERS_ID.
    """

    def __init__(self, dataDir):
        self.dataDir = dataDir

    def readVaersDescrsForYears(self, years):
        """Return a list with one VAERS descr per year in years."""
        return [self.readVaersDescrForYear(year) for year in years]

    def readVaersDescrForYear(self, year):
        """Return the VAERS descr read from <dataDir>/<year>VAERSDATA.csv and <dataDir>/<year>VAERSVAX.csv."""
        return {
                    'VAERSDATA': self._readVAERSDATA('{dataDir}/{year}VAERSDATA.csv'.format(dataDir = self.dataDir, year = year)),
                    'VAERSVAX': self._readVAERSVAX('{dataDir}/{year}VAERSVAX.csv'.format(dataDir = self.dataDir, year = year))
               }

    def readNonDomesticVaersDescr(self):
        """Return the VAERS descr read from the NonDomesticVAERSDATA.csv and NonDomesticVAERSVAX.csv files."""
        return {
                    'VAERSDATA': self._readVAERSDATA(self.dataDir + "/NonDomesticVAERSDATA.csv"),
                    'VAERSVAX': self._readVAERSVAX(self.dataDir + "/NonDomesticVAERSVAX.csv")
               }

    def _readVAERSDATA(self, file):
        """Read the report columns of a VAERSDATA file; RECVDATE is parsed as a month/day/year date."""
        dataFrame = self._read_csv(
            file = file,
            usecols = ['VAERS_ID', 'RECVDATE', 'DIED', 'L_THREAT', 'DISABLE', 'HOSPITAL', 'ER_VISIT', 'SPLTTYPE'])
        # Parse the dates after reading instead of via read_csv's deprecated `date_parser`
        # argument, so the same code runs on old and new pandas versions.
        # A value that does not match the format raises a ValueError.
        dataFrame['RECVDATE'] = pd.to_datetime(dataFrame['RECVDATE'], format = "%m/%d/%Y")
        return dataFrame

    def _readVAERSVAX(self, file):
        """Read the vaccination columns of a VAERSVAX file; VAX_DOSE_SERIES is kept as a string."""
        return self._read_csv(
            file = file,
            usecols = ['VAERS_ID', 'VAX_DOSE_SERIES', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT'],
            dtype = {"VAX_DOSE_SERIES": "string"})

    def _read_csv(self, file, **kwargs):
        """Read a latin1-encoded CSV file indexed by VAERS_ID; kwargs are passed on to pd.read_csv."""
        return pd.read_csv(
            file,
            index_col = 'VAERS_ID',
            encoding = 'latin1',
            low_memory = False,
            **kwargs)


In [None]:
import pandas as pd

class VaersDescr2DataFrameConverter:
    """Turns VAERS descrs (dicts holding a 'VAERSDATA' and a 'VAERSVAX' DataFrame) into one DataFrame."""

    @staticmethod
    def createDataFrameFromDescr(vaersDescr):
        """Left-join the vaccination rows onto the report rows by index.

        The merge is validated as one_to_many: the index of the 'VAERSDATA' frame must be unique.
        """
        reports = vaersDescr['VAERSDATA']
        vaccinations = vaersDescr['VAERSVAX']
        return reports.merge(
            vaccinations,
            how = 'left',
            left_index = True,
            right_index = True,
            validate = 'one_to_many')

    @staticmethod
    def createDataFrameFromDescrs(vaersDescrs):
        """Convert every descr with createDataFrameFromDescr and concatenate the results in order."""
        mergedFrames = []
        for descr in vaersDescrs:
            mergedFrames.append(VaersDescr2DataFrameConverter.createDataFrameFromDescr(descr))
        return pd.concat(mergedFrames)


In [None]:
class DataFrameNormalizer:
    """In-place clean-up steps for the merged VAERS DataFrame."""

    @staticmethod
    def normalize(dataFrame):
        """Drop rows with unknown batch codes, upper-case VAX_LOT and turn the 'Y' flag columns into 1/0.

        All changes are made to dataFrame itself; nothing is returned.
        """
        DataFrameNormalizer.removeUnknownBatchCodes(dataFrame)
        DataFrameNormalizer.convertVAX_LOTColumnToUpperCase(dataFrame)
        flagColumns = ['DIED', 'L_THREAT', 'DISABLE', 'HOSPITAL', 'ER_VISIT']
        DataFrameNormalizer._convertColumnsOfDataFrame_Y_to_1_else_0(dataFrame, flagColumns)

    @staticmethod
    def convertVAX_LOTColumnToUpperCase(dataFrame):
        """Replace the VAX_LOT column by its upper-cased version."""
        upperCasedLots = dataFrame['VAX_LOT'].str.upper()
        dataFrame['VAX_LOT'] = upperCasedLots

    @staticmethod
    def removeUnknownBatchCodes(dataFrame):
        """Drop, in place, the rows whose VAX_LOT contains 'UNKNOWN' (case-insensitive).

        NOTE(review): rows are dropped by index label, so when index labels repeat,
        every row sharing a label with an unknown batch code is dropped as well -
        confirm that this is intended.
        """
        unknownRows = DataFrameNormalizer._isUnknownBatchCode(dataFrame)
        dataFrame.drop(unknownRows.index, inplace = True)

    @staticmethod
    def _isUnknownBatchCode(dataFrame):
        """Return the rows (not a mask) whose VAX_LOT contains 'UNKNOWN'; missing values never match."""
        containsUnknown = dataFrame['VAX_LOT'].str.contains(pat = 'UNKNOWN', regex = False, case = False, na = False)
        return dataFrame[containsUnknown]

    @staticmethod
    def _convertColumnsOfDataFrame_Y_to_1_else_0(dataFrame, columns):
        """Apply the 'Y' -> 1, anything else -> 0 conversion to each of the given columns."""
        for columnName in columns:
            DataFrameNormalizer._convertColumnOfDataFrame_Y_to_1_else_0(dataFrame, columnName)

    @staticmethod
    def _convertColumnOfDataFrame_Y_to_1_else_0(dataFrame, column):
        """Overwrite column with 1 where its value equals 'Y' and 0 everywhere else."""
        isYes = dataFrame[column] == 'Y'
        dataFrame[column] = DataFrameNormalizer._where(
            condition = isYes,
            trueValue = 1,
            falseValue = 0)

    @staticmethod
    def _where(condition, trueValue, falseValue):
        """Thin wrapper around np.where."""
        return np.where(condition, trueValue, falseValue)
    

In [None]:
import pandas as pd

class DataFrameFilter:
    """Row filters for the VAERS DataFrame."""

    def filterByCovid19(self, dataFrame):
        """Return only the rows whose VAX_TYPE equals "COVID19"."""
        covid19Rows = self._isCovid19(dataFrame)
        return dataFrame[covid19Rows]

    def _isCovid19(self, dataFrame):
        """Return a boolean Series that is True for rows whose VAX_TYPE equals "COVID19"."""
        vaxType = dataFrame["VAX_TYPE"]
        return vaxType == "COVID19"


In [None]:
class SummationTableFactory:
    """Builds the per-group summary table of adverse reaction reports."""

    @staticmethod
    def createSummationTable(dataFrame):
        """Aggregate grouped data into report counts, severity shares and country lists.

        dataFrame is aggregated with named aggregations, i.e. it is expected to be a
        grouped DataFrame (the callers in this notebook pass the result of groupby).
        'Severe reports' and 'Lethality' are percentages of 'Adverse Reaction Reports'.
        """
        aggregations = {
            'Deaths':                     pd.NamedAgg(column = 'DIED',     aggfunc = 'sum'),
            # 'size' counts the rows of each group, i.e. the number of reports
            'Adverse Reaction Reports':   pd.NamedAgg(column = 'DIED',     aggfunc = 'size'),
            'Life Threatening Illnesses': pd.NamedAgg(column = 'L_THREAT', aggfunc = 'sum'),
            'Disabilities':               pd.NamedAgg(column = 'DISABLE',  aggfunc = 'sum'),
            'Severities':                 pd.NamedAgg(column = 'SEVERE',   aggfunc = 'sum'),
            'Countries':                  pd.NamedAgg(column = 'COUNTRY',  aggfunc = SummationTableFactory.countries2str)
        }
        table = dataFrame.agg(**aggregations)
        reportCounts = table['Adverse Reaction Reports']
        table['Severe reports'] = table['Severities'] / reportCounts * 100
        table['Lethality'] = table['Deaths'] / reportCounts * 100
        # 'Severities' is only an intermediate column and is not part of the result.
        resultColumns = [
            'Adverse Reaction Reports',
            'Deaths',
            'Disabilities',
            'Life Threatening Illnesses',
            'Severe reports',
            'Lethality',
            'Countries'
        ]
        return table[resultColumns]

    @staticmethod
    def countries2str(countries):
        """Return the distinct countries, sorted and joined by ', '."""
        distinctCountries = set(countries)
        return  ', '.join(sorted(distinctCountries))

In [None]:
import pycountry

class CountryColumnAdder:
    """Derives a COUNTRY column from the first two characters of the SPLTTYPE column."""

    @staticmethod
    def addCountryColumn(dataFrame):
        """Add the COUNTRY column to dataFrame (in place) and return a copy whose COUNTRY column has dtype "string"."""
        dataFrame['COUNTRY'] = CountryColumnAdder.getCountryColumn(dataFrame)
        return dataFrame.astype({'COUNTRY': "string"})

    @staticmethod
    def getCountryColumn(dataFrame):
        """Return a Series with the country name for every row, 'Unknown Country' where none can be derived."""
        # Only SPLTTYPE is needed, so map over that single column instead of building a
        # row object for every row with DataFrame.apply(axis = 'columns').
        countries = dataFrame['SPLTTYPE'].map(
            lambda splttype:
                CountryColumnAdder._getCountryNameOfSplttypeOrDefault(
                    splttype = splttype,
                    default = 'Unknown Country'))
        # The row-wise apply used before returned an unnamed Series; keep it that way.
        countries.name = None
        return countries

    @staticmethod
    def _getCountryNameOfSplttypeOrDefault(splttype, default):
        """Return the name of the country whose alpha-2 code equals splttype[:2], else default.

        default is also returned when splttype is not a string (e.g. a missing value).
        """
        if not isinstance(splttype, str):
            return default

        country = pycountry.countries.get(alpha_2 = splttype[:2])
        return country.name if country is not None else default

In [None]:
class SevereColumnAdder:
    """Adds the SEVERE flag column."""

    @staticmethod
    def addSevereColumn(dataFrame):
        """Add (in place) a SEVERE column that is 1 if DIED + L_THREAT + DISABLE > 0, else 0, and return dataFrame."""
        isSevere = (dataFrame['DIED'] + dataFrame['L_THREAT'] + dataFrame['DISABLE']) > 0
        # Assign the converted column directly: calling replace(..., inplace = True) on
        # dataFrame['SEVERE'] is chained assignment and does not reliably write back to dataFrame.
        dataFrame['SEVERE'] = isSevere.astype('int64')
        return dataFrame


In [None]:
class CompanyColumnAdder:
    """Adds a "Company" column (the vaccine manufacturer) to a table indexed by batch code."""

    def __init__(self, dataFrame_VAX_LOT_VAX_MANU):
        # DataFrame providing at least the columns VAX_LOT and VAX_MANU
        self.dataFrame_VAX_LOT_VAX_MANU = dataFrame_VAX_LOT_VAX_MANU

    def addCompanyColumn(self, batchCodeTable):
        """Return batchCodeTable left-joined by index with the company of each batch code.

        The merge is validated as one_to_one, so the index of batchCodeTable must be unique.
        """
        companyByBatchCode = self._createCompanyByBatchCodeTable()
        return batchCodeTable.merge(
            companyByBatchCode,
            how = 'left',
            left_index = True,
            right_index = True,
            validate = 'one_to_one')

    def _createCompanyByBatchCodeTable(self):
        """Return a table indexed by VAX_LOT with one "Company" column.

        For a batch code occurring several times, the manufacturer of its first row is used.
        """
        return (
            self.dataFrame_VAX_LOT_VAX_MANU[['VAX_LOT', 'VAX_MANU']]
            .drop_duplicates(subset = ['VAX_LOT'])
            .set_index('VAX_LOT')
            .rename(columns = {"VAX_MANU": "Company"}))

In [None]:
class BatchCodeTableFactory:
    """Creates batch code tables (one row per VAX_LOT), either globally or for a single country."""

    def __init__(self, dataFrame: pd.DataFrame):
        self.dataFrame = dataFrame
        self.companyColumnAdder = CompanyColumnAdder(dataFrame)
        # Summary per (COUNTRY, VAX_LOT), computed once and reused for every country lookup.
        groupedByCountryAndBatchCode = dataFrame.groupby(
            [
                dataFrame['COUNTRY'],
                dataFrame['VAX_LOT']
            ])
        self.countryBatchCodeTable = SummationTableFactory.createSummationTable(groupedByCountryAndBatchCode)

    def createGlobalBatchCodeTable(self):
        """Return the batch code table computed over the rows of all countries."""
        groupedByBatchCode = self.dataFrame.groupby('VAX_LOT')
        globalTable = SummationTableFactory.createSummationTable(groupedByBatchCode)
        return self._postProcess(globalTable)

    def createBatchCodeTableByCountry(self, country):
        """Return the batch code table of the given country (empty if the country does not occur)."""
        countryTable = self._getBatchCodeTableByCountry(country)
        return self._postProcess(countryTable)

    def _postProcess(self, batchCodeTable):
        """Add the Company column, fix the column order and sort by 'Severe reports' in descending order."""
        withCompany = self.companyColumnAdder.addCompanyColumn(batchCodeTable)
        orderedColumns = [
            'Adverse Reaction Reports',
            'Deaths',
            'Disabilities',
            'Life Threatening Illnesses',
            'Company',
            'Countries',
            'Severe reports',
            'Lethality'
        ]
        return withCompany[orderedColumns].sort_values(by = 'Severe reports', ascending = False)

    def _getBatchCodeTableByCountry(self, country):
        """Return the rows of the per-country table for country, or an empty table for an unknown country."""
        if country not in self.countryBatchCodeTable.index:
            return self._getEmptyBatchCodeTable()

        return self.countryBatchCodeTable.loc[country]

    def _getEmptyBatchCodeTable(self):
        """Return a table without rows that has the same columns and is indexed by VAX_LOT only."""
        noRows = self.countryBatchCodeTable[0:0]
        # Drop the COUNTRY index level so the result is shaped like a single-country table.
        return noRows.droplevel(0)


In [None]:
from bs4 import BeautifulSoup

class HtmlTransformerUtil:
    """Applies a BeautifulSoup transformation to an HTML file in place."""

    def applySoupTransformerToFile(self, file, soupTransformer):
        """Parse file, pass the soup to soupTransformer and write the returned soup back to file."""
        originalSoup = self._readSoup(file)
        transformedSoup = soupTransformer(originalSoup)
        self._writeSoup(transformedSoup, file)

    def _readSoup(self, file):
        """Parse file with the lxml parser and return the soup."""
        with open(file) as htmlFile:
            return BeautifulSoup(htmlFile, 'lxml')

    def _writeSoup(self, soup, file):
        """Overwrite file with the string form of soup."""
        with open(file, "w") as htmlFile:
            serializedSoup = str(soup)
            htmlFile.write(serializedSoup)


In [None]:
from bs4 import BeautifulSoup


class CountryOptionsSetter:
    """Replaces the <option> elements of the element with id "countrySelect" in an HTML document."""

    def setCountryOptions(self, html, options):
        """Return html as a string in which the content of #countrySelect is replaced by options.

        Args:
            html: the HTML document as a string.
            options: iterable of HTML strings, each containing one <option> element.
        """
        document = self._parse(html)
        optionTags = self._parseOptions(options)
        return str(self._setCountryOptions(document, optionTags))

    def _setCountryOptions(self, soup, options):
        """Empty the #countrySelect element of soup, append the given option tags and return soup."""
        selectElement = soup.find(id = "countrySelect")
        selectElement.clear()
        for optionTag in options:
            selectElement.append(optionTag)
        return soup

    def _parseOptions(self, options):
        """Parse every option string and return the list of resulting <option> tags."""
        optionTags = []
        for optionHtml in options:
            optionTags.append(self._parse(optionHtml).option)
        return optionTags

    def _parse(self, html):
        """Parse html with the lxml parser."""
        return BeautifulSoup(html, 'lxml')


In [None]:
from bs4 import BeautifulSoup


def saveCountryOptions(countryOptions):
    """Replace the country options in ../docs/batchCodeTable.html by countryOptions (HTML <option> strings)."""

    def replaceCountryOptions(soup):
        # CountryOptionsSetter works on strings, so convert to a string and parse the result again.
        updatedHtml = CountryOptionsSetter().setCountryOptions(html = str(soup), options = countryOptions)
        return BeautifulSoup(updatedHtml, 'lxml')

    HtmlTransformerUtil().applySoupTransformerToFile(
        file = "../docs/batchCodeTable.html",
        soupTransformer = replaceCountryOptions)


In [None]:
def saveLastUpdatedBatchCodeTable(lastUpdated):
    """Write lastUpdated, formatted with DateProvider.DATE_FORMAT, into the "last_updated" element of ../docs/batchCodeTable.html."""

    def writeDateIntoSoup(soup):
        currentDateText = soup.find(id = "last_updated").string
        currentDateText.replace_with(lastUpdated.strftime(DateProvider.DATE_FORMAT))
        return soup

    htmlTransformerUtil = HtmlTransformerUtil()
    htmlTransformerUtil.applySoupTransformerToFile(
        file = "../docs/batchCodeTable.html",
        soupTransformer = writeDateIntoSoup)

In [None]:
import os

class IOUtils:
    """Helpers for saving DataFrames; `file` is always a path without file extension."""

    @staticmethod
    def saveDataFrame(dataFrame, file):
        """Save dataFrame as <file>.json (the Excel and HTML outputs are currently disabled)."""
        # IOUtils.saveDataFrameAsExcelFile(dataFrame, file)
        # IOUtils.saveDataFrameAsHtml(dataFrame, file)
        IOUtils.saveDataFrameAsJson(dataFrame, file)

    @staticmethod
    def saveDataFrameAsExcelFile(dataFrame, file):
        """Save dataFrame as <file>.xlsx, creating the target directory if needed."""
        IOUtils.ensurePath(file)
        dataFrame.to_excel(file + '.xlsx')

    @staticmethod
    def saveDataFrameAsHtml(dataFrame, file):
        """Save dataFrame, with its index turned into a column, as the HTML table 'batchCodeTable' in <file>.html."""
        IOUtils.ensurePath(file)
        dataFrame.reset_index().to_html(
            file + '.html',
            index = False,
            table_id = 'batchCodeTable',
            classes = 'display',
            justify = 'unset',
            border = 0)

    @staticmethod
    def saveDataFrameAsJson(dataFrame, file):
        """Save dataFrame, with its index turned into a column, as <file>.json in "split" orientation."""
        IOUtils.ensurePath(file)
        dataFrame.reset_index().to_json(
            file + '.json',
            orient = "split",
            index = False)

    @staticmethod
    def ensurePath(file):
        """Create the directory part of file, including parents, if it does not exist yet."""
        directory = os.path.dirname(file)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if directory:
            # exist_ok avoids the race between checking for the directory and creating it.
            os.makedirs(directory, exist_ok = True)


In [None]:
import unittest

In [None]:
class TestHelper:
    """Factory for the DataFrames used as test fixtures."""

    @staticmethod
    def createDataFrame(index, columns, data, dtypes = None):
        """Create a DataFrame from index, columns and data and cast the columns listed in dtypes.

        Args:
            dtypes: optional dict mapping column names to dtypes; None means no casts.
        """
        # None instead of a mutable {} default, which would be shared between all calls.
        return pd.DataFrame(index = index, columns = columns, data = data).astype(dtypes or {})


In [None]:
from pandas.testing import assert_frame_equal

# Unit tests for the in-place normalization steps of DataFrameNormalizer.
class DataFrameNormalizerTest(unittest.TestCase):

    # Lower-case letters in VAX_LOT are upper-cased; values that are already upper-case stay unchanged.
    def test_convertVAX_LOTColumnToUpperCase(self):
        # Given
        dataFrame = TestHelper.createDataFrame(
            columns = ['VAX_LOT'],
            data = [  ['037K20A'],
                      ['025l20A'],
                      ['025L20A']],
            index = [
                "0916600",
                "0916601",
                "1996874"])
            
        # When
        DataFrameNormalizer.convertVAX_LOTColumnToUpperCase(dataFrame)
        
        # Then
        dataFrameExpected = TestHelper.createDataFrame(
            columns = ['VAX_LOT'],
            data = [  ['037K20A'],
                      ['025L20A'],
                      ['025L20A']],
            index = [
                "0916600",
                "0916601",
                "1996874"])
        assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)

    # Rows whose VAX_LOT contains "unknown" in any letter case are removed;
    # rows with a missing (NaN) batch code are kept.
    def test_removeUnknownBatchCodes(self):
        # Given
        dataFrame = TestHelper.createDataFrame(
            columns = ['VAX_LOT'],
            data = [  ['UNKNOWN'],
                      ['N/A Unknown'],
                      [np.nan],
                      ['UNKNOWN TO ME'],
                      ['030L20B']],
            index = [
                "1048786",
                "1048786",
                "123",
                "4711",
                "0815"])
            
        # When
        DataFrameNormalizer.removeUnknownBatchCodes(dataFrame)
        
        # Then
        dataFrameExpected = TestHelper.createDataFrame(
            columns = ['VAX_LOT'],
            data = [  [np.nan],
                      ['030L20B']],
            index = [
                "123",
                "0815"])
        assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)

In [None]:
from pandas.testing import assert_frame_equal

# Unit tests for DataFrameFilter.
class DataFrameFilterTest(unittest.TestCase):

    # Two VAERS descrs are merged into one DataFrame; only the rows with VAX_TYPE 'COVID19' must remain.
    def test_filterByCovid19(self):
        # Given
        dataFrame = VaersDescr2DataFrameConverter.createDataFrameFromDescrs(
            [
                {
                    'VAERSDATA': TestHelper.createDataFrame(
                        columns = ['DIED', 'L_THREAT', 'DISABLE'],
                        data = [  [1,      0,          0],
                                  [0,      0,          1]],
                        index = [
                            "0916600",
                            "0916601"]),
                    'VAERSVAX': TestHelper.createDataFrame(
                        columns = ['VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],
                        data = [  ['COVID19',  'MODERNA',  '037K20A', '1'],
                                  ['COVID19',  'MODERNA',  '025L20A', '1']],
                        index = [
                            "0916600",
                            "0916601"],
                        dtypes = {'VAX_DOSE_SERIES': "string"})
                },
                {
                    'VAERSDATA': TestHelper.createDataFrame(
                        columns = ['DIED', 'L_THREAT', 'DISABLE'],
                        data = [  [0,       0,         0],
                                  [0,       0,         1]],
                            index = [
                            "1996873",
                            "1996874"]),
                    'VAERSVAX': TestHelper.createDataFrame(
                            columns = ['VAX_TYPE', 'VAX_MANU',         'VAX_LOT', 'VAX_DOSE_SERIES'],
                            data = [  ['HPV9',     'MERCK & CO. INC.', 'R017624', 'UNK'],
                                      ['COVID19',  'MODERNA',          '025L20A', '1']],
                            index = [
                                "1996873",
                                "1996874"],
                            dtypes = {'VAX_DOSE_SERIES': "string"})
                    }
            ])
        dataFrameFilter = DataFrameFilter()
            
        # When
        dataFrame = dataFrameFilter.filterByCovid19(dataFrame)
        
        # Then
        # the HPV9 row (index "1996873") is filtered out
        dataFrameExpected = TestHelper.createDataFrame(
            columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],
            data = [  [1,       0,         0,         'COVID19',  'MODERNA',  '037K20A', '1'],
                      [0,       0,         1,         'COVID19',  'MODERNA',  '025L20A', '1'],
                      [0,       0,         1,         'COVID19',  'MODERNA',  '025L20A', '1']],
            index = [
                "0916600",
                "0916601",
                "1996874"],
            dtypes = {'VAX_DOSE_SERIES': "string"})
        assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)


In [None]:
from pandas.testing import assert_frame_equal

# Unit tests for BatchCodeTableFactory.
# The manufacturer is written as 'PFIZER\\BIONTECH': the backslash is escaped because
# '\B' is not a valid string escape sequence. The resulting value is PFIZER\BIONTECH as before.
class BatchCodeTableFactoryTest(unittest.TestCase):

    # Only the batch codes reported from France appear, sorted by 'Severe reports' in descending order.
    def test_createBatchCodeTableByCountry(self):
        # Given
        dataFrame = TestHelper.createDataFrame(
            columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU',         'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE',                  'HOSPITAL', 'ER_VISIT', 'COUNTRY'],
            data = [  [1,      0,          0,         'COVID19',  'PFIZER\\BIONTECH', '016M20A', '2',               'GBPFIZER INC2020486806',    0,          0,          'United Kingdom'],
                      [0,      0,          0,         'COVID19',  'MODERNA',          '030L20A', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France'],
                      [1,      1,          1,         'COVID19',  'MODERNA',          '030L20B', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France'],
                      [0,      1,          1,         'COVID19',  'MODERNA',          '030L20B', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France']],
            index = [
                "1048786",
                "1048786",
                "4711",
                "0815"])
        dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)
        batchCodeTableFactory = BatchCodeTableFactory(dataFrame)
        
        # When
        batchCodeTable = batchCodeTableFactory.createBatchCodeTableByCountry('France')

        # Then
        assert_frame_equal(
            batchCodeTable,
            TestHelper.createDataFrame(
                columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],
                data = [  [2,                          1,        2,              2,                            'MODERNA', 'France',    2/2 * 100,        1/2 * 100],
                          [1,                          0,        0,              0,                            'MODERNA', 'France',    0/1 * 100,        0/1 * 100]],
                index = pd.Index(
                    [
                        '030L20B',
                        '030L20A'
                    ],
                    name = 'VAX_LOT')),
            check_dtype = False)

    # Batch code 030L20B is reported from two countries, so its 'Countries' cell lists both of them.
    def test_createGlobalBatchCodeTable(self):
        # Given
        dataFrame = TestHelper.createDataFrame(
            columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU',         'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE',                  'HOSPITAL', 'ER_VISIT', 'COUNTRY'],
            data = [  [1,      0,          0,         'COVID19',  'PFIZER\\BIONTECH', '016M20A', '2',               'GBPFIZER INC2020486806',    0,          0,          'United Kingdom'],
                      [0,      0,          0,         'COVID19',  'MODERNA',          '030L20A', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France'],
                      [1,      1,          1,         'COVID19',  'MODERNA',          '030L20B', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France'],
                      [0,      1,          1,         'COVID19',  'MODERNA',          '030L20B', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'United Kingdom']],
            index = [
                "1048786",
                "1048786",
                "4711",
                "0815"])
        dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)
        batchCodeTableFactory = BatchCodeTableFactory(dataFrame)
        
        # When
        batchCodeTable = batchCodeTableFactory.createGlobalBatchCodeTable()

        # Then
        assert_frame_equal(
            batchCodeTable,
            TestHelper.createDataFrame(
                columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company',          'Countries',              'Severe reports', 'Lethality'],
                data = [  [1,                          1,        0,              0,                            'PFIZER\\BIONTECH', 'United Kingdom',         1/1 * 100,        1/1 * 100],
                          [2,                          1,        2,              2,                            'MODERNA',          'France, United Kingdom', 2/2 * 100,        1/2 * 100],
                          [1,                          0,        0,              0,                            'MODERNA',          'France',                 0/1 * 100,        0/1 * 100]],
                index = pd.Index(
                    [
                        '016M20A',
                        '030L20B',
                        '030L20A'
                    ],
                    name = 'VAX_LOT')),
            check_dtype = False)

    # A country without any report yields an empty table with the regular columns and a VAX_LOT index.
    def test_createBatchCodeTableByNonExistingCountry(self):
        # Given
        dataFrame = TestHelper.createDataFrame(
            columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU',         'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE',                  'HOSPITAL', 'ER_VISIT', 'COUNTRY'],
            data = [  [1,      0,          0,         'COVID19',  'PFIZER\\BIONTECH', '016M20A', '2',               'GBPFIZER INC2020486806',    0,          0,          'United Kingdom'],
                      [0,      0,          0,         'COVID19',  'MODERNA',          '030L20A', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France'],
                      [1,      1,          1,         'COVID19',  'MODERNA',          '030L20B', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France'],
                      [0,      1,          1,         'COVID19',  'MODERNA',          '030L20B', '1',               'FRMODERNATX, INC.MOD20224', 0,          0,          'France']],
            index = [
                "1048786",
                "1048786",
                "4711",
                "0815"])
        dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)
        batchCodeTableFactory = BatchCodeTableFactory(dataFrame)
        
        # When
        batchCodeTable = batchCodeTableFactory.createBatchCodeTableByCountry('non existing country')

        # Then
        assert_frame_equal(
            batchCodeTable,
            TestHelper.createDataFrame(
                columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],
                data = [  ],
                index = pd.Index([], name = 'VAX_LOT')),
            check_dtype = False)


In [None]:
class CountryOptionsSetterTest(unittest.TestCase):
    """Exercises CountryOptionsSetter.setCountryOptions on a small HTML document."""

    def test_setCountryOptions(self):
        """The <option> entries of the country <select> are exchanged for the given ones."""
        # Given
        setter = CountryOptionsSetter()
        htmlBefore = '''
            <html>
              <body>
                <p>Test<p/>
                <select id="countrySelect" name="country">
                  <option value="Global" selected>Global</option>
                  <option value="Afghanistan">Afghanistan</option>
                  <option value="Albania">Albania</option>
                  <option value="Algeria">Algeria</option>
                </select>
              </body>
            </html>
            '''
        newOptions = [
            '<option value="Global" selected>Global</option>',
            '<option value="Azerbaijan">Azerbaijan</option>',
            '<option value="Bahrain">Bahrain</option>']
        htmlExpected = '''
            <html>
              <body>
                <p>Test<p/>
                <select id="countrySelect" name="country">
                  <option value="Global" selected>Global</option>
                  <option value="Azerbaijan">Azerbaijan</option>
                  <option value="Bahrain">Bahrain</option>
                </select>
              </body>
            </html>
            '''

        # When
        htmlActual = setter.setCountryOptions(html=htmlBefore, options=newOptions)

        # Then
        assertEqualHTML(htmlActual, htmlExpected)

# adapted from https://stackoverflow.com/questions/8006909/pretty-print-assertequal-for-html-strings
def assertEqualHTML(string1, string2, file1='', file2=''):
    '''
    Compare two strings containing HTML.

    Both strings are parsed with BeautifulSoup and compared in their
    prettified form. If they differ, a unified diff followed by both
    prettified documents is shown via display() and an exception is raised.

    Parameters:
        string1, string2: the HTML documents to compare; both must be str.
        file1, file2: optional labels used in the diff header and in the
            exception messages.

    Raises:
        Exception: if one of the inputs is not a str, or if the prettified
            documents are not equal.
    '''
    import difflib

    def short(value, limit = 20):
        # Abbreviate the offending value for the error message. Values that
        # have no length or cannot be sliced (e.g. None) are reported as they are
        # instead of masking the real problem with a TypeError.
        try:
            return value[:limit] if len(value) > limit else value
        except (TypeError, KeyError):
            return value

    labelled = [(string1, file1), (string2, file2)]

    # Validate both inputs up front, before any parsing work is done.
    for text, label in labelled:
        if not isinstance(text, str):
            raise Exception(u'string ist not unicode: %r %s' %
                            (short(text), label))

    from bs4 import BeautifulSoup as bs

    # Name the parser explicitly (the same one the rest of this notebook uses) so
    # the result does not depend on which parsers happen to be installed and
    # BeautifulSoup does not emit its "no parser was explicitly specified" warning.
    pretty = [bs(text, "lxml").prettify() for text, _ in labelled]

    if pretty[0] != pretty[1]:
        for line in difflib.unified_diff(pretty[0].splitlines(), pretty[1].splitlines(), fromfile=file1, tofile=file2):
            display(line)
        display(pretty[0], ' != ', pretty[1])
        raise Exception('Not equal %s %s' % (file1, file2))


In [None]:
# Run the unittest.TestCase classes defined in this notebook.
# argv = [''] keeps unittest from parsing the kernel's own command-line arguments,
# exit = False stops it from calling sys.exit() once the tests have finished.
unittest.main(argv = [''], verbosity = 2, exit = False)

In [None]:
def getVaersForYears(years):
    """Build the data frame for the given years and set COUNTRY to 'United States' on every row.

    The descriptions are read through the reader returned by _getVaersDescrReader()
    and turned into a data frame by _getVaers().
    """
    def markAsUnitedStates(frame):
        # Assigns the column on the passed frame itself and hands the same frame back.
        frame['COUNTRY'] = 'United States'
        return frame

    vaersDescrs = _getVaersDescrReader().readVaersDescrsForYears(years)
    return _getVaers(vaersDescrs, markAsUnitedStates)

def getNonDomesticVaers():
    """Build the data frame for the non-domestic description.

    The COUNTRY column is added by CountryColumnAdder.addCountryColumn.
    """
    nonDomesticDescr = _getVaersDescrReader().readNonDomesticVaersDescr()
    return _getVaers([nonDomesticDescr], CountryColumnAdder.addCountryColumn)

def _getVaersDescrReader():
    """Create a VaersDescrReader whose dataDir is "VAERS"."""
    reader = VaersDescrReader(dataDir = "VAERS")
    return reader

def _getVaers(vaersDescrs, addCountryColumn):
    """Convert the given descriptions into one prepared data frame.

    Steps, in order: convert the descriptions to a data frame, let
    addCountryColumn (a callable taking and returning a data frame) add the
    country, normalize, and finally add the severe column.
    """
    frame = addCountryColumn(
        VaersDescr2DataFrameConverter.createDataFrameFromDescrs(vaersDescrs))
    # The return value of normalize() is not used here; presumably it works
    # on the frame in place - verify against DataFrameNormalizer.
    DataFrameNormalizer.normalize(frame)
    return SevereColumnAdder.addSevereColumn(frame)

In [None]:
# Stack the data frame for the years 2020-2022 (COUNTRY = 'United States') on top of the non-domestic one.
internationalVaers = pd.concat([getVaersForYears([2020, 2021, 2022]), getNonDomesticVaers()])
# Narrow down via DataFrameFilter.filterByCovid19 (defined in another cell; presumably keeps only the COVID-19 rows).
internationalVaersCovid19 = DataFrameFilter().filterByCovid19(internationalVaers)
# Bare last expression: rendered as this cell's output.
internationalVaersCovid19

In [None]:
def createAndSaveBatchCodeTableForCountry(createBatchCodeTableForCountry, country, minADRsForLethality = None):
    """Create the batch code table of one country and save it below '../docs/data/batchCodeTables/'.

    Parameters:
        createBatchCodeTableForCountry: callable taking the country and returning the table.
        country: passed to the callable and appended to the target path.
        minADRsForLethality: optional threshold; rows with fewer
            'Adverse Reaction Reports' get NaN in 'Severe reports' and 'Lethality'.

    The table returned by the callable is modified in place (index name and,
    if a threshold is given, the blanked cells). Finally the country is displayed.
    """
    table = createBatchCodeTableForCountry(country)
    table.index.set_names("Batch", inplace = True)

    hasThreshold = minADRsForLethality is not None
    if hasThreshold:
        tooFewReports = table['Adverse Reaction Reports'] < minADRsForLethality
        table.loc[tooFewReports, ['Severe reports', 'Lethality']] = [np.nan, np.nan]

    targetPath = '../docs/data/batchCodeTables/' + country
    IOUtils.saveDataFrame(table, targetPath)
    display(country)

def createAndSaveBatchCodeTablesForCountries(createBatchCodeTableForCountry, countries, minADRsForLethality = None):
    """Call createAndSaveBatchCodeTableForCountry once per entry of countries, in iteration order.

    createBatchCodeTableForCountry and minADRsForLethality are forwarded unchanged.
    """
    for currentCountry in countries:
        createAndSaveBatchCodeTableForCountry(
            createBatchCodeTableForCountry = createBatchCodeTableForCountry,
            country = currentCountry,
            minADRsForLethality = minADRsForLethality)

In [None]:
def getCountryOptions(countries):
    """Return a list with one HTML <option> string per country, in input order."""
    options = []
    for name in countries:
        options.append(getCountryOption(name))
    return options

def getCountryOption(country):
    """Return the HTML <option> element using country as both its value and its label.

    The country is inserted as is; no HTML escaping is applied.
    """
    template = '<option value="{country}">{country}</option>'
    return template.format(country = country)

In [None]:
# Distinct COUNTRY values in ascending order; also used further down to create one table per country.
countries = sorted(internationalVaersCovid19['COUNTRY'].unique())
# "Global" is the pre-selected first entry, followed by one <option> per country.
countryOptions = ['<option value="Global" selected>Global</option>']  + getCountryOptions(countries)

In [None]:
# Hand the option list to saveCountryOptions (defined in another cell;
# presumably it writes the options into the published HTML page - verify there).
saveCountryOptions(countryOptions)

In [None]:
# Pass the "Last updated" date of the data source (as determined by DateProvider)
# to saveLastUpdatedBatchCodeTable, which is defined in another cell.
saveLastUpdatedBatchCodeTable(dateProvider.getLastUpdatedDataSource())

In [None]:
# Batches with fewer than this many adverse reaction reports get NaN in
# 'Severe reports' and 'Lethality' (see createAndSaveBatchCodeTableForCountry).
minADRsForLethality = 100
batchCodeTableFactory = BatchCodeTableFactory(internationalVaersCovid19)

# One saved table per country ...
createAndSaveBatchCodeTablesForCountries(
    createBatchCodeTableForCountry = lambda country: batchCodeTableFactory.createBatchCodeTableByCountry(country),
    countries = countries,
    minADRsForLethality = minADRsForLethality)

# ... plus the table over all countries. The lambda ignores its argument;
# 'Global' only serves as the name under which the table is saved and displayed.
createAndSaveBatchCodeTableForCountry(
    createBatchCodeTableForCountry = lambda country: batchCodeTableFactory.createGlobalBatchCodeTable(),
    country = 'Global',
    minADRsForLethality = minADRsForLethality)

In [None]:
def publishGitHubPages():
    %cd /home/frankknoll/Dokumente/Corona/projects/HowBadIsMyBatch-pages
    ! git add -A
    ! git commit -m "updating data"
    ! git push

### see https://knollfrank.github.io/HowBadIsMyBatch/batchCodeTable.html

In [None]:
# Commit and push the changes; note that this switches the kernel's working directory (%cd inside the function).
publishGitHubPages()