1
1
import logging
2
- from datetime import datetime
3
- from urllib .parse import urlparse
4
2
from dataclasses import dataclass , asdict
5
- from typing import Dict , List , Optional
3
+ from datetime import datetime
6
4
from functools import lru_cache
5
+ from typing import Dict , List , Optional
6
+ from urllib .parse import urlparse
7
7
8
8
from mattermostdriver import Driver
9
9
13
13
from data_source_api .utils import parse_with_workers
14
14
from indexing_queue import IndexingQueue
15
15
16
-
17
16
logger = logging .getLogger (__name__ )
18
17
19
18
@@ -23,136 +22,126 @@ class MattermostChannel:
23
22
name : str
24
23
team_id : str
25
24
25
+
26
26
@dataclass
class MattermostConfig:
    """Connection settings for a Mattermost server.

    ``url`` is supplied as a server address such as
    ``https://mattermost.server.com``. During ``__post_init__`` the address
    is split up: ``url`` is overwritten with the bare host name, and
    ``scheme`` / ``port`` are taken from the address when it specifies them,
    otherwise they keep the values they were constructed with.
    """

    url: str
    token: str
    scheme: Optional[str] = "https"
    port: Optional[int] = 443

    def __post_init__(self):
        """Normalise ``url`` into separate host, port and scheme fields.

        Raises:
            ValueError: if ``url`` is not a non-empty string, cannot be
                parsed (including an invalid port), or contains no host name.
        """
        if not isinstance(self.url, str) or not self.url.strip():
            raise ValueError("Mattermost server URL must be a non-empty string")

        raw_url = self.url.strip()
        try:
            parsed_url = urlparse(raw_url)
            if not parsed_url.netloc:
                # Without "scheme://", urlparse puts the host into the path
                # (or, for "host:port", into the scheme), leaving hostname
                # as None. A leading "//" makes it parse as a network
                # location instead.
                parsed_url = urlparse(f"//{raw_url}")
            hostname = parsed_url.hostname
            # Accessing .port raises ValueError for a non-numeric or
            # out-of-range port, so it stays inside the try block.
            port = parsed_url.port
        except Exception as e:
            raise ValueError(f"Invalid Mattermost server URL: {raw_url!r}") from e

        if hostname is None:
            raise ValueError(f"Mattermost server URL has no host name: {raw_url!r}")

        self.url = hostname
        self.port = port if port is not None else self.port
        self.scheme = parsed_url.scheme if parsed_url.scheme != "" else self.scheme
42
42
43
-
44
43
45
44
class MattermostDataSource (BaseDataSource ):
46
45
FEED_BATCH_SIZE = 500
47
46
48
47
@staticmethod
def get_config_fields() -> List[ConfigField]:
    """Describe the inputs a user fills in to configure this data source.

    Returns:
        Two fields, in order: a text input named ``url`` for the server
        address and a password input named ``token`` for the access token.
    """
    server_field = ConfigField(
        label="Mattermost Server",
        name="url",
        placeholder="https://mattermost.server.com",
        input_type=HTMLInputType.TEXT,
    )
    token_field = ConfigField(
        label="Access Token",
        name="token",
        placeholder="paste-your-access-token-here",
        input_type=HTMLInputType.PASSWORD,
    )
    return [server_field, token_field]
54
55
55
-
56
56
@staticmethod
def validate_config(config: Dict) -> None:
    """Check a raw configuration by attempting a login with it.

    Args:
        config: keyword arguments for ``MattermostConfig``.

    Raises:
        InvalidDataSourceConfig: if building the config, creating the
            driver, or logging in raises any exception (chained as cause).
    """
    try:
        driver_options = asdict(MattermostConfig(**config))
        driver = Driver(options=driver_options)
        driver.login()
    except Exception as e:
        raise InvalidDataSourceConfig from e
64
64
65
-
66
65
def __init__(self, *args, **kwargs):
    """Initialise the base data source and create the Mattermost driver.

    The driver is built from ``self._config`` (presumably populated by the
    base-class constructor; confirm against ``BaseDataSource``). No login
    is performed here.
    """
    super().__init__(*args, **kwargs)
    driver_options = asdict(MattermostConfig(**self._config))
    self._mattermost = Driver(options=driver_options)
69
+
72
70
def _list_channels(self) -> List[MattermostChannel]:
    """Return the channels reported by the ``/users/me/channels`` endpoint.

    Each entry of the response is converted to a ``MattermostChannel``
    using its ``id``, ``name`` and ``team_id`` keys.
    """
    found = []
    for raw in self._mattermost.channels.client.get("/users/me/channels"):
        found.append(
            MattermostChannel(id=raw["id"], name=raw["name"], team_id=raw["team_id"])
        )
    return found
76
74
77
-
78
75
def _is_valid_message (self , message : Dict ) -> bool :
79
- return message ["type" ] == ""
80
-
81
-
76
+ return message ["type" ] == ""
77
+
82
78
def _is_valid_channel(self, channel: MattermostChannel) -> bool:
    """Tell whether a channel should be fed.

    A channel qualifies only when its ``team_id`` is not the empty string
    (presumably team-less channels are direct messages; confirm).
    """
    team_id = channel.team_id
    return team_id != ""
84
-
85
-
80
+
86
81
def _list_posts_in_channel (self , channel_id : str , page : int ) -> Dict :
87
82
endpoint = f"/channels/{ channel_id } /posts"
88
83
params = {
89
84
"since" : int (self ._last_index_time .timestamp ()) * 1000 ,
90
85
"page" : page
91
86
}
92
-
87
+
93
88
posts = self ._mattermost .channels .client .get (endpoint , params = params )
94
89
return posts
95
90
96
-
97
91
def _feed_new_documents(self) -> None:
    """Log in, list the channels, and hand them to the worker helper.

    The channel list is passed to ``parse_with_workers`` together with
    ``_parse_channel_worker`` as the callable to run.
    """
    self._mattermost.login()
    all_channels = self._list_channels()

    channel_count = len(all_channels)
    logger.info(f'Found {channel_count} channels')
    parse_with_workers(self._parse_channel_worker, all_channels)
103
97
104
-
105
def _parse_channel_worker(self, channels: List[MattermostChannel]):
    """Feed every channel in ``channels``, one after another."""
    for current in channels:
        self._feed_channel(current)
108
101
109
-
110
102
def _get_mattermost_url (self ):
111
103
options = self ._mattermost .options
112
104
return f"{ options ['scheme' ]} ://{ options ['url' ]} :{ options ['port' ]} "
113
105
114
-
115
106
def _get_team_url(self, channel: MattermostChannel):
    """Return the server URL followed by ``/<team name>`` for the channel's team.

    The team is looked up through the driver using ``channel.team_id``.
    """
    base = self._get_mattermost_url()
    team_name = self._mattermost.teams.get_team(channel.team_id)['name']
    return f"{base}/{team_name}"
119
-
120
-
110
+
121
111
@lru_cache (maxsize = 512 )
122
112
def _get_mattermost_user (self , user_id : str ):
123
113
return self ._mattermost .users .get_user (user_id )["username" ]
124
-
125
-
114
+
126
115
def _feed_channel (self , channel : MattermostChannel ):
127
116
if not self ._is_valid_channel (channel ):
128
- return
117
+ return
129
118
logger .info (f'Feeding channel { channel .name } ' )
130
-
119
+
131
120
page = 0
132
121
total_fed = 0
133
-
122
+
134
123
parsed_posts = []
135
-
124
+
136
125
team_url = self ._get_team_url (channel )
137
-
126
+
138
127
while True :
139
128
posts = self ._list_posts_in_channel (channel .id , page )
140
129
141
130
last_message : Optional [BasicDocument ] = None
142
-
131
+
143
132
posts ["order" ].reverse ()
144
133
for id in posts ["order" ]:
145
134
post = posts ["posts" ][id ]
146
-
135
+
147
136
if not self ._is_valid_message (post ):
148
137
if last_message is not None :
149
138
parsed_posts .append (last_message )
150
139
last_message = None
151
140
continue
152
-
141
+
153
142
author = self ._get_mattermost_user (post ["user_id" ])
154
143
content = post ["message" ]
155
-
144
+
156
145
if last_message is not None :
157
146
if last_message .author == author :
158
147
last_message .content += f"\n { content } "
@@ -163,7 +152,7 @@ def _feed_channel(self, channel: MattermostChannel):
163
152
total_fed += len (parsed_posts )
164
153
IndexingQueue .get ().feed (docs = parsed_posts )
165
154
parsed_posts = []
166
-
155
+
167
156
author_image_url = f"{ self ._get_mattermost_url ()} /api/v4/users/{ post ['user_id' ]} /image?_=0"
168
157
timestamp = datetime .fromtimestamp (post ["update_at" ] / 1000 )
169
158
last_message = BasicDocument (
@@ -178,18 +167,16 @@ def _feed_channel(self, channel: MattermostChannel):
178
167
url = f"{ team_url } /pl/{ id } " ,
179
168
type = DocumentType .MESSAGE
180
169
)
181
-
170
+
182
171
if last_message is not None :
183
172
parsed_posts .append (last_message )
184
173
185
174
if posts ["prev_post_id" ] == "" :
186
175
break
187
176
page += 1
188
-
189
177
190
178
IndexingQueue .get ().feed (docs = parsed_posts )
191
179
total_fed += len (parsed_posts )
192
180
193
181
if len (parsed_posts ) > 0 :
194
182
logger .info (f"Worker fed { total_fed } documents" )
195
-
0 commit comments