Spark Window function using more than one column



I have this dataframe that shows the send time and the open time for each user:



val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")



+-----+-------------------+-------------------+
| id| sendTime| openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00| null|
|user2|2018-04-05 19:00:00| null|
+-----+-------------------+-------------------+


Now I want to count the number of opens that have happened in the two hours before each send time, for each user. I used a window function to partition by user, but I couldn't figure out how to compare values from the sendTime column to the openTime column. The result dataframe should look like this:



+-----+-------------------+-------------------+-----+
| id| sendTime| openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00| 0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00| 1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00| 2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00| 2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00| 0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00| 1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00| 2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00| 2|
|user2|2018-04-05 18:00:00| null| 3|
|user2|2018-04-05 19:00:00| null| 2|
+-----+-------------------+-------------------+-----+


This is as far as I have got, but it doesn't give me what I need:



var df2 = df.withColumn("sendUnix", F.unix_timestamp($"sendTime")).withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))









      scala apache-spark apache-spark-sql







asked Mar 9 at 2:49 by Pooya






















2 Answers




















This seems quite difficult to do with window functions alone, because you cannot reference the current row's sendTime (the upper limit of the window) when deciding whether a given openTime falls within the two hours before that sendTime.



Spark 2.4 introduced higher-order functions, which you can read about here (https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html). Using these you can collect all the openTime values within a window with collect_list, then use the higher-order function filter to drop the openTimes that fall outside the two hours prior to the sendTime. Finally, the size of the remaining list gives you the count you are after. Here is my code for doing this.



import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

          val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
          ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
          ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
          ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
          ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
          ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
          ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
          ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
          ("user2", "2018-04-05 18:00:00", null),
          ("user2", "2018-04-05 19:00:00", null)
          ).toDF("id", "sendTime", "openTime")

          var df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
          .withColumn("openUnix", unix_timestamp($"openTime"))

          val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))

          df3.show(false)

          +-----+-------------------+-------------------+----------+----------+------------------------------------+
          |id |sendTime |openTime |sendUnix |openUnix |opened |
          +-----+-------------------+-------------------+----------+----------+------------------------------------+
          |user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
          |user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
          |user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
          |user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
          |user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
          |user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
          |user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
          |user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
          |user2|2018-04-05 18:00:00|null |1522947600|null |[1522946400, 1522947000, 1522943400]|
          |user2|2018-04-05 19:00:00|null |1522951200|null |[1522946400, 1522947000] |
          +-----+-------------------+-------------------+----------+----------+------------------------------------+

          val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
          "size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")

          df4.show(false)

          +-----+-------------------+-------------------+----------+----------+-----+
          |id |sendTime |openTime |sendUnix |openUnix |count|
          +-----+-------------------+-------------------+----------+----------+-----+
          |user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
          |user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
          |user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
          |user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2 |
          |user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
          |user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
          |user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
          |user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1 |
          |user2|2018-04-05 18:00:00|null |1522947600|null |3 |
          |user2|2018-04-05 19:00:00|null |1522951200|null |2 |
          +-----+-------------------+-------------------+----------+----------+-----+
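
If the helper columns are not wanted in the end, an optional cleanup (just a sketch, not part of the original answer; drop and orderBy are standard DataFrame methods) could be:

// drop the intermediate epoch-second columns and sort for readability
val result = df4.drop("sendUnix", "openUnix")
result.orderBy("id", "sendTime").show(false)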





answered Mar 9 at 17:05 by randal25
• Looks great! There is only one small problem, which makes the count on the three rows from the bottom differ from the expected output I posted. It is caused by the rangeBetween(-2*60*60, 0) in my code, which apparently you have also used. That restricts the window to sends within the two hours before the current send time, whereas we need to look at all sends and only restrict the opens to the two hours before each send. If you remove the rangeBetween(-2*60*60, 0) I think we get the expected result.
  – Pooya, Mar 11 at 18:08
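
A minimal sketch of the adjustment the comment suggests (assuming df2 and the filter expression from the answer above): drop the rangeBetween clause so the collected list is no longer limited to opens from sends in the previous two hours, and let the filter expression alone enforce the two-hour cutoff relative to each sendUnix.

// same window as above but without rangeBetween: the frame now covers all of the
// user's earlier sends, and the two-hour cutoff is applied only in the filter
val wAll = Window.partitionBy($"id").orderBy($"sendUnix")

val counted = df2
  .withColumn("opened", collect_list($"openUnix").over(wAll))
  .selectExpr("id", "sendTime", "openTime",
    "size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")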
































Here you go. Code that solves the problem:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// convert both timestamps to epoch seconds
val df1 = df.withColumn("sendTimeStamp", unix_timestamp(col("sendTime"))).withColumn("openTimeStamp", unix_timestamp(col("openTime")))

// per-user window covering the two hours before each send
val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)

var df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))

var df3 = df2.select('*, explode('list).as("prevTimeStamp"))

// count the opens that fall in the two hours before each send
df3.groupBy('id, 'sendTime).agg(max('openTime).as("openTime"), sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1).otherwise(0)).as("count")).show
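
One caveat with the explode step (a general Spark behaviour, not something covered in the answer): explode drops rows whose collected list is empty, so a send with no opens at all in its window would disappear from the grouped result. A sketch using explode_outer instead, keeping the rest of the pipeline unchanged:

// explode_outer keeps rows with an empty "list" (prevTimeStamp becomes null),
// so such sends still appear in the output with a count of 0
var df3 = df2.select(col("*"), explode_outer(col("list")).as("prevTimeStamp"))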


Please accept the answer if it solves your problem.






answered Mar 10 at 7:39 by deo (edited Mar 10 at 7:45)







