-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmettabridge.py
133 lines (123 loc) · 5.03 KB
/
mettabridge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import sys
sys.path.append('./plugins/')
sys.path.append('./channels/')
from copy import deepcopy
from exploration import *
from narsplugin import narsplugin_init, call_nars, call_nars_tuple
from guiplugin import guiplugin_init, call_gui
from hyperon.ext import register_atoms
from hyperon import *
import time
import io
NAV_STATE_FAIL = "FAIL"
NAV_STATE_BUSY = "BUSY"
NAV_STATE_SUCCESS = "SUCCESS"
NAV_STATE = NAV_STATE_SUCCESS #"busy", "success" and "fail" (initially set to success)
def NAV_STATE_SET(NAV_STATE_ARG):
global NAV_STATE
NAV_STATE = NAV_STATE_ARG
return None
class PatternOperation(OperationObject):
def __init__(self, name, op, unwrap=False, rec=False):
super().__init__(name, op, unwrap)
self.rec = rec
def execute(self, *args, res_typ=AtomType.UNDEFINED):
return super().execute(*args, res_typ=res_typ)
def wrapnpop(func):
def wrapper(*args):
a = [str("'"+arg) if arg is SymbolAtom else str(arg) for arg in args]
res = func(*a)
return [res]
return wrapper
MeTTaROS2Command = ""
def call_ros(*a):
global runner, MeTTaROS2Command
tokenizer = runner.tokenizer()
cmd = str(a[0])
parser = SExprParser(f"(nartech.ros.command {cmd})")
MeTTaROS2Command = cmd
return parser.parse(tokenizer)
objects = "()"
def call_ros_objects(*a):
global runner, objects
tokenizer = runner.tokenizer()
parser = SExprParser(objects)
return parser.parse(tokenizer)
def call_ros_navigation(*a):
global runner, NAV_STATE
tokenizer = runner.tokenizer()
parser = SExprParser(NAV_STATE)
return parser.parse(tokenizer)
def space_init():
global runner
metta_code = None
for arg in sys.argv:
if arg.endswith(".metta"):
with open(arg, "r") as f:
metta_code = f.read()
runner = MeTTa()
runner.run("""
(= (nartech.ros.objects.filter $category $objects)
(collapse (let* (($obj (superpose $objects))
((detection $category (coordinates $x $y)) $obj))
$obj)))
""")
runner.register_atom("nartech.ros", G(PatternOperation('nartech.ros', wrapnpop(call_ros), unwrap=False)))
runner.register_atom("nartech.ros.navigation", G(PatternOperation('nartech.ros.navigation', wrapnpop(call_ros_navigation), unwrap=False)))
runner.register_atom("nartech.ros.objects", G(PatternOperation('nartech.ros.objects', wrapnpop(call_ros_objects), unwrap=False)))
runner.register_atom("nartech.nars", G(PatternOperation('nartech.nars', wrapnpop(call_nars), unwrap=False)))
runner.register_atom("nartech.nars.tuple", G(PatternOperation('nartech.nars.tuple', wrapnpop(call_nars_tuple), unwrap=False)))
runner.register_atom("nartech.gui", G(PatternOperation('nartech.gui', wrapnpop(call_gui), unwrap=False)))
narsplugin_init(runner)
guiplugin_init(runner)
if metta_code is not None:
result = runner.run(metta_code)
for x in result:
print(x) # Only prints the return value
currentTime = 0
start_time = time.time()
def space_tick(node = None):
global currentTime, MeTTaROS2Command, runner, objects
elapsedTime = round(time.time() - start_time, 2)
if elapsedTime < 10 and node is not None:
return
cmd = MeTTaROS2Command
MeTTaROS2Command = ""
objects = "()"
if node:
if cmd == "right":
node.start_navigation_by_moves("right")
if cmd == "left":
node.start_navigation_by_moves("left")
if cmd == "up":
node.start_navigation_by_moves("up")
if cmd == "down":
node.start_navigation_by_moves("down")
if cmd.startswith("(go (coordinates "):
x_y = cmd.split("(go (coordinates ")[1].split(")")[0]
x = int(x_y.split(" ")[0])
y = int(x_y.split(" ")[1])
node.start_navigation_to_coordinate((x, y))
alldetections = deepcopy(node.semantic_slam.previous_detections)
objects = "("
if "{SELF}" in node.semantic_slam.previous_detections:
(t, object_grid_x, object_grid_y, origin_x, origin_y) = node.semantic_slam.previous_detections["{SELF}"]
x_y_unknown = BFS_for_nearest_unknown_cell(node.semantic_slam.low_res_grid, node.semantic_slam.new_width, node.semantic_slam.new_height, object_grid_x, object_grid_y)
if x_y_unknown:
(x_unknown,y_unknown) = x_y_unknown
alldetections["unknown"] = (time.time(), x_unknown, y_unknown, origin_x, origin_y)
print(alldetections)
for category in alldetections:
(t, object_grid_x, object_grid_y, _, __) = alldetections[category]
SEXP = f"(detection {category} (coordinates {object_grid_x} {object_grid_y}))"
objects += SEXP
if category == "{SELF}":
SELF_position = (object_grid_x, object_grid_y)
objects += ")"
currentTime += 1
print("STEP", str(currentTime) +":", runner.run(f"!(Step {currentTime} {elapsedTime})"))
space_init()
if __name__ == "__main__":
while True:
space_tick()
time.sleep(0.01)