
Commit 97bc8ab
Author: Guido Serra aka Zeph
Commit message: publishing on github
1 parent 9ea1376 · commit 97bc8ab

21 files changed: +2031 -0 lines changed

.gitignore

+4
@@ -1 +1,5 @@
 .idea
+*/*.pyc
+MANIFEST
+dist
+build

LICENSE.txt

+674
Large diffs are not rendered by default.

MANIFEST.in

+4
@@ -0,0 +1,4 @@
include LICENSE.txt
include README.md
recursive-include sqlhbase *.py
recursive-exclude sqlhbase *.pyc

README.md

+104
@@ -0,0 +1,104 @@
========
SqlHBase
========

SqlHBase is an HBase ingestion tool for MySQL-generated dumps.

The aim of this tool is to provide a 1:1 mapping of a MySQL table
into an HBase table, mapped on Hive (the schema is handled too).

Running it requires a working HBase with Thrift enabled,
and a Hive instance with the metastore properly configured and
Thrift enabled as well. If you need I/O performance, I recommend
looking into Pig or Jython, or directly at a native MapReduce job.

Sqoop was discarded as an option, as it doesn't cope with dump files
and it does not compute the difference between dumps before ingestion.

SqlHBase uses a 2-level ingestion process, described below.

"INSERT INTO `table_name` VALUE (), ()" statements are hashed
21+
and stored (dropping anything at the left side of the first open
22+
round bracket) as a single row into a staging table on HBase (the
23+
md5 hash of the row is the row_key on HBase).
24+
When multiple dumps of the same table/database are inserted, this
25+
prevents (or at least reduce) the duplication of data on HBase side.
26+
27+
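A minimal sketch of the hashing step just described, using only the standard
library; the helper name is an illustration, not the one used by the scripts:

    # Sketch only: derive the staging row key from an extended INSERT statement
    # by dropping everything left of the first open bracket and hashing the rest.
    import hashlib

    def staging_row_key(insert_stmt):
        payload = insert_stmt[insert_stmt.index("("):]   # just the tuples
        return hashlib.md5(payload.encode("utf-8")).hexdigest()

    key = staging_row_key('INSERT INTO `users` VALUES (1,"a"),(2,"b");')
    print(key)  # this hash becomes the row key in the staging table
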
MySQL by default chunks rows as tuples, up to 16MB, in a single
INSERT statement. Given that, we basically have a list of tuples:

[(1, "c1", "c2", "c3"), (2, "c1", "c2", "c3"), ... ]

An initial attempt at parsing/splitting such a string with a regexp
failed, of course, since a column value could contain ANYTHING,
even round brackets and quotes. This kind of language is not
recognizable by a finite state automaton, so something else had to
be implemented, for example to keep track of the nested brackets.
A PDA (pushdown automaton) would have helped, but... as you can
see above, the syntax is exactly that of a list of tuples
in Python... an eval() is all we needed in such a case
(and it is also, I guess, optimized at the C level by the interpreter).

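The eval() trick, as a sketch; the helper name and the NULL-to-None mapping are
assumptions for illustration, and this of course only makes sense on dump files
you trust:

    # Sketch only: the VALUES payload of an extended INSERT happens to be valid
    # Python literal syntax, so eval() turns it into a list of tuples.
    def parse_insert(insert_stmt):
        payload = insert_stmt[insert_stmt.index("("):].rstrip(";\n")
        # bare NULL tokens are not Python; mapping them to None is an assumption
        return eval("[" + payload + "]", {"__builtins__": {}, "NULL": None})

    rows = parse_insert("INSERT INTO `t` VALUES (1,'c1',NULL),(2,'c2',3.5);")
    print(rows)  # [(1, 'c1', None), (2, 'c2', 3.5)]
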
Keep in mind that the IDs of the rows are integers,
while HBase wants a string... plus, we need to do some zero padding,
because HBase sorts its keys lexicographically.

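For example (the key width of 12 digits is an arbitrary choice here, not
something mandated by the tool):

    # Zero-pad integer row IDs so lexicographic order matches numeric order.
    def padded_key(row_id, width=12):
        return str(row_id).zfill(width)

    assert padded_key(7) < padded_key(42) < padded_key(1000)
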
There are tons of forum threads about how bad it is to use a
monotonically increasing key on HBase, but... this is what we needed.

[...]

A 2-level Ingestion Process
===========================

A staging level -> (bin/sqlhbase-mysqlimport)
---------------------------------------------
No interpretation of the content of the MySQL dump file happens here,
apart from splitting schema data from raw data (the INSERTs).
Two tables are created: "namespace"_creates and "namespace"_values.
The first contains an entry/row for each dump file ingested, having as
its row key the timestamp of the day found at the bottom of the dump
file (or one provided on the command line, in case that information is
missing). Such a row contains the list of hashes for each table (see
below), a CREATE statement for each table and for each view, plus some
statistics: the time spent parsing the file, the number of rows it
contained, and the overall md5 hash.
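A rough sketch of what a staging write could look like through happybase
(which setup.py depends on); the column families and qualifiers here are
invented for illustration and are not taken from the actual scripts:

    # Sketch only: write one INSERT chunk and one dump-file entry to the two
    # staging tables. Column families/qualifiers are assumptions.
    import hashlib
    import happybase

    conn = happybase.Connection("localhost", table_prefix="namespace")
    values_tbl = conn.table("values")    # i.e. "namespace"_values
    creates_tbl = conn.table("creates")  # i.e. "namespace"_creates

    chunk = '(1,"a"),(2,"b")'
    chunk_hash = hashlib.md5(chunk.encode("utf-8")).hexdigest()

    values_tbl.put(chunk_hash, {"sql:chunk": chunk})
    creates_tbl.put("2013-01-20", {
        "meta:hashes_mytable": chunk_hash,
        "meta:create_mytable": "CREATE TABLE mytable (...)",
    })
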
A publishing level -> (bin/sqlhbase-populate)
---------------------------------------------
Given a namespace (as of the initial import) and a timestamp (from a list):
- the content of the table CREATE statement gets interpreted, the data
  types are mapped from MySQL to Hive, and the table is created on Hive
- if the table does not exist yet, it gets populated fully, reading each
  16MB chunk
- the table is created with the convention "namespace"_"table_name"
- if the table exists and already contains data, we compute the difference
  between the two lists of hashes that were created at ingestion time
  (see the sketches after this list):
  -- then we check what has already been ingested in the range of row IDs
     contained in the MySQL chunk (we assume that MySQL dumps a table
     sequentially, hopefully)
  -- if a row ID that is in the sequence in the database is not in the
     sequence from the chunk we are ingesting, then we might have a DELETE
     (a DELETE that we do not execute on HBase due to HBASE-5154 and
     HBASE-5241)
  -- if a row ID is also in our chunk, we check each column for changes
  -- duplicated columns are removed from the list that is going to be sent
     to the server, to avoid wasting bandwidth
- at this stage, we fetch a copy of the data at the next known ingestion
  date (dates are known from the list of dumps in the meta table)
  -- if data is found, each row gets diffed against the data to be ingested
     that is left over from the previous cleaning... if there are real
     changes, those are kept and sent to the HBase server for writing
     (timestamps are verified at this stage, to avoid resending data
     that has already been written before)

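The hash-list comparison above boils down to set operations; a minimal sketch,
with names chosen for illustration rather than taken from the code:

    # Sketch only: compare the hashes already published for a table with the
    # hashes computed from the dump currently being published.
    def diff_hash_lists(stored_hashes, incoming_hashes):
        stored, incoming = set(stored_hashes), set(incoming_hashes)
        return {
            "new": incoming - stored,           # chunks still to publish
            "already_seen": incoming & stored,  # chunks that can be skipped
            "missing": stored - incoming,       # possible DELETEs (not applied)
        }

    print(diff_hash_lists(["h1", "h2"], ["h2", "h3"]))

And for the MySQL-to-Hive step, the kind of type mapping and Hive-over-HBase
DDL the publishing step could emit; the mapping table, the "d" column family
and the helper are assumptions, not the tool's actual code:

    # Sketch only: map MySQL column types to Hive types and build a Hive table
    # backed by the HBase table "namespace"_"table_name".
    MYSQL_TO_HIVE = {"int": "INT", "bigint": "BIGINT", "tinyint": "TINYINT",
                     "varchar": "STRING", "text": "STRING", "datetime": "STRING",
                     "decimal": "DOUBLE", "float": "FLOAT"}

    def hive_ddl(namespace, table, columns):
        # columns: (name, mysql_type) pairs parsed out of the CREATE statement
        cols = ", ".join("%s %s" % (n, MYSQL_TO_HIVE.get(t, "STRING"))
                         for n, t in columns)
        mapping = ":key," + ",".join("d:%s" % n for n, _ in columns[1:])
        return ("CREATE EXTERNAL TABLE %s_%s (%s) "
                "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' "
                "WITH SERDEPROPERTIES ('hbase.columns.mapping' = '%s') "
                "TBLPROPERTIES ('hbase.table.name' = '%s_%s')"
                % (namespace, table, cols, mapping, namespace, table))

    print(hive_ddl("namespace", "users", [("id", "int"), ("name", "varchar")]))
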
FIXME: ingesting data while skipping a day will need a proper
recalculation of the difference of the hash lists...
Ingesting data from a backup that was not previously ingested
(while we kept ingesting data into the tables) will leave some
redundant, duplicated data in HBase, simply because we do not dare
to delete the duplicates that are "in the future".

...anyway, it is pretty easy to delete a table and reconstruct it,
since all the history is kept in the staging level of HBase.

Last but not least, we do parse VIEWs and apply them on Hive
... be careful about https://issues.apache.org/jira/browse/HIVE-2055 !!!

bin/sqlhbase-mysqlimport

+65
@@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import getopt
import sys
from datetime import datetime
from sqlhbase.mysql import MySQLDump

def usage():
    print """
    USAGE:
    $ tar xOf 01-20-00_all_databases.tar bob_live_hk.sql.gz | zcat | ...
     or
    $ unzip -p production_sg_DAY_20130121.zip | ...

    then ...
    $ echo $SQL|ri-fastdump2hbase -d INDFAS

    ADDONs:
    "skip tables option" -s, --skiptable=
        -s table1,table2,table3
        # tables to absolutely avoid to ingest, like "alice_message"

    "forced_timestamp" -t, --timestamp=
        -t 2013-01-20
        # in case we have no timestamp at the bottom of the dump
    """

if len(sys.argv) < 3:
    usage()
    sys.exit(2)

try:
    opts, args = getopt.getopt(sys.argv[1:], "d:f:s:t:", ["db=", "sqlfile=", "skiptable=", "timestamp="])
except getopt.GetoptError as err:
    # print help information and exit:
    print str(err)  # will print something like "option -a not recognized"
    usage()
    sys.exit(2)

input_db = ""
sql_file = None
skip_tables = []
forced_timestamp = ""
for opt, arg in opts:
    if opt in ('-d', '--db'):
        input_db = arg
    elif opt in ('-f', '--sqlfile'):
        sql_file = arg
    elif opt in ('-s', '--skiptable'):
        skip_tables = arg.split(",")
    elif opt in ('-t', '--timestamp'):
        forced_timestamp = arg

print 'DB>', input_db

# get the argument file (uncompressed MySQL dump)
if sql_file is not None:
    f = open(sql_file)
    # reuse the already opened handle instead of opening the file a second time
    print MySQLDump(f, input_db, skip_tables, forced_timestamp)
    f.close()
    sys.exit(0)

# ... or read what they pipe me into stdin
print MySQLDump(sys.stdin, input_db, skip_tables, forced_timestamp)

bin/sqlhbase-populate

+64
@@ -0,0 +1,64 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'zeph'
import getopt
import sys
from sqlhbase.intake import HBaseIntake

def usage():
    print """
    USAGE:
    $ ri-parseintake -d INDFAS -t 1234567890

    ADDONS: -i file_with_list_of_tables_to_INCLUDE
            -e file_with_list_of_tables_to_EXCLUDE
            -l just list the tables available
    """

if len(sys.argv) < 3:
    usage()

try:
    opts, args = getopt.getopt(sys.argv[1:],
        "d:t:i:e:l", ["db=", "timestamp=", "include=", "exclude=", "listing"])  # --listing takes no argument, matching -l
except getopt.GetoptError as err:
    # print help information and exit:
    print str(err)  # will print something like "option -a not recognized"
    usage()
    sys.exit(2)

input_db = ""
sql_file = None
timestamp = ""
include_list = ""
exclude_list = ""
listing = False
for opt, arg in opts:
    if opt in ('-d', '--db'):
        input_db = arg
    elif opt in ('-t', '--timestamp'):
        timestamp = arg
    elif opt in ('-i', '--include'):
        include_list = arg
    elif opt in ('-e', '--exclude'):
        exclude_list = arg
    elif opt in ('-l', '--listing'):
        listing = True

hbase = HBaseIntake(input_db)
try:
    hbase.connect()
except Exception:
    # on failure, list the known namespaces and bail out
    print hbase.get_namespaces()
    sys.exit(2)

print 'DB>', input_db
if timestamp == "":
    ava = hbase.get_dumps()
    for day in ava:
        print day
    print "DAY(DUMP)s AVAILABLE>", len(ava)
    #print >> sys.stderr, hbase.prettify()
    sys.exit(2)
elif listing:
    print "\n".join(hbase.cls_parser().get_tables(timestamp))
else:
    print hbase.parse(timestamp, exclude_list, include_list)

pytest.ini

+6
@@ -0,0 +1,6 @@
# content of pytest.ini
[pytest]
addopts = --doctest-modules


# src: http://pytest.org/latest/doctest.html

setup.py

+22
@@ -0,0 +1,22 @@
__author__ = 'zeph'

from setuptools import setup  # setuptools' setup supersedes the distutils.core one

setup(
    name='SqlHBase',
    version='0.4',
    author='Guido Serra aka Zeph',
    author_email='[email protected]',
    url='https://github.com/zeph/sqlhbase',
    packages=['sqlhbase'],
    scripts=['bin/sqlhbase-mysqlimport', 'bin/sqlhbase-populate'],
    license='GPL 3',
    description='MySQLDump to HBase, ETL scripts',
    long_description=open('README.md').read(),
    install_requires=[
        "happybase >= 0.4",
        "hive-thrift-py == 0.0.1",
    ],
    include_package_data=True,
)

sqlhbase/__init__.py

Whitespace-only changes.

sqlhbase/borgs/__init__.py

+12
@@ -0,0 +1,12 @@
__author__ = 'zeph'
CLUSTER_HOST = 'localhost'

import os
if os.environ.get('CLUSTER_HOST') is not None:
    CLUSTER_HOST = os.environ.get('CLUSTER_HOST')

# http://code.activestate.com/recipes/66531/
class Borg:
    __shared_state = {}
    def __init__(self):
        self.__dict__ = self.__shared_state

sqlhbase/borgs/hbase.py

+20
@@ -0,0 +1,20 @@
__author__ = 'zeph'

from happybase import Connection
from sqlhbase.borgs import *

class HBase(Borg):

    _db = {}  # all the connections

    def link(self, ns):
        if ns not in self._db:
            try:
                self._db[ns] = Connection(CLUSTER_HOST, table_prefix=ns)
            except Exception, e:
                print e
                print 'export CLUSTER_HOST="yourserver-hostname", please'
        return self._db[ns]

    def __str__(self):
        return str(id(self._db))  # __str__ must return a string, not an int

sqlhbase/borgs/hive.py

+37
@@ -0,0 +1,37 @@
__author__ = 'zeph'
THRIFT_PORT = 10000

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from sqlhbase.borgs import *

class Hive(Borg):

    _db = None  # DB connection

    def link(self):
        if self._db is None:
            transport = TSocket.TSocket(CLUSTER_HOST, THRIFT_PORT)
            transport = TTransport.TBufferedTransport(transport)
            protocol = TBinaryProtocol.TBinaryProtocol(transport)
            self._db = ThriftHive.Client(protocol)
            transport.open()
        return self._db

    def __str__(self):
        return str(id(self._db))  # __str__ must return a string, not an int

    def exec_stmt(self, sql):
        try:
            self._db.execute(sql)
            return self._db.fetchAll()

        except Thrift.TException, tx:
            print '%s' % (tx.message)

        except HiveServerException:
            print "HiveServerException>", sql.strip()
