{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "9de5907f-18f5-4cb1-903e-26028ff1fa03", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd\n", "\n", "pd.set_option('display.max_rows', 100)\n", "pd.set_option('display.max_columns', None)" ] }, { "cell_type": "code", "execution_count": null, "id": "d1e4fa9e", "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "\n", "print(datetime.now().strftime(\"%d.%m.%Y, %H:%M:%S Uhr\"))" ] }, { "cell_type": "code", "execution_count": null, "id": "1dbf9321", "metadata": {}, "outputs": [], "source": [ "from bs4 import BeautifulSoup\n", "import requests\n", "import re\n", "from datetime import datetime\n", "\n", "class DateProvider:\n", " \n", " DATE_FORMAT = \"%B %d, %Y\"\n", "\n", " def __init__(self):\n", " self.lastUpdated = None\n", " self.lastUpdatedDataSource = None\n", "\n", " def needsUpdate(self):\n", " return self.getLastUpdated() < self.getLastUpdatedDataSource()\n", " \n", " def getLastUpdated(self):\n", " if self.lastUpdated is None:\n", " self.lastUpdated = self.__getLastUpdated(\n", " url = \"https://knollfrank.github.io/HowBadIsMyBatch/batchCodeTable.html\",\n", " getDateStr = lambda soup: soup.find(id = \"last_updated\").text)\n", " \n", " return self.lastUpdated\n", "\n", " def getLastUpdatedDataSource(self):\n", " if self.lastUpdatedDataSource is None:\n", " def getDateStr(soup):\n", " lastUpdated = soup.find(string = re.compile(\"Last updated\"))\n", " return re.search('Last updated: (.+).', lastUpdated).group(1)\n", "\n", " self.lastUpdatedDataSource = self.__getLastUpdated(\n", " url = \"https://vaers.hhs.gov/data/datasets.html\",\n", " getDateStr = getDateStr)\n", "\n", " return self.lastUpdatedDataSource\n", "\n", " def __getLastUpdated(self, url, getDateStr):\n", " htmlContent = requests.get(url).text\n", " soup = BeautifulSoup(htmlContent, \"lxml\")\n", " dateStr = getDateStr(soup)\n", " return datetime.strptime(dateStr, DateProvider.DATE_FORMAT)" ] }, { "cell_type": "code", "execution_count": null, "id": "ffad1c04", "metadata": {}, "outputs": [], "source": [ "dateProvider = DateProvider()\n", "print(' lastUpdated:', dateProvider.getLastUpdated())\n", "print('lastUpdatedDataSource:', dateProvider.getLastUpdatedDataSource()) \n", "needsUpdate = dateProvider.needsUpdate()\n", "print('needsUpdate:', needsUpdate)" ] }, { "cell_type": "code", "execution_count": null, "id": "e673b947", "metadata": {}, "outputs": [], "source": [ "def getWorkingDirectory():\n", " pwd = ! \"pwd\"\n", " return pwd[0]" ] }, { "cell_type": "code", "execution_count": null, "id": "e313a06c", "metadata": {}, "outputs": [], "source": [ "pwd = getWorkingDirectory()" ] }, { "cell_type": "code", "execution_count": null, "id": "501f6c7b", "metadata": {}, "outputs": [], "source": [ "pwd" ] }, { "cell_type": "markdown", "id": "9514f5be", "metadata": {}, "source": [ "## Download VAERS-Data" ] }, { "cell_type": "code", "execution_count": null, "id": "8f0bfb9c", "metadata": {}, "outputs": [], "source": [ "import os\n", "import time\n", "\n", "class AndroidEmulator:\n", " \n", " @staticmethod\n", " def run(runnable):\n", " AndroidEmulator._start()\n", " result = runnable()\n", " AndroidEmulator._stop()\n", " return result\n", " \n", " @staticmethod\n", " def _start():\n", " os.system(\"/home/frankknoll/Android/Sdk/emulator/emulator -avd Pixel_2_API_30 -no-window &\")\n", " AndroidEmulator._waitUntilStarted()\n", " \n", " @staticmethod\n", " def _waitUntilStarted():\n", " while not AndroidEmulator._isStarted():\n", " time.sleep(1)\n", "\n", " @staticmethod\n", " def _isStarted():\n", " boot_completed = ! adb shell getprop sys.boot_completed\n", " return boot_completed[0] == '1'\n", "\n", " @staticmethod\n", " def _stop():\n", " ! adb emu kill" ] }, { "cell_type": "code", "execution_count": null, "id": "777ff543", "metadata": {}, "outputs": [], "source": [ "from selenium import webdriver\n", "from webdriver_manager.chrome import ChromeDriverManager\n", "from selenium.webdriver.chrome.service import Service as ChromeService\n", "from selenium.webdriver.chrome.options import Options\n", "from selenium.webdriver.common.by import By\n", "\n", "def _getOptions(downloadDir, isHeadless):\n", " options = Options()\n", " options.headless = isHeadless\n", " options.add_experimental_option(\"prefs\", {\"download.default_directory\" : downloadDir})\n", " return options\n", "\n", "def getWebDriver(downloadDir, isHeadless):\n", " return webdriver.Chrome(\n", " service = ChromeService(executable_path = ChromeDriverManager().install()),\n", " options = _getOptions(downloadDir, isHeadless))\n", "\n", "def saveCaptchaImageAs(driver, captchaImageFile):\n", " captchaImage = driver.find_element(By.CSS_SELECTOR, \"img[src='captchaImage']\")\n", " with open(captchaImageFile, 'wb') as file:\n", " file.write(captchaImage.screenshot_as_png)\n", "\n", "def existsElementWithId(driver, id):\n", " return len(driver.find_elements(By.ID, id)) > 0\n", "\n", "def isCaptchaSolved(driver):\n", " return not existsElementWithId(driver, \"wordverify\")\n" ] }, { "cell_type": "code", "execution_count": null, "id": "da7c965a", "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "\n", "def waitUntilDownloadHasFinished(file):\n", " while not os.path.exists(file):\n", " time.sleep(2)" ] }, { "cell_type": "code", "execution_count": null, "id": "d9b72506", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import tensorflow as tf\n", "from tensorflow import keras\n", "from tensorflow.keras import layers\n", "from PIL import Image\n", "import numpy as np\n", "import io\n", "\n", "# copied from value of characters variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb\n", "characters = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f']\n", "\n", "img_width = 241\n", "img_height = 62\n", "\n", "downsample_factor = 4\n", "\n", "# copied from value of max_length variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb\n", "max_length = 6\n", "\n", "char_to_num = layers.StringLookup(\n", " vocabulary=list(characters),\n", " mask_token=None)\n", "\n", "num_to_char = layers.StringLookup(\n", " vocabulary=char_to_num.get_vocabulary(),\n", " mask_token=None, invert=True)\n", "\n", "def encode_single_sample(img_path):\n", " # 1. Read image\n", " img = tf.io.read_file(img_path)\n", " # 2. Decode and convert to grayscale\n", " img = tf.io.decode_png(img, channels=1)\n", " # 3. Convert to float32 in [0, 1] range\n", " img = tf.image.convert_image_dtype(img, tf.float32)\n", " # 4. Resize to the desired size\n", " img = tf.image.resize(img, [img_height, img_width])\n", " # 5. Transpose the image because we want the time\n", " # dimension to correspond to the width of the image.\n", " img = tf.transpose(img, perm=[1, 0, 2])\n", " # 7. Return a dict as our model is expecting two inputs\n", " return asSingleSampleBatch(img)\n", "\n", "def asSingleSampleBatch(img):\n", " array = keras.utils.img_to_array(img)\n", " array = np.expand_dims(array, axis=0)\n", " return array\n", "\n", "def decode_batch_predictions(pred):\n", " input_len = np.ones(pred.shape[0]) * pred.shape[1]\n", " # Use greedy search. For complex tasks, you can use beam search\n", " results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][:, :max_length]\n", " # Iterate over the results and get back the text\n", " output_text = []\n", " for res in results:\n", " res = tf.strings.reduce_join(num_to_char(res)).numpy().decode(\"utf-8\")\n", " output_text.append(res)\n", " return output_text\n", "\n", "def load_model():\n", " _model = keras.models.load_model('model')\n", " model = keras.models.Model(\n", " _model.get_layer(name=\"image\").input,\n", " _model.get_layer(name=\"dense2\").output)\n", " return model\n", "\n", "def getTextInCaptchaImage(captchaImageFile):\n", " batchImages = encode_single_sample(captchaImageFile)\n", " preds = model.predict(batchImages)\n", " return decode_batch_predictions(preds)[0]\n", "\n", "print(\"loading model...\")\n", "model = load_model()\n", "model.summary()" ] }, { "cell_type": "code", "execution_count": null, "id": "918d088d", "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "#def getTextInCaptchaImage(captchaImageFile):\n", "# baseDir = \"~/AndroidStudioProjects/TextRecognizer\"\n", "# ! cp $captchaImageFile $baseDir/app/src/main/assets/captchas/captcha_image.jpeg\n", "# ! cd $baseDir;./gradlew connectedAndroidTest\n", "# textInCaptchaImage = ! adb shell \"run-as org.textrecognizer cat /data/data/org.textrecognizer/files/captcha_image.txt\"\n", "# return textInCaptchaImage[0]\n", " \n", "def solveCaptchaAndStartFileDownload(driver, captchaImageFile):\n", " saveCaptchaImageAs(driver, captchaImageFile)\n", " textInCaptchaImage = getTextInCaptchaImage(captchaImageFile)\n", " display('textInCaptchaImage: ', textInCaptchaImage)\n", " driver.find_element(By.ID, \"verificationCode\").send_keys(textInCaptchaImage)\n", " driver.find_element(By.CSS_SELECTOR, '[name=\"downloadbut\"]').click()\n", "\n", "def downloadFile(absoluteFile, driver, maxTries):\n", " def _downloadFile():\n", " driver.get('https://vaers.hhs.gov/eSubDownload/index.jsp?fn=' + os.path.basename(absoluteFile))\n", " solveCaptchaAndStartFileDownload(driver, 'captchaImage.jpeg')\n", "\n", " numTries = 1\n", " _downloadFile()\n", " while(not isCaptchaSolved(driver) and (maxTries is None or numTries < maxTries)):\n", " _downloadFile()\n", " numTries = numTries + 1\n", "\n", " if isCaptchaSolved(driver):\n", " waitUntilDownloadHasFinished(absoluteFile)\n", " return absoluteFile\n", " else:\n", " return None\n", "\n", "def downloadVAERSFile(file, downloadDir):\n", " driver = getWebDriver(downloadDir, isHeadless = False)\n", " downloadedFile = downloadFile(\n", " absoluteFile = downloadDir + \"/\" + file,\n", " driver = driver,\n", " maxTries = None)\n", " driver.quit()\n", " return downloadedFile" ] }, { "cell_type": "code", "execution_count": null, "id": "9ccc73b3", "metadata": {}, "outputs": [], "source": [ "import zipfile\n", "import os\n", "\n", "def unzip(zipFile, dstDir):\n", " with zipfile.ZipFile(zipFile, 'r') as zip_ref:\n", " zip_ref.extractall(dstDir)\n", "\n", "def unzipAndRemove(zipFile, dstDir):\n", " unzip(zipFile, dstDir)\n", " os.remove(zipFile)\n", "\n", "def downloadVAERSFileAndUnzip(file):\n", " downloadedFile = downloadVAERSFile(file, getWorkingDirectory() + \"/VAERS/tmp\")\n", " unzipAndRemove(\n", " zipFile = downloadedFile,\n", " dstDir = getWorkingDirectory() + '/VAERS/')\n" ] }, { "cell_type": "code", "execution_count": null, "id": "9a9e4d1e", "metadata": {}, "outputs": [], "source": [ "if needsUpdate:\n", " downloadVAERSFileAndUnzip('2022VAERSData.zip')\n", " downloadVAERSFileAndUnzip('NonDomesticVAERSData.zip')" ] }, { "cell_type": "code", "execution_count": null, "id": "a271254b", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "class VaersDescrReader:\n", " \n", " def __init__(self, dataDir):\n", " self.dataDir = dataDir\n", "\n", " def readVaersDescrsForYears(self, years):\n", " return [self.readVaersDescrForYear(year) for year in years]\n", "\n", " def readVaersDescrForYear(self, year):\n", " return {\n", " 'VAERSDATA': self._readVAERSDATA('{dataDir}/{year}VAERSDATA.csv'.format(dataDir = self.dataDir, year = year)),\n", " 'VAERSVAX': self._readVAERSVAX('{dataDir}/{year}VAERSVAX.csv'.format(dataDir = self.dataDir, year = year))\n", " }\n", "\n", " def readNonDomesticVaersDescr(self):\n", " return {\n", " 'VAERSDATA': self._readVAERSDATA(self.dataDir + \"/NonDomesticVAERSDATA.csv\"),\n", " 'VAERSVAX': self._readVAERSVAX(self.dataDir + \"/NonDomesticVAERSVAX.csv\")\n", " }\n", "\n", " def _readVAERSDATA(self, file):\n", " return self._read_csv(\n", " file = file,\n", " usecols = ['VAERS_ID', 'RECVDATE', 'DIED', 'L_THREAT', 'DISABLE', 'HOSPITAL', 'ER_VISIT', 'SPLTTYPE'],\n", " parse_dates = ['RECVDATE'],\n", " date_parser = lambda dateStr: pd.to_datetime(dateStr, format = \"%m/%d/%Y\"))\n", "\n", " def _readVAERSVAX(self, file):\n", " return self._read_csv(\n", " file = file,\n", " usecols = ['VAERS_ID', 'VAX_DOSE_SERIES', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT'],\n", " dtype = {\"VAX_DOSE_SERIES\": \"string\"})\n", "\n", " def _read_csv(self, file, **kwargs):\n", " return pd.read_csv(\n", " file,\n", " index_col = 'VAERS_ID',\n", " encoding = 'latin1',\n", " low_memory = False,\n", " **kwargs)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "7b5d6df0", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "class VaersDescr2DataFrameConverter:\n", "\n", " @staticmethod\n", " def createDataFrameFromDescr(vaersDescr):\n", " return pd.merge(\n", " vaersDescr['VAERSDATA'],\n", " vaersDescr['VAERSVAX'],\n", " how = 'left',\n", " left_index = True,\n", " right_index = True,\n", " validate = 'one_to_many')\n", "\n", " @staticmethod\n", " def createDataFrameFromDescrs(vaersDescrs):\n", " dataFrames = [VaersDescr2DataFrameConverter.createDataFrameFromDescr(vaersDescr) for vaersDescr in vaersDescrs]\n", " return pd.concat(dataFrames)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "6b639196", "metadata": {}, "outputs": [], "source": [ "class DataFrameNormalizer:\n", " \n", " @staticmethod\n", " def normalize(dataFrame):\n", " DataFrameNormalizer.removeUnknownBatchCodes(dataFrame)\n", " DataFrameNormalizer.convertVAX_LOTColumnToUpperCase(dataFrame)\n", " DataFrameNormalizer._convertColumnsOfDataFrame_Y_to_1_else_0(\n", " dataFrame,\n", " ['DIED', 'L_THREAT', 'DISABLE', 'HOSPITAL', 'ER_VISIT'])\n", "\n", " @staticmethod\n", " def convertVAX_LOTColumnToUpperCase(dataFrame):\n", " dataFrame['VAX_LOT'] = dataFrame['VAX_LOT'].str.upper()\n", "\n", " @staticmethod\n", " def removeUnknownBatchCodes(dataFrame):\n", " dataFrame.drop(DataFrameNormalizer._isUnknownBatchCode(dataFrame).index, inplace = True)\n", "\n", " @staticmethod\n", " def _isUnknownBatchCode(dataFrame):\n", " return dataFrame[dataFrame['VAX_LOT'].str.contains(pat = 'UNKNOWN', regex = False, case = False, na = False)]\n", "\n", " @staticmethod\n", " def _convertColumnsOfDataFrame_Y_to_1_else_0(dataFrame, columns):\n", " for column in columns:\n", " DataFrameNormalizer._convertColumnOfDataFrame_Y_to_1_else_0(dataFrame, column)\n", "\n", " @staticmethod\n", " def _convertColumnOfDataFrame_Y_to_1_else_0(dataFrame, column):\n", " dataFrame[column] = DataFrameNormalizer._where(\n", " condition = dataFrame[column] == 'Y',\n", " trueValue = 1,\n", " falseValue = 0)\n", "\n", " @staticmethod\n", " def _where(condition, trueValue, falseValue):\n", " return np.where(condition, trueValue, falseValue) \n", " " ] }, { "cell_type": "code", "execution_count": null, "id": "3ebcba86", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "class DataFrameFilter:\n", " \n", " def filterByCovid19(self, dataFrame):\n", " return dataFrame[self._isCovid19(dataFrame)]\n", "\n", " def _isCovid19(self, dataFrame):\n", " return dataFrame[\"VAX_TYPE\"] == \"COVID19\"\n" ] }, { "cell_type": "code", "execution_count": null, "id": "c62cfaff", "metadata": {}, "outputs": [], "source": [ "class SummationTableFactory:\n", "\n", " @staticmethod\n", " def createSummationTable(dataFrame):\n", " summationTable = dataFrame.agg(\n", " **{\n", " 'Deaths': pd.NamedAgg(column = 'DIED', aggfunc = 'sum'),\n", " 'Adverse Reaction Reports': pd.NamedAgg(column = 'DIED', aggfunc = 'size'),\n", " 'Life Threatening Illnesses': pd.NamedAgg(column = 'L_THREAT', aggfunc = 'sum'), \n", " 'Disabilities': pd.NamedAgg(column = 'DISABLE', aggfunc = 'sum'),\n", " 'Severities': pd.NamedAgg(column = 'SEVERE', aggfunc = 'sum'),\n", " 'Countries': pd.NamedAgg(column = 'COUNTRY', aggfunc = SummationTableFactory.countries2str)\n", " })\n", " summationTable['Severe reports'] = summationTable['Severities'] / summationTable['Adverse Reaction Reports'] * 100\n", " summationTable['Lethality'] = summationTable['Deaths'] / summationTable['Adverse Reaction Reports'] * 100\n", " return summationTable[\n", " [\n", " 'Adverse Reaction Reports',\n", " 'Deaths',\n", " 'Disabilities',\n", " 'Life Threatening Illnesses',\n", " 'Severe reports',\n", " 'Lethality',\n", " 'Countries'\n", " ]]\n", "\n", " @staticmethod\n", " def countries2str(countries):\n", " return ', '.join(sorted(set(countries)))" ] }, { "cell_type": "code", "execution_count": null, "id": "c40bd0f0", "metadata": {}, "outputs": [], "source": [ "import pycountry\n", "\n", "class CountryColumnAdder:\n", " \n", " @staticmethod\n", " def addCountryColumn(dataFrame):\n", " dataFrame['COUNTRY'] = CountryColumnAdder.getCountryColumn(dataFrame)\n", " return dataFrame.astype({'COUNTRY': \"string\"})\n", "\n", " @staticmethod\n", " def getCountryColumn(dataFrame):\n", " return dataFrame.apply(\n", " lambda row:\n", " CountryColumnAdder._getCountryNameOfSplttypeOrDefault(\n", " splttype = row['SPLTTYPE'],\n", " default = 'Unknown Country'),\n", " axis = 'columns')\n", "\n", " @staticmethod\n", " def _getCountryNameOfSplttypeOrDefault(splttype, default):\n", " if not isinstance(splttype, str):\n", " return default\n", " \n", " country = pycountry.countries.get(alpha_2 = splttype[:2])\n", " return country.name if country is not None else default" ] }, { "cell_type": "code", "execution_count": null, "id": "3abe3384", "metadata": {}, "outputs": [], "source": [ "class SevereColumnAdder:\n", " \n", " @staticmethod\n", " def addSevereColumn(dataFrame):\n", " dataFrame['SEVERE'] = (dataFrame['DIED'] + dataFrame['L_THREAT'] + dataFrame['DISABLE']) > 0\n", " dataFrame['SEVERE'].replace({True: 1, False: 0}, inplace = True)\n", " return dataFrame\n" ] }, { "cell_type": "code", "execution_count": null, "id": "2dad09e5", "metadata": {}, "outputs": [], "source": [ "class CompanyColumnAdder:\n", " \n", " def __init__(self, dataFrame_VAX_LOT_VAX_MANU):\n", " self.dataFrame_VAX_LOT_VAX_MANU = dataFrame_VAX_LOT_VAX_MANU\n", "\n", " def addCompanyColumn(self, batchCodeTable):\n", " return pd.merge(\n", " batchCodeTable,\n", " self._createCompanyByBatchCodeTable(),\n", " how = 'left',\n", " left_index = True,\n", " right_index = True,\n", " validate = 'one_to_one')\n", "\n", " def _createCompanyByBatchCodeTable(self):\n", " manufacturerByBatchCodeTable = self.dataFrame_VAX_LOT_VAX_MANU[['VAX_LOT', 'VAX_MANU']]\n", " manufacturerByBatchCodeTable = manufacturerByBatchCodeTable.drop_duplicates(subset = ['VAX_LOT'])\n", " manufacturerByBatchCodeTable = manufacturerByBatchCodeTable.set_index('VAX_LOT')\n", " return manufacturerByBatchCodeTable.rename(columns = {\"VAX_MANU\": \"Company\"})" ] }, { "cell_type": "code", "execution_count": null, "id": "71456a79", "metadata": {}, "outputs": [], "source": [ "class BatchCodeTableFactory:\n", "\n", " def __init__(self, dataFrame: pd.DataFrame):\n", " self.dataFrame = dataFrame\n", " self.companyColumnAdder = CompanyColumnAdder(dataFrame)\n", " self.countryBatchCodeTable = SummationTableFactory.createSummationTable(\n", " dataFrame.groupby(\n", " [\n", " dataFrame['COUNTRY'],\n", " dataFrame['VAX_LOT']\n", " ]))\n", "\n", " def createGlobalBatchCodeTable(self):\n", " return self._postProcess(SummationTableFactory.createSummationTable(self.dataFrame.groupby('VAX_LOT')))\n", "\n", " def createBatchCodeTableByCountry(self, country):\n", " return self._postProcess(self._getBatchCodeTableByCountry(country))\n", "\n", " def _postProcess(self, batchCodeTable):\n", " batchCodeTable = self.companyColumnAdder.addCompanyColumn(batchCodeTable)\n", " batchCodeTable = batchCodeTable[\n", " [\n", " 'Adverse Reaction Reports',\n", " 'Deaths',\n", " 'Disabilities',\n", " 'Life Threatening Illnesses',\n", " 'Company',\n", " 'Countries',\n", " 'Severe reports',\n", " 'Lethality'\n", " ]]\n", " return batchCodeTable.sort_values(by = 'Severe reports', ascending = False)\n", "\n", " def _getBatchCodeTableByCountry(self, country):\n", " if country in self.countryBatchCodeTable.index:\n", " return self.countryBatchCodeTable.loc[country]\n", " else:\n", " return self._getEmptyBatchCodeTable()\n", "\n", " def _getEmptyBatchCodeTable(self):\n", " return self.countryBatchCodeTable[0:0].droplevel(0)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4db36933", "metadata": {}, "outputs": [], "source": [ "from bs4 import BeautifulSoup\n", "\n", "class HtmlTransformerUtil:\n", " \n", " def applySoupTransformerToFile(self, file, soupTransformer):\n", " self._writeSoup(soupTransformer(self._readSoup(file)), file)\n", "\n", " def _readSoup(self, file):\n", " with open(file) as fp:\n", " soup = BeautifulSoup(fp, 'lxml')\n", " return soup\n", "\n", " def _writeSoup(self, soup, file):\n", " with open(file, \"w\") as fp:\n", " fp.write(str(soup)) \n" ] }, { "cell_type": "code", "execution_count": null, "id": "32d4eecf", "metadata": {}, "outputs": [], "source": [ "from bs4 import BeautifulSoup\n", "\n", "\n", "class CountryOptionsSetter:\n", "\n", " def setCountryOptions(self, html, options):\n", " soup = self._setCountryOptions(self._parse(html), self._parseOptions(options))\n", " return str(soup)\n", "\n", " def _setCountryOptions(self, soup, options):\n", " countrySelect = soup.find(id = \"countrySelect\")\n", " countrySelect.clear()\n", " for option in options:\n", " countrySelect.append(option)\n", " return soup\n", "\n", " def _parseOptions(self, options):\n", " return [self._parse(option).option for option in options]\n", "\n", " def _parse(self, html):\n", " return BeautifulSoup(html, 'lxml')\n" ] }, { "cell_type": "code", "execution_count": null, "id": "2f0f9b4b", "metadata": {}, "outputs": [], "source": [ "from bs4 import BeautifulSoup\n", "\n", "\n", "def saveCountryOptions(countryOptions):\n", " HtmlTransformerUtil().applySoupTransformerToFile(\n", " file = \"../docs/batchCodeTable.html\",\n", " soupTransformer =\n", " lambda soup:\n", " BeautifulSoup(\n", " CountryOptionsSetter().setCountryOptions(html = str(soup), options = countryOptions),\n", " 'lxml'))\n" ] }, { "cell_type": "code", "execution_count": null, "id": "f02dddfe", "metadata": {}, "outputs": [], "source": [ "def saveLastUpdatedBatchCodeTable(lastUpdated):\n", " def setLastUpdated(soup):\n", " soup.find(id = \"last_updated\").string.replace_with(lastUpdated.strftime(DateProvider.DATE_FORMAT))\n", " return soup\n", "\n", " HtmlTransformerUtil().applySoupTransformerToFile(\n", " file = \"../docs/batchCodeTable.html\",\n", " soupTransformer = setLastUpdated)" ] }, { "cell_type": "code", "execution_count": null, "id": "6aa28541", "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "class IOUtils:\n", "\n", " @staticmethod\n", " def saveDataFrame(dataFrame, file):\n", " # IOUtils.saveDataFrameAsExcelFile(dataFrame, file)\n", " # IOUtils.saveDataFrameAsHtml(dataFrame, file)\n", " IOUtils.saveDataFrameAsJson(dataFrame, file)\n", "\n", " @staticmethod\n", " def saveDataFrameAsExcelFile(dataFrame, file):\n", " IOUtils.ensurePath(file)\n", " dataFrame.to_excel(file + '.xlsx')\n", "\n", " @staticmethod\n", " def saveDataFrameAsHtml(dataFrame, file):\n", " IOUtils.ensurePath(file)\n", " dataFrame.reset_index().to_html(\n", " file + '.html',\n", " index = False,\n", " table_id = 'batchCodeTable',\n", " classes = 'display',\n", " justify = 'unset',\n", " border = 0)\n", "\n", " @staticmethod\n", " def saveDataFrameAsJson(dataFrame, file):\n", " IOUtils.ensurePath(file)\n", " dataFrame.reset_index().to_json(\n", " file + '.json',\n", " orient = \"split\",\n", " index = False)\n", "\n", " @staticmethod\n", " def ensurePath(file):\n", " directory = os.path.dirname(file)\n", " if not os.path.exists(directory):\n", " os.makedirs(directory)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "3dacedfd", "metadata": {}, "outputs": [], "source": [ "import unittest" ] }, { "cell_type": "code", "execution_count": null, "id": "fcc855dd", "metadata": {}, "outputs": [], "source": [ "class TestHelper:\n", "\n", " @staticmethod\n", " def createDataFrame(index, columns, data, dtypes = {}):\n", " return pd.DataFrame(index = index, columns = columns, data = data).astype(dtypes)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "ccb9838d", "metadata": {}, "outputs": [], "source": [ "from pandas.testing import assert_frame_equal\n", "\n", "class DataFrameNormalizerTest(unittest.TestCase):\n", "\n", " def test_convertVAX_LOTColumnToUpperCase(self):\n", " # Given\n", " dataFrame = TestHelper.createDataFrame(\n", " columns = ['VAX_LOT'],\n", " data = [ ['037K20A'],\n", " ['025l20A'],\n", " ['025L20A']],\n", " index = [\n", " \"0916600\",\n", " \"0916601\",\n", " \"1996874\"])\n", " \n", " # When\n", " DataFrameNormalizer.convertVAX_LOTColumnToUpperCase(dataFrame)\n", " \n", " # Then\n", " dataFrameExpected = TestHelper.createDataFrame(\n", " columns = ['VAX_LOT'],\n", " data = [ ['037K20A'],\n", " ['025L20A'],\n", " ['025L20A']],\n", " index = [\n", " \"0916600\",\n", " \"0916601\",\n", " \"1996874\"])\n", " assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)\n", "\n", " def test_removeUnknownBatchCodes(self):\n", " # Given\n", " dataFrame = TestHelper.createDataFrame(\n", " columns = ['VAX_LOT'],\n", " data = [ ['UNKNOWN'],\n", " ['N/A Unknown'],\n", " [np.nan],\n", " ['UNKNOWN TO ME'],\n", " ['030L20B']],\n", " index = [\n", " \"1048786\",\n", " \"1048786\",\n", " \"123\",\n", " \"4711\",\n", " \"0815\"])\n", " \n", " # When\n", " DataFrameNormalizer.removeUnknownBatchCodes(dataFrame)\n", " \n", " # Then\n", " dataFrameExpected = TestHelper.createDataFrame(\n", " columns = ['VAX_LOT'],\n", " data = [ [np.nan],\n", " ['030L20B']],\n", " index = [\n", " \"123\",\n", " \"0815\"])\n", " assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)" ] }, { "cell_type": "code", "execution_count": null, "id": "e59a1825", "metadata": {}, "outputs": [], "source": [ "from pandas.testing import assert_frame_equal\n", "\n", "class DataFrameFilterTest(unittest.TestCase):\n", "\n", " def test_filterByCovid19(self):\n", " # Given\n", " dataFrame = VaersDescr2DataFrameConverter.createDataFrameFromDescrs(\n", " [\n", " {\n", " 'VAERSDATA': TestHelper.createDataFrame(\n", " columns = ['DIED', 'L_THREAT', 'DISABLE'],\n", " data = [ [1, 0, 0],\n", " [0, 0, 1]],\n", " index = [\n", " \"0916600\",\n", " \"0916601\"]),\n", " 'VAERSVAX': TestHelper.createDataFrame(\n", " columns = ['VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],\n", " data = [ ['COVID19', 'MODERNA', '037K20A', '1'],\n", " ['COVID19', 'MODERNA', '025L20A', '1']],\n", " index = [\n", " \"0916600\",\n", " \"0916601\"],\n", " dtypes = {'VAX_DOSE_SERIES': \"string\"})\n", " },\n", " {\n", " 'VAERSDATA': TestHelper.createDataFrame(\n", " columns = ['DIED', 'L_THREAT', 'DISABLE'],\n", " data = [ [0, 0, 0],\n", " [0, 0, 1]],\n", " index = [\n", " \"1996873\",\n", " \"1996874\"]),\n", " 'VAERSVAX': TestHelper.createDataFrame(\n", " columns = ['VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],\n", " data = [ ['HPV9', 'MERCK & CO. INC.', 'R017624', 'UNK'],\n", " ['COVID19', 'MODERNA', '025L20A', '1']],\n", " index = [\n", " \"1996873\",\n", " \"1996874\"],\n", " dtypes = {'VAX_DOSE_SERIES': \"string\"})\n", " }\n", " ])\n", " dataFrameFilter = DataFrameFilter()\n", " \n", " # When\n", " dataFrame = dataFrameFilter.filterByCovid19(dataFrame)\n", " \n", " # Then\n", " dataFrameExpected = TestHelper.createDataFrame(\n", " columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],\n", " data = [ [1, 0, 0, 'COVID19', 'MODERNA', '037K20A', '1'],\n", " [0, 0, 1, 'COVID19', 'MODERNA', '025L20A', '1'],\n", " [0, 0, 1, 'COVID19', 'MODERNA', '025L20A', '1']],\n", " index = [\n", " \"0916600\",\n", " \"0916601\",\n", " \"1996874\"],\n", " dtypes = {'VAX_DOSE_SERIES': \"string\"})\n", " assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "c784bfef", "metadata": {}, "outputs": [], "source": [ "from pandas.testing import assert_frame_equal\n", "\n", "class BatchCodeTableFactoryTest(unittest.TestCase):\n", "\n", " def test_createBatchCodeTableByCountry(self):\n", " # Given\n", " dataFrame = TestHelper.createDataFrame(\n", " columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE', 'HOSPITAL', 'ER_VISIT', 'COUNTRY'],\n", " data = [ [1, 0, 0, 'COVID19', 'PFIZER\\BIONTECH', '016M20A', '2', 'GBPFIZER INC2020486806', 0, 0, 'United Kingdom'],\n", " [0, 0, 0, 'COVID19', 'MODERNA', '030L20A', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n", " [1, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n", " [0, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France']],\n", " index = [\n", " \"1048786\",\n", " \"1048786\",\n", " \"4711\",\n", " \"0815\"])\n", " dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n", " batchCodeTableFactory = BatchCodeTableFactory(dataFrame)\n", " \n", " # When\n", " batchCodeTable = batchCodeTableFactory.createBatchCodeTableByCountry('France')\n", "\n", " # Then\n", " assert_frame_equal(\n", " batchCodeTable,\n", " TestHelper.createDataFrame(\n", " columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],\n", " data = [ [2, 1, 2, 2, 'MODERNA', 'France', 2/2 * 100, 1/2 * 100],\n", " [1, 0, 0, 0, 'MODERNA', 'France', 0/1 * 100, 0/1 * 100]],\n", " index = pd.Index(\n", " [\n", " '030L20B',\n", " '030L20A'\n", " ],\n", " name = 'VAX_LOT')),\n", " check_dtype = False)\n", "\n", " def test_createGlobalBatchCodeTable(self):\n", " # Given\n", " dataFrame = TestHelper.createDataFrame(\n", " columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE', 'HOSPITAL', 'ER_VISIT', 'COUNTRY'],\n", " data = [ [1, 0, 0, 'COVID19', 'PFIZER\\BIONTECH', '016M20A', '2', 'GBPFIZER INC2020486806', 0, 0, 'United Kingdom'],\n", " [0, 0, 0, 'COVID19', 'MODERNA', '030L20A', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n", " [1, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n", " [0, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'United Kingdom']],\n", " index = [\n", " \"1048786\",\n", " \"1048786\",\n", " \"4711\",\n", " \"0815\"])\n", " dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n", " batchCodeTableFactory = BatchCodeTableFactory(dataFrame)\n", " \n", " # When\n", " batchCodeTable = batchCodeTableFactory.createGlobalBatchCodeTable()\n", "\n", " # Then\n", " assert_frame_equal(\n", " batchCodeTable,\n", " TestHelper.createDataFrame(\n", " columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],\n", " data = [ [1, 1, 0, 0, 'PFIZER\\BIONTECH', 'United Kingdom', 1/1 * 100, 1/1 * 100],\n", " [2, 1, 2, 2, 'MODERNA', 'France, United Kingdom', 2/2 * 100, 1/2 * 100],\n", " [1, 0, 0, 0, 'MODERNA', 'France', 0/1 * 100, 0/1 * 100]],\n", " index = pd.Index(\n", " [\n", " '016M20A',\n", " '030L20B',\n", " '030L20A'\n", " ],\n", " name = 'VAX_LOT')),\n", " check_dtype = False)\n", "\n", " def test_createBatchCodeTableByNonExistingCountry(self):\n", " # Given\n", " dataFrame = TestHelper.createDataFrame(\n", " columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE', 'HOSPITAL', 'ER_VISIT', 'COUNTRY'],\n", " data = [ [1, 0, 0, 'COVID19', 'PFIZER\\BIONTECH', '016M20A', '2', 'GBPFIZER INC2020486806', 0, 0, 'United Kingdom'],\n", " [0, 0, 0, 'COVID19', 'MODERNA', '030L20A', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n", " [1, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n", " [0, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France']],\n", " index = [\n", " \"1048786\",\n", " \"1048786\",\n", " \"4711\",\n", " \"0815\"])\n", " dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n", " batchCodeTableFactory = BatchCodeTableFactory(dataFrame)\n", " \n", " # When\n", " batchCodeTable = batchCodeTableFactory.createBatchCodeTableByCountry('non existing country')\n", "\n", " # Then\n", " assert_frame_equal(\n", " batchCodeTable,\n", " TestHelper.createDataFrame(\n", " columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],\n", " data = [ ],\n", " index = pd.Index([], name = 'VAX_LOT')),\n", " check_dtype = False)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "125351b3", "metadata": {}, "outputs": [], "source": [ "class CountryOptionsSetterTest(unittest.TestCase):\n", "\n", " def test_setCountryOptions(self):\n", " # Given\n", " countryOptionsSetter = CountryOptionsSetter()\n", "\n", " # When\n", " htmlActual = countryOptionsSetter.setCountryOptions(\n", " html='''\n", " \n", "
\n", "Test
\n", " \n", " \n", " \n", " ''',\n", " options=[\n", " '',\n", " '',\n", " ''])\n", "\n", " # Then\n", " assertEqualHTML(\n", " htmlActual,\n", " '''\n", " \n", " \n", "Test
\n", " \n", " \n", " \n", " ''')\n", "\n", "# adapted from https://stackoverflow.com/questions/8006909/pretty-print-assertequal-for-html-strings\n", "def assertEqualHTML(string1, string2, file1='', file2=''):\n", " u'''\n", " Compare two unicode strings containing HTML.\n", " A human friendly diff goes to logging.error() if they\n", " are not equal, and an exception gets raised.\n", " '''\n", " from bs4 import BeautifulSoup as bs\n", " import difflib\n", "\n", " def short(mystr):\n", " max = 20\n", " if len(mystr) > max:\n", " return mystr[:max]\n", " return mystr\n", " p = []\n", " for mystr, file in [(string1, file1), (string2, file2)]:\n", " if not isinstance(mystr, str):\n", " raise Exception(u'string ist not unicode: %r %s' %\n", " (short(mystr), file))\n", " soup = bs(mystr)\n", " pretty = soup.prettify()\n", " p.append(pretty)\n", " if p[0] != p[1]:\n", " for line in difflib.unified_diff(p[0].splitlines(), p[1].splitlines(), fromfile=file1, tofile=file2):\n", " display(line)\n", " display(p[0], ' != ', p[1])\n", " raise Exception('Not equal %s %s' % (file1, file2))\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5a8bff1b", "metadata": {}, "outputs": [], "source": [ "unittest.main(argv = [''], verbosity = 2, exit = False)" ] }, { "cell_type": "code", "execution_count": null, "id": "86e0e4f2", "metadata": {}, "outputs": [], "source": [ "def getVaersForYears(years):\n", " def addCountryColumn(dataFrame):\n", " dataFrame['COUNTRY'] = 'United States'\n", " return dataFrame\n", "\n", " return _getVaers(\n", " _getVaersDescrReader().readVaersDescrsForYears(years),\n", " addCountryColumn)\n", "\n", "def getNonDomesticVaers():\n", " return _getVaers(\n", " [_getVaersDescrReader().readNonDomesticVaersDescr()],\n", " CountryColumnAdder.addCountryColumn)\n", "\n", "def _getVaersDescrReader():\n", " return VaersDescrReader(dataDir = \"VAERS\")\n", "\n", "def _getVaers(vaersDescrs, addCountryColumn):\n", " dataFrame = VaersDescr2DataFrameConverter.createDataFrameFromDescrs(vaersDescrs)\n", " dataFrame = addCountryColumn(dataFrame)\n", " DataFrameNormalizer.normalize(dataFrame)\n", " dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n", " return dataFrame" ] }, { "cell_type": "code", "execution_count": null, "id": "781ac80e", "metadata": {}, "outputs": [], "source": [ "internationalVaers = pd.concat([getVaersForYears([2020, 2021, 2022]), getNonDomesticVaers()])\n", "internationalVaersCovid19 = DataFrameFilter().filterByCovid19(internationalVaers)\n", "internationalVaersCovid19" ] }, { "cell_type": "code", "execution_count": null, "id": "ff259a35", "metadata": {}, "outputs": [], "source": [ "def createAndSaveBatchCodeTableForCountry(createBatchCodeTableForCountry, country, minADRsForLethality = None):\n", " batchCodeTable = createBatchCodeTableForCountry(country)\n", " batchCodeTable.index.set_names(\"Batch\", inplace = True)\n", " if minADRsForLethality is not None:\n", " batchCodeTable.loc[batchCodeTable['Adverse Reaction Reports'] < minADRsForLethality, ['Severe reports', 'Lethality']] = [np.nan, np.nan]\n", " IOUtils.saveDataFrame(batchCodeTable, '../docs/data/batchCodeTables/' + country)\n", " # display(country + \":\", batchCodeTable)\n", " display(country)\n", "\n", "def createAndSaveBatchCodeTablesForCountries(createBatchCodeTableForCountry, countries, minADRsForLethality = None):\n", " for country in countries:\n", " createAndSaveBatchCodeTableForCountry(createBatchCodeTableForCountry, country, minADRsForLethality)" ] }, { "cell_type": "code", "execution_count": null, "id": "cc1ef82a", "metadata": {}, "outputs": [], "source": [ "def getCountryOptions(countries):\n", " return [getCountryOption(country) for country in countries]\n", "\n", "def getCountryOption(country):\n", " return ''.format(country = country)" ] }, { "cell_type": "code", "execution_count": null, "id": "0c4d04fb", "metadata": {}, "outputs": [], "source": [ "countries = sorted(internationalVaersCovid19['COUNTRY'].unique())\n", "countryOptions = [''] + getCountryOptions(countries)" ] }, { "cell_type": "code", "execution_count": null, "id": "8d6507ca", "metadata": {}, "outputs": [], "source": [ "saveCountryOptions(countryOptions)" ] }, { "cell_type": "code", "execution_count": null, "id": "9c7485b5", "metadata": {}, "outputs": [], "source": [ "saveLastUpdatedBatchCodeTable(dateProvider.getLastUpdatedDataSource())" ] }, { "cell_type": "code", "execution_count": null, "id": "7e7e01a5", "metadata": {}, "outputs": [], "source": [ "minADRsForLethality = 100\n", "batchCodeTableFactory = BatchCodeTableFactory(internationalVaersCovid19)\n", "\n", "createAndSaveBatchCodeTablesForCountries(\n", " createBatchCodeTableForCountry = lambda country: batchCodeTableFactory.createBatchCodeTableByCountry(country),\n", " countries = countries,\n", " minADRsForLethality = minADRsForLethality)\n", "\n", "createAndSaveBatchCodeTableForCountry(\n", " createBatchCodeTableForCountry = lambda country: batchCodeTableFactory.createGlobalBatchCodeTable(),\n", " country = 'Global',\n", " minADRsForLethality = minADRsForLethality)" ] }, { "cell_type": "code", "execution_count": null, "id": "8de69f66", "metadata": {}, "outputs": [], "source": [ "def publishGitHubPages():\n", " %cd /home/frankknoll/Dokumente/Corona/projects/HowBadIsMyBatch-pages\n", " ! git add -A\n", " ! git commit -m \"updating data\"\n", " ! git push" ] }, { "cell_type": "markdown", "id": "3c7319f3", "metadata": {}, "source": [ "### see https://knollfrank.github.io/HowBadIsMyBatch/batchCodeTable.html" ] }, { "cell_type": "code", "execution_count": null, "id": "865df645", "metadata": {}, "outputs": [], "source": [ "publishGitHubPages()" ] } ], "metadata": { "interpreter": { "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" }, "kernelspec": { "display_name": "Python 3.10.4 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.4" }, "vscode": { "interpreter": { "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" } } }, "nbformat": 4, "nbformat_minor": 5 }