Parallele Verarbeitung und Transformation von JSON-Dateien in Pandas / Sudo Null IT News

Die Anfangsdaten waren 10.000 JSON-Dateien, von denen jede etwa 3.000 Endknoten hatte. Beim Laden aller Dateien nacheinander und Konvertieren in Pandas mittels json_normalize Die Ausführungszeit betrug 10 Minuten, was ziemlich lange ist. Aufgrund der Tatsache, dass diese Operation mehrmals auf verschiedenen Datensätzen durchgeführt werden musste, wurde entschieden, den Konvertierungsprozess zu beschleunigen, indem der Parallelisierungsmechanismus implementiert und als vollwertiges benutzerdefiniertes Skript implementiert wurde.

Um Parallel Computing zu schaffen, sind uns zwei Bedingungen wichtig: Trennbarkeit und Datenunabhängigkeit. Bei der Arbeit mit JSON-Dateien stellt jede von ihnen eine Zeile der resultierenden Tabelle dar, sodass Sie sie einfach in Partitionen aufteilen können. Außerdem brauchen wir zum Konvertieren einer Datei keine andere, die die Datenunabhängigkeitsbedingung erfüllt.

Die Bedingungen sind erfüllt, Sie können mit dem Code fortfahren.

Bibliotheken importieren:

from tqdm import tqdm from pathlib import path import json from multiprocessing import Pool, RLock import pandas as pd import pickle import argparse

Der gesamte Algorithmus des Programms ist in folgende Blöcke unterteilt:

  1. Argumentanalyse.

  2. Abrufen aller Pfade zu Dateien und Aufteilen in n Gruppen (für n Unterprozesse).

  3. Starten von Prozessen zum Hochladen und Konvertieren in Pandas.

  4. Kombinieren der Ergebnisse der Ausführung von Teilprozessen in einem einzigen Frame und Speichern.

Argumentanalyse

Damit das Skript bequem mit anderen Einstellungen wiederverwendet werden kann, definieren wir die Eingabeparameter des Skripts über die Bibliothek argparse. Trennen wir die Initialisierung unseres Argument-Parsers von der Hauptlogik des Skripts und beschreiben sie in einer Funktion get_arg_parser:

def get_arg_pareser(): parser = argparse.ArgumentParser(description= ‘Aus JSON-Dateien erstellt pd.DataFrame’) parser.add_argument(‘-i’, ‘–input-folder’, type=str, help=’input data folder ‘, erforderlich=True) parser.add_argument(‘-o’, ‘–output-file’, type=str, default=r’output.pickle’, help=’output.pickle’) parser.add_argument(‘- e’, ‘–n-executors’, type=int, default=8, help=’number of subprocesses (default: 8)’) geben den Parser zurück

Nennen wir es im Block __main__:

args = get_arg_pareser().parse_args() N_GROUPS = args.n_executors jsons_folder_path = args.input_folder output_file = args.output_file

Pfade zu Dateien

Nutzung der Bibliothek Pathib Holen Sie sich alle Pfade zu unseren Eingabedateien:

f_paths = list(Path(jsons_folder_path).glob(‘*.json’))

Jetzt werden wir die Daten in n Teile unterteilen und jede Partition nummerieren (für eine schöne Verfolgung des Prozesses).

in_group = len(f_paths) // N_GROUPS + 1 inp_args = [f_paths[i:i + in_group] for i in range(0, len(f_paths), in_group)]inp_args = list(enumerate(inp_args))

Parallelisierung

Bei jeder Aufgabe, bei der eine Parallelisierung erstellt werden muss, müssen wir die Logik zum Ausführen des Teilprozesses in einem separaten Block (Funktion) definieren. Lassen Sie uns eine Funktion erstellen one_process_executiondie die Verarbeitung einer Datenpartition bestimmt.

Die Funktion entlädt alle JSON-Dateien der Partition in einem Prozess und speichert zwei Arrays: Dateinamen (Indizes) und die Daten selbst (res_array). Nach dem Entladen aller Daten mit der Funktion pd.json_normalize Lassen Sie uns die Liste der Wörterbücher in eine Tabelle umwandeln und die Dateinamen als Index verfügbar machen.

Ein wichtiger Hinweis ist, dass die Funktion json_normalize sollte für das Array von Wörterbüchern in Unterprozessen ausgeführt werden und nicht für jede einzelne Datei. Wenn wir jedoch die Transformation jeder Datei separat in Pandas einfügen und pd.DataFrame iterativ Zeile für Zeile hinzufügen, wird dies die Ausführung um das Dreifache verlangsamen. Die Hauptregel, etwas in Pandas zu transformieren, ist, die Transformation so spät wie möglich durchzuführen.

def one_process_execution(pid, f_paths): res_arr = []
Indizes= []

tqdm_text=”#” + f'{pid}’.zfill(3) mit tqdm(total=len(f_paths), position=pid+1, desc=tqdm_text) als pbar: für Pfad in f_paths: mit open(str( path), ‘r’) als f: d = json.load(f) indexes.append(path.stem) res_arr.append(d) pbar.update(1) df = pd.json_normalize(res_arr).assign(index =indexes).set_index(‘index’) print(f’Subproc {pid} done’) return df

Nachdem wir nun partitionierte Daten und eine Beschreibung der Unterprozessfunktion haben, ist es an der Zeit, einen Pool (Container) von Prozessen zu erstellen. Um paralleles Rechnen in Python zu implementieren, wird die Bibliothek verwendet Multiprocessing. Mit Hilfe einer Klasse Schwimmbad Wir initialisieren unseren Task-Container, indem wir die erforderliche Anzahl von Unterprozessen und zusätzlichen Parametern zum Anzeigen des Ausführungsstatus angeben, damit er funktioniert. Als nächstes füllen Sie diesen Pool mit der oben beschriebenen Funktion one_process_execution mit den in Schritt 1 generierten Eingabedaten. Aufgaben werden mit dem Schlüsselwort hinzugefügt apply_asyncdie das Ausführungsverhalten unserer Prozesse definiert.

pool = Pool (processes=N_GROUPS, initializer=tqdm.set_lock, initargs=(RLock(),)) jobs = [pool.apply_async(one_process_execution, args=x) for x in inp_args]

Zu beachten ist, dass die Methode apply_async startet den Prozess nicht selbst, sondern fügt ihn nur der Aufgabenliste hinzu. Also in der Variable Arbeitsplätze Wir haben Definitionen von Unterprozessen, die bereit sind, mit der Ausführung zu beginnen.

Um Berechnungen zu starten, müssen wir die Methode für jedes Element des Arrays aufrufen erhalten, die bei ihrer Ausführung das Ergebnis unserer Funktion zurückgibt, während sie ausgeführt wird. Rufen wir diese Methode für jedes Element des Task-Arrays auf.

# run pool df_lists = list(map(lambda x: x.get(), jobs))Status der ProgrammausführungStatus der Programmausführung

Konsolidierung und Erhaltung

Jetzt liegt es an den Kleinen: Es bleibt, alle Datenrahmen zu einem einzigen zusammenzufassen und zu speichern:

res_df = pd.concat(df_lists) # speichern mit open(output_file, ‘wb’) als f: pickle.dump(res_df, f)

Daher wurde ein Skript erstellt, mit dem Sie eine große Anzahl von json-Dateien mithilfe paralleler Computertechnologien in einen einzigen Datenrahmen konvertieren können. Diese Implementierung konvertierte dieselben 10.000 Dateien in 2 Minuten in einen einzigen Datenrahmen und beschleunigte dadurch den Prozess um das Fünffache.

Das vollständige Skript kann auf meiner persönlichen Seite eingesehen werden github. Es wurde so implementiert, dass es in zwei Modi ausgeführt werden kann:

  1. Wie ein Skript, das das Ergebnis speichert als Essiggurke Datei.

  2. Als Plug-In, dessen Hauptfunktion die generierte zurückgibt pd.DataFrame.

Similar Posts

Leave a Reply

Your email address will not be published.