Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion PyFlow/App.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from PyFlow.Core.Common import currentProcessorTime
from PyFlow.Core.Common import SingletonDecorator
from PyFlow.Core.Common import validateGraphDataPackages
from PyFlow.Core.AsyncLoop import close_event_loop, get_or_create_event_loop
from PyFlow.UI.Canvas.UICommon import SessionDescriptor
from PyFlow.UI.Widgets.BlueprintCanvas import BlueprintCanvasWidget
from PyFlow.UI.Tool.Tool import ShelfTool, DockTool
Expand Down Expand Up @@ -486,7 +487,7 @@ def stopMainLoop(self):
self.tick_timer.timeout.disconnect()

def mainLoop(self):
asyncio.get_event_loop().run_until_complete(self._tick_asyncio())
get_or_create_event_loop().run_until_complete(self._tick_asyncio())

deltaTime = currentProcessorTime() - self._lastClock
ds = deltaTime * 1000.0
Expand Down Expand Up @@ -634,6 +635,7 @@ def closeEvent(self, event):
if os.path.exists(self.currentTempDir):
shutil.rmtree(self.currentTempDir)

close_event_loop()
SingletonDecorator.destroyAll()

PyFlow.appInstance = None
Expand Down
59 changes: 59 additions & 0 deletions PyFlow/Core/AsyncLoop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
## Copyright 2015-2019 Ilgar Lunin, Pedro Cabrera

## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at

## http://www.apache.org/licenses/LICENSE-2.0

## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.


import asyncio


_PYFLOW_EVENT_LOOP = None


def get_or_create_event_loop():
global _PYFLOW_EVENT_LOOP

try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop is not None:
_PYFLOW_EVENT_LOOP = loop
return loop

if _PYFLOW_EVENT_LOOP is None or _PYFLOW_EVENT_LOOP.is_closed():
_PYFLOW_EVENT_LOOP = asyncio.new_event_loop()

asyncio.set_event_loop(_PYFLOW_EVENT_LOOP)
return _PYFLOW_EVENT_LOOP


def close_event_loop():
global _PYFLOW_EVENT_LOOP

loop = _PYFLOW_EVENT_LOOP
_PYFLOW_EVENT_LOOP = None

if loop is None:
asyncio.set_event_loop(None)
return

if not loop.is_closed():
pending = [task for task in asyncio.all_tasks(loop) if not task.done()]
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()

asyncio.set_event_loop(None)
8 changes: 6 additions & 2 deletions PyFlow/Packages/PyFlowBase/Nodes/subProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import asyncio
import time
import uuid
from PyFlow.Core.AsyncLoop import get_or_create_event_loop
from PyFlow.Core import NodeBase, PinBase
from PyFlow.Core.Common import *
from PyFlow.Core.NodeBase import NodePinsSuggestionsHelper
Expand Down Expand Up @@ -133,7 +134,10 @@ def compute(self, *args, **kwargs):
self.proc_task_args = args
self.proc_task_kwargs = kwargs
self.is_running.setData(True)
self.proc_task = asyncio.get_event_loop().create_task(self._run_cmd(self.proc_task_uuid, cmd, self.cwd.getData()))
loop = get_or_create_event_loop()
self.proc_task = loop.create_task(
self._run_cmd(self.proc_task_uuid, cmd, self.cwd.getData())
)

async def _run_cmd(self, _uuid, cmd, cwd):
if None != self.proc and None == self.proc.returncode:
Expand Down Expand Up @@ -179,4 +183,4 @@ def Tick(self, delta):
self.proc = None
self.computed.send()
self.is_running.setData(False)
self.outExecPin.call(*self.proc_task_args, **self.proc_task_kwargs)
self.outExecPin.call(*self.proc_task_args, **self.proc_task_kwargs)
84 changes: 84 additions & 0 deletions PyFlow/Tests/Test_AsyncioCompatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
## Copyright 2015-2019 Ilgar Lunin, Pedro Cabrera

## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at

## http://www.apache.org/licenses/LICENSE-2.0

## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.


import asyncio
import unittest

from PyFlow.App import PyFlow
from PyFlow.Core.Common import currentProcessorTime
from PyFlow.Packages.PyFlowBase.Nodes.subProcess import subProcess
from PyFlow.Tests.TestsBase import INITIALIZE


class _TickRecorder(object):
def __init__(self):
self.deltas = []

def get(self):
return self

def Tick(self, delta):
self.deltas.append(delta)


class _DummyPyFlow(object):
def __init__(self):
self._lastClock = currentProcessorTime()
self.fps = 0
self.graphManager = _TickRecorder()
self.canvasWidget = _TickRecorder()

async def _tick_asyncio(self):
await asyncio.sleep(0)


class TestAsyncioCompatibility(unittest.TestCase):
def setUp(self):
self._loops = []
asyncio.set_event_loop(None)
INITIALIZE()

def tearDown(self):
for loop in self._loops:
if loop is None or loop.is_closed():
continue
pending = [task for task in asyncio.all_tasks(loop) if not task.done()]
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True)
)
loop.close()

asyncio.set_event_loop(None)

def test_main_loop_runs_without_implicit_default_event_loop(self):
dummy = _DummyPyFlow()

PyFlow.mainLoop(dummy)
self._loops.append(asyncio.get_event_loop())

self.assertEqual(len(dummy.graphManager.deltas), 1)
self.assertEqual(len(dummy.canvasWidget.deltas), 1)

def test_subprocess_compute_creates_task_without_implicit_default_event_loop(self):
node = subProcess("cmd")
node.cmd.setData("echo")

node.compute()

self.assertIsNotNone(node.proc_task)
self._loops.append(node.proc_task.get_loop())