|
5 | 5 | import datetime
|
6 | 6 | import logging
|
7 | 7 | import os
|
| 8 | +import random |
8 | 9 | import re
|
9 | 10 | import time
|
10 | 11 | from xml.etree import ElementTree
|
11 | 12 |
|
12 | 13 | import requests
|
13 | 14 |
|
| 15 | +try: |
| 16 | + from kazoo.client import KazooClient, KazooState |
| 17 | +except ImportError: |
| 18 | + KazooClient = KazooState = None |
| 19 | + |
14 | 20 | try:
|
15 | 21 | # Prefer simplejson, if installed.
|
16 | 22 | import simplejson as json
|
@@ -187,10 +193,12 @@ def is_valid_xml_char_ordinal(i):
|
187 | 193 | Char ::= #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
|
188 | 194 | """
|
189 | 195 | # conditions ordered by presumed frequency
|
190 |
| - return (0x20 <= i <= 0xD7FF |
191 |
| - or i in (0x9, 0xA, 0xD) |
192 |
| - or 0xE000 <= i <= 0xFFFD |
193 |
| - or 0x10000 <= i <= 0x10FFFF) |
| 196 | + return ( |
| 197 | + 0x20 <= i <= 0xD7FF |
| 198 | + or i in (0x9, 0xA, 0xD) |
| 199 | + or 0xE000 <= i <= 0xFFFD |
| 200 | + or 0x10000 <= i <= 0x10FFFF |
| 201 | + ) |
194 | 202 |
|
195 | 203 |
|
196 | 204 | def clean_xml_string(s):
|
@@ -316,7 +324,8 @@ def __init__(self, url, decoder=None, timeout=60, results_cls=Results):
|
316 | 324 | self.results_cls = results_cls
|
317 | 325 |
|
318 | 326 | def __del__(self):
|
319 |
| - self.session.close() |
| 327 | + if hasattr(self, "session"): |
| 328 | + self.session.close() |
320 | 329 |
|
321 | 330 | def _get_log(self):
|
322 | 331 | return LOG
|
@@ -1127,3 +1136,151 @@ def sanitize(data):
|
1127 | 1136 | fixed_string = fixed_string.replace(bad, good)
|
1128 | 1137 |
|
1129 | 1138 | return force_unicode(fixed_string)
|
| 1139 | + |
| 1140 | + |
| 1141 | +class SolrCloud(Solr): |
| 1142 | + |
| 1143 | + def __init__(self, zookeeper, collection, decoder=None, timeout=60, retry_timeout=0.2, *args, **kwargs): |
| 1144 | + url = zookeeper.getRandomURL(collection) |
| 1145 | + |
| 1146 | + super(SolrCloud, self).__init__(url, decoder=decoder, timeout=timeout, *args, **kwargs) |
| 1147 | + |
| 1148 | + self.zookeeper = zookeeper |
| 1149 | + self.collection = collection |
| 1150 | + self.retry_timeout = retry_timeout |
| 1151 | + |
| 1152 | + def _randomized_request(self, method, path, body, headers, files): |
| 1153 | + self.url = self.zookeeper.getRandomURL(self.collection) |
| 1154 | + LOG.debug('Using random URL: %s', self.url) |
| 1155 | + return Solr._send_request(self, method, path, body, headers, files) |
| 1156 | + |
| 1157 | + def _send_request(self, method, path='', body=None, headers=None, files=None): |
| 1158 | + # FIXME: this needs to have a maximum retry counter rather than waiting endlessly |
| 1159 | + try: |
| 1160 | + return self._randomized_request(method, path, body, headers, files) |
| 1161 | + except requests.exceptions.RequestException: |
| 1162 | + LOG.warning('RequestException, retrying after %fs', self.retry_timeout, exc_info=True) |
| 1163 | + time.sleep(self.retry_timeout) # give zookeeper time to notice |
| 1164 | + return self._randomized_request(method, path, body, headers, files) |
| 1165 | + except SolrError: |
| 1166 | + LOG.warning('SolrException, retrying after %fs', self.retry_timeout, exc_info=True) |
| 1167 | + time.sleep(self.retry_timeout) # give zookeeper time to notice |
| 1168 | + return self._randomized_request(method, path, body, headers, files) |
| 1169 | + |
| 1170 | + def _update(self, message, clean_ctrl_chars=True, commit=True, softCommit=False, waitFlush=None, |
| 1171 | + waitSearcher=None): |
| 1172 | + self.url = self.zookeeper.getLeaderURL(self.collection) |
| 1173 | + LOG.debug('Using random leader URL: %s', self.url) |
| 1174 | + return Solr._update(self, message, clean_ctrl_chars, commit, softCommit, waitFlush, waitSearcher) |
| 1175 | + |
| 1176 | + |
| 1177 | +class ZooKeeper(object): |
| 1178 | + # Constants used by the REST API: |
| 1179 | + LIVE_NODES_ZKNODE = '/live_nodes' |
| 1180 | + ALIASES = '/aliases.json' |
| 1181 | + CLUSTER_STATE = '/clusterstate.json' |
| 1182 | + SHARDS = 'shards' |
| 1183 | + REPLICAS = 'replicas' |
| 1184 | + STATE = 'state' |
| 1185 | + ACTIVE = 'active' |
| 1186 | + LEADER = 'leader' |
| 1187 | + BASE_URL = 'base_url' |
| 1188 | + TRUE = 'true' |
| 1189 | + FALSE = 'false' |
| 1190 | + COLLECTION = 'collection' |
| 1191 | + |
| 1192 | + def __init__(self, zkServerAddress, zkClientTimeout=15, zkClientConnectTimeout=15): |
| 1193 | + if KazooClient is None: |
| 1194 | + logging.error('ZooKeeper requires the `kazoo` library to be installed') |
| 1195 | + raise RuntimeError |
| 1196 | + |
| 1197 | + self.collections = {} |
| 1198 | + self.liveNodes = {} |
| 1199 | + self.aliases = {} |
| 1200 | + self.state = None |
| 1201 | + |
| 1202 | + self.zk = KazooClient(zkServerAddress, read_only=True) |
| 1203 | + |
| 1204 | + self.zk.start() |
| 1205 | + |
| 1206 | + def connectionListener(state): |
| 1207 | + if state == KazooState.LOST: |
| 1208 | + self.state = state |
| 1209 | + elif state == KazooState.SUSPENDED: |
| 1210 | + self.state = state |
| 1211 | + self.zk.add_listener(connectionListener) |
| 1212 | + |
| 1213 | + @self.zk.DataWatch(ZooKeeper.CLUSTER_STATE) |
| 1214 | + def watchClusterState(data, *args, **kwargs): |
| 1215 | + if not data: |
| 1216 | + LOG.warning("No cluster state available: no collections defined?") |
| 1217 | + else: |
| 1218 | + self.collections = json.loads(data) |
| 1219 | + LOG.info("Updated collections") |
| 1220 | + |
| 1221 | + collection_data = json.loads(data.decode('utf-8')) |
| 1222 | + self.collections = collection_data |
| 1223 | + LOG.info('Updated collections: %s', collection_data) |
| 1224 | + |
| 1225 | + @self.zk.ChildrenWatch(ZooKeeper.LIVE_NODES_ZKNODE) |
| 1226 | + def watchLiveNodes(children): |
| 1227 | + self.liveNodes = children |
| 1228 | + LOG.info("Updated live nodes: %s", children) |
| 1229 | + |
| 1230 | + @self.zk.DataWatch(ZooKeeper.ALIASES) |
| 1231 | + def watchAliases(data, stat): |
| 1232 | + if data: |
| 1233 | + self.aliases = json.loads(data)[ZooKeeper.COLLECTION] |
| 1234 | + else: |
| 1235 | + self.aliases = None |
| 1236 | + LOG.info("Updated aliases: %s", self.aliases) |
| 1237 | + |
| 1238 | + def __del__(self): |
| 1239 | + # Avoid leaking connection handles in Kazoo's atexit handler: |
| 1240 | + self.zk.stop() |
| 1241 | + self.zk.close() |
| 1242 | + |
| 1243 | + def getHosts(self, collname, only_leader=False, seen_aliases=None): |
| 1244 | + if self.aliases and collname in self.aliases: |
| 1245 | + return self.getAliasHosts(collname, only_leader, seen_aliases) |
| 1246 | + |
| 1247 | + hosts = [] |
| 1248 | + if not self.collections.has_key(collname): |
| 1249 | + raise SolrError("Unknown collection: %s", collname) |
| 1250 | + collection = self.collections[collname] |
| 1251 | + shards = collection[ZooKeeper.SHARDS] |
| 1252 | + for shardname in shards.keys(): |
| 1253 | + shard = shards[shardname] |
| 1254 | + if shard[ZooKeeper.STATE] == ZooKeeper.ACTIVE: |
| 1255 | + replicas = shard[ZooKeeper.REPLICAS] |
| 1256 | + for replicaname in replicas.keys(): |
| 1257 | + replica = replicas[replicaname] |
| 1258 | + |
| 1259 | + if replica[ZooKeeper.STATE] == ZooKeeper.ACTIVE: |
| 1260 | + if not only_leader or (replica.get(ZooKeeper.LEADER, None) == ZooKeeper.TRUE): |
| 1261 | + base_url = replica[ZooKeeper.BASE_URL] |
| 1262 | + if base_url not in hosts: |
| 1263 | + hosts.append(base_url) |
| 1264 | + return hosts |
| 1265 | + |
| 1266 | + def getAliasHosts(self, collname, only_leader, seen_aliases): |
| 1267 | + if seen_aliases: |
| 1268 | + if collname in seen_aliases: |
| 1269 | + LOG.warn("%s in circular alias definition - ignored", collname) |
| 1270 | + return [] |
| 1271 | + else: |
| 1272 | + seen_aliases = [] |
| 1273 | + seen_aliases.append(collname) |
| 1274 | + collections = self.aliases[collname].split(",") |
| 1275 | + hosts = [] |
| 1276 | + for collection in collections: |
| 1277 | + for host in self.getHosts(collection, only_leader, seen_aliases): |
| 1278 | + if host not in hosts: |
| 1279 | + hosts.append(host) |
| 1280 | + return hosts |
| 1281 | + |
| 1282 | + def getRandomURL(self, collname): |
| 1283 | + return random.choice(self.getHosts(collname, only_leader=False)) + "/" + collname |
| 1284 | + |
| 1285 | + def getLeaderURL(self, collname): |
| 1286 | + return random.choice(self.getHosts(collname, only_leader=True)) + "/" + collname |
0 commit comments