forked from Pingze-github/mango
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmango.py
214 lines (179 loc) · 6.36 KB
/
mango.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
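# An ORM library for MongoDB.
# Goals:
#   - keep the API as close as possible to native pymongo
#   - convert the dictionaries pymongo returns into objects
#   - provide simple validation
#   - manage indexes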
from pymongo import MongoClient, errors
# Module-wide handles assigned by connect(); both are None until it is called.
# (A `global` statement at module level has no effect, so none is used here.)
connection = None
db = None
# ****** connect *******
def connect(
        database,
        ip='127.0.0.1',
        port=27017,
        username=None,
        password=None,
):
    """Create a MongoClient and select *database* on it.

    The client and the selected database are stored in the module-level
    ``connection`` and ``db`` names and are also returned.

    Args:
        database: Name of the database to select; also passed to the client
            as ``authSource``.
        ip: Host handed to ``MongoClient`` as its first positional argument.
        port: Port handed to ``MongoClient`` as its second positional argument.
        username: Forwarded to ``MongoClient`` unchanged.
        password: Forwarded to ``MongoClient`` unchanged.

    Returns:
        The tuple ``(connection, db)``.
    """
    global connection, db
    client_options = {
        'username': username,
        'password': password,
        'authSource': database,
    }
    connection = MongoClient(ip, port, **client_options)
    db = connection[database]
    return connection, db
# ****** Fields ******
class Field(object):
    """Declaration of one document attribute: its expected type and index flags.

    Subclasses set ``_type`` to the Python type that values must be an
    instance of; the base class accepts any object.
    """

    # Type that field_assert() checks values against.
    _type = object

    def __init__(self, field_name=None, index=False, unique=False):
        """Store the field's name and index options.

        Args:
            field_name: Name of the field, or None if it is assigned later
                through setname().
            index: Index flag for this field (stored as given).
            unique: Unique-index flag for this field (stored as given).
        """
        self.name = field_name
        self.index = index
        self.unique = unique

    def __str__(self):
        """Return the field name as text.

        The name defaults to None, and __str__ must return a str, so the
        value is converted explicitly instead of being returned as-is.
        """
        return str(self.name)

    def field_assert(self, value, name='?'):
        """Check that *value* is an instance of this field's ``_type``.

        Args:
            value: The value to validate.
            name: Attribute name used in the error message.

        Raises:
            TypeError: If *value* is not an instance of ``_type``.
        """
        if not isinstance(value, self._type):
            raise TypeError('document.{} given not match the field "{}". It\'s class is "{}"'
                            .format(name, self.__class__.__name__, value.__class__.__name__))

    def setname(self, name):
        """Set the field's name."""
        self.name = name
class IntField(Field):
    """Field whose values must be instances of ``int``."""

    _type = int
class StringField(Field):
    """Field whose values must be instances of ``str``."""

    _type = str
class ListField(Field):
    """Field whose values must be instances of ``list``."""

    _type = list
class DictField(Field):
    """Field whose values must be instances of ``dict``."""

    _type = dict
# ****** Model Funcs ******
def cls_method_creater(func_name):
    """Build a classmethod that forwards to the pymongo collection method *func_name*.

    The returned classmethod looks up ``db[cls.Meta.collection]`` at call
    time and calls its ``func_name`` attribute with the caller's arguments.

    For 'insert' and 'create' the first positional argument is replaced by
    ``cls.filte_field(...)`` of it, so undeclared fields are dropped before
    storing; a list or tuple of documents is filtered element by element.
    For every other method a dict first argument is passed to
    ``cls.filte_field`` for validation only and forwarded unchanged.

    Calls without positional arguments (e.g. ``index_information()``) and
    calls whose first argument is not a dict (pipelines, index specs, ...)
    are forwarded without any field check.

    Args:
        func_name: Name of the collection method to wrap.

    Returns:
        A ``classmethod`` object to be assigned as a class attribute.
    """
    # Only these calls take the document to store as their first argument.
    replaces_document = func_name in ('insert', 'create')

    @classmethod
    def func(cls, *args, **kargs):
        if args:
            first = args[0]
            if replaces_document:
                if isinstance(first, dict):
                    args = (cls.filte_field(first),) + args[1:]
                elif isinstance(first, (list, tuple)):
                    args = ([cls.filte_field(doc) for doc in first],) + args[1:]
            elif isinstance(first, dict):
                # Validation only; the result is deliberately discarded.
                # NOTE(review): a query using operator dicts as values
                # (e.g. {'age': {'$gt': 1}}) still fails the type check of a
                # non-dict field here, exactly as it did before.
                cls.filte_field(first)
        func_real = getattr(db[cls.Meta.collection], func_name)
        return func_real(*args, **kargs)

    return func
@classmethod
def find(cls, *args, **kargs):
    """Run ``find`` on the model's collection and return its result as-is.

    The raw result is handed back instead of Model objects; converting the
    dictionaries by hand is left to the caller because it is more flexible.
    """
    collection = db[cls.Meta.collection]
    return collection.find(*args, **kargs)
@classmethod
def find_one(cls, *args, **kargs):
    """Fetch a single document and convert it with ``cls.dict2obj``.

    All arguments are forwarded to the collection's ``find_one``.

    Returns:
        The result of ``cls.dict2obj`` for the fetched document, or None
        when the collection's ``find_one`` returned None (no match).
    """
    data_dict = db[cls.Meta.collection].find_one(*args, **kargs)
    if data_dict is None:
        # Nothing matched; there is no document to build an object from.
        return None
    return cls.dict2obj(data_dict)
@classmethod
def insert(cls, *args, **kargs):
    """Insert a document after passing it through ``cls.filte_field``.

    The first positional argument is replaced by its filtered version; any
    remaining positional and keyword arguments are forwarded unchanged to
    the collection's ``insert``, whose return value is returned.
    """
    cleaned = cls.filte_field(args[0])
    collection = db[cls.Meta.collection]
    return collection.insert(cleaned, *args[1:], **kargs)
@classmethod
def create(cls, **kargs):
    """Insert a document built from keyword arguments.

    The keyword arguments are passed through ``cls.filte_field`` and the
    resulting dict is handed to the collection's ``insert``; that call's
    return value is returned.
    """
    document = cls.filte_field(kargs)
    return db[cls.Meta.collection].insert(document)
@classmethod
def update(cls, *args, **kargs):
    """Forward all arguments to the collection's ``update`` and return its result."""
    # TODO: pymongo's update takes nested dictionaries, so the arguments
    # cannot be checked directly with cls.filte_field(args[0]); no field
    # validation is performed here.
    collection = db[cls.Meta.collection]
    return collection.update(*args, **kargs)
@classmethod
def remove(cls, *args, **kargs):
    """Forward all arguments to the collection's ``remove`` and return its result."""
    # TODO: pymongo's remove takes nested dictionaries, so the arguments
    # cannot be checked directly with cls.filte_field(args[0]); no field
    # validation is performed here.
    collection = db[cls.Meta.collection]
    return collection.remove(*args, **kargs)
# ****** Model ******
class Model(object):
    """Base class for documents stored in one MongoDB collection.

    Subclasses declare ``Field`` instances as class attributes and may define
    an inner ``Meta`` class providing any of ``strict``, ``collection``,
    ``index`` and ``database``; attributes that are missing or None fall back
    to defaults.  Defining a subclass resolves its collection name and
    creates the declared indexes (see ``__init_subclass__``).
    """

    find = find
    find_one = find_one
    insert = cls_method_creater('insert')
    create = create
    update = cls_method_creater('update')
    remove = cls_method_creater('remove')
    insert_many = cls_method_creater('insert_many')
    insert_one = cls_method_creater('insert_one')
    index_information = cls_method_creater('index_information')
    update_many = cls_method_creater('update_many')
    ensure_index = cls_method_creater('ensure_index')
    drop_indexes = cls_method_creater('drop_indexes')
    drop_index = cls_method_creater('drop_index')
    delete_one = cls_method_creater('delete_one')
    delete_many = cls_method_creater('delete_many')
    create_indexes = cls_method_creater('create_indexes')
    create_index = cls_method_creater('create_index')
    count = cls_method_creater('count')
    aggregate = cls_method_creater('aggregate')

    class Meta():
        # When True, __init__ raises if a declared field is absent from the data.
        strict = False
        # Collection name; None means "use the lower-cased class name".
        collection = None
        # Iterable of index specs; each one is passed to create_index as [spec].
        index = None
        # Evaluated once when this class body runs, so it holds whatever the
        # module-level `db` was at that moment.  __init_subclass__ falls back
        # to the current module-level `db` when this is None.
        database = db

    # Requires Python 3.6 or newer.
    def __init_subclass__(subcls, **kargs):
        """Resolve the subclass's collection and create its declared indexes.

        Indexes come from ``Meta.index`` and from every ``Field`` attribute
        whose ``unique`` or ``index`` flag is set (descending when
        ``field.index == -1``, ascending otherwise).

        Raises:
            RuntimeError: If neither ``Meta.database`` nor the module-level
                ``db`` holds a database.
        """
        meta = subcls.__dict__.get('Meta')
        if meta is None:
            # Give the subclass a Meta of its own, so filling in defaults
            # below never writes to a Meta shared with a parent or sibling.
            meta = type('Meta', (subcls.Meta,), {})
            subcls.Meta = meta
        if getattr(meta, 'collection', None) is None:
            meta.collection = subcls.__name__.lower()

        # Compare with `is None` only: pymongo Database objects do not
        # support truth-value testing.
        database = getattr(meta, 'database', None)
        if database is None:
            database = db
        if database is None:
            raise RuntimeError(
                'No database available for model "{}": call connect() first '
                'or set Meta.database'.format(subcls.__name__))
        collection = database[meta.collection]

        for index in getattr(meta, 'index', None) or ():
            try:
                collection.create_index([index])
            except errors.OperationFailure:
                # Best effort, as before: a failing index must not prevent
                # the class from being defined.
                pass

        for attr_name in dir(subcls):
            field = getattr(subcls, attr_name)
            if not isinstance(field, Field):
                continue
            if not (field.unique or field.index):
                continue
            direction = -1 if field.index == -1 else 1
            options = {'unique': True} if field.unique else {}
            try:
                collection.create_index([(attr_name, direction)], **options)
            except errors.OperationFailure:
                # Best effort, see above.
                pass

    def __init__(self, *args, **kargs):
        """Populate the declared fields from keyword arguments or one dict.

        A single positional dict (with no keyword arguments) is treated as
        the keyword arguments.  Each declared field found in the data is
        type-checked and stored on the instance; keys that are not declared
        fields are ignored.  A declared field absent from the data is set to
        None, or raises when ``Meta.strict`` is true.

        Raises:
            TypeError: If a value does not match its field's type.
            AttributeError: If ``Meta.strict`` is true and a declared field
                is missing from the data.
        """
        if args and not kargs and isinstance(args[0], dict):
            kargs = dict(args[0])
        strict = getattr(type(self).Meta, 'strict', False)
        for attr_name in dir(self):
            field = self[attr_name]
            if not isinstance(field, Field):
                continue
            if not field.name:
                field.setname(attr_name)
            if attr_name in kargs:
                value = kargs[attr_name]
                field.field_assert(value, attr_name)
                self[attr_name] = value
            elif strict:
                raise AttributeError('Model.{} not exists in real document'.format(attr_name))
            else:
                self[attr_name] = None

    def __getitem__(self, k):
        """Return attribute *k*, allowing ``obj['name']`` access."""
        return getattr(self, k)

    def __setitem__(self, k, v):
        """Set attribute *k* to *v*, allowing ``obj['name'] = value``."""
        setattr(self, k, v)

    @classmethod
    def dict2obj(cls, data_dict):
        """Build an instance from a document dict; None is returned for None."""
        if data_dict is None:
            return None
        return cls(**data_dict)

    @classmethod
    def filte_field(cls, field_dict):
        """Return a copy of *field_dict* limited to declared fields.

        Keys that do not name a ``Field`` attribute of the class are dropped;
        every kept value is type-checked against its field.

        Raises:
            TypeError: If a kept value does not match its field's type.
        """
        new_dict = {}
        for key in field_dict:
            val = field_dict[key]
            if not hasattr(cls, key) or not isinstance(getattr(cls, key), Field):
                continue
            getattr(cls, key).field_assert(val, key)
            new_dict[key] = val
        return new_dict