diff --git a/PyFlow/App.py b/PyFlow/App.py index b33c05a5..76a27d74 100644 --- a/PyFlow/App.py +++ b/PyFlow/App.py @@ -35,6 +35,7 @@ from PyFlow.Core.Common import currentProcessorTime from PyFlow.Core.Common import SingletonDecorator from PyFlow.Core.Common import validateGraphDataPackages +from PyFlow.Core.AsyncLoop import close_event_loop, get_or_create_event_loop from PyFlow.UI.Canvas.UICommon import SessionDescriptor from PyFlow.UI.Widgets.BlueprintCanvas import BlueprintCanvasWidget from PyFlow.UI.Tool.Tool import ShelfTool, DockTool @@ -486,7 +487,7 @@ def stopMainLoop(self): self.tick_timer.timeout.disconnect() def mainLoop(self): - asyncio.get_event_loop().run_until_complete(self._tick_asyncio()) + get_or_create_event_loop().run_until_complete(self._tick_asyncio()) deltaTime = currentProcessorTime() - self._lastClock ds = deltaTime * 1000.0 @@ -634,6 +635,7 @@ def closeEvent(self, event): if os.path.exists(self.currentTempDir): shutil.rmtree(self.currentTempDir) + close_event_loop() SingletonDecorator.destroyAll() PyFlow.appInstance = None diff --git a/PyFlow/Core/AsyncLoop.py b/PyFlow/Core/AsyncLoop.py new file mode 100644 index 00000000..3db91f85 --- /dev/null +++ b/PyFlow/Core/AsyncLoop.py @@ -0,0 +1,59 @@ +## Copyright 2015-2019 Ilgar Lunin, Pedro Cabrera + +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at + +## http://www.apache.org/licenses/LICENSE-2.0 + +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. + + +import asyncio + + +_PYFLOW_EVENT_LOOP = None + + +def get_or_create_event_loop(): + global _PYFLOW_EVENT_LOOP + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop is not None: + _PYFLOW_EVENT_LOOP = loop + return loop + + if _PYFLOW_EVENT_LOOP is None or _PYFLOW_EVENT_LOOP.is_closed(): + _PYFLOW_EVENT_LOOP = asyncio.new_event_loop() + + asyncio.set_event_loop(_PYFLOW_EVENT_LOOP) + return _PYFLOW_EVENT_LOOP + + +def close_event_loop(): + global _PYFLOW_EVENT_LOOP + + loop = _PYFLOW_EVENT_LOOP + _PYFLOW_EVENT_LOOP = None + + if loop is None: + asyncio.set_event_loop(None) + return + + if not loop.is_closed(): + pending = [task for task in asyncio.all_tasks(loop) if not task.done()] + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + loop.close() + + asyncio.set_event_loop(None) diff --git a/PyFlow/Packages/PyFlowBase/Nodes/subProcess.py b/PyFlow/Packages/PyFlowBase/Nodes/subProcess.py index c81cc605..3b4aa569 100644 --- a/PyFlow/Packages/PyFlowBase/Nodes/subProcess.py +++ b/PyFlow/Packages/PyFlowBase/Nodes/subProcess.py @@ -16,6 +16,7 @@ import asyncio import time import uuid +from PyFlow.Core.AsyncLoop import get_or_create_event_loop from PyFlow.Core import NodeBase, PinBase from PyFlow.Core.Common import * from PyFlow.Core.NodeBase import NodePinsSuggestionsHelper @@ -133,7 +134,10 @@ def compute(self, *args, **kwargs): self.proc_task_args = args self.proc_task_kwargs = kwargs self.is_running.setData(True) - self.proc_task = asyncio.get_event_loop().create_task(self._run_cmd(self.proc_task_uuid, cmd, self.cwd.getData())) + loop = get_or_create_event_loop() + self.proc_task = loop.create_task( + self._run_cmd(self.proc_task_uuid, cmd, self.cwd.getData()) + ) async def _run_cmd(self, _uuid, cmd, cwd): if None != self.proc and None == self.proc.returncode: @@ -179,4 +183,4 @@ def Tick(self, delta): self.proc = None self.computed.send() self.is_running.setData(False) - self.outExecPin.call(*self.proc_task_args, **self.proc_task_kwargs) \ No newline at end of file + self.outExecPin.call(*self.proc_task_args, **self.proc_task_kwargs) diff --git a/PyFlow/Tests/Test_AsyncioCompatibility.py b/PyFlow/Tests/Test_AsyncioCompatibility.py new file mode 100644 index 00000000..1418b1c5 --- /dev/null +++ b/PyFlow/Tests/Test_AsyncioCompatibility.py @@ -0,0 +1,84 @@ +## Copyright 2015-2019 Ilgar Lunin, Pedro Cabrera + +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at + +## http://www.apache.org/licenses/LICENSE-2.0 + +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. + + +import asyncio +import unittest + +from PyFlow.App import PyFlow +from PyFlow.Core.Common import currentProcessorTime +from PyFlow.Packages.PyFlowBase.Nodes.subProcess import subProcess +from PyFlow.Tests.TestsBase import INITIALIZE + + +class _TickRecorder(object): + def __init__(self): + self.deltas = [] + + def get(self): + return self + + def Tick(self, delta): + self.deltas.append(delta) + + +class _DummyPyFlow(object): + def __init__(self): + self._lastClock = currentProcessorTime() + self.fps = 0 + self.graphManager = _TickRecorder() + self.canvasWidget = _TickRecorder() + + async def _tick_asyncio(self): + await asyncio.sleep(0) + + +class TestAsyncioCompatibility(unittest.TestCase): + def setUp(self): + self._loops = [] + asyncio.set_event_loop(None) + INITIALIZE() + + def tearDown(self): + for loop in self._loops: + if loop is None or loop.is_closed(): + continue + pending = [task for task in asyncio.all_tasks(loop) if not task.done()] + for task in pending: + task.cancel() + if pending: + loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) + loop.close() + + asyncio.set_event_loop(None) + + def test_main_loop_runs_without_implicit_default_event_loop(self): + dummy = _DummyPyFlow() + + PyFlow.mainLoop(dummy) + self._loops.append(asyncio.get_event_loop()) + + self.assertEqual(len(dummy.graphManager.deltas), 1) + self.assertEqual(len(dummy.canvasWidget.deltas), 1) + + def test_subprocess_compute_creates_task_without_implicit_default_event_loop(self): + node = subProcess("cmd") + node.cmd.setData("echo") + + node.compute() + + self.assertIsNotNone(node.proc_task) + self._loops.append(node.proc_task.get_loop())