Von überall aus auf deinen Spark-Cluster zugreifen – mit Apache Livy


Livy ist ein REST-Webservice zum Einreichen von Spark-Jobs oder zum Zugriff auf – und somit zum Teilen von – lang laufenden Spark-Sitzungen aus der Ferne. Anstatt deinen Spark-Client mühsam zu konfigurieren und zu installieren, übernimmt Livy diese Aufgabe und stellt dir eine einfache und komfortable Schnittstelle zur Verfügung.
Wir bei statworx nutzen Livy, um Spark-Jobs aus dem Workflow-Tool Apache Airflow auf flüchtigen Amazon-EMR-Clustern einzureichen. Außerdem teilen sich mehrere Kolleg:innen mit unterschiedlichen Scriptsprachenkenntnissen einen laufenden Spark-Cluster.
Ein weiterer großer Vorteil von Livy ist, dass du aus einer Reihe von Scriptsprachen wählen kannst: Java, Scala, Python, R. Wie auch bei Spark hängt es vom jeweiligen Anwendungsfall (und deinen Fähigkeiten) ab, welche Sprache du verwenden solltest bzw. kannst.

Architektur – https://livy.incubator.apache.org/
Apache Livy befindet sich noch im Incubator-Status, und der Code ist im Git-Projekt zu finden.
Wann man es verwenden sollte
Da sich REST-APIs leicht in deine Anwendung integrieren lassen, solltest du sie verwenden, wenn:
- mehrere Clients eine Spark-Session teilen sollen.
- die Clients schlank sind und nicht mit Installation und Konfiguration überlastet werden sollen.
- du ein schnelles Setup brauchst, um auf deinen Spark-Cluster zuzugreifen.
- du Spark in eine App auf deinem Mobilgerät integrieren möchtest.
- du volatile Cluster hast und nicht jedes Mal die Konfiguration anpassen willst.
- ein externes Workflow-Tool Spark-Jobs einreicht.
Voraussetzungen
Livy ist im Allgemeinen benutzerfreundlich, und du brauchst eigentlich nicht viel Vorbereitung. Alles, was du grundsätzlich benötigst, ist ein HTTP-Client, um mit der REST-API von Livy zu kommunizieren. REST-APIs gelten als leicht zugänglich (Zustände und Listen sind sogar über Browser abrufbar), HTTP(s) ist ein bekanntes Protokoll (Statuscodes zur Fehlerbehandlung, Aktionen wie GET und POST usw.) und bietet gleichzeitig alle nötigen Sicherheitsmechanismen.
Da Livy als Vermittler für deine Spark-Anfragen fungiert und deinen Code (entweder als Script-Snippets oder als übermittelbare Pakete) an den Cluster weiterleitet, musst du tatsächlich Code schreiben (oder jemanden haben, der den Code für dich schreibt, oder ein übertragbares Paket zur Hand haben).
Ich habe mich in diesem Blogpost entschieden, hauptsächlich Python als Spark-Skriptsprache zu verwenden und auch direkt mit dem Livy-Interface zu interagieren. Einige Beispiele wurden zusätzlich mit curl ausgeführt.
Wie man Livy verwendet
Es gibt zwei Modi, um mit dem Livy-Interface zu interagieren:
- Interactive Sessions haben eine laufende Session, in die du Anweisungen senden kannst. Sofern Ressourcen verfügbar sind, werden diese ausgeführt und du erhältst eine Ausgabe. Das kann verwendet werden, um mit Daten zu experimentieren oder schnelle Berechnungen durchzuführen.
- Jobs/Batch übermitteln Code-Pakete wie Programme. Ein typischer Anwendungsfall ist eine regelmäßige Aufgabe mit bestimmten Parametern, die im Hintergrund ausgeführt wird. Das könnte beispielsweise eine Datenvorbereitungsaufgabe sein, die Eingabe- und Ausgabeverzeichnisse als Parameter entgegennimmt.
Im Folgenden schauen wir uns beide Fälle und den typischen Ablauf der Einreichung näher an. Jeder Fall wird durch Beispiele veranschaulicht.
Interactive Sessions
Beginnen wir mit einem Beispiel für eine interaktive Spark-Session. Im gesamten Beispiel verwende ich Python und das requests-Paket, um Anfragen an die REST-API zu senden und Antworten zu empfangen. Wie bereits erwähnt, musst du diesen Weg nicht unbedingt gehen – du kannst auch deinen bevorzugten HTTP-Client verwenden (sofern dieser POST- und DELETE-Anfragen unterstützt).
Start einer Spark-Session. Es gibt eine Reihe von Parametern, die konfiguriert werden können (die genauen Details findest du in der Livy-Dokumentation), aber für diesen Blogpost halten wir uns an die Grundlagen und geben lediglich den Namen und den Typ des Codes an. Wenn du bereits Spark-Code ohne Livy übermittelt hast, kommen dir Parameter wie executorMemory oder (YARN) queue vielleicht bekannt vor. Und falls du aufwendigere Tasks mit zusätzlichen Bibliotheken ausführst, weißt du sicher, dass auch der jars-Parameter entsprechend konfiguriert werden muss.
Um die Session zu starten, müssen wir eine POST-Anfrage an die Adresse /sessions mit den Parametern senden.
import requests
LIVY_HOST = 'http://livy-server'
directive = '/sessions'
headers = {'Content-Type': 'application/json'}
data = {'kind':'pyspark','name':'first-livy'}
resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))
if resp.status_code == requests.codes.created:
session_id = resp.json()['id']
else:
raise CustomError()
Livy antwortet im Gegenzug mit einer Kennung für die Session, die wir aus der Antwort extrahieren.
Beachte, dass die Session möglicherweise eine gewisse Startzeit benötigt, bis YARN (ein Ressourcenmanager in der Hadoop-Welt) alle Ressourcen zugewiesen hat. In der Zwischenzeit überprüfen wir den Status der Session, indem wir die Adresse /sessions/{session_id}/state
abfragen. Sobald der Status idle ist, können wir Befehle gegen die Session ausführen.
Um Spark-Code auszuführen, verwendet man sogenannte Statements. Der Code wird in den Body einer POST-Anfrage gepackt und an die entsprechende Adresse gesendet: sessions/{session_id}/statements
.
directive = f'/sessions/{session_id}/statements'
data = {'code':'...'}
resp = request.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))
Als Antwort erhalten wir folgende Attribute:
Das Statement durchläuft mehrere Zustände (siehe unten) und je nach deinem Code, deiner Interaktion (Statements können auch abgebrochen werden) und den verfügbaren Ressourcen landet es mit mehr oder weniger Wahrscheinlichkeit im Status success. Der entscheidende Punkt hierbei ist, dass wir die Kontrolle über den Status haben und entsprechend handeln können.

Übrigens: Das Abbrechen eines Statements erfolgt über eine GET-Anfrage an /sessions/{session_id}/statements/{statement_id}/cancel
Nun ist es an der Zeit, ein Statement einzureichen: Stellen wir uns vor, wir wären einer der Klassenkameraden von Gauß und bekämen die Aufgabe, die Zahlen von 1 bis 1000 zu summieren. Zum Glück hast du Zugang zu einem Spark-Cluster und – noch besser – dort läuft die Livy-REST-API, mit der wir über unsere mobile App verbunden sind: Alles, was du tun musst, ist folgenden Spark-Code zu schreiben:
import textwrap
code = textwrap.dedent("""df = spark.createDataFrame(list(range(1,1000)),'int')
df.groupBy().sum().collect()[0]['sum(value)']""")
code_packed = {'code':code}
Das ist die gesamte Logik, die wir definieren müssen. Der Rest ist die Ausführung gegen die REST-API:
import time
directive = f'/sessions/{session_id}/statements'
resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(code_packed))
if resp.status_code == requests.codes.created:
stmt_id = resp.json()['id']
while True:
info_resp = requests.get(LIVY_HOST+f'/sessions/{session_id}/statements/{stmt_id}')
if info_resp.status_code == requests.codes.ok:
state = info_resp.json()['state']
if state in ('waiting','running'):
time.sleep(2)
elif state in ('cancelling','cancelled','error'):
raise CustomException()
else:
break
else:
raise CustomException()
print(info_resp.json()['output'])
else:
#something went wrong with creation
raise CustomException()
Alle 2 Sekunden prüfen wir den Status des Statements und reagieren entsprechend auf das Ergebnis: Wir beenden das Monitoring, sobald der Status gleich available ist. Natürlich müsste hier noch etwas ergänzt werden: Beispielsweise sollte ein Fehlerstatus anders behandelt werden als ein Abbruch, und es wäre klug, einen Timeout einzubauen, um die Schleife nach einer bestimmten Zeit zu verlassen.
Angenommen, der Code wurde erfolgreich ausgeführt, schauen wir uns das Attribut output in der Antwort an:
{'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '499500'}}
Da haben wir es: Die Antwort ist 499500.
Zum Schluss beenden wir die Session wieder, um Ressourcen für andere freizugeben:
directive = f'/sessions/{session_id}/statements'
requests.delete(LIVY_HOST+directive)
Job Submission
Jetzt wollen wir zu einer kompakteren Lösung übergehen. Nehmen wir an, wir haben ein Paket zur Lösung einer bestimmten Aufgabe, das als Jar-Datei oder als Python-Skript vorliegt. Es müssen lediglich einige Parameter ergänzt werden – wie Eingabedateien, Ausgabeverzeichnis und einige Flags.
Der Einfachheit halber verwenden wir das bekannte Wordcount-Beispiel, für das Spark freundlicherweise eine Implementierung bereitstellt: Eine große Datei einlesen und bestimmen, wie oft jedes Wort vorkommt. Wir verwenden wieder Python als Spark-Sprache. Diesmal wird curl als HTTP-Client verwendet.
Als Beispiel-Datei habe ich den Wikipedia-Eintrag kopiert, der erscheint, wenn man Livy eingibt. Der Text handelt tatsächlich vom römischen Historiker Titus Livius.
Ich bin für dieses Beispiel in die AWS-Cloud gewechselt, weil sie eine komfortable Möglichkeit bietet, einen mit Livy ausgestatteten Cluster einzurichten, und Dateien können mit einem Upload-Handler einfach in S3 gespeichert werden. Schauen wir uns nun an, wie wir vorgehen sollten:
curl -X POST --data '{"file":"s3://livy-example/wordcount.py","args":[s3://livy-example/livy_life.txt"]}'
-H "Content-Type: application/json" http://livy-server:8998/batches
Die Struktur ist derjenigen, die wir zuvor gesehen haben, sehr ähnlich. Wenn wir den Batch an Livy übergeben, erhalten wir im Gegenzug eine Kennung sowie einige weitere Informationen wie den aktuellen Status.
{"id":1,"name":null,"state":"running","appId":"application_1567416002081_0005",...}
Um den Fortschritt des Jobs zu überwachen, gibt es eine entsprechende Adresse: /batches/{batch_id}/state
Höchstwahrscheinlich möchten wir zunächst sicherstellen, dass der Job erfolgreich ausgeführt wurde. In allen anderen Fällen müssen wir herausfinden, was mit unserem Job passiert ist. Die Adresse /batches/{batchId}/log kann hier helfen, um den Lauf zu inspizieren.
Zum Schluss wird die Session entfernt durch:
curl -X DELETE http://livy-server:8998/batches/1
woraufhin folgende Rückmeldung erfolgt: {"msg":"deleted"} – und damit sind wir fertig.
Trivia
- Der Hadoop-Cluster-Dienst EMR von AWS unterstützt Livy nativ als Option in der Softwarekonfiguration.

- Apaches Notebook-Tool Zeppelin unterstützt Livy als Interpreter, d.h. man schreibt Code direkt im Notebook und führt ihn unmittelbar über die Livy-REST-API aus, ohne sich selbst um HTTP kümmern zu müssen.
- Sei vorsichtig und verwende Livy nicht in jedem Fall, wenn du einen Spark-Cluster abfragen willst: Wenn du Spark als Query-Backend nutzen und Daten über Spark SQL abfragen willst, solltest du dir eher den Thriftserver ansehen, anstatt eine Lösung um Livy herum zu bauen.
- Kerberos kann in Livy für Authentifizierungszwecke integriert werden.