-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpandasql2.py
More file actions
156 lines (129 loc) · 5.24 KB
/
pandasql2.py
File metadata and controls
156 lines (129 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""Patches for pandasql package to add parameterized queries"""
from pandasql.sqldf import (
PandaSQL as pSQL,
extract_table_names,
write_table,
get_outer_frame_variables,
PandaSQLException
)
from sqlalchemy.exc import DatabaseError, ResourceClosedError
import pandas as pd
class PandaSQL(pSQL):
    """A patched subclass of PandaSQL providing parameterized queries against Pandas DataFrames.

    Compared with the base class it:

    * accepts a ``params`` argument in ``__call__`` and forwards it to ``pandas.read_sql``;
    * registers a two-argument ``power`` SQL function when the engine is SQLite;
    * only writes objects from ``env`` to the database when they are DataFrames.
    """

    @staticmethod
    def sql_power(x: float, y: float) -> float | None:
        """UDF to return x to the power of y.

        :param x: Base.
        :param y: Exponent.
        :return: ``x ** y``, or ``None`` when the operands cannot be exponentiated
                 (e.g. a ``None`` operand, zero raised to a negative power, float
                 overflow) or when the result is not a real number.
        """
        try:
            result = x ** y
        except (TypeError, ArithmeticError):
            # TypeError: None/str/bytes operands; ArithmeticError covers
            # ZeroDivisionError and OverflowError. Unlike a bare except, this
            # lets KeyboardInterrupt and SystemExit propagate.
            return None
        if isinstance(result, complex):
            # A negative base with a fractional exponent yields a complex number,
            # which does not satisfy the declared ``float | None`` return type.
            return None
        return result

    def _init_connection(self, conn):
        """Register the ``power`` UDF on SQLite connections, then run the base-class setup.

        :param conn: Connection being initialised; its ``connection.driver_connection``
                     attribute is used to call ``create_function``.
        """
        if self.engine.name == 'sqlite':
            conn.connection.driver_connection.create_function(
                'power', 2, self.sql_power)
        super()._init_connection(conn)

    def __call__(self, query, env=None, params=None):
        """
        Execute the SQL query.
        Automatically creates tables mentioned in the query from dataframes before executing.

        :param query: SQL query string, which can reference pandas dataframes as SQL tables.
        :param env: Variables environment - a dict mapping table names to pandas dataframes.
                    If not specified use local and global variables of the caller.
        :param params: Query parameters, passed unchanged to ``pandas.read_sql``.
        :return: Pandas dataframe with the result of the SQL query, or None when the
                 query returns nothing.
        :raises PandaSQLException: if the query fails with a SQLAlchemy ``DatabaseError``.
        """
        if env is None:
            # NOTE(review): get_outer_frame_variables() comes from pandasql; which frame
            # it treats as "the caller" when invoked from this subclass is not visible
            # here - confirm the default really picks up the user's DataFrames.
            env = get_outer_frame_variables()

        result = None
        with self.conn as conn:
            for table_name in extract_table_names(query):
                if table_name not in env:
                    # don't raise error because the table may be already in the database
                    continue
                if self.persist and table_name in self.loaded_tables:
                    # table was loaded before using the same instance, don't do it again
                    continue
                candidate = env[table_name]
                # Names in env that merely collide with a table name but are not
                # DataFrames are skipped instead of being written to the database.
                if isinstance(candidate, pd.DataFrame):
                    self.loaded_tables.add(table_name)
                    write_table(candidate, table_name, conn)

            try:
                result = pd.read_sql(query, conn, params=params)
            except DatabaseError as ex:
                raise PandaSQLException(ex) from ex
            except ResourceClosedError:
                # query returns nothing
                result = None

        return result

    @staticmethod
    def uniquify(df_columns) -> list[str]:
        """Make a list of columns unique.

        The first occurrence of a name is kept as is; each later duplicate gets a
        numeric suffix starting at 2 (``a``, ``a_2``, ``a_3``, ...). The suffix keeps
        increasing until the candidate does not clash with any name already emitted.

        :param df_columns: Iterable of column names, possibly containing duplicates.
        :return: New list of the same length with unique names, in the original order.
        """
        seen = set()
        result = []
        for item in df_columns:
            fudge = 1
            newitem = item
            while newitem in seen:
                fudge += 1
                newitem = f"{item}_{fudge}"
            result.append(newitem)
            seen.add(newitem)
        return result
PANDASQL_STR = '''class PandaSQL(pSQL):
"""A patched subclass of PandaSQL providing parameterized queries against Pandas DataFrames"""
@staticmethod
def sql_power(x: float, y: float) -> float | None:
"""UDF to return x to the power of y"""
try:
return x**y
except:
return None
def _init_connection(self, conn):
if self.engine.name == 'sqlite':
conn.connection.driver_connection.create_function(
'power', 2, self.sql_power)
super()._init_connection(conn)
def __call__(self, query, env=None, params=None):
"""
Execute the SQL query.
Automatically creates tables mentioned in the query from dataframes before executing.
:param query: SQL query string, which can reference pandas dataframes as SQL tables.
:param env: Variables environment - a dict mapping table names to pandas dataframes.
If not specified use local and global variables of the caller.
:return: Pandas dataframe with the result of the SQL query.
"""
if env is None:
env = get_outer_frame_variables()
result = None
with self.conn as conn:
for table_name in extract_table_names(query):
if table_name not in env:
# don't raise error because the table may be already in the database
continue
if self.persist and table_name in self.loaded_tables:
# table was loaded before using the same instance, don't do it again
continue
if isinstance(env[table_name], pd.DataFrame):
self.loaded_tables.add(table_name)
write_table(env[table_name], table_name, conn)
try:
result = pd.read_sql(query, conn, params=params)
except DatabaseError as ex:
raise PandaSQLException(ex) from ex
except ResourceClosedError:
# query returns nothing
result = None
return result
@staticmethod
def uniquify(df_columns) -> list[str]:
"""Make a list of columns unique"""
seen = set()
result = []
for item in df_columns:
fudge = 1
newitem = item
while newitem in seen:
fudge += 1
newitem = f"{item}_{fudge}"
result.append(newitem)
seen.add(newitem)
return result
'''