Skip to content

Commit 72c4c8e

Browse files
authored
Merge pull request #23 from postgrespro/python_logging_support
Add support of external logging for postgresql nodes
2 parents 5a283a6 + 60df1db commit 72c4c8e

File tree

3 files changed

+130
-6
lines changed

3 files changed

+130
-6
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
*.pyc
2-
dist
2+
dist
3+
tags

testgres/testgres.py

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
import time
3232
import six
3333

34+
import threading
35+
import logging
36+
import select
37+
import tempfile
38+
3439
# Try to use psycopg2 by default. If psycopg2 isn"t available then use
3540
# pg8000 which is slower but much more portable because uses only
3641
# pure-Python code
@@ -44,6 +49,7 @@
4449

4550

4651
registered_nodes = []
52+
util_threads = []
4753
last_assigned_port = int(random.random() * 16384) + 49152
4854
pg_config_data = {}
4955

@@ -74,6 +80,54 @@ def __str__(self):
7480
return '\n ERROR: {0}\n CMD: {1}'.format(self.error_text, self.cmd)
7581

7682

83+
class LogWriter(threading.Thread):
84+
'''
85+
Helper class to implement reading from postgresql.log and redirect
86+
it python logging
87+
'''
88+
89+
def __init__(self, node_name, fd):
90+
assert callable(fd.readline)
91+
92+
threading.Thread.__init__(self)
93+
94+
self.node_name = node_name
95+
self.fd = fd
96+
self.stop_event = threading.Event()
97+
self.logger = logging.getLogger(node_name)
98+
self.logger.setLevel(logging.INFO)
99+
100+
def run(self):
101+
while self.fd in select.select([self.fd], [], [], 0)[0]:
102+
line = self.fd.readline()
103+
if line:
104+
extra = {'node': self.node_name}
105+
self.logger.info(line.strip(), extra=extra)
106+
elif self.stopped():
107+
break
108+
else:
109+
time.sleep(0.1)
110+
111+
def stop(self):
112+
self.stop_event.set()
113+
114+
def stopped(self):
115+
return self.stop_event.isSet()
116+
117+
118+
def log_watch(node_name, pg_logname):
119+
''' Starts thread for node that redirects postgresql logs
120+
to python logging system
121+
'''
122+
123+
reader = LogWriter(node_name, open(pg_logname, 'r'))
124+
reader.start()
125+
126+
global util_threads
127+
util_threads.append(reader)
128+
return reader
129+
130+
77131
class NodeConnection(object):
78132

79133
"""
@@ -148,7 +202,7 @@ def close(self):
148202

149203
class PostgresNode(object):
150204

151-
def __init__(self, name, port, base_dir=None):
205+
def __init__(self, name, port, base_dir=None, use_logging=False):
152206
self.name = name
153207
self.host = '127.0.0.1'
154208
self.port = port
@@ -160,6 +214,9 @@ def __init__(self, name, port, base_dir=None):
160214
os.makedirs(self.logs_dir)
161215
self.working = False
162216

217+
self.use_logging = use_logging
218+
self.logger = None
219+
163220
@property
164221
def data_dir(self):
165222
return os.path.join(self.base_dir, "data")
@@ -316,7 +373,15 @@ def pg_ctl(self, command, params={}, command_options=[]):
316373

317374
def start(self, params={}):
318375
""" Starts cluster """
319-
logfile = os.path.join(self.logs_dir, "postgresql.log")
376+
377+
if self.use_logging:
378+
tmpfile = tempfile.NamedTemporaryFile('w', dir=self.logs_dir, delete=False)
379+
logfile = tmpfile.name
380+
381+
self.logger = log_watch(self.name, logfile)
382+
else:
383+
logfile = os.path.join(self.logs_dir, "postgresql.log")
384+
320385
_params = {
321386
"-D": self.data_dir,
322387
"-w": None,
@@ -387,6 +452,9 @@ def stop(self, params={}):
387452
_params.update(params)
388453
self.pg_ctl("stop", _params)
389454

455+
if self.logger:
456+
self.logger.stop()
457+
390458
self.working = False
391459

392460
return self
@@ -623,7 +691,7 @@ def version_to_num(version):
623691
return num
624692

625693

626-
def get_new_node(name, base_dir=None):
694+
def get_new_node(name, base_dir=None, use_logging=False):
627695
global registered_nodes
628696
global last_assigned_port
629697

@@ -647,7 +715,7 @@ def get_new_node(name, base_dir=None):
647715
# socket.SOCK_STREAM,
648716
# socket.getprotobyname("tcp"))
649717

650-
node = PostgresNode(name, port, base_dir)
718+
node = PostgresNode(name, port, base_dir, use_logging=use_logging)
651719
registered_nodes.append(node)
652720
last_assigned_port = port
653721

@@ -663,7 +731,12 @@ def clean_all():
663731

664732
def stop_all():
665733
global registered_nodes
734+
global util_threads
735+
666736
for node in registered_nodes:
667737
# stop server if it still working
668738
if node.working:
669739
node.stop()
740+
741+
for thread in util_threads:
742+
thread.stop()

testgres/tests/test_simple.py

100644100755
Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1+
#!/usr/bin/env python
2+
13
import unittest
4+
import re
5+
import six
6+
import tempfile
7+
import logging.config
8+
29
from testgres import get_new_node, stop_all
310

411

@@ -73,7 +80,50 @@ def test_users(self):
7380
node.init().start()
7481
node.psql('postgres', 'create role test_user login')
7582
value = node.safe_psql('postgres', 'select 1', username='test_user')
76-
self.assertEqual(value, '1\n')
83+
self.assertEqual(value, six.b('1\n'))
84+
85+
def test_logging(self):
86+
regex = re.compile('\w+:\s{1}LOG:.*')
87+
logfile = tempfile.NamedTemporaryFile('w', delete=True)
88+
89+
log_conf = {
90+
'version': 1,
91+
'handlers': {
92+
'file': {
93+
'class': 'logging.FileHandler',
94+
'filename': logfile.name,
95+
'formatter': 'base_format',
96+
'level': logging.DEBUG,
97+
},
98+
},
99+
'formatters': {
100+
'base_format': {
101+
'format': '%(node)-5s: %(message)s',
102+
},
103+
},
104+
'root': {
105+
'handlers': ('file', ),
106+
'level': 'DEBUG',
107+
},
108+
}
109+
110+
logging.config.dictConfig(log_conf)
111+
112+
node = get_new_node('master', use_logging=True)
113+
node1 = get_new_node('slave1', use_logging=True)
114+
node2 = get_new_node('slave2', use_logging=True)
115+
116+
node.init().start()
117+
node1.init().start()
118+
node2.init().start()
119+
120+
with open(logfile.name, 'r') as log:
121+
for line in log:
122+
self.assertTrue(regex.match(line))
123+
124+
node.stop()
125+
node1.stop()
126+
node2.stop()
77127

78128

79129
if __name__ == '__main__':

0 commit comments

Comments
 (0)