எளிய தமிழில் DevOps-11

Schedule using Airflow

இனிவரும் பகுதியில் இதற்கு முன் பகுதியில் கண்ட அதே விஷயத்தை Airflow கொண்டு உருவாக்கித் திட்டமிடுவது பற்றிக் காணலாம். கீழ்கண்ட கட்டளைகள் Airflow இயங்குவதற்குத் தேவையான விஷயத்தை இன்ஸ்டால் செய்யும்.

$ sudo pip3 install apache-airflow
$ sudo pip3 install flask
$ sudo pip3 install flask_bcrypt
$ sudo pip3 install kombu==4.5.0

இவை இயங்கி முடிந்தவுடன் நமது கணினியின் ஹோம் டைரக்டரியில் airflow எனும் டைரக்டரி உருவாக்கப்படும்.
இதற்கு முன் பகுதியில் பகுதியில் நாம் உருவாக்கிய இரண்டு பணிகளுக்கும் ஷெல் ஸ்கிரிப்ட்க்கு பதிலாக, இரண்டு பைத்தான் புரோகிராமை உருவாக்கி அவற்றை task1, task2 எனும் பெயரில் dags டைரக்டரியின் கீழ் (~/airflow/dags) சேமிக்கவும்.


from datetime import datetime
from airflow import DAG
import os
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
def backup():
backup_cmd = 'mysqldump -u root -ppassword sample > "/home/shrini/backups/"sample_"$(date +"%Y%m%d_%H%M%S").sql"'
os.system(backup_cmd)
dag = DAG('mysql_backup', description='Mysql backup every hour', schedule_interval='00 * * * *', start_date=datetime(2020, 11, 2), catchup=False)
backup_operator = PythonOperator(task_id='backup_task', python_callable=backup, dag=dag)

view raw

task1.py

hosted with ❤ by GitHub

நிரலுக்கான விளக்கம்:

Airflow-வில் பல ஆப்பரேட்டர்கள் உள்ளன. அவற்றில் எதன் துணைகொண்டு நாம் இந்தச் செயலை செய்யப் போகிறோமோ அதனை முதலில் import செய்து கொள்ள வேண்டும். இங்கு பைத்தான் ஆப்பரேட்டரை இம்போட் செய்துள்ளோம். பின்பு backup எடுப்பதற்கான கட்டளையை ஒரு function-க்குள் எழுதி இயக்கியுள்ளோம். எப்போதும் ஒரு ஷெல் கட்டளையை பைத்தான் மொழியில் இயக்குவதற்கு os.system() எனும் method பயன்படும். பின்பு இதற்கான dag வரையறுக்கப்படுகிறது. அது தனக்கென ஒரு id, வரையறை மற்றும் எப்போதெல்லாம் இயங்க வேண்டும் ஆகிய மதிப்புகளைப் பெற்றிருப்பதைக் காணலாம். இந்த id தான் Airflow UI திரையில் வெளிப்படும். மேலும் start_date என்பது எந்தத் தேதியிலிருந்து இச்செயலை செய்ய வேண்டும் என்பதைக் குறிக்கிறது. அடுத்து catchup=False என்றிருந்தால், இடையில் சர்வரின் இயக்கம் தடைபடுகின்ற நேரத்தில், அது இயங்க வேண்டிய சுழற்சிகளை நிறுத்திவிடும். அதாவது Airflow server-ன் இயக்கம் இன்று நிறுத்தப்பட்டு நாளை மீண்டும் தொடக்கினால், தொடங்கிய நேரம் முதல் jobs ஓடத் தொடங்கும். அதுவே catchup=True என்றிருந்தால், விட்ட நேரத்திற்கும் சேர்த்து இயங்கி முடிக்கும்.

கடைசியாக இந்த dag மற்றும் function ஆகியவற்றை மதிப்புகளாகக் கொண்ட ஒரு பைத்தான் ஆப்பரேட்டர் இச்செயலுக்காக உருவாக்கப்படுகிறது.

task2:


from datetime import datetime
from airflow import DAG
import os
from airflow.operators.python_operator import PythonOperator
def pushfile():
push_cmd = 'rsync -arv -e "ssh -i /home/shrini/shrini-freepem.pem" /home/shrini/backups ubuntu@35.166.185.40:/home/ubuntu'
os.system(push_cmd)
def clr():
clr_cmd = 'rm -f /home/shrini/backups/*'
os.system(clr_cmd)
dag = DAG('push_remote', description='Pushing files to remote every day', schedule_interval='20 00 * * *', start_date=datetime(2020, 11, 2), catchup=False)
pushfile_operator = PythonOperator(task_id='pushfile_task', python_callable=pushfile, dag=dag)
clear_operator = PythonOperator(task_id='clearing_task', python_callable=clr, dag=dag)
pushfile_operator >> clear_operator

view raw

task2.py

hosted with ❤ by GitHub

நிரலுக்கான விளக்கம்:

இங்கு backup எடுக்கப்பட்ட கோப்புகளை ரிமோட் சர்வருக்கு அனுப்புதல், அனுப்பிய பின் அவற்றை தற்போதைய டைரக்டரியில் இருந்து நீக்குதல் ஆகிய இரண்டு செயல்களுக்கும் இரண்டு தனித்தனி function எழுதப்பட்டுள்ளது. இச்செயல்கள் ஒவ்வொரு நாளும் நள்ளிரவு 12:20 மணிக்கு நடக்குமாறு schedule செய்யப்பட்டுள்ளது. அதன் பிறகு இவ்விரண்டு செயல்களுக்கும் தனித்தனியே இரண்டு ஆப்பரேட்டர்கள் உருவாக்கப்பட்டுள்ளன. முதல் ஆப்பரேட்டரின் செயல்பாடு முடிந்தவுடன் இரண்டாவது ஆப்பரேட்டரின் செயல்பாடு துவங்க வேண்டும் என்பதை >> எனும் குறியீடு குறிக்கிறது.

மேற்கண்ட இரண்டு பணிகளுக்கான நிரல்களையும் dags டைரக்டரியின் கீழ் (~/airflow/dags) சேமித்தவுடன் Airflow-வைத் துவக்குவதற்கான கட்டளையை அளிக்கவும்.

$ airflow initdb

இது ‘Failed to import’ என்பது போன்று ஏதேனும் error-ஐ வெளிப்படுத்தினால், நம்முடைய நிரல்களில் ஏதோ தவறு இருக்கிறது என்று அர்த்தம். ஆகவே இக்கட்டளை எவ்வித இடையூறும் இல்லாமல் ஓடி முடிந்தபின், இதற்கான webserver மற்றும் scheduler ஆகிய இரண்டையும் இரண்டு தனித்தனி டெர்மினல்களில் ஓட விடவும்.

$ airflow webserver
$ airflow scheduler

Airflow UI

webserver மற்றும் scheduler ஆகிய இரண்டும் தனித்தனி டெர்மினல்களில் ஓடிக்கொண்டிருக்கும் போது கீழ்க்கண்ட முகவரியில் சென்று காணவும். இதுதான் நாம் உருவாக்கி ஷெட்யூல் செய்துள்ள பணிகளைத் திரையில் காண உதவும் இடைமுகப்பு ஆகும்.
192.168.1.15:8080/admin/

நம்முடைய dag பைத்தான் ஆப்பரேட்டர் கொண்டு எழுதப்பட்டது. இதேபோல bash ஆப்பரேட்டர், http ஆப்பரேட்டர், docker, hive, kubernetes என பலப்பல ஆப்பரேட்டர்கள் உள்ளன. அவை ஒவ்வொன்றுக்குமான உதாரணங்கள் தான் இங்கே இருப்பவை எல்லாம். இவற்றில் நம்முடைய dag-ஐ ஃபில்டர் செய்து  கொள்ளவும்.

இதில் நம்முடைய dag-ன் பெயர், எப்போது schedule செய்யப்பட்டுள்ளது, சமீபமாக ஓடி முடிந்த dag-ன் நிலைகள், கடைசியாக எப்போது ஓடியது ஆகியவற்றைக் குறிக்கும் வகையில் பல்வேறு உப தலைப்புகள் உள்ளன. இவற்றில் Links எனும் உபதலைப்பின் கீழ் dag-ஐத் தூண்டுதல், அவற்றின் செயல்பாடுகளை வரைபட வடிவில் வெளிப்படுத்துதல், அவற்றுக்கான logs- ஐ வெளிப்படுத்துதல், மூல நிரலை வெளிப்படுத்துதல் போன்ற பல்வேறு செயல்களுக்கான ஐகான்கள் உள்ளன.

இவற்றில் Trigger Dag எனும் ஐகானின் மீது சொடுக்கினால், DAG Runs எனும் உபதலைப்பின் கீழ் இளம்பச்சை நிறத்தில் ஒரு வட்டம் உருவாவதைக் காணலாம். அதாவது dag ஓடத் தொடங்கியிருக்கிறது என்று அர்த்தம். Schedule-ன் படி இல்லாமால், நமக்கு எப்போது தேவையோ அப்போது dag-ஐ ஓட்டிப் பார்க்க இந்த ஐகான் பயன்படும். Recent Tasks, DAG Runs ஆகிய உப தலைப்புகளின் கீழ் உள்ள பல்வேறு நிற வட்டங்கள் dag ஓட்டத்தின் பல்வேறு நிலைகளைக் குறிக்கின்றன. அவை கரும்பச்சையாக இருந்தால் வெற்றிகரமாக ஓடியது என்றும், இளம்பச்சையாக இருந்தால் ஓடிக்கொண்டிருக்கிறது என்றும், சிகப்பாக இருந்தால் ஓடித் தோல்வியுற்றது என்றும் அர்த்தம்.

அடுத்து Tree View ஐகானின் மீது சொடுக்கினால் அது பின்வருமாறு ஒரு வரைபடத்தை வெளிப்படுத்தும். அதேபோல் Graph view-மீதும் சொடுக்கிப் பார்க்கவும்.

Logs ஐகானின் மீது சொடுக்கினால் அது பின்வருமாறு வெளிப்படுத்தும். அதேபோல் மூல நிரலைக் காட்டும் ஐகானின் மீது சொடுக்கினால் அதற்கான நிரல் திரையில் வெளிப்படுவதைக் காணலாம்.

Variables

backup எடுக்கும் பணிக்காக எழுதப்பட்ட பைத்தான் நிரலில் அக்கட்டளையைக் கொஞ்சம் கவனிக்கவும். Mysql-க்குள் உள் நுழைவதற்காக -u வைத் தொடர்ந்து பயனர்பெயர்(root) மற்றும் -p ஐத் தொடர்ந்து கடவுச்சொல் (password) ஆகியவை நேரடியாகவே கொடுக்கப்பட்டுள்ளன. ஒவ்வொரு முறை டேட்டாபேஸின் பாஸ்வேர்டை மாற்றும் போதும், புதிய பயனராக உள் நுழைய முயலும் போதும் இந்த மூல நிரலில் சென்று மாற்றம் செய்ய வேண்டும். இதற்கு பதிலாக இதற்கான variable-களை UI திரையில் உருவாக்கிக் கொண்டு அதனை நாம் நம்முடைய நிரல்களில் பயன்படுத்தலாம். இதன் மூலம் மதிப்பினை மாற்றினாலும் மூல நிரலில் கை வைக்கத் தேவையில்லை. UI திரையில் சென்று அந்த variable-களுக்கான மதிப்பினை மாற்றினால் போதும்.

முதலில் Airflow UI -ல் சென்று Action -> Variable -> Create ஆகிய இணைப்பின் மீது சொடுக்கவும். அது key, val ஆகிய பெட்டிகளைக் கொண்ட திரையை வெளிப்படுத்தும். அதில் நாம் உருவாக்க வேண்டிய variable-ன் பெயர் மற்றும் அதன் மதிப்பினை கீழ்கண்டவாறு அளிக்கவும்.

பின் list எனும் இணைப்பின் மீது சொடுக்கினால் நாம் உருவாக்கிய variable-களும் அவற்றின் மதிப்புகளும் பின்வருமாறு வெளிப்படும்.

திரையில் உருவாக்கப்பட்ட mysql_username, mysql_password ஆகியவை நம்முடைய நிரலுக்குள் import செய்யப்படுகின்றன. பின் இவ்விரு variable-களின் மதிப்பும் usr, pwd ஆகிய பெயரில் நிரலுக்குள் செலுத்தப்படுகின்றன. இது பின்வருமாறு.


from datetime import datetime
from airflow import DAG
import os
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
usr = Variable.get("mysql_username")
pwd = Variable.get("mysql_password")
def backup():
backup_cmd = 'mysqldump -u '+usr+' -p'+pwd+' sample > "/home/shrini/backups/"sample_"$(date +"%Y%m%d_%H%M%S").sql"'
os.system(backup_cmd)
dag = DAG('mysql_backup', description='Mysql backup every hour', schedule_interval='00 * * * *', start_date=datetime(2020, 11, 2), catchup=False)
backup_operator = PythonOperator(task_id='backup_task', python_callable=backup, dag=dag)

Bash Operator

மேற்கண்ட அதே நிரலை பைத்தான் ஆப்பரேட்டர் இல்லாமல் bash  ஆப்பரேட்டர் கொண்டு உருவாக்கப்பட்ட ப்ரோக்ராம் பின்வருமாறு.


from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG('bash_exp', description='Mysql backup every hour', schedule_interval='00 * * * *', start_date=datetime(2020, 11, 2), catchup=False)
operator = BashOperator(task_id='backup_task', bash_command='mysqldump -u root -ppassword sample > "/home/shrini/backups/"sample_"$(date +"%Y%m%d_%H%M%S").sql"',dag=dag)

 

இதில் ஷெல் கட்டளைகள் நேரடியாக இயக்கப்படுவதைக் காணலாம்.

பொதுவாக dag-ன் பெயர்தான் திரையில் வெளிப்படும். ஆகவே dag-ன் பெயரையே புரோகிராமின் பெயராக வைத்துக்கொள்வது நல்லது. புதிதாக bash ஆப்பரேட்டர் கொண்டு உருவாக்கப்பட்ட dag-ஐ திரையில் ரன் செய்து பார்க்கவும். அது தோல்வியுற்று சிகப்பு நிற வளையத்தை வெளிப்படுத்துவதைக் காணலாம். 

என்ன காரணம் என்று தெரிந்து கொள்ள அவ்வளையத்தின் மீது சொடுக்கினால் அதற்கான Run Id பின்வருமாறு வெளிப்படும்.

அந்த Id-ன் மீது சொடுக்கினால், அது பின்வருமாறு திரையை வெளிப்படுத்தும்.

அதில் view log-ன் மீது சொடுக்கவும். என்ன காரணம் என்பது பின்வருமாறு வெளிப்படுத்தப்பட்டுள்ளது.

[2020-11-09 11:59:08,149] {bash_operator.py:157} INFO – mysqldump: Got error: 2002: Can’t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock’ (2) when trying to connect

அதாவது Mysql-ஐத் தொடர்பு கொள்வதில் ஏதோ சிக்கல் என்று வெளிப்படுத்தியுள்ளது. எனவே Mysql இயங்குகின்ற இடத்தில் சென்று அதற்கான சேவையைத் துவக்கி அதன் இயக்கத்தை உறுதிபடுத்திக் கொள்ளவும்.

இதுவரை பல்வேறு சர்வர்களில் ஓடிக்கொண்டிருக்கும் பணிகளை மேலாண்மை செய்ய உதவும் Airflow கருவியை பற்றிப் பார்த்தோம். அடுத்ததாக பல்வேறு சர்வர்களில் ஒரே நேரத்தில் பல மென்பொருட்களை நிறுவுதல், மேம்படுத்துதல்,  நீக்குதல் போன்ற பல பணிகளைச் செய்ய உதவும் Ansible – ஐப் பற்றிப் பார்க்கலாம்.

%d bloggers like this: