Pyspark SQL query to get rows that are +/- 20% of a specific column
I have the following PySpark DataFrame:
```
+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137|   16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202|  481045| 241788|
|201522369349300207|  700861|1185640|
|201522369349300227|  178479| 267976|
+------------------+--------+-------+
```
For each row, I want to get the rows whose Assets are within +/-20% of that row's Assets. For example, for the first row (ID=201542399349300619), I want all rows whose Assets are within +/-20% of 1,633,944, i.e. between 1,307,155 and 1,960,732:
```
+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201522369349300122| 1401406|1010828|
+------------------+--------+-------+
```
Using this subset, I want to compute the average Assets and add it as a new column. For the example above, that is (1633944 + 1401406) / 2 = 1517675:
```
+------------------+--------+-------+---------+
|                ID|  Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944|  32850|  1517675|
+------------------+--------+-------+---------+
```
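As a quick check of the arithmetic above, here is a plain-Python sketch (it does not use Spark). It computes the +/-20% bounds for the first row, keeps the rows whose Assets fall inside that inclusive range, and averages them. The `rows` list is just the ID and Assets columns copied from the sample table.

```python
# (ID, Assets) pairs copied from the sample DataFrame above
rows = [
    (201542399349300619, 1633944),
    (201542399349300629, 3979760),
    (201542399349300634, 3402687),
    (201542399349300724, 1138291),
    (201522369349300122, 1401406),
    (201522369349300137, 16948),
    (201522369349300142, 13474056),
    (201522369349300202, 481045),
    (201522369349300207, 700861),
    (201522369349300227, 178479),
]

anchor = 1633944                           # Assets of ID 201542399349300619
lower, upper = anchor * 0.8, anchor * 1.2  # roughly 1,307,155.2 and 1,960,732.8

# Keep every row whose Assets fall inside the inclusive range
subset = [(i, a) for i, a in rows if lower <= a <= upper]
avg_assets = sum(a for _, a in subset) / len(subset)

print(subset)      # [(201542399349300619, 1633944), (201522369349300122, 1401406)]
print(avg_assets)  # 1517675.0
```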
python sql pyspark apache-spark-sql pyspark-sql
asked Mar 7 at 17:38 by himi64 (edited Mar 7 at 18:36)
@pault My mistake, I have corrected the typos. I am not very familiar with SQL, so I'd just like some guidance on how to achieve this with what I have in place. – himi64, Mar 7 at 18:37
1 Answer
Assuming your DataFrame has a schema similar to the following (i.e. Assets and Revenue are numeric):
```python
df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)
```
You can join the DataFrame to itself (a self-join), using the +/-20% range on Assets as the join condition. After the join, group by the original row's columns and take the average of the matched rows' Assets.
For example:
```python
from pyspark.sql.functions import avg, expr

# Wrap the chain in parentheses so the method calls can span several lines
(
    df.alias("l")
    .join(
        df.alias("r"),
        on=expr("r.Assets BETWEEN l.Assets * 0.8 AND l.Assets * 1.2"),
    )
    .groupBy("l.ID", "l.Assets", "l.Revenue")
    .agg(avg("r.Assets").alias("AvgAssets"))
    .show()
)
# Output (row order may differ between runs):
#+------------------+--------+-------+------------------+
#|                ID|  Assets|Revenue|         AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914|         3691223.5|
#|201522369349300202|  481045| 241788|          481045.0|
#|201522369349300207|  700861|1185640|          700861.0|
#|201522369349300137|   16948| 171534|           16948.0|
#|201522369349300142|13474056|2285323|       1.3474056E7|
#|201522369349300227|  178479| 267976|          178479.0|
#|201542399349300619| 1633944|  32850|         1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553|         1138291.0|
#|201542399349300634| 3402687|1983568|         3691223.5|
#+------------------+--------+-------+------------------+
```
Since we are joining the DataFrame to itself, we use aliases to refer to the left table ("l") and the right table ("r"). The join condition matches each row of l with every row of r whose Assets are within +/-20% of l's Assets.
There are multiple ways to express the +/-20% condition, but here I use the Spark SQL BETWEEN expression to find rows whose Assets lie between Assets * 0.8 and Assets * 1.2. BETWEEN is inclusive on both ends.
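Then we group by all of the left table's columns and average the right table's Assets. Because every row with non-negative Assets falls within its own range, each left row matches at least itself, so the inner join does not drop any of the sample rows.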
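To make the join-then-aggregate plan concrete without a Spark cluster, here is a plain-Python sketch that mirrors it. It is not how Spark executes the query internally. A nested loop stands in for the self-join with the inclusive BETWEEN condition, and a dictionary keyed by the left row stands in for `groupBy(...).agg(avg(...))`. The helper name `within_20_percent` is hypothetical. The bounds use integer arithmetic (10·r between 8·l and 12·l), so values exactly on a boundary are not affected by floating-point rounding. Like any range join without an equality key, it compares every pair of rows, so it is only meant for small samples.

```python
from collections import defaultdict

# (ID, Assets, Revenue) rows copied from the sample DataFrame in the question
rows = [
    (201542399349300619, 1633944, 32850),
    (201542399349300629, 3979760, 850914),
    (201542399349300634, 3402687, 1983568),
    (201542399349300724, 1138291, 1097553),
    (201522369349300122, 1401406, 1010828),
    (201522369349300137, 16948, 171534),
    (201522369349300142, 13474056, 2285323),
    (201522369349300202, 481045, 241788),
    (201522369349300207, 700861, 1185640),
    (201522369349300227, 178479, 267976),
]


def within_20_percent(r_assets, l_assets):
    """Hypothetical helper: r BETWEEN l*0.8 AND l*1.2, using exact integer math."""
    return 8 * l_assets <= 10 * r_assets <= 12 * l_assets


# "Self-join": pair every left row with each right row that satisfies the condition
joined = [(l, r) for l in rows for r in rows if within_20_percent(r[1], l[1])]

# "groupBy(l.ID, l.Assets, l.Revenue).agg(avg(r.Assets))"
groups = defaultdict(list)
for l, r in joined:
    groups[l].append(r[1])

avg_assets = {l: sum(vals) / len(vals) for l, vals in groups.items()}

for (id_, assets, revenue), avg_value in avg_assets.items():
    print(id_, assets, revenue, avg_value)
```

The printed averages should match the AvgAssets column in the Spark output above, for example 1517675.0 for ID 201542399349300619.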
The resulting AvgAssets column is a DoubleType column, because avg returns a double for integer input. If you prefer an integer, add .cast("int") before .alias("AvgAssets"). Note that the cast truncates the fractional part rather than rounding it.
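The choice between truncating and rounding affects rows such as ID 201522369349300122, whose AvgAssets is 1391213.6666666667. The plain-Python illustration below does not call Spark. It only shows the two integers you could end up with. If you want the rounded value in Spark, one option is to apply `pyspark.sql.functions.round` to the average before casting.

```python
import math

# AvgAssets for ID 201522369349300122 in the output above
avg_assets = 1391213.6666666667

truncated = int(avg_assets)             # fraction dropped, as a cast to int would do
rounded = math.floor(avg_assets + 0.5)  # round half up (valid for non-negative values)

print(truncated, rounded)  # 1391213 1391214
```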
See also:
- What are the various join types in Spark?
answered Mar 7 at 18:54 by pault