Wie spalte ich eine RDD in zwei oder mehr RDDs?

Ich suche nach einer Möglichkeit, eine RDD in zwei oder mehr RDDs aufzuteilen. Die nächste, die ich gesehen habe, ist Scala Spark: Split-Sammlung in mehrere RDD? Das ist immer noch eine einzige RDD.

Wenn Sie mit SAS vertraut sind:

data work.split1, work.split2; set work.preSplit; if (condition1) output work.split1 else if (condition2) output work.split2 run; 

was zu zwei unterschiedlichen Datensätzen führte. Es müsste sofort fortgesetzt werden, um die Ergebnisse zu erhalten, die ich beabsichtige …

Es ist nicht möglich, mehrere RDDs aus einer einzigen Transformation * zu erhalten. Wenn Sie eine RDD teilen möchten, müssen Sie einen filter für jede Split-Bedingung anwenden. Beispielsweise:

 def even(x): return x % 2 == 0 def odd(x): return not even(x) rdd = sc.parallelize(range(20)) rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

Wenn Sie nur eine binäre Bedingung haben und die Berechnung teuer ist, können Sie etwas ähnliches bevorzugen:

 kv_rdd = rdd.map(lambda x: (x, odd(x))) kv_rdd.cache() rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys() rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys() 

Es bedeutet nur eine einzige Prädikatberechnung, erfordert aber einen zusätzlichen Durchlauf über alle Daten.

Es ist wichtig anzumerken, dass, solange eine Eingabe-RDD ordnungsgemäß zwischengespeichert wird und keine zusätzlichen Annahmen bezüglich der Datenverteilung bestehen, es keinen signifikanten Unterschied gibt, wenn es um die Zeitkomplexität zwischen wiederholtem Filter und for-Schleife mit verschachteltem if-else geht.

Mit N Elementen und M Bedingungen Anzahl der Operationen, die Sie durchführen müssen, ist eindeutig proportional zu N mal M. Im Falle von for-Schleife sollte es näher sein (N + MN) / 2 und wiederholte Filter ist genau NM aber am Ende von der Tag ist nichts anderes als O (NM). Sie können meine Diskussion ** mit Jason Lenderman sehen , um etwas über Pros-and-Cons zu lesen.

Auf sehr hohem Niveau sollten Sie zwei Dinge beachten:

  1. Spark-Transformationen sind faul, bis Sie eine Aktion ausführen, bei der Ihre RDD nicht materialisiert ist

    Warum spielt es eine Rolle? Zurück zu meinem Beispiel:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

    Wenn ich später entscheide, dass ich nur rdd_odd brauche, gibt es keinen Grund, rdd_even zu materialisieren.

    Wenn Sie sich Ihr SAS-Beispiel work.split2 , um work.split2 zu berechnen, work.split2 Sie sowohl die Eingabedaten als auch work.split1 .

  2. RDDs stellen eine deklarative API bereit. Wenn Sie einen filter oder eine map , ist es völlig abhängig von der Engine, wie diese Operation ausgeführt wird. Solange die functionen, die an Transformationen übergeben werden, frei von Nebenwirkungen sind, werden mehrere Möglichkeiten zur Optimierung einer gesamten Pipeline geschaffen.

Am Ende des Tages ist dieser Fall nicht speziell genug, um seine eigene Transformation zu rechtfertigen.

Diese Karte mit Filtermuster wird tatsächlich in einem core-Spark verwendet. Siehe meine Antwort zu Wie teilt Sparks RDD.randomSplit tatsächlich die RDD und einen relevanten Teil der randomSplit Methode auf.

Wenn das einzige Ziel darin besteht, eine Aufteilung der Eingabe zu erreichen, ist es möglich, die Klausel partitionBy für DataFrameWriter deren Textausgabeformat:

 def makePairs(row: T): (String, String) = ??? data .map(makePairs).toDF("key", "value") .write.partitionBy($"key").format("text").save(...) 

* Es gibt nur 3 grundlegende Arten von Transformationen in Spark:

  • RDD [T] => RDD [T]
  • RDD [T] => RDD [U]
  • (RDD [T], RDD [U]) => RDD [W]

wobei T, U, W entweder atomare Typen oder Produkte / Tupel (K, V) sein können. Jede andere Operation muss unter Verwendung einer Kombination der obigen ausgedrückt werden. Sie können das Original RDD-Papier für weitere Details überprüfen.

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** Siehe auch Scala Spark: Split-Sammlung in mehrere RDD?

Wie andere oben erwähnte Poster gibt es keine einzelne, native RDD-Transformation, die RDDs spaltet, aber hier sind einige “Multiplex” -Operationen, die effizient eine Vielzahl von “Splitting” auf RDDs emulieren können, ohne mehrfach zu lesen:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

Einige Methoden zur zufälligen Aufteilung:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

Methoden stehen im Open Source-Silex-Projekt zur Verfügung:

https://github.com/willb/silex

Ein Blogbeitrag erklärt, wie sie funktionieren:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

 def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } } } def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } } } 

Wie an anderer Stelle erwähnt, beinhalten diese Verfahren einen Kompromiss zwischen Speicher und Geschwindigkeit, da sie die gesamten Partitionsergebnisse “eifrig” statt “träge” berechnen. Daher ist es möglich, dass diese Methoden bei großen Partitionen zu Speicherproblemen führen, wo dies bei traditionellen faulen Transformationen nicht möglich ist.

Wenn Sie eine RDD mithilfe des RandomSplit-API-Aufrufs aufteilen , erhalten Sie ein Array von RDDs zurück.

Wenn Sie 5 RDDs zurückgeben möchten, geben Sie 5 Gewichtswerte ein.

z.B

 val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4) val seedValue = 5 val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue) splitRDD(1).collect() res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100) 

Eine Möglichkeit besteht darin, einen benutzerdefinierten Partitionierer zu verwenden, um die Daten abhängig von Ihren Filterbedingungen zu partitionieren. Dies kann erreicht werden, indem Partitioner und etwas ähnlich dem RangePartitioner .

Eine Kartenpartition kann dann verwendet werden, um mehrere RDDs von der partitionierten RDD zu erstellen, ohne alle Daten zu lesen.

 val filtered = partitioned.mapPartitions { iter => { new Iterator[Int](){ override def hasNext: Boolean = { if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) { false } else { iter.hasNext } } override def next():Int = iter.next() } 

Beachten Sie jedoch, dass die Anzahl der Partitionen in den gefilterten RDDs mit der Anzahl in der partitionierten RDD übereinstimmt. Daher sollte eine Koaleszenz verwendet werden, um dies zu reduzieren und die leeren Partitionen zu entfernen.