Entwerfen einer ETL-Pipeline in Apache Airflow / Sudo Null IT News

Hey Habr! Rustem, IBM Senior DevOps Engineer, ist in Kontakt und heute möchte ich unsere Bekanntschaft mit dem Tool im DataOps-Engineering – Apache Airflow – fortsetzen. Heute werden wir eine ETL-Pipeline entwerfen. Lassen Sie uns nicht verweilen, gleich zur Sache!

Was werden wir brauchen? MySQL DBMS, Apache Airflow und Daten von Datahub. Link zum Datenhub.

Unsere Aufgabe

Unsere Aufgabe heute ist es, eine Pipeline zu erstellen, die täglich läuft und eine Liste generischer Top-Level-Domainnamen in eine MySQL-Tabelle füllt. Insbesondere wollen wir folgende Schritte durchführen:

  1. Extrahieren Sie Daten von der Datahub-Website.

  2. Transformieren Sie die Daten, indem Sie “gemeinsame” Top-Level-Domainnamen herausfiltern.

  3. Hochladen von Daten zu einem Datenspeicherdienst.

Sie werden feststellen, dass wir dem ETL-Paradigma sehr genau folgen werden.

In den folgenden Schritten erstellen wir Knoten zum Extrahieren, Transformieren und Laden mit den entsprechenden Operatoren in Airflow.

Entwerfen eines “Extract Node”

Bevor wir mit anderen Schritten fortfahren, müssen wir sicherstellen, dass wir die Daten erhalten können.

Also führen wir jetzt den Extraktionsvorgang manuell aus, um die Datendatei aus dem Internet auf den lokalen Computer herunterzuladen. Lassen Sie uns die Daten herunterladen, indem Sie den folgenden Befehl ausführen:

wget -c -O /root/manual-extract-data.csv

Stellen wir nun sicher, dass es wie erwartet aussieht:

head /root/manual-extract-data.csv

Erstellen eines “Extract-Knotens”

Jetzt werden wir die vorherige Funktionalität in Form eines Airflow-DAG erstellen. Das folgende Code-Snippet führt den Befehl wget aus und schreibt die Daten in /root/airflow-extract-data.csv.

Diese Aufgabe verwendet den Bash Airflow Operator.

Erstellen wir ein airflow_demo-Verzeichnis mit einem dags-Unterverzeichnis. Lassen Sie uns in den Dags die Datei airflow-extract-node.py erstellen.

Öffnen Sie die Datei und fügen Sie den folgenden Code ein:

from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG(‘extract_dag’, schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False) as dag: extract_task = BashOperator( task_id =’extract_task’, bash_command=’wget -c -O /root/airflow-extract-data.csv’, ) extract_task

Lassen Sie uns unseren DAG registrieren, damit wir nicht warten müssen, bis Airflow ihn abholt:

Luftstrom db init

Jetzt können wir es auch in unserer Benutzeroberfläche sehen.

Entwerfen des Entwerfen des “Transformationsknotens”

Konzentrieren wir uns nun auf den nächsten Schritt unserer Aufgabe, in dem wir die Daten transformieren, indem wir sie in “generische” Top-Level-Domainnamen filtern.

Manuelles Starten eines Konvertierungsvorgangs

Zuerst werden wir den Konvertierungsvorgang manuell starten.

Lassen Sie uns schnell daran arbeiten, mit den Daten zu jonglieren. Wir werden Python3 und Pandas verwenden.

Installieren Sie Pandas mit dem folgenden Befehl:

pip3 pandas installieren

Führen Sie als Nächstes drei Befehle aus:

python3Import pandas as pdFrom datetime import date

Lesen wir nun unsere Daten im Pandas-Datenrahmen:

df = pd.read_csv(“/root/manual-extract-data.csv”) df

Denken Sie jetzt daran, dass wir “generische” Top-Level-Domainnamen herausfiltern möchten. Mal sehen, was die Werte in der Typspalte sind:

df[“Type”].einzigartig()

Lassen Sie uns unseren Filter ausführen:

generischer_typ_df = df[df[“Type”] == “allgemein”]

Schließlich möchten wir unseren Daten ein Datumsfeld hinzufügen, damit wir wissen, an welchem ​​​​Tag es gelöscht wurde:

heute = datum.heute()

Lassen Sie uns diese Spalte hinzufügen. (Notiz. Möglicherweise erhalten Sie eine Warnmeldung wie diese: “Es wird versucht, einen Wert für eine Kopie eines Slice festzulegen ….”. Ignoriere ihn.)

generischer_typ_df[“Date”] = heute.strftime(“%Y-%m-%d”)

Dies sind die Daten, die wir behalten möchten. Schreiben wir es auf:

generic_type_df.to_csv(“/root/manual-transformed-data.csv”, index=False)

Zurück zu unserer Shell

Strg+C

Überprüfung unserer Daten

head /root/manual-transformed-data.csv

Erstellen eines “Transformationsknotens”

Ähnlich wie beim Pull-Schritt erstellen wir eine Aufgabe in Airflow, die die vorherige Funktionalität repliziert.

Diese Aufgabe verwendet eine Python-Anweisung.

Der DAG liest aus /root/airflow-extract-data.csv, wendet denselben Pandas-Filter wie oben an und schreibt das resultierende Dataset in /root/airflow-transformed-data.csv.

Gehen Sie im Katacoda-Fenster zur Registerkarte IDE, wo Sie den Ordner airflow_demo mit einem Unterverzeichnis dags sehen. Innerhalb der Dags erstellen Sie eine airflow-transform-node.py-Datei mit dem folgenden Code:

from airflow import DAG from airflow.operators.python import PythonOperator import pandas as pd from datetime import datetime, date with DAG( dag_id=’transform_dag’, schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) als dag: def transform_data(): “””Datei einlesen und transformierte Datei ausschreiben””” today = date.today() df = pd.read_csv(“/root/airflow-extract-data.csv “) generischer_typ_df = df[df[“Type”] == “generisch”]generischer_typ_df[“Date”] = today.strftime(“%Y-%m-%d”) generic_type_df.to_csv(“/root/airflow-transformed-data.csv”, index=False) print(f’Anzahl der Zeilen: {len(generic_type_df) }’) transform_task = PythonOperator (task_id=’transform_task’, python_callable=transform_data, dag=dag) transform_task

Lassen Sie uns unseren DAG registrieren:

Luftstrom db init

Wir sehen es in der Benutzeroberfläche:

Entwurf laden

Fahren wir mit dem nächsten Schritt fort, in dem die resultierende Datei in der lokalen Datenbank gespeichert wird.

Jetzt werden wir den Download-Vorgang manuell starten. Dazu müssen die neu konvertierten Daten abgerufen und in die Datenbank geladen werden.

Lassen Sie uns eine Datenbank erstellen (sowohl für manuelle Aufgaben als auch für Airflow-Aufgaben)

Öffnen Sie zunächst erneut die MySQL-Shell:

MySQL

Lassen Sie uns eine Datenbank für “manuelle” Daten erstellen:

DATENBANK ERSTELLEN manual_load_database;

Lassen Sie uns nun eine Datenbank für die Airflow-Daten erstellen:

DATENBANK ERSTELLEN airflow_load_database;

Lassen Sie uns sicherstellen, dass unsere mit unseren Datenbanken in Ordnung sind:

DATENBANKEN ANZEIGEN;

Lassen Sie uns nun Tabellen für unsere Datenbank erstellen.

Erste manual_load_database:

USE manual_load_database;

Unser Schema sieht folgendermaßen aus: Domain, Typ, Sponsoring Organization

Wir werden täglich “allgemeine” Daten in unsere Tabelle archivieren, daher müssen wir eine Möglichkeit finden, die täglichen Eingaben zu differenzieren.

Lassen Sie uns unsere Tabelle erstellen, aber eine Spalte für das Datum hinzufügen:

CREATE TABLE top_level_domains (Domain VARCHAR(30), Type VARCHAR(30), SponsoringOrganization VARCHAR(30), Date DATE);

Sehen wir uns unsere Tabelle an:

DESCRIBE top_level_domains;

Jetzt wiederholen wir dasselbe für airflow_load_database:

USE airflow_load_database; CREATE TABLE top_level_domains (Domain VARCHAR(30), Type VARCHAR(30), SponsoringOrganization VARCHAR(30), Date DATE);

Lassen Sie uns nun die Daten manuell laden.

Fügen Sie zuerst diese Einstellung hinzu und beenden Sie:

SET GLOBAL local_infile=1;

Wechseln wir nun wie folgt zu mysql:

mysql –local-infile=1

Lassen Sie uns eine Datenbank auswählen:

USE manual_load_database;

Lassen Sie uns die Daten laden:

LADE DATEN LOKALES INFILE ‘/root/manual-transformed-data.csv’ IN TABELLE top_level_domains FELDER, DIE DURCH ‘,’ EINGESCHLOSSEN DURCH ‘”‘ BEENDET WERDEN ZEILEN MIT ‘\n’ BEENDET 1 ZEILE IGNORE;

Kleiner Check:

wählen Sie * aus top_level_domains aus;

Wenn soweit alles funktioniert, super Arbeit! Wir haben jetzt den ETL-Prozess für unsere Aufgabe manuell ausgeführt. Mal sehen, wie man das automatisieren kann.

Knotendesign laden

Wir werden Airflow verwenden, um die gleiche Arbeit zu erledigen, wie wir es manuell tun würden.

Diese Aufgabe verwendet die MySql-Anweisung. Damit dies funktioniert, müssen wir einige Konfigurationsoptionen festlegen.

Eine der leistungsstärksten Funktionen von Airflow ist seine Flexibilität. Operatoren sind eigentlich Vorlagen, mit denen Sie verschiedene Arten von Operationen ausführen können.

Wir werden die MySQL-Anweisung installieren:

pip3 installiere apache-airflow-providers-mysql

Lassen Sie uns nun eine airflow-load-node.py-Datei mit dem folgenden Code erstellen:

from datetime import datetime from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator with DAG(‘load_dag’, start_date=datetime(2022, 1, 1), schedule_interval=None, default_args={‘mysql_conn_id’: ‘ demo_local_mysql’}, catchup=False) als dag: load_task = MySqlOperator (task_id=’load_task’, sql=r””” USE airflow_load_database; LOAD DATA LOCAL INFILE ‘/root/airflow-transformed-data.csv’ INTO TABLE top_level_domains FIELDS BEENDET VON ‘,’ EINGESCHLOSSEN VON ‘”‘ ZEILEN BEENDET VON ‘\n’ IGNORE 1 ROWS; “””, dag=dag ) load_task

Und registrieren Sie unseren DAG:

Luftstrom db init

Alles zusammenfügen

Bisher haben wir drei verschiedene DAGs erstellt, von denen jeder eine Aufgabe ausführt: extrahieren, transformieren oder laden. Kombinieren wir sie zu einem DAG Airflow.

Erstellen Sie einen ETL-DAG

Erstellen wir dazu eine Datei namens basic-etl-dag.py mit dem folgenden Code:

from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.providers.mysql.operators.mysql import MySqlOperator import pandas as pd from datetime import datetime, date default_args = { ‘mysql_conn_id’: ‘ demo_local_mysql’ } mit DAG(‘basic_etl_dag’, schedule_interval=None, default_args=default_args, start_date=datetime(2022, 1, 1), catchup=False) als dag: extract_task = BashOperator( task_id=’extract_task’, bash_command=’wget -c -O /root/airflow-extract-data.csv’, ) extract_task def transform_data(): “””Datei einlesen und transformierte Datei ausschreiben””” today = date.today() df = pd .read_csv(“/root/airflow-extract-data.csv”) generic_type_df = df[df[“Type”] == “generisch”]generischer_typ_df[“Date”] = today.strftime(“%Y-%m-%d”) generic_type_df.to_csv(“/root/airflow-transformed-data.csv”, index=False) print(f’Anzahl der Zeilen: {len(generic_type_df) }’) transform_task = PythonOperator( task_id=’transform_task’, python_callable=transform_data, dag=dag) transform_task load_task = MySqlOperator( task_id=’load_task’, sql=r””” USE airflow_load_database; LOAD DATA LOCAL INFILE ‘/root/airflow -transformed-data.csv’ IN TABELLE top_level_domains FELDER BEENDET VON ‘,’ EINGESCHLOSSEN VON ‘”‘ ZEILEN BEENDET VON ‘\n’ IGNORE 1 ROWS; “””, dag=dag ) load_task extract_task >> transform_task >> load_task

Lassen Sie uns unseren DAG registrieren: airflow db init.

Und wir werden es in unserer Benutzeroberfläche sehen:

Starten wir nun unseren DAG.

Lassen Sie uns, während der DAG ausgeführt wird, einen kurzen Plausibilitätscheck durchführen, um sicherzustellen, dass er das tut, was wir wollen.

Es ist immer gut, sicherzustellen, dass alles wie erwartet funktioniert, auch wenn Sie drei grüne Kästchen in Airflow erhalten.

Der erste Schritt in der DAG-Extraktionsaufgabe besteht darin, die Daten aus dem Datahub nach /root/airflow-extract-data.csv zu extrahieren. Stellen wir sicher, dass die Daten wirklich vorhanden sind:

Kopf /root/airflow-extract-data.csv

Wenn Sie sehen, dass Daten zurückkommen, bedeutet dies, dass Ihr Task die Daten erfolgreich abrufen und auf der Festplatte speichern konnte.

Der Transformationsschritt liest die Datei /root/airflow-extract-data.csv, führt einige Transformationen daran durch und schreibt sie in /root/airflow-transformed-data.csv.

Stellen wir sicher, dass die Daten wirklich vorhanden sind:

Kopf /root/airflow-transformed-data.csv

Wenn Sie eine Ausgabe sehen, bedeutet dies, dass Ihre Aufgabe diese neuen Daten erfolgreich lesen, konvertieren und auf der Festplatte speichern konnte.

Der Upload-Schritt liest Daten aus /root/airflow-transformed-data.csv und speichert sie in einer MySQL-Tabelle.

Öffnen wir die MySQL-Shell, um zu sehen, ob unsere Ergebnisse da sind:

MySQL

Verwenden Sie die Datenbank, die wir für unseren DAG Airflow erstellt haben:

USE airflow_load_database;

Und zum Schluss führen wir ein SELECT für die Tabelle aus:

SELECT * FROM top_level_domains;

Wenn Sie Ergebnisse sehen, bedeutet dies, dass Ihr DAG erfolgreich war!

Wir laden alle zur offenen Lektion “MapReduce: Big Data Processing Algorithm” ein. Darin analysieren wir detailliert den universellen Algorithmus, mit dem Big Data auf verteilten Systemen ohne Shared Storage (Hadoop, Spark) verarbeitet wird. Lassen Sie uns über Engpässe und potenzielle Betriebsprobleme sprechen. Mal sehen, wie es in der Praxis in Yandex.Cloud aussieht. Anmeldung hier.

Ebenfalls registrieren für eine Unterrichtsstunde zum Thema “Architektur von Datenverarbeitungssystemen”.

Similar Posts

Leave a Reply

Your email address will not be published.