-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathfuncs.py
355 lines (283 loc) · 11.5 KB
/
funcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
#!/usr/bin/env python3
# --------------------------------------------------------------------------- #
# The MIT License (MIT) #
# #
# Copyright (c) 2021 Eliud Cabrera Castillo <[email protected]> #
# #
# Permission is hereby granted, free of charge, to any person obtaining #
# a copy of this software and associated documentation files #
# (the "Software"), to deal in the Software without restriction, including #
# without limitation the rights to use, copy, modify, merge, publish, #
# distribute, sublicense, and/or sell copies of the Software, and to permit #
# persons to whom the Software is furnished to do so, subject to the #
# following conditions: #
# #
# The above copyright notice and this permission notice shall be included #
# in all copies or substantial portions of the Software. #
# #
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR #
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, #
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL #
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER #
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING #
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER #
# DEALINGS IN THE SOFTWARE. #
# --------------------------------------------------------------------------- #
"""Auxiliary functions for other methods of the lbrytools package."""
import os
import random
import regex
import requests
import subprocess
import time
try:
import emoji
EMOJI_LOADED = True
except ModuleNotFoundError:
EMOJI_LOADED = False
TFMT = "%Y-%m-%d_%H:%M:%S%z %A"
TFMTp = "%Y-%m-%d_%H:%M:%S%z"
TFMTf = "%Y%m%d_%H%M"
def start_lbry():
"""Launch the lbrynet client through subprocess."""
subprocess.run(["lbrynet", "start"], stdout=subprocess.DEVNULL)
def check_lbry(server="http://localhost:5279"):
"""Check if the LBRY daemon is running, and launch it if it's not.
Send a `'status'` request to the server `'http://localhost:5279'`,
and check for `'is_running'` being `True`.
Start the service if it is not running.
::
lbrynet start
Other functions that need to call `lbrynet` should call this method
before doing other things.
Parameters
----------
server: str, optional
It defaults to `'http://localhost:5279'`.
This is the address of the `lbrynet` daemon, which should be running
in your computer before using any `lbrynet` command.
Normally, there is no need to change this parameter from its default
value.
Returns
-------
bool
It returns `True` if the LBRY daemon is already running.
It returns `False` if the LBRY daemon was not running
but it was started manually.
"""
msg = {"method": "status"}
try:
output = requests.post(server, json=msg).json()
except requests.exceptions.ConnectionError as err:
# Trap all with requests.exceptions.RequestException
print(err)
start_lbry()
return False
if "result" not in output:
print(">>> No 'result' in the JSON-RPC server output")
start_lbry()
return False
if "is_running" in output["result"] and output["result"]["is_running"]:
return True
start_lbry()
return False
# Only really works in Linux
# try:
# subprocess.run(["pidof", "lbrynet"], stdout=subprocess.DEVNULL)
# return True
# except subprocess.CalledProcessError:
# start_lbry()
# return False
def server_exists(server="http://localhost:5279"):
"""Return True if the server is up, and False if not."""
try:
requests.post(server)
except requests.exceptions.ConnectionError:
print(f"Cannot establish connection to 'lbrynet' on {server}")
print("Start server with:")
print(" lbrynet start")
return False
return True
def get_data_dir(server="http://localhost:5279"):
"""Return the directory where LBRY stores its data."""
msg = {"method": "settings_get",
"params": {}}
output = requests.post(server, json=msg).json()
data_dir = output["result"]["data_dir"]
return data_dir
def get_bdir(server="http://localhost:5279"):
"""Return the directory where LBRY stores the blob files."""
msg = {"method": "settings_get",
"params": {}}
output = requests.post(server, json=msg).json()
datadir = output["result"]["data_dir"]
blobdir = os.path.join(datadir, "blobfiles")
return blobdir
def default_ddir(server="http://localhost:5279"):
"""Get the default download directory for lbrynet."""
if not server_exists(server=server):
return os.path.expanduser("~")
msg = {"method": "settings_get"}
out_set = requests.post(server, json=msg).json()
ddir = out_set["result"]["download_dir"]
return ddir
def get_download_dir(ddir=None,
server="http://localhost:5279"):
"""Get the entered directory if it exists, or the default directory."""
old_ddir = str(ddir)
if not ddir or not os.path.exists(ddir):
ddir = default_ddir(server=server)
print(f"Directory does not exist: {old_ddir}")
print(f"Default directory: {ddir}")
return ddir
def print_content(output_list, file=None, fdate=False):
"""Print contents to the terminal or to a file."""
fd = 0
if file:
dirn = os.path.dirname(file)
base = os.path.basename(file)
if fdate:
fdate = time.strftime(TFMTf, time.gmtime()) + "_"
else:
fdate = ""
file = os.path.join(dirn, fdate + base)
try:
fd = open(file, "w")
except (FileNotFoundError, PermissionError) as err:
print(f"Cannot open file for writing; {err}")
content = "\n".join(output_list)
if file and fd:
print(content, file=fd)
fd.close()
print(f"Summary written: {file}")
else:
print(content)
return content
def sanitize_text(text="random_string"):
"""Sanitize text with complex unicode characters.
Some names have complex unicode characters, especially emojis.
With this method we remove these `grapheme clusters` so that applications
that receive the string don't cause an error.
Many terminals and interface toolkits are able to display the emojis
without problem but others such as Tkinter Text widgets
may crash when trying to display such symbols.
"""
# This will find unicode country flags, which are actually composed
# of two or more characters together, like 'U' 'S' is the US flag,
# and 'F' 'R' is the France flag.
flags = regex.findall(u'[\U0001F1E6-\U0001F1FF]', text)
text_normalized = ""
# Only remove the emojis if we have the `emoji` package loaded
if EMOJI_LOADED:
emoji_dict = emoji.UNICODE_EMOJI['en']
else:
emoji_dict = ""
for character in text:
if character in emoji_dict or character in flags:
text_normalized += "\u275A" # monospace black box
else:
text_normalized += character
return text_normalized
def process_ch_num(channels=None,
number=None, shuffle=True):
"""Process the channels which are contained in a list.
It returns a properly formatted list of pairs.
Parameters
----------
channels: list of elements
Each element of the `channels` list may be a string,
a list with a single element, or a list with two elements.
For example, given
::
ch1 = "Channel"
ch2 = ["@Ch1"]
ch3 = ["Mychan", 2]
A valid input is
::
channels = [ch1, ch2, ch3]
channels = ["Channel", ["@Ch1"], ["MyChan", 2]]
number: int, optional
It defaults to `None`.
If this is present, it will override the individual
numbers in `channels`.
shuffle: bool, optional
It defaults to `True`, in which case it will shuffle
the list of channels so that they are not processed in the order
that they come.
Returns
--------
list of dict
It returns a list of dictionaries, where each element corresponds
to a channel. The two keys are:
- 'channel', the actual channel name; if the input is invalid,
this will be set to `None`.
- 'number', the number of items from this channel;
if the input is invalid this value will be set to 0.
False
If there is a problem such as an empty input it will return `False`.
"""
if isinstance(channels, str):
channels = [channels]
if not channels or not isinstance(channels, (list, tuple)):
m = ["Specify channels and a number of claims.",
" [",
" ['@MyChannel', 2],",
" ['@AwesomeCh:8', 1],",
" ['@A-B-C#a', 3]",
" ]"]
print("\n".join(m))
print(f"channels={channels}")
return False
DEFAULT_NUM = 2
if number:
if not isinstance(number, int) or number < 0:
number = DEFAULT_NUM
print("Number must be a positive integer, "
f"set to default value, number={number}")
print("Global value overrides per channel number, "
f"number={number}")
n_channels = len(channels)
if n_channels <= 0:
print(">>> No channels in the list")
return False
out_ch_info = []
if shuffle:
if isinstance(channels, tuple):
channels = list(channels)
random.shuffle(channels)
random.shuffle(channels)
for num, channel in enumerate(channels, start=1):
ch_info = {"channel": None,
"number": 0}
if isinstance(channel, str):
c_number = DEFAULT_NUM
elif isinstance(channel, (list, tuple)):
if len(channel) < 2:
c_number = DEFAULT_NUM
else:
c_number = channel[1]
if not isinstance(c_number, (int, float)) or c_number < 0:
print(f">>> Number set to {DEFAULT_NUM}")
c_number = DEFAULT_NUM
c_number = int(c_number)
channel = channel[0]
if not isinstance(channel, str):
print(f"Channel {num}/{n_channels}, {channel}")
print(">>> Error: channel must be a string. Skip channel.")
print()
out_ch_info.append(ch_info)
continue
if channel.startswith("[") and channel.endswith("]"):
# Sometimes we want to have the channel surrounded
# by brackets `[@channels]` to indicate it is invalid.
# We just allow the channel as is.
pass
elif not channel.startswith("@"):
channel = "@" + channel
# Number overrides the individual number for all channels
if number:
c_number = number
ch_info["channel"] = channel
ch_info["number"] = c_number
out_ch_info.append(ch_info)
return out_ch_info