通过一个二维数组表示障碍物与通畅区域,简陋地模拟智能车寻找路径的算法。
第一次做虚拟化,感觉很有挑战,也挺好玩。
1.广度优先搜索法(search_breadth_first_1D函数),由起点找到终点。trick:一个点的下一结点有若干个,而上一结点只有一个。
2.这里没有写A*算法的程序。A*程序是有一个启发式heuristics的“地图”,上面标注了当前点到终点的距离,通过比较“已走路程”+“剩余路程”的最小值,确定下一步前进的方向。因为算法简单,而且heuristics变量需要先验的测量或者训练,所以没有写出来。
3.动态规划(compute_value函数)。思路是先利用广度优先搜索,计算出每个点距离终点的距离(类似于搜索出来一个heuristics变量),然后计算出来一条最佳路径。其实我做的跟广度优先搜索没区别。没有使用动态规划,因为太慢了。
4.汽车有了头尾与朝向,汽车可以前进一格,左转进一格、右转进一格,每个方向的cost不同,找出最佳路径(optimum_policy2D函数)。可以认为是2D编程,跟坦克大战差不多。这时要用4维的变量来表示每一点各个方向距离终点的距离。我开始想了特别糟糕的算法,最后发现居然无法表示各个方向,是个错误的算法。这个用的是动态规划,很慢,一点一点的改变。
5.一维动态规划+方向随机概率(stochastic_value函数)。汽车朝某个方向前进时,可能落在其左边或者右边,所以可能误入障碍物,或者超出边界,这时需要惩罚cost。汽车朝一个方向总的cost = sum(此方向概率 * 惩罚cost或者距终点的值) + step_cost
6.最后给出了解决动态规划慢的一个思路:n叉树
程序源码:
# Grid format:
#   0 = Navigable space
#   1 = Occupied space
grid = [[0, 0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0, 0],
        [0, 0, 0, 0, 1, 0],
        [0, 0, 1, 1, 1, 0],
        [0, 0, 0, 0, 1, 0]]
init = [0, 0]  # start cell [row, col]
goal = [len(grid)-1, len(grid[0])-1]  # bottom-right corner of the grid
# Candidate moves as [row offset, col offset].
delta = [[-1, 0 ], # go up
         [ 0, -1], # go left
         [ 1, 0 ], # go down
         [ 0, 1 ]] # go right
delta_name = ['^', '<', 'v', '>']  # arrow printed for each move above
cost = 1  # cost per grid step
def print_expand(expand):
    """Print a 2-D table (a list of rows) one row per line.

    Used for the expansion-order, value, and policy grids below.
    """
    # Iterate the rows directly instead of indexing with range(len(...)).
    for row in expand:
        print(row)
def print_path(action):
    """Print an ASCII map of the path recovered from ``action``.

    ``action[x][y]`` holds the index (into ``delta``) of the move that
    entered cell (x, y).  The route is traced backwards from ``goal`` to
    ``init`` and drawn with the arrows in ``delta_name`` ('*' = goal).
    """
    track = [[' '] * len(grid[0]) for _ in range(len(grid))]
    x, y = goal[0], goal[1]
    track[x][y] = '*'
    while (x, y) != (init[0], init[1]):
        # Copy the move index first: x and y change below, so keep it
        # safe for the delta_name lookup afterwards.
        move = action[x][y]
        back = (move - 2) % len(delta)  # opposite direction of `move`
        x += delta[back][0]
        y += delta[back][1]
        track[x][y] = delta_name[move]
    for row in track:
        print(row)
# Breadth-first search for a shortest path; prints the expansion order
# and the recovered track.
def search_breadth_first_1D():
    """Breadth-first search from ``init`` to ``goal`` on ``grid``.

    Prints the expansion-order table and the path map, then returns a
    sorted list of goal entries ``[g-value, x, y]`` (one entry per
    predecessor cell that reached the goal), or 'fail' when the goal is
    unreachable.
    """
    xboundary = len(grid) - 1
    yboundary = len(grid[0]) - 1
    # Open list of [g-value, x, y]; popping the front gives FIFO order,
    # i.e. plain breadth-first expansion.
    search_list = [[0, init[0], init[1]]]
    result = []
    action = [[-1 for col in range(yboundary+1)] for row in range(xboundary+1)]
    closed = [[0 for col in range(yboundary+1)] for row in range(xboundary+1)]
    closed[init[0]][init[1]] = 1
    expand = [[-1 for col in range(yboundary+1)] for row in range(xboundary+1)]
    expand[init[0]][init[1]] = 0
    expand_count = 1
    while search_list:
        # search_list.sort() is unnecessary: breadth-first order already
        # keeps the list sorted by g-value (distance from the start).
        current = search_list.pop(0)
        for i in range(len(delta)):
            x1 = current[1] + delta[i][0]
            y1 = current[2] + delta[i][1]
            # Skip out-of-bounds, occupied, or already-visited cells.
            if x1 < 0 or x1 > xboundary or y1 <0 or y1 > yboundary or grid[x1][y1] or closed[x1][y1]:
                continue
            if x1 == goal[0] and y1 == goal[1]:
                result.append([current[0]+cost, x1, y1])
                # Assign expand/action for the goal only once so the first
                # (shortest) arrival fixes the back-pointer, giving a
                # shortest path on playback.
                if expand[goal[0]][goal[1]] == -1:
                    # A check like (expand[goal[0]][goal[1]] > expand_count)
                    # would normally be false anyway: a shorter path reaches
                    # the goal first, and expand_count only increases.
                    expand[x1][y1] = expand_count
                    expand_count += 1
                    action[x1][y1] = i  # for back propagation
            else:
                search_list.append([current[0]+cost, x1, y1])
                closed[x1][y1] = 1
                expand[x1][y1] = expand_count
                expand_count += 1
                action[x1][y1] = i  # for back propagation
    print_expand(expand)
    print_path(action)
    if result == []:
        return 'fail'
    else:
        result.sort()
        return result  #.pop(0) would return only the single best entry
# Demo: run the breadth-first search and show what it returns.
print(search_breadth_first_1D())
print('\n')
# Compute each cell's distance-to-goal value (like a heuristic); the
# policy then falls out easily.  Labelled dynamic programming, but the
# fill itself is a breadth-first sweep from the goal.
def compute_value():
    """Fill a distance-to-goal table and print the derived policy map.

    Returns the ``value`` grid: value[x][y] is the step count from
    (x, y) to ``goal`` (999 for occupied/unreachable cells).
    """
    value = [[999 for row in range(len(grid[0]))] for col in range(len(grid))]
    closed = [[0 for row in range(len(grid[0]))] for col in range(len(grid))]
    x, y = goal[0], goal[1]
    value[x][y] = 0
    closed[x][y] = 1
    temp = [[x, y]]
    # Backward sweep: breadth-first from the goal, writing every cell's
    # distance to the goal.
    while temp:
        x, y = temp.pop(0)
        for i in range(len(delta)):
            x1 = x + delta[i][0]
            y1 = y + delta[i][1]
            if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1] or closed[x1][y1]:
                continue
            v2 = value[x][y] + cost
            # NOTE(review): with the `closed` check above, an open cell
            # still holds 999 here, so this guard never fires in practice.
            if v2 > value[x1][y1]:
                continue
            temp.append([x1, y1])
            closed[x1][y1] = 1
            value[x1][y1] = v2
    # Forward pass: for each free cell, find the neighbour closest to the
    # goal among its four neighbours and record the move towards it.
    policy = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    for i in range(len(grid)):
        for j in range(len(grid[0])):
            if grid[i][j]:
                continue
            temp = []
            for k in range(len(delta)):
                x1 = i + delta[k][0]
                y1 = j + delta[k][1]
                if x1 >= 0 and y1 >= 0 and x1 < len(grid) and y1 < len(grid[0]) and grid[x1][y1] == 0:
                    temp.append([value[x1][y1], k])
            if temp == []:
                continue
            temp.sort()
            lowest = temp.pop(0)
            # Only draw an arrow when the neighbour is strictly closer;
            # unreachable cells (all 999) keep a blank policy.
            if value[i][j] > lowest[0]:
                policy[i][j] = delta_name[lowest[1]]
    policy[goal[0]][goal[1]] = '*'
    print_expand(policy)
    return value
# Demo: print the distance-to-goal table computed above.
a = compute_value()
for i in a:
    print(i)
print('\n')
# You are given a car in a grid with initial state
# init = [x-position, y-position, orientation]
# where x/y-position is its position in a given
# grid and orientation is 0-3 corresponding to 'up',
# 'left', 'down' or 'right'.
#
# Your task is to compute and return the car's optimal
# path to the position specified in `goal'; where
# the costs for each motion are as defined in `cost'.
#
# grid format:
#   0 = navigable space
#   1 = occupied space
grid = [[1, 1, 1, 0, 0, 0],
        [1, 1, 1, 0, 1, 0],
        [0, 0, 0, 0, 0, 0],
        [1, 1, 1, 0, 1, 1],
        [1, 1, 1, 0, 1, 1]]
goal = [2, 0]  # final position
init = [4, 3, 0]  # first 2 elements are coordinates, third is direction
cost = [2, 1, 20]  # the cost field has 3 values: right turn, no turn, left turn
# There are four motion directions: up/left/down/right.
# Increasing the index in this array corresponds to a left turn;
# decreasing it is a right turn.
forward = [[-1, 0], # go up
           [ 0, -1], # go left
           [ 1, 0], # go down
           [ 0, 1]] # go right
forward_name = ['up', 'left', 'down', 'right']
# Steering actions: -1 = right turn, 0 = straight, +1 = left turn
# (matching the 3 entries of `cost` above).
action = [-1, 0, 1]
action_name = ['R', '#', 'L']
def optimum_policy2D():
    """Dynamic-programming planner for a car that has an orientation.

    State is (orientation, x, y); ``value``/``policy`` are indexed
    [orientation][x][y].  After the values converge, the per-orientation
    policy is walked forwards from ``init`` and projected onto a plain
    2-D map, which is printed.
    """
    # value[o][x][y]: cost-to-goal from cell (x, y) facing direction o.
    value = [[[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))]]
    # policy[o][x][y]: best steering action from (x, y) facing o.
    policy = [[[' ' for row in range(len(grid[0]))] for col in range(len(grid))],
              [[' ' for row in range(len(grid[0]))] for col in range(len(grid))],
              [[' ' for row in range(len(grid[0]))] for col in range(len(grid))],
              [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]]
    policy2D = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    change = True
    while change:
        change = False
        # go through all grid cells and calculate values
        for x in range(len(grid)):
            for y in range(len(grid[0])):
                for orientation in range(4):
                    if goal[0] == x and goal[1] == y:
                        if value[orientation][x][y] > 0:
                            value[orientation][x][y] = 0
                            policy[orientation][x][y] = '*'
                            change = True
                    elif grid[x][y] == 0:
                        # calculate the three ways to propagate value
                        for i in range(3):
                            o2 = (orientation + action[i]) % 4
                            x2 = x + forward[o2][0]
                            y2 = y + forward[o2][1]
                            if x2 >= 0 and x2 < len(grid) and y2 >= 0 and y2 < len(grid[0]) and grid[x2][y2] == 0:
                                # Backward value fill: treat (x2, y2, o2) as one
                                # step closer to the goal, so this state's value
                                # is the successor's value plus the turn cost.
                                # Followed forwards, the value shrinks by the
                                # turn cost each step until it reaches 0 at goal.
                                v2 = value[o2][x2][y2] + cost[i]  # tricky
                                if v2 < value[orientation][x][y]:
                                    value[orientation][x][y] = v2
                                    policy[orientation][x][y] = action_name[i]
                                    change = True
    # Walk the converged policy forwards from the start state and project
    # it onto a plain 2-D map for printing.
    x, y, orientation = init[0], init[1], init[2]
    policy2D[x][y] = policy[orientation][x][y]
    while policy[orientation][x][y] != '*':
        if policy[orientation][x][y] == '#':
            o2 = orientation
        elif policy[orientation][x][y] == 'R':
            o2 = (orientation - 1) % 4
        elif policy[orientation][x][y] == 'L':
            o2 = (orientation + 1) % 4
        x += forward[o2][0]
        y += forward[o2][1]
        orientation = o2
        policy2D[x][y] = policy[orientation][x][y]
    for i in range(len(policy2D)):
        print(policy2D[i])
def optimum_policy2D_abandon():  # abandoned: the result is 1-D, it cannot express the 3-D (x, y, orientation) state
    """First (abandoned) attempt at the oriented-car planner.

    Backtracks a value table from the goal for each of the four possible
    goal orientations, then patches the start cell's value with the cost
    of turning the car to match its initial heading.  NOTE(review): the
    author marked this approach as wrong -- one back-pointer per cell
    cannot represent per-orientation costs.  Kept for reference; never
    called in this file.
    """
    value = [[[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))]]
    # ori[idx][i][j]: backtracking direction recorded at cell (i, j).
    ori = [[[-1 for row in range(len(grid[0]))] for col in range(len(grid))],
           [[-1 for row in range(len(grid[0]))] for col in range(len(grid))],
           [[-1 for row in range(len(grid[0]))] for col in range(len(grid))],
           [[-1 for row in range(len(grid[0]))] for col in range(len(grid))]]
    policy2D = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    for i in range(len(value)):
        value[i][goal[0]][goal[1]] = 0
        ori[i][goal[0]][goal[1]] = i
    # Costs reversed for the backward pass -- presumably because left and
    # right turns swap when walking the path backwards.
    back_cost = cost[:]
    back_cost.reverse()
    # Four possible initial backtracking orientations at the goal.
    for idx in range(len(value)):
        change = True
        while change:
            change = False
            for i in range(len(grid)):
                for j in range(len(grid[0])):
                    if grid[i][j] == 1 or ori[idx][i][j] == -1:
                        continue
                    for k in range(len(action)):
                        direction = (ori[idx][i][j] + action[k]) % len(forward)
                        x1 = i + forward[direction][0]
                        y1 = j + forward[direction][1]
                        if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1] == 1:
                            continue
                        else:
                            new_cost = back_cost[k] + value[idx][i][j]
                            if new_cost < value[idx][x1][y1]:
                                value[idx][x1][y1] = new_cost
                                ori[idx][x1][y1] = direction
                                change = True
    for idx in range(len(value)):
        # Directions are opposite when backtracking, so flip 180 degrees.
        ori[idx][init[0]][init[1]] = (ori[idx][init[0]][init[1]] + 2) % len(forward)
        direction_diff = (ori[idx][init[0]][init[1]] - init[2]) % len(forward)
        # The adjustment below is not rigorous: a turn could hit an obstacle.
        if direction_diff == 1:
            # Add an initial left-turn cost in place of the straight step.
            value[idx][init[0]][init[1]] += cost[2] - cost[1]
        elif direction_diff == 3:
            value[idx][init[0]][init[1]] += cost[0] - cost[1]
        elif direction_diff == 2:
            # A U-turn is three lefts + one right (or the reverse, whichever
            # is cheaper), replacing the two straight steps counted before.
            if (cost[0]*3 + cost[2]) > (cost[2]*3 + cost[0]):
                value[idx][init[0]][init[1]] += (cost[2]*3 + cost[0]) - 2*cost[1]
            else:
                value[idx][init[0]][init[1]] += (cost[0]*3 + cost[2]) - 2*cost[1]
        else:
            pass
    # Pick the orientation with the smallest start-cell value.
    temp = value[0][init[0]][init[1]]
    index = 0
    for idx in range(1, len(value)):
        if temp > value[idx][init[0]][init[1]]:
            temp = value[idx][init[0]][init[1]]
            index = idx
    for i in ori[index]:
        print(i)
    for i in value[index]:
        print(i)
    policy2D[goal[0]][goal[1]] = '*'
    x, y = goal[0], goal[1]
    # NOTE(review): dead code; `init[y]` below looks like a typo for `init[1]`.
    ## while x != init[0] or y != init[y]:
    ##     x = x + forward[ori[index][x][y]][0]
    ##     y = y + forward[ori[index][x][y]][1]
    ##     policy2D[x][y] = '#'
    return policy2D
# Motion model for the stochastic planner below.
success_prob = 0.5  # probability the car moves in the intended direction
failure_prob = (1.0 - success_prob)/2.0  # Probability(stepping left) = prob(stepping right) = failure_prob
collision_cost = 100  # penalty for drifting into a wall or off the grid
cost_step = 1  # cost of taking one step
def stochastic_value():
    """Value iteration with noisy motion.

    The cost of heading in a direction from a free cell is the step cost
    plus, for each outcome {veer left, go straight, veer right}, that
    outcome's probability times either the collision penalty (blocked or
    off-grid square) or the landing square's value.  Sweeps until no
    cell improves and returns ``(value, policy)``.
    """
    rows, cols = len(grid), len(grid[0])
    value = [[1000] * cols for _ in range(rows)]
    policy = [[' '] * cols for _ in range(rows)]
    value[goal[0]][goal[1]] = 0
    policy[goal[0]][goal[1]] = '*'
    updated = True
    while updated:
        updated = False
        for x in range(rows):
            for y in range(cols):
                if grid[x][y]:
                    continue
                for heading in range(len(delta)):
                    expected = cost_step
                    # Outcomes in the same order as the sweep over turns
                    # -1, 0, +1: veer left, straight, veer right.
                    for drift, prob in ((-1, failure_prob),
                                        (0, success_prob),
                                        (1, failure_prob)):
                        k = (heading + drift) % len(delta)
                        nx, ny = x + delta[k][0], y + delta[k][1]
                        off_grid = nx < 0 or nx >= rows or ny < 0 or ny >= cols
                        if off_grid or grid[nx][ny]:
                            expected += prob * collision_cost
                        else:
                            expected += prob * value[nx][ny]
                    if expected < value[x][y]:
                        value[x][y] = expected
                        policy[x][y] = delta_name[heading]
                        updated = True
    return value, policy
# Demo: print the stochastic value table and the corresponding policy map.
a,b = stochastic_value()
print_expand(a)
print_expand(b)
def stochastic_value_abandon():
    """Abandoned BFS-style variant of ``stochastic_value``.

    Propagates values outwards from the goal instead of sweeping the
    whole grid every pass.  NOTE(review): unused; ``policy`` is never
    filled beyond the goal marker, and each neighbour (x1, y1) is valued
    from the current cell's value plus left/right drift terms -- verify
    against the converged sweep version before trusting the numbers.
    """
    value = [[1000 for row in range(len(grid[0]))] for col in range(len(grid))]
    policy = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    value[goal[0]][goal[1]] = 0
    policy[goal[0]][goal[1]] = '*'
    change = True
    while change:
        change = False
        update = [[goal[0], goal[1]]]
        while update:
            x, y = update.pop(0)
            for i in range(len(delta)):
                x1 = x + delta[i][0]
                y1 = y + delta[i][1]
                if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1]:
                    continue
                # Squares the car may drift into, perpendicular to delta[i];
                # the index variables are reused below to hold their values.
                lvalue = (i - 1) % 4
                rvalue = (i + 1) % 4
                lx, ly = x1 + delta[lvalue][0], y1 + delta[lvalue][1]
                rx, ry = x1 + delta[rvalue][0], y1 + delta[rvalue][1]
                if lx < 0 or lx >= len(grid) or ly < 0 or ly >= len(grid[0]) or grid[lx][ly]:
                    lvalue = collision_cost
                else:
                    lvalue = value[lx][ly]
                if rx < 0 or rx >= len(grid) or ry < 0 or ry >= len(grid[0]) or grid[rx][ry]:
                    rvalue = collision_cost
                else:
                    rvalue = value[rx][ry]
                v2 = success_prob * value[x][y] + failure_prob * (lvalue + rvalue) + cost_step
                if v2 < value[x1][y1]:
                    value[x1][y1] = v2
                    update.append([x1, y1])
                    change = True
    return value, policy
# As you can see, dynamic programming for planning is very inefficient:
# every sweep has to go through the entire map to check whether anything
# changed.  Google-Maps-style search splits the entire map into pieces and
# computes each piece separately, which is a little better.