From 228d4d570d5c86273a4feb4fd80b2ad50ba8981f Mon Sep 17 00:00:00 2001 From: Christopher Toth Date: Wed, 9 Feb 2022 01:36:40 -0700 Subject: [PATCH] Significant changes for Python 3 compatibility - Update all generators to use next(gen) instead of gen.next() - The visit function is now PEP 479 compliant - Fix all tests to use next(gen) instead of gen.next() - Ensure all tests pass - Fix test discovery so that python -m unittest and python setup.py test both work - Bump version to 0.4 --- setup.py | 139 +++--- src/owyl/__init__.py | 145 +++--- src/owyl/blackboard.py | 151 +++---- src/owyl/core.py | 982 +++++++++++++++++++++-------------------- src/owyl/decorators.py | 259 +++++------ src/owyl/dsl.py | 6 +- src/owyl/stack.py | 48 +- tests/__init__.py | 0 tests/testowyl.py | 808 ++++++++++++++++----------------- 9 files changed, 1273 insertions(+), 1265 deletions(-) create mode 100644 tests/__init__.py diff --git a/setup.py b/setup.py index cd48d03..b5cbaff 100644 --- a/setup.py +++ b/setup.py @@ -1,69 +1,70 @@ -# -*- coding: utf-8 -*- -"""setup -- setuptools setup file for Owyl. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$" -__date__ = "$Date$"[7:-2] - -__version__ = "0.3" -__release__ = '.r'.join((__version__, __revision__)) - -__description__ = "The goal of Owyl: provide a fast and flexible Behavior Tree library implemented in python." -__long_description__ = """You have Pyglet. You've got Rabbyt. But who do your sprites go to for advice? Owyl, of course. - -The goal of Owyl: provide a fast and flexible Behavior Tree library implemented in python. For more information on Behavior Trees, see the articles at http://aigamedev.com/hierarchical-logic - -""" -__classifiers__ = ["Development Status :: 3 - Alpha", - "Environment :: Console", - "Intended Audience :: Developers", - "License :: OSI Approved :: BSD License", - "Natural Language :: English", - "Operating System :: OS Independent", - "Programming Language :: Python", - "Topic :: Games/Entertainment", - "Topic :: Scientific/Engineering :: Artificial Intelligence", - "Topic :: Software Development :: Libraries",] - -import sys - -try: - import setuptools -except ImportError: - from ez_setup import use_setuptools - use_setuptools() - -from setuptools import setup, find_packages - -INSTALL_REQUIRES=[] -ZIP_SAFE = True - -setup( - name = "owyl", - version = __version__, - author = "David Eyk", - author_email = "eykd@eykd.net", - url = "http://code.google.com/p/owyl/", - description = __description__, - long_description = __long_description__, - download_url = "http://code.google.com/p/owyl/downloads/list", - classifiers = __classifiers__, - - package_dir = {'': 'src',}, - packages = find_packages('src'), - - include_package_data = True, - exclude_package_data = {'src':['*.c', '*.h', '*.pyx', '*.pxd', '*.g']}, - #data_files=['src/data',], - - install_requires=INSTALL_REQUIRES, - zip_safe = ZIP_SAFE, - - test_suite = "nose.collector", - ) - +# -*- coding: utf-8 -*- +"""setup -- setuptools setup file for Owyl. + +$Author$\n +$Rev$\n +$Date$ +""" + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$" +__date__ = "$Date$"[7:-2] + +__version__ = "0.4" +__release__ = '.r'.join((__version__, __revision__)) + +__description__ = "The goal of Owyl: provide a fast and flexible Behavior Tree library implemented in python." +__long_description__ = """You have Pyglet. You've got Rabbyt. But who do your sprites go to for advice? Owyl, of course. + +The goal of Owyl: provide a fast and flexible Behavior Tree library implemented in python. For more information on Behavior Trees, see the articles at http://aigamedev.com/hierarchical-logic + +""" +__classifiers__ = ["Development Status :: 3 - Alpha", + "Environment :: Console", + "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Topic :: Games/Entertainment", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development :: Libraries",] + +import sys + +try: + import setuptools +except ImportError: + from ez_setup import use_setuptools + use_setuptools() + +from setuptools import setup, find_packages + +INSTALL_REQUIRES=[] +ZIP_SAFE = True + +setup( + name = "owyl", + version = __version__, + author = "David Eyk", + author_email = "eykd@eykd.net", + maintainer="Christopher Toth", + maintainer_email="q@q-continuum.net", + url = "https://github.com/ctoth/owyl", + description = __description__, + long_description = __long_description__, + classifiers = __classifiers__, + + package_dir = {'': 'src',}, + packages = find_packages('src'), + + include_package_data = True, + exclude_package_data = {'src':['*.c', '*.h', '*.pyx', '*.pxd', '*.g']}, + #data_files=['src/data',], + + install_requires=INSTALL_REQUIRES, + zip_safe = ZIP_SAFE, + + test_suite = "tests", + ) + diff --git a/src/owyl/__init__.py b/src/owyl/__init__.py index b0b6067..134374b 100644 --- a/src/owyl/__init__.py +++ b/src/owyl/__init__.py @@ -1,72 +1,73 @@ -# -*- coding: utf-8 -*- -"""owyl -- Owyl Behavior Trees - -Behavior Trees are a form of U{hierarchical -logic}, and are quite useful -and flexible for implementing game AI. - -Owyl implements a behavior tree using nested iterators/generators. A -top-level generator function, L{visit} (implementing the Visitor -Pattern), iterates through the tree, descending into child generators -as they are yielded, and passing yielded termination status values to -the parent generators. - -Nested Generators -================= - - Tasks in the behavior tree are implemented as iterators (typically - being generator functions wrapped by the L{task} decorator - function). - - The first call to the task is at tree-building time. All passed - arguments should be for static initialization. The task should - return a factory function which itself should return an iterator. - - The factory function should accept **kwargs, which should be - combined with the initialization keyword arugments and passed to the - iterator at construction time. - - The iterator must yield values of None, True, False, or a child - iterator. An iterator that yields child iterators must be ready to - accept values yielded by the child. (See Termination Status Values, - below.) - - -Termination Status Values -========================= - - As mentioned, iterators in the tree may yield values of None, True, - False, or a child iterator: - - - B{None:} May be used to defer execution for another pass from the - scheduler. An iterator yielding None will be queried again. - - - B{True:} Termination value signalling successful execution. - - - B{False:} Termination value signalling unsuccessful execution, or - failure. Note: this is not considered an error value. - - True errors or exceptions should C{raise} the appropriate C{Error} - or C{Exception}. - - For more information, see the discussion at - U{http://aigamedev.com/hierarchical-logic/termination-status}. - -For more information on Behavior Trees and hierarchical logic, please -see U{http://aigamedev.com/hierarchical-logic} and -U{http://aigamedev.com/hierarchical-logic/advice-2}. - -Copyright 2008 David Eyk. All rights reserved. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$"[6:-2] -__date__ = "$Date$"[7:-2] - -from core import * -from decorators import * -from blackboard import * +# -*- coding: utf-8 -*- +"""owyl -- Owyl Behavior Trees + +Behavior Trees are a form of U{hierarchical +logic}, and are quite useful +and flexible for implementing game AI. + +Owyl implements a behavior tree using nested iterators/generators. A +top-level generator function, L{visit} (implementing the Visitor +Pattern), iterates through the tree, descending into child generators +as they are yielded, and passing yielded termination status values to +the parent generators. + +Nested Generators +================= + + Tasks in the behavior tree are implemented as iterators (typically + being generator functions wrapped by the L{task} decorator + function). + + The first call to the task is at tree-building time. All passed + arguments should be for static initialization. The task should + return a factory function which itself should return an iterator. + + The factory function should accept **kwargs, which should be + combined with the initialization keyword arugments and passed to the + iterator at construction time. + + The iterator must yield values of None, True, False, or a child + iterator. An iterator that yields child iterators must be ready to + accept values yielded by the child. (See Termination Status Values, + below.) + + +Termination Status Values +========================= + + As mentioned, iterators in the tree may yield values of None, True, + False, or a child iterator: + + - B{None:} May be used to defer execution for another pass from the + scheduler. An iterator yielding None will be queried again. + + - B{True:} Termination value signalling successful execution. + + - B{False:} Termination value signalling unsuccessful execution, or + failure. Note: this is not considered an error value. + + True errors or exceptions should C{raise} the appropriate C{Error} + or C{Exception}. + + For more information, see the discussion at + U{http://aigamedev.com/hierarchical-logic/termination-status}. + +For more information on Behavior Trees and hierarchical logic, please +see U{http://aigamedev.com/hierarchical-logic} and +U{http://aigamedev.com/hierarchical-logic/advice-2}. + +Copyright 2008 David Eyk. All rights reserved. + +$Author$\n +$Rev$\n +$Date$ +""" +from __future__ import absolute_import + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$"[6:-2] +__date__ = "$Date$"[7:-2] + +from .core import * +from .decorators import * +from .blackboard import * diff --git a/src/owyl/blackboard.py b/src/owyl/blackboard.py index a1599c3..a3504b4 100644 --- a/src/owyl/blackboard.py +++ b/src/owyl/blackboard.py @@ -1,75 +1,76 @@ -# -*- coding: utf-8 -*- -"""blackboard -- basic blackboard behaviors for Owyl - - - -Copyright 2008 David Eyk. All rights reserved. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$"[6:-2] -__date__ = "$Date$"[7:-2] - - -from collections import defaultdict - -import core - -__all__ = ['Blackboard', 'checkBB', 'setBB', ] - - -class Blackboard(defaultdict): - """A dict that defaults values to None. - - Blackboards are registered by name. All blackboards with the same - name have the same contents. - """ - _name_dict = defaultdict(dict) # For a twist on the Borg idiom - - def __init__(self, name, **kwargs): - self.__dict__ = Blackboard._name_dict[name] - - default = lambda: None - super(Blackboard, self).__init__(default, **kwargs) - - -@core.task -def checkBB(**kwargs): - """Check a value on the blackboard. - - @keyword blackboard: The blackboard object. - - @keyword key: The name of a key on the blackboard. - @type key: A hashable object - - @keyword check: A function that takes the value on the blackboard - and returns a boolean. - """ - bb = kwargs['blackboard'] - key = kwargs['key'] - check = kwargs.get('check', lambda x: x is not None) - value = bb[key] - result = check(value) and True or False # Always return a boolean. - yield result - - -@core.task -def setBB(**kwargs): - """Set a value on the blackboard. - - @keyword blackboard: The blackboard object. - - @keyword key: The name of a key on the blackboard. - @type key: A hashable object - - @keyword value: The value to set on the key. - """ - bb = kwargs['blackboard'] - key = kwargs['key'] - value = kwargs['value'] - bb[key] = value - yield True +# -*- coding: utf-8 -*- +"""blackboard -- basic blackboard behaviors for Owyl + + + +Copyright 2008 David Eyk. All rights reserved. + +$Author$\n +$Rev$\n +$Date$ +""" +from __future__ import absolute_import + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$"[6:-2] +__date__ = "$Date$"[7:-2] + + +from collections import defaultdict + +from . import core + +__all__ = ['Blackboard', 'checkBB', 'setBB', ] + + +class Blackboard(defaultdict): + """A dict that defaults values to None. + + Blackboards are registered by name. All blackboards with the same + name have the same contents. + """ + _name_dict = defaultdict(dict) # For a twist on the Borg idiom + + def __init__(self, name, **kwargs): + self.__dict__ = Blackboard._name_dict[name] + + default = lambda: None + super(Blackboard, self).__init__(default, **kwargs) + + +@core.task +def checkBB(**kwargs): + """Check a value on the blackboard. + + @keyword blackboard: The blackboard object. + + @keyword key: The name of a key on the blackboard. + @type key: A hashable object + + @keyword check: A function that takes the value on the blackboard + and returns a boolean. + """ + bb = kwargs['blackboard'] + key = kwargs['key'] + check = kwargs.get('check', lambda x: x is not None) + value = bb[key] + result = check(value) and True or False # Always return a boolean. + yield result + + +@core.task +def setBB(**kwargs): + """Set a value on the blackboard. + + @keyword blackboard: The blackboard object. + + @keyword key: The name of a key on the blackboard. + @type key: A hashable object + + @keyword value: The value to set on the key. + """ + bb = kwargs['blackboard'] + key = kwargs['key'] + value = kwargs['value'] + bb[key] = value + yield True diff --git a/src/owyl/core.py b/src/owyl/core.py index 71e477d..53d5caf 100644 --- a/src/owyl/core.py +++ b/src/owyl/core.py @@ -1,490 +1,492 @@ -# -*- coding: utf-8 -*- -"""core -- core behaviors for Owyl. - -Copyright 2008 David Eyk. All rights reserved. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$"[6:-2] -__date__ = "$Date$"[7:-2] - -import logging - -try: - from mx.Stack import Stack, EmptyError -except ImportError: - from stack import Stack, EmptyError - -RETURN_VALUES = set((True, False, None)) - -__all__ = ['wrap', 'task', 'taskmethod', 'parent_task', 'parent_taskmethod', 'visit', - 'succeed', 'fail', 'succeedAfter', 'failAfter', - 'sequence', 'selector', 'parallel', 'PARALLEL_SUCCESS', - 'queue', 'parallel_queue', - 'throw', 'catch', - 'log',] - - -def wrap(func, *args, **kwargs): - """Wrap a callable as a task. Yield the boolean of its result. - """ - def initTask(**initkwargs): - def makeIterator(**runkwargs): - result = func(*args, **kwargs) - yield bool(result) - try: makeIterator.__name__ = func.__name__ - except AttributeError: pass - try: makeIterator.__doc__ = func.__doc__ - except AttributeError: pass - return makeIterator - try: initTask.__doc__ = func.__doc__ - except AttributeError: pass - try: initTask.__name__ = func.__name__ - except AttributeError: pass - return initTask - - -def task(func): - """Task decorator. - - Decorate a generator function to produce a re-usable generator - factory for the given task. - """ - def initTask(**initkwargs): - def makeIterator(**runkwargs): - runkwargs.update(initkwargs) - iterator = func(**runkwargs) - return iterator - try: makeIterator.__name__ = func.__name__ - except AttributeError: pass - try: makeIterator.__doc__ = func.__doc__ - except AttributeError: pass - return makeIterator - try: initTask.__doc__ = func.__doc__ - except AttributeError: pass - try: initTask.__name__ = func.__name__ - except AttributeError: pass - return initTask - - -def taskmethod(func): - """Task decorator. - - Decorate a generator function to produce a re-usable generator - factory for the given task. - """ - def initTask(self, **initkwargs): - def makeIterator(**runkwargs): - runkwargs.update(initkwargs) - iterator = func(self, **runkwargs) - return iterator - try: makeIterator.__name__ = func.__name__ - except AttributeError: pass - try: makeIterator.__doc__ = func.__doc__ - except AttributeError: pass - return makeIterator - try: initTask.__doc__ = func.__doc__ - except AttributeError: pass - try: initTask.__name__ = func.__name__ - except AttributeError: pass - return initTask - - -def parent_task(func): - """Parent task decorator. - - A parent task is a task that accepts children. - - Decorate a generator function to produce a re-usable generator - factory for the given task. - """ - def initTask(*children, **initkwargs): - def makeIterator(**runkwargs): - runkwargs.update(initkwargs) - iterator = func(*children, **runkwargs) - return iterator - try: makeIterator.__name__ = func.__name__ - except AttributeError: pass - try: makeIterator.__doc__ = func.__doc__ - except AttributeError: pass - return makeIterator - try: initTask.__doc__ = func.__doc__ - except AttributeError: pass - try: initTask.__name__ = func.__name__ - except AttributeError: pass - return initTask - - -def parent_taskmethod(func): - """Parent task decorator. - - A parent task is a task that accepts children. - - Decorate a generator function to produce a re-usable generator - factory for the given task. - """ - def initTask(self, *children, **initkwargs): - def makeIterator(**runkwargs): - runkwargs.update(initkwargs) - iterator = func(self, *children, **runkwargs) - return iterator - try: makeIterator.__name__ = func.__name__ - except AttributeError: pass - try: makeIterator.__doc__ = func.__doc__ - except AttributeError: pass - return makeIterator - try: initTask.__doc__ = func.__doc__ - except AttributeError: pass - try: initTask.__name__ = func.__name__ - except AttributeError: pass - return initTask - - -def visit(tree, **kwargs): - """Iterate over a tree of nested iterators. - - Apply the U{Visitor - Pattern} to a tree - of nested iterators. Iterators should yield True, False, None, or - a child iterator. Values of True or False are passed back to the - parent iterator. A value of None is silently ignored, and the - current iterator will be queried again on the next pass. - - The visitor will yield None until the tree raises StopIteration, - upon which the visitor will yield the last value yielded by the - tree, and terminate itself with StopIteration. - - The visitor is essentially a micro-scheduler for a Behavior Tree - implemented as a tree of nested iterators. For more information, - see the discussion at - U{http://aigamedev.com/programming-tips/scheduler}. - """ - s = Stack() - return_values = RETURN_VALUES - - current = tree(**kwargs) - send_value = None - send_ok = False - while True: - try: - if send_ok: - child = current.send(send_value) - send_value = None - send_ok = False - else: - child = current.next() - - if child in return_values: - send_value = child - yield send_value - else: - # Descend into child node - s.push(current) - current = child - - except StopIteration: - try: - current = s.pop() - send_ok = True - except EmptyError: - raise StopIteration - - -@task -def succeed(**kwargs): - """Always succeed. - """ - yield True - - -@task -def fail(**kwargs): - """Always fail. - """ - yield False - - -@task -def stall(**kwargs): - """Wrap a callable as a task. Yield the boolean of its result after 'after' iterations. - - Yields 'None' 'after' times. - - @keyword func: The callable to run. - @type func: callable - - @keyword after: Run the callable after this many iterations. - @type after: int - """ - func = kwargs.pop('func') - after = kwargs.pop('after', 1) - for x in xrange(after): - yield None - yield bool(func()) - - -@task -def succeedAfter(**kwargs): - """Succeed after a given number of iterations. - - Yields 'None' 'after' times. - - @keyword after: How many iterations to succeed after. - @type after: int - """ - after = kwargs.pop('after', 1) - for x in xrange(after): - yield None - yield True - - -@task -def failAfter(**kwargs): - """Fail after a given number of iterations. - - Yields 'None' 'after' times. - - @keyword after: How many iterations to fail after. - @type after: int - """ - after = kwargs.pop('after', 1) - for x in xrange(after): - yield None - yield False - - -@parent_task -def sequence(*children, **kwargs): - """Run tasks in sequence until one fails. - - The sequence will run each task in sequence until one fails, - returning a failure. If all fail, returns a success. - - For more information, see the discussion at - U{http://aigamedev.com/hierarchical-logic/sequence}. - - @param children: tasks to run in sequence as children. - """ - final_value = True - for child in children: - result = yield child(**kwargs) - if not result and result is not None: - final_value = False - break - - yield final_value - - -@parent_task -def queue(queue, **kwargs): - """Run tasks in the queue in sequence. - - The queue will run each task in the queue in sequence. If the - queue is empty, it will stall until the queue receives new items. - - Note: the queue task *never* returns a success or failure code. - - The queue should be an object implementing pop(). If the queue has - items in it, it should evaluate to True, otherwise False. The - queue task will pop the next task in the queue and evaluate it in - the normal fashion. - - @param queue: task queue. - @type queue: A sequence object implementing pop() - """ - while True: - if queue: - child = queue.pop() - yield child(**kwargs) - else: - yield None - - -@parent_task -def parallel_queue(queue, **kwargs): - """Run tasks in the queue in parallel. - - The queue will run each task in the queue in parallel. If the - queue is empty, it will stall until the queue receives new items. - - Note: the queue task *never* returns a success or failure code. - - The queue should be an object implementing pop(). If the queue has - items in it, it should evaluate to True, otherwise False. The - queue task will pop the next task in the queue and evaluate it in - the normal fashion. - - @param queue: task queue. - """ - visits = [] # Canonical list of visited children - visiting = [] # Working list of visited children - while True: - if queue: - child = queue.pop() - visits.append(visit(child, **kwargs)) - visiting[:] = visits # Se we can remove from visits - for child in visiting: - try: - child.next() - except StopIteration: - visits.remove(child) - yield None - - -@parent_task -def selector(*children, **kwargs): - """Run tasks in sequence until one succeeds. - - The selector will run each task in sequence until one succeeds, - returning a success. If all fail, returns a failure. - - For more information, see the discussion at - U{http://aigamedev.com/hierarchical-logic/selector}. - - @param children: child tasks to select from. - """ - final_value = False - for child in children: - result = (yield child(**kwargs)) - if result: - final_value = True - break - - yield final_value - - -class Enum(object): - """Enum/namespace class. Cannot be implemented. - - Subclass and add class variables. - """ - def __init__(self): - raise NotImplementedError("_Enum class object. Do not instantiate.") - - -class PARALLEL_SUCCESS(Enum): - """Success policy enumerator for parallel behavior. - - C{REQUIRE_ALL}: All child tasks must succeed. - C{REQUIRE_ONE}: Only one child task must succeed. - """ - REQUIRE_ALL = "ALL" - REQUIRE_ONE = "ONE" - - -@parent_task -def parallel(*children, **kwargs): - """Run tasks in parallel until the success policy is fulfilled or broken. - - If the success policy is met, return a success. If the policy is - broken, return a failure. - - For more information, see the discussion at - U{aigamedev.com/hierarchical-logic/parallel}. - - @param children: tasks to run in parallel as children. - - @keyword policy: The success policy. All must succeed, - or only one must succeed. - @type policy: C{PARALLEL_SUCCESS.REQUIRE_ALL} or - C{PARALLEL_SUCCESS.REQUIRE_ONE}. - """ - return_values = set((True, False)) - policy = kwargs.pop('policy', PARALLEL_SUCCESS.REQUIRE_ONE) - all_must_succeed = (policy == PARALLEL_SUCCESS.REQUIRE_ALL) - visits = [visit(arg, **kwargs) for arg in children] - final_value = True - while True: - try: - # Run one step on each child per iteration. - for child in visits: - result = child.next() - if result in return_values: - if not result and all_must_succeed: - final_value = False - break - elif result and not all_must_succeed: - final_value = True - break - else: - final_value = result - yield None - except StopIteration: - break - except EmptyError: - break - yield final_value - - -@task -def throw(**kwargs): - """Throw (raise) an exception. - - @keyword throws: An Exception to throw. - @type throws: C{Exception} - - @keyword throws_message: Text to instantiate C{throws} with. - @type throws_message: C{str} - """ - throws = kwargs.pop('throws', Exception) - throws_message = kwargs.pop('throws_message', '') - - class gen(object): - def __iter__(self): - return self - - def next(self): - raise throws(throws_message) - - return gen() - - -@parent_task -def catch(child, **kwargs): - """Catch a raised exception from child and run an alternate branch. - - Note: this will not catch exceptions raised in the branch. - - @keyword caught: An Exception to catch. - @type caught: C{Exception} - - @keyword branch: An alternate tree to visit when caught. - """ - caught = kwargs.pop('caught', Exception) - branch = kwargs.pop('branch', fail()) - - result = None - tree = visit(child, **kwargs) - try: - while result is None: - result = tree.next() - yield None - except caught: - while result is None: - result = (yield branch(**kwargs)) - yield result - - -@parent_task -def log(message, **kwargs): - """Log a message to the given logger. - - @keyword name: The name of the logger to use. - @type name: str - - @keyword level: The logging level to use. - @default level: logging.DEBUG - """ - name = kwargs.pop('name', None) - if name is None: - logger = logging.getLogger() - else: - logger = logging.getLogger(name) - - level = kwargs.pop('level', logging.DEBUG) - logger.log(level, message) - yield True +# -*- coding: utf-8 -*- +"""core -- core behaviors for Owyl. + +Copyright 2008 David Eyk. All rights reserved. + +$Author$\n +$Rev$\n +$Date$ +""" +from __future__ import absolute_import + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$"[6:-2] +__date__ = "$Date$"[7:-2] + +import logging + +try: + from mx.Stack import Stack, EmptyError +except ImportError: + from .stack import Stack, EmptyError + +RETURN_VALUES = set((True, False, None)) + +__all__ = ['wrap', 'task', 'taskmethod', 'parent_task', 'parent_taskmethod', 'visit', + 'succeed', 'fail', 'succeedAfter', 'failAfter', + 'sequence', 'selector', 'parallel', 'PARALLEL_SUCCESS', + 'queue', 'parallel_queue', + 'throw', 'catch', + 'log',] + + +def wrap(func, *args, **kwargs): + """Wrap a callable as a task. Yield the boolean of its result. + """ + def initTask(**initkwargs): + def makeIterator(**runkwargs): + result = func(*args, **kwargs) + yield bool(result) + try: makeIterator.__name__ = func.__name__ + except AttributeError: pass + try: makeIterator.__doc__ = func.__doc__ + except AttributeError: pass + return makeIterator + try: initTask.__doc__ = func.__doc__ + except AttributeError: pass + try: initTask.__name__ = func.__name__ + except AttributeError: pass + return initTask + + +def task(func): + """Task decorator. + + Decorate a generator function to produce a re-usable generator + factory for the given task. + """ + def initTask(**initkwargs): + def makeIterator(**runkwargs): + runkwargs.update(initkwargs) + iterator = func(**runkwargs) + return iterator + try: makeIterator.__name__ = func.__name__ + except AttributeError: pass + try: makeIterator.__doc__ = func.__doc__ + except AttributeError: pass + return makeIterator + try: initTask.__doc__ = func.__doc__ + except AttributeError: pass + try: initTask.__name__ = func.__name__ + except AttributeError: pass + return initTask + + +def taskmethod(func): + """Task decorator. + + Decorate a generator function to produce a re-usable generator + factory for the given task. + """ + def initTask(self, **initkwargs): + def makeIterator(**runkwargs): + runkwargs.update(initkwargs) + iterator = func(self, **runkwargs) + return iterator + try: makeIterator.__name__ = func.__name__ + except AttributeError: pass + try: makeIterator.__doc__ = func.__doc__ + except AttributeError: pass + return makeIterator + try: initTask.__doc__ = func.__doc__ + except AttributeError: pass + try: initTask.__name__ = func.__name__ + except AttributeError: pass + return initTask + + +def parent_task(func): + """Parent task decorator. + + A parent task is a task that accepts children. + + Decorate a generator function to produce a re-usable generator + factory for the given task. + """ + def initTask(*children, **initkwargs): + def makeIterator(**runkwargs): + runkwargs.update(initkwargs) + iterator = func(*children, **runkwargs) + return iterator + try: makeIterator.__name__ = func.__name__ + except AttributeError: pass + try: makeIterator.__doc__ = func.__doc__ + except AttributeError: pass + return makeIterator + try: initTask.__doc__ = func.__doc__ + except AttributeError: pass + try: initTask.__name__ = func.__name__ + except AttributeError: pass + return initTask + + +def parent_taskmethod(func): + """Parent task decorator. + + A parent task is a task that accepts children. + + Decorate a generator function to produce a re-usable generator + factory for the given task. + """ + def initTask(self, *children, **initkwargs): + def makeIterator(**runkwargs): + runkwargs.update(initkwargs) + iterator = func(self, *children, **runkwargs) + return iterator + try: makeIterator.__name__ = func.__name__ + except AttributeError: pass + try: makeIterator.__doc__ = func.__doc__ + except AttributeError: pass + return makeIterator + try: initTask.__doc__ = func.__doc__ + except AttributeError: pass + try: initTask.__name__ = func.__name__ + except AttributeError: pass + return initTask + + +def visit(tree, **kwargs): + """Iterate over a tree of nested iterators. + + Apply the U{Visitor + Pattern} to a tree + of nested iterators. Iterators should yield True, False, None, or + a child iterator. Values of True or False are passed back to the + parent iterator. A value of None is silently ignored, and the + current iterator will be queried again on the next pass. + + The visitor will yield None until the tree raises StopIteration, + upon which the visitor will yield the last value yielded by the + tree, and terminate itself with StopIteration. + + The visitor is essentially a micro-scheduler for a Behavior Tree + implemented as a tree of nested iterators. For more information, + see the discussion at + U{http://aigamedev.com/programming-tips/scheduler}. + """ + s = Stack() + return_values = RETURN_VALUES + + current = tree(**kwargs) + send_value = None + send_ok = False + while True: + try: + if send_ok: + child = current.send(send_value) + send_value = None + send_ok = False + else: + child = next(current) + + if child in return_values: + send_value = child + yield send_value + else: + # Descend into child node + s.push(current) + current = child + + except StopIteration: + try: + current = s.pop() + send_ok = True + except (EmptyError, IndexError) as e: + return + + +@task +def succeed(**kwargs): + """Always succeed. + """ + yield True + + +@task +def fail(**kwargs): + """Always fail. + """ + yield False + + +@task +def stall(**kwargs): + """Wrap a callable as a task. Yield the boolean of its result after 'after' iterations. + + Yields 'None' 'after' times. + + @keyword func: The callable to run. + @type func: callable + + @keyword after: Run the callable after this many iterations. + @type after: int + """ + func = kwargs.pop('func') + after = kwargs.pop('after', 1) + for x in range(after): + yield None + yield bool(func()) + + +@task +def succeedAfter(**kwargs): + """Succeed after a given number of iterations. + + Yields 'None' 'after' times. + + @keyword after: How many iterations to succeed after. + @type after: int + """ + after = kwargs.pop('after', 1) + for x in range(after): + yield None + yield True + + +@task +def failAfter(**kwargs): + """Fail after a given number of iterations. + + Yields 'None' 'after' times. + + @keyword after: How many iterations to fail after. + @type after: int + """ + after = kwargs.pop('after', 1) + for x in range(after): + yield None + yield False + + +@parent_task +def sequence(*children, **kwargs): + """Run tasks in sequence until one fails. + + The sequence will run each task in sequence until one fails, + returning a failure. If all fail, returns a success. + + For more information, see the discussion at + U{http://aigamedev.com/hierarchical-logic/sequence}. + + @param children: tasks to run in sequence as children. + """ + final_value = True + for child in children: + result = yield child(**kwargs) + if not result and result is not None: + final_value = False + break + + yield final_value + + +@parent_task +def queue(queue, **kwargs): + """Run tasks in the queue in sequence. + + The queue will run each task in the queue in sequence. If the + queue is empty, it will stall until the queue receives new items. + + Note: the queue task *never* returns a success or failure code. + + The queue should be an object implementing pop(). If the queue has + items in it, it should evaluate to True, otherwise False. The + queue task will pop the next task in the queue and evaluate it in + the normal fashion. + + @param queue: task queue. + @type queue: A sequence object implementing pop() + """ + while True: + if queue: + child = queue.pop() + yield child(**kwargs) + else: + yield None + + +@parent_task +def parallel_queue(queue, **kwargs): + """Run tasks in the queue in parallel. + + The queue will run each task in the queue in parallel. If the + queue is empty, it will stall until the queue receives new items. + + Note: the queue task *never* returns a success or failure code. + + The queue should be an object implementing pop(). If the queue has + items in it, it should evaluate to True, otherwise False. The + queue task will pop the next task in the queue and evaluate it in + the normal fashion. + + @param queue: task queue. + """ + visits = [] # Canonical list of visited children + visiting = [] # Working list of visited children + while True: + if queue: + child = queue.pop() + visits.append(visit(child, **kwargs)) + visiting[:] = visits # Se we can remove from visits + for child in visiting: + try: + next(child) + except StopIteration: + visits.remove(child) + yield None + + +@parent_task +def selector(*children, **kwargs): + """Run tasks in sequence until one succeeds. + + The selector will run each task in sequence until one succeeds, + returning a success. If all fail, returns a failure. + + For more information, see the discussion at + U{http://aigamedev.com/hierarchical-logic/selector}. + + @param children: child tasks to select from. + """ + final_value = False + for child in children: + result = (yield child(**kwargs)) + if result: + final_value = True + break + + yield final_value + + +class Enum(object): + """Enum/namespace class. Cannot be implemented. + + Subclass and add class variables. + """ + def __init__(self): + raise NotImplementedError("_Enum class object. Do not instantiate.") + + +class PARALLEL_SUCCESS(Enum): + """Success policy enumerator for parallel behavior. + + C{REQUIRE_ALL}: All child tasks must succeed. + C{REQUIRE_ONE}: Only one child task must succeed. + """ + REQUIRE_ALL = "ALL" + REQUIRE_ONE = "ONE" + + +@parent_task +def parallel(*children, **kwargs): + """Run tasks in parallel until the success policy is fulfilled or broken. + + If the success policy is met, return a success. If the policy is + broken, return a failure. + + For more information, see the discussion at + U{aigamedev.com/hierarchical-logic/parallel}. + + @param children: tasks to run in parallel as children. + + @keyword policy: The success policy. All must succeed, + or only one must succeed. + @type policy: C{PARALLEL_SUCCESS.REQUIRE_ALL} or + C{PARALLEL_SUCCESS.REQUIRE_ONE}. + """ + return_values = set((True, False)) + policy = kwargs.pop('policy', PARALLEL_SUCCESS.REQUIRE_ONE) + all_must_succeed = (policy == PARALLEL_SUCCESS.REQUIRE_ALL) + visits = [visit(arg, **kwargs) for arg in children] + final_value = True + while True: + try: + # Run one step on each child per iteration. + for child in visits: + result = next(child) + if result in return_values: + if not result and all_must_succeed: + final_value = False + break + elif result and not all_must_succeed: + final_value = True + break + else: + final_value = result + yield None + except (EmptyError, IndexError): + break + except StopIteration: + break + yield final_value + + +@task +def throw(**kwargs): + """Throw (raise) an exception. + + @keyword throws: An Exception to throw. + @type throws: C{Exception} + + @keyword throws_message: Text to instantiate C{throws} with. + @type throws_message: C{str} + """ + throws = kwargs.pop('throws', Exception) + throws_message = kwargs.pop('throws_message', '') + + class gen(object): + def __iter__(self): + return self + + def __next__(self): + raise throws(throws_message) + + next = __next__ + return gen() + + +@parent_task +def catch(child, **kwargs): + """Catch a raised exception from child and run an alternate branch. + + Note: this will not catch exceptions raised in the branch. + + @keyword caught: An Exception to catch. + @type caught: C{Exception} + + @keyword branch: An alternate tree to visit when caught. + """ + caught = kwargs.pop('caught', Exception) + branch = kwargs.pop('branch', fail()) + + result = None + tree = visit(child, **kwargs) + try: + while result is None: + result = next(tree) + yield None + except caught: + while result is None: + result = (yield branch(**kwargs)) + yield result + + +@parent_task +def log(message, **kwargs): + """Log a message to the given logger. + + @keyword name: The name of the logger to use. + @type name: str + + @keyword level: The logging level to use. + @default level: logging.DEBUG + """ + name = kwargs.pop('name', None) + if name is None: + logger = logging.getLogger() + else: + logger = logging.getLogger(name) + + level = kwargs.pop('level', logging.DEBUG) + logger.log(level, message) + yield True diff --git a/src/owyl/decorators.py b/src/owyl/decorators.py index 21fc75f..18dfe4b 100644 --- a/src/owyl/decorators.py +++ b/src/owyl/decorators.py @@ -1,129 +1,130 @@ -# -*- coding: utf-8 -*- -"""decorators -- decorators for owyl behavior trees. - - - -Copyright 2008 David Eyk. All rights reserved. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$"[6:-2] -__date__ = "$Date$"[7:-2] - -import time - -import core - -__all__ = ['identity', 'repeatUntilFail', 'repeatUntilSucceed', - 'flip', 'repeatAlways', 'limit'] - -@core.parent_task -def identity(child, **kwargs): - """Transparent decorator. Pass yielded values from child unchanged. - """ - result = None - child = child(**kwargs) - while result is None: - result = (yield child) - yield result - -@core.parent_task -def flip(child, **kwargs): - """NOT decorator. Pass yielded values from child with the boolean flipped. - - Yielded values of "None" are passed unchanged. - """ - result = None - child = child(**kwargs) - while result is None: - result = (yield child) - yield not result - -@core.parent_task -def repeatAlways(child, **kwargs): - """Perpetually iterate over the child, regardless of return value. - """ - result = None - while True: - try: - visitor = core.visit(child, **kwargs) - except StopIteration: - continue - while result is None: - try: - result = (yield visitor.next()) - except StopIteration: - yield None - break - - -@core.parent_task -def repeatUntilFail(child, **kwargs): - """Repeatedly iterate over the child until it fails. - - @keyword final_value: Value to return on failure. - @type final_value: C{True} or C{False} - """ - final_value = kwargs.pop('final_value', False) - result = None - child = child(**kwargs) - while result is None: - try: - result = (yield child) - if result is False: - break - else: - yield None # Yield to other tasks. - result = None - except StopIteration: - result = None - yield final_value - -@core.parent_task -def repeatUntilSucceed(child, **kwargs): - """Repeatedly iterate over the child until it succeeds. - - @keyword final_value: Value to return on failure. - @type final_value: C{True} or C{False} - """ - final_value = kwargs.pop('final_value', True) - result = None - child = child(**kwargs) - while result is None: - try: - result = (yield child) - if result is True: - break - else: - yield None # Yield to other tasks. - result = None - except StopIteration: - result = None - yield final_value - -@core.parent_task -def limit(child, **kwargs): - """Limit the child to only iterate once every period. - - Otherwise, act as an identity decorator. - - @keyword limit_period: how often to run the child, in seconds. - """ - nowtime = time.time - last_run = nowtime() - period = kwargs.get('limit_period', 1.0) - result = None - visitor = core.visit(child, **kwargs) - while True: - now = nowtime() - since_last = now - last_run - if (since_last) <= period: - yield None - continue - last_run = nowtime() - result = visitor.next() - yield result +# -*- coding: utf-8 -*- +"""decorators -- decorators for owyl behavior trees. + + + +Copyright 2008 David Eyk. All rights reserved. + +$Author$\n +$Rev$\n +$Date$ +""" +from __future__ import absolute_import + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$"[6:-2] +__date__ = "$Date$"[7:-2] + +import time + +from . import core + +__all__ = ['identity', 'repeatUntilFail', 'repeatUntilSucceed', + 'flip', 'repeatAlways', 'limit'] + +@core.parent_task +def identity(child, **kwargs): + """Transparent decorator. Pass yielded values from child unchanged. + """ + result = None + child = child(**kwargs) + while result is None: + result = (yield child) + yield result + +@core.parent_task +def flip(child, **kwargs): + """NOT decorator. Pass yielded values from child with the boolean flipped. + + Yielded values of "None" are passed unchanged. + """ + result = None + child = child(**kwargs) + while result is None: + result = (yield child) + yield not result + +@core.parent_task +def repeatAlways(child, **kwargs): + """Perpetually iterate over the child, regardless of return value. + """ + result = None + while True: + try: + visitor = core.visit(child, **kwargs) + except StopIteration: + continue + while result is None: + try: + result = (yield next(visitor)) + except StopIteration: + yield None + break + + +@core.parent_task +def repeatUntilFail(child, **kwargs): + """Repeatedly iterate over the child until it fails. + + @keyword final_value: Value to return on failure. + @type final_value: C{True} or C{False} + """ + final_value = kwargs.pop('final_value', False) + result = None + child = child(**kwargs) + while result is None: + try: + result = (yield child) + if result is False: + break + else: + yield None # Yield to other tasks. + result = None + except StopIteration: + result = None + yield final_value + +@core.parent_task +def repeatUntilSucceed(child, **kwargs): + """Repeatedly iterate over the child until it succeeds. + + @keyword final_value: Value to return on failure. + @type final_value: C{True} or C{False} + """ + final_value = kwargs.pop('final_value', True) + result = None + child = child(**kwargs) + while result is None: + try: + result = (yield child) + if result is True: + break + else: + yield None # Yield to other tasks. + result = None + except StopIteration: + result = None + yield final_value + +@core.parent_task +def limit(child, **kwargs): + """Limit the child to only iterate once every period. + + Otherwise, act as an identity decorator. + + @keyword limit_period: how often to run the child, in seconds. + """ + nowtime = time.time + last_run = nowtime() + period = kwargs.get('limit_period', 1.0) + result = None + visitor = core.visit(child, **kwargs) + while True: + now = nowtime() + since_last = now - last_run + if (since_last) <= period: + yield None + continue + last_run = nowtime() + result = next(visitor) + yield result diff --git a/src/owyl/dsl.py b/src/owyl/dsl.py index 313bdba..f90b366 100644 --- a/src/owyl/dsl.py +++ b/src/owyl/dsl.py @@ -1,3 +1,3 @@ -# -*- coding: utf-8 -*- -"""owyl.dsl -- a domain-specific language (based on YAML) for behavior trees. -""" +# -*- coding: utf-8 -*- +"""owyl.dsl -- a domain-specific language (based on YAML) for behavior trees. +""" diff --git a/src/owyl/stack.py b/src/owyl/stack.py index 955e27d..b0b5689 100644 --- a/src/owyl/stack.py +++ b/src/owyl/stack.py @@ -1,24 +1,24 @@ -# -*- coding: utf-8 -*- -"""stack -- stack implementation for owyl - - - -Copyright 2008 David Eyk. All rights reserved. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$"[6:-2] -__date__ = "$Date$"[7:-2] - -__all__ = "EmptyError Stack".split() - -EmptyError = IndexError - -class Stack(list): - """A list with a push method. - """ - push = list.append +# -*- coding: utf-8 -*- +"""stack -- stack implementation for owyl + + + +Copyright 2008 David Eyk. All rights reserved. + +$Author$\n +$Rev$\n +$Date$ +""" + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$"[6:-2] +__date__ = "$Date$"[7:-2] + +__all__ = ["EmptyError", "Stack"] + +EmptyError = IndexError + +class Stack(list): + """A list with a push method. + """ + push = list.append diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/testowyl.py b/tests/testowyl.py index d6ad4b0..773c0f9 100644 --- a/tests/testowyl.py +++ b/tests/testowyl.py @@ -1,403 +1,405 @@ -# -*- coding: utf-8 -*- -"""testowyl -- some tests for owyl. - -Copyright 2008 David Eyk. All rights reserved. - -$Author$\n -$Rev$\n -$Date$ -""" - -__author__ = "$Author$"[9:-2] -__revision__ = "$Rev$"[6:-2] -__date__ = "$Date$"[7:-2] - -import unittest - -import owyl -from owyl import blackboard - - -class OwylTests(unittest.TestCase): - """Tests for Owyl. - - Note: tests should run the tree twice to make sure that the - constructed tree is re-usable. - """ - def testSucceed(self): - """Can we succeed? - """ - s = owyl.succeed() - t = s() - self.assertEqual(t.next(), True) - self.assertRaises(StopIteration, t.next) - - t = s() - self.assertEqual(t.next(), True) - self.assertRaises(StopIteration, t.next) - - def testFail(self): - """Can we fail? - """ - s = owyl.fail() - t = s() - self.assertEqual(t.next(), False) - self.assertRaises(StopIteration, t.next) - - t = s() - self.assertEqual(t.next(), False) - self.assertRaises(StopIteration, t.next) - - def testVisitSequenceSuccess(self): - """Can we visit a successful sequence? - """ - tree = owyl.sequence(owyl.succeed(), - owyl.succeed(), - owyl.succeed()) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True, True, True, True]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True, True, True, True]) - - def testVisitSequenceFailure(self): - """Can we visit a failing sequence? - """ - tree = owyl.sequence(owyl.succeed(), - owyl.succeed(), - owyl.fail(), - owyl.succeed()) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True, True, False, False]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True, True, False, False]) - - def testVisitSelectorSuccess(self): - """Can we visit a successful selector? - """ - tree = owyl.selector(owyl.fail(), - owyl.fail(), - owyl.succeed(), - owyl.fail()) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False, False, True, True]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False, False, True, True]) - - def testVisitSelectorFailure(self): - """Can we visit a failing selector? - """ - tree = owyl.selector(owyl.fail(), - owyl.fail(), - owyl.fail()) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False, False, False, False]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False, False, False, False]) - - def testParallel_AllSucceed_Success(self): - """Can we visit a suceeding parallel (all succeed)? - """ - tree = owyl.parallel(owyl.sequence(owyl.succeed(), - owyl.succeed()), - owyl.sequence(owyl.succeed(), - owyl.succeed()), - policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True]) - - def testParallel_OneSucceeds_Success(self): - """Can we visit a suceeding parallel (one succeeds)? - """ - tree = owyl.parallel(owyl.sequence(owyl.succeed(), - owyl.succeed()), - owyl.sequence(owyl.succeed(), - owyl.fail()), - policy=owyl.PARALLEL_SUCCESS.REQUIRE_ONE) - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [True]) - - def testParallel_AllSucceed_Failure(self): - """Can we visit a failing parallel (all succeed)? - """ - tree = owyl.parallel(owyl.sequence(owyl.succeed(), - owyl.fail()), - owyl.sequence(owyl.succeed(), - owyl.succeed()), - policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False]) - - def testParallel_OneSucceeds_Failure(self): - """Can we visit a failing parallel (one succeeds)? - """ - tree = owyl.parallel(owyl.sequence(owyl.fail(), - owyl.fail()), - owyl.sequence(owyl.fail(), - owyl.fail()), - policy=owyl.PARALLEL_SUCCESS.REQUIRE_ONE) - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False]) - - v = owyl.visit(tree) - - results = [x for x in v if x is not None] - self.assertEqual(results, [False]) - - def testThrow(self): - """Can we throw an exception within the tree? - """ - tree = owyl.sequence(owyl.succeed(), - owyl.succeed(), - owyl.throw(throws=ValueError, - throws_message="AUGH!!"), - ) - v = owyl.visit(tree) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - self.assertRaises(ValueError, v.next) - - v = owyl.visit(tree) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - self.assertRaises(ValueError, v.next) - - def testCatch(self): - """Can we catch an exception thrown within the tree? - """ - tree = owyl.sequence(owyl.succeed(), - owyl.succeed(), - owyl.catch(owyl.throw(throws=ValueError, - throws_message="AUGH!!"), - caught=ValueError, - branch=owyl.succeed()) - ) - v = owyl.visit(tree) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - - v = owyl.visit(tree) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - - def testCatchIgnoresOthers(self): - """Does catch ignore other exceptions thrown within the tree? - """ - tree = owyl.sequence(owyl.succeed(), - owyl.succeed(), - owyl.catch(owyl.throw(throws=ValueError, - throws_message="AUGH!!"), - caught=IndexError, - branch=owyl.succeed()) - ) - v = owyl.visit(tree) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - self.assertRaises(ValueError, v.next) - - v = owyl.visit(tree) - self.assertEqual(v.next(), True) - self.assertEqual(v.next(), True) - self.assertRaises(ValueError, v.next) - - def testIdentity(self): - """Does identity pass on return values unchanged? - """ - # Succeed after 5 iterations. - after = 5 - tree = owyl.identity(owyl.succeedAfter(after=after)) - - v = owyl.visit(tree) - for x in xrange(after): - self.assertEqual(v.next(), None) - self.assertEqual(v.next(), True) - - v = owyl.visit(tree) - for x in xrange(after): - self.assertEqual(v.next(), None) - self.assertEqual(v.next(), True) - - tree = owyl.identity(owyl.failAfter(after=after)) - - v = owyl.visit(tree) - for x in xrange(after): - self.assertEqual(v.next(), None) - self.assertEqual(v.next(), False) - - v = owyl.visit(tree) - for x in xrange(after): - self.assertEqual(v.next(), None) - self.assertEqual(v.next(), False) - - def testCheckBB(self): - """Can we check a value on a blackboard? - """ - value = "foo" - checker = lambda x: x == value - - bb = blackboard.Blackboard('test', value=value) - tree = blackboard.checkBB(key='value', - check=checker) - - # Note that we can pass in the blackboard at run-time. - v = owyl.visit(tree, blackboard=bb) - - # Check should succeed. - self.assertEqual(v.next(), True) - - v = owyl.visit(tree, blackboard=bb) - self.assertEqual(v.next(), True) - - bb['value'] = 'bar' - - # Check should now fail. - v = owyl.visit(tree, blackboard=bb) - self.assertEqual(v.next(), False) - - v = owyl.visit(tree, blackboard=bb) - self.assertEqual(v.next(), False) - - def testSetBB(self): - """Can we set a value on a blackboard? - """ - value = 'foo' - checker = lambda x: x == value - - bb = blackboard.Blackboard('test', value='bar') - tree = owyl.sequence(blackboard.setBB(key="value", - value=value), - blackboard.checkBB(key='value', - check=checker) - ) - - # Note that we can pass in the blackboard at run-time. - v = owyl.visit(tree, blackboard=bb) - - # Sequence will succeed if the check succeeds. - result = [x for x in v][-1] - self.assertEqual(result, True) - - v = owyl.visit(tree, blackboard=bb) - result = [x for x in v][-1] - self.assertEqual(result, True) - - def testRepeatUntilSucceed(self): - """Can we repeat a behavior until it succeeds? - """ - bb = blackboard.Blackboard('test', ) # 'value' defaults to None. - checker = lambda x: x is not None - - parallel = owyl.parallel - repeat = owyl.repeatUntilSucceed - checkBB = blackboard.checkBB - setBB = blackboard.setBB - - tree = parallel(repeat(checkBB(key='value', - check=checker), - final_value=True), - - # That should fail until this sets the value: - owyl.selector(owyl.fail(), - owyl.fail(), - setBB(key='value', - value='foo')), - policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) - - v = owyl.visit(tree, blackboard=bb) - results = [x for x in v] - result = results[-1] - self.assertEqual(result, True) - - # Need to reset the blackboard to get the same results. - bb = blackboard.Blackboard('test', ) # 'value' defaults to None. - v = owyl.visit(tree, blackboard=bb) - results = [x for x in v] - result = results[-1] - self.assertEqual(result, True) - - def testRepeatUntilFail(self): - """Can we repeat a behavior until it fails? - """ - bb = blackboard.Blackboard('test', value="foo") - checker = lambda x: x and True or False # must eval to True - - parallel = owyl.parallel - repeat = owyl.repeatUntilFail - checkBB = blackboard.checkBB - setBB = blackboard.setBB - - tree = parallel(repeat(checkBB(key='value', - check=checker), - final_value=True), - - # That should succeed until this sets the value: - owyl.selector(owyl.fail(), - owyl.fail(), - setBB(key='value', - value=None)), - policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) - - v = owyl.visit(tree, blackboard=bb) - results = [x for x in v] - result = results[-1] - self.assertEqual(result, True) - - # Need to reset the blackboard to get the same results. - bb = blackboard.Blackboard('test', value="foo") - v = owyl.visit(tree, blackboard=bb) - results = [x for x in v] - result = results[-1] - self.assertEqual(result, True) - - -if __name__ == "__main__": - runner = unittest - try: - import testoob - runner = testoob - except ImportError: - pass - runner.main() +# -*- coding: utf-8 -*- +"""testowyl -- some tests for owyl. + +Copyright 2008 David Eyk. All rights reserved. + +$Author$\n +$Rev$\n +$Date$ +""" + +__author__ = "$Author$"[9:-2] +__revision__ = "$Rev$"[6:-2] +__date__ = "$Date$"[7:-2] + +import unittest +import os, sys +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src')) + +import owyl +from owyl import blackboard + + +class OwylTests(unittest.TestCase): + """Tests for Owyl. + + Note: tests should run the tree twice to make sure that the + constructed tree is re-usable. + """ + def testSucceed(self): + """Can we succeed? + """ + s = owyl.succeed() + t = s() + self.assertEqual(next(t), True) + self.assertRaises(StopIteration, lambda: next(t)) + + t = s() + self.assertEqual(next(t), True) + self.assertRaises(StopIteration, lambda: next(t)) + + def testFail(self): + """Can we fail? + """ + s = owyl.fail() + t = s() + self.assertEqual(next(t), False) + self.assertRaises(StopIteration, lambda: next(t)) + + t = s() + self.assertEqual(next(t), False) + self.assertRaises(StopIteration, lambda: next(t)) + + def testVisitSequenceSuccess(self): + """Can we visit a successful sequence? + """ + tree = owyl.sequence(owyl.succeed(), + owyl.succeed(), + owyl.succeed()) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True, True, True, True]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True, True, True, True]) + + def testVisitSequenceFailure(self): + """Can we visit a failing sequence? + """ + tree = owyl.sequence(owyl.succeed(), + owyl.succeed(), + owyl.fail(), + owyl.succeed()) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True, True, False, False]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True, True, False, False]) + + def testVisitSelectorSuccess(self): + """Can we visit a successful selector? + """ + tree = owyl.selector(owyl.fail(), + owyl.fail(), + owyl.succeed(), + owyl.fail()) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False, False, True, True]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False, False, True, True]) + + def testVisitSelectorFailure(self): + """Can we visit a failing selector? + """ + tree = owyl.selector(owyl.fail(), + owyl.fail(), + owyl.fail()) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False, False, False, False]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False, False, False, False]) + + def testParallel_AllSucceed_Success(self): + """Can we visit a succeeding parallel (all succeed)? + """ + tree = owyl.parallel(owyl.sequence(owyl.succeed(), + owyl.succeed()), + owyl.sequence(owyl.succeed(), + owyl.succeed()), + policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True]) + + def testParallel_OneSucceeds_Success(self): + """Can we visit a suceeding parallel (one succeeds)? + """ + tree = owyl.parallel(owyl.sequence(owyl.succeed(), + owyl.succeed()), + owyl.sequence(owyl.succeed(), + owyl.fail()), + policy=owyl.PARALLEL_SUCCESS.REQUIRE_ONE) + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [True]) + + def testParallel_AllSucceed_Failure(self): + """Can we visit a failing parallel (all succeed)? + """ + tree = owyl.parallel(owyl.sequence(owyl.succeed(), + owyl.fail()), + owyl.sequence(owyl.succeed(), + owyl.succeed()), + policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False]) + + def testParallel_OneSucceeds_Failure(self): + """Can we visit a failing parallel (one succeeds)? + """ + tree = owyl.parallel(owyl.sequence(owyl.fail(), + owyl.fail()), + owyl.sequence(owyl.fail(), + owyl.fail()), + policy=owyl.PARALLEL_SUCCESS.REQUIRE_ONE) + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False]) + + v = owyl.visit(tree) + + results = [x for x in v if x is not None] + self.assertEqual(results, [False]) + + def testThrow(self): + """Can we throw an exception within the tree? + """ + tree = owyl.sequence(owyl.succeed(), + owyl.succeed(), + owyl.throw(throws=ValueError, + throws_message="AUGH!!"), + ) + v = owyl.visit(tree) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + self.assertRaises(ValueError, lambda: next(v)) + + v = owyl.visit(tree) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + self.assertRaises(ValueError, lambda: next(v)) + + def testCatch(self): + """Can we catch an exception thrown within the tree? + """ + tree = owyl.sequence(owyl.succeed(), + owyl.succeed(), + owyl.catch(owyl.throw(throws=ValueError, + throws_message="AUGH!!"), + caught=ValueError, + branch=owyl.succeed()) + ) + v = owyl.visit(tree) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + + v = owyl.visit(tree) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + + def testCatchIgnoresOthers(self): + """Does catch ignore other exceptions thrown within the tree? + """ + tree = owyl.sequence(owyl.succeed(), + owyl.succeed(), + owyl.catch(owyl.throw(throws=ValueError, + throws_message="AUGH!!"), + caught=IndexError, + branch=owyl.succeed()) + ) + v = owyl.visit(tree) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + self.assertRaises(ValueError, lambda: next(v)) + + v = owyl.visit(tree) + self.assertEqual(next(v), True) + self.assertEqual(next(v), True) + self.assertRaises(ValueError, lambda: next(v)) + + def testIdentity(self): + """Does identity pass on return values unchanged? + """ + # Succeed after 5 iterations. + after = 5 + tree = owyl.identity(owyl.succeedAfter(after=after)) + + v = owyl.visit(tree) + for x in range(after): + self.assertEqual(next(v), None) + self.assertEqual(next(v), True) + + v = owyl.visit(tree) + for x in range(after): + self.assertEqual(next(v), None) + self.assertEqual(next(v), True) + + tree = owyl.identity(owyl.failAfter(after=after)) + + v = owyl.visit(tree) + for x in range(after): + self.assertEqual(next(v), None) + self.assertEqual(next(v), False) + + v = owyl.visit(tree) + for x in range(after): + self.assertEqual(next(v), None) + self.assertEqual(next(v), False) + + def testCheckBB(self): + """Can we check a value on a blackboard? + """ + value = "foo" + checker = lambda x: x == value + + bb = blackboard.Blackboard('test', value=value) + tree = blackboard.checkBB(key='value', + check=checker) + + # Note that we can pass in the blackboard at run-time. + v = owyl.visit(tree, blackboard=bb) + + # Check should succeed. + self.assertEqual(next(v), True) + + v = owyl.visit(tree, blackboard=bb) + self.assertEqual(next(v), True) + + bb['value'] = 'bar' + + # Check should now fail. + v = owyl.visit(tree, blackboard=bb) + self.assertEqual(next(v), False) + + v = owyl.visit(tree, blackboard=bb) + self.assertEqual(next(v), False) + + def testSetBB(self): + """Can we set a value on a blackboard? + """ + value = 'foo' + checker = lambda x: x == value + + bb = blackboard.Blackboard('test', value='bar') + tree = owyl.sequence(blackboard.setBB(key="value", + value=value), + blackboard.checkBB(key='value', + check=checker) + ) + + # Note that we can pass in the blackboard at run-time. + v = owyl.visit(tree, blackboard=bb) + + # Sequence will succeed if the check succeeds. + result = [x for x in v][-1] + self.assertEqual(result, True) + + v = owyl.visit(tree, blackboard=bb) + result = [x for x in v][-1] + self.assertEqual(result, True) + + def testRepeatUntilSucceed(self): + """Can we repeat a behavior until it succeeds? + """ + bb = blackboard.Blackboard('test', ) # 'value' defaults to None. + checker = lambda x: x is not None + + parallel = owyl.parallel + repeat = owyl.repeatUntilSucceed + checkBB = blackboard.checkBB + setBB = blackboard.setBB + + tree = parallel(repeat(checkBB(key='value', + check=checker), + final_value=True), + + # That should fail until this sets the value: + owyl.selector(owyl.fail(), + owyl.fail(), + setBB(key='value', + value='foo')), + policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) + + v = owyl.visit(tree, blackboard=bb) + results = [x for x in v] + result = results[-1] + self.assertEqual(result, True) + + # Need to reset the blackboard to get the same results. + bb = blackboard.Blackboard('test', ) # 'value' defaults to None. + v = owyl.visit(tree, blackboard=bb) + results = [x for x in v] + result = results[-1] + self.assertEqual(result, True) + + def testRepeatUntilFail(self): + """Can we repeat a behavior until it fails? + """ + bb = blackboard.Blackboard('test', value="foo") + checker = lambda x: x and True or False # must eval to True + + parallel = owyl.parallel + repeat = owyl.repeatUntilFail + checkBB = blackboard.checkBB + setBB = blackboard.setBB + + tree = parallel(repeat(checkBB(key='value', + check=checker), + final_value=True), + + # That should succeed until this sets the value: + owyl.selector(owyl.fail(), + owyl.fail(), + setBB(key='value', + value=None)), + policy=owyl.PARALLEL_SUCCESS.REQUIRE_ALL) + + v = owyl.visit(tree, blackboard=bb) + results = [x for x in v] + result = results[-1] + self.assertEqual(result, True) + + # Need to reset the blackboard to get the same results. + bb = blackboard.Blackboard('test', value="foo") + v = owyl.visit(tree, blackboard=bb) + results = [x for x in v] + result = results[-1] + self.assertEqual(result, True) + + +if __name__ == "__main__": + runner = unittest + try: + import testoob + runner = testoob + except ImportError: + pass + runner.main()