source

하위 프로세스를 사용하여 실시간 출력 가져오기

manycodes 2023. 6. 20. 21:43
반응형

하위 프로세스를 사용하여 실시간 출력 가져오기

작업 진행 표시기를 표시하는 명령줄 프로그램(svnadmin verify)의 래퍼 스크립트를 작성하려고 합니다.이를 위해서는 포장된 프로그램의 각 출력 라인을 출력 즉시 볼 수 있어야 합니다.

저는 그냥 프로그램을 실행하기로 결심했습니다.subprocess.Popen,사용하다stdout=PIPE그 다음 각 행을 읽고 그에 따라 행동합니다.그러나 다음 코드를 실행했을 때 출력이 어딘가에서 버퍼링된 것처럼 보여 1~332행, 333~439행(출력의 마지막 행)의 두 청크로 표시되었습니다.

from subprocess import Popen, PIPE, STDOUT

p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE, 
        stderr = STDOUT, shell = True)
for line in p.stdout:
    print line.replace('\n', '')

하위 프로세스에 대한 문서를 조금 본 후, 저는 다음을 발견했습니다.bufsize 변수 to 한대 변수 개매 에▁to.Popen그래서 buffsize를 1(각 라인마다 버퍼)과 0(버퍼 없음)으로 설정하려고 했지만, 두 값 모두 라인이 전달되는 방식을 바꾸지 않았습니다.

이 시점에서 저는 지푸라기를 잡기 시작했고, 그래서 저는 다음과 같은 출력 루프를 작성했습니다.

while True:
    try:
        print p.stdout.next().replace('\n', '')
    except StopIteration:
        break

하지만 같은 결과를 얻었습니다.

하위 프로세스를 사용하여 실행되는 프로그램의 '실시간' 프로그램 출력을 얻을 수 있습니까?파이썬에서 전방 호환되는 다른 옵션이 있습니까(그렇지 않음).exec*)?

저는 이것을 시도했고, 어떤 이유에서인지 코드가

for line in p.stdout:
  ...

버퍼 공격적, 변형

while True:
  line = p.stdout.readline()
  if not line: break
  ...

하지 않다.분명히 이것은 알려진 버그입니다: http://bugs.python.org/issue3907 (이 문제는 현재 2018년 8월 29일자로 "닫힌" 상태입니다.

버퍼 크기를 1로 설정하면 프로세스가 출력을 버퍼링하지 않도록 강제할 수 있습니다.

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
    print line,
p.stdout.close()
p.wait()

하위 프로세스 출력을 직접 스트림으로 보낼 수 있습니다.단순화된 예:

subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)

사용해 볼 수 있습니다.

import subprocess
import sys

process = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

while True:
    out = process.stdout.read(1)
    if out == '' and process.poll() != None:
        break
    if out != '':
        sys.stdout.write(out)
        sys.stdout.flush()

읽기 대신 읽기 줄을 사용하면 입력 메시지가 인쇄되지 않는 경우가 있습니다.에서 인라인 입력을 요구하는 명령을 사용하여 사용해 보고 직접 확인하십시오.

Python 3.x에서는 출력이 문자열이 아닌 바이트 배열이기 때문에 프로세스가 중단될 수 있습니다.반드시 문자열로 디코딩해야 합니다.

3 변수 Python 3.6을 할 수 .encodingPopen Constructor에서.전체 예:

process = subprocess.Popen(
    'my_command',
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    shell=True,
    encoding='utf-8',
    errors='replace'
)

while True:
    realtime_output = process.stdout.readline()

    if realtime_output == '' and process.poll() is not None:
        break

    if realtime_output:
        print(realtime_output.strip(), flush=True)

코드는 리디렉션됩니다. stderrstdout 출력 오류를 처리합니다.

Python에서 했습니다.C 프로그램에서 실시간 출력을 캡처하는 동안 Python에서 유사한 문제가 발생했습니다.추가했습니다.fflush(stdout);내 C 코드로.그것은 나에게 효과가 있었다.여기 코드가 있습니다.

C 프로그램:

#include <stdio.h>
void main()
{
    int count = 1;
    while (1)
    {
        printf(" Count  %d\n", count++);
        fflush(stdout);
        sleep(1);
    }
}

파이썬 프로그램:

#!/usr/bin/python

import os, sys
import subprocess


procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

while procExe.poll() is None:
    line = procExe.stdout.readline()
    print("Print:" + line)

출력:

Print: Count  1
Print: Count  2
Print: Count  3

Kevin McCarthy의 Python 블로그 게시물에서 스트리밍 하위 프로세스 stdinstdout은 비동기로 수행하는 방법을 보여줍니다.

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def run(command):
    process = await create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE
    )

    await asyncio.wait(
        [
            _read_stream(
                process.stdout,
                lambda x: print(
                    "STDOUT: {}".format(x.decode("UTF8"))
                ),
            ),
            _read_stream(
                process.stderr,
                lambda x: print(
                    "STDERR: {}".format(x.decode("UTF8"))
                ),
            ),
        ]
    )

    await process.wait()


async def main():
    await run("docker build -t my-docker-image:latest .")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

사용 사례에 따라 하위 프로세스 자체에서 버퍼링을 사용하지 않도록 설정할 수도 있습니다.

하위 프로세스가 Python 프로세스인 경우, 호출 전에 다음 작업을 수행할 수 있습니다.

os.environ["PYTHONUNBUFFERED"] = "1"

또는 이것을 다음에 전달합니다.env에 대한 인수입니다.Popen.

Linux/Unix 파일을 할 수 .stdbuf도구. 예:

cmd = ["stdbuf", "-oL"] + cmd

여기를 참조하십시오.stdbuf또는 다른 옵션.

(동일한 답변은 여기를 참조하십시오.)

여기서 이 "플러그 앤 플레이" 기능을 찾았습니다.아주 잘 작동했어요!

import subprocess

def myrun(cmd):
    """from
    http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print line,
        if line == '' and p.poll() != None:
            break
    return ''.join(stdout)

저는 얼마 전에 같은 문제에 부딪혔습니다.제 해결책은 다음을 위한 반복 작업을 포기하는 것이었습니다.read하위 프로세스 실행이 완료되지 않은 경우 등에도 즉시 반환되는 메서드입니다.

하위 프로세스에서 실시간 출력을 얻기 위해 이 솔루션을 사용했습니다.이 루프는 프로세스가 완료되는 즉시 중단 문 또는 무한 루프의 필요성을 배제하고 중지됩니다.

sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

while sub_process.poll() is None:
    out = sub_process.stdout.read(1)
    sys.stdout.write(out)
    sys.stdout.flush()

하위 프로세스의 출력에서 각 바이트에 반복기를 사용할 수 있습니다.이렇게 하면 하위 프로세스에서 인라인 업데이트('\r'로 끝나는 라인이 이전 출력 라인을 덮어쓸 수 있습니다.

from subprocess import PIPE, Popen

command = ["my_command", "-my_arg"]

# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)


# read each byte of subprocess
while subprocess.poll() is None:
    for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
        c = c.decode('ascii')
        sys.stdout.write(c)
sys.stdout.flush()

if subprocess.returncode != 0:
    raise Exception("The subprocess did not terminate correctly.")

이것은 제가 항상 사용하는 기본 골격입니다.이를 통해 시간 초과를 쉽게 구현할 수 있으며 불가피한 중단 프로세스를 처리할 수 있습니다.

import subprocess
import threading
import Queue

def t_read_stdout(process, queue):
    """Read from stdout"""

    for output in iter(process.stdout.readline, b''):
        queue.put(output)

    return

process = subprocess.Popen(['dir'],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           bufsize=1,
                           cwd='C:\\',
                           shell=True)

queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()

while process.poll() is None or not queue.empty():
    try:
        output = queue.get(timeout=.5)

    except Queue.Empty:
        continue

    if not output:
        continue

    print(output),

t_stdout.join()

실시간으로 로그를 콘솔로 전달하려는 경우

아래 코드는 두 가지 모두에 적용됩니다.

 p = subprocess.Popen(cmd,
                         shell=True,
                         cwd=work_dir,
                         bufsize=1,
                         stdin=subprocess.PIPE,
                         stderr=sys.stderr,
                         stdout=sys.stdout)

완전한 솔루션:

import contextlib
import subprocess

# Unix, Windows and old Macintosh end-of-line
newlines = ['\n', '\r\n', '\r']
def unbuffered(proc, stream='stdout'):
    stream = getattr(proc, stream)
    with contextlib.closing(stream):
        while True:
            out = []
            last = stream.read(1)
            # Don't loop forever
            if last == '' and proc.poll() is not None:
                break
            while last not in newlines:
                # Don't loop forever
                if last == '' and proc.poll() is not None:
                    break
                out.append(last)
                last = stream.read(1)
            out = ''.join(out)
            yield out

def example():
    cmd = ['ls', '-l', '/']
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Make all end-of-lines '\n'
        universal_newlines=True,
    )
    for line in unbuffered(proc):
        print line

example()

비차단 읽기 줄에 pexpect를 사용하면 이 문제가 해결됩니다.파이프가 버퍼링되기 때문에 앱의 출력이 파이프에 의해 버퍼링되므로 버퍼가 채워지거나 프로세스가 종료될 때까지 해당 출력에 도달할 수 없습니다.

다음은 저에게 도움이 되었습니다.

import subprocess
import sys

def run_cmd_print_output_to_console_and_log_to_file(cmd, log_file_path):
    make_file_if_not_exist(log_file_path)
    logfile = open(log_file_path, 'w')

    proc=subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell = True)
    for line in proc.stdout:
        sys.stdout.write(line.decode("utf-8") )
        print(line.decode("utf-8").strip(), file=logfile, flush=True)
    proc.wait()

    logfile.close()

2
행 readflush/flush flush() sys.sys.stdout .

while proc.poll() is None:
    line = proc.stdout.readline()
    sys.stdout.write(line)
    # or print(line.strip()), you still need to force the flush.
    sys.stdout.flush()

python 3.x 또는 pthon 2.x를 제안하는 답변은 거의 없습니다. 아래 코드는 두 가지 모두에 적용됩니다.

 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
    stdout = []
    while True:
        line = p.stdout.readline()
        if not isinstance(line, (str)):
            line = line.decode('utf-8')
        stdout.append(line)
        print (line)
        if (line == '' and p.poll() != None):
            break
def run_command(command):
process = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
while True:
    output = process.stdout.readline()
    if output == '' and process.poll() is not None:
        break
    if output:
        print(output.strip())
rc = process.poll()
return rc

하지만 또 다른 대답!다음과 같은 요구 사항이 있었습니다.

  • 일부 명령을 실행하고 사용자가 실행한 것처럼 출력을 stdout으로 인쇄합니다.
  • 명령의 프롬프트를 사용자에게 표시합니다.예.pip uninstall numpy메시지가 나타납니다.... Proceed (Y/n)?(새 줄로 끝나지 않음)
  • 사용자가 본 출력을 문자열로 캡처

이것은 나에게 효과가 있었습니다(Windows의 Python 3.10에서만 테스트됨).

def run(*args: list[str]) -> str:
    proc = subprocess.Popen(
        *args,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    result = ""

    while proc.poll() is None:
        output = proc.stdout.read(1)

        if output:
            sys.stdout.write(output)
            sys.stdout.flush()
            result += output

    return result

이것들은 모두 좋은 예이지만, 저는 그들이 (a) 부분적인 선을 처리하는 것(예: "Y/n):") 하지만 정말 느리거나 b) 빠르지만 부분적인 선에 매달려 있다는 것을 발견했습니다.

저는 다음과 같은 일을 했습니다.

  • stdout 및 stderr에 대한 실시간 출력을 각 스트림에 제공합니다.
  • 스트림 버퍼링과 함께 작동하기 때문에 매우 빠릅니다.
  • 읽기 시 차단하지 않으므로 제한 시간 사용 가능 »
  • stdout 및 stderr을 독립적으로 효율적으로 저장
  • 텍스트 인코딩 처리(이진 스트림에 쉽게 적응할 수 있음)
  • Python 3.6+에서 작동합니다.
import os
import subprocess
import sys
import selectors
import io

def run_command(command: str) -> (int, str):

    proc = subprocess.Popen(
        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    sel = selectors.DefaultSelector()
    for fobj in [ proc.stdout, proc.stderr ]:
        os.set_blocking(fobj.fileno(), False)
        sel.register(fobj, selectors.EVENT_READ)

    out=io.StringIO()
    err=io.StringIO()

    # loop until all descriptors removed
    while len(sel.get_map()) > 0:
        events = sel.select()
        if len(events) == 0:
            # timeout or signal, kill to prevent wait hanging
            proc.terminate()
            break
        for key, _ in events:
            # read all available data
            buf = key.fileobj.read().decode(errors='ignore')
            if buf == '':
                sel.unregister(key.fileobj)
            elif key.fileobj == proc.stdout:
                sys.stdout.write(buf)
                sys.stdout.flush()
                out.write(buf)
            elif key.fileobj == proc.stderr:
                sys.stderr.write(buf)
                sys.stderr.flush()
                err.write(buf)

    sel.close()
    proc.wait()
    if proc.returncode != 0:
        return (proc.returncode, err.getvalue())
    return (0, out.getvalue())

시간 초과 로직(주제가 실시간 출력이므로)은 포함하지 않았지만, select()/wait()에 추가하기만 하면 되므로 더 이상 무한 중단 걱정이 없습니다.

시간을 쟀습니다.cat '25MB-file'그리고 그것과 비교하면..read(1)솔루션은 약 300배 더 빠릅니다.

제 솔루션은 다음과 같습니다.

process = subprocess.Popen(command, stdout=PIPE, stderr=PIPE)

error_output = ""

while True:

    # The empty string is important to fulfill the exit condition (see below)
    stdout_line = ""
    if process.stdout:
        stdout = process.stdout.readline()
        if stdout:
            stdout_line = stdout.decode("utf-8")
            log.debug(stdout_line)

    # The empty string is important to fulfill the exit condition (see below)
    stderr_line = ""
    if process.stderr:
        stderr = process.stderr.readline()
        if stderr:
            stderr_line = stderr.decode("utf-8")
            error_output += stderr_line
            log.debug(stderr_line)

    # It might be the case that the process is finished but reading the
    # output is not finished. This is why we check both conditions:
    # Condition for readline:
    #   https://docs.python.org/3.6/tutorial/inputoutput.html#methods-of-file-objects
    # Condition for poll:
    #   https://docs.python.org/3/library/subprocess.html#subprocess.Popen.poll
    if stdout_line == "" and stderr_line == "" and process.poll() != None:
        break

if process.returncode != 0:
    raise Exception(error_output)

답변이 늦었지만 다음은 Python3에서 작동합니다.

import subprocess
import sys

process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

while True:
    out = process.stdout.read(1)
    if process.poll() is not None:
        break
    if out != '':
        sys.stdout.buffer.write(out)
        sys.stdout.flush()

언급URL : https://stackoverflow.com/questions/803265/getting-realtime-output-using-subprocess

반응형