Python에서 비동기 메서드 호출?
Python에 비동기 메서드 호출용 라이브러리가 있는지 궁금합니다.이런 거 해주면 좋을 것 같아요.
@async
def longComputation():
<code>
token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
doSomethingElse()
if token.finished():
result = token.result()
또는 비동기 루틴을 비동기적으로 호출하려면
def longComputation()
<code>
token = asynccall(longComputation())
언어 중심에서 원어민답게 좀 더 세련된 전략이 있으면 좋겠습니다.이 점은 고려되었습니까?
예를 들어 다음과 같습니다.
import threading
thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done
상세한 것에 대하여는, https://docs.python.org/library/threading.html 의 메뉴얼을 참조해 주세요.
Python 2.6에 추가된 멀티프로세서 모듈을 사용할 수 있습니다.프로세스 풀을 사용한 후 다음과 같이 비동기적으로 결과를 얻을 수 있습니다.
apply_async(func[, args[, kwds[, callback]]])
예:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=1) # Start a worker processes.
result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.
이것은 하나의 대안일 뿐이다.이 모듈은 원하는 것을 달성하기 위한 다양한 기능을 제공합니다.또한 이것으로 데코레이터를 만드는 것은 매우 간단할 것입니다.
Python 3.5부터는 비동기 함수에 확장 생성기를 사용할 수 있습니다.
import asyncio
import datetime
향상된 제너레이터 구문:
@asyncio.coroutine
def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
yield from asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
신규async/await
구문:
async def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
언어 핵심이 아니라 원하는 기능을 하는 매우 성숙한 라이브러리가 트위스트입니다.콜백 또는 에러 핸들러("errback")를 부가할 수 있는 Deferred 객체가 도입되었습니다.지연은 기본적으로 함수가 최종적으로 결과를 가져올 것이라는 "약속"입니다.
데코레이터를 구현하여 기능을 비동기화할 수 있습니다.단, 조금 까다롭습니다.그multiprocessing
모듈에는 사소한 기호가 가득하고 겉보기에는 제멋대로인 제약이 있습니다.단, 친근한 인터페이스 뒤에 캡슐화할 이유는 더욱 있습니다.
from inspect import getmodule
from multiprocessing import Pool
def async(decorated):
r'''Wraps a top-level function around an asynchronous dispatcher.
when the decorated function is called, a task is submitted to a
process pool, and a future object is returned, providing access to an
eventual return value.
The future object has a blocking get() method to access the task
result: it will return immediately if the job is already done, or block
until it completes.
This decorator won't work on methods, due to limitations in Python's
pickling machinery (in principle methods could be made pickleable, but
good luck on that).
'''
# Keeps the original function visible from the module global namespace,
# under a name consistent to its __name__ attribute. This is necessary for
# the multiprocessing pickling machinery to work properly.
module = getmodule(decorated)
decorated.__name__ += '_original'
setattr(module, decorated.__name__, decorated)
def send(*args, **opts):
return async.pool.apply_async(decorated, args, opts)
return send
아래 코드는 데코레이터의 사용법을 보여줍니다.
@async
def printsum(uid, values):
summed = 0
for value in values:
summed += value
print("Worker %i: sum value is %i" % (uid, summed))
return (uid, summed)
if __name__ == '__main__':
from random import sample
# The process pool must be created inside __main__.
async.pool = Pool(4)
p = range(0, 1000)
results = []
for i in range(4):
result = printsum(i, sample(p, 100))
results.append(result)
for result in results:
print("Worker %i: sum value is %i" % result.get())
실제 상황에서는 디버깅을 위해(미래 인터페이스를 그대로 유지한 상태에서) 끌 수 있는 방법을 제공하거나 예외에 대처하는 기능을 제공하면서 데코레이터에 대해 좀 더 자세히 설명하겠습니다만, 이것으로 충분히 원리를 알 수 있다고 생각합니다.
그저.
import threading, time
def f():
print "f started"
time.sleep(3)
print "f finished"
threading.Thread(target=f).start()
솔루션은 다음과 같습니다.
import threading
class TimeoutError(RuntimeError):
pass
class AsyncCall(object):
def __init__(self, fnc, callback = None):
self.Callable = fnc
self.Callback = callback
def __call__(self, *args, **kwargs):
self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
self.Thread.start()
return self
def wait(self, timeout = None):
self.Thread.join(timeout)
if self.Thread.isAlive():
raise TimeoutError()
else:
return self.Result
def run(self, *args, **kwargs):
self.Result = self.Callable(*args, **kwargs)
if self.Callback:
self.Callback(self.Result)
class AsyncMethod(object):
def __init__(self, fnc, callback=None):
self.Callable = fnc
self.Callback = callback
def __call__(self, *args, **kwargs):
return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)
def Async(fnc = None, callback = None):
if fnc == None:
def AddAsyncCallback(fnc):
return AsyncMethod(fnc, callback)
return AddAsyncCallback
else:
return AsyncMethod(fnc, callback)
요구대로 동작합니다.
@Async
def fnc():
pass
이벤트렛을 사용할 수 있습니다.동기 코드라고 생각되는 것을 쓸 수 있지만, 네트워크상에서 비동기적으로 동작하도록 할 수 있습니다.
다음은 슈퍼 미니멀 크롤러의 예입니다.
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]
import eventlet
from eventlet.green import urllib2
def fetch(url):
return urllib2.urlopen(url).read()
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
print "got body", len(body)
이와 같은 기능을 사용하면 함수를 호출할 수 있습니다. 그러면 새로운 스레드로 자동 디스패치됩니다.
from thread import start_new_thread
def dowork(asynchronous=True):
if asynchronous:
args = (False)
start_new_thread(dowork,args) #Call itself on a new thread.
else:
while True:
#do something...
time.sleep(60) #sleep for a minute
return
동시에 사용할 수 있습니다.(Python 3.2에 추가)
import time
from concurrent.futures import ThreadPoolExecutor
def long_computation(duration):
for x in range(0, duration):
print(x)
time.sleep(1)
return duration * 2
print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(long_computation, 5)
while not future.done():
print('waiting...')
time.sleep(0.5)
print(future.result())
print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))
print('waiting for callback')
executor.shutdown(False) # non-blocking
print('shutdown invoked')
레드를사 용용용? 는?? ????.threading
★★★★★★★★★★★★★★★★★ finished()
은 " " 를 합니다.isAlive()
. 。result()
는 능능 function function function function function function function functionjoin()
이치노 수 , 리면면, 면 the the .run()
★★★★★★★★★★★★★★★★★」__init__
함수를 사용하여 생성자에 지정된 함수를 호출하고 값을 클래스의 인스턴스에 저장합니다.
2021년 Python 3.9와 함께 비동기 호출을 위한 네이티브 Python 방식도 Jupyter/Ipython 커널에 적합합니다.
Camabeh의 대답은 Python 3.3 이후부터의 길이다.
async def display_date(loop): end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) loop = asyncio.get_event_loop() # Blocking call which returns when the display_date() coroutine is done loop.run_until_complete(display_date(loop)) loop.close()
이것은 Jupyter Notebook/Jupyter Lab에서 작동하지만 다음과 같은 오류가 발생합니다.
RuntimeError: This event loop is already running
Ipython의 이벤트 루프 사용으로 인해 Python에는 아직 구현되지 않은 중첩된 비동기 루프라고 불리는 것이 필요합니다.다행히도 이 문제를 처리할 nest_asyncio가 있다.필요한 것은 다음과 같습니다.
!pip install nest_asyncio # use ! within Jupyter Notebook, else pip install in shell
import nest_asyncio
nest_asyncio.apply()
「 」를 loop.close()
Ipython의 메인 루프를 참조하고 있기 때문에, 또 다른 에러가 발생합니다.
RuntimeError: Cannot close a running event loop
이 github 문제에 대한 답변이 오는 대로 업데이트 하겠습니다.
" " "asyncio
Python 3.7 이상에서 실행 중인 메서드는asyncio.run()
를 작성하는 loop
를 호출하고 있습니다.loop.run_until_complete()
또한 닫기도 합니다.
import asyncio
import datetime
async def display_date(delay):
loop = asyncio.get_running_loop()
end_time = loop.time() + delay
while True:
print("Blocking...", datetime.datetime.now())
await asyncio.sleep(1)
if loop.time() > end_time:
print("Done.")
break
asyncio.run(display_date(5))
프로세스를 사용할 수 있습니다.(네트워킹과 같은) 기능을 하는 동안 계속 사용하려는 경우:
from multiprocessing import Process
def foo():
while 1:
# Do something
p = Process(target = foo)
p.start()
한 번만 실행하려면 다음과 같이 하십시오.
from multiprocessing import Process
def foo():
# Do something
p = Process(target = foo)
p.start()
p.join()
언급URL : https://stackoverflow.com/questions/1239035/asynchronous-method-call-in-python
'source' 카테고리의 다른 글
특정 페이지에서만 Vue Router 액세스 경로 (0) | 2022.11.16 |
---|---|
문자열의 첫 번째 문자가 0인 경우 삭제 (0) | 2022.11.16 |
날짜와 함께 GROUP_CONCAT에서 NOT IN 연산자(<>)를 사용하는 방법 (0) | 2022.11.15 |
EntityManager.find()와 EntityManager.getReference()를 JPA와 함께 사용하는 경우 (0) | 2022.11.15 |
Panda 데이터 프레임의 두 열에 함수를 적용하는 방법 (0) | 2022.11.15 |