Record study record life
大创第十次汇报
大创第十次汇报

大创第十次汇报

File name : 第十次汇报.pdf

本周主要进行复现论文《计算机线缆的电磁信息泄漏智能分析方法》

中图分类号 TP309.1 文献标志码 A 文章编号 1005-0388(2022)04-0710-09
DOI  10.12265/j.cjors.202119

论 文 摘 要 

计算机系统中的各型线缆会通过电磁传导发射泄漏内部信息,导致信息安全问题. 为了分析来自计算机线缆的电磁信息泄漏,提出了基于深度学习的智能分析方法. 设计一维卷积神经网络算法,对电磁泄漏信号进行深层的特征提取与学习,从泄漏的电磁信号中智能识别泄漏源的线缆类型,进而分析其中泄漏的视频信息. 实测结果表明,该文提出的方法,在未知目标信号特征的情况下,能够有效识别电磁信息的泄漏源与泄漏信息,为计算机线缆提供了一种电磁信息泄漏的智能分析手段.

实验流程:

计算机线缆的电磁信息泄漏分析实验分为电磁信息泄漏源检测和电磁泄漏信息检测两个步骤.首先判别电磁信号中是否包含信息,再进而识别其中的信息类别每步实验的过程都包括数据采集、数据预处理、模型训练和模型验证

实验一:

针对VGA、HDMI、网络传输线缆和计算机电源线中无意传导发射的电磁信号,学习并提取其中的电磁特征,实现电磁信息泄漏的智能识别

实验二:

通过两类分别有无泄漏信息的样本进行二分类来训练出那些可能是泄漏信息的参数,然后用类激活热力图来实现红黑分离进行可视化的验证

观测域:

时域,频域(快速傅里叶变换)、小波域(小波变换)、倒谱域(倒谱分析)

实验一中:4 种不同线缆的电磁泄漏样本在采样率均为 1 MS/s. 所采集的电磁泄漏信号为时域序列,样本长度为16384

实验二中原始样本为时域电磁信号数据,猫和飞机数据集和黑白中文文字及黑白英文文字为包含信息的一类,黑白屏数据为不含信息的一类,通过MGCNN,D-CNN,D-CNN-BN分别对其进行训练

论文复现的实现代码如下:

import os
import cv2
import sys
import change
import shutil
import datetime
import argparse
import pickle
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt                              #可视化的模块
from sklearn import metrics
from tensorflow import keras
from tensorflow.python.keras import utils
from skimage.transform import resize
from tensorflow.python.keras import Sequential               #按顺序建立的model,一个层建立之后再建立一个层
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.optimizers import Adam
from tensorflow.python.keras.layers import Conv1D
from tensorflow.python.keras.layers import MaxPooling1D
from tensorflow.python.keras.layers import GlobalMaxPool1D
from tensorflow.python.keras.layers import Flatten
from tensorflow.python.keras.layers import Dense                        #全连接层
from tensorflow.python.keras.layers import Dropout
from tensorflow.python.keras.layers import BatchNormalization
from tensorflow.python.keras.layers import Activation
from utils import get_logger, mkdir_recursively
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

t=np.load(r'./data/savePath/cats-airplanes_fft/0/airplane_1_显示器+台式机_640_480_60HZ_1M.lvm_0.npy')#(244,8192)

# 日志对象
logger = get_logger('dcnn_airplane_cat')

class DCNN:
    #数据初始化(n_classes 类别数)
    def __init__(self, data_path, dataset_x=None, dataset_labels=None,
                 epochs=10, batch_size=32, shape=(16384, 1),
                 n_classes=None, project_name='d-cnn', model_save_file_name=None, class_names=None):
        self.data_path = data_path #数据集所在文件夹位置
        self.checkpoint_dir = './training/{}-model'.format(project_name)#训练模型保存的位置
        self.csv_logger_dir = './logs/csv'#训练结果日志保存位置
        self.csv_false_logger_dir='./logs/false_csv'#保存每折预测错误的信息
        self.class_dirs = os.listdir(self.data_path)#返回列表包含地址内的文件夹或文件名列表----》对应类别名,01
        self.n_classes = n_classes if n_classes is not None else 0
        self.epochs = epochs    #迭代次数
        self.batch_size = batch_size#每批样本大小
        self.shape = shape
        self.feature_length = self.shape[0]#每个样本长度
        self.channel = self.shape[1]
        self.model = None
        self.project_name = project_name#该项目名称
        # 如果没有模型保存名,则将模型保存名设置为project_name,保存再checkpoint路径下,否则模型名为model_save_file_name
        if model_save_file_name is None:
            self.model_save_file_name = os.path.join(self.checkpoint_dir, '{}.h5'.format(project_name))
        else:
            self.model_save_file_name = model_save_file_name
        #创建模型保存地址文件夹和日志文件夹
        mkdir_recursively(self.checkpoint_dir)
        # mkdir_recursively(self.grad_cam_data_path)
        mkdir_recursively(self.csv_logger_dir)
        mkdir_recursively(self.csv_false_logger_dir)

        # 创建模型参数保存地址和训练结果日志文件保存地址
        self.checkpoint_filepath_format = os.path.join(self.checkpoint_dir, 'cp-{epoch:04d}.ckpt')
        self.csvlogger_filepath = os.path.join(self.csv_logger_dir, '{}.csv'.format(project_name))
        # 数据集xlabel  |  训练集xylabel  |  测试集xylabel
        self.dataset_x = dataset_x
        self.dataset_labels = dataset_labels
        self.train_x = None
        self.train_labels = None
        self.train_y = None
        self.test_x = None
        self.test_labels = None
        self.test_y = None

        self.class_names = class_names

        # 判断0 1目录是否有效,有效则n_classes+1(即类别数加1)
        for class_dir in self.class_dirs:
            sub_dir_path = os.path.join(self.data_path, class_dir)
            if os.path.isdir(sub_dir_path) and class_dir.isdigit():
                self.n_classes += 1
        pass

    # -----------------------------------------------------------------------------------------------------
    # 数据处理

    # 初始化数据
    def init_data(self):
        self.train_x = np.empty(shape=[0, self.feature_length, self.channel])
        self.train_labels = np.empty(shape=[0], dtype=int)
        self.test_x = np.empty(shape=[0, self.feature_length, self.channel])
        self.test_labels = np.empty(shape=[0], dtype=int)
        pass

    #修改数据格式
    def reshape(self, file_path, class_label):
        sample_x = np.load(file_path)   #(244, 8192)
        sample_x = np.reshape(sample_x, (
            len(sample_x), self.feature_length, self.channel)) #升维(244, 8192, 1)
        sample_y = np.full(len(sample_x), class_label)#一个和sample_x个数(列数)相同,全为标签值的np一维数组
        return sample_x, sample_y

    #加载数据 ->分割数据
    # split_type=0:通过文件分割数据,将(0,1)每个文件夹内文件切割为k,k-1份做训练集,len(files)/k 份测试
    # split_type=1:
    def load_data(self, k, n, split_type=0):
        logger.info('开始从 ({}) 路径加载数据 '.format(self.data_path))
        if split_type == 0:
            self.split_data_by_file_num(k, n)
        else:
            self.split_data_by_sample_num(k, n)
        logger.info('完成从 ({}) 路径加载数据'.format(self.data_path))

    # 通过文件分割数据,将(0,1)每个文件夹内文件切割为k,k-1份做训练集,len(files)/k 份测试
    def split_data_by_file_num(self, k, n):
        self.init_data()
        for class_dir in self.class_dirs:
            sub_dir_path = os.path.join(self.data_path, class_dir)
            if os.path.isdir(sub_dir_path) and class_dir.isdigit():
                class_index = int(class_dir)
                files = sorted(os.listdir(sub_dir_path))#取所有文件
                length = len(files)#文件数量
                each = length // k#向下取整,每一折的数量
                if n - 1 >= 0:#k折交叉验证训练集的前半部分[:n*each]
                    for file_name in files[:n * each]:
                        sample_x, sample_y = self.reshape(os.path.join(sub_dir_path, file_name), class_index)
                        self.train_x = np.concatenate((self.train_x, sample_x), axis=0)
                        self.train_labels = np.concatenate((self.train_labels, sample_y), axis=0)
                if n + 1 < k:#0,1,2,3...k-1,取训练集后半部分为[(n+1)each:-1]
                    for file_name in files[(n + 1) * each:]:
                        sample_x, sample_y = self.reshape(os.path.join(sub_dir_path, file_name), class_index)
                        self.train_x = np.concatenate((self.train_x, sample_x), axis=0)#1维拼接
                        self.train_labels = np.concatenate((self.train_labels, sample_y), axis=0)
                for file_name in files[n * each: (n + 1) * each]:#k折交叉验证的k部分做测试集
                    sample_x, sample_y = self.reshape(os.path.join(sub_dir_path, file_name), class_index)
                    self.test_x = np.concatenate((self.test_x, sample_x), axis=0)
                    self.test_labels = np.concatenate((self.test_labels, sample_y), axis=0)
        # permutation为静态方法,作用打乱
        self.train_x, self.train_labels = self.permutation(self.train_x, self.train_labels)
        self.test_x, self.test_labels = self.permutation(self.test_x, self.test_labels)
        self.train_y = utils.to_categorical(self.train_labels, self.n_classes)#将标签转化为n_classes(2)onehot编码
        self.test_y = utils.to_categorical(self.test_labels, self.n_classes)
        pass

    def split_data_by_sample_num(self, k, n):
        self.init_data()
        if self.dataset_x is None and self.dataset_labels is None:
            self.load_data_by_path(self.data_path)
        data = self.dataset_x#内包含所有文件的数据
        labels = self.dataset_labels#内包含所有文件的label
        length = len(data)#数据集的大小
        each = length // k
        # 取训练集
        if n - 1 >= 0:
            self.train_x = np.concatenate((self.train_x, data[:n * each]), axis=0)
            self.train_labels = np.concatenate((self.train_labels, labels[:n * each]), axis=0)
        if n + 1 < k:
            self.train_x = np.concatenate((self.train_x, data[(n + 1) * each:]), axis=0)
            self.train_labels = np.concatenate((self.train_labels, labels[(n + 1) * each:]), axis=0)
        # 取测试集
        self.test_x = data[n * each: (n + 1) * each]
        self.test_labels = labels[n * each: (n + 1) * each]
        self.train_y = utils.to_categorical(self.train_labels, self.n_classes) #将标签转化为n_classes(2)onehot编码
        self.test_y = utils.to_categorical(self.test_labels, self.n_classes)
        pass

    # 加载文件夹(0,1)所有的文件到dataset_x,dataset_labels,并打乱顺序
    def load_data_by_path(self, data_path, data_len=None):
        # 数据初始化
        self.dataset_x = np.empty(shape=[0, self.feature_length, self.channel])
        self.dataset_labels = np.empty(shape=[0], dtype=int)
        for class_dir in self.class_dirs:#0,1文件夹中
            sub_dir_path = os.path.join(data_path, class_dir)
            if os.path.isdir(sub_dir_path) and class_dir.isdigit():
                class_index = int(class_dir)
                for file_name in os.listdir(sub_dir_path):
                    sample_x, sample_y = self.reshape(os.path.join(sub_dir_path, file_name), class_index)
                    self.dataset_x = np.concatenate((self.dataset_x, sample_x), axis=0)
                    self.dataset_labels = np.concatenate((self.dataset_labels, sample_y), axis=0)
                    if data_len is not None and len(self.dataset_x) >= data_len:
                        self.dataset_x, self.dataset_labels = self.permutation(self.dataset_x, self.dataset_labels)
                        return self.dataset_x, self.dataset_labels
        self.dataset_x, self.dataset_labels = self.permutation(self.dataset_x, self.dataset_labels)
        return self.dataset_x, self.dataset_labels
        pass

    # ------------------------------------------------------------------------------------------------------
    # 创建模型
    def create_model_dcnn(self):
        # self.epochs=200
        self.epochs=10
        self.model = Sequential()
        self.model.add(
            Conv1D(filters=32, kernel_size=4, strides=2, input_shape=(self.feature_length, 1), padding='same',
                   activation='relu'))
        self.model.add(Dropout(0.5))
        self.model.add(MaxPooling1D(pool_size=2))

        self.model.add(
            Conv1D(filters=64, kernel_size=12, strides=2, padding='same',
                   activation='relu', name='last_conv_layer1'))
        self.model.add(Dropout(0.5))
        self.model.add(MaxPooling1D(pool_size=2, name='last_conv_layer'))
        self.model.add(Flatten())
        self.model.add(Dense(self.n_classes, activation='softmax'))
        opt = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        self.model.compile(loss='categorical_crossentropy', optimizer=opt,
                           metrics=['accuracy'])
        # logger.info(self.model.summary())
        # utils.plot_model(self.model, to_file='./model.png', show_shapes=True, show_layer_names=True)
        pass

    def create_model_dcnn_bn(self):
        self.epochs=10
        self.batch_size=100
        self.model = Sequential()
        self.model.add(
            Conv1D(filters=32, kernel_size=4, strides=2, input_shape=(self.feature_length, 1), padding='same'))
        self.model.add(BatchNormalization())
        self.model.add(Activation('relu'))
        self.model.add(Dropout(0.5))
        self.model.add(MaxPooling1D(pool_size=2))
        self.model.add(
            Conv1D(filters=64, kernel_size=12, strides=2, padding='same'))
        self.model.add(BatchNormalization())
        self.model.add(Activation('relu'))
        self.model.add(Dropout(0.5))
        self.model.add(MaxPooling1D(pool_size=2, name='last_conv_layer'))
        self.model.add(Flatten())
        self.model.add(Dense(self.n_classes, activation='softmax'))
        opt = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        self.model.compile(loss='categorical_crossentropy', optimizer=opt,
                           metrics=['accuracy'])

        pass

    def create_model_mgcnn(self):
        # 迭代次数
        self.epochs = 10
        # 一次取样的个数
        self.batch_size = 100
        # 顺序模型
        self.model = Sequential()
        self.model.add(
            # 一维卷积层,#filter = 卷积核个数,kernel_size = 卷积核尺寸,strides = 卷积步长,
            # padding = ”valid“ or "same",且取值为‘VALID’padding=0,并不会对输入(input)做填充;取值为‘SAME’padding>0,将会对输入(input)做填充,填充值都是0            # 激活函数activation
            Conv1D(filters=16, kernel_size=160, strides=4, input_shape=(self.feature_length, 1), padding='same',
                   activation='relu'))
        # 防止过拟合,dropout,每次随机隐藏百分之五十的神经元,相当于每次训练得到不同的模型
        self.model.add(Dropout(0.5))
        # 一维输入的最大池化层,pool_size:单个整数的整数或元组/列表,表示池化窗口的大小
        self.model.add(MaxPooling1D(pool_size=2))

        self.model.add(
            Conv1D(filters=16, kernel_size=12, strides=2, padding='same',
                   activation='relu', name='last_conv_layer1'))  # name表示图层的名称
        self.model.add(Dropout(0.5))
        self.model.add(MaxPooling1D(pool_size=2, name='last_conv_layer'))
        # Flatten()表示降维的作用
        self.model.add(Flatten())
        # 全连接层,softmax是用于多分类过程
        self.model.add(Dense(self.n_classes, activation='softmax'))
        # adma代表梯度向量算法,lr: 学习率,beta_1: 01之间,一般接近于1beta_2: 01之间,一般接近于1,和beta_1一样,使用默认的就好,epsilon: 模糊因子,如果为空,默认为k.epsilon()decay: 每次参数更新后学习率的衰减值(每次更新时学习率下降)
        # amsgrad: 布尔型,是否使用AMSGrad变体
        opt = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        # optimizer代表tensorflow中的优化方法,loss代表要优化的损失函数,metrics表示一些检测指标,此为准确率
        self.model.compile(loss='categorical_crossentropy', optimizer=opt,
                           metrics=['accuracy'])
        # logger.info(self.model.summary())
        # utils.plot_model(self.model, to_file='./model.png', show_shapes=True, show_layer_names=True)
        pass

    # ------------------------------------------------------------------------------------------------------
    # 模型的训练
    def fit(self):
        # log_dir = os.path.join(
        #     "logs",
        #     "fit",
        #     self.project_name,
        # )
        #
        # tensorborad = keras.callbacks.TensorBoard(
        #     log_dir=log_dir,
        #     histogram_freq=1,
        #     write_graph=True)

        # 在每个训练期之后保存模型。
        ckpt = keras.callbacks.ModelCheckpoint(
                filepath=self.checkpoint_filepath_format,
                # Path where to save the model
                # The two parameters below mean that we will overwrite
                # the current checkpoint if and only if
                # the `val_loss` score has improved.
                monitor='val_loss',
                verbose=1,
                period=5)
        #当被监测的数量不再提升,则停止训练。
        earlystop = keras.callbacks.EarlyStopping(
            monitor='val_loss',
            min_delta=1e-8, patience=5,
            verbose=True)

        # 把训练轮结果数据流到 csv 文件的回调函数。
        csv_logger = keras.callbacks.CSVLogger(self.csvlogger_filepath, separator=',', append=False)

        callbacks = [ckpt, csv_logger,earlystop]

        index = -(len(self.train_x) // 10)#取训练集中10%作为训练中的校验集
        history = self.model.fit(self.train_x[:index], self.train_y[:index],
                                 batch_size=self.batch_size,
                                 epochs=self.epochs,
                                 # We pass some validation for
                                 # monitoring validation loss and metrics
                                 # at the end of each epoch
                                 callbacks=callbacks,
                                 validation_data=(self.train_x[index:], self.train_y[index:]))
        self.model.save(self.model_save_file_name)
        return self.model, history

    # 绘制每个模型训练和验证的准确率和损失率图片
    def paint_acc_loss(self,history):
        acc = history.history['acc']
        val_acc = history.history['val_acc']
        loss = history.history['loss']
        val_loss = history.history['val_loss']
        plt.subplot(1, 2, 1)
        plt.plot(acc, label='Training Accuracy')
        plt.plot(val_acc, label='Validation Accuracy')
        plt.title('Training and Validation Accuracy')
        plt.legend()

        plt.subplot(1, 2, 2)
        plt.plot(loss, label='Training Loss')
        plt.plot(val_loss, label='Validation Loss')
        plt.title('Training and Validation Loss')
        plt.legend()
        plt.show()
    pass
    # -----------------------------------------------------------------------------------------------
    # 模型的评估

    # 保存预测错误的信息
    def save_predict(self, n=0, prediction_y=None, predict_labels=None):
        files_name = "./logs/false_csv/{}_{}_False.csv".format(n, self.project_name)
        name = ['predict_airplane', 'predict_cat', 'predict_label', 'real_label']
        csv_data = []
        for i in range(len(predict_labels)):
            if predict_labels[i] != self.test_labels[i]:
                temp = (prediction_y[i][0], prediction_y[i][1], predict_labels[i], self.test_labels[i])
                csv_data.append(temp)
        df = pd.DataFrame(csv_data, columns=name)
        df.to_csv(files_name, encoding='utf-8', index=0)
        pass

    # 输出指标
    def calculate(self, predict_labels, test_labels):
        logger.info("confusion_matrix")
        logger.info(str(metrics.confusion_matrix(test_labels, predict_labels)))
        accuracy_score = metrics.accuracy_score(test_labels, predict_labels)
        logger.info("accuracy_score:%s", str(accuracy_score))
        if self.n_classes == 2:
            precision_score = metrics.precision_score(test_labels, predict_labels)
            logger.info("precision_score:%s", str(precision_score))
            recall_score = metrics.recall_score(test_labels, predict_labels)
            logger.info("recall_score:%s", str(recall_score))
            f1_score = metrics.f1_score(test_labels, predict_labels)
            logger.info("f1_score:%s", str(f1_score))
            return accuracy_score, precision_score, recall_score, f1_score
        else:
            precision_score = metrics.precision_score(test_labels, predict_labels, average='micro')
            logger.info("precision_score:%s", str(precision_score))
            recall_score = metrics.recall_score(test_labels, predict_labels, average='micro')
            logger.info("recall_score:%s", str(recall_score))
            f1_score = metrics.f1_score(test_labels, predict_labels, average='micro')
            logger.info("f1_score:%s", str(f1_score))
            return accuracy_score, precision_score, recall_score, f1_score


    def evaluate(self, project_name, k=5, n=0):
        self.load_data(k,n)
        # self.create_model_mgcnn()
        self.create_model_dcnn()
        # self.create_model_dcnn_bn()
        self.model,history=self.fit()
        #分别绘制训练集和其中校验集的准确率、损失率随迭代次数的变化
        self.paint_acc_loss(history)
        prediction_y = self.model.predict(self.test_x)
        predict_labels = [np.argmax(one_hot) for one_hot in prediction_y]
        self.save_predict(n, prediction_y, predict_labels)
        logger.info("{}{}模型评价指标".format(n + 1, project_name))
        return self.calculate(predict_labels, self.test_labels) #accuracy_score, precision_score, recall_score, f1_score


    def k_cross_validation(self, k):
        evaluation = []
        for i in range(k):
            evaluation.append(self.evaluate('{}-{}/{}折评估指标'.format(self.project_name,str(i + 1), str(k)), k, i, ))
        evaluation_mean = np.mean(evaluation, 0)
        return evaluation, evaluation_mean
    # ------------------------------------------------------------------------------------------------------
    @staticmethod
    def permutation(dataset, labels):
        permutation = np.random.permutation(labels.shape[0])#对标签的索引随机,打乱数据
        shuffled_dataset = dataset[permutation, :, :]
        shuffled_labels = labels[permutation]
        return shuffled_dataset, shuffled_labels

    # -------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
    project_name = 'four_dcnn_fft'
    cnn = DCNN('D:\data\计算机不同线缆-FFT', project_name=project_name, shape=(16384, 1))
    # 开始进行k折交叉验证
    evaluation, e_mean = cnn.k_cross_validation(k=5)
    logger.info(evaluation)
    logger.info(e_mean)
赞赏

微信赞赏 支付宝赞赏

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注