-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsql_worker.py
More file actions
307 lines (268 loc) · 14.2 KB
/
sql_worker.py
File metadata and controls
307 lines (268 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
import json
import sqlite3
import time
class SQLWrapper:
def __init__(self, dbname):
self.dbname = dbname
def __enter__(self):
self.sqlite_connection = sqlite3.connect(self.dbname)
self.cursor = self.sqlite_connection.cursor()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_type:
self.sqlite_connection.commit()
self.cursor.close()
self.sqlite_connection.close()
class SqlWorker:
dbname = ""
def __init__(self, dbname, recommended):
self.dbname = dbname
sqlite_connection = sqlite3.connect(dbname)
cursor = sqlite_connection.cursor()
cursor.execute("""CREATE TABLE if not exists current_polls (
unique_id TEXT NOT NULL PRIMARY KEY,
message_id INTEGER UNIQUE,
type TEXT NOT NULL,
chat_id INTEGER,
buttons TEXT,
timer INTEGER,
data TEXT NOT NULL,
votes_need INTEGER,
hidden INTEGER,
thread_id INTEGER);""")
cursor.execute("""CREATE TABLE if not exists abuse (
user_id INTEGER PRIMARY KEY,
start_time INTEGER,
timer INTEGER);""")
cursor.execute("""CREATE TABLE if not exists whitelist (
user_id INTEGER PRIMARY KEY);""")
cursor.execute("""CREATE TABLE if not exists mailing (
user_id INTEGER PRIMARY KEY);""")
cursor.execute("""CREATE TABLE if not exists rating (
user_id INTEGER PRIMARY KEY,
rate INTEGER);""")
cursor.execute("""CREATE TABLE if not exists abuse_random (
chat_id INTEGER PRIMARY KEY,
abuse_random INTEGER);""")
cursor.execute("""CREATE TABLE if not exists allies (
chat_id INTEGER PRIMARY KEY);""")
cursor.execute("""CREATE TABLE if not exists params (
params TEXT PRIMARY KEY);""")
cursor.execute("""CREATE TABLE if not exists captcha (
message_id TEXT,
user_id TEXT,
max_value INTEGER,
username TEXT);""")
cursor.execute("""CREATE TABLE if not exists marmalade (
user_id INTEGER PRIMARY KEY,
entry_time INTEGER);""")
cursor.execute("""DELETE FROM captcha""")
cursor.execute("""SELECT * FROM params""")
records = cursor.fetchall()
if not records:
cursor.execute("""INSERT INTO params VALUES (?)""", (json.dumps(recommended),))
# Temporary code for backward compatibility
try:
cursor.execute("ALTER TABLE current_polls ADD COLUMN thread_id INTEGER")
except sqlite3.OperationalError:
pass
# End of temporary code block
sqlite_connection.commit()
cursor.close()
sqlite_connection.close()
def get_all_polls(self):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM current_polls""")
records = sql_wrapper.cursor.fetchall()
return records
def abuse_update(self, user_id, timer=1800, force=False):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM abuse WHERE user_id = ?""", (user_id,))
record = sql_wrapper.cursor.fetchall()
if not record:
sql_wrapper.cursor.execute("""INSERT INTO abuse VALUES (?,?,?);""", (user_id, int(time.time()), timer))
elif not force:
sql_wrapper.cursor.execute("""UPDATE abuse SET start_time = ?, timer = ? WHERE user_id = ?""",
(int(time.time()), record[0][2] * 2, user_id))
else:
sql_wrapper.cursor.execute("""UPDATE abuse SET start_time = ?, timer = ? WHERE user_id = ?""",
(int(time.time()), timer, user_id))
def abuse_remove(self, user_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""DELETE FROM abuse WHERE user_id = ?""", (user_id,))
def abuse_check(self, user_id, force=False):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM abuse WHERE user_id = ?""", (user_id,))
record = sql_wrapper.cursor.fetchall()
if not record:
return 0, 0
if record[0][1] + record[0][2] < int(time.time()) and not force:
return 0, 0
else:
return record[0][1], record[0][2]
def whitelist(self, user_id, add=False, remove=False):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM whitelist WHERE user_id = ?""", (user_id,))
fetchall = sql_wrapper.cursor.fetchall()
is_white = False
if fetchall:
if remove:
sql_wrapper.cursor.execute("""DELETE FROM whitelist WHERE user_id = ?""", (user_id,))
else:
is_white = True
if add and not fetchall:
sql_wrapper.cursor.execute("""INSERT INTO whitelist VALUES (?);""", (user_id,))
is_white = True
return is_white
def whitelist_get_all(self):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM whitelist""")
fetchall = sql_wrapper.cursor.fetchall()
return fetchall
def mailing(self, user_id, add=False, remove=False):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM mailing WHERE user_id = ?""", (user_id,))
fetchall = sql_wrapper.cursor.fetchall()
is_mailing = False
if fetchall:
if remove:
sql_wrapper.cursor.execute("""DELETE FROM mailing WHERE user_id = ?""", (user_id,))
else:
is_mailing = True
if add and not fetchall:
sql_wrapper.cursor.execute("""INSERT INTO mailing VALUES (?);""", (user_id,))
is_mailing = True
return is_mailing
def mailing_get_all(self):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM mailing""")
fetchall = sql_wrapper.cursor.fetchall()
return fetchall
def add_poll(self, *args):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""INSERT INTO current_polls VALUES (?,?,?,?,?,?,?,?,?,?);""", args)
def get_poll(self, message_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM current_polls WHERE message_id = ?""", (message_id,))
records = sql_wrapper.cursor.fetchall()
return records
def get_message_id(self, unique_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM current_polls WHERE unique_id = ?""", (unique_id,))
records = sql_wrapper.cursor.fetchall()
if records:
return records[0][1]
return None
def update_poll_votes(self, unique_id, buttons_scheme):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""UPDATE current_polls SET buttons = ? where unique_id = ?""",
(buttons_scheme, unique_id))
def rem_rec(self, unique_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""DELETE FROM current_polls WHERE unique_id = ?""", (unique_id,))
def get_rate(self, user_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM rating WHERE user_id = ?""", (user_id,))
record = sql_wrapper.cursor.fetchall()
if not record:
sql_wrapper.cursor.execute("""INSERT INTO rating VALUES (?,?)""", (user_id, 0))
return 0
return record[0][1]
def get_all_rates(self):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM rating""")
record = sql_wrapper.cursor.fetchall()
if not record:
return None
return record
def update_rate(self, user_id, change):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM rating WHERE user_id = ?""", (user_id,))
record = sql_wrapper.cursor.fetchall()
if not record:
sql_wrapper.cursor.execute("""INSERT INTO rating VALUES (?,?)""", (user_id, change))
else:
sql_wrapper.cursor.execute("""UPDATE rating SET rate = ? where user_id = ?""",
(record[0][1] + change, user_id))
def clear_rate(self, user_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""DELETE FROM rating WHERE user_id = ?""", (user_id,))
def get_ally(self, chat_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM allies WHERE chat_id = ?""", (chat_id,))
record = sql_wrapper.cursor.fetchall()
if not record:
return None
return record[0]
def get_allies(self):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM allies""")
record = sql_wrapper.cursor.fetchall()
if not record:
return []
return record
def add_ally(self, chat_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""INSERT INTO allies VALUES (?)""", (chat_id,))
def remove_ally(self, chat_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""DELETE FROM allies WHERE chat_id = ?""", (chat_id,))
def abuse_random(self, chat_id, change=None):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM abuse_random WHERE chat_id = ?""", (chat_id,))
record = sql_wrapper.cursor.fetchall()
if change is not None:
if not record:
sql_wrapper.cursor.execute("""INSERT INTO abuse_random VALUES (?,?)""", (chat_id, change))
else:
sql_wrapper.cursor.execute("""UPDATE abuse_random SET abuse_random = ? where chat_id = ?""",
(change, chat_id))
if not record:
return 0
return record[0][1]
def params(self, key, rewrite_value=None, default_return=None):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM params""")
record: dict = json.loads(sql_wrapper.cursor.fetchall()[0][0])
return_value = record.get(key, default_return)
if rewrite_value is not None:
record.update({key: rewrite_value})
sql_wrapper.cursor.execute("""UPDATE params SET params = ?""", (json.dumps(record),))
return return_value
def captcha(self, message_id, add=False, remove=False, user_id=None, max_value=None, username=None):
with SQLWrapper(self.dbname) as sql_wrapper:
if add:
sql_wrapper.cursor.execute("""INSERT INTO captcha VALUES (?, ?, ?, ?)""",
(message_id, user_id, max_value, username))
return None
elif remove:
sql_wrapper.cursor.execute("""DELETE FROM captcha WHERE message_id = ?""", (message_id,))
return None
elif user_id:
sql_wrapper.cursor.execute("""SELECT * FROM captcha WHERE user_id = ?""", (user_id,))
return sql_wrapper.cursor.fetchall()
else:
sql_wrapper.cursor.execute("""SELECT * FROM captcha WHERE message_id = ?""", (message_id,))
return sql_wrapper.cursor.fetchall()
def marmalade_add(self, user_id, entry_time):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM marmalade WHERE user_id = ?""", (user_id,))
record = sql_wrapper.cursor.fetchall()
if not record:
sql_wrapper.cursor.execute("""INSERT INTO marmalade VALUES (?, ?)""",
(user_id, entry_time))
else:
sql_wrapper.cursor.execute("""UPDATE marmalade SET entry_time = ? WHERE user_id = ?""",
(entry_time, user_id))
return None
def marmalade_get(self, user_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""SELECT * FROM marmalade WHERE user_id = ?""", (user_id,))
record = sql_wrapper.cursor.fetchall()
if record:
return record[0][1]
return None
def marmalade_remove(self, user_id):
with SQLWrapper(self.dbname) as sql_wrapper:
sql_wrapper.cursor.execute("""DELETE FROM marmalade WHERE user_id = ?""", (user_id,))
return None