Evaluate XQuery in PySpark on RDD elements

Stack Overflow. Asked by Fahad Rana on September 6, 2020

We are trying to read a large number of XML files and run XQuery on them in PySpark, for example the books XML. We are using the spark-xml-utils library.

  • We want to feed the directory containing the XMLs to PySpark.
  • Run XQuery on all of them to get our results.

Reference answer: Calling scala code in pyspark for XSLT transformations

The definition of the XQuery processor, where xquery is the query string:

proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)

We are reading the files in a directory using:

sc.wholeTextFiles("xmls/test_files")

This gives us an RDD containing all the files as a list of tuples:

[ (Filename1,FileContentAsAString), (Filename2,File2ContentAsAString) ]

The XQuery evaluates and gives us results if we run it on the string (FileContentAsAString):

whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Prints proper xquery result for that file

Problem:

If we try to run proc.evaluate() on the RDD using a lambda function, it fails.

test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()

# Should give us a list of XQuery results

Error:

PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

These calls work, but the evaluate above does not:

Print the content the XQuery is applied to:

test_file.map(lambda x: x[1]).collect()

# Outputs the content; x[0] instead gives the list of filenames

Return the number of characters in the contents:

test_file.map(lambda x: len(x[1])).collect()
# Output: [15274, 13689, 13696]

Books example for reference:

books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""

proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)

books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the stacktrace if you guys want

One Answer

Unfortunately it is not possible to call a Java/Scala library directly within a map call from Python code. This answer gives a good explanation of why there is no easy way to do this. In short, the reason is that the Py4J gateway (which is necessary to "translate" the Python calls into the JVM world) only lives on the driver node, while the map calls you are trying to execute run on the executor nodes.

One way around that problem would be to wrap the XQuery function in a Scala UDF (explained here), but a few lines of Scala code would still need to be written; a sketch of the PySpark side of that workaround follows.
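The snippet below is only a minimal sketch of how such a UDF could be used from PySpark. It assumes you have already written and compiled a Scala UDF (the class name com.example.XQueryUDF is hypothetical: a UDF1[String, String] that calls XQueryProcessor.evaluate internally) and shipped the jar to the cluster, e.g. with --jars:

from pyspark.sql.types import StringType

# Turn the (filename, content) tuples into a DataFrame so the UDF can be applied column-wise
df = spark.createDataFrame(sc.wholeTextFiles("xmls/test_files"), ["filename", "content"])

# Register the JVM-side UDF so it can be used in SQL expressions
spark.udf.registerJavaFunction("xquery_eval", "com.example.XQueryUDF", StringType())

# The XQuery now runs inside the JVM on the executors, so no py4j objects need to be pickled
df.selectExpr("filename", "xquery_eval(content) AS xquery_result").show(truncate=False)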

EDIT: If you are able to switch from XQuery to XPath, a probably easier option is to change the library: ElementTree is an XML library written in Python (part of the standard library) and supports a subset of XPath.

The code

xmls = spark.sparkContext.wholeTextFiles("xmls/test_files")
import xml.etree.ElementTree as ET
xpathquery = "...your query..."
xmls.flatMap(lambda x: ET.fromstring(x[1]).findall(xpathquery)) \
    .map(lambda x: x.text) \
    .foreach(print)

would print all results of running the xpathquery against all documents loaded from the directory xmls/test_files.

First, a flatMap is used, as the findall call returns a list of all matching elements within each document; flatMap flattens these lists (the result might contain more than one element per file). The second map call maps each element to its text in order to get readable output.
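For reference, the books example from the question could be handled the same way. This is only a sketch: ElementTree's XPath subset does not support numeric predicates such as [price > 30], so that part of the original XQuery is done in plain Python, and the usual <bookstore><book><title/><price/></book></bookstore> layout from the sample data is assumed:

import xml.etree.ElementTree as ET

def expensive_titles(xml_string, min_price=30):
    # The root returned by fromstring is the <bookstore> element itself
    root = ET.fromstring(xml_string)
    for book in root.findall("./book"):
        price = float(book.findtext("price", default="0"))
        if price > min_price:
            yield book.findtext("title")

books_xml = spark.sparkContext.wholeTextFiles("xmls/books.xml")
books_xml.flatMap(lambda x: expensive_titles(x[1])).foreach(print)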

Answered by werner on September 6, 2020
