Entwerfen von Datenpipelines in Apache Airflow / Sudo Null IT News

Hey Habr! Mein Name ist Rustem, ich bin Senior DevOps Engineer bei IBM.

Heute möchte ich Ihnen ein wichtiges Werkzeug in der DataOps-Methodik vorstellen, nämlich Apache Airflow und das Design von Data Pipelines (Data Pipelines).

Dieser Artikel enthält eine kurze Einführung in Airflow und die Schritte zum Erstellen und Konfigurieren von Datenpipelines. Zuerst werden wir Airflow installieren und konfigurieren. Sehen wir uns dann ein praktisches Beispiel für das Erstellen und Ausführen eines DAG in Airflow an. Unser heutiges Ziel ist ein praktisches Verständnis der Airflow-Bereitstellung und der grundlegenden DAG-Entwicklung.

Ein wenig über DataOps

Die DataOps-Methodik soll es einer Organisation ermöglichen, einen iterativen Prozess zum Erstellen und Bereitstellen von Analyse- und Datenpipelines zu verwenden. Indem sie Daten- und Modellmanagementpraktiken befolgen, können sie qualitativ hochwertige Unternehmensdaten für ihre KI-Anwendungen bereitstellen.

Mit anderen Worten, die Praxis von DataOps ermöglicht es Ihnen, die Erfahrung von DevOps auf Datenmanagement und -analyse zu übertragen. Die Erfahrung zeigt, dass der Einsatz von DataOps die Markteinführungszeit für Analyselösungen effektiv verkürzt, die Datenqualität und -konformität verbessert und die Datenverwaltungskosten senkt.

Ein wenig über AirFlow

In diesem Schritt installieren wir das Apache Airflow Python-Paket in unserer Umgebung und initialisieren die Konfiguration.

Installieren Sie das Airflow-Paket, indem Sie den folgenden Befehl im Terminal ausführen:

pip install “apache-airflow==2.3.0” –ignore-installed

Beachten Sie, dass wir eine bestimmte Version von Airflow anheften und hier das Flag –ignore-installed verwenden, um einige Versionskonflikte mit Paketabhängigkeiten zu vermeiden.

Initialisierung der Airflow-Datenbank

Airflow verwendet eine relationale Datenbank als Backend zum Speichern von Konfigurationsdaten. Standardmäßig ist dies eine SQLite-Datenbank, die in ~/airflow/airflow.db gespeichert wird. Wir initialisieren die Datenbank in unserer Umgebung, indem wir den folgenden Befehl im Terminal ausführen:

Luftstrom db init

Erstellen Sie einen Administratorbenutzer.

Als Nächstes müssen wir einen Benutzer erstellen, der sich bei der Airflow-Benutzeroberfläche anmelden kann. Geben Sie im Terminal Folgendes ein, um einen Benutzer namens admin mit Administratorrechten zu erstellen:

Airflow-Benutzer erstellen \ –username admin \ –firstname Firstname \ –lastname Lastname \ –role Admin \ –email admin@example.org \ –password password

Bei Erfolg sehen wir die folgende Ausgabe:

Webserver und Scheduler starten

Um sicherzustellen, dass die Konfiguration korrekt funktioniert, können wir den Airflow-Webserver und -Scheduler starten und uns bei der Benutzeroberfläche anmelden. Führen Sie die folgenden Befehle in einem Terminal aus, um den Webserver und den Scheduler zu starten:

Airflow-Webserver –Port 8080 -DLuftstromplanerLuftstromplaner

Dadurch wird die Airflow-Weboberfläche auf Port 8080 gestartet

Melden Sie sich bei Airflow anMelden Sie sich bei Airflow an

Nach dem Laden der Benutzeroberfläche sehen wir die Anmeldeseite. Geben Sie die im vorherigen Schritt erstellten Benutzeranmeldeinformationen ein:

Benutzername: Administrator

Passwort: Passwort

Wenn alles erfolgreich konfiguriert ist, sehen wir die Airflow-Weboberfläche mit einer Liste von DAG-Beispielen:

Jetzt können wir zu unserem Terminal zurückkehren und den Scheduler-Prozess beenden (optional, aber ich empfehle dies). Eingabe: Strg+C

Öffnen Sie ein neues Terminal und führen Sie den Befehl aus: Airflow Scheduler

AirFlow konfigurierenAirFlow konfigurieren

Lassen Sie uns AirFlow anhand einiger Best Practices einrichten.

Lassen Sie uns zuerst die DAGs auflisten, gehen Sie zurück zu unserem ersten Terminal und führen Sie den Befehl aus: airflow dags list

Bearbeiten wir nun die Konfigurationsdatei, die sich in /root/airflow/airflow.cfg befinden sollte. Diese Datei enthält Informationen über alle Einstellungen unseres Airflow.

Diese Einstellungen werden als Schlüssel/Wert-Paare dargestellt, die wie folgt aussehen
Einstellungsname = Einstellungswert

Ändern Sie den Speicherort des DAG-Ordners

Wir werden einen neuen Ordner für unseren Python-DAG-Code verwenden. Zuerst müssen wir die DAG-Ordnereinstellung auf den neuen Pfad aktualisieren. Für unsere Zwecke verwenden wir den Pfad /root/airflow_demo/dags.

Ändern Sie die Einstellung dags_folder in Zeile 4 so, dass sie wie folgt aussieht:

Dadurch wird Airflow auf das Projektverzeichnis verwiesen, in dem neue DAG-Entitäten gespeichert und erstellt werden.

DAG-Beispiele deaktivieren

Es gibt viele Beispiele für DAGs, die uns automatisch zur Verfügung stehen, wie Sie in der Benutzeroberfläche sehen können. Dadurch ist es etwas schwieriger, die von uns erstellten DAGs zu sehen, also blenden wir diese Beispiel-DAGs aus.

Ändern Sie außerdem den Parameter load_examples in Zeile 51 so, dass er folgendermaßen aussieht:

Dadurch wird verhindert, dass Airflow DAG-Beispiele lädt.

Ändern Sie die Farbe der Navigationsleiste

Unser Airflow läuft derzeit als unsere Produktionsumgebung. Jeder, der in DevOps arbeitet, wird Ihnen sagen, dass die Trennung von Produktions- und Staging-Umgebungen unglaublich wichtig ist.

Im Allgemeinen ist die visuelle Differenzierung am besten, und glücklicherweise bietet Airflow eine Anpassung für die offensichtliche visuelle Veränderung. Mit Airflow können wir die Farbe des Headers steuern.

Lassen Sie uns die Header-Einstellung in airflow.cfg aktualisieren, damit jeder in dieser Umgebung weiß, dass er besonders vorsichtig sein muss:

navbar_color = #ffc0cb

Erzwingen Sie das erneute Laden von Airflow

Wir werden Airflow zum Neustart zwingen, damit wir nicht warten müssen, bis es automatisch neu gestartet wird.

Führen Sie im Terminal den folgenden Befehl erneut aus:

Liste der Luftstrom-Dags

Jetzt sollte unsere Liste der DAGs leer sein (da noch nichts in /root/airflow_demo/dags ist).

Wir müssen die Airflow-Datenbank neu initialisieren, damit einige dieser Einstellungen ausgewählt werden können. Führen wir den folgenden Befehl aus:

Luftstrom db initStarten Sie den Webserver neu

Schließlich müssen wir den aktuellen Webserverprozess stoppen/neu starten (Sie können die PID einfach beenden). Führen Sie den folgenden Befehl aus, um die PID zu finden:

cat /root/airflow/airflow-webserver.pid

Und wir können den Prozess mit diesem Befehl beenden:

kill $(cat /root/airflow/airflow-webserver.pid)

Sobald der Airflow-Webserver gestoppt wurde, können wir den folgenden Befehl ausführen, um ihn wieder zu starten:

Airflow-Webserver –Port 8080 -D

Wir werden auch den Planer neu starten.

Zuerst stoppen wir den Scheduler. Gehen wir zum Terminal, auf dem der Scheduler läuft, und führen Sie Folgendes aus:

Strg+C und Airflow-Scheduler

Testen wir unseren AirFlow

Lassen Sie uns eine DAG-Datei im Pfad /root/airflow_demo/dags erstellen Mit Namen beweisen-dinge-funktionieren.py und folgenden Code:

from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime, timedelta default_args = { ‘owner’ : ‘Vinoo’, ‘depends_on_past’ :False, ’email’ :[’email@example.com’]’email_on_failure’: False, ’email_on_retry’: False, ‘catchup’: False, ‘retrys’: 1, ‘retry_delay’: timedelta(minutes=5) } dag = DAG( ‘CreateFile’, default_args=default_args, start_date= datetime(2022,1,1,0,0), schedule_interval=timedelta(minutes=500)) task1 = BashOperator( task_id=’prove_things_work’, bash_command=’echo “hello, world!” > /root/create-this- file.txt’,dag=dag)

Lassen Sie uns nun die Syntax unseres DAG überprüfen, gehen wir zurück zu unserem ersten Terminal und führen den Befehl aus:

python3 airflow_demo/dags/prove-things-work.py

Dadurch wird die Datei auf Python-Syntaxfehler überprüft. Bei Erfolg erfolgt keine Ausgabe.

Als letzten Schritt aktualisieren wir die Airflow-Datenbank, anstatt darauf zu warten, dass der Planer sie abholt, damit unser DAG von Airflow initialisiert wird:

Luftstrom db init

Wie wir sehen können, hat AirFlow unseren DAG initialisiert

Lassen Sie es uns ausführen, dazu drücken wir die Taste Play -> Trigger DAG

In der Baumansicht sehen Sie jetzt neben jeder Aufgabe im DAG quadratische Symbole, die ihre Farbe ändern, wenn Aufgaben in die Warteschlange gestellt und ausgeführt werden. Wenn die Aufgaben erfolgreich ausgeführt werden, werden diese Symbole in der Benutzeroberfläche dunkelgrün markiert. Wenn die Aufgaben nicht erledigt sind, werden die Quadrate rot markiert.

Hat es funktioniert? Stellen wir sicher, dass der DAG erfolgreich abgeschlossen wurde. Führen Sie den folgenden Befehl aus: cat /root/create-this-file.txt

Wenn Sie den Satz „Hallo Welt“ sehen, hat Ihr DAG funktioniert!

Lassen Sie uns nun eine DAG mit zwei Knoten erstellen

Lassen Sie uns eine Datei mit dem Namen erstellen two-node-dag.py mit folgendem Code:

from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime # Auf alle Aufgaben angewendete Standardeinstellungen default_args = { ‘owner’: ‘airflow’, ‘depends_on_past’: False, ’email_on_failure’: False, ’email_on_retry’: False, ‘Wiederholungen’: 0, ‘Catchup’: False, ‘start_date’: datetime(2022, 1, 1) } with DAG( dag_id=’two-node-dag’, description=’An example Airflow DAG’, schedule_interval =Keine, default_args=default_args ) as dag: t0 = BashOperator( task_id=’bash_task_0′, bash_command=’echo “Hallo, das ist die erste Airflow-Aufgabe!”‘ ) t1 = BashOperator( task_id=’bash_task_1’, bash_command= ‘echo “Sleeping…” && sleep 5s && date’ ) t0 >> t1

Testen wir die Syntax:

python3 airflow_demo/dags/two-node-dag.py

Zurück zu unserer Benutzeroberfläche

Wenn der Planer ausgeführt wird, holt AirFlow nach einiger Zeit einen neuen DAG ab. Wenn Sie jedoch nicht warten möchten, können Sie den folgenden Befehl ausführen, um die Synchronisierung zu erzwingen:

Luftstrom db init

Wir sehen, dass unser DAG für unseren Airflow sichtbar ist.

Diesmal führen wir es jedoch vom Terminal aus aus:

Airflow-Dags lösen Zwei-Knoten-Dags aus

Und am Ende schauen wir uns die Protokolle unseres DAG an, dafür kehren wir zu unserer Benutzeroberfläche zurück.

Klicken wir auf unseren DAG:

Kommen wir zum Diagramm

Wählen Sie Aufgabe

Wählen Sie im Popup-Fenster Protokoll aus

Wir haben mehrere DAGs in Airflow erfolgreich konfiguriert und gestartet. Fassen wir zusammen:

  • Die Bereitstellung und Konfiguration von Airflow ist einfach.

  • Das Erstellen eines DAG in Airflow ist ein einfacher und unkomplizierter Vorgang.

  • DAGs werden durch Code definiert.

  • DAG-Entitäten sind recht flexibel in der Verwendung.

In Kürze wird OTUS eine offene Sitzung zum Thema “MapReduce: Big Data Processing Algorithm” veranstalten. Darin analysieren wir detailliert den universellen Algorithmus, mit dem Big Data auf verteilten Systemen ohne Shared Storage (Hadoop, Spark) verarbeitet wird. Lassen Sie uns über Engpässe und potenzielle Betriebsprobleme sprechen. Mal sehen, wie es in der Praxis in Yandex.Cloud aussieht. Die Anmeldung steht allen offen Verknüpfung.

Similar Posts

Leave a Reply

Your email address will not be published.