source

메모리 효율이 뛰어난 SqlAlchemy Iterator/Generator 내장

manycodes 2022. 12. 5. 21:32
반응형

메모리 효율이 뛰어난 SqlAlchemy Iterator/Generator 내장

SqlAlchemy를 사용하여 인터페이스하는 최대 1000만 개의 MySQL 테이블이 있습니다.데이터 세트의 한 입 크기 청크를 지능적으로 가져오는 내장 생성기를 사용한다고 생각했지만, 이 테이블의 큰 하위 세트에 대한 쿼리는 메모리를 너무 많이 소비한다는 것을 알게 되었습니다.

for thing in session.query(Things):
    analyze(thing)

이를 피하기 위해서는 여러 덩어리로 잘라내는 나만의 반복기를 만들어야 합니다.

lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)

이게 정상인가요, 아니면 SA 내장 발전기에 대해 뭔가 부족한 점이 있나요?

질문에 대한 답변은 메모리 소비량이 예상되지 않음을 나타내는 것 같습니다.

대부분의 DBAPI 구현은 가져올 때 행을 완전히 버퍼링하기 때문에 일반적으로 SQLLchemy ORM이 하나의 결과를 얻기 전에 결과 세트 전체가 메모리에 저장됩니다.

그 다음, 그 은요.Query는 오브젝트로 돌아가기 전에 기본적으로 지정된 결과 세트를 완전히 로드하는 것입니다.여기서의 근거는 단순한 SELECT 문 이외의 쿼리에 관한 것입니다.예를 들어, 한 결과 집합에서 동일한 개체 ID를 여러 번 반환할 수 있는 다른 테이블에 결합할 경우(급하게 로드하는 경우 공통), 올바른 결과를 반환할 수 있도록 행의 전체 세트가 메모리에 있어야 하며 그렇지 않으면 수집이 부분적으로만 채워질 수 있습니다.

★★★★★★★★★★★★★★★★★.Query그럼, 에서 이 동작을 변경할 수 있는 옵션이 있습니다.이 호출에 의해Query행을 일괄 산출하고 배치 크기를 지정할 수 있습니다.에 따르면 않는 .docs는 기본적으로 무엇을 있는 입니다. 따라서 기본적으로는 자신이 무엇을 하고 있는지 알고 있는 경우입니다.또한 기본 DBAPI 사전 버퍼 행이 있는 경우에도 메모리 오버헤드가 있기 때문에 사용하지 않는 것보다 약간 더 잘 확장됩니다.

거의 .yield_per()그 대신, 저는 당신이 창 기능을 사용하여 위에서 제안한 더 나은 버전의 LIMIT 접근 방식을 사용합니다.LIMIT 은 OFFSET 、 OFFSET 、 OFFSET 。OFFSET N의 N은 OFFSET입니다.이것은 같은 쿼리를 1개가 아닌 50번 실행하는 것과 같습니다.매번 더 많은 행을 읽을 때마다 같은 쿼리를 실행하는 것과 같습니다.윈도우 함수 접근법에서는 선택할 테이블의 청크를 참조하는 일련의 "윈도우" 값을 미리 가져옵니다.선택하다

Wiki에 윈도 기능의 접근법이 게재되어 있기 때문에, 저는 그것을 매우 잘 사용하고 있습니다.

주의: 모든 데이터베이스가 창 기능을 지원하는 것은 아닙니다.Postgresql, Oracle 또는 SQL Server가 필요합니다.IMHO는 적어도 Postgresql을 사용할 가치가 있습니다.관계형 데이터베이스를 사용하고 있다면 최선의 방법을 사용하는 것이 좋습니다.

데이터베이스 전문가는 아니지만 SQL Chemy를 단순한 Python 추상화 레이어로 사용할 때(즉, ORM Query 객체를 사용하지 않음) 메모리 사용량을 폭발적으로 늘리지 않고 300M 행 테이블을 쿼리할 수 있는 만족스러운 솔루션을 생각해냈습니다.

다음으로 더미의 예를 제시하겠습니다.

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

SQL Chemy SQL Chemy를 합니다.fetchmany()에 대해 while 디세이블로그:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()

이 방법을 사용하면 위험한 메모리 오버헤드 없이 모든 종류의 데이터 집약을 수행할 수 있습니다.

NOTE stream_results및 Postgres와 함께 합니다.pyscopg2어댑터는 DBAPI나 데이터베이스 드라이버에서는 동작하지 않을 것입니다.

블로그 투고에는 위의 방법에 영감을 준 흥미로운 사용 사례가 있습니다.

SQLAlchemy를 사용하여 효율적인 트래버설/페이징을 검토하고 있으며, 이 답변을 업데이트하고자 합니다.

슬라이스 콜을 사용하여 쿼리 범위를 적절하게 제한하고 효율적으로 재사용할 수 있다고 생각합니다.

예:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1

Joel의 대답의 정신에 따라, 저는 다음을 사용합니다.

WINDOW_SIZE = 1000
def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if len(things) == 0:
            break
        for thing in things:
            yield thing
        start += WINDOW_SIZE

이전에 모든 {OFFSET}개의 열을 찾아야 하므로 LIMIT/OFFSET을 사용하는 것은 좋지 않습니다. 따라서 더 클수록 OFFSET이 됩니다. 요청을 받는 시간이 길어집니다.윈도우 쿼리를 사용하면 대량의 데이터가 있는 큰 테이블에서도 좋지 않은 결과를 얻을 수 있습니다(첫 번째 결과를 너무 오래 기다리면 내 경우 청크된 웹 응답에 좋지 않습니다).

https://stackoverflow.com/a/27169302/450103에서 제공하는 최선의 접근법.제 경우 datetime 필드의 인덱스를 사용하여 문제를 해결하고 datetime > = previous_datetime으로 다음 쿼리를 가져옵니다.바보같아요, 왜냐하면 예전에 다른 경우에 인덱스를 사용했지만, 모든 데이터 창 쿼리를 가져오는 것이 더 낫다고 생각했기 때문이죠.내 경우엔 내가 틀렸다.

AFIK에서는 첫 번째 변형은 테이블에서 (1개의 SQL 쿼리를 사용하여) 모든 튜플을 가져오지만 반복할 때 각 엔티티에 대해 ORM 프레젠테이션을 구축합니다.따라서 반복하기 전에 모든 엔티티 목록을 작성하는 것보다 효율적이지만 모든 (원시) 데이터를 메모리로 가져와야 합니다.

따라서 큰 테이블에 LIMIT을 사용하는 것은 좋은 생각인 것 같습니다.

Postgres 또는 커서를 지원하는 RDBMS를 사용하는 경우 여러 행에서 효율적으로 반복할 수 있습니다.

 with db_session() as session:
     for partition in session.stream(select(Thing.id)).partitions(1000):
         for item in partition:
             analyze(item)

그러면 결과를 1000행 단위로 가져오는 순방향 커서가 생성되므로 서버 및 클라이언트에서 메모리 사용량이 최소화됩니다.

언급URL : https://stackoverflow.com/questions/7389759/memory-efficient-built-in-sqlalchemy-iterator-generator

반응형