# Captchas

**see:** https://keras.io/examples/vision/captcha_ocr/<br>
**original:** https://colab.research.google.com/drive/1Olw2KMHfPlnGaYuzffl2zb6D1etlBGZf?usp=sharing<br>
**View Github version in Colab:** <a href="https://colab.research.google.com/github/KnollFrank/2captcha-worker-assistant-server/blob/master/captcha_ocr_trainAndSaveModel_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a><br>
**paper:** Simple and Easy: Transfer Learning-Based Attacks to Text CAPTCHA<br>

## Setup

In [None]:
import sys
# Keep only the first entry of sys.argv (the program name) and drop all
# further command-line arguments.
# NOTE(review): presumably this hides the notebook kernel's own arguments from
# libraries that parse sys.argv — confirm.
sys.argv = sys.argv[:1]

In [None]:
def isInColab():
    """Return True when the notebook runs inside Google Colab, else False.

    Detection is done by trying to import ``google.colab``, the package the
    Colab runtime provides (it is the same package ``drive.mount`` comes from).
    """
    try:
        # Colab's helpers live in the `google.colab` package; there is no
        # top-level `colab` module, so probing that name could never succeed.
        import google.colab  # noqa: F401
    except ImportError:
        # Also covers ModuleNotFoundError, which subclasses ImportError.
        return False
    return True

In [None]:
# Determine once whether the notebook runs in Colab; the cells below branch on
# this flag.
inColab = isInColab()

In [None]:
if inColab:
    # In Colab: clone the repository into the current directory and check out
    # the branch named in `branch`.
    branch = 'read-captcha'
    !git clone https://github.com/KnollFrank/HowBadIsMyBatch.git
    !cd HowBadIsMyBatch; git checkout $branch

In [None]:
import os
# Root folder of the sources: the `src` folder of the clone when in Colab,
# otherwise the current working directory.
# NOTE(review): the Colab path assumes the clone above ran in /content — confirm.
srcPath = '/content/HowBadIsMyBatch/src' if inColab else os.getcwd()

In [None]:
if inColab:
    # Put the cloned sources first on the module search path so that imports
    # are resolved against them.
    sys.path.insert(0, srcPath)

In [None]:
import numpy as np
from pathlib import Path
import tensorflow as tf
from captcha.CaptchaGenerator import CaptchaGenerator
from captcha.CharNumConverter import CharNumConverter
from captcha.DataSplitter import DataSplitter
from captcha.DatasetFactory import DatasetFactory
from captcha.ModelFactory import ModelFactory
from captcha.PredictionsDecoder import PredictionsDecoder
from captcha.ModelDAO import ModelDAO
from captcha.CaptchaShape import CaptchaShape

In [None]:
from pathlib import Path

class GoogleDriveManager:
  """Helpers for exchanging zipped folders with Google Drive.

  The lines starting with ``!`` are IPython shell magics, so this class only
  works when executed inside a notebook; ``mount()`` additionally needs the
  ``google.colab`` package.
  """
  
  # Directory at which Google Drive gets mounted.
  _googleDriveFolder = Path('/content/gdrive')
  # Folder below the mount point that the zip archives are copied to and from.
  _baseFolder = _googleDriveFolder / 'MyDrive/CAPTCHA/models/'

  @staticmethod
  def mount():
    """Mount Google Drive at ``_googleDriveFolder``."""
    # Imported lazily so that merely defining the class works outside Colab.
    from google.colab import drive
    drive.mount(str(GoogleDriveManager._googleDriveFolder))

  @staticmethod
  def uploadFolderToGoogleDrive(folder):
    """Zip ``folder`` and copy the archive to ``_baseFolder``.

    The archive ``<basename>.zip`` is created in the parent directory of
    ``folder`` and contains the folder itself as its top-level entry.

    NOTE(review): ``folder`` is interpolated unquoted into shell commands, so
    paths containing spaces or shell metacharacters would likely break — confirm
    callers only pass simple paths. ``cp`` does not create ``_baseFolder``;
    presumably it already exists on Drive — confirm.
    """
    # `!cmd` assigned to a name captures the command's output lines; the first
    # line of `basename` is the last path component of `folder`.
    basename = !basename {folder}
    basename = basename[0]
    !cd {folder}/..; zip -r {basename}.zip {basename}/
    !cd {folder}/..; cp {basename}.zip {GoogleDriveManager._baseFolder}
    
  @staticmethod
  def downloadFolderFromGoogleDrive(folder):
    """Fetch ``<folder>.zip`` from ``_baseFolder`` and unpack it here.

    Unlike in ``uploadFolderToGoogleDrive``, ``folder`` is used as a plain name
    relative to the current working directory: the archive is copied into the
    current directory, an existing ``folder`` is removed, and the archive is
    then unzipped.
    """
    !cp {GoogleDriveManager._baseFolder}/{folder}.zip .
    # Remove a previous copy first so the unzip starts from a clean state.
    !rm -rf {folder}
    !unzip {folder}.zip


In [None]:
def getImagesAndLabels(dataDir):
    """Collect the ``.jpeg`` files directly inside ``dataDir`` with their labels.

    Args:
        dataDir: directory object offering ``glob`` (e.g. a ``pathlib.Path``).

    Returns:
        A pair ``(images, labels)``: ``images`` is the sorted list of file
        paths as strings, and ``labels[i]`` is the file name of ``images[i]``
        up to the first occurrence of ``.jpeg``.
    """
    fileSuffix = ".jpeg"

    images = [str(imagePath) for imagePath in dataDir.glob("*" + fileSuffix)]
    images.sort()

    labels = []
    for image in images:
        fileName = image.split(os.path.sep)[-1]
        labels.append(fileName.split(fileSuffix)[0])

    return images, labels


In [None]:
def getTrainValidationTestDatasets(dataDir, datasetFactory):
    """Build the train, validation and test datasets from the images in ``dataDir``.

    Prints the number of images found and the captcha character set, splits
    the images and labels with a ``DataSplitter`` and turns each split into a
    dataset via ``datasetFactory.createDataset``.

    Returns:
        A tuple ``(train, validation, test)`` of datasets.
    """
    images, labels = getImagesAndLabels(dataDir)
    print("Number of images found:", len(images))
    print("Characters:", CaptchaGenerator.characters)

    dataSplitter = DataSplitter(images, labels)

    trainDataset = datasetFactory.createDataset(*dataSplitter.getTrain())
    validationDataset = datasetFactory.createDataset(*dataSplitter.getValid())
    testDataset = datasetFactory.createDataset(*dataSplitter.getTest())
    return trainDataset, validationDataset, testDataset

In [None]:
import matplotlib.pyplot as plt
import math

def displayImagesInGrid(numGridCols, images, titles, titleColors):
    """Show ``images`` in a grid with ``numGridCols`` columns, one title each.

    Args:
        numGridCols: number of grid columns; the number of rows is derived
            from ``len(images)``.
        images: sequence of image tensors, each offering ``.numpy()``.
        titles: one title per image.
        titleColors: one matplotlib colour per title.

    Grid cells that have no corresponding image are left empty.
    """
    assert len(images) == len(titles) == len(titleColors)
    images = [image.numpy().astype(np.uint8) for image in images]
    numGridRows = math.ceil(len(images) / numGridCols)
    # squeeze=False keeps `axs` two-dimensional. By default plt.subplots
    # collapses a single-row or single-column grid to a 1-D array (or a bare
    # Axes), which broke 2-D indexing whenever the images fit into one row.
    _, axs = plt.subplots(
        numGridRows, numGridCols, figsize=(15, 5), squeeze=False)
    # `axs.flat` walks the grid row by row, so i == row * numGridCols + col.
    for i, ax in enumerate(axs.flat):
        ax.axis("off")
        if i < len(images):
            ax.imshow(images[i])
            ax.set_title(titles[i], color=titleColors[i])
    plt.show()


In [None]:
def display16Predictions(model, dataset, predictionsDecoder):
    """Plot up to 16 images of the first batch with predicted and true text.

    Each title reads ``Prediction/Truth: <prediction>/<truth>`` and is drawn
    in green when both texts are equal, otherwise in red.

    Args:
        model: model whose ``predict`` is applied to the image batch.
        dataset: dataset of batches with the keys ``"image"`` and ``"label"``.
        predictionsDecoder: decoder offering ``decode_batch_predictions`` and
            ``asStrings``.
    """
    numPredictions2Display = 16
    for batch in dataset.take(1):
        shownImages = batch["image"][:numPredictions2Display]
        shownLabels = batch["label"][:numPredictions2Display]

        rawPredictions = model.predict(shownImages)
        predictedTexts = predictionsDecoder.decode_batch_predictions(rawPredictions)
        trueTexts = predictionsDecoder.asStrings(shownLabels)

        titles = []
        colors = []
        for predicted, truth in zip(predictedTexts, trueTexts):
            titles.append(f"Prediction/Truth: {predicted}/{truth}")
            colors.append('green' if predicted == truth else 'red')

        displayImagesInGrid(4, shownImages, titles, colors)

In [None]:
def printLayers(model):
    """Print one line per layer of ``model``: its index and its name."""
    layerNames = (layer.name for layer in model.layers)
    for index, layerName in enumerate(layerNames):
        print(index, layerName)


In [None]:
# FK-TODO: remove the getAccuracy() function. Instead implement a custom metric, see https://stackoverflow.com/questions/37657260/how-to-implement-custom-metric-in-keras or https://keras.io/api/metrics/#custom-metrics
def getAccuracy(dataset, prediction_model, ctc_decode):
    """Accumulate a ``tf.keras.metrics.Accuracy`` over all batches of ``dataset``.

    For every batch the images are run through ``prediction_model``, the raw
    predictions are decoded with ``ctc_decode`` and compared with the batch
    labels.

    Returns:
        The final metric result as a numpy value.
    """
    metric = tf.keras.metrics.Accuracy()

    for batch in dataset:
        expected = batch["label"]
        rawPredictions = prediction_model.predict(batch["image"], verbose=0)
        metric.update_state(expected, ctc_decode(rawPredictions))

    return metric.result().numpy()

## Preparation

In [None]:
if inColab:
    # Mount Google Drive (used by saveModel to upload the saved model).
    GoogleDriveManager.mount()

In [None]:
# Shared helper objects used by the cells below.
modelDAO = ModelDAO()
# Character <-> number converter built from the generator's character set.
charNumConverter = CharNumConverter(CaptchaGenerator.characters)
predictionsDecoder = PredictionsDecoder(CaptchaGenerator.captchaLength, charNumConverter.num_to_char)
captchaShape = CaptchaShape()
# Every dataset created through this factory uses batch_size 64.
datasetFactory = DatasetFactory(captchaShape, charNumConverter.char_to_num, batch_size = 64)

In [None]:
def saveModel(model):
    """Save ``model`` below ``srcPath``/captcha and, in Colab, upload it.

    The target path is derived from ``model.name``. When running in Colab the
    saved folder is additionally uploaded to Google Drive.
    """
    modelFilepath = f'{srcPath}/captcha/{model.name}'
    modelDAO.saveModel(model, modelFilepath)
    if not inColab:
        return
    GoogleDriveManager.uploadFolderToGoogleDrive(modelFilepath)

## Create And Train Base Model

In [None]:
if inColab:
    # Install the Microsoft core fonts, rebuild the font cache and print which
    # font "Arial" resolves to.
    # NOTE(review): presumably CaptchaGenerator renders its captchas with
    # Arial — confirm.
    !apt-get update
    !sudo apt install ttf-mscorefonts-installer
    !sudo fc-cache -f
    !fc-match Arial

In [None]:
# "We generate 200,000 images for base model pre-training"
captchaGenerator = CaptchaGenerator(
    numCaptchas = 200000, # 50, # 200000,
    dataDir = Path(srcPath + '/captchas/generated/VAERS/'))

In [None]:
# Generate the captchas and store them.
captchaGenerator.createAndSaveCaptchas()

In [None]:
# Split the generated captchas into train, validation and test datasets.
train_dataset, validation_dataset, test_dataset = getTrainValidationTestDatasets(captchaGenerator.dataDir, datasetFactory)

In [None]:
# Visual sanity check: show up to 16 images of the first training batch with
# their labels as black titles.
for batch in train_dataset.take(1):
    numImages2Display = 16
    images = batch["image"][:numImages2Display]
    labels = batch["label"][:numImages2Display]
    displayImagesInGrid(4, images, predictionsDecoder.asStrings(labels), ['black'] * len(labels))

In [None]:
# Create the base model via ModelFactory.createMobileNetV3Small().
modelFactory = ModelFactory(captchaShape, charNumConverter.char_to_num)
model = modelFactory.createMobileNetV3Small()
model.summary()

In [None]:
# "the success rates became stable after the base-model training epochs exceeded 20"
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=20)


In [None]:
# Persist the trained base model (and upload it to Google Drive when in Colab).
saveModel(model)

In [None]:
# Derive the model used for predictions from the trained model.
prediction_model = ModelFactory.createPredictionModel(model)
prediction_model.summary()


In [None]:
# Show up to 16 test images with predicted and true text.
display16Predictions(prediction_model, test_dataset, predictionsDecoder)

In [None]:
# Accuracy of the decoded predictions on the test dataset.
getAccuracy(test_dataset, prediction_model, predictionsDecoder.ctc_decode)

## Transfer learning

In [None]:
# "we collected 1,500 real CAPTCHAs from the websites. Note that only 500 of them are used for fine-tuning, and another 1,000 are applied to calculate the test accuracy"
# FK-TODO: load the pre-trained model and train it with 500 real-world samples from the folder captchas/VAERS/; the remaining 540 (according to the quote above there should be 1,000) are then the test data.
# see https://keras.io/guides/transfer_learning/
# see https://www.tensorflow.org/tutorials/images/transfer_learning


In [None]:
# Name of the saved base model to fine-tune and a layer count for it.
# NOTE(review): despite its name, numTrainableLayers is used below as the
# number of leading layers that get frozen (trainable = False).
modelName, numTrainableLayers = 'MobileNetV3Small', 104
# modelName, numTrainableLayers = 'ResNet101', 348

In [None]:
# FK-TODO: DRY with VAERSFileDownloader
# Load the saved model from the same path pattern that saveModel() writes to.
modelFilepath = f'{srcPath}/captcha/{modelName}'
model = modelDAO.loadModel(modelFilepath)
model.summary(show_trainable=True)

In [None]:
# printLayers(model)

In [None]:
# Make the whole model trainable, then freeze the first numTrainableLayers
# layers so that only the remaining layers are updated during fine-tuning.
model.trainable = True
for layer in model.layers[:numTrainableLayers]:
    layer.trainable = False

In [None]:
# Print the summary again to check which layers are trainable now.
model.summary(show_trainable=True)

In [None]:
# From here on the dataset variables refer to the captchas stored in
# captcha/captchas/VAERS/ instead of the generated ones.
train_dataset, validation_dataset, test_dataset = getTrainValidationTestDatasets(Path(f"{srcPath}/captcha/captchas/VAERS/"), datasetFactory)

In [None]:
# "The model is optimized by a stochastic gradient descent (SGD) strategy with an initial learning rate of 0.004, weight decay of 0.00004 and momentum of 0.9."
from tensorflow.keras.optimizers import SGD
# model.compile(optimizer=SGD(learning_rate=0.0001, momentum=0.9))
# NOTE(review): the code deviates from both quotes in this cell — it compiles
# with 'adam' instead of SGD and trains for 20 epochs instead of 1. Confirm
# this is deliberate.
model.compile(optimizer='adam')

# "Therefore, in our experiments, we chose 1 epoch for the fine-tuning stage."
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=20)


In [None]:
# Derive the model used for predictions from the fine-tuned model.
prediction_model = ModelFactory.createPredictionModel(model)
prediction_model.summary()

In [None]:
# Accuracy of the decoded predictions on the test dataset.
getAccuracy(test_dataset, prediction_model, predictionsDecoder.ctc_decode)

In [None]:
# Show up to 16 test images with predicted and true text.
display16Predictions(prediction_model, test_dataset, predictionsDecoder)

In [None]:
# Persist the fine-tuned model; it is written to the path derived from
# model.name.
saveModel(model)