From d94869181b0f5697b66375fdc81c87b01194e8e7 Mon Sep 17 00:00:00 2001 From: frankknoll Date: Tue, 22 Nov 2022 13:22:00 +0100 Subject: [PATCH] refactoring --- src/CaptchaReader.py | 73 ++++++++++++++ src/HowBadIsMyBatch.ipynb | 197 +------------------------------------ src/VAERSFileDownloader.py | 49 +++++++++ src/WebDriver.py | 27 +++++ src/zipUtils.py | 12 +++ 5 files changed, 164 insertions(+), 194 deletions(-) create mode 100644 src/CaptchaReader.py create mode 100644 src/VAERSFileDownloader.py create mode 100644 src/WebDriver.py create mode 100644 src/zipUtils.py diff --git a/src/CaptchaReader.py b/src/CaptchaReader.py new file mode 100644 index 00000000000..e537e478aff --- /dev/null +++ b/src/CaptchaReader.py @@ -0,0 +1,73 @@ +import numpy as np +import tensorflow as tf +from tensorflow import keras +from tensorflow.keras import layers +from PIL import Image +import numpy as np +import io + +# copied from value of characters variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb +characters = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f'] + +img_width = 241 +img_height = 62 + +downsample_factor = 4 + +# copied from value of max_length variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb +max_length = 6 + +char_to_num = layers.StringLookup( + vocabulary=list(characters), + mask_token=None) + +num_to_char = layers.StringLookup( + vocabulary=char_to_num.get_vocabulary(), + mask_token=None, invert=True) + +def encode_single_sample(img_path): + # 1. Read image + img = tf.io.read_file(img_path) + # 2. Decode and convert to grayscale + img = tf.io.decode_png(img, channels=1) + # 3. Convert to float32 in [0, 1] range + img = tf.image.convert_image_dtype(img, tf.float32) + # 4. Resize to the desired size + img = tf.image.resize(img, [img_height, img_width]) + # 5. Transpose the image because we want the time + # dimension to correspond to the width of the image. + img = tf.transpose(img, perm=[1, 0, 2]) + # 7. Return a dict as our model is expecting two inputs + return asSingleSampleBatch(img) + +def asSingleSampleBatch(img): + array = keras.utils.img_to_array(img) + array = np.expand_dims(array, axis=0) + return array + +def decode_batch_predictions(pred): + input_len = np.ones(pred.shape[0]) * pred.shape[1] + # Use greedy search. For complex tasks, you can use beam search + results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][:, :max_length] + # Iterate over the results and get back the text + output_text = [] + for res in results: + res = tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8") + output_text.append(res) + return output_text + +def load_model(): + _model = keras.models.load_model('model') + model = keras.models.Model( + _model.get_layer(name="image").input, + _model.get_layer(name="dense2").output) + return model + +def getTextInCaptchaImage(captchaImageFile): + batchImages = encode_single_sample(captchaImageFile) + preds = model.predict(batchImages) + return decode_batch_predictions(preds)[0] + +print("loading model...") +model = load_model() +model.summary() \ No newline at end of file diff --git a/src/HowBadIsMyBatch.ipynb b/src/HowBadIsMyBatch.ipynb index 4b85dcc98e6..6c39eada496 100644 --- a/src/HowBadIsMyBatch.ipynb +++ b/src/HowBadIsMyBatch.ipynb @@ -120,189 +120,6 @@ " ! adb emu kill" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "777ff543", - "metadata": {}, - "outputs": [], - "source": [ - "from selenium import webdriver\n", - "from webdriver_manager.chrome import ChromeDriverManager\n", - "from selenium.webdriver.chrome.service import Service as ChromeService\n", - "from selenium.webdriver.chrome.options import Options\n", - "from selenium.webdriver.common.by import By\n", - "\n", - "def _getOptions(downloadDir, isHeadless):\n", - " options = Options()\n", - " options.headless = isHeadless\n", - " options.add_experimental_option(\"prefs\", {\"download.default_directory\" : downloadDir})\n", - " return options\n", - "\n", - "def getWebDriver(downloadDir, isHeadless):\n", - " return webdriver.Chrome(\n", - " service = ChromeService(executable_path = ChromeDriverManager().install()),\n", - " options = _getOptions(downloadDir, isHeadless))\n", - "\n", - "def saveCaptchaImageAs(driver, captchaImageFile):\n", - " captchaImage = driver.find_element(By.CSS_SELECTOR, \"img[src='captchaImage']\")\n", - " with open(captchaImageFile, 'wb') as file:\n", - " file.write(captchaImage.screenshot_as_png)\n", - "\n", - "def existsElementWithId(driver, id):\n", - " return len(driver.find_elements(By.ID, id)) > 0\n", - "\n", - "def isCaptchaSolved(driver):\n", - " return not existsElementWithId(driver, \"wordverify\")\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "da7c965a", - "metadata": {}, - "outputs": [], - "source": [ - "import time\n", - "import os\n", - "\n", - "def waitUntilDownloadHasFinished(file):\n", - " while not os.path.exists(file):\n", - " time.sleep(2)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d9b72506", - "metadata": {}, - "outputs": [], - "source": [ - "import numpy as np\n", - "import tensorflow as tf\n", - "from tensorflow import keras\n", - "from tensorflow.keras import layers\n", - "from PIL import Image\n", - "import numpy as np\n", - "import io\n", - "\n", - "# copied from value of characters variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb\n", - "characters = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'a', 'b', 'c', 'd', 'e', 'f']\n", - "\n", - "img_width = 241\n", - "img_height = 62\n", - "\n", - "downsample_factor = 4\n", - "\n", - "# copied from value of max_length variable in captcha_ocr.ipynb or captcha_ocr_trainAndSaveModel.ipynb\n", - "max_length = 6\n", - "\n", - "char_to_num = layers.StringLookup(\n", - " vocabulary=list(characters),\n", - " mask_token=None)\n", - "\n", - "num_to_char = layers.StringLookup(\n", - " vocabulary=char_to_num.get_vocabulary(),\n", - " mask_token=None, invert=True)\n", - "\n", - "def encode_single_sample(img_path):\n", - " # 1. Read image\n", - " img = tf.io.read_file(img_path)\n", - " # 2. Decode and convert to grayscale\n", - " img = tf.io.decode_png(img, channels=1)\n", - " # 3. Convert to float32 in [0, 1] range\n", - " img = tf.image.convert_image_dtype(img, tf.float32)\n", - " # 4. Resize to the desired size\n", - " img = tf.image.resize(img, [img_height, img_width])\n", - " # 5. Transpose the image because we want the time\n", - " # dimension to correspond to the width of the image.\n", - " img = tf.transpose(img, perm=[1, 0, 2])\n", - " # 7. Return a dict as our model is expecting two inputs\n", - " return asSingleSampleBatch(img)\n", - "\n", - "def asSingleSampleBatch(img):\n", - " array = keras.utils.img_to_array(img)\n", - " array = np.expand_dims(array, axis=0)\n", - " return array\n", - "\n", - "def decode_batch_predictions(pred):\n", - " input_len = np.ones(pred.shape[0]) * pred.shape[1]\n", - " # Use greedy search. For complex tasks, you can use beam search\n", - " results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][:, :max_length]\n", - " # Iterate over the results and get back the text\n", - " output_text = []\n", - " for res in results:\n", - " res = tf.strings.reduce_join(num_to_char(res)).numpy().decode(\"utf-8\")\n", - " output_text.append(res)\n", - " return output_text\n", - "\n", - "def load_model():\n", - " _model = keras.models.load_model('model')\n", - " model = keras.models.Model(\n", - " _model.get_layer(name=\"image\").input,\n", - " _model.get_layer(name=\"dense2\").output)\n", - " return model\n", - "\n", - "def getTextInCaptchaImage(captchaImageFile):\n", - " batchImages = encode_single_sample(captchaImageFile)\n", - " preds = model.predict(batchImages)\n", - " return decode_batch_predictions(preds)[0]\n", - "\n", - "print(\"loading model...\")\n", - "model = load_model()\n", - "model.summary()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "918d088d", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "#def getTextInCaptchaImage(captchaImageFile):\n", - "# baseDir = \"~/AndroidStudioProjects/TextRecognizer\"\n", - "# ! cp $captchaImageFile $baseDir/app/src/main/assets/captchas/captcha_image.jpeg\n", - "# ! cd $baseDir;./gradlew connectedAndroidTest\n", - "# textInCaptchaImage = ! adb shell \"run-as org.textrecognizer cat /data/data/org.textrecognizer/files/captcha_image.txt\"\n", - "# return textInCaptchaImage[0]\n", - " \n", - "def solveCaptchaAndStartFileDownload(driver, captchaImageFile):\n", - " saveCaptchaImageAs(driver, captchaImageFile)\n", - " textInCaptchaImage = getTextInCaptchaImage(captchaImageFile)\n", - " display('textInCaptchaImage: ', textInCaptchaImage)\n", - " driver.find_element(By.ID, \"verificationCode\").send_keys(textInCaptchaImage)\n", - " driver.find_element(By.CSS_SELECTOR, '[name=\"downloadbut\"]').click()\n", - "\n", - "def downloadFile(absoluteFile, driver, maxTries):\n", - " def _downloadFile():\n", - " driver.get('https://vaers.hhs.gov/eSubDownload/index.jsp?fn=' + os.path.basename(absoluteFile))\n", - " solveCaptchaAndStartFileDownload(driver, 'captchaImage.jpeg')\n", - "\n", - " numTries = 1\n", - " _downloadFile()\n", - " while(not isCaptchaSolved(driver) and (maxTries is None or numTries < maxTries)):\n", - " _downloadFile()\n", - " numTries = numTries + 1\n", - "\n", - " if isCaptchaSolved(driver):\n", - " waitUntilDownloadHasFinished(absoluteFile)\n", - " return absoluteFile\n", - " else:\n", - " return None\n", - "\n", - "def downloadVAERSFile(file, downloadDir):\n", - " driver = getWebDriver(downloadDir, isHeadless = True)\n", - " downloadedFile = downloadFile(\n", - " absoluteFile = downloadDir + \"/\" + file,\n", - " driver = driver,\n", - " maxTries = None)\n", - " driver.quit()\n", - " return downloadedFile" - ] - }, { "cell_type": "code", "execution_count": null, @@ -310,16 +127,8 @@ "metadata": {}, "outputs": [], "source": [ - "import zipfile\n", - "import os\n", - "\n", - "def unzip(zipFile, dstDir):\n", - " with zipfile.ZipFile(zipFile, 'r') as zip_ref:\n", - " zip_ref.extractall(dstDir)\n", - "\n", - "def unzipAndRemove(zipFile, dstDir):\n", - " unzip(zipFile, dstDir)\n", - " os.remove(zipFile)\n", + "from VAERSFileDownloader import downloadVAERSFile\n", + "from zipUtils import unzipAndRemove\n", "\n", "def downloadVAERSFileAndUnzip(file):\n", " downloadedFile = downloadVAERSFile(file, getWorkingDirectory() + \"/VAERS/tmp\")\n", @@ -661,7 +470,7 @@ }, "vscode": { "interpreter": { - "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" + "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1" } } }, diff --git a/src/VAERSFileDownloader.py b/src/VAERSFileDownloader.py new file mode 100644 index 00000000000..b9b7b20fd10 --- /dev/null +++ b/src/VAERSFileDownloader.py @@ -0,0 +1,49 @@ +import os +import time +from WebDriver import getWebDriver, isCaptchaSolved, saveCaptchaImageAs +from selenium.webdriver.common.by import By +from CaptchaReader import getTextInCaptchaImage + +#def getTextInCaptchaImage(captchaImageFile): +# baseDir = "~/AndroidStudioProjects/TextRecognizer" +# ! cp $captchaImageFile $baseDir/app/src/main/assets/captchas/captcha_image.jpeg +# ! cd $baseDir;./gradlew connectedAndroidTest +# textInCaptchaImage = ! adb shell "run-as org.textrecognizer cat /data/data/org.textrecognizer/files/captcha_image.txt" +# return textInCaptchaImage[0] + +def solveCaptchaAndStartFileDownload(driver, captchaImageFile): + saveCaptchaImageAs(driver, captchaImageFile) + textInCaptchaImage = getTextInCaptchaImage(captchaImageFile) + display('textInCaptchaImage: ', textInCaptchaImage) + driver.find_element(By.ID, "verificationCode").send_keys(textInCaptchaImage) + driver.find_element(By.CSS_SELECTOR, '[name="downloadbut"]').click() + +def downloadFile(absoluteFile, driver, maxTries): + def _downloadFile(): + driver.get('https://vaers.hhs.gov/eSubDownload/index.jsp?fn=' + os.path.basename(absoluteFile)) + solveCaptchaAndStartFileDownload(driver, 'captchaImage.jpeg') + + numTries = 1 + _downloadFile() + while(not isCaptchaSolved(driver) and (maxTries is None or numTries < maxTries)): + _downloadFile() + numTries = numTries + 1 + + if isCaptchaSolved(driver): + _waitUntilDownloadHasFinished(absoluteFile) + return absoluteFile + else: + return None + +def _waitUntilDownloadHasFinished(file): + while not os.path.exists(file): + time.sleep(2) + +def downloadVAERSFile(file, downloadDir): + driver = getWebDriver(downloadDir, isHeadless = True) + downloadedFile = downloadFile( + absoluteFile = downloadDir + "/" + file, + driver = driver, + maxTries = None) + driver.quit() + return downloadedFile diff --git a/src/WebDriver.py b/src/WebDriver.py new file mode 100644 index 00000000000..90107b43bd1 --- /dev/null +++ b/src/WebDriver.py @@ -0,0 +1,27 @@ +from selenium import webdriver +from webdriver_manager.chrome import ChromeDriverManager +from selenium.webdriver.chrome.service import Service as ChromeService +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.common.by import By + +def _getOptions(downloadDir, isHeadless): + options = Options() + options.headless = isHeadless + options.add_experimental_option("prefs", {"download.default_directory" : downloadDir}) + return options + +def getWebDriver(downloadDir, isHeadless): + return webdriver.Chrome( + service = ChromeService(executable_path = ChromeDriverManager().install()), + options = _getOptions(downloadDir, isHeadless)) + +def saveCaptchaImageAs(driver, captchaImageFile): + captchaImage = driver.find_element(By.CSS_SELECTOR, "img[src='captchaImage']") + with open(captchaImageFile, 'wb') as file: + file.write(captchaImage.screenshot_as_png) + +def existsElementWithId(driver, id): + return len(driver.find_elements(By.ID, id)) > 0 + +def isCaptchaSolved(driver): + return not existsElementWithId(driver, "wordverify") diff --git a/src/zipUtils.py b/src/zipUtils.py new file mode 100644 index 00000000000..06ae283e0c0 --- /dev/null +++ b/src/zipUtils.py @@ -0,0 +1,12 @@ +import zipfile +import os + + +def unzip(zipFile, dstDir): + with zipfile.ZipFile(zipFile, 'r') as zip_ref: + zip_ref.extractall(dstDir) + + +def unzipAndRemove(zipFile, dstDir): + unzip(zipFile, dstDir) + os.remove(zipFile)