Dieser Artikel richtet sich an Personen, die bisher keine Erfahrung mit Hadoop haben und sich einen Einstieg basierend auf praktischen Übungen wünschen. Neben der Erläuterung der Kernkomponenten, werden mithilfe eines Docker-Image grundlegende Konzepte gezeigt.

Einleitung

Dieser Artikel basiert auf dem Vortrag „Hadoop – Taming the Elephant (With a Whale)“. Er behandelt die Grundlagen von Hadoop und bleibt dabei praxisnah und zielgerichtet. Der Aufbau des Artikels gleicht in weiten Teilen dem Aufbau des Vortrags, dessen Folien online verfügbar sind (siehe Abbildung 1).

Abbildung 1: Titelbild des Vortrags 'Hadoop - Taming the Elephant (With a Whale) (Quelle: Lisa Maria Moritz)'

Was ist Hadoop eigentlich?

Hadoop ist ein System zur Datenspeicherung und -analyse, das seit 2006 entwickelt wird und seit 2008 zu Apaches Kernprojekten gehört. Heutzutage wird Hadoop oft in Enterprise-Systemen für die Arbeit mit Big Data eingesetzt. Die Tochter von Doug Cutting, einem der Mitbegründer des Projektes, nannte ihren gelben Plüschelefant „Hadoop“. Daher rühren sowohl der Projektname als auch das gelbe Elefantenlogo [1]. Hadoop besteht aus fünf Kernkomponenten: Hadoop Common, HDFS, MapReduce, YARN und Ozone. Hadoop Common beinhaltet die grundlegenden Bestandteile der Applikation. HDFS, das „Hadoop Distributed File System“, dient zur Speicherung von Daten. MapReduce stellt ein Programmiermodell zur Verarbeitung großer Datenmengen dar. YARN, kurz für „Yet Another Resource Negotiator“, ist das Cluster-Ressource-Management-System von Hadoop. Ozone ist erst seit Ende 2018 in einer Beta-Version Teil des Kerns, es handelt sich hierbei um einen Objektspeicher. Dieser Artikel befasst sich hauptsächlich mit HDFS und MapReduce.

Hadoop in Docker starten

Ein Vorgehen für einen einfachen und schnellen Start in die Thematik, das viele Möglichkeiten für praktische Fingerübungen bietet, ist die Verwendung eines Docker-Image. So werden umfangreiche Installationen vermieden und die Übungen können schnell in Angriff genommen werden. Auch für die ersten Schritte in Hadoop kann ein Docker-Image eingesetzt werden. In Produktion bildet Hadoop in Docker die Ausnahme. Die Basis für die Beispiele in diesem Artikel ist das Docker-Image sequenceiq/hadoop-docker:2.7.0, in dem alle für Hadoop notwendigen Bestandteile als Single-Node-Installation in einem einzigen Docker-Container gestartet werden. Mit dem Befehl in Listing 1 wird das Docker-Image gestartet.

docker run -it \
-p 50070:50070 \
-p 8088:8088 \
-p 50075:50075 \
sequenceiq/hadoop-docker:2.7.0 \
/etc/bootstrap.sh -bash
Listing 1: Befehl zum Starten des Hadoop-Docker-Containers

Beim Starten des Image werden die Ports für die Web-Benutzeroberfläche (50070), das Tracken von Jobs (8088) sowie der Port zum Herunterladen von Ergebnissen (50075) weitergeleitet. Dies ermöglicht später einen Zugriff auf diese Dienste via „localhost“.

Nach dem Ausführen des run-Befehls wird man direkt zur Bash des gestarteten Containers weitergeleitet. Dies ist der Indikator für den erfolgreichen Start des Docker-Containers. Um zu kontrollieren, ob alle Hadoop-Services erfolgreich gestartet wurden, kann ein Beispiel-MapReduce-Job verwendet werden, der von Apache bereits in Hadoop inkludiert wurde. Zum Starten des Jobs, der in Java geschrieben ist, wird der Befehl (siehe Listing 2) in der Bash des Docker-Containers ausgeführt.

$HADOOP_PREFIX/bin/hadoop jar \
share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.0.jar \ grep input output ‘dfs[a-z.]+’
Listing 2: Befehl zum Ausführen des bereitgestellten MapReduce-Beispiels

Es gibt zwei Wege zur Überprüfung der Ergebnisse. Zum einen können die Ergebnisse mithilfe der Web-UI kontrolliert werden. Hierfür wird in einem Web-Browser die URL „localhost:50070“ geöffnet. In der Navigationsleiste ist der Punkt „Utilities“ zu finden, der den Punkt „Browse HDFS“ beinhaltet. Hier kann das gesamte HDFS erkundet werden. Unter anderem auch die zuvor entstandenen Ergebnisse. Diese sind unter „/user/root/output“ zu finden. Es handelt sich um einen Ordner mit zwei Dateien: einem SUCCESS-Deskriptor – eine leere Datei, die anzeigt, dass die Durchführung des MapReduce-Jobs erfolgreich war – sowie einer Datei „part-r-00000“, die die entstandenen Ergebnisse enthält (siehe Abbildung 2).

Abbildung 2: Erkunden des HDFS mithilfe der Web-UI (Quelle: Lisa Maria Moritz)

Zum anderen können die Ergebnisse über die Kommandozeile angesehen werden. Gibt man den Befehl (siehe Listing 3) in der Bash des Docker-Containers ein, so werden die Ergebnisse des Jobs ausgegeben.

$HADOOP_PREFIX/bin/hdfs dfs -cat output/*
Listing 3: Befehl zum Abfragen der Ergebnisse über die Kommandozeile

MapReduce

Ein guter Anfang, um erste praktische Erfahrungen mit Hadoop zu machen, ist, einen MapReduce-Job mithilfe von Java und Maven zu entwickeln. Aber was ist MapReduce? MapReduce ist, wie bereits erwähnt, eine der fünf Kernkomponenten von Hadoop. Es handelt sich um ein Programmiermodell zur Verarbeitung großer Datenmengen und gliedert sich in zwei Teilphasen: die Map-Phase und die Reduce-Phase [2]. Beide Phasen akzeptieren Schlüssel-Wert-Paare als Eingabe und liefern diese als Ausgabe. In der Map-Phase wird der Datensatz zerlegt, es werden ein neuer Schlüssel extrahiert und ein passender Wert zugeordnet. Wert und Schlüssel richten sich hierbei nach dem Ziel des Jobs. Soll beispielsweise aus einem Haustier-Datensatz mit Identifikationsnummern, Tierarten und Tiernamen die Anzahl der verschiedenen Tierarten ermittelt werden, so ist es denkbar, die Tierart als Schlüssel und die Identifikationsnummer als Wert zu extrahieren, wie in Abbildung 3 dargestellt.

Abbildung 3: Veranschaulichung des Mapper-Beispiels (Quelle: Lisa Maria Moritz)

In der Reduce-Phase werden die vom Mapper zerlegten Datensätze zur Erstellung des Ergebnisses wieder zusammengefügt. Für das Haustier-Beispiel würde das bedeuten, die Identifikationsnummern für eine bestimmte Tierart zu zählen und so die Anzahl der Haustiere dieser Tierart zu ermitteln (siehe Abbildung 4).

Abbildung 4: Veranschaulichung des Reducer-Beispiels (Quelle: Lisa Maria Moritz)

Optional kann einem MapReduce-Job ein sogenannter „Combiner“ zugefügt werden, der die Netzwerklast zwischen Mapper und Reducer reduziert. In den meisten Fällen kann der bereits vorhandene Reducer ebenfalls als Combiner eingesetzt werden. Für die nachfolgenden Beispiele ist ein Combiner nicht vonnöten, da keine Multi-Node-Installation von Hadoop vorliegt und die Netzwerkauslastung kein Problem darstellt.

YARN

Seit Hadoop 2, also seit 2013, gehört YARN zum Kern von Hadoop. Neben der Zuweisung der Systemressourcen für verschiedene Anwendungen kümmert sich YARN außerdem um die Verteilung von Aufgaben auf den Knoten des Clusters [3]. Durch die Entkopplung von Ressourcenmanagement und Planung der MapReduce-Jobs gelang es mithilfe von YARN, die vorherige Implementierung von MapReduce zu verbessern. YARN ermöglicht außerdem die Integration von Alternativen zu MapReduce.

HDFS

Hadoop speichert die Daten im HDFS, dem „Hadoop Distributed File System“. Das HDFS besteht aus zwei Node-Typen, die in einem Master-Worker-Muster zusammenarbeiten. Zum einen gibt es die DataNodes, die für die Speicherung der Daten zuständig sind. Zum anderen gibt es einen NameNode, der die Metadaten des HDFS beinhaltet und „weiß“, wo welche Daten abgelegt sind. Die Datenspeicherung selbst erfolgt in sogenannten Blöcken. Eine Datei wird beim Speichern in einen oder mehrere Blöcke zerlegt, die einzelnen Blöcke können auf verschiedenen DataNodes abgelegt werden [4]. Das Dateisystem von Hadoop ist darauf ausgelegt, effizient wenige große Datensätze zu verarbeiten.

MapReduce mit Java

Zur Realisierung eines MapReduce-Jobs in Java werden zwei Abhängigkeiten via Maven eingebunden: „hadoop-common“ und „hadoop-mapreduce-client-core“. Die nachfolgenden Implementierungen basieren auf einem CSV-Datensatz, in dem es unter anderem die Information „Category“ gibt. Das Ergebnis des Map-Reduce-Jobs soll das Zählen des Auftretens der einzelnen Kategorien sein.

Im vorigen Abschnitt wurde bereits erwähnt, dass MapReduce-Jobs aus einem Mapper und einem Reducer bestehen. Beide werden mithilfe des von Hadoop bereitgestellten API umgesetzt.

Als Erstes wird der Mapper entwickelt, der von der Hadoop-Klasse Mapper erbt. Die map-Methode muss überschrieben werden. Beim Erben von der Basisklasse Mapper müssen die Variablentypen von Schlüssel und Wert der Eingabe und Ausgabe spezifiziert werden. Die map-Methode hat die Parameter key und value, wobei dieser bei einer CSV-Datei die gesamte Zeile darstellt, sowie einen context. Der Parameter context dient zur Weitergabe der Ergebnisse. Im folgenden Beispiel wird die Kategorie einer Zeile separiert und gemeinsam mit der Zeilen-ID (key) im context abgelegt. Hadoop verwendet eigene Datentypen, so wird Text statt String und LongWritable statt Long verwendet (siehe Listing 4).

public class CategoryCountMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static final int CATEGORY = 1;
    private static final String SEPARATOR = ",";
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] lineData = line.split(SEPARATOR);
        context.write(new Text(lineData[CATEGORY]), key);
    }
}
Listing 4: Mapper in Java

Auf dieser Basis wird der Reducer implementiert. Er wird die Ergebnisse des Mappers aufnehmen und die Vorkommen einzelner Kategorien zählen. Ein Reducer muss von der Hadoop-Klasse Reducer erben und die reduce-Methode überschreiben. Die reduce-Methode hat als Parameter einen key, ein Iterable values und den context. Als key erhält der Reducer die Kategorie, die im Mapper extrahiert wurde, die zugehörigen Zeilen-IDs werden ihm als Iterable übergeben. Die Aufgabe des Reducer liegt im Zählen der IDs und somit im Reduzieren der Ergebnisse des Mappers. Bei Iteration über die IDs wird das Auftreten der einzelnen Kategorien gezählt. Das Ergebnis wird als Wert vom Typ IntWritable zu der jeweiligen Kategorie gespeichert (siehe Listing 5).

public class CategoryCountReducer
    extends Reducer<Text, LongWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (LongWritable value : values) {
            count++;
        }
        context.write(key, new IntWritable(count));
    }
}
Listing 5: Reducer in Java

Neben einem Mapper und einem Reducer benötigt die Java-Implementierung des MapReduce-Jobs einen Einstiegspunkt. Dieser beinhaltet einige zusätzliche Informationen zum MapReduce-Job, wie einen Namen, die Angabe der Klasse des Mappers und des Reducer sowie die Informationen darüber, wo die Eingabedaten liegen und wohin die Ausgabedaten geschrieben werden sollen (siehe Listing 6).

public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Usage: <input> <output>");
        System.exit(-1);
    }
    String inputPath = args[1]; String outputPath = args[2];
    Job job = Job.getInstance(); job.setJobName("Category Count");
    FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setMapperClass(CategoryCountMapper.class); job.setReducerClass(CategoryCountReducer.class);
    job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
    job.setJarByClass(CategoryCount.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Listing 6: MapReduce-Einstiegspunkt in Java

Für die Ausführung des MapReduce-Jobs wird ein jar benötigt, dieses kann mithilfe von Maven erstellt werden. Nach Ausführen des Maven-Befehls liegt das jar im target-Ordner und kann von dort in den Docker-Container verschoben werden.

Zum Starten des MapReduce-Jobs dient der gleiche Befehl wie zum Starten des MapReduce-Jobs von Apache, lediglich die Argumente des Befehls müssen angepasst werden (siehe Listing 7).

$HADOOP_PREFIX/bin/hadoop jar \
/tmp/category-count-mapreduce.jar \
example/input \
example/category-count
Listing 7: Befehl zum Starten des Java-MapReduce-Jobs

Wie im Beispiel zuvor können die Ergebnisse via Web-UI oder Kommandozeile betrachtet werden.

MapReduce ohne Java

MapReduce-Jobs können auch in anderen Sprachen als Java implementiert werden. Eine Möglichkeit dafür bietet das Hadoop-Streaming-API. Es arbeitet mit der Unix-Standardeingabe und -ausgabe, dadurch ist die einzige Anforderung an die Programmiersprache, damit umgehen zu können. Das in Java vorhandene, nützliche Feature, dass die Werte für einen Schlüssel dem Reducer als Iterable übergeben werden, geht hierbei leider verloren, sodass es die Aufgabe des Programmierers ist, die Werte den richtigen Schlüsseln zuzuordnen. Glücklicherweise übergibt das Hadoop-Streaming-API dem Reducer die Schlüssel in geordneter Reihenfolge, es genügt also, den vorangegangenen Schlüssel zwischenzuspeichern. Durch die Verwendung von Standardeingabe und -ausgabe ergibt sich der Vorteil, dass Mapper und Reducer mithilfe von Pipes in der Kommandozeile getestet werden können. Für MapReduce-Jobs, die mit dem Streaming-API implementiert werden, ist es nicht nötig, einen dedizierten Startpunkt zu implementieren. Es ist ausreichend, einen Mapper und einen Reducer zu schreiben, die mit folgendem Befehl (Listing 8) über das in Java implementierte Hadoop-Streaming-API gestartet werden können.

$HADOOP_PREFIX/bin/hadoop jar \
$HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
- files /tmp/mapper.py,/tmp/reducer.py \
- input example/input \
- output example/category-not-java-count \
- mapper /tmp/mapper.py \
- reducer /tmp/reducer.py
Listing 8: Befehl zum Starten eines MapReduce-Jobs über das Hadoop Streaming API

In diesem Beispiel werden ein in Python implementierter Mapper und Reducer aufgerufen, die mit den gleichen Eingabedaten wie die Java-Realisierung arbeiten. Ein dediziertes Verzeichnis speichert die Ausgabedaten. Bei der Verwendung des Hadoop-Streaming-API sollte daran gedacht werden, die Dateien mithilfe von chmod ausführbar zu machen. Der Mapper geht Zeile für Zeile die Standardeingabe durch, teilt die CSV-Zeile am Separator und schreibt die erhaltene Kategorie, die in der zweiten Spalte der CSV-Zeile steht, sowie eine selbst zugewiesene ID in die Standardausgabe (Listing 9).

#!/usr/bin/env python
import sys
id = 0
for line in sys.stdin:
    val = line.strip()
    data = val.split(',')
    category = data[1]
    print(category + '\t' + str(id))
    id = id + 1

Der Reducer ist ähnlich aufgebaut. Er verarbeitet Zeile für Zeile die Standardeingabe, die der Ausgabe des Mappers entspricht. Die Besonderheit des Reducer besteht darin, dass der vorherige Zeilenschlüssel gespeichert werden muss, um nur die zu einem Schlüssel gehörenden Werte zu zählen. Das Ergebnis des Reducer wird in die Standardausgabe geschrieben (siehe Listing 10).

#!/usr/bin/env python
import sys
previous_key = None
count = 0
for line in sys.stdin:
    (key, val) = line.strip().split('\t')
    if previous_key is None:
        previous_key = key
    if key == previous_key:
        count = count + 1
    else:
        print(key + '\t' + str(count))
        count = 1
    previous_key = key

Ergebnisse über HTTP erhalten

Die in den beiden vorangegangenen Abschnitten entstandenen Ergebnisse sollen möglicherweise in einer UI dargestellt werden. Wie kann man die Ergebnisse von außen abfragen? Auf diese Frage liefert HttpFS eine Antwort. Diese Abkürzung steht für „HDFS over HTTP“. Es ermöglicht, alle Operationen, die man auf dem HDFS ausführen kann, über HTTP auszuführen. Für die Erstellung einer UI, die die Ergebnisse veranschaulicht, genügen zwei Operationen des HDFS: das Browsen durch Ordnerstrukturen und das Öffnen von Dateien. Hier werden diese als Beispiele mithilfe von „curl“ gezeigt. Das Browsen von Ordnerstrukturen erfolgt durch die Verwendung des Öffnungsmodus LISTSTATUS (siehe Listing 11).

curl -i -L \
"http://localhost:50070/webhdfs/v1/user/root/output/?op=LISTSTATUS”
Listing 11: Ordner durchsuchen über HttpFS

Die Antwort wird in JSON übertragen und enthält ein Array mit Dateien innerhalb des Ordners. Es werden unter anderem der Dateiname (pathSuffix) und der Typ der Datei (type) übertragen. Eine Datei kann vom Typ FILE oder DIRECTORY sein. Ein DIRECTORY kann erneut durchsucht werden, während ein FILE wie folgt geöffnet werden kann (siehe Listing 12).

curl -i -L \
"http://localhost:50070/webhdfs/v1/user/root/output/example/category-count/part-r-00000?op=OPEN"
Listing 12: Dateien öffnen über HttpFS

Resümee

Trotz des großen Umfangs von Hadoop ist es möglich, erste Schritte mit „dem Elefanten“ am heimischen Rechner zu wagen. Dabei hilft der „Wal“ und komprimiert den grauen beziehungsweise gelben Riesen in einen handlichen Container. Die eigentlichen Stärken von Hadoop, wie die Verarbeitung großer, unstrukturierter Datenmengen, können auf diese Weise zwar nicht demonstriert werden. Die Grundlagen von MapReduce und HDFS können allerdings leicht zugänglich weitergegeben werden.

Quellen

  1. Tom White (2015): Hadoop: The Definitive Guide (4th Edition). O’REILLY.  ↩

  2. Jeffrey Dean, Sanjay Ghemawat (2004): MapReduce: Simplified Data Processing on Large Clusters. https:/static. googleusercontent.com/media/research.google.com/en/ archive/mapreduce–osdi04.pdf.  ↩

  3. Eine Definition von YARN  ↩

  4. Alex Holmes (2015): Hadoop In Practice (2nd Edition). Manning.  ↩