Exception when using a udf function with Spark DataFrame


In Spark 2.4.0, I am trying to execute the code below on a DataFrame with the following schema:

unfoldedDF: org.apache.spark.sql.DataFrame
 |-- movieid: integer
 |-- words: array (element: string)
 |-- tokens: string



val tokensWithDf = unfoldedDF.groupBy("tokens").agg(countDistinct("movieid") as "df")
tokensWithDf.show()
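
For reference, unfoldedDF presumably comes from exploding the words array so that each row carries a single token. A minimal sketch of how a DataFrame with that shape could be built in spark-shell (the sample data below is hypothetical, not taken from the question):

import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._  // `spark` is the SparkSession provided by spark-shell

// Hypothetical sample data: one row per movie
val moviesDF = Seq(
  (1, Seq("space", "opera")),
  (2, Seq("space", "noir"))
).toDF("movieid", "words")

// One row per (movieid, token) pair, matching unfoldedDF's schema
val unfoldedDF = moviesDF.withColumn("tokens", explode(col("words")))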


The resulting DataFrame has the following schema:

tokensWithDf: org.apache.spark.sql.DataFrame
 |-- tokens: string
 |-- df: long



The following operation is then applied to it:



def findIdf(x: Long): Double = scala.math.log10(42306.0 / x)
val sqlfunc = udf(findIdf _)
tokensWithDf.withColumn("idf", sqlfunc(col("df"))).show()


It fails with the following exception:



org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2519)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:866)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:865)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:865)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:66)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
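
A frequent cause of "Task not serializable" with UDFs in notebook or REPL environments is that a method such as findIdf, once eta-expanded with findIdf _, drags its enclosing (non-serializable) object into the UDF's closure. A sketch of a possible workaround under that assumption, replacing the method reference with a self-contained anonymous function:

import org.apache.spark.sql.functions.{col, udf}

// An anonymous function captures nothing from the enclosing scope,
// so Spark does not have to serialize any outer instance.
val idfUdf = udf((x: Long) => scala.math.log10(42306.0 / x))

tokensWithDf.withColumn("idf", idfUdf(col("df"))).show()

Moving findIdf into a standalone object that extends Serializable would address the same capture problem.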









scala apache-spark dataframe user-defined-functions

asked Mar 7 at 21:58 by Rajarshi Chattopadhyay (63); edited Mar 9 at 14:59

  • What is the full stack trace of the error?
    – deo, Mar 7 at 22:26

  • You need to give more information, such as the problem description, the full stack trace, and the source code.
    – howie, Mar 7 at 23:44

  • Your code works fine on Spark 2.3.2. Please add more details about the version and the entire stack trace so the error can be reproduced.
    – philantrovert, Mar 8 at 13:40

  • The full stack trace has been added to the original question. The version of Spark is 2.4.0.
    – Rajarshi Chattopadhyay, Mar 9 at 14:55

  • The code works on Spark 2.4.0. Can you share the entire stack trace?
    – Tej, Mar 9 at 21:08