Archives du blog

MongoDB Aggregation Framework

Vous avez probablement entendu parlé de MongoDb, une solution NoSQL orientée document développée par 10Gen. Les documents sont stockés en JSON, et bien que vous ayez un driver disponible pour chaque language, on se retrouve souvent à coder les requêtes en javascript dans le shell mongo fourni. Je vais vous parler de la version 2.2 qui est la dernière version stable et contient le framework d’aggregation, grande nouveauté attendue par les développeurs. Pour votre information, les numéros de version de Mongo suivent le vieux modèle du kernel Linux : les numéros pairs sont stables (2.2) alors que les versions de développement sont instables (2.1). Node.js suit le même modèle par exemple.

L’aggrégation donc, qu’est ce que c’est? Pour vous faire comprendre l’intérêt nous allons prendre un petit exemple (version simplifée d’un vrai projet). Admettons que vous stockiez les connexions à votre application toute les minutes, par exemple avec un document qui ressemblerait à

{"timestamp": 1358608980 , "connections": 150}

C’est à dire un timestamp qui correspond à la minute concernée et un nombre de connexions total.

Disons que vous vouliez récupérer les statistiques sur une plage de temps, par exemple sur une heure : il faudrait alors aggréger ces données pour obtenir le nombre total de connexion et le nombre moyen par minute. Seulement voilà, MongoDb ne propose pas de “group by”, de “sum” ou de “avg” comme l’on pourrait avoir en SQL. Ce genre d’opération est même déconseillé, car fait en javascript cela prend un plus de temps que dans une base classique. C’est en tout cas à éviter pour répondre à des requêtes en temps réel. Mais bon des fois, on est obligé…

 The old way : Map/Reduce
Jusqu’à la version 2.2 donc, on utilisait un algo map/reduce pour arriver à nos fins. Si vous ne connaissez pas, je vous invite à lire cet article de votre serviteur expliquant le fonctionnement. Dans un algo map/reduce, Il faut écrire une fonction map et une fonction reduce, qui vont s’appliquer sur les données selectionnées par une requête (un sous ensemble de votre collection MongoDb).

La requête qui permet de selectionner ce sous ensemble serait par exemple :

// stats comprises entre 15:00 et 16:00

var query = { timestamp : { $gte: 1358607600, $lte: 1358611200 }}

La fonction map va renvoyer les informations qui vous intéressent pour une clé. Ici nous voulons les connexions pour l’heure qui nous intéresse, donc nous aurons une fonction comme suit :

// on renvoie les infos pour la clé 15:00

var map = function(){ emit(1358607600, { connections : this.connections}) }

La fonction reduce va ensuite aggréger les informations, en ajoutant les connexions totales pour la clé 15:00 et calculer la moyenne associée.

// calculer la somme de toutes les connexions et la moyenne

var reduce = function(key, values){
 var connections = Array.sum(values.connections);
 var avg = connections/values.length;
 return { connections: connections, avg: avg}
 }

Maintenant que nous avons nos fonctions map et reduce, ainsi que la requête pour remonter les données qui nous intéressent, on peut lancer le map reduce.

// dans le shell mongo

db.statistics.mapReduce(map, reduce, { query: query, out: { inline: 1 }})

Le out inline permet d’écrire la réponse dans le shell directement (sinon il faut préciser une collection qui acceuillera le résultat). On obtient une réponse du style :

{connections: 180000, avg: 3000}

en 4,5 secondes environ sur ma collection de plusieurs millions de document légèrement plus complexes que l’exemple.

The new way : Aggregation Framework
Maintenant voyons la nouvelle façon de faire avec le framework d’aggrégation. Une nouvelle opération apparaît : aggregate. Celle-ci remplace mapReduce et fonctionne comme le pipe sous Linux : de nouveaux opérateurs sont disponibles et on peut les enchaîner. Par exemple, le “group by” est simplifié avec un nouvel attribut $group. La requête qui permet de filtrer un sous ensemble de la collection est écrite avec un opérateur $match. Enfin de nouveaux opérateurs viennent nous simplifier la vie : $sum, $avg, $min, $max… J’imagine que vous avez saisi l’idée.

Ici on veut un élément match qui limite l’opération aux données de l’heure concernée, on peut réutiliser la même query que tout à l’heure. On groupe ensuite les documents avec une seule clé : celle de l’heure qui nous intéresse, puis l’on demande le calcul de deux valeurs, le nombre total de connexions (une somme) et la moyenne des connections (une moyenne donc).

db.statistics.aggregate(
 { $match: query},
 { $group: { _id: 1358607600, totalCompleted: {$sum: "$connections"}, totalAvg: {$avg: "$connections"}
 }})

Le résultat est le suivant (en 4,2 secondes, soit un temps légérement inférieur au précédent) :

{ result: [{
 "_id": 1358607600,
 "totalCompleted": 180000,
 "totalAvg": 3000
 }], ok: 1}

L’avantage principal du framework d’aggrégation réside dans sa plus grande simplicité d’écriture et de lecture : plus besoin d’écrire des fonctions js soi-même pour des opérations somme toute assez courantes. Spring Data Mongo par exemple, le très bon projet de SpringSource pour vous simplifier la vie, demande d’écrire des fonctions en js pour faire du map/reduce. Vous avez donc un projet Java, qui contient quand même quelques fichiers js au milieu pour faire certaines opérations. Beaucoup attendent donc avec impatience l’arrivée du support du framework d’aggrégation dans Spring Data. Espérons qu’il ne tarde pas trop! En attendant d’autres frameworks comme Jongo l’ont déjà intégré. Il y a toutefois quelques limites comme le résultat de l’aggregate qui doit faire moins de 16Mo. Bref tout n’est pas idéal, mais ce très bon produit s’améliore à chaque version!

Publicités

Getting Started with Hadoop : Part 2

Disclaimer : Ce billet est la deuxième partie d’un article écrit pour la revue Programmez  (n°144) et traite de la technologie Hadoop. Si vous avez manqué la première partie, la voici. J’en profite également pour citer un article de businessinsider qui vous donnera peut être envie découvrir cette techno :  les startups qui emploient les ex-Yahoo, ex-Facebook et ex-Google. Et l’article est parfaitement dans l’actualité avec la release de la version 1.0.0 !

Exemple
Prenons un exemple pratique : vous voulez trier une pile de pizzas, chaque pizza ayant un parfum (4 fromages, margarita…) et une forme (ronde, carrée, triangle..). Pour cela vous avez à votre disposition 3 serveurs (3 noeuds dans votre cluster). Quelles vont être les phases map et reduce ?
La phase map va consister à répartir les pizzas selon leur forme. Chaque serveur va recevoir une pile de pizzas à traiter et pour chaque pizza va écrire un couple (clé, valeur) de sortie composé de la forme de la pizza et de la pizza.
(ronde, pizza1), (ronde, pizza2), (carrée, pizza3) …

Une fois toutes les pizzas traitées par les serveurs, on se retrouve avec une liste de couple (clé, valeur) avec les clés représentant les formes de pizza. Hadoop va alors redistribuer cet ensemble de données sur nos 3 serveurs en envoyant les pizzas carrées sur le premier, les rondes sur le second etc… Éventuellement, si les pizzas carrées sont trop nombreuses pour être traitées sur un seul serveur, elles seront envoyées en partie sur le serveur un et en partie sur le serveur deux.

Chaque reducer va alors donner un couple (clé, valeur) en sortie indiquant le nombre de chaque saveur pour les clés qu’il a reçues. On crée ici une nouvelle clé composée de la forme et de la saveur.
(carrée_4fromages, 234), (ronde_margarita, 128), (triangle_margarita,6) …

Cet exemple est un cas d’école, et même si vous allez rarement trier des pizzas, vous pouvez chaîner plusieurs mappers et/ou reducers pour arriver à vos fins sur des opérations plus compliquées.

Parmi les exemples notables d’utilisation, nous avons bien sûr Twitter, Google ou Facebook qui s’en servent massivement pour toutes les opérations d’indexation, de traitement de leurs logs serveur (avec plusieurs teraoctets traités chaque jour) ou encore les prédictions de tendances. Mais on retrouve aussi Hadoop chez des opérateurs de téléphonie mobile (test de nouveaux forfaits sur de vastes volumes de données), ou dans des banques pour des calculs de prime, ou pour des calculs de facturation complexes mais ponctuels. Ou encore dans l’industrie pour les suivis RFID qui peuvent générer des quantités incroyables de données.

Un dernier exemple intéressant est celui du New York Times. Afin de générer plusieurs teraoctets de PDFs à partir de leurs archives et de les ouvrir au public, leur équipe a implémenté un programme map/reduce pour réaliser efficacement cette tâche. Et plutôt que d’acheter de nouvelles machines qui ne leur serviraient plus par la suite, elle s’est appuyée sur des instances Amazon : 100 instances, pendant 24h pour un coût en infrastructure dérisoire de 240$. Résultat : 1.5 To de PDF générés dans la journée, avec le résultat hébergé dans le stockage S3 (Simple Stockage Service, service de stockage dans le cloud de la société Amazon) et prêt à être utilisé.

HDFS
Hadoop n’est pas seulement le framework qui vous permet d’écrire des programmes distribués en vous simplifiant la tâche, il met aussi a disposition un système de fichier pensé pour ce type de traitement. Dans un cluster Hadoop, les données sont réparties entre les différents noeuds du cluster. Chaque bloc de données est même répliqué sur plusieurs noeuds pour éviter toute perte (la valeur par défaut de cette redondance est de 3, la donnée se trouvera donc sur 3 noeuds distincts de votre cluster).

Pour répartir ces données de façon performante, un système de fichier distribué a été créé : Hadoop Distributed File System (HDFS) inspiré là aussi de Google et de son Google File System. Ce système de fichier est écrit en Java et vous assure un système de fichier portable. Même si les données sont réparties, elles sont accessibles par n’importe quel noeud lors du traitement (et sont alors transférées si nécessaires depuis leur noeud de stockage vers le noeud affecté au traitement). Ce système de fichier est également capable de monitorer les noeuds, de répliquer et de répartir les données en cas de perte de serveur. Chaque noeud de votre cluster est, en régle générale, également un noeud de stockage.

Lorsque l’on veut traiter un ensemble de données (par exemple un fichier XML de plusieurs centaines de Mo), on va d’abord monter ces données sur le système de fichier HDFS. Pour cela Hadoop fournit un ensemble de commandes très proches de celles que l’on trouve dans l’univers Unix : ls pour lister, put pour ajouter, rm pour supprimer, cat pour lire… Une fois les données montées, on peut démarrer le traitement ; HDFS va alors diviser chaque fichier d’entrée en bloc de données (par défaut de 64Mb) et l’envoyer à un noeud pour calcul, en privilégiant bien sûr les blocs présents physiquement sur le serveur. L’idée est plutôt de déplacer les traitements vers les données que l’inverse, ce qui est beaucoup moins coûteux en performance et en bande passante.

Ecosystème
Hadoop a aussi un très riche écosystème d’outils qui répondent à des besoins différents.

  • Apache Pig est un langage de requête pour analyser un vaste ensemble de données, né chez Yahoo. C’est une abstraction comme peut l’être SQL pour écrire des requêtes qui sont alors traduites en job Map/Reduce et exécutées de façon distribuée sur le cluster. Cela peut simplifier considérablement l’écriture d’une requête !
  • Apache Hive : un data warehouse libre implémentant un langage de requête orienté SQL (HiveSQL) dont la mise en œuvre se traduit par l’exécution de jobs Map/Reduce orchestrés par Hadoop.
  • Sqoop va, lui, vous permettre d’importer vos données stockées dans une base SQL dans HDFS. Le nom vient d’ailleurs de la contraction SQL-to-Hadoop. Il peut également importer les données dans Hive.
  • Flume permet également de collecter des données, mais plutôt des logs.
  • Apache HBase est une base de données orientée colonne s’inscrivant dans la mouvance NoSQL dont le design s’inspire de BigTable (Google). Elle supporte jusqu’à plusieurs millions de lignes et fonctionne au dessus de HDFS.
  • Oozie est un moteur de workflow pour orchestrer vos programme Map/Reduce ou vos scripts Pig.
  • Hue (Hadoop User Experience) est une interface graphique web qui permet d’administrer Hadoop, de parcourir votre répertoire HDFS, de voir les programmes en cours d’exécution etc…


Bref, beaucoup d’utilitaires sont là pour vous aider !

Comment démarrer simplement ?
Une société s’est spécialisée dans le support d’Hadoop auprès des entreprises : il s’agit de la société Cloudera, qui compte dans ses rangs nombre de commiters du projet, dont Doug Cutting. Cloudera fournit un ensemble de ressources très utiles pour aller plus loin, des vidéos mais aussi une machine virtuelle Ubuntu avec tous les outils précités d’ores et déjà installés et configurés. C’est donc un moyen simple de tester ce que peut vous offrir Hadoop. Le framework est d’ailleurs fourni avec quelques exemples de programmes sous forme de jar, que vous pouvez lancer dans la VM. Le navigateur installé vous permettra de monitorer l’avancement de votre programme lors de son exécution.

Hadoop est donc un outil à connaître et qui vous sera peut être utile dès demain. Il ouvre des perspectives intéressantes sur la façon de traiter vos données et n’est pas seulement réservé aux ‘big players’ du web. Si votre application produit trop de données pour que les outils classiques puissent les traiter, ou que ceux-ci les traitent trop lentement, ou de façon trop coûteuse, pensez Hadoop ! Son implémentation du pattern Map/Reduce vous permettra de découvrir en douceur les joies de la scalabilité, et son formidable éco-système, à peine abordé ici, vous ouvrira un univers sans limite. Et quand on voit le nombre de startups qui se lancent sur cette techno, on se dit que Hadoop, c’est définitivement hype!

Getting Started with Hadoop : Part 1

Disclaimer : Ce billet est la première partie d’un article écrit pour la revue Programmez qui est en kiosque ce mois de Septembre  (n°144) et traite de la technologie Hadoop (une petite pause au milieu de la série consacrée à Node.js).

Avez vous déjà essayé d’indexer le web ? Non ? Et bien Google l’a fait, et le fait tous les jours. En quasi temps réel. Grande nouvelle me direz vous, on le sait depuis longtemps ! Mais ne vous êtes vous jamais demandé comment ? Avoir les plus grandes fermes de serveurs et embaucher les cerveaux les plus brillants ne suffit pas : il s’agit aussi de faire ça intelligemment ! Comment effectuer des traitements sur un ensemble de données de cette taille? La firme de Mountain View a partagé quelques uns de ses secrets en Décembre 2004 dans un whitepaper nommé “MapReduce: Simplified Data Processing on Large Clusters”. Un nom un peu barbare pour dévoiler les principes d’un algorithme simple, MapReduce, pour traiter des données sur des clusters de machines, mais sans en révéler les détails d’implémentation.

Qu’à cela ne tienne, quelques passionnés, Doug Cutting (créateur du projet Apache Lucene, un moteur d’indexation de documents) en tête, se mettent à réaliser une implémentation open source de MapReduce : Apache Hadoop était né ! L’objectif est alors pour Doug d’accélérer un autre de ses projets open source, le projet Apache Nutch, un moteur d’indexation web. Après quelques temps, une version de Nutch basée sur ce qui deviendra Hadoop, est rendue disponible. Et elle se révèle bien plus rapide et simple que la précédente. Yahoo pressent l’intérêt de la chose et va alors employer Doug. Ce dernier et une équipe travaille à plein temps sur le projet Hadoop (nommé d’ailleurs avec le surnom du jouet du fils de Doug, un éléphant qui est aussi devenu le logo). Aujourd’hui la détection de spam et l’indexation des sites web chez Yahoo se basent sur Apache Hadoop, dont la première release date de 2008.

Intérêt

Hadoop ne se destine pas à être utilisé en temps réel, c’est un outil qui vous permet d’effectuer vos traitements batchs. Il se base sur l’algorithme MapReduce afin d’effectuer le traitement de vastes volumes de données sur un serveur ou, et c’est toute sa puissance, sur un cluster de serveurs. Ce n’est pas une solution de stockage de données, et Hadoop ne s’inscrit donc pas dans la mouvance NoSQL, même si on les retrouve souvent associés en pratique (car pour traiter de grandes quantités de données, il faut déjà les stocker et c’est là que NoSQL a tout son sens). Ceci étant dit, quel intérêt pour vous et moi qui ne sommes ni Google, ni Twitter et qui ne possédons pas une ferme de serveurs ? Tout d’abord avec le prix de l’espace de stockage qui diminue, on serait bien tenté de stocker de plus en plus d’informations dans nos applications (de la donnée texte à la photo ou vidéo), et, pour les traiter, d’utiliser quelques serveurs au lieu d’un. Voire même d’utiliser la puissance du Cloud pour le faire grâce à son prix dérisoire, Amazon par exemple permettant de louer un serveur pendant une heure pour 0.085$. Ou peut être pour gagner en vitesse de traitement : au lieu d’acheter un magnifique serveur octocore, peut être que vos deux vieux serveurs qui prennent la poussière viennent de retrouver un intérêt…

Principe

Mais sur quoi repose ce modèle de traitement des données ? Tout s’articule sur le découpage de vos programmes en deux parties distinctes dont les exécutions vont être successives : la phase ‘map’ et la phase ‘reduce’. Le mapper est là pour filtrer et transformer les entrées en une sortie que le reducer pourra agréger une fois la première phase terminée, aboutissant alors au résultat souhaité, que ce soit un simple calcul statistique ou un traitement métier plus complexe. Ces deux phases ne sont pas issues de l’imaginaire des développeurs, mais bien des retours terrains constatés par les Googlers qui travaillaient sur ces problématiques.

Chaque phase est en fait une simple méthode, écrite en Java ou éventuellement dans votre langage préféré (Python, Ruby…), de traitement de données à implémenter. Lors de la première phase, MapReduce reçoit les données et donne chaque élément à traiter à chaque mapper (sur chaque noeud de votre cluster, soit de 1 à n machines). A l’issue de cette phase les données traitées sont redistribuées à chaque reducer (idem, chaque noeud de votre cluster, de 1 à n machines) pour arriver au résultat final.

Même si parfois découper votre traitement en deux phases vous semblera non trivial, une fois cette étape réalisée, passer l’exécution de votre programme sur un cluster de n machines ne sera pas plus compliqué qu’un changement de fichier de configuration. Quand vous écrivez votre programme en suivant le modèle MapReduce, Hadoop se charge ensuite pour vous de passer à l’échelle supérieure.

Si nous n’utilisions pas un framework comme Hadoop, nous devrions aussi écrire une fonction de partitionnement pour répartir les données entre les différents noeuds, une autre méthode pour les redistribuer entre les deux phases, gérer le load-balancing, les possibles échecs de traitement, les inévitables pannes matérielles serveur, etc… Beaucoup de travail pas forcément passionnant !  Mais Hadoop se charge là aussi des tâches ingrates et les optimise probablement mieux que ce que nous aurions pu faire. En résumé, écrire un programme Hadoop est donc aussi simple qu’écrire une méthode Map et une méthode Reduce, et à vous la scalabilté !

Dans un programme MapReduce, aucune valeur n’est traitée sans une clé associée. Les fonctions map et reduce ne reçoivent pas de simples données mais des paires (clé, valeur). Même chose pour les données sortantes des fonctions. Le mapper peut produire 0, 1 ou n couples (clé, valeur) pour un couple (clé, valeur) entrant. Le reducer, lui, va réduire une liste de couples (clé, valeur) produite par les mappers en une seule valeur par clé.

To be continued : si vous êtes impatient, foncez au bureau de presse le plus proche. Sinon je mettrai en ligne la suite dans quelques temps. Ce numéro de Programmez contient également un article de @AgnesCrepet, mon amie JDuchess, sur le BDD (si vous ne savez pas ce que c’est vous avez deux bonnes raisons d’acheter ce numéro). Et c’est un très bon article, écrit avec Mauro Talevi, commiter JBehave, je ne le dis pas juste parce que c’est une amie!