mirror of
https://github.com/boostorg/mysql.git
synced 2026-01-21 17:12:14 +00:00
252 lines
8.0 KiB
Python
252 lines
8.0 KiB
Python
#!/usr/bin/python3
|
|
#
|
|
# Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
|
|
#
|
|
# Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
#
|
|
|
|
# Generates the seed corpus for field and row fuzzers from captured wireshark frames.
|
|
|
|
import json
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from os import path
|
|
import struct
|
|
from typing import List, NamedTuple
|
|
import csv
|
|
|
|
_REPO_BASE = Path(path.join(path.dirname(path.realpath(__file__)), '..', '..')).absolute()
|
|
_TEXT_JSON = _REPO_BASE.joinpath('private', 'fuzzing', 'text.json')
|
|
_BINARY_JSON = _REPO_BASE.joinpath('private', 'fuzzing', 'binary.json')
|
|
|
|
class _ProtocolFieldType(Enum):
|
|
decimal = 0x00
|
|
tiny = 0x01
|
|
short_ = 0x02
|
|
long_ = 0x03
|
|
float_ = 0x04
|
|
double_ = 0x05
|
|
null = 0x06
|
|
timestamp = 0x07
|
|
longlong = 0x08
|
|
int24 = 0x09
|
|
date = 0x0a
|
|
time = 0x0b
|
|
datetime = 0x0c
|
|
year = 0x0d
|
|
varchar = 0x0f
|
|
bit = 0x10
|
|
json = 0xf5
|
|
newdecimal = 0xf6
|
|
blob = 0xfc
|
|
var_string = 0xfd
|
|
string = 0xfe
|
|
geometry = 0xff
|
|
|
|
|
|
class _ColumnType(Enum):
|
|
tinyint = 0
|
|
smallint = 1
|
|
mediumint = 2
|
|
int_ = 3
|
|
bigint = 4
|
|
float_ = 5
|
|
double_ = 6
|
|
decimal = 7
|
|
bit = 8
|
|
year = 9
|
|
time = 10
|
|
date = 11
|
|
datetime = 12
|
|
timestamp = 13
|
|
char_ = 14
|
|
varchar = 15
|
|
binary = 16
|
|
varbinary = 17
|
|
text = 18
|
|
blob = 19
|
|
enum_ = 20
|
|
set = 21
|
|
json = 22
|
|
geometry = 23
|
|
unknown = 24
|
|
|
|
|
|
class _Encoding(Enum):
|
|
text = 0
|
|
binary = 1
|
|
|
|
|
|
_TYPE_MAP = {
|
|
_ProtocolFieldType.decimal: _ColumnType.decimal,
|
|
_ProtocolFieldType.newdecimal: _ColumnType.decimal,
|
|
_ProtocolFieldType.geometry: _ColumnType.geometry,
|
|
_ProtocolFieldType.tiny: _ColumnType.tinyint,
|
|
_ProtocolFieldType.short_: _ColumnType.smallint,
|
|
_ProtocolFieldType.int24: _ColumnType.mediumint,
|
|
_ProtocolFieldType.long_: _ColumnType.int_,
|
|
_ProtocolFieldType.longlong: _ColumnType.bigint,
|
|
_ProtocolFieldType.float_: _ColumnType.float_,
|
|
_ProtocolFieldType.double_: _ColumnType.double_,
|
|
_ProtocolFieldType.bit: _ColumnType.bit,
|
|
_ProtocolFieldType.date: _ColumnType.date,
|
|
_ProtocolFieldType.datetime: _ColumnType.datetime,
|
|
_ProtocolFieldType.timestamp: _ColumnType.timestamp,
|
|
_ProtocolFieldType.time: _ColumnType.time,
|
|
_ProtocolFieldType.year: _ColumnType.year,
|
|
_ProtocolFieldType.json: _ColumnType.json,
|
|
}
|
|
|
|
|
|
def _compute_column_type(mysql_frame) -> _ColumnType:
|
|
type = _ProtocolFieldType(int(mysql_frame['mysql.field.type_raw'][0], 16))
|
|
isbin = mysql_frame['mysql.field.charsetnr_raw'][0] == '3f00'
|
|
if type == _ProtocolFieldType.string:
|
|
if mysql_frame['mysql.field.flags_tree']['mysql.field.flags.set_raw'][0] != '0':
|
|
return _ColumnType.set
|
|
elif mysql_frame['mysql.field.flags_tree']['mysql.field.flags.enum_raw'][0] != '0':
|
|
return _ColumnType.enum_
|
|
elif isbin:
|
|
return _ColumnType.binary
|
|
else:
|
|
return _ColumnType.char_
|
|
elif type == _ProtocolFieldType.var_string:
|
|
if isbin:
|
|
return _ColumnType.varbinary
|
|
else:
|
|
return _ColumnType.varchar
|
|
elif type == _ProtocolFieldType.blob:
|
|
if isbin:
|
|
return _ColumnType.blob
|
|
else:
|
|
return _ColumnType.text
|
|
else:
|
|
return _TYPE_MAP[type]
|
|
|
|
|
|
class _Sample(NamedTuple):
|
|
fuzzer: str
|
|
name: str
|
|
content: bytes
|
|
|
|
|
|
def _encoding_to_str(enc: _Encoding) -> str:
|
|
return 'text' if enc == _Encoding.text else 'binary'
|
|
|
|
|
|
# meta: spans 2 bytes
|
|
# meta[0][low 7 bits]: column_type
|
|
# meta[0][high bit]: is unsigned flag
|
|
# meta[1]: decimals
|
|
def _gen_meta(mysql_frame) -> bytes:
|
|
type = _compute_column_type(mysql_frame)
|
|
unsigned_flag = int(mysql_frame['mysql.field.flags_tree']['mysql.field.flags.unsigned_raw'][0])
|
|
assert unsigned_flag == 0 or unsigned_flag == 1
|
|
decimals = int(mysql_frame['mysql.field.decimals_raw'][0], 16)
|
|
return struct.pack(
|
|
'<BB',
|
|
type.value | (unsigned_flag << 7),
|
|
decimals
|
|
)
|
|
|
|
# Parses a text row into its fields. Relies on int_lenencs not being more than 1 byte
|
|
def _parse_text_row(r: bytes, num_fields: int) -> List[bytes]:
|
|
res: List[bytes] = []
|
|
offset = 0
|
|
for _ in range(num_fields):
|
|
(length,) = struct.unpack('<B', r[offset:offset+1])
|
|
if length >= 0xfb: # Too long or NULL samples
|
|
continue
|
|
res.append(r[offset+1:offset+1+length])
|
|
offset += length + 1
|
|
return res
|
|
|
|
|
|
def _gen_text_field_samples(response, table: str) -> List[_Sample]:
|
|
mysql_raw = response['_source']['layers']['mysql_raw']
|
|
mysql = response['_source']['layers']['mysql']
|
|
|
|
num_fields = int(mysql[0]['mysql.num_fields_raw'][0], 16)
|
|
metas = [_gen_meta(mysql[i + 1]) for i in range(num_fields)]
|
|
|
|
res: List[_Sample] = []
|
|
for i, rec in enumerate(mysql_raw[1 + num_fields:-1]):
|
|
row = bytes.fromhex(rec[0][2*4:])
|
|
fields = _parse_text_row(row, num_fields)
|
|
res += [_Sample(
|
|
fuzzer='fuzz_text_field',
|
|
name=f'{table}_{i}_{j}',
|
|
content=meta + field
|
|
) for j, (meta, field) in enumerate(zip(metas, fields))]
|
|
|
|
return res
|
|
|
|
|
|
def _gen_row_samples(response, enc: _Encoding, table: str) -> List[_Sample]:
|
|
mysql_raw = response['_source']['layers']['mysql_raw']
|
|
mysql = response['_source']['layers']['mysql']
|
|
|
|
# Header[0][low 7 bits]: num_fields
|
|
# Header[0][high bit]: encoding
|
|
num_fields = int(mysql[0]['mysql.num_fields_raw'][0], 16)
|
|
header = struct.pack('<B', num_fields | (enc.value << 7))
|
|
|
|
# As many meta blocks as num_fields
|
|
meta = b''.join(_gen_meta(mysql[i + 1]) for i in range(num_fields))
|
|
|
|
# Rows
|
|
return [
|
|
_Sample(
|
|
fuzzer='fuzz_row',
|
|
name=f'{_encoding_to_str(enc)}_{table}_{i}',
|
|
content=b''.join((header, meta, bytes.fromhex(rec[0][2*4:])))
|
|
)
|
|
for i, rec in enumerate(mysql_raw[1 + num_fields:-1])
|
|
]
|
|
|
|
|
|
# Frames should be captured with wireshark (see below for each protocol encoding)
|
|
# and saved as pcapng files. They should be converted to jsonraw using
|
|
# tshark -r fname.pcapng -T jsonraw --no-duplicate-keys -Y 'mysql' > fname.json
|
|
def main():
|
|
corpus: List[_Sample] = []
|
|
|
|
# Text. These should contain frames generated by running SELECT * FROM <table>
|
|
# repeatedly over the set of tables, from mysql command line.
|
|
# This generates two frames (request and response) for each select
|
|
with open(_TEXT_JSON, 'rt') as f:
|
|
obj = json.load(f)
|
|
for query, response in zip(*(iter(obj),) * 2):
|
|
table = bytes.fromhex(query['_source']['layers']['mysql']['mysql.payload_raw'][0])[1:].decode().split(' ')[-1]
|
|
corpus += _gen_row_samples(response, _Encoding.text, table)
|
|
corpus += _gen_text_field_samples(response, table)
|
|
|
|
# Binary. These should contain frames generated by running SELECT * FROM <table>
|
|
# repeatedly over the set of tables, from the Python mysql protocol driver,
|
|
# with code like:
|
|
# cursor = cnx.cursor(prepared=True)
|
|
# cursor.execute(f'SELECT * FROM {table}')
|
|
# cursor.fetchall()
|
|
# This generates 7 frames for each table, since the driver prepares, executes and resets the statement.
|
|
with open(_BINARY_JSON, 'rt') as f:
|
|
obj = json.load(f)
|
|
for reqclose, query, qres, reset, resetres, exec, response in zip(*(iter(obj),) * 7):
|
|
table = bytes.fromhex(query['_source']['layers']['mysql']['mysql.request_raw'][0])[1:].decode().split(' ')[-1]
|
|
corpus += _gen_row_samples(response, _Encoding.binary, table)
|
|
|
|
# Write all frames to a CSV file
|
|
csv_path = _REPO_BASE.joinpath('tools', 'seed_corpus', 'field_table.csv')
|
|
with open(csv_path, 'wt') as f:
|
|
writer = csv.DictWriter(f, fieldnames=_Sample._fields)
|
|
writer.writeheader()
|
|
writer.writerows({
|
|
"fuzzer": sample.fuzzer,
|
|
"name": sample.name,
|
|
"content": sample.content.hex()
|
|
} for sample in corpus)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|