MongoDB: Aggregationsframework: Holen Sie sich das zuletzt datierte Dokument pro Gruppierungs-ID

Ich möchte das letzte Dokument für jede Station mit allen anderen Feldern erhalten:

{ "_id" : ObjectId("535f5d074f075c37fff4cc74"), "station" : "OR", "t" : 86, "dt" : ISODate("2014-04-29T08:02:57.165Z") } { "_id" : ObjectId("535f5d114f075c37fff4cc75"), "station" : "OR", "t" : 82, "dt" : ISODate("2014-04-29T08:02:57.165Z") } { "_id" : ObjectId("535f5d364f075c37fff4cc76"), "station" : "WA", "t" : 79, "dt" : ISODate("2014-04-29T08:02:57.165Z") } 

Ich muss t und Station für das späteste dt pro Station haben. Mit dem Aggregations-Framework:

 db.temperature.aggregate([{$sort:{"dt":1}},{$group:{"_id":"$station", result:{$last:"$dt"}, t:{$last:"$t"}}}]) 

kehrt zurück

 { "result" : [ { "_id" : "WA", "result" : ISODate("2014-04-29T08:02:57.165Z"), "t" : 79 }, { "_id" : "OR", "result" : ISODate("2014-04-29T08:02:57.165Z"), "t" : 82 } ], "ok" : 1 } 

Ist das der effizienteste Weg, das zu tun?

Vielen Dank

Um Ihre Frage direkt zu beantworten, ist es der effizienteste Weg. Aber ich denke, wir müssen klären, warum das so ist.

Wie in Alternativen vorgeschlagen, ist die eine Sache, die Leute betrachten, “sortieren” Ihre Ergebnisse vor dem Übergang zu einer $group Bühne und was sie betrachten, ist der “Zeitstempel” Wert, also möchten Sie sicherstellen, dass alles drin ist “timestamp” Reihenfolge, also also die Form:

 db.temperature.aggregate([ { "$sort": { "station": 1, "dt": -1 } }, { "$group": { "_id": "$station", "result": { "$first":"$dt"}, "t": {"$first":"$t"} }} ]) 

Und wie gesagt, Sie möchten natürlich, dass ein Index das widerspiegelt, um die Sortierung effizient zu machen:

Jedoch, und das ist der wahre Punkt. Was von anderen übersehen wurde (wenn nicht für Sie selbst) ist, dass alle diese Daten wahrscheinlich bereits in der zeitlichen Reihenfolge eingefügt werden, indem jeder Messwert als hinzugefügt aufgezeichnet wird.

Also das Schöne daran ist, dass das _id Feld (mit einer Default- ObjectId ) bereits in “timestamp” ObjectId ist, da es tatsächlich einen Zeitwert enthält und dies die Aussage möglich macht:

 db.temperature.aggregate([ { "$group": { "_id": "$station", "result": { "$last":"$dt"}, "t": {"$last":"$t"} }} ]) 

Und es ist schneller. Warum? Nun, Sie müssen keinen Index (zusätzlichen Code zum Aufrufen) auswählen, Sie müssen den Index auch nicht zusätzlich zum Dokument “laden”.

Wir wissen bereits, dass die Dokumente in Ordnung sind (nach _id ), also sind die $last _id gültig. Sie scannen sowieso alles, und Sie können auch die Abfrage für die _id Werte als gleichwertig für zwei Daten “Bereich” verwenden.

Das einzig Wahre, was ich hier sagen kann, ist, dass es in der “realen Welt” vielleicht praktischer ist, zwischen Datenbereichen zu wählen, wenn man diese Art von Akkumulation durchführt, anstatt die “erste” und “letzte” zu bekommen. _id Werte, um einen “Bereich” oder etwas ähnliches in Ihrer tatsächlichen Verwendung zu definieren.

Wo ist der Beweis dafür? Nun, es ist ziemlich einfach zu reproduzieren, also habe ich einfach einige Beispieldaten erzeugt:

 var stations = [ "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY" ]; for ( i=0; i<200000; i++ ) { var station = stations[Math.floor(Math.random()*stations.length)]; var t = Math.floor(Math.random() * ( 96 - 50 + 1 )) +50; dt = new Date(); db.temperatures.insert({ station: station, t: t, dt: dt }); } 

Auf meiner Hardware (8GB Laptop mit Spinny Disk, die nicht stellar, aber sicherlich ausreichend ist) zeigt jede Ausführung der statement deutlich eine Pause mit der Version mit einem Index und einer Sortierung (gleiche Schlüssel auf Index wie die Sortieranweisung). Es ist nur eine kleine Pause, aber der Unterschied ist signifikant genug, um es zu bemerken.

Selbst wenn man sich die EXPLAIN-Ausgabe anschaut (Version 2.6 und höher, oder tatsächlich ist es in 2.4.9, obwohl nicht dokumentiert), kann man den Unterschied darin sehen, obwohl die $sort aufgrund des Vorhandenseins eines Indexes die benötigte Zeit optimiert ist scheint mit der Indexauswahl zu sein und dann die indizierten Einträge zu laden. Das Einbeziehen aller Felder für eine "abgedeckte" Indexabfrage macht keinen Unterschied.

Auch für die Aufzeichnung liefert das reine Indizieren des Datums und nur das Sortieren nach den Datumswerten das gleiche Ergebnis. Möglicherweise etwas schneller, aber immer noch langsamer als die natürliche Indexform ohne die Sorte.

Solange Sie also den ersten und letzten _id Wert auf _id " _id können, ist es tatsächlich so, dass die Verwendung des natürlichen Index für die _id der effizienteste Weg ist. Ihre tatsächliche Reichweite kann variieren, je nachdem, ob dies für Sie praktisch ist oder nicht und es könnte einfach einfacher sein, den Index zu implementieren und das Datum zu sortieren.

Aber wenn Sie mit der Verwendung von _id Bereichen oder größer als die "letzte" _id in Ihrer Abfrage _id , dann vielleicht eine Optimierung, um die Werte zusammen mit Ihren Ergebnissen zu erhalten, so dass Sie diese Informationen tatsächlich in aufeinanderfolgenden Abfragen speichern und verwenden können:

 db.temperature.aggregate([ // Get documents "greater than" the "highest" _id value found last time { "$match": { "_id": { "$gt": ObjectId("536076603e70a99790b7845d") } }}, // Do the grouping with addition of the returned field { "$group": { "_id": "$station", "result": { "$last":"$dt"}, "t": {"$last":"$t"}, "lastDoc": { "$last": "$_id" } }} ]) 

Und wenn Sie die Ergebnisse tatsächlich " ObjectId ", können Sie den maximalen Wert von ObjectId aus Ihren Ergebnissen ermitteln und in der nächsten Abfrage verwenden.

Wie auch immer, viel Spaß beim Spielen, aber wieder ja, in diesem Fall ist diese Abfrage der schnellste Weg.

Ein Index ist alles was du wirklich brauchst:

 db.temperature.ensureIndex({ 'station': 1, 'dt': 1 }) for s in db.temperature.distinct('station'): db.temperature.find({ station: s }).sort({ dt : -1 }).limit(1) 

natürlich mit der für Ihre Sprache gültigen Syntax.

Edit: Sie haben Recht, dass eine Schleife wie diese eine Hin- und Rückfahrt pro Station verursacht, und sie ist großartig für ein paar Stationen und nicht so gut für 1000. Sie wollen immer noch den zusammengesetzten Index auf Station + dt, und nehmen Vorteil einer absteigenden Sortierung:

 db.temperature.aggregate([ { $sort: { station: 1, dt: -1 } }, { $group: { _id: "$station", result: {$first:"$dt"}, t: {$first:"$t"} } } ]) 

Soweit die Aggregationsabfrage, die Sie gepostet haben, würde ich sicherstellen, dass Sie einen Index für dt haben:

 db.temperature.ensureIndex({'dt': 1 }) 

Dadurch wird sichergestellt, dass das $ sort zu Beginn der Aggregationspipeline so effizient wie möglich ist.

Ob dies der effizienteste Weg ist, um diese Daten zu erhalten, im Gegensatz zu einer Abfrage in einer Schleife, hängt wahrscheinlich davon ab, wie viele Datenpunkte Sie haben. Am Anfang, mit “Tausenden von Stationen” und vielleicht Hunderttausenden von Datenpunkten würde ich denken, dass der Aggregationsansatz schneller sein wird.

Wenn Sie jedoch mehr und mehr Daten hinzufügen, besteht das Problem, dass die Aggregationsabfrage weiterhin alle Dokumente berührt. Dies wird immer teurer, wenn Sie auf Millionen oder mehr Dokumente skalieren. Ein Ansatz für diesen Fall wäre das Hinzufügen eines $ -Limits direkt nach dem $ sort, um die Gesamtzahl der betrachteten Dokumente zu begrenzen. Das ist ein bisschen hacky und ungenau, aber es würde helfen, die Gesamtzahl der Dokumente zu beschränken, auf die zugegriffen werden muss.