Files
HowBadIsMyBatch/src/HowBadIsMyBatch.ipynb
2022-03-26 10:50:36 +01:00

1043 lines
41 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "9de5907f-18f5-4cb1-903e-26028ff1fa03",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"pd.set_option('display.max_rows', 100)\n",
"pd.set_option('display.max_columns', None)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1dbf9321",
"metadata": {},
"outputs": [],
"source": [
"from bs4 import BeautifulSoup\n",
"import requests\n",
"import re\n",
"from datetime import datetime\n",
"\n",
"class DateProvider:\n",
" \n",
" DATE_FORMAT = \"%B %d, %Y\"\n",
"\n",
" def __init__(self):\n",
" self.lastUpdated = None\n",
" self.lastUpdatedDataSource = None\n",
"\n",
" def needsUpdate(self):\n",
" return self.getLastUpdated() < self.getLastUpdatedDataSource()\n",
" \n",
" def getLastUpdated(self):\n",
" if self.lastUpdated is None:\n",
" self.lastUpdated = self.__getLastUpdated(\n",
" url = \"https://knollfrank.github.io/HowBadIsMyBatch/batchCodeTable.html\",\n",
" getDateStr = lambda soup: soup.find(id = \"last_updated\").text)\n",
" \n",
" return self.lastUpdated\n",
"\n",
" def getLastUpdatedDataSource(self):\n",
" if self.lastUpdatedDataSource is None:\n",
" def getDateStr(soup):\n",
" lastUpdated = soup.find(string = re.compile(\"Last updated\"))\n",
" return re.search('Last updated: (.+).', lastUpdated).group(1)\n",
"\n",
" self.lastUpdatedDataSource = self.__getLastUpdated(\n",
" url = \"https://vaers.hhs.gov/data/datasets.html\",\n",
" getDateStr = getDateStr)\n",
"\n",
" return self.lastUpdatedDataSource\n",
"\n",
" def __getLastUpdated(self, url, getDateStr):\n",
" htmlContent = requests.get(url).text\n",
" soup = BeautifulSoup(htmlContent, \"lxml\")\n",
" dateStr = getDateStr(soup)\n",
" return datetime.strptime(dateStr, DateProvider.DATE_FORMAT)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ffad1c04",
"metadata": {},
"outputs": [],
"source": [
"dateProvider = DateProvider()\n",
"print(' lastUpdated:', dateProvider.getLastUpdated())\n",
"print('lastUpdatedDataSource:', dateProvider.getLastUpdatedDataSource()) \n",
"needsUpdate = dateProvider.needsUpdate()\n",
"print('needsUpdate:', needsUpdate)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a271254b",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"class VaersDescrReader:\n",
" \n",
" def __init__(self, dataDir):\n",
" self.dataDir = dataDir\n",
"\n",
" def readVaersDescrsForYears(self, years):\n",
" return [self.readVaersDescrForYear(year) for year in years]\n",
"\n",
" def readVaersDescrForYear(self, year):\n",
" return {\n",
" 'VAERSDATA': self._readVAERSDATA('{dataDir}/{year}VAERSDATA.csv'.format(dataDir = self.dataDir, year = year)),\n",
" 'VAERSVAX': self._readVAERSVAX('{dataDir}/{year}VAERSVAX.csv'.format(dataDir = self.dataDir, year = year))\n",
" }\n",
"\n",
" def readNonDomesticVaersDescr(self):\n",
" return {\n",
" 'VAERSDATA': self._readVAERSDATA(self.dataDir + \"/NonDomesticVAERSDATA.csv\"),\n",
" 'VAERSVAX': self._readVAERSVAX(self.dataDir + \"/NonDomesticVAERSVAX.csv\")\n",
" }\n",
"\n",
" def _readVAERSDATA(self, file):\n",
" return self._read_csv(\n",
" file = file,\n",
" usecols = ['VAERS_ID', 'RECVDATE', 'DIED', 'L_THREAT', 'DISABLE', 'HOSPITAL', 'ER_VISIT', 'SPLTTYPE'],\n",
" parse_dates = ['RECVDATE'],\n",
" date_parser = lambda dateStr: pd.to_datetime(dateStr, format = \"%m/%d/%Y\"))\n",
"\n",
" def _readVAERSVAX(self, file):\n",
" return self._read_csv(\n",
" file = file,\n",
" usecols = ['VAERS_ID', 'VAX_DOSE_SERIES', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT'],\n",
" dtype = {\"VAX_DOSE_SERIES\": \"string\"})\n",
"\n",
" def _read_csv(self, file, **kwargs):\n",
" return pd.read_csv(\n",
" file,\n",
" index_col = 'VAERS_ID',\n",
" encoding = 'latin1',\n",
" low_memory = False,\n",
" **kwargs)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7b5d6df0",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"class VaersDescr2DataFrameConverter:\n",
"\n",
" @staticmethod\n",
" def createDataFrameFromDescr(vaersDescr):\n",
" return pd.merge(\n",
" vaersDescr['VAERSDATA'],\n",
" vaersDescr['VAERSVAX'],\n",
" how = 'left',\n",
" left_index = True,\n",
" right_index = True,\n",
" validate = 'one_to_many')\n",
"\n",
" @staticmethod\n",
" def createDataFrameFromDescrs(vaersDescrs):\n",
" dataFrames = [VaersDescr2DataFrameConverter.createDataFrameFromDescr(vaersDescr) for vaersDescr in vaersDescrs]\n",
" return pd.concat(dataFrames)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b639196",
"metadata": {},
"outputs": [],
"source": [
"class DataFrameNormalizer:\n",
" \n",
" @staticmethod\n",
" def normalize(dataFrame):\n",
" DataFrameNormalizer.removeUnknownBatchCodes(dataFrame)\n",
" DataFrameNormalizer.convertVAX_LOTColumnToUpperCase(dataFrame)\n",
" DataFrameNormalizer._convertColumnsOfDataFrame_Y_to_1_else_0(\n",
" dataFrame,\n",
" ['DIED', 'L_THREAT', 'DISABLE', 'HOSPITAL', 'ER_VISIT'])\n",
"\n",
" @staticmethod\n",
" def convertVAX_LOTColumnToUpperCase(dataFrame):\n",
" dataFrame['VAX_LOT'] = dataFrame['VAX_LOT'].str.upper()\n",
"\n",
" @staticmethod\n",
" def removeUnknownBatchCodes(dataFrame):\n",
" dataFrame.drop(DataFrameNormalizer._isUnknownBatchCode(dataFrame).index, inplace = True)\n",
"\n",
" @staticmethod\n",
" def _isUnknownBatchCode(dataFrame):\n",
" return dataFrame[dataFrame['VAX_LOT'].str.contains(pat = 'UNKNOWN', regex = False, case = False, na = False)]\n",
"\n",
" @staticmethod\n",
" def _convertColumnsOfDataFrame_Y_to_1_else_0(dataFrame, columns):\n",
" for column in columns:\n",
" DataFrameNormalizer._convertColumnOfDataFrame_Y_to_1_else_0(dataFrame, column)\n",
"\n",
" @staticmethod\n",
" def _convertColumnOfDataFrame_Y_to_1_else_0(dataFrame, column):\n",
" dataFrame[column] = DataFrameNormalizer._where(\n",
" condition = dataFrame[column] == 'Y',\n",
" trueValue = 1,\n",
" falseValue = 0)\n",
"\n",
" @staticmethod\n",
" def _where(condition, trueValue, falseValue):\n",
" return np.where(condition, trueValue, falseValue) \n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3ebcba86",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"class DataFrameFilter:\n",
" \n",
" def filterByCovid19(self, dataFrame):\n",
" return dataFrame[self._isCovid19(dataFrame)]\n",
"\n",
" def _isCovid19(self, dataFrame):\n",
" return dataFrame[\"VAX_TYPE\"] == \"COVID19\"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c62cfaff",
"metadata": {},
"outputs": [],
"source": [
"class SummationTableFactory:\n",
"\n",
" @staticmethod\n",
" def createSummationTable(dataFrame):\n",
" summationTable = dataFrame.agg(\n",
" **{\n",
" 'Deaths': pd.NamedAgg(column = 'DIED', aggfunc = 'sum'),\n",
" 'Adverse Reaction Reports': pd.NamedAgg(column = 'DIED', aggfunc = 'size'),\n",
" 'Life Threatening Illnesses': pd.NamedAgg(column = 'L_THREAT', aggfunc = 'sum'), \n",
" 'Disabilities': pd.NamedAgg(column = 'DISABLE', aggfunc = 'sum'),\n",
" 'Severities': pd.NamedAgg(column = 'SEVERE', aggfunc = 'sum'),\n",
" 'Countries': pd.NamedAgg(column = 'COUNTRY', aggfunc = SummationTableFactory.countries2str)\n",
" })\n",
" summationTable['Severe reports'] = summationTable['Severities'] / summationTable['Adverse Reaction Reports'] * 100\n",
" summationTable['Lethality'] = summationTable['Deaths'] / summationTable['Adverse Reaction Reports'] * 100\n",
" return summationTable[\n",
" [\n",
" 'Adverse Reaction Reports',\n",
" 'Deaths',\n",
" 'Disabilities',\n",
" 'Life Threatening Illnesses',\n",
" 'Severe reports',\n",
" 'Lethality',\n",
" 'Countries'\n",
" ]]\n",
"\n",
" @staticmethod\n",
" def countries2str(countries):\n",
" return ', '.join(sorted(set(countries)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c40bd0f0",
"metadata": {},
"outputs": [],
"source": [
"import pycountry\n",
"\n",
"class CountryColumnAdder:\n",
" \n",
" @staticmethod\n",
" def addCountryColumn(dataFrame):\n",
" dataFrame['COUNTRY'] = CountryColumnAdder.getCountryColumn(dataFrame)\n",
" return dataFrame.astype({'COUNTRY': \"string\"})\n",
"\n",
" @staticmethod\n",
" def getCountryColumn(dataFrame):\n",
" return dataFrame.apply(\n",
" lambda row:\n",
" CountryColumnAdder._getCountryNameOfSplttypeOrDefault(\n",
" splttype = row['SPLTTYPE'],\n",
" default = 'Unknown Country'),\n",
" axis = 'columns')\n",
"\n",
" @staticmethod\n",
" def _getCountryNameOfSplttypeOrDefault(splttype, default):\n",
" if not isinstance(splttype, str):\n",
" return default\n",
" \n",
" country = pycountry.countries.get(alpha_2 = splttype[:2])\n",
" return country.name if country is not None else default"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3abe3384",
"metadata": {},
"outputs": [],
"source": [
"class SevereColumnAdder:\n",
" \n",
" @staticmethod\n",
" def addSevereColumn(dataFrame):\n",
" dataFrame['SEVERE'] = (dataFrame['DIED'] + dataFrame['L_THREAT'] + dataFrame['DISABLE']) > 0\n",
" dataFrame['SEVERE'].replace({True: 1, False: 0}, inplace = True)\n",
" return dataFrame\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2dad09e5",
"metadata": {},
"outputs": [],
"source": [
"class CompanyColumnAdder:\n",
" \n",
" def __init__(self, dataFrame_VAX_LOT_VAX_MANU):\n",
" self.dataFrame_VAX_LOT_VAX_MANU = dataFrame_VAX_LOT_VAX_MANU\n",
"\n",
" def addCompanyColumn(self, batchCodeTable):\n",
" return pd.merge(\n",
" batchCodeTable,\n",
" self._createCompanyByBatchCodeTable(),\n",
" how = 'left',\n",
" left_index = True,\n",
" right_index = True,\n",
" validate = 'one_to_one')\n",
"\n",
" def _createCompanyByBatchCodeTable(self):\n",
" manufacturerByBatchCodeTable = self.dataFrame_VAX_LOT_VAX_MANU[['VAX_LOT', 'VAX_MANU']]\n",
" manufacturerByBatchCodeTable = manufacturerByBatchCodeTable.drop_duplicates(subset = ['VAX_LOT'])\n",
" manufacturerByBatchCodeTable = manufacturerByBatchCodeTable.set_index('VAX_LOT')\n",
" return manufacturerByBatchCodeTable.rename(columns = {\"VAX_MANU\": \"Company\"})"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "71456a79",
"metadata": {},
"outputs": [],
"source": [
"class BatchCodeTableFactory:\n",
"\n",
" def __init__(self, dataFrame: pd.DataFrame):\n",
" self.dataFrame = dataFrame\n",
" self.companyColumnAdder = CompanyColumnAdder(dataFrame)\n",
" self.countryBatchCodeTable = SummationTableFactory.createSummationTable(\n",
" dataFrame.groupby(\n",
" [\n",
" dataFrame['COUNTRY'],\n",
" dataFrame['VAX_LOT']\n",
" ]))\n",
"\n",
" def createGlobalBatchCodeTable(self):\n",
" return self._postProcess(SummationTableFactory.createSummationTable(self.dataFrame.groupby('VAX_LOT')))\n",
"\n",
" def createBatchCodeTableByCountry(self, country):\n",
" return self._postProcess(self._getBatchCodeTableByCountry(country))\n",
"\n",
" def _postProcess(self, batchCodeTable):\n",
" batchCodeTable = self.companyColumnAdder.addCompanyColumn(batchCodeTable)\n",
" batchCodeTable = batchCodeTable[\n",
" [\n",
" 'Adverse Reaction Reports',\n",
" 'Deaths',\n",
" 'Disabilities',\n",
" 'Life Threatening Illnesses',\n",
" 'Company',\n",
" 'Countries',\n",
" 'Severe reports',\n",
" 'Lethality'\n",
" ]]\n",
" return batchCodeTable.sort_values(by = 'Severe reports', ascending = False)\n",
"\n",
" def _getBatchCodeTableByCountry(self, country):\n",
" if country in self.countryBatchCodeTable.index:\n",
" return self.countryBatchCodeTable.loc[country]\n",
" else:\n",
" return self._getEmptyBatchCodeTable()\n",
"\n",
" def _getEmptyBatchCodeTable(self):\n",
" return self.countryBatchCodeTable[0:0].droplevel(0)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4db36933",
"metadata": {},
"outputs": [],
"source": [
"from bs4 import BeautifulSoup\n",
"\n",
"class HtmlTransformerUtil:\n",
" \n",
" def applySoupTransformerToFile(self, file, soupTransformer):\n",
" self._writeSoup(soupTransformer(self._readSoup(file)), file)\n",
"\n",
" def _readSoup(self, file):\n",
" with open(file) as fp:\n",
" soup = BeautifulSoup(fp, 'lxml')\n",
" return soup\n",
"\n",
" def _writeSoup(self, soup, file):\n",
" with open(file, \"w\") as fp:\n",
" fp.write(str(soup)) \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32d4eecf",
"metadata": {},
"outputs": [],
"source": [
"from bs4 import BeautifulSoup\n",
"\n",
"\n",
"class CountryOptionsSetter:\n",
"\n",
" def setCountryOptions(self, html, options):\n",
" soup = self._setCountryOptions(self._parse(html), self._parseOptions(options))\n",
" return str(soup)\n",
"\n",
" def _setCountryOptions(self, soup, options):\n",
" countrySelect = soup.find(id = \"countrySelect\")\n",
" countrySelect.clear()\n",
" for option in options:\n",
" countrySelect.append(option)\n",
" return soup\n",
"\n",
" def _parseOptions(self, options):\n",
" return [self._parse(option).option for option in options]\n",
"\n",
" def _parse(self, html):\n",
" return BeautifulSoup(html, 'lxml')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2f0f9b4b",
"metadata": {},
"outputs": [],
"source": [
"from bs4 import BeautifulSoup\n",
"\n",
"\n",
"def saveCountryOptions(countryOptions):\n",
" HtmlTransformerUtil().applySoupTransformerToFile(\n",
" file = \"../docs/batchCodeTable.html\",\n",
" soupTransformer =\n",
" lambda soup:\n",
" BeautifulSoup(\n",
" CountryOptionsSetter().setCountryOptions(html = str(soup), options = countryOptions),\n",
" 'lxml'))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f02dddfe",
"metadata": {},
"outputs": [],
"source": [
"def saveLastUpdatedBatchCodeTable(lastUpdated):\n",
" def setLastUpdated(soup):\n",
" soup.find(id = \"last_updated\").string.replace_with(lastUpdated.strftime(DateProvider.DATE_FORMAT))\n",
" return soup\n",
"\n",
" HtmlTransformerUtil().applySoupTransformerToFile(\n",
" file = \"../docs/batchCodeTable.html\",\n",
" soupTransformer = setLastUpdated)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6aa28541",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"class IOUtils:\n",
"\n",
" @staticmethod\n",
" def saveDataFrame(dataFrame, file):\n",
" # IOUtils.saveDataFrameAsExcelFile(dataFrame, file)\n",
" # IOUtils.saveDataFrameAsHtml(dataFrame, file)\n",
" IOUtils.saveDataFrameAsJson(dataFrame, file)\n",
"\n",
" @staticmethod\n",
" def saveDataFrameAsExcelFile(dataFrame, file):\n",
" IOUtils.ensurePath(file)\n",
" dataFrame.to_excel(file + '.xlsx')\n",
"\n",
" @staticmethod\n",
" def saveDataFrameAsHtml(dataFrame, file):\n",
" IOUtils.ensurePath(file)\n",
" dataFrame.reset_index().to_html(\n",
" file + '.html',\n",
" index = False,\n",
" table_id = 'batchCodeTable',\n",
" classes = 'display',\n",
" justify = 'unset',\n",
" border = 0)\n",
"\n",
" @staticmethod\n",
" def saveDataFrameAsJson(dataFrame, file):\n",
" IOUtils.ensurePath(file)\n",
" dataFrame.reset_index().to_json(\n",
" file + '.json',\n",
" orient = \"split\",\n",
" index = False)\n",
"\n",
" @staticmethod\n",
" def ensurePath(file):\n",
" directory = os.path.dirname(file)\n",
" if not os.path.exists(directory):\n",
" os.makedirs(directory)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3dacedfd",
"metadata": {},
"outputs": [],
"source": [
"import unittest"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fcc855dd",
"metadata": {},
"outputs": [],
"source": [
"class TestHelper:\n",
"\n",
" @staticmethod\n",
" def createDataFrame(index, columns, data, dtypes = {}):\n",
" return pd.DataFrame(index = index, columns = columns, data = data).astype(dtypes)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ccb9838d",
"metadata": {},
"outputs": [],
"source": [
"from pandas.testing import assert_frame_equal\n",
"\n",
"class DataFrameNormalizerTest(unittest.TestCase):\n",
"\n",
" def test_convertVAX_LOTColumnToUpperCase(self):\n",
" # Given\n",
" dataFrame = TestHelper.createDataFrame(\n",
" columns = ['VAX_LOT'],\n",
" data = [ ['037K20A'],\n",
" ['025l20A'],\n",
" ['025L20A']],\n",
" index = [\n",
" \"0916600\",\n",
" \"0916601\",\n",
" \"1996874\"])\n",
" \n",
" # When\n",
" DataFrameNormalizer.convertVAX_LOTColumnToUpperCase(dataFrame)\n",
" \n",
" # Then\n",
" dataFrameExpected = TestHelper.createDataFrame(\n",
" columns = ['VAX_LOT'],\n",
" data = [ ['037K20A'],\n",
" ['025L20A'],\n",
" ['025L20A']],\n",
" index = [\n",
" \"0916600\",\n",
" \"0916601\",\n",
" \"1996874\"])\n",
" assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)\n",
"\n",
" def test_removeUnknownBatchCodes(self):\n",
" # Given\n",
" dataFrame = TestHelper.createDataFrame(\n",
" columns = ['VAX_LOT'],\n",
" data = [ ['UNKNOWN'],\n",
" ['N/A Unknown'],\n",
" [np.nan],\n",
" ['UNKNOWN TO ME'],\n",
" ['030L20B']],\n",
" index = [\n",
" \"1048786\",\n",
" \"1048786\",\n",
" \"123\",\n",
" \"4711\",\n",
" \"0815\"])\n",
" \n",
" # When\n",
" DataFrameNormalizer.removeUnknownBatchCodes(dataFrame)\n",
" \n",
" # Then\n",
" dataFrameExpected = TestHelper.createDataFrame(\n",
" columns = ['VAX_LOT'],\n",
" data = [ [np.nan],\n",
" ['030L20B']],\n",
" index = [\n",
" \"123\",\n",
" \"0815\"])\n",
" assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e59a1825",
"metadata": {},
"outputs": [],
"source": [
"from pandas.testing import assert_frame_equal\n",
"\n",
"class DataFrameFilterTest(unittest.TestCase):\n",
"\n",
" def test_filterByCovid19(self):\n",
" # Given\n",
" dataFrame = VaersDescr2DataFrameConverter.createDataFrameFromDescrs(\n",
" [\n",
" {\n",
" 'VAERSDATA': TestHelper.createDataFrame(\n",
" columns = ['DIED', 'L_THREAT', 'DISABLE'],\n",
" data = [ [1, 0, 0],\n",
" [0, 0, 1]],\n",
" index = [\n",
" \"0916600\",\n",
" \"0916601\"]),\n",
" 'VAERSVAX': TestHelper.createDataFrame(\n",
" columns = ['VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],\n",
" data = [ ['COVID19', 'MODERNA', '037K20A', '1'],\n",
" ['COVID19', 'MODERNA', '025L20A', '1']],\n",
" index = [\n",
" \"0916600\",\n",
" \"0916601\"],\n",
" dtypes = {'VAX_DOSE_SERIES': \"string\"})\n",
" },\n",
" {\n",
" 'VAERSDATA': TestHelper.createDataFrame(\n",
" columns = ['DIED', 'L_THREAT', 'DISABLE'],\n",
" data = [ [0, 0, 0],\n",
" [0, 0, 1]],\n",
" index = [\n",
" \"1996873\",\n",
" \"1996874\"]),\n",
" 'VAERSVAX': TestHelper.createDataFrame(\n",
" columns = ['VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],\n",
" data = [ ['HPV9', 'MERCK & CO. INC.', 'R017624', 'UNK'],\n",
" ['COVID19', 'MODERNA', '025L20A', '1']],\n",
" index = [\n",
" \"1996873\",\n",
" \"1996874\"],\n",
" dtypes = {'VAX_DOSE_SERIES': \"string\"})\n",
" }\n",
" ])\n",
" dataFrameFilter = DataFrameFilter()\n",
" \n",
" # When\n",
" dataFrame = dataFrameFilter.filterByCovid19(dataFrame)\n",
" \n",
" # Then\n",
" dataFrameExpected = TestHelper.createDataFrame(\n",
" columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES'],\n",
" data = [ [1, 0, 0, 'COVID19', 'MODERNA', '037K20A', '1'],\n",
" [0, 0, 1, 'COVID19', 'MODERNA', '025L20A', '1'],\n",
" [0, 0, 1, 'COVID19', 'MODERNA', '025L20A', '1']],\n",
" index = [\n",
" \"0916600\",\n",
" \"0916601\",\n",
" \"1996874\"],\n",
" dtypes = {'VAX_DOSE_SERIES': \"string\"})\n",
" assert_frame_equal(dataFrame, dataFrameExpected, check_dtype = False)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c784bfef",
"metadata": {},
"outputs": [],
"source": [
"from pandas.testing import assert_frame_equal\n",
"\n",
"class BatchCodeTableFactoryTest(unittest.TestCase):\n",
"\n",
" def test_createBatchCodeTableByCountry(self):\n",
" # Given\n",
" dataFrame = TestHelper.createDataFrame(\n",
" columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE', 'HOSPITAL', 'ER_VISIT', 'COUNTRY'],\n",
" data = [ [1, 0, 0, 'COVID19', 'PFIZER\\BIONTECH', '016M20A', '2', 'GBPFIZER INC2020486806', 0, 0, 'United Kingdom'],\n",
" [0, 0, 0, 'COVID19', 'MODERNA', '030L20A', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n",
" [1, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n",
" [0, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France']],\n",
" index = [\n",
" \"1048786\",\n",
" \"1048786\",\n",
" \"4711\",\n",
" \"0815\"])\n",
" dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n",
" batchCodeTableFactory = BatchCodeTableFactory(dataFrame)\n",
" \n",
" # When\n",
" batchCodeTable = batchCodeTableFactory.createBatchCodeTableByCountry('France')\n",
"\n",
" # Then\n",
" assert_frame_equal(\n",
" batchCodeTable,\n",
" TestHelper.createDataFrame(\n",
" columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],\n",
" data = [ [2, 1, 2, 2, 'MODERNA', 'France', 2/2 * 100, 1/2 * 100],\n",
" [1, 0, 0, 0, 'MODERNA', 'France', 0/1 * 100, 0/1 * 100]],\n",
" index = pd.Index(\n",
" [\n",
" '030L20B',\n",
" '030L20A'\n",
" ],\n",
" name = 'VAX_LOT')),\n",
" check_dtype = False)\n",
"\n",
" def test_createGlobalBatchCodeTable(self):\n",
" # Given\n",
" dataFrame = TestHelper.createDataFrame(\n",
" columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE', 'HOSPITAL', 'ER_VISIT', 'COUNTRY'],\n",
" data = [ [1, 0, 0, 'COVID19', 'PFIZER\\BIONTECH', '016M20A', '2', 'GBPFIZER INC2020486806', 0, 0, 'United Kingdom'],\n",
" [0, 0, 0, 'COVID19', 'MODERNA', '030L20A', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n",
" [1, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n",
" [0, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'United Kingdom']],\n",
" index = [\n",
" \"1048786\",\n",
" \"1048786\",\n",
" \"4711\",\n",
" \"0815\"])\n",
" dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n",
" batchCodeTableFactory = BatchCodeTableFactory(dataFrame)\n",
" \n",
" # When\n",
" batchCodeTable = batchCodeTableFactory.createGlobalBatchCodeTable()\n",
"\n",
" # Then\n",
" assert_frame_equal(\n",
" batchCodeTable,\n",
" TestHelper.createDataFrame(\n",
" columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],\n",
" data = [ [1, 1, 0, 0, 'PFIZER\\BIONTECH', 'United Kingdom', 1/1 * 100, 1/1 * 100],\n",
" [2, 1, 2, 2, 'MODERNA', 'France, United Kingdom', 2/2 * 100, 1/2 * 100],\n",
" [1, 0, 0, 0, 'MODERNA', 'France', 0/1 * 100, 0/1 * 100]],\n",
" index = pd.Index(\n",
" [\n",
" '016M20A',\n",
" '030L20B',\n",
" '030L20A'\n",
" ],\n",
" name = 'VAX_LOT')),\n",
" check_dtype = False)\n",
"\n",
" def test_createBatchCodeTableByNonExistingCountry(self):\n",
" # Given\n",
" dataFrame = TestHelper.createDataFrame(\n",
" columns = ['DIED', 'L_THREAT', 'DISABLE', 'VAX_TYPE', 'VAX_MANU', 'VAX_LOT', 'VAX_DOSE_SERIES', 'SPLTTYPE', 'HOSPITAL', 'ER_VISIT', 'COUNTRY'],\n",
" data = [ [1, 0, 0, 'COVID19', 'PFIZER\\BIONTECH', '016M20A', '2', 'GBPFIZER INC2020486806', 0, 0, 'United Kingdom'],\n",
" [0, 0, 0, 'COVID19', 'MODERNA', '030L20A', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n",
" [1, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France'],\n",
" [0, 1, 1, 'COVID19', 'MODERNA', '030L20B', '1', 'FRMODERNATX, INC.MOD20224', 0, 0, 'France']],\n",
" index = [\n",
" \"1048786\",\n",
" \"1048786\",\n",
" \"4711\",\n",
" \"0815\"])\n",
" dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n",
" batchCodeTableFactory = BatchCodeTableFactory(dataFrame)\n",
" \n",
" # When\n",
" batchCodeTable = batchCodeTableFactory.createBatchCodeTableByCountry('non existing country')\n",
"\n",
" # Then\n",
" assert_frame_equal(\n",
" batchCodeTable,\n",
" TestHelper.createDataFrame(\n",
" columns = ['Adverse Reaction Reports', 'Deaths', 'Disabilities', 'Life Threatening Illnesses', 'Company', 'Countries', 'Severe reports', 'Lethality'],\n",
" data = [ ],\n",
" index = pd.Index([], name = 'VAX_LOT')),\n",
" check_dtype = False)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "125351b3",
"metadata": {},
"outputs": [],
"source": [
"class CountryOptionsSetterTest(unittest.TestCase):\n",
"\n",
" def test_setCountryOptions(self):\n",
" # Given\n",
" countryOptionsSetter = CountryOptionsSetter()\n",
"\n",
" # When\n",
" htmlActual = countryOptionsSetter.setCountryOptions(\n",
" html='''\n",
" <html>\n",
" <body>\n",
" <p>Test<p/>\n",
" <select id=\"countrySelect\" name=\"country\">\n",
" <option value=\"Global\" selected>Global</option>\n",
" <option value=\"Afghanistan\">Afghanistan</option>\n",
" <option value=\"Albania\">Albania</option>\n",
" <option value=\"Algeria\">Algeria</option>\n",
" </select>\n",
" </body>\n",
" </html>\n",
" ''',\n",
" options=[\n",
" '<option value=\"Global\" selected>Global</option>',\n",
" '<option value=\"Azerbaijan\">Azerbaijan</option>',\n",
" '<option value=\"Bahrain\">Bahrain</option>'])\n",
"\n",
" # Then\n",
" assertEqualHTML(\n",
" htmlActual,\n",
" '''\n",
" <html>\n",
" <body>\n",
" <p>Test<p/>\n",
" <select id=\"countrySelect\" name=\"country\">\n",
" <option value=\"Global\" selected>Global</option>\n",
" <option value=\"Azerbaijan\">Azerbaijan</option>\n",
" <option value=\"Bahrain\">Bahrain</option>\n",
" </select>\n",
" </body>\n",
" </html>\n",
" ''')\n",
"\n",
"# adapted from https://stackoverflow.com/questions/8006909/pretty-print-assertequal-for-html-strings\n",
"def assertEqualHTML(string1, string2, file1='', file2=''):\n",
" u'''\n",
" Compare two unicode strings containing HTML.\n",
" A human friendly diff goes to logging.error() if they\n",
" are not equal, and an exception gets raised.\n",
" '''\n",
" from bs4 import BeautifulSoup as bs\n",
" import difflib\n",
"\n",
" def short(mystr):\n",
" max = 20\n",
" if len(mystr) > max:\n",
" return mystr[:max]\n",
" return mystr\n",
" p = []\n",
" for mystr, file in [(string1, file1), (string2, file2)]:\n",
" if not isinstance(mystr, str):\n",
" raise Exception(u'string ist not unicode: %r %s' %\n",
" (short(mystr), file))\n",
" soup = bs(mystr)\n",
" pretty = soup.prettify()\n",
" p.append(pretty)\n",
" if p[0] != p[1]:\n",
" for line in difflib.unified_diff(p[0].splitlines(), p[1].splitlines(), fromfile=file1, tofile=file2):\n",
" display(line)\n",
" display(p[0], ' != ', p[1])\n",
" raise Exception('Not equal %s %s' % (file1, file2))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5a8bff1b",
"metadata": {},
"outputs": [],
"source": [
"unittest.main(argv = [''], verbosity = 2, exit = False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "86e0e4f2",
"metadata": {},
"outputs": [],
"source": [
"def getVaersForYears(years):\n",
" def addCountryColumn(dataFrame):\n",
" dataFrame['COUNTRY'] = 'United States'\n",
" return dataFrame\n",
"\n",
" return _getVaers(\n",
" _getVaersDescrReader().readVaersDescrsForYears(years),\n",
" addCountryColumn)\n",
"\n",
"def getNonDomesticVaers():\n",
" return _getVaers(\n",
" [_getVaersDescrReader().readNonDomesticVaersDescr()],\n",
" CountryColumnAdder.addCountryColumn)\n",
"\n",
"def _getVaersDescrReader():\n",
" return VaersDescrReader(dataDir = \"VAERS\")\n",
"\n",
"def _getVaers(vaersDescrs, addCountryColumn):\n",
" dataFrame = VaersDescr2DataFrameConverter.createDataFrameFromDescrs(vaersDescrs)\n",
" dataFrame = addCountryColumn(dataFrame)\n",
" DataFrameNormalizer.normalize(dataFrame)\n",
" dataFrame = SevereColumnAdder.addSevereColumn(dataFrame)\n",
" return dataFrame"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "781ac80e",
"metadata": {},
"outputs": [],
"source": [
"internationalVaers = pd.concat([getVaersForYears([2020, 2021, 2022]), getNonDomesticVaers()])\n",
"internationalVaersCovid19 = DataFrameFilter().filterByCovid19(internationalVaers)\n",
"internationalVaersCovid19"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ff259a35",
"metadata": {},
"outputs": [],
"source": [
"def createAndSaveBatchCodeTableForCountry(createBatchCodeTableForCountry, country, minADRsForLethality = None):\n",
" batchCodeTable = createBatchCodeTableForCountry(country)\n",
" batchCodeTable.index.set_names(\"Batch\", inplace = True)\n",
" if minADRsForLethality is not None:\n",
" batchCodeTable.loc[batchCodeTable['Adverse Reaction Reports'] < minADRsForLethality, ['Severe reports', 'Lethality']] = [np.nan, np.nan]\n",
" IOUtils.saveDataFrame(batchCodeTable, '../docs/data/batchCodeTables/' + country)\n",
" # display(country + \":\", batchCodeTable)\n",
" display(country)\n",
"\n",
"def createAndSaveBatchCodeTablesForCountries(createBatchCodeTableForCountry, countries, minADRsForLethality = None):\n",
" for country in countries:\n",
" createAndSaveBatchCodeTableForCountry(createBatchCodeTableForCountry, country, minADRsForLethality)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cc1ef82a",
"metadata": {},
"outputs": [],
"source": [
"def getCountryOptions(countries):\n",
" return [getCountryOption(country) for country in countries]\n",
"\n",
"def getCountryOption(country):\n",
" return '<option value=\"{country}\">{country}</option>'.format(country = country)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0c4d04fb",
"metadata": {},
"outputs": [],
"source": [
"countries = sorted(internationalVaersCovid19['COUNTRY'].unique())\n",
"countryOptions = ['<option value=\"Global\" selected>Global</option>'] + getCountryOptions(countries)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8d6507ca",
"metadata": {},
"outputs": [],
"source": [
"saveCountryOptions(countryOptions)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9c7485b5",
"metadata": {},
"outputs": [],
"source": [
"saveLastUpdatedBatchCodeTable(dateProvider.getLastUpdatedDataSource())"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7e7e01a5",
"metadata": {},
"outputs": [],
"source": [
"minADRsForLethality = 100\n",
"batchCodeTableFactory = BatchCodeTableFactory(internationalVaersCovid19)\n",
"\n",
"createAndSaveBatchCodeTablesForCountries(\n",
" createBatchCodeTableForCountry = lambda country: batchCodeTableFactory.createBatchCodeTableByCountry(country),\n",
" countries = countries,\n",
" minADRsForLethality = minADRsForLethality)\n",
"\n",
"createAndSaveBatchCodeTableForCountry(\n",
" createBatchCodeTableForCountry = lambda country: batchCodeTableFactory.createGlobalBatchCodeTable(),\n",
" country = 'Global',\n",
" minADRsForLethality = minADRsForLethality)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}