-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfeed_admin.py
executable file
·61 lines (50 loc) · 1.74 KB
/
feed_admin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
import json
from datetime import datetime
from typing import Any
from google.cloud import ndb # type: Any
from absl import app, flags
from validators import url
from jqqb_evaluator.evaluator import Evaluator
from model import FilterFeed, validate_jqqb
# Command-line flags. All default to None so the validators and upsert logic
# can distinguish "not supplied" from an explicit value.
flags.DEFINE_integer(name="id", default=None, help="ID for previous feed")
flags.DEFINE_string(name="url", default=None, help="Upstream feed")
flags.DEFINE_string(name="name", default=None, help="nickname for feed")
flags.DEFINE_string(
    name="query_builder",
    default=None,
    help="querybuilder string to filter",
)
@flags.validator('url', 'not a valid url')
def _CheckUrl(value) -> bool:
    """Validate the --url flag.

    Args:
        value: The flag value, or None when the flag was not supplied.

    Returns:
        True when the flag is unset or the value passes ``validators.url``
        with ``public=True``; False otherwise.
    """
    if value is None:
        # An unset flag is acceptable here; whether it is required is decided
        # by the multi-flag validator.
        return True
    # validators.url signals failure with a falsy failure object rather than
    # False; coerce it so the declared ``bool`` return type actually holds.
    return bool(url(value, public=True))
@flags.validator('query_builder', 'Not a valid query builder')
def _CheckQuery(value):
    """Validate the --query_builder flag.

    An unset flag (None) is accepted as-is; any other value is handed to
    ``validate_jqqb`` and its result is returned unchanged.
    """
    return True if value is None else validate_jqqb(value)
@flags.multi_flags_validator(
    ["id", "url", "query_builder"],
    "Require both --url and --query_builder to create a new feed.",
)
def _CheckFlagsSet(f) -> bool:
    """Check that the combination of --id, --url and --query_builder is usable.

    Args:
        f: Mapping from flag name to flag value for the three flags above.

    Returns:
        True when an id is given, or when both url and query_builder are
        set; False otherwise.
    """
    if f["id"]:
        # An id was given, so no other field is mandatory.
        return True
    # No id means a new feed, which needs both fields.
    return bool(f["url"] and f["query_builder"])
# Module-level alias for absl's flag-values object; main() reads the parsed
# flag values from it.
FLAGS = flags.FLAGS
def upsert_feed(feed_id, url, name, query_builder) -> str:
    """Create a new FilterFeed or update an existing one, and return its URL.

    Args:
        feed_id: ID of an existing feed to update. A falsy value (None or 0)
            creates a new feed instead.
        url: Upstream feed URL; the stored value is left untouched when falsy.
        name: Nickname for the feed; left untouched when falsy.
        query_builder: JSON-encoded query-builder string. It is parsed with
            ``json.loads`` before being stored; left untouched when falsy.

    Returns:
        The ``https://filter-feed.newg.as/v1/<id>`` URL built from the ID of
        the key returned by ``put()``.

    Raises:
        LookupError: If ``feed_id`` is given but no feed with that ID exists.
        json.JSONDecodeError: If ``query_builder`` is not valid JSON.
    """
    client = ndb.Client()
    # All datastore operations must run inside the client's context.
    with client.context():
        if feed_id:
            feed = FilterFeed.get_by_id(feed_id)
            if feed is None:
                # A missing entity comes back as None; fail with a clear
                # message instead of an AttributeError on the assignments
                # below.
                raise LookupError(f"No feed found with id {feed_id}")
        else:
            feed = FilterFeed()

        # Only overwrite the fields that were actually supplied, so an update
        # can change a subset of them.
        if url:
            feed.url = url
        if name:
            feed.name = name
        if query_builder:
            feed.query_builder = json.loads(query_builder)

        key = feed.put()
        return f"https://filter-feed.newg.as/v1/{key.id()}"
def main(_):
    """Upsert the feed described by the parsed flags and print its URL.

    The single positional argument is unused.
    """
    feed_url = upsert_feed(
        FLAGS.id,
        FLAGS.url,
        FLAGS.name,
        FLAGS.query_builder,
    )
    print(feed_url)
# Run through absl only when executed as a script, not when imported.
if __name__ == "__main__":
    app.run(main)