How to pivot DataFrame?


I am starting to use Spark DataFrames and I need to be able to pivot the data to create multiple columns out of one column with multiple rows. There is built-in functionality for that in Scalding, and I believe in Pandas in Python, but I can't find anything for the new Spark DataFrame.


I assume I can write a custom function of some sort that will do this, but I'm not even sure how to start, especially since I am a novice with Spark. If anyone knows how to do this with built-in functionality or has suggestions for how to write something in Scala, it would be greatly appreciated.
scala apache-spark dataframe apache-spark-sql pivot






asked May 14 '15 at 18:42 by J Calbreath · edited Jan 7 at 15:55 by user6910411


  • See this similar question where I posted a native Spark approach that doesn't need to know the column/category names ahead of time.

    – patricksurry
    Jun 23 '15 at 15:56





























6 Answers
























Score: 53 (+50 bounty)









As mentioned by David Anderson, Spark has provided a pivot function since version 1.6. The general syntax looks as follows:



df
.groupBy(grouping_columns)
.pivot(pivot_column, [values])
.agg(aggregate_expressions)


Usage examples using nycflights13 and csv format:



Python:



from pyspark.sql.functions import avg

flights = (sqlContext
.read
.format("csv")
.options(inferSchema="true", header="true")
.load("flights.csv")
.na.drop())

flights.registerTempTable("flights")
sqlContext.cacheTable("flights")

gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")

flights.count()
## 336776

%timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop


Scala:



val flights = sqlContext
.read
.format("csv")
.options(Map("inferSchema" -> "true", "header" -> "true"))
.load("flights.csv")

flights
.groupBy($"origin", $"dest", $"carrier")
.pivot("hour")
.agg(avg($"arr_delay"))


Java:



import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;

Dataset<Row> df = spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("flights.csv");

df.groupBy(col("origin"), col("dest"), col("carrier"))
.pivot("hour")
.agg(avg(col("arr_delay")));


R / SparkR:



library(magrittr)

flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

flights %>%
groupBy("origin", "dest", "carrier") %>%
pivot("hour") %>%
agg(avg(column("arr_delay")))


R / sparklyr:



library(dplyr)

flights <- spark_read_csv(sc, "flights", "flights.csv")

avg.arr.delay <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "avg",
    "arr_delay"
  )
  gdf %>% invoke("agg", expr, list())
}


flights %>%
sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)


SQL:



CREATE TEMPORARY VIEW flights 
USING csv
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

SELECT * FROM (
SELECT origin, dest, carrier, arr_delay, hour FROM flights
) PIVOT (
avg(arr_delay)
FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
);


Example data:



"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00


Performance considerations:



Generally speaking, pivoting is an expensive operation.




  • if you can, try to provide the list of pivot values up front:



    vs = list(range(25))
    %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
    ## 10 loops, best of 3: 392 ms per loop


  • in some cases it has proved beneficial (though likely no longer worth the effort in 2.0 or later) to repartition and/or pre-aggregate the data


  • for reshaping only, you can use first: How to use pivot and calculate average on a non-numeric column (facing AnalysisException "is not a numeric column")? A sketch combining these hints follows below.
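
A minimal Scala sketch, not part of the original answer, that combines the last two hints: it pre-aggregates per group and hour, passes the value list explicitly, and uses first for the final reshape. It assumes the flights DataFrame from the Scala example above.

import org.apache.spark.sql.functions.{avg, first}

// Assumed value list for the "hour" column (0-23), so Spark can skip the
// extra job that collects the distinct pivot values.
val hours = (0 to 23).toList

val pivoted = flights
  .groupBy($"origin", $"dest", $"carrier", $"hour")  // pre-aggregate once per future cell
  .agg(avg($"arr_delay").as("avg_arr_delay"))
  .groupBy($"origin", $"dest", $"carrier")
  .pivot("hour", hours)                               // explicit values, no discovery pass
  .agg(first($"avg_arr_delay"))                       // one row per cell, so first == avg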


Related questions:



  • How to melt Spark DataFrame?

  • Unpivot in spark-sql/pyspark

  • Transpose column to row with Spark





community wiki · 13 revs, 3 users (60% zero323) · edited Nov 22 '18 at 11:11
































    Score: 13














    I overcame this by writing a for loop to dynamically create a SQL query. Say I have:



    id tag value
    1 US 50
    1 UK 100
    1 Can 125
    2 US 75
    2 UK 150
    2 Can 175


    and I want:



    id US UK Can
    1 50 100 125
    2 75 150 175


    I can create a list with the values I want to pivot on and then create a string containing the SQL query I need.



    val countries = List("US", "UK", "Can")
    val numCountries = countries.length - 1

    var query = "select *, "
    for (i <- 0 to numCountries-1)
    query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "

    query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

    myDataFrame.registerTempTable("myTable")
    val myDF1 = sqlContext.sql(query)


    I can create a similar query to then do the aggregation. It's not a very elegant solution, but it works and is flexible for any list of values, which can also be passed in as an argument when your code is called.
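
    For comparison, here is a minimal sketch, not part of the original answer, of the same reshape with the built-in pivot mentioned in the comments below (Spark 1.6+), assuming the same myDataFrame with columns id, tag and value:

    // Group by id, turn each tag into a column, and aggregate the matching values.
    // Passing the tag list explicitly avoids an extra pass to discover distinct tags.
    val pivoted = myDataFrame
      .groupBy("id")
      .pivot("tag", Seq("US", "UK", "Can"))
      .sum("value")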






    answered May 22 '15 at 13:21 by J Calbreath · edited Sep 3 '17 at 20:31 by zero323

























    • I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

      – user299791
      Feb 29 '16 at 8:59











    • That has to do with the quotes. If you look at the resulting text string, what you would get is 'case when tag = US', so Spark thinks that's a column name rather than a text value. What you really want to see is 'case when tag = "US"'. I have edited the above answer to have the correct setup for quotes.

      – J Calbreath
      Feb 29 '16 at 14:03






    • But as also mentioned, this functionality is now native to Spark using the pivot command.

      – J Calbreath
      Feb 29 '16 at 14:03


















    Score: 9














    A pivot operator has been added to the Spark dataframe API, and is part of Spark 1.6.



    See https://github.com/apache/spark/pull/7841 for details.
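
    A minimal usage sketch, not part of the original answer and with assumed column names (id, tag, value):

    import org.apache.spark.sql.functions.sum

    // Group by id, turn each distinct tag into a column, aggregate value per cell.
    df.groupBy("id")
      .pivot("tag")
      .agg(sum("value"))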






    answered Nov 19 '15 at 8:47 by David Anderson






























      Score: 5














      I have solved a similar problem using dataframes with the following steps:



      Create columns for all your countries, with 'value' as the value:



      import org.apache.spark.sql.functions._
      val countries = List("US", "UK", "Can")
      val countryValue = udf { (countryToCheck: String, countryInRow: String, value: Long) =>
        if (countryToCheck == countryInRow) value else 0
      }

      val countryFuncs = countries.map { country =>
        (dataFrame: DataFrame) => dataFrame.withColumn(country, countryValue(lit(country), df("tag"), df("value")))
      }
      val dfWithCountries = Function.chain(countryFuncs)(df).drop("tag").drop("value")


      Your dataframe 'dfWithCountries' will look like this:



      +--+--+---+---+
      |id|US| UK|Can|
      +--+--+---+---+
      | 1|50| 0| 0|
      | 1| 0|100| 0|
      | 1| 0| 0|125|
      | 2|75| 0| 0|
      | 2| 0|150| 0|
      | 2| 0| 0|175|
      +--+--+---+---+


      Now you can sum together all the values for your desired result:



      dfWithCountries.groupBy("id").sum(countries: _*).show


      Result:



      +--+-------+-------+--------+
      |id|SUM(US)|SUM(UK)|SUM(Can)|
      +--+-------+-------+--------+
      | 1| 50| 100| 125|
      | 2| 75| 150| 175|
      +--+-------+-------+--------+


      It's not a very elegant solution, though. I had to create a chain of functions to add in all the columns. Also, if I have lots of countries, I will expand my temporary data set into a very wide set with lots of zeroes.




































        Score: 0














        Initially I adopted Al M's solution. Later I took the same thought and rewrote it as a transpose function.



        This method transposes the rows of any DataFrame into columns, for any data format, using a key column and a value column.



        for input csv



        id,tag,value
        1,US,50a
        1,UK,100
        1,Can,125
        2,US,75
        2,UK,150
        2,Can,175


        output



        +--+---+---+---+
        |id| UK| US|Can|
        +--+---+---+---+
        | 2|150| 75|175|
        | 1|100|50a|125|
        +--+---+---+---+


        transpose method:



        def transpose(hc: HiveContext, df: DataFrame, compositeId: List[String], key: String, value: String) = {

          val distinctCols = df.select(key).distinct.map { r => r(0) }.collect().toList

          val rdd = df.map { row =>
            (compositeId.collect { case id => row.getAs(id).asInstanceOf[Any] },
             scala.collection.mutable.Map(row.getAs(key).asInstanceOf[Any] -> row.getAs(value).asInstanceOf[Any]))
          }

          val pairRdd = rdd.reduceByKey(_ ++ _)
          val rowRdd = pairRdd.map(r => dynamicRow(r, distinctCols))
          hc.createDataFrame(rowRdd, getSchema(df.schema, compositeId, (key, distinctCols)))
        }


        private def dynamicRow(r: (List[Any], scala.collection.mutable.Map[Any, Any]), colNames: List[Any]) = {
          val cols = colNames.collect { case col => r._2.getOrElse(col.toString(), null) }
          val array = r._1 ++ cols
          Row(array: _*)
        }


        private def getSchema(srcSchema: StructType, idCols: List[String], distinctCols: (String, List[Any])): StructType = {
          val idSchema = idCols.map { idCol => srcSchema.apply(idCol) }
          val colSchema = srcSchema.apply(distinctCols._1)
          val colsSchema = distinctCols._2.map { col => StructField(col.asInstanceOf[String], colSchema.dataType, colSchema.nullable) }
          StructType(idSchema ++ colsSchema)
        }



        main snippet



        import java.util.Date
        import org.apache.spark.SparkConf
        import org.apache.spark.SparkContext
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.DataFrame
        import org.apache.spark.sql.types.StructType
        import org.apache.spark.sql.hive.HiveContext
        import org.apache.spark.sql.types.StructField


        ...
        ...
        def main(args: Array[String]): Unit = {

          val sc = new SparkContext(conf)
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val dfdata1 = sqlContext.read.format("com.databricks.spark.csv")
            .option("header", "true").option("inferSchema", "true")
            .load("data.csv")
          dfdata1.show()
          val dfOutput = transpose(new HiveContext(sc), dfdata1, List("id"), "tag", "value")
          dfOutput.show
        }


























        • This method transposes rows to columns...

          – Jaigates
          Sep 6 '16 at 18:38


















        Score: -1














        There is a simple and elegant solution.



        scala> spark.sql("select * from k_tags limit 10").show()
        +---------------+-------------+------+
        | imsi| name| value|
        +---------------+-------------+------+
        |246021000000000| age| 37|
        |246021000000000| gender|Female|
        |246021000000000| arpu| 22|
        |246021000000000| DeviceType| Phone|
        |246021000000000|DataAllowance| 6GB|
        +---------------+-------------+------+

        scala> spark.sql("select * from k_tags limit 10").groupBy($"imsi").pivot("name").agg(min($"value")).show()
        +---------------+-------------+----------+---+----+------+
        | imsi|DataAllowance|DeviceType|age|arpu|gender|
        +---------------+-------------+----------+---+----+------+
        |246021000000000| 6GB| Phone| 37| 22|Female|
        |246021000000001| 1GB| Phone| 72| 10| Male|
        +---------------+-------------+----------+---+----+------+




























          protected by user8371915 Jul 15 '18 at 19:20



          Thank you for your interest in this question.
          Because it has attracted low-quality or spam answers that had to be removed, posting an answer now requires 10 reputation on this site (the association bonus does not count).



          Would you like to answer one of these unanswered questions instead?














          6 Answers
          6






          active

          oldest

          votes








          6 Answers
          6






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          53





          +50









          As mentioned by David Anderson Spark provides pivot function since version 1.6. General syntax looks as follows:



          df
          .groupBy(grouping_columns)
          .pivot(pivot_column, [values])
          .agg(aggregate_expressions)


          Usage examples using nycflights13 and csv format:



          Python:



          from pyspark.sql.functions import avg

          flights = (sqlContext
          .read
          .format("csv")
          .options(inferSchema="true", header="true")
          .load("flights.csv")
          .na.drop())

          flights.registerTempTable("flights")
          sqlContext.cacheTable("flights")

          gexprs = ("origin", "dest", "carrier")
          aggexpr = avg("arr_delay")

          flights.count()
          ## 336776

          %timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
          ## 10 loops, best of 3: 1.03 s per loop


          Scala:



          val flights = sqlContext
          .read
          .format("csv")
          .options(Map("inferSchema" -> "true", "header" -> "true"))
          .load("flights.csv")

          flights
          .groupBy($"origin", $"dest", $"carrier")
          .pivot("hour")
          .agg(avg($"arr_delay"))


          Java:



          import static org.apache.spark.sql.functions.*;
          import org.apache.spark.sql.*;

          Dataset<Row> df = spark.read().format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .load("flights.csv");

          df.groupBy(col("origin"), col("dest"), col("carrier"))
          .pivot("hour")
          .agg(avg(col("arr_delay")));


          R / SparkR:



          library(magrittr)

          flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

          flights %>%
          groupBy("origin", "dest", "carrier") %>%
          pivot("hour") %>%
          agg(avg(column("arr_delay")))


          R / sparklyr



          library(dplyr)

          flights <- spark_read_csv(sc, "flights", "flights.csv")

          avg.arr.delay <- function(gdf)
          expr <- invoke_static(
          sc,
          "org.apache.spark.sql.functions",
          "avg",
          "arr_delay"
          )
          gdf %>% invoke("agg", expr, list())


          flights %>%
          sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)


          SQL:



          CREATE TEMPORARY VIEW flights 
          USING csv
          OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

          SELECT * FROM (
          SELECT origin, dest, carrier, arr_delay, hour FROM flights
          ) PIVOT (
          avg(arr_delay)
          FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
          13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
          );


          Example data:



          "year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
          2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
          2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
          2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
          2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
          2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
          2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
          2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
          2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
          2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
          2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00


          Performance considerations:



          Generally speaking pivoting is an expensive operation.




          • if you can try to provide values list:



            vs = list(range(25))
            %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
            ## 10 loops, best of 3: 392 ms per loop


          • in some cases it proved to be beneficial (likely no longer worth the effort in 2.0 or later) to repartition and / or pre-aggregate the data


          • for reshaping only, you can use first: How to use pivot and calculate average on a non-numeric column (facing AnalysisException "is not a numeric column")?


          Related questions:



          • How to melt Spark DataFrame?

          • Unpivot in spark-sql/pyspark

          • Transpose column to row with Spark





          share|improve this answer





























            53





            +50









            As mentioned by David Anderson Spark provides pivot function since version 1.6. General syntax looks as follows:



            df
            .groupBy(grouping_columns)
            .pivot(pivot_column, [values])
            .agg(aggregate_expressions)


            Usage examples using nycflights13 and csv format:



            Python:



            from pyspark.sql.functions import avg

            flights = (sqlContext
            .read
            .format("csv")
            .options(inferSchema="true", header="true")
            .load("flights.csv")
            .na.drop())

            flights.registerTempTable("flights")
            sqlContext.cacheTable("flights")

            gexprs = ("origin", "dest", "carrier")
            aggexpr = avg("arr_delay")

            flights.count()
            ## 336776

            %timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
            ## 10 loops, best of 3: 1.03 s per loop


            Scala:



            val flights = sqlContext
            .read
            .format("csv")
            .options(Map("inferSchema" -> "true", "header" -> "true"))
            .load("flights.csv")

            flights
            .groupBy($"origin", $"dest", $"carrier")
            .pivot("hour")
            .agg(avg($"arr_delay"))


            Java:



            import static org.apache.spark.sql.functions.*;
            import org.apache.spark.sql.*;

            Dataset<Row> df = spark.read().format("csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load("flights.csv");

            df.groupBy(col("origin"), col("dest"), col("carrier"))
            .pivot("hour")
            .agg(avg(col("arr_delay")));


            R / SparkR:



            library(magrittr)

            flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

            flights %>%
            groupBy("origin", "dest", "carrier") %>%
            pivot("hour") %>%
            agg(avg(column("arr_delay")))


            R / sparklyr



            library(dplyr)

            flights <- spark_read_csv(sc, "flights", "flights.csv")

            avg.arr.delay <- function(gdf)
            expr <- invoke_static(
            sc,
            "org.apache.spark.sql.functions",
            "avg",
            "arr_delay"
            )
            gdf %>% invoke("agg", expr, list())


            flights %>%
            sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)


            SQL:



            CREATE TEMPORARY VIEW flights 
            USING csv
            OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

            SELECT * FROM (
            SELECT origin, dest, carrier, arr_delay, hour FROM flights
            ) PIVOT (
            avg(arr_delay)
            FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
            13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
            );


            Example data:



            "year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
            2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
            2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
            2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
            2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
            2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
            2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
            2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
            2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
            2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
            2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00


            Performance considerations:



            Generally speaking pivoting is an expensive operation.




            • if you can try to provide values list:



              vs = list(range(25))
              %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
              ## 10 loops, best of 3: 392 ms per loop


            • in some cases it proved to be beneficial (likely no longer worth the effort in 2.0 or later) to repartition and / or pre-aggregate the data


            • for reshaping only, you can use first: How to use pivot and calculate average on a non-numeric column (facing AnalysisException "is not a numeric column")?


            Related questions:



            • How to melt Spark DataFrame?

            • Unpivot in spark-sql/pyspark

            • Transpose column to row with Spark





            share|improve this answer



























              53





              +50







              53





              +50



              53




              +50





              As mentioned by David Anderson Spark provides pivot function since version 1.6. General syntax looks as follows:



              df
              .groupBy(grouping_columns)
              .pivot(pivot_column, [values])
              .agg(aggregate_expressions)


              Usage examples using nycflights13 and csv format:



              Python:



              from pyspark.sql.functions import avg

              flights = (sqlContext
              .read
              .format("csv")
              .options(inferSchema="true", header="true")
              .load("flights.csv")
              .na.drop())

              flights.registerTempTable("flights")
              sqlContext.cacheTable("flights")

              gexprs = ("origin", "dest", "carrier")
              aggexpr = avg("arr_delay")

              flights.count()
              ## 336776

              %timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
              ## 10 loops, best of 3: 1.03 s per loop


              Scala:



              val flights = sqlContext
              .read
              .format("csv")
              .options(Map("inferSchema" -> "true", "header" -> "true"))
              .load("flights.csv")

              flights
              .groupBy($"origin", $"dest", $"carrier")
              .pivot("hour")
              .agg(avg($"arr_delay"))


              Java:



              import static org.apache.spark.sql.functions.*;
              import org.apache.spark.sql.*;

              Dataset<Row> df = spark.read().format("csv")
              .option("inferSchema", "true")
              .option("header", "true")
              .load("flights.csv");

              df.groupBy(col("origin"), col("dest"), col("carrier"))
              .pivot("hour")
              .agg(avg(col("arr_delay")));


              R / SparkR:



              library(magrittr)

              flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

              flights %>%
              groupBy("origin", "dest", "carrier") %>%
              pivot("hour") %>%
              agg(avg(column("arr_delay")))


              R / sparklyr



              library(dplyr)

              flights <- spark_read_csv(sc, "flights", "flights.csv")

              avg.arr.delay <- function(gdf)
              expr <- invoke_static(
              sc,
              "org.apache.spark.sql.functions",
              "avg",
              "arr_delay"
              )
              gdf %>% invoke("agg", expr, list())


              flights %>%
              sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)


              SQL:



              CREATE TEMPORARY VIEW flights 
              USING csv
              OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

              SELECT * FROM (
              SELECT origin, dest, carrier, arr_delay, hour FROM flights
              ) PIVOT (
              avg(arr_delay)
              FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
              13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
              );


              Example data:



              "year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
              2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
              2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
              2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
              2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
              2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
              2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
              2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
              2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
              2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
              2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00


              Performance considerations:



              Generally speaking pivoting is an expensive operation.




              • if you can try to provide values list:



                vs = list(range(25))
                %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
                ## 10 loops, best of 3: 392 ms per loop


              • in some cases it proved to be beneficial (likely no longer worth the effort in 2.0 or later) to repartition and / or pre-aggregate the data


              • for reshaping only, you can use first: How to use pivot and calculate average on a non-numeric column (facing AnalysisException "is not a numeric column")?


              Related questions:



              • How to melt Spark DataFrame?

              • Unpivot in spark-sql/pyspark

              • Transpose column to row with Spark





              share|improve this answer















              As mentioned by David Anderson Spark provides pivot function since version 1.6. General syntax looks as follows:



              df
              .groupBy(grouping_columns)
              .pivot(pivot_column, [values])
              .agg(aggregate_expressions)


              Usage examples using nycflights13 and csv format:



              Python:



              from pyspark.sql.functions import avg

              flights = (sqlContext
              .read
              .format("csv")
              .options(inferSchema="true", header="true")
              .load("flights.csv")
              .na.drop())

              flights.registerTempTable("flights")
              sqlContext.cacheTable("flights")

              gexprs = ("origin", "dest", "carrier")
              aggexpr = avg("arr_delay")

              flights.count()
              ## 336776

              %timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
              ## 10 loops, best of 3: 1.03 s per loop


              Scala:



              val flights = sqlContext
              .read
              .format("csv")
              .options(Map("inferSchema" -> "true", "header" -> "true"))
              .load("flights.csv")

              flights
              .groupBy($"origin", $"dest", $"carrier")
              .pivot("hour")
              .agg(avg($"arr_delay"))


              Java:



              import static org.apache.spark.sql.functions.*;
              import org.apache.spark.sql.*;

              Dataset<Row> df = spark.read().format("csv")
              .option("inferSchema", "true")
              .option("header", "true")
              .load("flights.csv");

              df.groupBy(col("origin"), col("dest"), col("carrier"))
              .pivot("hour")
              .agg(avg(col("arr_delay")));


              R / SparkR:



              library(magrittr)

              flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

              flights %>%
              groupBy("origin", "dest", "carrier") %>%
              pivot("hour") %>%
              agg(avg(column("arr_delay")))


              R / sparklyr



              library(dplyr)

              flights <- spark_read_csv(sc, "flights", "flights.csv")

              avg.arr.delay <- function(gdf)
              expr <- invoke_static(
              sc,
              "org.apache.spark.sql.functions",
              "avg",
              "arr_delay"
              )
              gdf %>% invoke("agg", expr, list())


              flights %>%
              sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)


              SQL:



              CREATE TEMPORARY VIEW flights 
              USING csv
              OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

              SELECT * FROM (
              SELECT origin, dest, carrier, arr_delay, hour FROM flights
              ) PIVOT (
              avg(arr_delay)
              FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
              13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
              );


              Example data:



              "year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
              2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
              2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
              2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
              2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
              2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
              2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
              2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
              2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
              2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
              2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00


              Performance considerations:



              Generally speaking pivoting is an expensive operation.




              • if you can try to provide values list:



                vs = list(range(25))
                %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
                ## 10 loops, best of 3: 392 ms per loop


              • in some cases it proved to be beneficial (likely no longer worth the effort in 2.0 or later) to repartition and / or pre-aggregate the data


              • for reshaping only, you can use first: How to use pivot and calculate average on a non-numeric column (facing AnalysisException "is not a numeric column")?


              Related questions:



              • How to melt Spark DataFrame?

              • Unpivot in spark-sql/pyspark

              • Transpose column to row with Spark






              share|improve this answer














              share|improve this answer



              share|improve this answer








              edited Nov 22 '18 at 11:11


























              community wiki





              13 revs, 3 users 60%
              zero323
























                  13














                  I overcame this by writing a for loop to dynamically create a SQL query. Say I have:



                  id tag value
                  1 US 50
                  1 UK 100
                  1 Can 125
                  2 US 75
                  2 UK 150
                  2 Can 175


                  and I want:



                  id US UK Can
                  1 50 100 125
                  2 75 150 175


                  I can create a list with the value I want to pivot and then create a string containing the SQL query I need.



                  val countries = List("US", "UK", "Can")
                  val numCountries = countries.length - 1

                  var query = "select *, "
                  for (i <- 0 to numCountries-1)
                  query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "

                  query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

                  myDataFrame.registerTempTable("myTable")
                  val myDF1 = sqlContext.sql(query)


                  I can create similar query to then do the aggregation. Not a very elegant solution but it works and is flexible for any list of values, which can also be passed in as an argument when your code is called.






                  share|improve this answer

























                  • I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

                    – user299791
                    Feb 29 '16 at 8:59











                  • That has to do with the quotes. If you look at the resulting text string what you would get is 'case when tag = US', so Spark thinks thats a column name rather than a text value. What you really want to see is 'case when tag = "US" '. I have edited the above answer to have the correct set up for quotes.

                    – J Calbreath
                    Feb 29 '16 at 14:03






                  • 2





                    But as also mentioned, this is fuctionality is now native to Spark using the pivot command.

                    – J Calbreath
                    Feb 29 '16 at 14:03















                  13














                  I overcame this by writing a for loop to dynamically create a SQL query. Say I have:



                  id tag value
                  1 US 50
                  1 UK 100
                  1 Can 125
                  2 US 75
                  2 UK 150
                  2 Can 175


                  and I want:



                  id US UK Can
                  1 50 100 125
                  2 75 150 175


                  I can create a list with the value I want to pivot and then create a string containing the SQL query I need.



                  val countries = List("US", "UK", "Can")
                  val numCountries = countries.length - 1

                  var query = "select *, "
                  for (i <- 0 to numCountries-1)
                  query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "

                  query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

                  myDataFrame.registerTempTable("myTable")
                  val myDF1 = sqlContext.sql(query)


                  I can create similar query to then do the aggregation. Not a very elegant solution but it works and is flexible for any list of values, which can also be passed in as an argument when your code is called.






                  share|improve this answer

























                  • I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

                    – user299791
                    Feb 29 '16 at 8:59











                  • That has to do with the quotes. If you look at the resulting text string what you would get is 'case when tag = US', so Spark thinks thats a column name rather than a text value. What you really want to see is 'case when tag = "US" '. I have edited the above answer to have the correct set up for quotes.

                    – J Calbreath
                    Feb 29 '16 at 14:03






                  • 2





                    But as also mentioned, this is fuctionality is now native to Spark using the pivot command.

                    – J Calbreath
                    Feb 29 '16 at 14:03













                  13












                  13








                  13







                  I overcame this by writing a for loop to dynamically create a SQL query. Say I have:



                  id tag value
                  1 US 50
                  1 UK 100
                  1 Can 125
                  2 US 75
                  2 UK 150
                  2 Can 175


                  and I want:



                  id US UK Can
                  1 50 100 125
                  2 75 150 175


                  I can create a list with the value I want to pivot and then create a string containing the SQL query I need.



                  val countries = List("US", "UK", "Can")
                  val numCountries = countries.length - 1

                  var query = "select *, "
                  for (i <- 0 to numCountries-1)
                  query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "

                  query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

                  myDataFrame.registerTempTable("myTable")
                  val myDF1 = sqlContext.sql(query)


                  I can create similar query to then do the aggregation. Not a very elegant solution but it works and is flexible for any list of values, which can also be passed in as an argument when your code is called.






                  share|improve this answer















                  I overcame this by writing a for loop to dynamically create a SQL query. Say I have:



                  id tag value
                  1 US 50
                  1 UK 100
                  1 Can 125
                  2 US 75
                  2 UK 150
                  2 Can 175


                  and I want:



                  id US UK Can
                  1 50 100 125
                  2 75 150 175


                  I can create a list with the value I want to pivot and then create a string containing the SQL query I need.



                  val countries = List("US", "UK", "Can")
                  val numCountries = countries.length - 1

                  var query = "select *, "
                  for (i <- 0 to numCountries-1)
                  query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "

                  query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

                  myDataFrame.registerTempTable("myTable")
                  val myDF1 = sqlContext.sql(query)


                  I can create similar query to then do the aggregation. Not a very elegant solution but it works and is flexible for any list of values, which can also be passed in as an argument when your code is called.







                  share|improve this answer














                  share|improve this answer



                  share|improve this answer








                  edited Sep 3 '17 at 20:31









                  zero323

                  172k42503587




                  172k42503587










                  answered May 22 '15 at 13:21









                  J CalbreathJ Calbreath

                  87731423




                  87731423












                  • I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

                    – user299791
                    Feb 29 '16 at 8:59











                  • That has to do with the quotes. If you look at the resulting text string what you would get is 'case when tag = US', so Spark thinks thats a column name rather than a text value. What you really want to see is 'case when tag = "US" '. I have edited the above answer to have the correct set up for quotes.

                    – J Calbreath
                    Feb 29 '16 at 14:03






                  • 2





                    But as also mentioned, this is fuctionality is now native to Spark using the pivot command.

                    – J Calbreath
                    Feb 29 '16 at 14:03

















                  • I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

                    – user299791
                    Feb 29 '16 at 8:59











                  • That has to do with the quotes. If you look at the resulting text string what you would get is 'case when tag = US', so Spark thinks thats a column name rather than a text value. What you really want to see is 'case when tag = "US" '. I have edited the above answer to have the correct set up for quotes.

                    – J Calbreath
                    Feb 29 '16 at 14:03






                  • 2





                    But as also mentioned, this is fuctionality is now native to Spark using the pivot command.

                    – J Calbreath
                    Feb 29 '16 at 14:03
















                  I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

                  – user299791
                  Feb 29 '16 at 8:59





                  I am trying to reproduce your example, but I get an "org.apache.spark.sql.AnalysisException: cannot resolve 'US' given input columns id, tag, value"

                  – user299791
                  Feb 29 '16 at 8:59













                  That has to do with the quotes. If you look at the resulting text string what you would get is 'case when tag = US', so Spark thinks thats a column name rather than a text value. What you really want to see is 'case when tag = "US" '. I have edited the above answer to have the correct set up for quotes.

                  – J Calbreath
                  Feb 29 '16 at 14:03





                  That has to do with the quotes. If you look at the resulting text string what you would get is 'case when tag = US', so Spark thinks thats a column name rather than a text value. What you really want to see is 'case when tag = "US" '. I have edited the above answer to have the correct set up for quotes.

                  – J Calbreath
                  Feb 29 '16 at 14:03




                  2




                  2





                  But as also mentioned, this is fuctionality is now native to Spark using the pivot command.

                  – J Calbreath
                  Feb 29 '16 at 14:03





                  But as also mentioned, this is fuctionality is now native to Spark using the pivot command.

                  – J Calbreath
                  Feb 29 '16 at 14:03











                  9














                  A pivot operator has been added to the Spark dataframe API, and is part of Spark 1.6.



                  See https://github.com/apache/spark/pull/7841 for details.






                  share|improve this answer



























                    9














                    A pivot operator has been added to the Spark dataframe API, and is part of Spark 1.6.



                    See https://github.com/apache/spark/pull/7841 for details.






                    share|improve this answer

























                      9












                      9








                      9







                      A pivot operator has been added to the Spark dataframe API, and is part of Spark 1.6.



                      See https://github.com/apache/spark/pull/7841 for details.






                      share|improve this answer













                      A pivot operator has been added to the Spark dataframe API, and is part of Spark 1.6.



                      See https://github.com/apache/spark/pull/7841 for details.







                      share|improve this answer












                      share|improve this answer



                      share|improve this answer










                      answered Nov 19 '15 at 8:47









                      David AndersonDavid Anderson

                      6,51421424




                      6,51421424





















                          5














                          I have solved a similar problem using dataframes with the following steps:



                          Create columns for all your countries, with 'value' as the value:



                          import org.apache.spark.sql.functions._
                          val countries = List("US", "UK", "Can")
                          val countryValue = udf(countryToCheck: String, countryInRow: String, value: Long) =>
                          if(countryToCheck == countryInRow) value else 0

                          val countryFuncs = countries.mapcountry => (dataFrame: DataFrame) => dataFrame.withColumn(country, countryValue(lit(country), df("tag"), df("value")))
                          val dfWithCountries = Function.chain(countryFuncs)(df).drop("tag").drop("value")


                          Your dataframe 'dfWithCountries' will look like this:



                          +--+--+---+---+
                          |id|US| UK|Can|
                          +--+--+---+---+
                          | 1|50| 0| 0|
                          | 1| 0|100| 0|
                          | 1| 0| 0|125|
                          | 2|75| 0| 0|
                          | 2| 0|150| 0|
                          | 2| 0| 0|175|
                          +--+--+---+---+


                          Now you can sum together all the values for your desired result:



                          dfWithCountries.groupBy("id").sum(countries: _*).show


Result:

+--+-------+-------+--------+
|id|SUM(US)|SUM(UK)|SUM(Can)|
+--+-------+-------+--------+
| 1|     50|    100|     125|
| 2|     75|    150|     175|
+--+-------+-------+--------+


It's not a very elegant solution, though. I had to create a chain of functions to add in all the columns. Also, if I have lots of countries, the temporary data set expands into a very wide set with lots of zeroes.
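
For reference, the same result can be obtained with the built-in pivot described above. A minimal sketch, assuming the original df with columns id, tag and value (Spark 1.6+):

// Passing the country list explicitly avoids both the extra pass to
// discover the distinct tags and the wide intermediate table of zeroes.
val countries = List("US", "UK", "Can")
df.groupBy("id").pivot("tag", countries).sum("value").show()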
















                              answered Aug 4 '15 at 13:27









Al M





















                                  0














Initially I adopted Al M's solution. Later I took the same idea and rewrote it as a transpose function.

This method transposes any DataFrame's rows to columns, whatever the data type, using a key column and a value column.

For the input CSV:



                                  id,tag,value
                                  1,US,50a
                                  1,UK,100
                                  1,Can,125
                                  2,US,75
                                  2,UK,150
                                  2,Can,175


Output:



                                  +--+---+---+---+
                                  |id| UK| US|Can|
                                  +--+---+---+---+
                                  | 2|150| 75|175|
                                  | 1|100|50a|125|
                                  +--+---+---+---+


Transpose method:



def transpose(hc: HiveContext, df: DataFrame, compositeId: List[String], key: String, value: String) = {

  val distinctCols = df.select(key).distinct.map { r => r(0) }.collect().toList

  val rdd = df.map { row =>
    (compositeId.collect { case id => row.getAs(id).asInstanceOf[Any] },
      scala.collection.mutable.Map(row.getAs(key).asInstanceOf[Any] -> row.getAs(value).asInstanceOf[Any]))
  }
  val pairRdd = rdd.reduceByKey(_ ++ _)
  val rowRdd = pairRdd.map(r => dynamicRow(r, distinctCols))
  hc.createDataFrame(rowRdd, getSchema(df.schema, compositeId, (key, distinctCols)))
}

private def dynamicRow(r: (List[Any], scala.collection.mutable.Map[Any, Any]), colNames: List[Any]) = {
  val cols = colNames.collect { case col => r._2.getOrElse(col.toString(), null) }
  val array = r._1 ++ cols
  Row(array: _*)
}

private def getSchema(srcSchema: StructType, idCols: List[String], distinctCols: (String, List[Any])): StructType = {
  val idSchema = idCols.map { idCol => srcSchema.apply(idCol) }
  val colSchema = srcSchema.apply(distinctCols._1)
  val colsSchema = distinctCols._2.map { col => StructField(col.asInstanceOf[String], colSchema.dataType, colSchema.nullable) }
  StructType(idSchema ++ colsSchema)
}



Main snippet:



import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructField

...
...
def main(args: Array[String]): Unit = {

  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val dfdata1 = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true").option("inferSchema", "true")
    .load("data.csv")
  dfdata1.show()
  val dfOutput = transpose(new HiveContext(sc), dfdata1, List("id"), "tag", "value")
  dfOutput.show
}
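
For comparison, on Spark 1.6+ the same reshaping can usually be done with the built-in pivot. A minimal sketch, assuming the same dfdata1 loaded above; first() is used because the values here are strings such as "50a":

import org.apache.spark.sql.functions.first

// Equivalent one-liner: pivot "tag" into columns and keep the raw values.
dfdata1.groupBy("id").pivot("tag").agg(first("value")).show()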















                                  edited Sep 6 '16 at 18:33

























                                  answered Aug 30 '16 at 18:13









Jaigates







                                  • 2





This method transposes rows to columns...

                                    – Jaigates
                                    Sep 6 '16 at 18:38























                                  -1














There is a simple and elegant solution.



scala> spark.sql("select * from k_tags limit 10").show()
+---------------+-------------+------+
|           imsi|         name| value|
+---------------+-------------+------+
|246021000000000|          age|    37|
|246021000000000|       gender|Female|
|246021000000000|         arpu|    22|
|246021000000000|   DeviceType| Phone|
|246021000000000|DataAllowance|   6GB|
+---------------+-------------+------+

scala> spark.sql("select * from k_tags limit 10").groupBy($"imsi").pivot("name").agg(min($"value")).show()
+---------------+-------------+----------+---+----+------+
|           imsi|DataAllowance|DeviceType|age|arpu|gender|
+---------------+-------------+----------+---+----+------+
|246021000000000|          6GB|     Phone| 37|  22|Female|
|246021000000001|          1GB|     Phone| 72|  10|  Male|
+---------------+-------------+----------+---+----+------+
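
A small optional refinement (just a sketch): listing the pivot values up front skips the extra job Spark otherwise runs to collect the distinct names before aggregating:

scala> spark.table("k_tags").groupBy($"imsi").pivot("name", Seq("age", "gender", "arpu", "DeviceType", "DataAllowance")).agg(min($"value")).show()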













                                      edited Feb 5 '18 at 8:52









                                      clemens











                                      answered Feb 5 '18 at 8:35









Mantas














