Exception when using a udf function with Spark DataFrame
In Spark version 2.4.0, I am trying to execute the code below on a DataFrame with the following schema:

unfoldedDF: org.apache.spark.sql.DataFrame
  movieid: integer
  words: array<string>
  tokens: string
import org.apache.spark.sql.functions.countDistinct

val tokensWithDf = unfoldedDF.groupBy("tokens").agg(countDistinct("movieid") as "df")
tokensWithDf.show()
The resulting DataFrame, tokensWithDf, has the schema:

tokens: string
df: long
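For anyone trying to reproduce this step, here is a minimal, self-contained stand-in for unfoldedDF; the sample rows are made up, since the question does not include the data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

val spark = SparkSession.builder().appName("udf-repro").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample rows matching the schema above (movieid, words, tokens)
val unfoldedDF = Seq(
  (1, Seq("space", "opera"), "space"),
  (1, Seq("space", "opera"), "opera"),
  (2, Seq("space", "noir"),  "space"),
  (2, Seq("space", "noir"),  "noir")
).toDF("movieid", "words", "tokens")

// df = number of distinct movies each token appears in
val tokensWithDf = unfoldedDF.groupBy("tokens").agg(countDistinct("movieid") as "df")
tokensWithDf.show()
// Expected contents (row order may vary): space -> 2, opera -> 1, noir -> 1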
The following operation is then applied to tokensWithDf:

import org.apache.spark.sql.functions.{col, udf}

def findIdf(x: Long): Double = scala.math.log10(42306.toDouble / x)
val sqlfunc = udf(findIdf _)
tokensWithDf.withColumn("idf", sqlfunc(col("df"))).show()
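For context, findIdf is the standard inverse document frequency, idf(t) = log10(N / df(t)); the constant 42306 is presumably the total number of movies in the corpus (an assumption, the question does not say). A quick sanity check of the arithmetic:

// For a token appearing in 100 of the (assumed) 42306 movies:
scala.math.log10(42306.0 / 100)  // ≈ 2.6264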
This withColumn call fails with the following exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2519)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:866)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:865)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:865)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:66)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
scala apache-spark dataframe user-defined-functions
asked Mar 7 at 21:58 by Rajarshi Chattopadhyay, edited Mar 9 at 14:59

What is the full stack trace of the error?
– deo, Mar 7 at 22:26

You need to give more information, such as the question details, the full stack trace, and the source code.
– howie, Mar 7 at 23:44

Your code works fine on Spark 2.3.2. Please add more details about the version and the entire stack trace so the error can be reproduced.
– philantrovert, Mar 8 at 13:40

The full stack trace has been added to the original question. The version of Spark is 2.4.0.
– Rajarshi Chattopadhyay, Mar 9 at 14:55

The code works on Spark 2.4.0. Can you share the entire stack trace?
– Tej, Mar 9 at 21:08
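The question received no answers. For reference, "Task not serializable" when registering a UDF with udf(someMethod _) is commonly caused by the method belonging to a non-serializable enclosing class (for example, a driver-side wrapper or notebook cell class holding a SparkContext or another non-serializable field), which the method reference then drags into the task closure. A minimal sketch of the usual workaround, assuming that is the cause here, is to define the function as a standalone function value so that only the function itself is serialized:

import org.apache.spark.sql.functions.{col, udf}

// A function value captures no enclosing instance, so it serializes on its own.
val findIdf: Long => Double = x => scala.math.log10(42306.0 / x)
val idfUdf = udf(findIdf)

tokensWithDf.withColumn("idf", idfUdf(col("df"))).show()

Alternatively, this particular computation needs no UDF at all; Spark's built-in column functions sidestep closure serialization entirely:

import org.apache.spark.sql.functions.{col, lit, log10}

tokensWithDf.withColumn("idf", log10(lit(42306.0) / col("df"))).show()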