Add python xdb maker

This commit is contained in:
linyufeng
2022-07-13 20:23:26 +08:00
parent 97b837bc81
commit a3015ce8c5
8 changed files with 899 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
# Created by leolin49 on 2022/7/7.
# Copyright (C) 2022 leolin49. All rights reserved.

52
maker/python/xdb/index.py Normal file
View File

@@ -0,0 +1,52 @@
# Created by leolin49 on 2022/7/7.
# Copyright (C) 2022 leolin49. All rights reserved.
import struct
VectorIndexPolicy = 1
BTreeIndexPolicy = 2
SegmentIndexBlockSize = 14
def index_policy_from_string(s: str) -> int:
sl = s.lower()
if sl == "vector":
return VectorIndexPolicy
elif sl == "btree":
return BTreeIndexPolicy
else:
print("invalid policy `{}`, used default vector index".format(s))
return VectorIndexPolicy
class VectorIndexBlock:
first_ptr = 0
last_ptr = 0
def __init__(self, fp=0, lp=0):
self.first_ptr = fp
self.last_ptr = lp
def encode(self) -> bytes:
return struct.pack("<II", self.first_ptr, self.last_ptr)
def string(self) -> str:
return "FirstPtr: {}, LastPrt: {}".format(self.first_ptr, self.last_ptr)
class SegmentIndexBlock:
start_ip = 0
end_ip = 0
data_len = 0
data_ptr = 0
def __init__(self, sip, eip, dl, dp):
self.start_ip = sip
self.end_ip = eip
self.data_len = dl
self.data_ptr = dp
def encode(self) -> bytes:
return struct.pack("<IIHI", self.start_ip, self.end_ip, self.data_len, self.data_ptr)
def string(self) -> str:
return "{sip: {}, eip: {}, len: {}, ptr: {}}".format(self.start_ip, self.end_ip, self.data_len, self.data_ptr)

269
maker/python/xdb/maker.py Normal file
View File

@@ -0,0 +1,269 @@
# Created by leolin49 on 2022/7/7.
# Copyright (C) 2022 leolin49. All rights reserved.
#
# ----
# ip2region database v2.0 structure
#
# +----------------+-------------------+---------------+--------------+
# | header space | speed up index | data payload | block index |
# +----------------+-------------------+---------------+--------------+
# | 256 bytes | 512 KiB (fixed) | dynamic size | dynamic size |
# +----------------+-------------------+---------------+--------------+
#
# 1. padding space : for header info like block index ptr, version, release date eg ... or any other temporary needs.
# -- 2bytes: version number, different version means structure update, it fixed to 2 for now
# -- 2bytes: index algorithm code.
# -- 4bytes: generate unix timestamp (version)
# -- 4bytes: index block start ptr
# -- 4bytes: index block end ptr
#
#
# 2. data block : region or whatever data info.
# 3. segment index block : binary index block.
# 4. vector index block : fixed index info for block index search speed up.
# space structure table:
# -- 0 -> | 1rt super block | 2nd super block | 3rd super block | ... | 255th super block
# -- 1 -> | 1rt super block | 2nd super block | 3rd super block | ... | 255th super block
# -- 2 -> | 1rt super block | 2nd super block | 3rd super block | ... | 255th super block
# -- ...
# -- 255 -> | 1rt super block | 2nd super block | 3rd super block | ... | 255th super block
#
#
# super block structure:
# +-----------------------+----------------------+
# | first index block ptr | last index block ptr |
# +-----------------------+----------------------+
#
# data entry structure:
# +--------------------+-----------------------+
# | 2bytes (for desc) | dynamic length |
# +--------------------+-----------------------+
# data length whatever in bytes
#
# index entry structure
# +------------+-----------+---------------+------------+
# | 4bytes | 4bytes | 2bytes | 4 bytes |
# +------------+-----------+---------------+------------+
# start ip end ip data length data ptr
import os
import struct
import sys
sys.path.append(os.path.realpath(os.path.dirname(os.path.realpath(__file__))))
import logging
import time
import segment as seg
import index as idx
import util
VersionNo = 2
HeaderInfoLength = 256
VectorIndexRows = 256
VectorIndexCols = 256
VectorIndexSize = 8
VectorIndexLength = VectorIndexRows * VectorIndexCols * VectorIndexSize
class Maker:
src_handle = None
dst_handle = None
index_policy = 0
segments = []
region_pool = {}
vector_index = None
def __init__(self, sh, dh, ip, sg, rp, vi):
self.src_handle = sh
self.dst_handle = dh
self.index_policy = ip
self.segments = sg
self.region_pool = rp
self.vector_index = vi
def init(self):
"""
Init the `xdb` binary file.
1. init the file header
2. load all the segments
"""
self.init_db_header()
self.load_segments()
def init_db_header(self):
"""Init and write the file header to the destination xdb file."""
logging.info("try to init the db header ... ")
self.src_handle.seek(0, 0)
header = bytearray([0]*256)
# make and write the header space
# 1. version number
header[0:2] = VersionNo.to_bytes(2, byteorder="little")
# 2. index policy code
header[2:4] = int(self.index_policy).to_bytes(2, byteorder="little")
# 3. generate unix timestamp
header[4:8] = int(time.time()).to_bytes(4, byteorder="little")
# 4. index block start ptr
header[8:12] = int(0).to_bytes(4, byteorder="little")
# 5. index block end ptr
header[12:16] = int(0).to_bytes(4, byteorder="little")
# write header buffer to file
self.dst_handle.write(header)
def load_segments(self) -> list:
"""
Load the segments [start ip|end ip|region] from source ip text file.
:return: the list of Segment
"""
logging.info("try to load the segments ... ")
last = None
s_tm = time.time()
lines = self.src_handle.read().splitlines()
for line in lines:
logging.info("load segment: `{}`".format(line))
ps = line.split("|", maxsplit=2)
if len(ps) != 3:
logging.error("invalid ip segment line `{}`".format(line))
return []
sip = util.checkip(ps[0])
if sip == -1:
logging.error("invalid ip address `{}`".format(line))
return []
eip = util.checkip(ps[1])
if eip == -1:
logging.error("invalid ip address `{}`".format(line))
return []
if sip > eip:
logging.error("start ip({}) should not be greater than end ip({})".format(ps[0], ps[1]))
return []
if len(ps[2]) < 1:
logging.error("empty region info in segment line `{}`".format(line))
return []
segment = seg.Segment(sip=sip, eip=eip, reg=ps[2])
# check the continuity of data segment
if last is not None:
if last.end_ip + 1 != segment.start_ip:
logging.error("discontinuous data segment: last.eip+1({})!=seg.sip({}, {})".format(sip, eip, ps[0]))
return []
self.segments.append(segment)
last = segment
logging.info("all segments loaded, length: {}, elapsed: {}".format(len(self.segments), time.time() - s_tm))
def set_vector_index(self, ip, ptr):
row, col = (ip >> 24) & 0xFF, (ip >> 16) & 0xFF
vi_block = self.vector_index[row][col]
if vi_block.first_ptr == 0:
vi_block.first_ptr = ptr
vi_block.last_ptr = ptr + idx.SegmentIndexBlockSize
else:
vi_block.last_ptr = ptr + idx.SegmentIndexBlockSize
self.vector_index[row][col] = vi_block
def start(self):
"""Start to make the 'xdb' binary file."""
if len(self.segments) < 1:
logging.error("empty segment list")
return
# 1. write all the region/data to the binary file
self.dst_handle.seek(HeaderInfoLength+VectorIndexLength, 0)
logging.info("try to write the data block ... ")
for s in self.segments:
logging.info("try to write region '{}'...".format(s.region))
if s.region in self.region_pool:
logging.info(" --[Cached] with ptr={}".format(self.region_pool[s.region]))
continue
region = bytes(s.region, encoding="utf-8")
if len(region) > 0xFFFF:
logging.error("too long region info `{}`: should be less than {} bytes".format(s.region, 0xFFFF))
return
# get the first ptr of the next region
pos = self.dst_handle.seek(0, 1)
logging.info("{} {} {}".format(pos, region, s.region))
self.dst_handle.write(region)
self.region_pool[s.region] = pos
logging.info(" --[Added] with ptr={}".format(pos))
# 2. write the index block and cache the super index block
logging.info("try to write the segment index block ... ")
counter, start_index_ptr, end_index_ptr = 0, -1, -1
for sg in self.segments:
data_ptr = -1
if sg.region in self.region_pool:
data_ptr = self.region_pool[sg.region]
else:
logging.error("missing ptr cache for region `{}`".format(sg.region))
return
data_len = len(bytes(sg.region, encoding="utf-8"))
if data_len < 1:
logging.error("empty region info for segment '{}'".format(sg.region))
return
seg_list = sg.split()
logging.info("try to index segment({} split) {} ...".format(len(seg_list), sg.string()))
for s in seg_list:
pos = self.dst_handle.seek(0, 1)
s_index = idx.SegmentIndexBlock(
sip=s.start_ip, eip=s.end_ip, dl=data_len, dp=data_ptr
)
self.dst_handle.write(s_index.encode())
logging.info("|-segment index: {}, ptr: {}, segment: {}".format(counter, pos, s.string()))
self.set_vector_index(s.start_ip, pos)
counter += 1
# check and record the start index ptr
if start_index_ptr == -1:
start_index_ptr = pos
end_index_ptr = pos
# synchronized the vector index block
logging.info("try to write the vector index block ... ")
self.dst_handle.seek(HeaderInfoLength, 0)
for i in range(0, len(self.vector_index)):
for j in range(0, len(self.vector_index[i])):
vi = self.vector_index[i][j]
self.dst_handle.write(vi.encode())
# synchronized the segment index info
logging.info("try to write the segment index ptr ... ")
buff = struct.pack("<II", start_index_ptr, end_index_ptr)
self.dst_handle.seek(8, 0)
self.dst_handle.write(buff)
logging.info("write done, dataBlocks: {}, indexBlocks: ({}, {}), indexPtr: ({}, {})".format(
len(self.region_pool), len(self.segments), counter, start_index_ptr, end_index_ptr
))
def end(self):
"""End of make the 'xdb' binary file."""
try:
self.src_handle.close()
self.dst_handle.close()
except IOError as e:
logging.error(e)
sys.exit()
def new_maker(policy: int, srcfile: str, dstfile: str) -> Maker:
"""Create a xdb Maker to make the xdb binary file
:param policy: index algorithm code 1:vector, 2:b-tree
:param srcfile: source ip text file path
:param dstfile: destination binary xdb file path
:return: the 'xdb' Maker
"""
try:
sh = open(srcfile, mode='r', encoding='utf-8')
dh = open(dstfile, mode='wb')
return Maker(
sh=sh, dh=dh, ip=policy, sg=[], rp={},
vi=[[idx.VectorIndexBlock() for _ in range(VectorIndexRows)] for _ in range(VectorIndexCols)],
)
except IOError as e:
logging.error(e)
sys.exit()

View File

@@ -0,0 +1,161 @@
import socket
import struct
import io
import sys
HeaderInfoLength = 256
VectorIndexRows = 256
VectorIndexCols = 256
VectorIndexSize = 8
SegmentIndexSize = 14
class XdbSearcher(object):
__f = None
# the minimal memory allocation.
vectorIndex = None
# 整个读取xdb保存在内存中
contentBuff = None
@staticmethod
def loadVectorIndexFromFile(dbfile):
try:
f = io.open(dbfile, "rb")
f.seek(HeaderInfoLength)
vi_len = VectorIndexRows * VectorIndexCols * SegmentIndexSize
vector_data = f.read(vi_len)
f.close()
return vector_data
except IOError as e:
print("[Error]: %s" % e)
@staticmethod
def loadContentFromFile(dbfile):
try:
f = io.open(dbfile, "rb")
all_data = f.read()
f.close()
return all_data
except IOError as e:
print("[Error]: %s" % e)
def __init__(self, dbfile=None, vectorIndex=None, contentBuff=None):
self.initDatabase(dbfile, vectorIndex, contentBuff)
def search(self, ip):
if isinstance(ip, str):
if not ip.isdigit(): ip = self.ip2long(ip)
return self.searchByIPLong(ip)
else:
return self.searchByIPLong(ip)
def searchByIPStr(self, ip):
if not ip.isdigit(): ip = self.ip2long(ip)
return self.searchByIPLong(ip)
def searchByIPLong(self, ip):
# locate the segment index block based on the vector index
sPtr = ePtr = 0
il0 = (int)((ip >> 24) & 0xFF)
il1 = (int)((ip >> 16) & 0xFF)
idx = il0 * VectorIndexCols * VectorIndexSize + il1 * VectorIndexSize
if self.vectorIndex is not None:
sPtr = self.getLong(self.vectorIndex, idx)
ePtr = self.getLong(self.vectorIndex, idx + 4)
elif self.contentBuff is not None:
sPtr = self.getLong(self.contentBuff, HeaderInfoLength + idx)
ePtr = self.getLong(self.contentBuff, HeaderInfoLength + idx + 4)
else:
self.__f.seek(HeaderInfoLength + idx)
buffer_ptr = self.__f.read(8)
sPtr = self.getLong(buffer_ptr, 0)
ePtr = self.getLong(buffer_ptr, 4)
# binary search the segment index block to get the region info
dataLen = dataPtr = int(-1)
l = int(0)
h = int((ePtr - sPtr) / SegmentIndexSize)
while l <= h:
m = int((l + h) >> 1)
p = int(sPtr + m * SegmentIndexSize)
# read the segment index
buffer_sip = self.readBuffer(p, SegmentIndexSize)
sip = self.getLong(buffer_sip, 0)
if ip < sip:
h = m - 1
else:
eip = self.getLong(buffer_sip, 4)
if ip > eip:
l = m + 1
else:
dataLen = self.getInt2(buffer_sip, 8)
dataPtr = self.getLong(buffer_sip, 10)
break
# empty match interception
if dataPtr < 0:
return ""
buffer_string = self.readBuffer(dataPtr, dataLen)
return_string = buffer_string.decode("utf-8")
return return_string
def readBuffer(self, offset, length):
buffer = None
# check the in-memory buffer first
if self.contentBuff is not None:
buffer = self.contentBuff[offset:offset + length]
return buffer
# read from the file handle
if self.__f is not None:
self.__f.seek(offset)
buffer = self.__f.read(length)
return buffer
def initDatabase(self, dbfile, vi, cb):
"""
" initialize the database for search
" param: dbFile, vectorIndex, contentBuff
"""
try:
if cb is not None:
self.__f = None
self.vectorIndex = None
self.contentBuff = cb
else:
self.__f = io.open(dbfile, "rb")
self.vectorIndex = vi
except IOError as e:
print("[Error]: %s" % e)
sys.exit()
def ip2long(self, ip):
_ip = socket.inet_aton(ip)
return struct.unpack("!L", _ip)[0]
def isip(self, ip):
p = ip.split(".")
if len(p) != 4: return False
for pp in p:
if not pp.isdigit(): return False
if len(pp) > 3: return False
if int(pp) > 255: return False
return True
def getLong(self, b, offset):
if len(b[offset:offset + 4]) == 4:
return struct.unpack('I', b[offset:offset + 4])[0]
return 0
def getInt2(self, b, offset):
return ((b[offset] & 0x000000FF) | (b[offset + 1] & 0x0000FF00))
def close(self):
if self.__f is not None:
self.__f.close()
self.vectorIndex = None
self.contentBuff = None

View File

@@ -0,0 +1,69 @@
# Created by leolin49 on 2022/7/7.
# Copyright (C) 2022 leolin49. All rights reserved.
import util
class Segment:
start_ip = 0
end_ip = 0
region = ""
def __init__(self, sip=0, eip=0, reg=""):
self.start_ip, self.end_ip = sip, eip
self.region = reg
def split(self) -> list:
"""Split the segment based on the pre-two bytes."""
# 1, split the segment with the first byte
t_list_1 = []
s_byte_1, e_byte_1 = (self.start_ip >> 24) & 0xFF, (self.end_ip >> 24) & 0xFF
n_sip = self.start_ip
for i in range(s_byte_1, e_byte_1 + 1):
sip = (i << 24) | (n_sip & 0xFFFFFF)
eip = (i << 24) | 0xFFFFFF
if eip < self.end_ip:
n_sip = (i + 1) << 24
else:
eip = self.end_ip
# append the new segment (maybe)
t_list_1.append(Segment(sip, eip))
# 2, split the segments with the second byte
t_list_2 = []
for s in t_list_1:
base = s.start_ip & 0xFF000000
n_sip = s.start_ip
s_byte_2, e_byte_2 = (s.start_ip >> 16) & 0xFF, (s.end_ip >> 16) & 0xFF
for i in range(s_byte_2, e_byte_2 + 1):
sip = base | (i << 16) | (n_sip & 0xFFFF)
eip = base | (i << 16) | 0xFFFF
if eip < self.end_ip:
n_sip = 0
else:
eip = self.end_ip
t_list_2.append(Segment(sip, eip, self.region))
return t_list_2
def string(self) -> str:
return util.long2ip(self.start_ip) + "|" + util.long2ip(self.end_ip) + "|" + self.region
def segment_from(seg: str) -> Segment:
segment = Segment()
ps = seg.split("|", 3)
if len(ps) != 3:
return segment
sip = util.checkip(ps[0])
if sip == -1:
return segment
eip = util.checkip(ps[1])
if eip == -1:
return segment
segment.start_ip, segment.end_ip = sip, eip
segment.region = ps[2]
return segment

42
maker/python/xdb/util.py Normal file
View File

@@ -0,0 +1,42 @@
# Created by leolin49 on 2022/7/7.
# Copyright (C) 2022 leolin49. All rights reserved.
shift_index = (24, 16, 8, 0)
# Util function
def checkip(ip: str) -> int:
"""Convert ip string to integer."""
if not is_ipv4(ip):
return -1
ps = ip.split(".")
if len(ps) != 4:
return 0
val = 0
for i in range(len(ps)):
d = int(ps[i])
if d < 0 or d > 255:
return 0
val |= d << shift_index[i]
return val
def long2ip(num: int) -> str:
"""Convert integer to ip string."""
return "{}.{}.{}.{}".format((num >> 24) & 0xFF, (num >> 16) & 0xFF, (num >> 8) & 0xFF, num & 0xFF)
def mid_ip(sip: int, eip: int):
"""Get the middle ip between sip and eip."""
return (sip + eip) >> 1
def is_ipv4(ip: str) -> bool:
"""Determine whether it is an ipv4 address."""
p = ip.split(".")
if len(p) != 4:
return False
for pp in p:
if not pp.isdigit() or len(pp) > 3 or int(pp) > 255:
return False
return True