
NCP: Neural Circuit Policies for Autonomous Driving Control


Introduction

References

NCP

import paddle
import paddle.nn as nn
import numpy as np


class LTCCell(nn.Layer):
    def __init__(
        self,
        wiring,
        in_features=None,
        input_mapping="affine",
        output_mapping="affine",
        ode_unfolds=6,
        epsilon=1e-8,
    ):
        super(LTCCell, self).__init__()
        if in_features is not None:
            wiring.build((None, in_features))
        if not wiring.is_built():
            raise ValueError(
                "Wiring error! Unknown number of input features. Please pass the parameter 'in_features' or call the 'wiring.build()'."
            )
        self._init_ranges = {
            "gleak": (0.001, 1.0),
            "vleak": (-0.2, 0.2),
            "cm": (0.4, 0.6),
            "w": (0.001, 1.0),
            "sigma": (3, 8),
            "mu": (0.3, 0.8),
            "sensory_w": (0.001, 1.0),
            "sensory_sigma": (3, 8),
            "sensory_mu": (0.3, 0.8),
        }
        self._wiring = wiring
        self._input_mapping = input_mapping
        self._output_mapping = output_mapping
        self._ode_unfolds = ode_unfolds
        self._epsilon = epsilon
        self._allocate_parameters()

    @property
    def state_size(self):
        return self._wiring.units

    @property
    def sensory_size(self):
        return self._wiring.input_dim

    @property
    def motor_size(self):
        return self._wiring.output_dim

    @property
    def output_size(self):
        return self.motor_size

    @property
    def synapse_count(self):
        return np.sum(np.abs(self._wiring.adjacency_matrix))

    @property
    def sensory_synapse_count(self):
        return np.sum(np.abs(self._wiring.sensory_adjacency_matrix))

    def add_weight(self, name, init_value):
        param = self.create_parameter(
            init_value.shape, attr=nn.initializer.Assign(init_value))
        self.add_parameter(name, param)
        return param

    def _get_init_value(self, shape, param_name):
        minval, maxval = self._init_ranges[param_name]
        if minval == maxval:
            return paddle.ones(shape) * minval
        else:
            return paddle.rand(shape) * (maxval - minval) + minval

    def _allocate_parameters(self):
        print("alloc!")
        self._params = {}
        self._params["gleak"] = self.add_weight(
            name="gleak", init_value=self._get_init_value((self.state_size,), "gleak")
        )
        self._params["vleak"] = self.add_weight(
            name="vleak", init_value=self._get_init_value((self.state_size,), "vleak")
        )
        self._params["cm"] = self.add_weight(
            name="cm", init_value=self._get_init_value((self.state_size,), "cm")
        )
        self._params["sigma"] = self.add_weight(
            name="sigma",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "sigma"
            ),
        )
        self._params["mu"] = self.add_weight(
            name="mu",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "mu"),
        )
        self._params["w"] = self.add_weight(
            name="w",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "w"),
        )
        self._params["erev"] = self.add_weight(
            name="erev",
            init_value=paddle.to_tensor(self._wiring.erev_initializer()),
        )
        self._params["sensory_sigma"] = self.add_weight(
            name="sensory_sigma",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_sigma"
            ),
        )
        self._params["sensory_mu"] = self.add_weight(
            name="sensory_mu",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_mu"
            ),
        )
        self._params["sensory_w"] = self.add_weight(
            name="sensory_w",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_w"
            ),
        )
        self._params["sensory_erev"] = self.add_weight(
            name="sensory_erev",
            init_value=paddle.to_tensor(
                self._wiring.sensory_erev_initializer()),
        )

        self._params["sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.adjacency_matrix)
        )
        self._params["sensory_sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.sensory_adjacency_matrix)
        )

        if self._input_mapping in ["affine", "linear"]:
            self._params["input_w"] = self.add_weight(
                name="input_w",
                init_value=paddle.ones((self.sensory_size,)),
            )
        if self._input_mapping == "affine":
            self._params["input_b"] = self.add_weight(
                name="input_b",
                init_value=paddle.zeros((self.sensory_size,)),
            )

        if self._output_mapping in ["affine", "linear"]:
            self._params["output_w"] = self.add_weight(
                name="output_w",
                init_value=paddle.ones((self.motor_size,)),
            )
        if self._output_mapping == "affine":
            self._params["output_b"] = self.add_weight(
                name="output_b",
                init_value=paddle.zeros((self.motor_size,)),
            )

    def _sigmoid(self, v_pre, mu, sigma):
        v_pre = paddle.unsqueeze(v_pre, -1)  # For broadcasting
        mues = v_pre - mu
        x = sigma * mues
        return nn.functional.sigmoid(x)

    def _ode_solver(self, inputs, state, elapsed_time):
        v_pre = state

        # We can pre-compute the effects of the sensory neurons here
        sensory_w_activation = self._params["sensory_w"] * self._sigmoid(
            inputs, self._params["sensory_mu"], self._params["sensory_sigma"]
        )
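        # The sparsity mask zeroes out synapses that are absent from the wiring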
        sensory_w_activation *= self._params["sensory_sparsity_mask"]

        sensory_rev_activation = sensory_w_activation * \
            self._params["sensory_erev"]

        # Reduce over dimension 1 (=source sensory neurons)
        w_numerator_sensory = paddle.sum(sensory_rev_activation, axis=1)
        w_denominator_sensory = paddle.sum(sensory_w_activation, axis=1)

        # cm/t is loop invariant
        cm_t = self._params["cm"] / (elapsed_time / self._ode_unfolds)

        # Unfold the ODE solver multiple times within one RNN step
        for t in range(self._ode_unfolds):
            w_activation = self._params["w"] * self._sigmoid(
                v_pre, self._params["mu"], self._params["sigma"]
            )

            w_activation *= self._params["sparsity_mask"]

            rev_activation = w_activation * self._params["erev"]

            # Reduce over dimension 1 (=source neurons)
            w_numerator = paddle.sum(
                rev_activation, axis=1) + w_numerator_sensory
            w_denominator = paddle.sum(
                w_activation, axis=1) + w_denominator_sensory

            # Semi-implicit (fused) update of the LTC ODE: the new potential is
            # the ratio of the weighted inputs to the total conductance.
            numerator = (
                cm_t * v_pre
                + self._params["gleak"] * self._params["vleak"]
                + w_numerator
            )
            denominator = cm_t + self._params["gleak"] + w_denominator

            # Avoid dividing by 0
            v_pre = numerator / (denominator + self._epsilon)

        return v_pre

    def _map_inputs(self, inputs):
        if self._input_mapping in ["affine", "linear"]:
            inputs = inputs * self._params["input_w"]
        if self._input_mapping == "affine":
            inputs = inputs + self._params["input_b"]
        return inputs

    def _map_outputs(self, state):
        output = state
        if self.motor_size < self.state_size:
            output = output[:, 0: self.motor_size]  # slice

        if self._output_mapping in ["affine", "linear"]:
            output = output * self._params["output_w"]
        if self._output_mapping == "affine":
            output = output + self._params["output_b"]
        return output

    def _clip(self, w):
        return nn.functional.relu(w)

    def apply_weight_constraints(self):
        # Keep w, sensory_w, cm and gleak non-negative after each optimizer
        # step; they model conductances and a capacitance, which cannot be
        # negative in the LTC formulation.
        self._params["w"].set_value(self._clip(self._params["w"].detach()))
        self._params["sensory_w"].set_value(
            self._clip(self._params["sensory_w"].detach()))
        self._params["cm"].set_value(self._clip(self._params["cm"].detach()))
        self._params["gleak"].set_value(
            self._clip(self._params["gleak"].detach()))

    def forward(self, inputs, states):
        # Regularly sampled mode (elapsed time = 1 second)
        elapsed_time = 1.0
        inputs = self._map_inputs(inputs)

        next_state = self._ode_solver(inputs, states, elapsed_time)

        outputs = self._map_outputs(next_state)

        return outputs, next_state
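
To make the interface concrete, here is a minimal single-step smoke test of the cell (a sketch, assuming the kerasncp wiring classes that are installed and imported later in this post): one call maps a batch of sensory inputs plus the current hidden state to a pair of motor outputs and the next hidden state.

import paddle
import kerasncp as kncp

# Hypothetical smoke test of a single LTCCell step
wiring = kncp.wirings.FullyConnected(8, 1)    # 8 neurons, 1 of them a motor neuron
cell = LTCCell(wiring, in_features=2)

x = paddle.randn((4, 2))                      # a batch of 4 sensory inputs
h = paddle.zeros((4, cell.state_size))        # initial hidden state
y, h_next = cell(x, h)
print(y.shape, h_next.shape)                  # [4, 1] and [4, 8]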

Basic usage

Clone the project code

!git clone https://github.com/jm12138/keras-ncp

Switch into the project directory

%cd ~/keras-ncp
%matplotlib inline
/home/aistudio/keras-ncp

Import the required modules

import paddle
import numpy as np
import paddle.nn as nn
import kerasncp as kncp
import matplotlib.pyplot as plt
from paddle.optimizer import Adam
from kerasncp.paddle import LTCCell
from paddle.io import DataLoader, TensorDataset

Build a simple RNN model

class RNNSequence(nn.Layer):
    def __init__(
        self,
        rnn_cell,
    ):
        super(RNNSequence, self).__init__()
        self.rnn_cell = rnn_cell

    def forward(self, x):
        batch_size, seq_len = x.shape[:2]
        hidden_state = paddle.zeros((batch_size, self.rnn_cell.state_size))
        outputs = []
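        # Unroll the cell over the time dimension, feeding back the hidden state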
        for t in range(seq_len):
            inputs = x[:, t]
            new_output, hidden_state = self.rnn_cell.forward(
                inputs, hidden_state)
            outputs.append(new_output)
        outputs = paddle.stack(outputs, axis=1)  # return entire sequence
        return outputs

Build a simple model learner

class SequenceLearner(paddle.Model):
    def train_batch(self, inputs, labels=None, update=True):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        loss.backward()
        if update:
            self._optimizer.step()
            self._optimizer.clear_grad()
            self.network.rnn_cell.apply_weight_constraints()
        return [loss.numpy()]

    def eval_batch(self, inputs, labels=None):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        return [loss.numpy()]

    def predict_batch(self, inputs):
        x = inputs[0]
        y_hat = self.network.forward(x)
        return [x.numpy(), y_hat.numpy()]

Construct the training data

# Data parameters
in_features = 2
out_features = 1
N = 128

# Generate the data
data_x = np.stack(
    [np.sin(np.linspace(0, 3 * np.pi, N)),
     np.cos(np.linspace(0, 3 * np.pi, N))],
    axis=1
)
data_x = np.expand_dims(data_x, axis=0).astype(np.float32)
data_y = np.sin(np.linspace(0, 6 * np.pi, N)).reshape([1, N, 1]).astype(np.float32)
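
# Note: the inputs are sin(t) and cos(t) sampled on [0, 3π] and the target is
# sin(2t), i.e. the sequence-to-sequence task is to double the input frequency.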

# Preview the data
print("data_x.shape: ", str(data_x.shape))
print("data_y.shape: ", str(data_y.shape))
for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')

# Convert to Tensor
data_x = paddle.to_tensor(data_x)
data_y = paddle.to_tensor(data_y)

# Build the dataset and data loaders
train_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=True, num_workers=0)
val_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=False, num_workers=0)
data_x.shape:  (1, 128, 2)
data_y.shape:  (1, 128, 1)



(Figure: the two input curves shown in black and the target curve in red.)

Build the RNN sequence model with NCP

wiring = kncp.wirings.FullyConnected(8, out_features)
ltc_cell = LTCCell(wiring, in_features)
ltc_sequence = RNNSequence(ltc_cell)
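
This example uses a fully connected wiring for simplicity. To use an actual NCP wiring (the sparse sensory → inter → command → motor architecture), a construction along the following lines should work; the neuron counts and fan-in/fan-out values are illustrative defaults taken from the keras-ncp README, not from this post.

ncp_wiring = kncp.wirings.NCP(
    inter_neurons=12,                # number of inter neurons
    command_neurons=8,               # number of command neurons
    motor_neurons=out_features,      # number of motor (output) neurons
    sensory_fanout=4,                # outgoing synapses per sensory neuron
    inter_fanout=4,                  # outgoing synapses per inter neuron
    recurrent_command_synapses=4,    # recurrent synapses in the command layer
    motor_fanin=6,                   # incoming synapses per motor neuron
)
ncp_cell = LTCCell(ncp_wiring, in_features)
ncp_sequence = RNNSequence(ncp_cell)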

Configure the model trainer

loss = nn.MSELoss()
opt = Adam(learning_rate=0.01, parameters=ltc_sequence.parameters())

learn = SequenceLearner(ltc_sequence)
learn.prepare(opt, loss)

Train and validate the model

learn.fit(train_dataloader, epochs=400, verbose=2)
learn.evaluate(val_dataloader)
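
To keep the trained weights for later use, the usual Paddle state-dict round trip should work (the file name below is only an example):

paddle.save(ltc_sequence.state_dict(), "ltc_ncp.pdparams")
ltc_sequence.set_state_dict(paddle.load("ltc_ncp.pdparams"))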

Test the model

x, y = learn.predict(val_dataloader)
x = x[0]
y = y[0]


for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')
for i in range(out_features):
    plt.plot(range(N), y[0, :, i], color='green')
Predict begin...
step 1/1 [==============================] - 834ms/step
Predict samples: 1

(Figure: the inputs in black, the ground-truth target in red, and the model prediction in green.)

Summary

Source: https://blog.csdn.net/m0_63642362/article/details/122005760