I have a pair RDD with millions of key-value pairs, where each value is a list that may contain anywhere from a single element to billions of elements. This leads to poor performance, because the large groups block the nodes of the cluster for hours, while the groups that would take only a few seconds cannot be processed in parallel because the entire cluster is already busy.
Is there any way to improve this?
Edit:
The problem I have is with a flatMap in which the entire list for a given key is analyzed. The key is not touched, and the operation compares each element of the list with the rest of the list, which takes a large amount of time but unfortunately has to be done. This means that the complete list has to be on the same node at the same time. The resulting RDD will contain a sublist that depends on a value calculated in the flatMap.
I cannot use broadcast variables in this scenario, because no common data is shared between the different key-value pairs. As for a partitioner, according to the O'Reilly Learning Spark book this type of operation will not benefit from a partitioner because no shuffle is involved (although I'm not sure this is true). Can a Partitioner help in this situation?
Second edit:
This is an example of my code:
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import scala.Tuple2;

    // Spark 1.x signature: call() returns an Iterable (Spark 2.x expects an Iterator).
    public class MyFunction implements FlatMapFunction<Tuple2<String, Iterable<Bean>>, ProcessedBean> {
        public Iterable<ProcessedBean> call(Tuple2<String, Iterable<Bean>> input) throws Exception {
            List<ProcessedBean> output = new ArrayList<ProcessedBean>();
            // CollectionsUtil is my own helper that copies the Iterable into a List.
            List<Bean> listToProcess = CollectionsUtil.makeList(input._2());

            // In some cases size == 2, in others size > 100.000
            for (int i = 0; i < listToProcess.size() - 1; i++) {
                for (int j = i + 1; j < listToProcess.size(); j++) {
                    // Compare every element with each element that follows it.
                    ProcessedBean processed = processData(listToProcess.get(i), listToProcess.get(j));
                    if (processed != null) {
                        output.add(processed);
                    }
                }
            }
            return output;
        }
    }
The double for loop runs n(n-1)/2 times, but it cannot be avoided.
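To make the skew concrete, here is a minimal plain-Java sketch (no Spark involved) of how the n(n-1)/2 count grows for the two list sizes mentioned in the code comment. The class and method names are hypothetical and only serve the illustration.

```java
public class PairCount {
    // Number of (i, j) pairs with i < j that the nested loop visits for a list of n elements.
    static long pairCount(long n) {
        return n * (n - 1) / 2;
    }

    // The same count obtained by actually running the nested loop; only sensible for small n.
    static long countByLooping(int n) {
        long count = 0;
        for (int i = 0; i < n - 1; i++) {
            for (int j = i + 1; j < n; j++) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("size 2       -> " + pairCount(2) + " pair(s)");
        System.out.println("size 100000  -> " + pairCount(100_000) + " pairs");
        System.out.println("loop check for size 50: " + (countByLooping(50) == pairCount(50)));
    }
}
```

A key with 2 elements needs a single comparison, while a key with 100.000 elements needs roughly five billion, which is why a handful of large keys can occupy the cluster while the small ones wait.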
If processData is expensive, it is possible that you could parallelize that step and gain some benefit there.
In pseudo-code, it would be something like this:
    import org.apache.spark.rdd.RDD

    def processData(bean1: Bean, bean2: Bean): Option[ProcessedBean] = { ... }

    val rdd: RDD[(Key, List[Bean])] = ...

    // Emit every unordered pair of beans per key, then spread the pairs over the cluster.
    val pairs: RDD[(Bean, Bean)] = rdd.flatMap { case (key, beans) =>
      val indexed = beans.toIndexedSeq   // constant-time access by index
      val len = indexed.length
      // Iterators keep the pair generation lazy instead of building one huge list.
      for {
        i <- (0 until len - 1).iterator
        j <- (i + 1 until len).iterator
      } yield (indexed(i), indexed(j))
    }.repartition(someNumber)

    // The expensive step now runs on evenly distributed pairs.
    val results: RDD[ProcessedBean] = pairs
      .map(beans => processData(beans._1, beans._2))
      .filter(_.isDefined)
      .map(_.get)
The flatMap phase will still be bounded by your largest list, and you will incur a shuffle when you repartition, but moving the processData step out of that N^2 loop could gain you some parallelism.
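Since the question's code is Java, the pair-generation half of this idea can be sketched in plain Java as well. The iterator below is a hypothetical helper, not part of Spark: it yields each (i, j) pair with i < j on demand, so the pairs for a large key never have to sit in one in-memory list. In Spark 2.x the Java FlatMapFunction returns an Iterator, so an iterator of this shape could be returned from call() directly; treat that wiring as an assumption to verify against your Spark version.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical helper: lazily yields every pair (items[i], items[j]) with i < j. */
public class PairIterator<T> implements Iterator<Map.Entry<T, T>> {
    private final List<T> items; // should offer fast random access, e.g. an ArrayList
    private int i = 0;
    private int j = 1;

    public PairIterator(List<T> items) {
        this.items = items;
    }

    @Override
    public boolean hasNext() {
        // j is always kept at i + 1 or beyond, so this is false once all pairs are consumed.
        return j < items.size();
    }

    @Override
    public Map.Entry<T, T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Map.Entry<T, T> pair = new SimpleImmutableEntry<>(items.get(i), items.get(j));
        j++;
        if (j >= items.size()) { // inner loop exhausted: advance the outer index
            i++;
            j = i + 1;
        }
        return pair;
    }

    public static void main(String[] args) {
        Iterator<Map.Entry<String, String>> it = new PairIterator<>(Arrays.asList("a", "b", "c"));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

The expensive processData call would then be applied to the pairs after the repartition, as in the answer above, rather than inside the iterator.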