{"id":52524,"date":"2021-10-14T15:00:28","date_gmt":"2021-10-14T14:00:28","guid":{"rendered":"https:\/\/www.microsoft.com\/en-gb\/industry\/blog\/?p=52524"},"modified":"2022-02-10T20:45:14","modified_gmt":"2022-02-10T19:45:14","slug":"building-scalable-data-science-applications-using-containers-part-6","status":"publish","type":"post","link":"https:\/\/www.microsoft.com\/en-gb\/industry\/blog\/technetuk\/2021\/10\/14\/building-scalable-data-science-applications-using-containers-part-6\/","title":{"rendered":"Building Scalable Data Science Applications using Containers \u2013 Part 6"},"content":{"rendered":"
<\/p>\n
Welcome to the sixth part of this blog series around using containers for Data Science. In parts one<\/a>, two<\/a>, three<\/a>, four<\/a>, and five<\/a>, we provided a number of building blocks that we\u2019ll use here. If this is the first blog you\u2019ve seen, it may be worth skimming the first five parts, or going back and progressing through them. We make a number of assumptions about your familiarity with Docker, storage, and multi-container applications, which were covered previously.<\/p>\n In this article we convert the previous docker-compose application (part five<\/a>) to one that capitalises on a Kubernetes approach \u2013 scalability, resilience, predefined configuration packages with Helm, and so on.<\/p>\n Reviewing the previous Docker approach\u2019s structure, almost everything sits in a container mounting shared storage.<\/p>\n <\/p>\n Kubernetes brings a different dimension to how you might consider a solution, and our approach builds on this. In this article, we won\u2019t stretch the bounds of what Kubernetes can do, but we will show how to take an existing container-based application and gradually migrate that capability to cloud services, with Kubernetes used as an orchestration engine.<\/p>\n This is the revised architecture.<\/p>\n <\/p>\n Things to note about this:<\/p>\n We won\u2019t use hard-coded passwords, host names, and so on within our source as we did in the previous instalment, but we will use configurable variables. This is still less secure than it could be, as environment variable values are still visible within Kubernetes configuration files. A more secure approach might use Azure Key Vault and, say, CSI Secrets<\/a>. However, we want to minimise the length of this blog rather than be distracted by container security. The CSI Secrets link should clarify how to apply this yourself if you need to.<\/p>\n For the purposes of this blog, we assume that:<\/p>\n <\/p>\n All the code for this tutorial can be downloaded here<\/a>.<\/p>\n We\u2019ll hold our application under a single directory tree. Create the aks<\/strong> directory, and then beneath that, create sub-directories called containers\/iload<\/strong> and containers\/worker<\/strong>.<\/p>\n As with the previous instalment, we will use the same classic CIFAR<\/a> image set for our testing. There is a GitHub source that has them in jpg form, which can be downloaded here<\/a>.<\/p>\n Go into your aks directory and clone the repo. You should see something like the following:<\/p>\n <\/p>\n Previously, we used container volumes. In this case, we\u2019ll use blob storage and all containers will reference the same content. Copy the script below into a file called initialprep.sh<\/strong>. Modify the first three lines to refer to the names of an Azure Resource Group, Storage Account, and Blob Container. Note that the iload and worker scripts later in this article expect the blob container to be called cifar<\/strong>, so keep that name unless you also change those scripts. The script will create those resources and upload all the CIFAR images to the Storage Container. If you already have a resource group, Storage Account, and Storage Container, feel free to remove the lines that create them.<\/p>\n When you run this, you should see the resource creation followed by the upload to the repository.<\/p>\n <\/p>\n The previous container-only approach used a Postgres container to record results. Azure provides resilient, scalable services, which are easily configurable, so there\u2019s no need to build our own. 
Let\u2019s provision one of those services and refer to it later.<\/p>\n Below you can see how to list available Postgres SKU types, where the format is (Model_Generation_Cores<\/strong>), so a B<\/strong>asic single core Gen 5<\/strong> server would be \u201cB_Gen5_1<\/strong>\u201d.<\/p>\n Choose the smallest server available. We\u2019ll allocate a basic single core server with 50GB of storage. At the time of writing, this cost around \u00a325\/month. We could also have chosen a much less expensive SQL-DB server for around \u00a35\/month with 2GB of storage, but we\u2019d need to change our SQL slightly. We\u2019ve changed as little as necessary from our previous instalment of this blog, but feel free to make your own optimisations.<\/p>\n Here you can see that I\u2019m provisioning a database called cifardb<\/strong> with an administrator name of \u2018jon<\/strong>\u2019 and a password of \u2018P@ssw0rd123<\/strong>\u2019. It also returns the fully qualified domain name of the server (cifardb.postgres.database.azure.com<\/strong>).<\/p>\n By default, Postgres denies access to all services. You can define private networks to ensure very granular access within and from outside Azure. In this case, we\u2019ll provide default access to any Azure service (e.g. Kubernetes). Note that this does not<\/strong> provide access to any external public endpoint.<\/p>\n <\/p>\n We\u2019re now at the stage where the components to be added are containers. Where we previously used Docker, we\u2019ll now run them on a Kubernetes cluster. The purpose of this article is not to focus on everything Kubernetes can do. Rather, it is to give a simple example of running Data Science services on Azure Kubernetes Service.<\/p>\n There are many publicly available guides to understanding the fundamentals of Kubernetes, as well as the Azure approach to implementing it. Microsoft has a set of modules that will introduce you to many of the concepts here<\/a>.<\/p>\n Create a file called aks.sh<\/strong> containing the following, and place this within the aks directory. Replace the Resource Group, AKS Cluster Name, and Azure Container Registry names with your own choices.<\/p>\n This script creates a Kubernetes cluster and a Container Registry, and then gives the cluster permission to pull images from the registry. Execute that script.<\/p>\n Now we\u2019ll let our local Kubernetes CLI environment (e.g. laptop \/ desktop) connect to our Azure Kubernetes cluster and confirm that we can see services running.<\/p>\n This shows the cluster running and that we can control it from our local environment.<\/p>\n <\/p>\n Let\u2019s confirm where we are in the overall process.<\/p>\n The final part is to add the application. The key thing to consider with this new approach is that where previously we built all the services, a cloud platform allows us to take advantage of commodity capabilities that are already designed to be scalable and resilient. Looking at the diagram of the cloud version of this application, there are three components outstanding, and each of these uses containers.<\/p>\n <\/p>\n The next component in our solution is the queueing mechanism. Previously, we built a RabbitMQ container to manage our requests. We\u2019ll do the same here, but not with a Dockerfile. We could, but let\u2019s show you an alternative approach using Helm. Helm is a Kubernetes package manager that allows you to install and configure applications very easily. 
We could achieve the same by building our own container, but Helm makes the process trivial, and there are many ready-made applications available. The documentation for installing RabbitMQ using Helm can be found here<\/a>, but the two lines below are all I needed to get RabbitMQ installed and running in my environment.<\/p>\n There is some interesting information to note here:<\/p>\n We\u2019ll need these credentials in a minute. In the meantime, let\u2019s see what was deployed in our environment:<\/p>\n As there is no external IP address, use the port forward<\/strong> command so that we can interact with RabbitMQ.<\/p>\n <\/p>\n If we now add the credentials extracted earlier, we can see our running RabbitMQ environment.<\/p>\n <\/p>\n <\/p>\n This process performs two functions. First, it connects to our Postgres environment and creates the CATEGORY_RESULTS table if it doesn\u2019t already exist. Second, it queues all the images that were uploaded to the storage account earlier so they can be classified. In this example we\u2019re running this as a one-off, but you could also take a more sophisticated approach, using a location argument for daily or ad-hoc batches of images (a sketch of this idea follows the iload.py listing below).<\/p>\n Go into the containers\/iload<\/strong> directory and create a file called iload.py<\/strong> containing the following:<\/p>\n As with the previous version of this application, the script extracts all image names in our storage location and adds them to a queue to be classified. The first key difference with this version is that our images aren\u2019t stored on a container\u2019s local disk, but in an Azure storage account, so we\u2019ll need our blob storage credentials.<\/p>\n The second thing to note is that we\u2019re using environment variables within the code. This means that the script can refer to customised and changing services without a need to continually modify the code. You can use the same code against different data sources, queues, or storage accounts.<\/p>\n In the containers\/iload<\/strong> directory create a file called Dockerfile<\/strong> containing the following.<\/p>\n This simply defines a container with Python installed, and relevant libraries to access Azure storage, Postgres, and RabbitMQ.<\/p>\n Within that directory, build the container, and then we\u2019ll move it to our Azure Container Registry.<\/p>\n Now we\u2019ll log in to our Azure Container Registry, tag our local image against a target image in the remote repository, and then push it to Azure. We\u2019ll also confirm that it is there by doing an Azure equivalent of docker images<\/strong> (az acr repository list\u2026). Note that we are prefixing the image tag with the name of the Azure Container Registry (jmcifaracr.azurecr.io<\/strong>).<\/p>\n We\u2019re almost there.<\/p>\n Kubernetes has a number of ways of executing workloads. The two we\u2019re interested in specifically are deployments and jobs. The key difference is that a job is executed once, whereas a deployment is expected to remain operational; if anything happens to the process, Kubernetes will attempt to keep that resource running. In other words, if a container dies, then it will be restarted.<\/p>\n For the iload process, we only want this to load our 60,000 images and then terminate. We don\u2019t want to load the images and have the container restart, only to load them again and again. 
To run this job, we\u2019ll provide a configuration file containing the job details and submit it to Kubernetes.<\/p>\n In the containers\/iload<\/strong> directory, create a file called iload-job.yml<\/strong> with the following:<\/p>\n Let\u2019s spend some time looking at this.<\/p>\n The job is going to process the images just uploaded to the storage container. All variables in the script are defined here. We could run this using different values and keep our source code stable. We are using the RabbitMQ and Postgres credentials shown earlier. In addition, we\u2019re referencing the blob storage key and container created earlier.<\/p>\n Note that the passwords are shown here in clear text. Ideally, we would use something like Azure Key Vault, or a more secure approach using CSI Secrets<\/a>, so that none of this information is exposed outside of the container (a minimal Kubernetes Secret alternative is sketched after the job definition below).<\/p>\n If we kick off that job using kubectl, you will see it being deployed, and a pod created. Once the job completes, you can also see that the container logs show the job\u2019s progress.<\/p>\n If you return to the RabbitMQ dashboard, you will see the queue contents increase from zero to 60,000 items. At its peak, the job added around 3,500 requests per second.<\/p>\n <\/p>\n The final component in our application is the worker process. Its role is to take an item off the queue, classify it, and then record the accuracy of the prediction.<\/p>\n Go into the containers\/worker<\/strong> directory and create a file called worker.py<\/strong> containing the following:<\/p>\n The main logic hasn\u2019t changed since the previous instalment. It takes a request from the queue containing an image\u2019s physical location and its expected category, and returns a predicted category and a confidence value. It also stores these values in a database if desired.<\/p>\n As with the iload process, the key differences here are as follows:<\/p>\n We also added the ability to log results depending on the value of an environment variable, so you might want to play with this to determine the performance impact of logging.<\/p>\n In the containers\/worker<\/strong> directory create a file called Dockerfile<\/strong> containing the following.<\/p>\n Again, this is relatively straightforward. You build a container with the requisite Azure, Python, RabbitMQ, and machine learning libraries installed.<\/p>\n As with the iload process, you need to build a local container, tag it against a target image in the Azure Container Registry, and then push it to Azure.<\/p>\n Now we need to provide a deployment file for the worker process. This defines how it is run within Kubernetes.<\/p>\n In the containers\/worker<\/strong> directory, create a file called worker-deployment.yml<\/strong> containing the following:<\/p>\n Let\u2019s spend a bit of time going through this as well.<\/p>\n First, this is a deployment, and it ensures that there is always a defined number of replicas<\/strong> (or pods in this case) running. This configuration uses a single pod, but when we increase this number later, you\u2019ll see how it affects the environment and performance. Second, each pod is allocated an amount of memory and CPU. Some processes are memory intensive, and others compute-centric. 
You can decide how much to dedicate to each pod type.<\/p>\n Let\u2019s deploy that container and evaluate the performance.<\/p>\n You can see that there is an active deployment and a single worker running. This is the view from the RabbitMQ dashboard \u2013 1.8 requests<\/strong> on average per second.<\/p>\n <\/p>\n Increase the number of parallel workers to 5<\/strong> by modifying the replica<\/strong> count in the worker-deployment.yml<\/strong> file and redeploying it. You will then have 5 pods. Each worker takes a request from the queue, performs the image classification, and writes the content to Postgres.<\/p>\n Performance has now increased to an average of 8.8 requests per second.<\/p>\n <\/p>\n Here is a view of performance after increasing the replica count even further to 20 (35 requests per second).<\/p>\n <\/p>\n And then 35 workers (55 requests per second).<\/p>\n <\/p>\n This isn\u2019t linear scalability, nor is it an invitation to simply increase the number of workers to 500. Each Kubernetes node has a limited amount of physical resource. During our tests, we achieved 70 requests per second after playing with how much memory and CPU were allocated to each pod. This is an exercise for you to consider with your own workloads. What should be understood, though, is that you can scale your service as needed, with the underlying Kubernetes architecture supporting that: more pods, nodes, or clusters as required.<\/p>\n <\/p>\n This article showed how to take an existing multi-container Docker application and migrate it to the Azure Kubernetes Service. Where possible, commodity PaaS capabilities were used (database, storage, etc.). We also showed how to deploy a publicly available, pre-packaged configuration using Helm.<\/p>\n The previous instalment of this blog solely used containers, writing the results to Postgres. We did the same here, but there\u2019s nothing to suggest a need to query the results immediately. If this were performance critical, we might consider writing the results to a file and then batch uploading those to a database at some point for analysis \u2013 much more efficient.<\/p>\n Our application is tiny, and arguably too small to justify an entire Kubernetes environment. However, a Kubernetes environment normally runs many different applications simultaneously within private networks, using well-defined security, performance monitoring, and with much more flexibility in terms of scalability and cost optimisation. Since you are only charged for the Kubernetes environment, not the number of pods, you can run as many or as few applications as you like in that environment, subject to capacity.<\/p>\n You might also want to consider adding a node pool of GPU nodes, which can dramatically improve performance where your applications are able to use the underlying GPU. More information can be found here<\/a>.<\/p>\n The articles in this series have focused on the basics of using containers on Azure to address common data science patterns, assuming an existing interest in using on-premises containers to deliver data science solutions.<\/p>\n We haven\u2019t considered the use of MLOps<\/a>, where you might approach machine learning and data science with the same rigour, governance, and outcome transparency applied to software development. 
Nor have we considered the use of Azure Machine Learning<\/a>, where you might want to replace some of your historical code with PaaS machine learning capabilities and optimised compute.<\/p>\n Future instalments may look at these, incorporating your containers with prebuilt Azure capabilities.<\/p>\n Note:<\/b>\u00a0If you\u2019ve finished this tutorial and created a specific resource group to try it, then you may want to remove it to ensure you\u2019re no longer being charged for resources that are no longer needed.<\/p>\n <\/p>\n Jon is a Microsoft Cloud Solution Architect specialising in Advanced Analytics & Artificial Intelligence with over 30 years of experience in understanding, translating and delivering leading technology to the market. He currently focuses on a small number of global accounts helping align AI and Machine Learning capabilities with strategic initiatives. He moved to Microsoft from IBM where he was Cloud & Cognitive Technical Leader and an Executive IT Specialist.<\/p>\n Jon has been the Royal Academy of Engineering Visiting Professor for Artificial Intelligence and Cloud Innovation at Surrey University since 2016, where he lectures on various topics from machine learning and design thinking to architectural thinking.<\/p>\n Mark has worked at Microsoft for five and a half years with a focus on helping customers adopt cloud native technologies. Before Microsoft, he spent around twenty years in the financial services industry, primarily at major UK banks, where he worked in various roles across operations, engineering and architecture. He loves discovering new technologies, learning them in depth and teaching others.<\/p>\n","protected":false},"excerpt":{"rendered":" In this second article in a two-part miniseries, Jon Machtynger and Mark Whitby convert the previous docker-compose application to one that capitalises on a Kubernetes approach.<\/p>\n","protected":false},"author":430,"featured_media":36918,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"ep_exclude_from_search":false,"_classifai_error":"","footnotes":""},"categories":[594],"post_tag":[519],"content-type":[],"coauthors":[531,1776],"class_list":["post-52524","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-technetuk","tag-technet-uk"],"yoast_head":"\n\n
\n
Let\u2019s Begin<\/h3>\n
$ cd aks<\/strong>\r\n\r\n$ git clone https:\/\/github.com\/YoongiKim\/CIFAR-10-images.git<\/strong>\r\nCloning into 'CIFAR-10-images'...\r\nremote: Enumerating objects: 60027, done.\r\nremote: Total 60027 (delta 0), reused 0 (delta 0), pack-reused 60027\r\nReceiving objects: 100% (60027\/60027), 19.94 MiB | 16.28 MiB\/s, done.\r\nResolving deltas: 100% (59990\/59990), done.\r\nUpdating files: 100% (60001\/60001), done.\r\n\r\n$ tree -L 1 .<\/strong>\r\naks\r\n\u251c\u2500\u2500 CIFAR-10-images\r\n\u2514\u2500\u2500 containers\r\n\r\n2 directories<\/pre>\n
Blob Storage and your image<\/h3>\n
RGNAME=\"rg-cifar<\/strong>\"\r\nSTG=\"cifarimages<\/strong>\"\r\nCON=\"cifar<\/strong>\" # The iload and worker scripts expect this container name\r\nEXPIRES=$(date --date='1 days' \"+%Y-%m-%d\")\r\nIMAGEDIR=\"CIFAR-10-images\"\r\n\r\n# Create environment\r\naz group create -l uksouth -n $RGNAME\r\naz storage account create --name $STG --resource-group $RGNAME --location uksouth --sku Standard_ZRS # Create Storage Account\r\naz storage container create --account-name $STG --name $CON --auth-mode login # Create your storage container\r\n\r\nACCOUNTKEY=$(az storage account keys list --resource-group $RGNAME --account-name $STG | grep -i value | head -1 | cut -d':' -f2 | tr -d [\\ \\\"])\r\n\r\n# Generate a temporary SAS key\r\nSAS=$(az storage container generate-sas --account-key $ACCOUNTKEY --account-name $STG --expiry $EXPIRES --name $CON --permissions acldrw | tr -d [\\\"])\r\n\r\n# Determine your URL endpoint\r\nSTGURL=$(az storage account show --name $STG --query primaryEndpoints.blob | tr -d [\\\"])\r\nCONURL=\"$STGURL$CON\"\r\n\r\n# Copy the files to your storage container\r\nazcopy cp \"$IMAGEDIR\" \"$CONURL?$SAS\" --recursive<\/pre>\n
$ initialprep.sh<\/strong>\r\n{\r\n \"id\": \"\/subscriptions\/f14bca45-bd2d-42f2-8a45-1248ab77ba72\/resourceGroups\/rg-cifar2\",\r\n \"location\": \"uksouth\",\r\n \"managedBy\": null,\r\n \"name\": \"rg-cifar2\",\r\n \"properties\": {\r\n \"provis\r\n\r\nJob 8b0ccc36-2050-0a44-496e-c09d979f3169 summary\r\nElapsed Time (Minutes): 0.8001\r\nNumber of File Transfers: 60025\r\nNumber of Folder Property Transfers: 0\r\nTotal Number of Transfers: 60025\r\nNumber of Transfers Completed: 60025\r\nNumber of Transfers Failed: 0\r\nNumber of Transfers Skipped: 0\r\nTotalBytesTransferred: 83127418\r\nFinal Job Status: Completed<\/pre>\n
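If you want an independent check of the upload beyond the azcopy summary, counting the blobs in the container is enough. Below is a minimal sketch (not part of the original scripts) using the azure-storage-blob library, assuming the storage account name and key are exported as STG_ACNAME and STG_ACKEY to match the environment variables used later in this article.<\/p>\n
# count_blobs.py - rough check that the CIFAR images landed in blob storage (illustrative)\r\nimport os\r\nfrom azure.storage.blob import ContainerClient\r\n\r\nSTG_ACNAME = os.environ['STG_ACNAME'] # Storage Account Name\r\nSTG_ACKEY = os.environ['STG_ACKEY'] # Storage Account Key\r\n\r\nCONNECTION_STRING = 'DefaultEndpointsProtocol=https' + \\\r\n ';EndpointSuffix=core.windows.net' + \\\r\n ';AccountName=' + STG_ACNAME + ';AccountKey=' + STG_ACKEY\r\ncontainer = ContainerClient.from_connection_string(CONNECTION_STRING, container_name='cifar')\r\n\r\n# Count everything under the CIFAR-10-images prefix (the 60,000 images plus the repo's other files)\r\ncount = sum(1 for _ in container.list_blobs(name_starts_with='CIFAR-10-images'))\r\nprint('Blobs found:', count)<\/pre>\n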
Database Storage<\/h3>\n
$ az postgres server list-skus -l uksouth | grep -i id<\/strong>\r\n\r\n \"id\": \"Basic\",\r\n \"id\": \"B_Gen5_1\",\r\n \"id\": \"B_Gen5_2\",\r\n \"id\": \"GeneralPurpose\",\r\n \"id\": \"GP_Gen5_2\",\r\n \"id\": \"GP_Gen5_4\",\r\n \"id\": \"GP_Gen5_8\",\r\n \"id\": \"GP_Gen5_16\",\r\n \"id\": \"GP_Gen5_32\",\r\n \"id\": \"GP_Gen5_64\",\r\n \"id\": \"MemoryOptimized\",\r\n \"id\": \"MO_Gen5_2\",\r\n \"id\": \"MO_Gen5_4\",\r\n \"id\": \"MO_Gen5_8\",\r\n \"id\": \"MO_Gen5_16\",\r\n \"id\": \"MO_Gen5_32\",<\/pre>\n
$ az postgres server create --resource-group rg-cifar --name cifardb --location uksouth --admin-user jon --admin-password \"P@ssw0rd123\" --sku-name B_Gen5_1 --storage-size 51200\r\nChecking the existence of the resource group 'rg-cifar'...<\/strong>\r\n{\r\n.\r\n.\r\n \"administratorLogin\": \"jon\",\r\n \"password\": \"P@ssw0rd123\",\r\n.\r\n \"fullyQualifiedDomainName\": \"cifardb.postgres.database.azure.com\",\r\n.\r\n}\r\n$\r\n\r\n# Allow Azure services (e.g. Kubernetes) to access this\r\n$ az postgres server firewall-rule create --resource-group rg-cifar --server-name cifardb --name \"AllowAllLinuxAzureIps\" --start-ip-address \"0.0.0.0\" --end-ip-address \"0.0.0.0\"\r\n<\/strong>\r\n{\r\n \"endIpAddress\": \"0.0.0.0\",\r\n.\r\n \"startIpAddress\": \"0.0.0.0\",\r\n \"type\": \"Microsoft.DBforPostgreSQL\/servers\/firewallRules\"\r\n}<\/pre>\n
The Kubernetes Cluster<\/h3>\n
RGNAME=rg-cifar<\/strong>\r\nAKSNAME=cifarcluster<\/strong>\r\nACRNAME=jmcifaracr<\/strong>\r\n\r\n# Create an AKS cluster with default settings\r\naz aks create -g $RGNAME -n $AKSNAME --kubernetes-version 1.19.11\r\n\r\n# Create an Azure Container Registry\r\naz acr create --resource-group $RGNAME --name $ACRNAME --sku Basic\r\n\r\n# Attach the ACR to the AKS cluster\r\naz aks update -n $AKSNAME -g $RGNAME --attach-acr $ACRNAME<\/pre>\n
$ aks.sh<\/strong>\r\n{\r\n.\r\n \"kubernetesVersion\": \"1.19.11\",\r\n.\r\n \"networkProfile\": {\r\n \"dnsServiceIp\": \"10.0.0.10\",\r\n.\r\n}<\/pre>\n
$ az aks get-credentials --name cifarcluster --resource-group rg-cifar\r\n\r\n$ kubectl get services -A<\/strong>\r\n\r\nNAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE\r\ndefault kubernetes ClusterIP 10.0.0.1 <none> 443\/TCP 27d\r\nkube-system healthmodel-replicaset-service ClusterIP 10.0.243.143 <none> 25227\/TCP 27d\r\nkube-system kube-dns ClusterIP 10.0.0.10 <none> 53\/UDP,53\/TCP 27d\r\nkube-system metrics-server ClusterIP 10.0.133.242 <none> 443\/TCP 27d<\/pre>\n
Sense Check<\/h3>\n
\n
\n
The Queue Process<\/h3>\n
$ helm repo add bitnami https:\/\/charts.bitnami.com\/bitnami\r\n$ helm install rabbitmq bitnami\/rabbitmq<\/strong>\r\n\r\n.\r\n.\r\nCredentials:\r\n echo \"Username : user\"\r\n echo \"Password : $(kubectl get secret --namespace default rabbitmq -o jsonpath=\"{.data.rabbitmq-password}\" | base64 --decode)\"\r\n echo \"ErLang Cookie : $(kubectl get secret --namespace default rabbitmq -o jsonpath=\"{.data.rabbitmq-erlang-cookie}\" | base64 --decode)\"\r\n.\r\n.\r\n.\r\nTo Access the RabbitMQ AMQP port:\r\n echo \"URL : amqp:\/\/127.0.0.1:5672\/\"\r\n kubectl port-forward --namespace default svc\/rabbitmq 5672:5672\r\nTo Access the RabbitMQ Management interface:\r\n echo \"URL : http:\/\/127.0.0.1:15672\/\"\r\n kubectl port-forward --namespace default svc\/rabbitmq 15672:15672<\/pre>\n
\n
$ echo \"Username : user\"\r\nUsername : user<\/strong>\r\n$ echo \"Password : $(kubectl get secret --namespace default rabbitmq -o jsonpath=\"{.data.rabbitmq-password}\" | base64 --decode)\"\r\nPassword : 7TrP8KOVdC<\/strong><\/pre>\n
$ kubectl get services<\/strong>\r\nNAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE\r\nkubernetes ClusterIP 10.0.0.1 443\/TCP 27d\r\nrabbitmq ClusterIP 10.0.180.137 5672\/TCP,4369\/TCP,25672\/TCP,15672\/TCP 15m\r\nrabbitmq-headless ClusterIP None 4369\/TCP,5672\/TCP,25672\/TCP,15672\/TCP 15m\r\n\r\n$ kubectl get pods<\/strong>\r\nNAME READY STATUS RESTARTS AGE\r\nrabbitmq-0 1\/1 Running 0 16m<\/pre>\n
$ kubectl port-forward --namespace default svc\/rabbitmq 15672:15672 &<\/strong>\r\n[1] 88032\r\nForwarding from 127.0.0.1:15672 -> 15672\r\nForwarding from [::1]:15672 -> 15672<\/pre>\n
The Initial Load<\/h3>\n
#!\/usr\/bin\/env python\r\nimport sys, os, json, pika\r\nimport psycopg2\r\nfrom azure.storage.blob import ContainerClient\r\n\r\n# Get Environment Vars\r\nRMQ_USER<\/strong>=os.environ[\"RMQ_USER\"] # RabbitMQ Username\r\nRMQ_PASS<\/strong>=os.environ[\"RMQ_PASS\"] # RabbitMQ Password\r\nRMQ_HOST<\/strong>=os.environ[\"RMQ_HOST\"] # RabbitMQ Hostname\r\nSQL_HOST<\/strong>=os.environ[\"SQL_HOST\"] # SQL Hostname\r\nSQL_DB<\/strong>=os.environ[\"SQL_DB\"] # SQL Database\r\nSQL_USER<\/strong>=os.environ[\"SQL_USER\"] # SQL Username\r\nSQL_PASS<\/strong>=os.environ[\"SQL_PASS\"] # SQL Password\r\nSTG_ACNAME<\/strong>=os.environ[\"STG_ACNAME\"] # Storage Account Name\r\nSTG_ACKEY<\/strong>=os.environ[\"STG_ACKEY\"] # Storage Account Key\r\n\r\n# Set up database table if needed\r\ncmd = \"\"\"\r\n CREATE TABLE IF NOT EXISTS CATEGORY_RESULTS (\r\n FNAME VARCHAR(1024) NOT NULL,\r\n CATEGORY NUMERIC(2) NOT NULL,\r\n PREDICTION NUMERIC(2) NOT NULL,\r\n CONFIDENCE REAL);\r\n \"\"\"<\/strong>\r\npgconn = psycopg2.connect(user=SQL_USER, password=SQL_PASS,\r\n host=SQL_HOST, port=\"5432\", database=SQL_DB)\r\ncur = pgconn.cursor()\r\ncur.execute(cmd)\r\ncur.close()\r\npgconn.commit()\r\n\r\n# Load all images in defined storage account\r\nCONNECTION_STRING=\"DefaultEndpointsProtocol=https\" + \\\r\n \";EndpointSuffix=core.windows.net\" + \\\r\n \";AccountName=\"+STG_ACNAME+\";AccountKey=\"+STG_ACKEY\r\nROOT=\"\/CIFAR-10-images\" # This is where the images are held\r\ncontainer = ContainerClient.from_connection_string(CONNECTION_STRING, container_name=\"cifar\")\r\n\r\nrLen = len(ROOT)\r\nclasses = ('airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')\r\n\r\n# Determine the expected category by parsing the directory (after the root path)\r\ndef fnameToCategory(fname):\r\n for c in classes:\r\n if (fname.find(c) > rLen):\r\n return (classes.index(c))\r\n return -1 # This should never happen\r\n\r\nIMGS=[]\r\nblob_list = container.list_blobs()\r\nfor blob in blob_list:\r\n if blob.name.endswith(('.png', '.jpg', '.jpeg')):\r\n cat = fnameToCategory(blob.name)\r\n data = {\"image\" : blob.name, \"category\": cat, \"catName\": classes[cat]}\r\n message = json.dumps(data)\r\n IMGS.append(message)\r\nprint(\"Number of Images to add to queue = \", len(IMGS))\r\n\r\n# Now write them into the queue\r\ncredentials = pika.PlainCredentials(RMQ_USER, RMQ_PASS)\r\nparameters = pika.ConnectionParameters(RMQ_HOST, 5672, '\/', credentials)\r\nconnection = pika.BlockingConnection(parameters)\r\nchannel = connection.channel()\r\nchannel.queue_declare(queue='image_queue', durable=True)\r\n\r\nfor i in IMGS:\r\n channel.basic_publish( exchange='', routing_key='image_queue', body=i,\r\n properties=pika.BasicProperties(delivery_mode=2,)\r\n )\r\n print(\"Queued \", i)\r\n\r\nconnection.close()<\/pre>\n
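As mentioned earlier, a more sophisticated version of this loader could take a location argument so that daily or ad-hoc batches are queued rather than the full image set. The fragment below is an illustrative sketch of that idea and is not part of the original script: it would replace the blob listing loop above, and assumes an extra, optional BLOB_PREFIX environment variable added to the job definition.<\/p>\n
# Illustrative variation on the listing loop in iload.py: only queue blobs under a prefix,\r\n# e.g. BLOB_PREFIX='CIFAR-10-images\/test' for an ad-hoc batch. Relies on the container,\r\n# classes, fnameToCategory and json names defined above.\r\nBLOB_PREFIX = os.environ.get('BLOB_PREFIX', 'CIFAR-10-images') # hypothetical extra variable\r\n\r\nIMGS = []\r\nfor blob in container.list_blobs(name_starts_with=BLOB_PREFIX):\r\n    if blob.name.endswith(('.png', '.jpg', '.jpeg')):\r\n        cat = fnameToCategory(blob.name)\r\n        data = {'image': blob.name, 'category': cat, 'catName': classes[cat]}\r\n        IMGS.append(json.dumps(data))\r\nprint('Number of Images to add to queue = ', len(IMGS))<\/pre>\n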
FROM ubuntu\r\n\r\nRUN apt-get update\r\nRUN apt-get install -y python3 python3-pip\r\n\r\nRUN apt-get update && apt-get install -y poppler-utils net-tools vim\r\nRUN pip install azureml-sdk\r\nRUN pip install azureml-sdk[notebooks]\r\nRUN pip install azure.ai.formrecognizer\r\nRUN pip install azure.storage.blob\r\nRUN pip install jsonify\r\nRUN pip install pika\r\nRUN pip install psycopg2-binary\r\n\r\nADD iload.py \/\r\n\r\nCMD [\"python3\", \".\/iload.py\" ]<\/pre>\n
$ docker build -t iload .<\/strong>\r\n.\r\n.\r\n=> writing image sha256:4ef19e469755572da900ec15514a4a205953a457c4f06f2795b150db3f2b11eb \r\n=> naming to docker.io\/library\/iload<\/pre>\n
# Log in to the Azure Container Registry\r\n$ az acr login -n jmcifaracr<\/strong>\r\nLogin Succeeded\r\n\r\n$ docker tag iload jmcifaracr.azurecr.io\/iload:1.0<\/strong>\r\n\r\n$ docker images<\/strong>\r\nREPOSITORY TAG IMAGE ID CREATED SIZE\r\niload latest 4ef19e469755 32 minutes ago 1.23GB\r\njmcifaracr.azurecr.io\/iload 1.0 4ef19e469755 32 minutes ago 1.23GB\r\n\r\n$ docker push jmcifaracr.azurecr.io\/iload:1.0<\/strong>\r\nThe push refers to repository [jmcifaracr.azurecr.io\/iload]\r\n6dfdee2e824f: Pushed\r\ne35525d1f4bf: Pushed\r\n.\r\n.\r\n4942a1abcbfa: Pushed\r\n1.0: digest: sha256:e9d606e50f08c682969afe4f59501936ad0706c4a81e43d281d66073a9d4ef28 size: 2847\r\n\r\n$ az acr repository list --name jmcifaracr --output table<\/strong>\r\nResult\r\n--------\r\niload<\/pre>\n
apiVersion: batch\/v1\r\nkind: Job\r\nmetadata:\r\n name: iload\r\nspec:\r\n template:\r\n spec:\r\n containers:\r\n - name: iload\r\n image: jmcifaracr.azurecr.io\/iload:1.0\r\n imagePullPolicy: Always\r\n env:\r\n - name: RMQ_USER\r\n value: \"user<\/strong>\"\r\n - name: RMQ_PASS\r\n value: \"7TrP8KOVdC<\/strong>\"\r\n - name: RMQ_HOST\r\n value: \"rabbitmq<\/strong>\"\r\n - name: SQL_HOST\r\n value: \"cifardb.postgres.database.azure.com<\/strong>\"\r\n - name: SQL_DB\r\n value: \"postgres<\/strong>\"\r\n - name: SQL_USER\r\n value: \"jon@cifardb.postgres.database.azure.com<\/strong>\"\r\n - name: SQL_PASS\r\n value: \"P@ssw0rd123<\/strong>\"\r\n - name: STG_ACNAME\r\n value: \"cifarimages<\/strong>\"\r\n - name: STG_ACKEY\r\n value: \"xxxxxxxxxxxxxxxx<\/strong>\"\r\n resources:\r\n requests:\r\n cpu: 500m\r\n memory: 512Mi\r\n limits:\r\n cpu: 500m\r\n memory: 512Mi\r\n restartPolicy: Never<\/pre>\n
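As noted above, the credentials sit in this manifest in clear text. Short of a full Key Vault and CSI Secrets setup, a simple middle ground is to store the sensitive values in a Kubernetes Secret (for example, kubectl create secret generic cifar-secrets --from-literal=RMQ_PASS=... --from-literal=SQL_PASS=... --from-literal=STG_ACKEY=...) and reference them from the job. The fragment below is an illustrative sketch of how those env entries could then look; the secret name cifar-secrets is our own example rather than something created earlier in this article.<\/p>\n
# Illustrative fragment of iload-job.yml: pull sensitive values from a Secret\r\n# called cifar-secrets instead of embedding them in the manifest.\r\nenv:\r\n  - name: RMQ_PASS\r\n    valueFrom:\r\n      secretKeyRef:\r\n        name: cifar-secrets\r\n        key: RMQ_PASS\r\n  - name: SQL_PASS\r\n    valueFrom:\r\n      secretKeyRef:\r\n        name: cifar-secrets\r\n        key: SQL_PASS\r\n  - name: STG_ACKEY\r\n    valueFrom:\r\n      secretKeyRef:\r\n        name: cifar-secrets\r\n        key: STG_ACKEY<\/pre>\n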
$ kubectl apply -f iload-job.yml<\/strong>\r\njob.batch\/iload created\r\n\r\n$ kubectl get pods<\/strong>\r\nNAME READY STATUS RESTARTS AGE\r\niload-gpgqg 1\/1 Running 0 41s\r\nrabbitmq-0 1\/1 Running 0 159m\r\n\r\n$ kubectl get jobs<\/strong>\r\nNAME COMPLETIONS DURATION AGE\r\niload 1\/1 62s 17m\r\n\r\n$ kubectl logs iload-gpgqg<\/strong>\r\n.\r\n.\r\n.\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4992.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4993.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4994.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4995.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4996.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4997.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4998.jpg\", \"category\": 9, \"catName\": \"truck\"}\r\nQueued {\"image\": \"CIFAR-10-images\/train\/truck\/4999.jpg\", \"category\": 9, \"catName\": \"truck\"}<\/pre>\n
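If you prefer the command line to the dashboard, you can also ask RabbitMQ for the queue depth over the forwarded AMQP port (kubectl port-forward --namespace default svc\/rabbitmq 5672:5672, as shown in the Helm output earlier). This is a small sketch rather than part of the original article, using the same pika library and the password extracted from the rabbitmq secret:<\/p>\n
# queue_depth.py - report how many messages are waiting in image_queue (illustrative)\r\nimport os\r\nimport pika\r\n\r\ncredentials = pika.PlainCredentials('user', os.environ['RMQ_PASS']) # password from the Helm output\r\nparameters = pika.ConnectionParameters('127.0.0.1', 5672, '\/', credentials)\r\nconnection = pika.BlockingConnection(parameters)\r\nchannel = connection.channel()\r\n\r\n# passive=True only inspects the queue; it fails if image_queue does not exist yet\r\nstatus = channel.queue_declare(queue='image_queue', passive=True)\r\nprint('Messages waiting:', status.method.message_count)\r\nconnection.close()<\/pre>\n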
#!\/usr\/bin\/env python\r\n\r\nfrom mxnet import gluon, nd, image\r\nimport mxnet as mx\r\nfrom mxnet.gluon.data.vision import transforms\r\nfrom gluoncv import utils\r\nfrom gluoncv.model_zoo import get_model\r\nimport psycopg2\r\nimport pika, time, os, json\r\nfrom azure.storage.blob import ContainerClient\r\n\r\nimport cv2\r\nimport numpy as np\r\n\r\n# Get Environment Vars\r\nRMQ_USER=os.environ[\"RMQ_USER<\/strong>\"] # RabbitMQ Username\r\nRMQ_PASS=os.environ[\"RMQ_PASS<\/strong>\"] # RabbitMQ Password\r\nRMQ_HOST=os.environ[\"RMQ_HOST<\/strong>\"] # RabbitMQ Hostname\r\nSQL_HOST=os.environ[\"SQL_HOST<\/strong>\"] # SQL Hostname\r\nSQL_DB=os.environ[\"SQL_DB<\/strong>\"] # SQL Database\r\nSQL_USER=os.environ[\"SQL_USER<\/strong>\"] # SQL Username\r\nSQL_PASS=os.environ[\"SQL_PASS<\/strong>\"] # SQL Password\r\nSTG_ACNAME=os.environ[\"STG_ACNAME<\/strong>\"] # Storage Account Name\r\nSTG_ACKEY=os.environ[\"STG_ACKEY<\/strong>\"] # Storage Account Key\r\nLOGTODB=int(os.environ[\"LOGTODB<\/strong>\"]) # Log data to Database? (1 = yes; converted to int so the check below works)\r\n\r\n# Location of Images on blob storage\r\nCONNECTION_STRING=\"DefaultEndpointsProtocol=https\" + \\\r\n \";EndpointSuffix=core.windows.net\" + \\\r\n \";AccountName=\"+STG_ACNAME+\";AccountKey=\"+STG_ACKEY\r\n\r\ncontainer = ContainerClient.from_connection_string(CONNECTION_STRING, container_name=\"cifar\")\r\n\r\nclass_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']\r\nnet = get_model('cifar_resnet110_v1', classes=10, pretrained=True)\r\n\r\ntransform_fn = transforms.Compose([\r\n transforms.Resize(32), transforms.CenterCrop(32), transforms.ToTensor(),\r\n transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])\r\n ])\r\n\r\ndef predictCategory(fname):\r\n blob_client = container.get_blob_client(fname)\r\n imgStream = blob_client.download_blob().readall()\r\n img = mx.ndarray.array(cv2.imdecode(np.frombuffer(imgStream, np.uint8), -1))\r\n img = transform_fn(img)\r\n \r\n pred = net(img.expand_dims(axis=0))\r\n ind = nd.argmax(pred, axis=1).astype('int')\r\n print('%s is classified as [%s], with probability %.3f.'%\r\n (fname, class_names[ind.asscalar()], nd.softmax(pred)[0][ind].asscalar()))\r\n return ind.asscalar(), nd.softmax(pred)[0][ind].asscalar()\r\n\r\ndef InsertResult(connection, fname, category, prediction, prob):\r\n count=0\r\n try:\r\n cursor = connection.cursor()\r\n qry = \"\"\" INSERT INTO CATEGORY_RESULTS (FNAME, CATEGORY, PREDICTION, CONFIDENCE) VALUES (%s,%s,%s,%s)\"\"\"\r\n record = (fname, category, prediction, prob)\r\n cursor.execute(qry, record)\r\n\r\n connection.commit()\r\n count = cursor.rowcount\r\n\r\n except (Exception, psycopg2.Error) as error :\r\n if(connection):\r\n print(\"Failed to insert record into category_results table\", error)\r\n finally:\r\n cursor.close()\r\n return count\r\n\r\n# Routine to pull message from queue, call classifier, and insert result to the DB\r\ndef callback(ch, method, properties, body):\r\n data = json.loads(body)\r\n fname = data['image']\r\n cat = data['category']\r\n pred, prob = predictCategory(fname)\r\n if (LOGTODB == 1):\r\n count = InsertResult(pgconn, fname, int(cat), int(pred), float(prob))\r\n else:\r\n count = 1 # Ensure the message is ack'd and removed from queue\r\n \r\n if (count > 0):\r\n ch.basic_ack(delivery_tag=method.delivery_tag)\r\n else:\r\n ch.basic_nack(delivery_tag=method.delivery_tag)\r\n\r\npgconn = psycopg2.connect(user=SQL_USER, password=SQL_PASS,\r\n host=SQL_HOST, port=\"5432\", 
database=SQL_DB)\r\ncredentials = pika.PlainCredentials(RMQ_USER, RMQ_PASS)\r\nparameters = pika.ConnectionParameters(RMQ_HOST, 5672, '\/', credentials)\r\nconnection = pika.BlockingConnection(parameters)\r\n\r\nchannel = connection.channel()\r\n\r\nchannel.queue_declare(queue='image_queue', durable=True)\r\nprint(' [*] Waiting for messages. To exit press CTRL+C')\r\n\r\nchannel.basic_qos(prefetch_count=1)\r\nchannel.basic_consume(queue='image_queue', on_message_callback=callback)\r\n\r\nchannel.start_consuming()<\/pre>\n
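The InsertResult routine above writes one row per classified image. As discussed in the conclusions, that is not the most efficient pattern if the results do not need to be queried immediately; one option is to buffer results and write them in batches. The sketch below illustrates the idea only \u2013 it is not part of the original worker, and the message acknowledgements would need to follow each flush for it to be safe.<\/p>\n
# Illustrative batched alternative to InsertResult (not used by the worker above)\r\nfrom psycopg2.extras import execute_values\r\n\r\nBATCH = [] # buffered (fname, category, prediction, confidence) tuples\r\nBATCH_SIZE = 100\r\n\r\ndef InsertResultBatched(connection, fname, category, prediction, prob):\r\n    BATCH.append((fname, category, prediction, prob))\r\n    if len(BATCH) < BATCH_SIZE:\r\n        return 1 # nothing flushed yet\r\n    cursor = connection.cursor()\r\n    execute_values(cursor,\r\n        'INSERT INTO CATEGORY_RESULTS (FNAME, CATEGORY, PREDICTION, CONFIDENCE) VALUES %s',\r\n        BATCH)\r\n    connection.commit()\r\n    cursor.close()\r\n    BATCH.clear()\r\n    return 1<\/pre>\n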
\n
FROM ubuntu\r\n\r\nRUN apt-get update\r\nRUN apt-get install -y python3 python3-pip\r\n\r\nRUN pip3 install --upgrade mxnet gluoncv pika\r\nRUN pip3 install psycopg2-binary\r\n\r\nRUN pip install azureml-sdk\r\nRUN pip install azureml-sdk[notebooks]\r\nRUN pip install azure.ai.formrecognizer\r\nRUN pip install azure.storage.blob\r\nRUN pip install opencv-python\r\n\r\nARG DEBIAN_FRONTEND=noninteractive\r\nRUN apt-get install ffmpeg libsm6 libxext6 -y\r\n\r\n# Add worker logic necessary to process queue items\r\nADD worker.py \/\r\n\r\n# Start the worker\r\nCMD [\"python3\", \".\/worker.py\" ]<\/pre>\n
$ docker build -t worker .<\/strong>\r\n.\r\n.\r\n=> [12\/12] ADD worker.py \/\r\n=> exporting to image\r\n=> => exporting layers\r\n=> => writing image sha256:9716e1e98687cfc3dd5f66640e441e4aa24131ffb3b3bd4c5d0267a06abcc802\r\n=> => naming to docker.io\/library\/worker\r\n\r\n$ docker tag worker jmcifaracr.azurecr.io\/worker:1.0<\/strong>\r\n$ docker images<\/strong>\r\nREPOSITORY TAG IMAGE ID CREATED SIZE\r\nworker latest 9716e1e98687 About a minute ago 2.24GB\r\njmcifaracr.azurecr.io\/worker 1.0 9716e1e98687 About a minute ago 2.24GB\r\niload latest 4ef19e469755 3 hours ago 1.23GB\r\njmcifaracr.azurecr.io\/iload 1.0 4ef19e469755 3 hours ago 1.23GB\r\n\r\n$ docker push jmcifaracr.azurecr.io\/worker:1.0<\/strong>\r\nThe push refers to repository [jmcifaracr.azurecr.io\/worker]\r\n.\r\n.\r\n\r\n$ az acr repository list --name jmcifaracr --output table<\/strong>\r\nResult\r\n--------\r\niload\r\nworker<\/pre>\n
apiVersion: apps\/v1\r\nkind: Deployment<\/strong>\r\nmetadata:\r\n name: worker\r\nspec:\r\n replicas: 1<\/strong>\r\n selector:\r\n matchLabels:\r\n app: worker\r\n template:\r\n metadata:\r\n labels:\r\n app: worker\r\n spec:\r\n containers:\r\n - name: worker<\/strong>\r\n image: jmcifaracr.azurecr.io\/worker:1.0<\/strong>\r\n imagePullPolicy: Always\r\n env:\r\n - name: RMQ_USER<\/strong>\r\n value: \"user\"\r\n - name: RMQ_PASS<\/strong>\r\n value: \"7TrP8KOVdC\"\r\n - name: RMQ_HOST<\/strong>\r\n value: \"rabbitmq\"\r\n - name: SQL_HOST<\/strong>\r\n value: \"cifardb.postgres.database.azure.com\"\r\n - name: SQL_DB<\/strong>\r\n value: \"postgres\"\r\n - name: SQL_USER<\/strong>\r\n value: \"jon@cifardb.postgres.database.azure.com\"\r\n - name: SQL_PASS<\/strong>\r\n value: \"P@ssw0rd123\"\r\n - name: STG_ACNAME<\/strong>\r\n value: \"cifarimages\"\r\n - name: STG_ACKEY<\/strong>\r\n value: \"xxxxxxxx\"\r\n - name: LOGTODB<\/strong>\r\n value: \"1\"\r\n resources:\r\n requests:\r\n cpu: 100m\r\n memory: 128Mi\r\n limits:\r\n cpu: 150m\r\n memory: 128Mi<\/pre>\n
$ kubectl apply -f worker-deployment.yml<\/strong>\r\ndeployment.apps\/worker created\r\n\r\n$ kubectl get deployments<\/strong>\r\nNAME READY UP-TO-DATE AVAILABLE AGE\r\nworker 1\/1 1 1 52s\r\n\r\n$ kubectl get pods<\/strong>\r\nNAME READY STATUS RESTARTS AGE\r\niload-gpgqg 0\/1 Completed 0 110m\r\nrabbitmq-0 1\/1 Running 0 4h29m\r\nworker-5df6cb8cb7-qnwtq 1\/1 Running 0 54s<\/pre>\n
$ kubectl apply -f worker-deployment.yml<\/strong>\r\ndeployment.apps\/worker configured\r\n\r\n$ kubectl get deployments<\/strong>\r\nNAME READY UP-TO-DATE AVAILABLE AGE\r\nworker 1\/1 1 1 52s\r\n\r\n$ kubectl get pods<\/strong>\r\nNAME READY STATUS RESTARTS AGE\r\niload-gpgqg 0\/1 Completed 0 112m\r\nrabbitmq-0 1\/1 Running 0 4h32m\r\nworker-5df6cb8cb7-flqp4 1\/1 Running 0 51s\r\nworker-5df6cb8cb7-hsl2p 1\/1 Running 0 51s\r\nworker-5df6cb8cb7-qnwtq 1\/1 Running 0 3m32s\r\nworker-5df6cb8cb7-v9t6p 1\/1 Running 0 51s\r\nworker-5df6cb8cb7-x4dt4 1\/1 Running 0 51s<\/pre>\n
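Once the queue has drained, the CATEGORY_RESULTS table lets you check how well the classifier performed. The sketch below is not part of the original article; it summarises overall and per-category accuracy, and needs to run somewhere that can reach the database \u2013 for example after adding your client IP as a firewall rule on the Postgres server \u2013 using the same connection details as the worker.<\/p>\n
# accuracy_check.py - summarise the results recorded by the workers (illustrative)\r\nimport os\r\nimport psycopg2\r\n\r\nconn = psycopg2.connect(user=os.environ['SQL_USER'], password=os.environ['SQL_PASS'],\r\n    host=os.environ['SQL_HOST'], port='5432', database=os.environ['SQL_DB'])\r\ncur = conn.cursor()\r\n\r\n# Overall accuracy: proportion of rows where the prediction matched the expected category\r\ncur.execute('SELECT COUNT(*), AVG(CASE WHEN CATEGORY = PREDICTION THEN 1.0 ELSE 0.0 END) FROM CATEGORY_RESULTS')\r\ntotal, accuracy = cur.fetchone()\r\nprint('Images classified:', total, ' Overall accuracy:', accuracy)\r\n\r\n# Accuracy per expected category (0-9 map to the class names used by the worker)\r\ncur.execute('SELECT CATEGORY, AVG(CASE WHEN CATEGORY = PREDICTION THEN 1.0 ELSE 0.0 END) '\r\n    'FROM CATEGORY_RESULTS GROUP BY CATEGORY ORDER BY CATEGORY')\r\nfor category, acc in cur.fetchall():\r\n    print('Category', category, 'accuracy:', acc)\r\n\r\ncur.close()\r\nconn.close()<\/pre>\n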
Conclusions and Considerations<\/h3>\n
About the authors<\/h3>\n