Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4562067
  • 博文数量: 1214
  • 博客积分: 13195
  • 博客等级: 上将
  • 技术积分: 9105
  • 用 户 组: 普通用户
  • 注册时间: 2007-01-19 14:41
个人简介

C++,python,热爱算法和机器学习

文章分类

全部博文(1214)

文章存档

2021年(13)

2020年(49)

2019年(14)

2018年(27)

2017年(69)

2016年(100)

2015年(106)

2014年(240)

2013年(5)

2012年(193)

2011年(155)

2010年(93)

2009年(62)

2008年(51)

2007年(37)

分类: Python/Ruby

2012-03-31 20:53:23

  通过一个二维数组表示障碍物与通畅,简陋的模拟对智能车寻找路径的算法。
  第一次做虚拟化,感觉很有挑战,也挺好玩。

1.广度优先搜索法(search_breadth_first_1D函数),由起点找到终点。trick:一个点的下一结点有若干个,而上一结点只有一个。
2.这里没有写A*算法的程序。A*程序是有一个启发式heuristics的“地图”,上面标注了当前点到终点的距离,通过比较“已走路程”+“剩余路程”的最小值,确定下一步前进的方向。因为算法简单,而且heuristics变量需要先验的测量或者训练,所以没有写出来。
3.动态规划(compute_value函数)。思路是先利用广度优先搜索,计算出每个点距离终点的距离(类似于搜索出来一个heuristics变量),然后计算出来一条最佳路径。其实我做的跟广度优先搜索没区别。没有使用动态规划,因为太慢了。
4.汽车有了头尾与朝向,汽车可以前进一格,左转进一格、右转进一格,每个方向的cost不同,找出最佳路径(optimum_policy2D函数)。可以认为是2D编程,跟坦克大战差不多。这时要用4维的变量来表示每一点各个方向距离终点的距离。我开始想了特别糟糕的算法,最后发现居然无法表示各个方向,是个错误的算法。这个用的是动态规划,很慢,一点一点的改变。
5.一维动态规划+方向随机概率(stochastic_value函数)。汽车朝某个方向前进时,可能落在其左边或者右边,所以可能误入障碍物,或者超出边界,这时需要惩罚cost。汽车朝一个方向总的cost = sum(此方向概率 * 惩罚cost或者距终点的值) + step_cost
6.最后给出了解决动态规划慢的一个思路:n叉树

程序源码:
# Grid format:
#   0 = Navigable space
#   1 = Occupied space

grid = [[0, 0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0, 0],
        [0, 0, 0, 0, 1, 0],
        [0, 0, 1, 1, 1, 0],
        [0, 0, 0, 0, 1, 0]]

init = [0, 0]
goal = [len(grid)-1, len(grid[0])-1] # Make sure that the goal definition stays in the function.

delta = [[-1, 0 ], # go up
        [ 0, -1], # go left
        [ 1, 0 ], # go down
        [ 0, 1 ]] # go right
delta_name = ['^', '<', 'v', '>']

cost = 1

def print_expand(expand):
    for i in range(len(expand)):
        print(expand[i])

def print_path(action):
    path = [[' ' for col in range(len(grid[0]))] for row in range(len(grid))]
    x, y = goal[0], goal[1]
    path[goal[0]][goal[1]] = '*'
    while x != init[0] or y != init[1]:
        # 下面x,y 都要变化,所以这里提取temp,delta_name取下标就不会错
        temp = action[x][y]
        opposite = (temp - 2) % len(delta)
        x += delta[opposite][0]
        y += delta[opposite][1]
        path[x][y] = delta_name[temp]
    for i in range(len(path)):
        print(path[i])

# breadth-first search shorest path, and print the track.
def search_breadth_first_1D():
    xboundary = len(grid) - 1
    yboundary = len(grid[0]) - 1
    search_list = [[0, init[0], init[1]]]
    result = []
    action = [[-1 for col in range(yboundary+1)] for row in range(xboundary+1)]
    closed = [[0 for col in range(yboundary+1)] for row in range(xboundary+1)]
    closed[init[0]][init[1]] = 1
    expand = [[-1 for col in range(yboundary+1)] for row in range(xboundary+1)]
    expand[init[0]][init[1]] = 0
    expand_count = 1
   
    while search_list:
        #search_list.sort() breadth-first 已经是按照g-value(距离Start点的距离) 排序好的。
        current = search_list.pop(0)
        for i in range(len(delta)):
            x1 = current[1] + delta[i][0]
            y1 = current[2] + delta[i][1]
            if x1 < 0 or x1 > xboundary or y1 <0 or y1 > yboundary or grid[x1][y1] or closed[x1][y1]:
                continue
            if x1 == goal[0] and y1 == goal[1]:
                result.append([current[0]+cost, x1, y1])
                # 终点赋值一次,expand和 action 就都不再赋值,这样能得到最短路径
                if expand[goal[0]][goal[1]] == -1: # 一般(expand[goal[0]][goal[1]] > expand_count) 条件不成立,因为短路径先到达终点,且expand_count 是递增赋值的
                    expand[x1][y1] = expand_count
                    expand_count += 1
                    action[x1][y1] = i # for back propagation
            else:
                search_list.append([current[0]+cost, x1, y1])
                closed[x1][y1] = 1
                expand[x1][y1] = expand_count
                expand_count += 1
                action[x1][y1] = i # for back propagation

    print_expand(expand)
    print_path(action)
    if result == []:
        return 'fail'
    else:
        result.sort()
        return result#.pop(0)
print(search_breadth_first_1D())
print('\n')


# compute the value,like heuristic.Then easily get the policy.
# dynamic programming
def compute_value():
    value = [[999 for row in range(len(grid[0]))] for col in range(len(grid))]
    closed = [[0 for row in range(len(grid[0]))] for col in range(len(grid))]
   
    x, y = goal[0], goal[1]
    value[x][y] = 0
    closed[x][y] = 1
    temp = [[x, y]]

    # 从后往前搜索,填写距离终点的距离数
    while temp:
        x, y = temp.pop(0)
        for i in range(len(delta)):
            x1 = x + delta[i][0]
            y1 = y + delta[i][1]
            if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1] or closed[x1][y1]:
                continue
            v2 = value[x][y] + cost
            if v2 > value[x1][y1]:
                continue
            temp.append([x1, y1])
            closed[x1][y1] = 1
            value[x1][y1] = v2

    # 从前往后传播,得到每个点的周围四点中,距终点最近的一个,确认此点的方向
    policy = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]           
    for i in range(len(grid)):
        for j in range(len(grid[0])):
            if grid[i][j]:
                continue
            temp = []
            for k in range(len(delta)):
                x1 = i + delta[k][0]
                y1 = j + delta[k][1]
                if x1 >= 0 and y1 >= 0 and x1 < len(grid) and y1 < len(grid[0]) and grid[x1][y1] == 0:
                    temp.append([value[x1][y1], k])
            if temp == []:
                continue
            temp.sort()
            lowest = temp.pop(0)
            if value[i][j] > lowest[0]:
                policy[i][j] = delta_name[lowest[1]]
    policy[goal[0]][goal[1]] = '*'
    print_expand(policy)
    return value
   
a = compute_value() 
for i in a:
    print(i)
print('\n')


# You are given a car in a grid with initial state
# init = [x-position, y-position, orientation]
# where x/y-position is its position in a given
# grid and orientation is 0-3 corresponding to 'up',
# 'left', 'down' or 'right'.
#
# Your task is to compute and return the car's optimal
# path to the position specified in `goal'; where
# the costs for each motion are as defined in `cost'.
#
# grid format:
#     0 = navigable space
#     1 = occupied space
grid = [[1, 1, 1, 0, 0, 0],
        [1, 1, 1, 0, 1, 0],
        [0, 0, 0, 0, 0, 0],
        [1, 1, 1, 0, 1, 1],
        [1, 1, 1, 0, 1, 1]]
goal = [2, 0] # final position
init = [4, 3, 0] # first 2 elements are coordinates, third is direction
cost = [2, 1, 20] # the cost field has 3 values: right turn, no turn, left turn

# there are four motion directions: up/left/down/right
# increasing the index in this array corresponds to
# a left turn. Decreasing is is a right turn.

forward = [[-1,  0], # go up
           [ 0, -1], # go left
           [ 1,  0], # go down
           [ 0,  1]] # do right
forward_name = ['up', 'left', 'down', 'right']

# the cost field has 3 values: right turn, no turn, left turn
action = [-1, 0, 1]
action_name = ['R', '#', 'L']

def optimum_policy2D():
    value = [[[999 for row in range(len(grid[0]))] for col in range(len(grid))], # 距终点的各个方向的距离
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))]]
    policy = [[[' ' for row in range(len(grid[0]))] for col in range(len(grid))], # 距终点各个方向上的走向
             [[' ' for row in range(len(grid[0]))] for col in range(len(grid))],
             [[' ' for row in range(len(grid[0]))] for col in range(len(grid))],
             [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]]
    policy2D = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]

    change = True
    while change:
        change = False
        # go through all grid cells and calculate values
        for x in range(len(grid)):
            for y in range(len(grid[0])):
                for orientation in range(4):
                    if goal[0] == x and goal[1] == y:
                        if value[orientation][x][y] > 0:
                            value[orientation][x][y] = 0
                            policy[orientation][x][y] = '*'
                            change = True
                    elif grid[x][y] == 0:

                        # calculate the three ways to propagate value
                        for i in range(3):
                            o2 = (orientation + action[i]) % 4
                            x2 = x + forward[o2][0]
                            y2 = y + forward[o2][1]

                            if x2 >= 0 and x2 < len(grid) and y2 >= 0 and y2 < len(grid[0]) and grid[x2][y2] == 0:
                                # 从后往前搜索,填写value的过程。可以把x2,y2,o2看成goal的位置和方向,x,y,orientation走一步后到2,所以1的值=2的值+转向的值
                                # 相当于从前往后,不断的减去转向的值,到终点时,相减的结果变成了0
                                v2 = value[o2][x2][y2] + cost[i] # tricky
                                if v2 < value[orientation][x][y]:
                                    value[orientation][x][y] = v2
                                    policy[orientation][x][y] = action_name[i]
                                    change = True
                                   
    x, y, orientation = init[0], init[1], init[2]
    policy2D[x][y] = policy[orientation][x][y]
    while policy[orientation][x][y] != '*':
        if policy[orientation][x][y] == '#':
            o2 = orientation
        elif policy[orientation][x][y] == 'R':
            o2 = (orientation - 1) % 4
        elif policy[orientation][x][y] == 'L':
            o2 = (orientation + 1) % 4
        x += forward[o2][0]
        y += forward[o2][1]
        orientation = o2
        policy2D[x][y] = policy[orientation][x][y]

    for i in range(len(policy2D)):
        print(policy2D[i])
       
def optimum_policy2D_abandon(): # 结果是一个一维的东西,不是三维的思维
    value = [[[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))],
             [[999 for row in range(len(grid[0]))] for col in range(len(grid))]]
    ori = [[[-1 for row in range(len(grid[0]))] for col in range(len(grid))],
           [[-1 for row in range(len(grid[0]))] for col in range(len(grid))],
           [[-1 for row in range(len(grid[0]))] for col in range(len(grid))],
           [[-1 for row in range(len(grid[0]))] for col in range(len(grid))]]
    policy2D = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    for i in range(len(value)):
        value[i][goal[0]][goal[1]] = 0
        ori[i][goal[0]][goal[1]] = i
    back_cost = cost[:]
    back_cost.reverse()

    # 4种在终点的初始回溯方向   
    for idx in range(len(value)):
        change = True
        while change:
            change = False
            for i in range(len(grid)):
                for j in range(len(grid[0])):
                    if grid[i][j] == 1 or ori[idx][i][j] == -1:
                        continue
                   
                    for k in range(len(action)):
                        direction = (ori[idx][i][j] + action[k]) % len(forward)
                        x1 = i + forward[direction][0]
                        y1 = j + forward[direction][1]
                        if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1] == 1:
                            continue
                        else:
                            new_cost = back_cost[k] + value[idx][i][j]
                            if new_cost < value[idx][x1][y1]:
                                value[idx][x1][y1] = new_cost
                                ori[idx][x1][y1] = direction
                                change = True
    for idx in range(len(value)):
        # 逆溯时方向相反,需要反向
        ori[idx][init[0]][init[1]] = (ori[idx][init[0]][init[1]] + 2) % len(forward)
        direction_diff = (ori[idx][init[0]][init[1]] - init[2]) % len(forward)
        # 下面是不严格的推导,因为可能转向时遇到障碍物
        if direction_diff == 1:
            # 开始的方向加一个左转的cost,取代了原来直走一格的cost
            value[idx][init[0]][init[1]] += cost[2] - cost[1]
        elif direction_diff == 3:
            value[idx][init[0]][init[1]] += cost[0] - cost[1]
        elif direction_diff == 2:
            # 反向要左转三次、右转一次,或者相反,取代原来走的2格cost
            if (cost[0]*3 + cost[2]) > (cost[2]*3 + cost[0]):
                value[idx][init[0]][init[1]] += (cost[2]*3 + cost[0]) - 2*cost[1]
            else:
                value[idx][init[0]][init[1]] += (cost[0]*3 + cost[2]) - 2*cost[1]
        else:
            pass
    # 取出最小的一个
    temp = value[0][init[0]][init[1]]
    index = 0
    for idx in range(1, len(value)):
        if temp > value[idx][init[0]][init[1]]:
            temp = value[idx][init[0]][init[1]]
            index = idx

    for i in ori[index]:
        print(i)
    for i in value[index]:
        print(i)
   
    policy2D[goal[0]][goal[1]] = '*'
    x, y = goal[0], goal[1]
##    while x != init[0] or y != init[y]:
##        x = x + forward[ori[index][x][y]][0]
##        y = y + forward[ori[index][x][y]][1]
##        policy2D[x][y] = '#'
    return policy2D



success_prob = 0.5                     
failure_prob = (1.0 - success_prob)/2.0 # Probability(stepping left) = prob(stepping right) = failure_prob
collision_cost = 100                   
cost_step = 1 

def stochastic_value():
    value = [[1000 for row in range(len(grid[0]))] for col in range(len(grid))]
    policy = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    value[goal[0]][goal[1]] = 0
    policy[goal[0]][goal[1]] = '*'
    change = True
    while change:
        change = False
        for x in range(len(grid)):
            for y in range(len(grid[0])):
                if grid[x][y]: continue
                for direct in range(len(delta)):
                    v2 = cost_step
                    for turn in range(-1, 2):
                        i = (direct + turn) % len(delta)
                        x1 = x + delta[i][0]
                        y1 = y + delta[i][1]
                        if turn == 0:
                            p2 = success_prob
                        else:
                            p2 = failure_prob
                        if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1]:
                            v2 += p2 * collision_cost
                        else:
                            v2 += p2 * value[x1][y1]
                    if v2 < value[x][y]:
                        value[x][y] = v2
                        policy[x][y] = delta_name[direct]
                        change = True

    return value, policy
a,b = stochastic_value()
print_expand(a)
print_expand(b)

def stochastic_value_abandon():
    value = [[1000 for row in range(len(grid[0]))] for col in range(len(grid))]
    policy = [[' ' for row in range(len(grid[0]))] for col in range(len(grid))]
    value[goal[0]][goal[1]] = 0
    policy[goal[0]][goal[1]] = '*'
    change = True
    while change:
        change = False
        update = [[goal[0], goal[1]]]
        while update:
            x, y = update.pop(0)
            for i in range(len(delta)):
                x1 = x + delta[i][0]
                y1 = y + delta[i][1]
                if x1 < 0 or x1 >= len(grid) or y1 < 0 or y1 >= len(grid[0]) or grid[x1][y1]:
                    continue
                lvalue = (i - 1) % 4
                rvalue = (i + 1) % 4
                lx, ly = x1 + delta[lvalue][0], y1 + delta[lvalue][1]
                rx, ry = x1 + delta[rvalue][0], y1 + delta[rvalue][1]
                if lx < 0 or lx >= len(grid) or ly < 0 or ly >= len(grid[0]) or grid[lx][ly]:
                    lvalue = collision_cost
                else:
                    lvalue = value[lx][ly]
                if rx < 0 or rx >= len(grid) or ry < 0 or ry >= len(grid[0]) or grid[rx][ry]:
                    rvalue = collision_cost
                else:
                    rvalue = value[rx][ry]
                v2 = success_prob * value[x][y] + failure_prob * (lvalue + rvalue) + cost_step
                if v2 < value[x1][y1]:
                    value[x1][y1] = v2
                    update.append([x1, y1])
                    change = True
    return value, policy

# As you see, dynamic programming for planning is very inefficient.
# It has to go through the entire map to search whether there is a change.
# Google map search is split the entire map into pieces, then calculate
# each piece, this is a litter better.
阅读(1255) | 评论(0) | 转发(0) |
0

上一篇:Matlab的for循环

下一篇:在Python中调试代码

给主人留下些什么吧!~~