Wie man Daten in Cold Storage einfügt / Sudo Null IT News

Hallo! Mein Name ist Maxim Chizhov, ich arbeite bereits im dritten Jahr als Backend Engineer bei Avito. Als ich zum ersten Mal in das Unternehmen eintrat, stand ich vor dem Problem, große Mengen an Informationen zu speichern. Wie man es löst, werde ich im Artikel erzählen.

Die Services, die unser Bereich erstellt, arbeiten nach dem klassischen ETL-Schema. Extractor extrahiert analytische Rohdaten aus einer externen Quelle, Transformer glättet sie und speichert sie in MongoDB. Und Loader lädt die transformierten Daten in den Vertica-Speicher.

So sieht ein ETL-Schema ausSo sieht ein ETL-Schema aus

Jetzt speichert Avito Daten in der sechsten Normalform in Vertica.

Lesen Sie auch: Vertica+Anchor Modeling = Starten Sie das Wachstum Ihres Myzels

Was ist das Problem

Rohdaten werden in MongoDB gespeichert. Obwohl wir sie mindestens drei Monate lagern müssen, wächst ihr Volumen durch das Hinzufügen neuer Extraktoren ständig. An einem Punkt betrug das Datenwachstum 5 TB pro Woche. Das Problem ist, dass unsere Datenbank eine Standardkopie von drei Datenbanken ist – 1 Master und 2 Slaves. Im Normalbetrieb wurden sie von der Replikation problemlos verarbeitet. Aber eines Tages begann die Replikationsverzögerung aufgrund von Netzwerkproblemen aus dem Ruder zu laufen.

Oplog überschritt manchmal 20 Stunden – aus diesem Grund konnte die Replikation nicht mit neuen Daten Schritt halten, und das Replikat fiel auseinanderOplog überschritt manchmal 20 Stunden – aus diesem Grund konnte die Replikation nicht mit neuen Daten Schritt halten, und das Replikat fiel auseinander

Die einzige Möglichkeit war, das Replikat neu zu erstellen. Dadurch haben wir etwa 40 TB Rohdaten verloren – es war schmerzhaft. Um eine Wiederholung dieser Situation zu vermeiden, begannen wir, in zwei Richtungen zu handeln:

  • Verteilen Sie die Daten auf verschiedene Replikate.

  • Wir haben uns entschieden, einen Teil der Daten in einem Archiv zu speichern und bei Bedarf in Mongo wiederherzustellen.

Wie wir uns für die Aufbewahrung entschieden haben

Basierend auf Kundenanfragen und der erforderlichen Datenmenge haben wir Speicheranforderungen identifiziert:

  • Rohanalysedaten sollten mindestens ein Jahr aufbewahrt werden.

  • Die Speicherdatenmenge beträgt 800 TB.

  • Darin können Sie Daten schnell im Hot Storage wiederherstellen.

  • Daten können transformiert werden, als ob sie nicht archiviert worden wären.

  • Es sollte möglich sein, Cold Storage abzufragen, zumindest mit primitiven Filtern.

Daher wählten sie zwischen Ceph, Hadoop und regulären Dateien. Um den Speichervergleich zu erleichtern, haben wir eine Tabelle zusammengestellt:

Der Vorteil des Ceph-Speichers war, dass Avito dafür eine S3-Schnittstelle bereitstellt. Viele Bibliotheken wurden damit geschrieben, auch in Python. Die für unsere Zwecke ausgewählte Bibliothek aioboto3.

Das Ergebnis der Arbeit ist AaaS

Der Dienst bietet zwei Handles: Archivieren und Wiederherstellen. Unter der Haube läuft der Standard-Archivierer pigz, mit dem Sie Daten um den Faktor vier komprimieren und die jährliche Datenmenge auf 200 TB reduzieren können. Und er kann in mehreren Threads arbeiten. Dann wird basierend auf den Parametern des Extraktors ein eindeutiger Pfad zum Repository generiert.

Das System arbeitet nach folgendem Algorithmus:

  1. Der Cron-Extraktor beginnt mit dem Entladen von einer externen Quelle und schreibt die Daten in MongoDB.

  2. Nachdem die Arbeit abgeschlossen ist, startet Extractor den Archiving Worker, der die Daten in 1-GB-Batches schneidet.

  3. Jeder Batch wird asynchron über Websocket an AaaS übertragen, wie er ist.

  4. Auf Seiten des Archivdienstes werden die Daten komprimiert und an Ceph gesendet.

  5. Extractor fügt Metadaten zur erfolgreichen Archivierung von extract_id hinzu.

So funktioniert das ganze SystemSo funktioniert das ganze System

Darüber hinaus wird jeden Tag ein Cron ausgeführt, der veraltete Extrakte aus dem Hot Storage entfernt. Zuvor wird geprüft, ob diese Daten archiviert sind. Jeder Extraktor hat seine eigenen TTL-Einstellungen, normalerweise 7 Tage. An dieser Stelle wird die Löschung der extract_id in die Metadaten geschrieben.

Viele werden sich fragen, warum es unmöglich war, jeden Extraktor dazu zu bringen, die Daten selbst zu archivieren und an Ceph zu senden? Denn für jede Änderung in der Archivierungslogik müsste jeder Extraktor aktualisiert werden – das ist teuer. Darüber hinaus müssen möglicherweise auch andere Arten von Diensten, z. B. Transformatoren, archiviert werden.

ETL mit KühlhausETL mit Cold Storage Lassen Sie mich erklären, wie die Datenpartitionierung erfolgt:

  1. Für jede Sammlung wird die durchschnittliche Dokumentgröße in Gigabyte berechnet. Dafür ist der Parameter collstats.avgObjSize zuständig.

  2. Basierend auf der Größe des Batches (standardmäßig ist es 1 GB) erhalten wir es in den Aufzeichnungen.

  3. Wenn wir die Gesamtzahl der Dokumente in der Sammlung für die aktuelle extract_id kennen, erhalten wir die Anzahl der Batches.

  4. Schlüssel wird in Redis geschrieben EXTRACT_ID.batches_info mit empfangenen Nummern. Dies ist ein wichtiger Punkt, da sich die durchschnittliche Dokumentgröße häufig ändert, insbesondere wenn die Sammlung zum ersten Mal erstellt wird. Geschieht dies nicht, werden bei der erneuten Archivierung die Anzahl der Datensätze in Stapeln unterschiedlich sein und es kommt zu Verwechslungen.

Was ist mit der Belastung des Netzwerks passiert?

Die erste funktionierende Variante bedeutete, dass der Archivdienst Daten direkt aus MongoDB-Extraktoren las. Diese Option wurde jedoch aus mehreren Gründen schnell aufgegeben:

  1. Jedes Mal, wenn ein neuer Extraktor hinzugefügt wurde, musste der Archivdienst bearbeitet werden, um ihm den Pfad zu den Sammlungen und anderen Einstellungen zu geben.

  2. Eines der Grundprinzipien der Microservice-Architektur in Avito wurde verletzt – „1 Base – 1 Service“. Dies ist ein Indikator für die qualitative Zerlegung der Funktionalität einzelner Dienste.

Um die gewünschte Reinheit der Architektur zu erreichen, haben wir die Belastung des Netzwerks geopfert. Mal sehen, wie es gewachsen ist.

war → wurdewar → wurdeSo sieht die Komprimierung in der Grafik ausSo sieht die Komprimierung in der Grafik aus

Eine durchschnittlich 4-fache Komprimierung bedeutet 4Tx = Rx. Wir können die Erhöhung der Netzwerklast mit der Formel berechnen:

Ch = (Rx + Rx + Tx) / (Rx + Tx) = 9/5

Einfach ausgedrückt, die Netzwerklast stieg um 80 %. Dies erwies sich als akzeptabel und verursachte keine Probleme mit Netzwerküberlastung.

Datenwiederherstellung

Um Daten wiederherzustellen, wird das Restore-Handle verwendet, das sie über ein Websocket bereitstellt. Zuerst entschieden wir, dass es ausreichen würde, sie in einem Hot Storage wiederherzustellen und dann die erforderlichen Operationen daran durchzuführen. Dies kann beispielsweise die Transformation archivierter Daten sein. Als Übergangslösung funktionierte das gut, verstieß aber gegen das „1 base – 1 service“-Prinzip.

Benutzer missbrauchten die Funktionen und stellten Daten im Produkt MongoDB wieder her. Um dieses Problem zu lösen, haben wir dem Extraktor die Möglichkeit hinzugefügt, nicht nur Daten zurückzugeben, die noch nicht aus dem Hot Storage gelöscht wurden, sondern auch archivierte Daten von Ceph.

Schema der Transformation von Daten aus dem ArchivSchema der Transformation von Daten aus dem Archiv

Lassen Sie mich erklären, was im Diagramm passiert:

  1. Der Transformer folgt den Daten über extract_id zum Extraktor-Handle.

  2. Der Extraktor prüft seine Metadaten, ob die Daten dieser extract_id archiviert und gelöscht wurden.

  3. Wenn sie nicht archiviert oder gelöscht wurden, werden sie aus MongoDB gelesen.

  4. Wenn sie archiviert und gelöscht werden, liest sie sie aus AaaS, das wiederum nach Daten in Ceph sucht.

  5. Wenn die Daten aus dem Archiv gelöscht werden, wird eine Fehlermeldung angezeigt.

Der Transformer ist sich der Existenz des Archivierungsdienstes nicht bewusst und kann Daten für einen beliebigen Zeitraum verwenden. Wenn sie sich nicht im Hot Storage befinden, fungiert der Extraktor als Proxy.

Was ist das Ergebnis

Als Ergebnis haben wir eine stabile Hot-Storage-Größe, die bei etwa 20 TB gehalten wird. Dies ist ein komfortabler Wert für die Replikation. Aber selbst wenn es erneut zu einer Situation mit kritischer Replikationsverzögerung kommt, können wir die Replik einfach schmerzlos neu erstellen.

Jetzt können wir Daten für jeden Aufbewahrungszeitraum im Cold Storage umwandeln. Der Transformer weiß nichts über das Archiv und geht direkt zum Extractor. Dieses Schema ermöglicht es Ihnen, schnell Änderungen vorzunehmen und Daten an Vertica zu liefern, während der Ressourcenverbrauch reduziert wird.

Heiße SpeichergrößeHeiße Speichergröße

Trotz der Fallstricke, die von der Wahl einer Methode zum Speichern von Cold Dumps bis hin zur nahtlosen Transformation archivierter Daten reichten, erwies sich das Schema als recht einfach.

Wir haben eine saubere Architektur bekommen, aber die Belastung des Netzwerks erhöht. Solche Opfer sind unvermeidlich, um die Qualitäten des Systems zu erreichen, die wir erreichen wollen:

  • Horizontale Skalierbarkeit. Wir können AaaS-Ressourcen reduzieren oder erhöhen. In unserem Fall ändern wir einfach die Anzahl der Pods in Kubernetes.

  • Austauschbarkeit von Komponenten. Wenn wir plötzlich einen anderen Speicher anstelle von Ceph verwenden möchten, können wir Änderungen problemlos für alle Extraktoren vornehmen, und sie werden sie nicht einmal bemerken.

Vorheriger Artikel: Trunk Based Development – ​​wer ist das und warum wird es benötigt?

Similar Posts

Leave a Reply

Your email address will not be published.