feinong
feinong
2081 1 0

用 150 行 Python 编写 SQL 查询构建器

从零开始设计一个 Python 小项目，包括 API 设计、惰性（lazy）加载等。

import functools
import textwrap
from collections import defaultdict
from typing import NamedTuple


class Query:
    """Build a SQL query clause by clause and render it as indented text.

    Fragments are added with :meth:`add`, or with the upper-case shorthand
    attributes resolved by :meth:`__getattr__` (``query.SELECT('a')``,
    ``query.ORDER_BY('a')``). Every such call returns the query itself, so
    calls can be chained. ``str(query)`` renders each non-empty clause as a
    keyword line followed by its fragments, one per line, indented by four
    spaces.
    """

    # Supported clauses. ``__init__`` creates one list per keyword in this
    # order, which is also the order the clauses are rendered in.
    keywords = [
        'WITH',
        'SELECT',
        'FROM',
        'WHERE',
        'GROUP BY',
        'HAVING',
        'ORDER BY',
        'LIMIT',
    ]

    # Text placed between consecutive fragments of a clause: ' AND' for the
    # keywords listed here, ``default_separator`` for every other keyword.
    separators = dict(WHERE='AND', HAVING='AND')
    default_separator = ','

    # Per-keyword templates, indexed by ``bool(thing.alias)``: index 0 is
    # used for fragments without an alias, index 1 for aliased fragments.
    formats = (
        defaultdict(lambda: '{value}'),
        defaultdict(lambda: '{value} AS {alias}', WITH='{alias} AS {value}'),
    )

    # Keywords whose aliased fragments are subqueries; their value is
    # rendered as an indented, parenthesized block.
    subquery_keywords = frozenset({'WITH'})

    def __init__(self):
        """Create an empty query with one fragment list per keyword."""
        self.data = {k: [] for k in self.keywords}

    def add(self, keyword, *args):
        """Append fragments to the clause named *keyword* and return ``self``.

        Each argument is converted with ``_Thing.from_arg``.

        Raises:
            KeyError: if *keyword* has no entry in ``self.data``.
        """
        target = self.data[keyword]

        for arg in args:
            target.append(_Thing.from_arg(arg))

        return self

    def __getattr__(self, name):
        """Resolve upper-case attribute names to clause shorthands.

        ``query.GROUP_BY`` is ``functools.partial(query.add, 'GROUP BY')``:
        underscores in the attribute name become spaces in the keyword.
        The keyword is not validated here; an unknown one fails with
        ``KeyError`` when the returned callable is invoked.

        Raises:
            AttributeError: if *name* is not upper-case.
        """
        if not name.isupper():
            # __getattr__ only runs after normal lookup has already failed,
            # so there is nothing left to delegate to.
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {name!r}'
            )
        return functools.partial(self.add, name.replace('_', ' '))

    def __str__(self):
        """Return the rendered query text."""
        return ''.join(self._lines())

    def _lines(self):
        """Yield the text pieces of every non-empty clause, in order."""
        for keyword, things in self.data.items():
            if not things:
                continue

            yield f'{keyword}\n'
            yield from self._lines_keyword(keyword, things)

    def _lines_keyword(self, keyword, things):
        """Yield the indented, separator-joined fragments of one clause."""
        count = len(things)

        for position, thing in enumerate(things, 1):
            template = self.formats[bool(thing.alias)][keyword]

            value = thing.value
            if (
                thing.alias
                and keyword in self.subquery_keywords
                and not (value.startswith('(') and value.endswith(')'))
            ):
                # An aliased WITH entry is a subquery and needs parentheses;
                # values that already carry them are left untouched.
                value = f'(\n{self._indent(value)}\n)'

            yield self._indent(template.format(value=value, alias=thing.alias))

            if position < count:
                if keyword in self.separators:
                    yield ' ' + self.separators[keyword]
                else:
                    yield self.default_separator

            yield '\n'

    # staticmethod keeps ``self`` from being passed to textwrap.indent:
    # functools.partial objects stored on a class are bound like methods
    # from Python 3.14 on (3.13 emits a FutureWarning about it).
    _indent = staticmethod(functools.partial(textwrap.indent, prefix='    '))


class _Thing(NamedTuple):
    """A single query fragment: a SQL snippet plus an optional alias."""

    # SQL text of the fragment.
    value: str
    # Alias of the fragment; '' when it has none.
    alias: str = ''

    @classmethod
    def from_arg(cls, arg):
        """Build a fragment from one ``Query.add`` argument.

        *arg* is either a plain string (the value, with no alias) or a
        two-item ``(alias, value)`` sequence. Both parts are normalized
        with ``_clean_up``.

        Raises:
            ValueError: if a non-string *arg* does not have exactly two
                items.
        """
        if isinstance(arg, str):
            alias, value = '', arg
        elif len(arg) == 2:
            alias, value = arg
        else:
            raise ValueError(f"invalid arg: {arg!r}")
        return cls(_clean_up(value), _clean_up(alias))


def _clean_up(thing: str) -> str:
    """Return *thing* dedented and without surrounding whitespace.

    Trailing whitespace is removed first, then the common leading
    indentation of the lines, and finally any whitespace left at either
    end of the text.
    """
    without_trailing = thing.rstrip()
    dedented = textwrap.dedent(without_trailing)
    return dedented.strip()

原文 https://death.andgravity.com/query-builder-how

0

See Also

Nearby


Discussion (1)

feinong
feinong 2021-08-27 20:01

测试代码

from textwrap import dedent

from builder import Query


def test_query_simple():
    """Check the rendering of a query using WITH, SELECT, FROM and WHERE."""
    expected = dedent(
        """\
        WITH
            alias AS (
                with
            )
        SELECT
            select-one,
            select-two AS alias
        FROM
            from-one,
            from-two
        WHERE
            where-one AND
            where-two
        """
    )

    # Each shorthand call returns the query itself, so issuing the calls
    # one by one builds the same query as a single chained expression.
    query = Query()
    query.WITH(('alias', 'with'))
    query.SELECT('select-one', ('alias', 'select-two'))
    query.FROM('from-one', 'from-two')
    query.WHERE('where-one', 'where-two')

    assert str(query) == expected

0
Login Topics