Skip to content

Commit 0225d19

Browse files
Merge pull request #9 from RumbleDB/JSONiqMagic
Add JSONiq magic.
2 parents 5ddcc1b + b4fd49c commit 0225d19

10 files changed

Lines changed: 400 additions & 12 deletions

File tree

README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ JSONiq queries are invoked with rumble.jsoniq() in a way similar to the way Spar
3030

3131
JSONiq variables can be bound to lists of JSON values (str, int, float, True, False, None, dict, list) or to Pyspark DataFrames. A JSONiq query can use as many variables as needed (for example, it can join between different collections).
3232

33-
It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/).
33+
It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://docs.rumbledb.org/writing-jsoniq-queries-in-python).
3434

3535
The resulting sequence of items can be retrieved as a list of JSON values, as a Pyspark DataFrame, or, for advanced users, as an RDD or with a streaming iteration over the items using the [RumbleDB Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java).
3636

3737
It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Pyspark.
3838

39+
The library also contains a jsoniq magic that allows you to directly write JSONiq queries in a Jupyter notebook and see the results automatically output on the screen.
40+
3941
The design goal is that it is possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc.
4042

4143
Any feedback or error reports are very welcome.
@@ -345,7 +347,12 @@ seq.write().mode("overwrite").text("outputtext");
345347

346348
Even more queries can be found [here](https://colab.research.google.com/github/RumbleDB/rumble/blob/master/RumbleSandbox.ipynb) and you can look at the [JSONiq documentation](https://www.jsoniq.org) and tutorials.
347349

348-
# Last updates
350+
# Latest updates
351+
352+
## Version 0.2.0 alpha 2
353+
- You can change the result size cap through to the now accessible Rumble configuration (for example rumble .getRumbleConf().setResultSizeCap(10)). This controls how many items can be retrieved at most with a json() call. You can increase it to whichever number you would like if you reach the cap.
354+
- Add the JSONiq magic to execute JSONiq queries directly in a notebook cell, using the RumbleDB instance shipped with the library.
355+
- RumbleSession.builder.getOrCreate() now correctly reuses an existing session instead of creating a new object. It preserves the configuration.
349356

350357
## Version 0.2.0 alpha 1
351358
- Allow to bind JSONiq variables to pandas dataframes

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "jsoniq"
7-
version = "0.2.0a1"
7+
version = "0.2.0a2"
88
description = "Python edition of RumbleDB, a JSONiq engine"
99
requires-python = ">=3.11"
1010
dependencies = [

src/jsoniq/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from jsoniq.session import RumbleSession
22

3-
__all__ = ["RumbleSession"]
3+
__all__ = ["RumbleSession"]
288 Bytes
Binary file not shown.

src/jsoniq/sequence.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,26 @@
44
import json
55

66
class SequenceOfItems:
7-
def __init__(self, sequence, sparksession):
7+
def __init__(self, sequence, rumblesession):
88
self._jsequence = sequence
9-
self._sparkcontext = sparksession.sparkContext
10-
self._sparksession = sparksession
9+
self._rumblesession = rumblesession
10+
self._sparksession = rumblesession._sparksession
11+
self._sparkcontext = self._sparksession.sparkContext
12+
13+
def items(self):
14+
return self.getAsList()
15+
16+
def take(self, n):
17+
return tuple(self.getFirstItemsAsList(n))
18+
19+
def first(self):
20+
return tuple(self.getFirstItemsAsList(self._rumblesession.getRumbleConf().getResultSizeCap()))
1121

1222
def json(self):
13-
return tuple([json.loads(l.serializeAsJSON()) for l in self._jsequence.items()])
23+
return tuple([json.loads(l.serializeAsJSON()) for l in self._jsequence.getAsList()])
1424

1525
def rdd(self):
16-
rdd = self._jsequence.getAsPickledStringRDD();
26+
rdd = self._jsequence.getAsPickledStringRDD()
1727
rdd = RDD(rdd, self._sparkcontext)
1828
return rdd.map(lambda l: json.loads(l))
1929

@@ -22,7 +32,10 @@ def df(self):
2232

2333
def pdf(self):
2434
return self.df().toPandas()
25-
35+
36+
def count(self):
37+
return self._jsequence.count()
38+
2639
def nextJSON(self):
2740
return self._jsequence.next().serializeAsJSON()
2841

src/jsoniq/session.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ def __init__(self, spark_session: SparkSession):
2222
self._sparksession = spark_session
2323
self._jrumblesession = spark_session._jvm.org.rumbledb.api.Rumble(spark_session._jsparkSession)
2424

25+
def getRumbleConf(self):
26+
return self._jrumblesession.getConfiguration()
27+
2528
class Builder:
2629
def __init__(self):
2730

@@ -60,8 +63,18 @@ def __init__(self):
6063
self._sparkbuilder = SparkSession.builder.config("spark.jars", jar_path_str)
6164

6265
def getOrCreate(self):
63-
return RumbleSession(self._sparkbuilder.getOrCreate())
66+
if RumbleSession._rumbleSession is None:
67+
RumbleSession._rumbleSession = RumbleSession(self._sparkbuilder.getOrCreate())
68+
return RumbleSession._rumbleSession
6469

70+
def create(self):
71+
RumbleSession._rumbleSession = RumbleSession(self._sparkbuilder.create())
72+
return RumbleSession._rumbleSession
73+
74+
def remote(self, spark_url):
75+
self._sparkbuilder = self._sparkbuilder.remote(spark_url)
76+
return self
77+
6578
def appName(self, name):
6679
self._sparkbuilder = self._sparkbuilder.appName(name);
6780
return self;
@@ -83,6 +96,7 @@ def __getattr__(self, name):
8396
return res;
8497

8598
_builder = Builder()
99+
_rumbleSession = None
86100

87101
def convert(self, value):
88102
if isinstance(value, tuple):
@@ -155,7 +169,7 @@ def bindDataFrameAsVariable(self, name: str, df):
155169

156170
def jsoniq(self, str):
157171
sequence = self._jrumblesession.runQuery(str);
158-
return SequenceOfItems(sequence, self._sparksession);
172+
return SequenceOfItems(sequence, self);
159173

160174
def __getattr__(self, item):
161175
return getattr(self._sparksession, item)

src/jsoniqmagic/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from jsoniq.session import RumbleSession
2+
from jsoniqmagic.magic import JSONiqMagic
3+
4+
__all__ = ["JSONiqMagic"]
5+
6+
def load_ipython_extension(ipython):
7+
rumble = RumbleSession.builder.getOrCreate();
8+
rumble.getRumbleConf().setResultSizeCap(10);
9+
ipython.register_magics(JSONiqMagic)

src/jsoniqmagic/magic.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from IPython.core.magic import Magics, cell_magic, magics_class
2+
import time, json
3+
from jsoniq.session import RumbleSession
4+
from py4j.protocol import Py4JJavaError
5+
6+
@magics_class
7+
class JSONiqMagic(Magics):
8+
def run(self, line, cell=None, timed=False):
9+
if cell is None:
10+
data = line
11+
else:
12+
data = cell
13+
14+
start = time.time()
15+
try:
16+
rumble = RumbleSession.builder.getOrCreate();
17+
response = rumble.jsoniq(data);
18+
except Py4JJavaError as e:
19+
print(e.java_exception.getMessage())
20+
return
21+
except Exception as e:
22+
print("Query unsuccessful.")
23+
print("Usual reasons: firewall, misconfigured proxy.")
24+
print("Error message:")
25+
print(e.args[0])
26+
return
27+
except:
28+
print("Query unsuccessful.")
29+
print("Usual reasons: firewall, misconfigured proxy.")
30+
return
31+
end = time.time()
32+
if(timed):
33+
print("Response time: %s ms" % (end - start))
34+
35+
if ("DataFrame" in response.availableOutputs()):
36+
print(response.pdf())
37+
elif ("Local" in response.availableOutputs()):
38+
capplusone = response.take(rumble.getRumbleConf().getResultSizeCap() + 1)
39+
if len(capplusone) > rumble.getRumbleConf().getResultSizeCap():
40+
count = response.count()
41+
print("The query output %s items, which is too many to display. Displaying the first %s items:" % (count, rumble.getRumbleConf().getResultSizeCap()))
42+
for e in capplusone[:rumble.getRumbleConf().getResultSizeCap()]:
43+
print(json.dumps(json.loads(e.serializeAsJSON()), indent=2))
44+
elif ("PUL" in response.availableOutputs()):
45+
print("The query output a Pending Update List.")
46+
else:
47+
print("No output available.")
48+
49+
@cell_magic
50+
def jsoniq(self, line, cell=None):
51+
return self.run(line, cell, False)
52+
53+
@cell_magic
54+
def timedjsoniq(self, line, cell=None):
55+
return self.run(line, cell, True)

tests/test_iterator_bug.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from jsoniq import RumbleSession
2+
from unittest import TestCase
3+
import json
4+
class TryTesting(TestCase):
5+
def test1(self):
6+
# The syntax to start a session is similar to that of Spark.
7+
# A RumbleSession is a SparkSession that additionally knows about RumbleDB.
8+
# All attributes and methods of SparkSession are also available on RumbleSession.
9+
rumble = RumbleSession.builder.appName("PyRumbleExample").getOrCreate();
10+
# A more complex, standalone query
11+
12+
seq = rumble.jsoniq("""
13+
max(
14+
let $path := "http://www.rumbledb.org/samples/git-archive-small.json"
15+
for $event in json-lines($path)
16+
return 1
17+
)
18+
""");
19+
20+
expected = [1]
21+
22+
self.assertTrue(json.dumps(seq.json()) == json.dumps(expected))

0 commit comments

Comments
 (0)