26.消息队列-FreeRTOS

一.队列介绍

1.简述

[!TIP]

我陈述时使用了多线程的概念,在实时操作系统中貌似是没有的,但类似,就像线程同步可以类比临界区一样

简述: 消息队列是FreeRTOS中任务到任务、任务到中断、中断到任务数据交流的一种机制(消息传递),其入队出队均会在临界区中进行,保持多任务(线程)间的线程同步,避免多任务同时操作同一数据造成数据的损坏

  • 注意: FreeRTOS基于队列,实现了多种功能,其中包括队列集、互斥信号量、计数信号量、二值信号量、递归互斥信号量,因此很有必要深入了解FreeRTOS的队列

消息队列与全局变量:

  • 消息队列: 做好了数据保护 ,防止多任务或中断同时访问产生冲突。我们只需直接调用API函数即可,简单易用
  • 全局变量: 数据无保护 ,导致数据不安全,当多个任务同时对该变量操作时,数据易受损

特点:

  • 数据入队: 支持两种数据存储缓冲机制,通常是第一种
    • 队列通常采用"先进先出(FIFO)"的数据存储缓冲机制
    • 也可以配置为"后进先出(LIFO)"的数据存储缓冲机制
  • 数据传递方式: 支持两种数据传递方式
    • 通常采用实际值传递,即将数据拷贝到队列中进行传递(可避免数据被外部修改)
    • 传递大数据时,可以采用指针传递
  • 多任务访问: 队列不属于某个任务,任何任务和中断都可以向队列写入/读取消息
  • 出队、入队阻塞: 当任务向一个队列发送/读取消息时,可以指定一个阻塞时间,假设此时当队列已满无法入队,就会阻塞对应的时间

示意图:

2.阻塞

(1)时间设置

简述: 如前面所言,向消息队列发送或读取消息时,可以指定一个阻塞时间,当队列已满无法入队,就会阻塞对应的时间

可设置值:

  • 若阻塞时间为0 :直接返回不会等待。
  • 若阻塞时间为0~portMAX_DELAY :等待设定阻塞时间,若在该时间内无法入队/出队,超时后直接返回不再等待。
  • 若阻塞时间为portMAX_DELAY :死等,一直等到可以入队/出队为止。

(2)入队/出队阻塞过程

[!NOTE]

多个任务阻塞时,当队列可读或可写时:

  • 高优先级的任务会优先进入就绪态
  • 优先级相同,那等待时间最久的任务进入就绪态

入队阻塞:

  • 将该任务的状态列表项挂载在pxDelayedTaskList(阻塞列表)
  • 将该任务的事件列表项挂载在xTasksWaitingToSend(等待发送列表)

出队阻塞:

  • 将该任务的状态列表挂载在pxDelayedTaskList;
  • 将该任务的事件列表项挂载在xTasksWaitingToReceive(等待接收列表);

3.队列创建、写入和读出流程

(1)创建队列

创建队列: 队列创建初期,队列项均为空

(2)队列写入

队列写入: 数据入队会从右到左逐步存储

(3)队列读出

队列读出: 数据出队会从右到左逐步读取,读出一个左侧的数据会集体右移,始终只读最右边那个(针对删除消息的读取)

二.队列结构体

1.结构体

[!TIP]

队列锁: 其锁定时,依旧是可以读写队列的,但没办法操作等待发送/接收列表

结构体: 大致成员如下,其中联合体的内容根据实现的功能会有所差异,具体见下

typedef struct QueueDefinition 
{
    int8_t * pcHead;                       /* 存储区域(队列项)的起始地址 */
    int8_t * pcWriteTo;                    /* 下一个写入的位置 */
    union
    {
        QueuePointers_t     xQueue;
        SemaphoreData_t  xSemaphore; 
    } u ;
    List_t xTasksWaitingToSend;             /* 等待发送列表 */
    List_t xTasksWaitingToReceive;          /* 等待接收列表 */
    volatile UBaseType_t uxMessagesWaiting; /* 非空闲队列项目的数量,就是队列中非空的队列项数量 */
    UBaseType_t uxLength;                  /* 队列长度 */
    UBaseType_t uxItemSize;                 /* 队列项目的大小 */
    volatile int8_t cRxLock;                /* 读取上锁计数器 */
    volatile int8_t cTxLock;                /* 写入上锁计数器 */
   /* 其他的一些条件编译 */
} xQUEUE;

示意图: 队列内存分为两部分: 队列结构体 + 队列项存储空间

2.联合体

当用于队列使用时:

typedef struct QueuePointers
{
    int8_t * pcTail;                 /* 存储区的结束地址 */
    int8_t * pcReadFrom;             /* 最后一个读取队列的地址 */
} QueuePointers_t;

当用于互斥信号量和递归互斥信号量时:

typedef struct SemaphoreData
{
    TaskHandle_t xMutexHolder;		    /* 互斥信号量持有者 */
    UBaseType_t uxRecursiveCallCount;	/* 递归互斥信号量的获取计数器 */
} SemaphoreData_t;

三.队列相关API函数

头文件: #include "queue.h"

1.创建队列函数

(1)动态创建

动态创建: configSUPPORT_DYNAMIC_ALLOCATION必须在 FreeRTOSConfig.h 中设置为 1

  • 队列所需的内存空间由 FreeRTOS 从 FreeRTOS 管理的堆中分配
 QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength,
                             UBaseType_t uxItemSize );

参数:

  • uxQueueLength

    队列一次可存储的最大项目数

  • uxItemSize

    存储队列中每个项目所需的大小(以字节为单位)

返回值:

  • 创建成功: 返回所创建队列的句柄
  • 创建失败: 返回NULL

(2)静态创建(无演示)

静态创建: configSUPPORT_STATIC_ALLOCATION必须在 FreeRTOSConfig.h 中设置为 1

  • 需要用户自行分配内存
 QueueHandle_t xQueueCreateStatic(
                             UBaseType_t uxQueueLength,
                             UBaseType_t uxItemSize,
                             uint8_t *pucQueueStorageBuffer,
                             StaticQueue_t *pxQueueBuffer );

参数:

  • uxQueueLength

    队列一次可存储的最大项目数

  • uxItemSize

    存储队列中每个项目所需的大小(以字节为单位)

    项目通过复制而非引用的方式入队,因此该参数值是每个入队项目将复制的 字节数。队列中的每个项目必须具有相同的大小

  • pucQueueStorageBuffer

    如果 uxItemSize 不为零,则 pucQueueStorageBuffer 必须指向一个 uint8_t 数组,该数组的大小 至少要能容纳队列中最多可能存在的项目的总字节数, 即 ( uxQueueLength * uxItemSize ) 字节。如果 uxItemSize 为零,则 pucQueueStorageBuffer 可以为 NULL

  • pxQueueBuffer

    必须指向 StaticQueue_t 类型的变量,该变量将用于保存队列的数据结构体

返回值:

  • 创建成功: 返回所创建队列的句柄
  • 创建失败: 返回NULL

(3)队列的类型

前言: 这里唠的是底层,如果是使用上面两个宏函数的话,默认已经设置成queueQUEUE_TYPE_BASE了,无需手动传入

简述: FreeRTOS 基于队列实现了多种功能,每一种功能对应一种队列类型,队列类型的 queue.h 文件中有定义,使用xQueueGenericCreate(上面两个宏函数所调用的函数)创建的时候,传入不同队列类型,就可以创建不同功能的队列了

具体的功能:

#define queueQUEUE_TYPE_BASE                    ( ( uint8_t ) 0U )  /* 队列 */
#define queueQUEUE_TYPE_SET                     ( ( uint8_t ) 0U )  /* 队列集 */
#define queueQUEUE_TYPE_MUTEX                   ( ( uint8_t ) 1U )  /* 互斥信号量 */
#define queueQUEUE_TYPE_COUNTING_SEMAPHORE      ( ( uint8_t ) 2U )  /* 计数型信号量 */
#define queueQUEUE_TYPE_BINARY_SEMAPHORE        ( ( uint8_t ) 3U )  /* 二值信号量 */
#define queueQUEUE_TYPE_RECURSIVE_MUTEX         ( ( uint8_t ) 4U )  /* 递归互斥信号量 */

2.队列写入消息函数

(1)写入函数

函数: 分为了任务级和中断级函数,带ISR的为中断级函数

函数 描述
xQueueSend() 往队列的尾部写入消息
xQueueSendToBack() 同 xQueueSend()
xQueueSendToFront() 往队列的头部写入消息
xQueueOverwrite() 覆写队列消息(只用于队列长度为1的情况)
xQueueSendFromISR() 在中断中往队列的尾部写入消息
xQueueSendToBackFromISR() 同 xQueueSendFromISR()
xQueueSendToFrontFromISR() 在中断中往队列的头部写入消息
xQueueOverwriteFromISR() 在中断中覆写队列消息(只用于队列长度为1的情况)

函数原型: 四个宏函数的入口函数都为xQueueGenericSend,仅仅只是后面的队列写入位置宏不一样罢了,覆写函数不需要,因为它只能操作只有 一位 的队列,对其进行覆写

#define  xQueueSend( xQueue, pvItemToQueue, xTicksToWait  )
         xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )
#define  xQueueSendToBack( xQueue, pvItemToQueue, xTicksToWait  )
         xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK )
#define  xQueueSendToFront( xQueue, pvItemToQueue, xTicksToWait  )
         xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_FRONT )
#define  xQueueOverwrite(  xQueue,   pvItemToQueue  )
         xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), 0, queueOVERWRITE )

参数:

  • xQueue:队列句柄
  • pvItemToQueue: 要写入队列的值,无论是写入大数据亦或是小数据,均需要取地址
  • xTicksToWait:阻塞时间,具体见上面时间设置

(2)入口函数(底层)

入口函数原型: 上面的写入函数归根结底,调用的都是它,唯一区别就是参数不同

BaseType_t      xQueueGenericSend(  QueueHandle_t       xQueue,
                                    const void * const  pvItemToQueue,
                                    TickType_t          xTicksToWait,
                                    const BaseType_t    xCopyPosition   );

参数:

  • xQueue:待写入的队列

  • pvItemToQueue:待写入消息

  • xTicksToWait: 阻塞超时时间

  • xCopyPosition:写入位置

    #define queueSEND_TO_BACK                ( ( BaseType_t ) 0 )       /* 写入队列尾部 */
    #define queueSEND_TO_FRONT              ( ( BaseType_t ) 1 )        /* 写入队列头部 */
    #define queueOVERWRITE                  ( ( BaseType_t ) 2 )        /* 覆写队列*/
    

返回值: 上面两个写入函数也可以利用这个返回值判断写入情况

  • pdTRUE: 队列写入成功
  • errQUEUE_FULL:队列写入失败

3.队列读出消息函数

(1)函数总概

函数 描述
xQueueReceive() 从队列头部读取消息,并删除消息
xQueuePeek() 从队列头部读取消息
xQueueReceiveFromISR() 在中断中从队列头部读取消息,并删除消息
xQueuePeekFromISR() 在中断中从队列头部读取消息

(2)xQueueReceive()

功能: 此函数用于在任务中,从队列中读取消息,并且消息读取成功后,会将消息从队列中移除。

函数原型:

BaseType_t    xQueueReceive( QueueHandle_t  xQueue, 
                             void *   const pvBuffer,  
                             TickType_t     xTicksToWait )

参数:

  • xQueue:待写入的队列
  • pvBuffer:信息读取缓冲区
  • xTicksToWait:阻塞超时时间

返回值:

  • pdTRUE: 读取成功
  • pdFALSE: 读取失败

(3)xQueuePeek()

功能: 此函数用于在任务中,从队列中读取消息, 但与函数 xQueueReceive()不同,此函数在成功读取消息后, 并不会移除已读取的消息!

函数原型:

BaseType_t   xQueuePeek( QueueHandle_t   xQueue,
                         void * const   pvBuffer,
                         TickType_t   xTicksToWait )

参数:

  • xQueue:待写入的队列
  • pvBuffer:信息读取缓冲区
  • xTicksToWait:阻塞超时时间

返回值:

  • pdTRUE: 读取成功
  • pdFALSE: 读取失败

四.示例

1.前言

[!WARNING]

注意下指针传递的实现,传的是 二级指针 ,要入队的大数据是外部定义好的,入队任务中会定义一个指针,指向待入队数据,然后调用xQueueSend函数时pvItemToQueue参数也是对指针的取地址,同样的,出队时也会定义一个指针,取地址后放入出队函数中,如下面代码,我测试过了,只有这样才能正确出入队数据

  • 要存入的大数据: char buffer[] = "Big DataArr\0"
  • 入队任务定义指针char * buf = &buffer[0],入队函数调用xQueueSend( big_date_queue, &buf, portMAX_DELAY )
  • 出队任务定义指针char * buf,出队函数调用xQueueReceive(big_date_queue,&buf,portMAX_DELAY)

描述: 创建5个任务,start初始化任务及四个任务

  • task1: 实现小数据入队

    PF8按键按下,往队列写入1

  • task2: 实现小数据出队

    PF9按键按下,从队列中读出小数据,并打印

  • task3: 实现大数据入队

    PF6按键按下,往队列中写入一个字符数组(大数据)

  • task4: 实现大数据出队

    PF7按键按下,从队列中读出字符数组(大数据),并打印

结果:

2.实现

[!TIP]
完整例程: GitHub

实现:

#include "gd32f4xx.h"
#include "systick.h"
#include <stdio.h>
#include "main.h"
#include "Usart0.h"

//导入操作系统相关头文件
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"

//任务句柄
TaskHandle_t start_handler;
TaskHandle_t task1_handler;
TaskHandle_t task2_handler;
TaskHandle_t task3_handler;
TaskHandle_t task4_handler;

QueueHandle_t key_queue;        /* 小数据句柄 */
QueueHandle_t big_date_queue;   /* 大数据句柄 */

char buffer[] = "Big DataArr\0";

//数据接收中断回调函数
void Usart0_on_recv(uint8_t *dat, uint32_t len) {
    printf("recv: %s\r\n", dat);
}


/*
* 数据入队
* - PF8按键按下,录入1
*/
void task1( void * arg) {

    FlagStatus pre = SET;
    BaseType_t   err = 0;//记录状态值
    while(1) {
        FlagStatus current = gpio_input_bit_get(GPIOF,GPIO_PIN_8);
        uint8_t key_num = 1;
        if(current == RESET && pre == SET) {
            //按下不做任何操作
            pre = current;
        } else if(current == SET && pre == RESET) {
            //将数据1放入队列
            err = xQueueSend( key_queue, &key_num, portMAX_DELAY );//发送数据,若队列已满则阻塞等待
            if(err == pdTRUE) {
                printf("Key Num Send Queue Succeed!!\r\n");
            } else {
                printf("Key Num Send Queue Error!!\r\n");
            }
            pre = current;
        }

        vTaskDelay(50);
    }
    vTaskDelete(task1_handler);//每个子任务必须在任务末尾删除自身
}


/*PF9按下,小数据出队*/
void task2( void * arg) {

    FlagStatus pre = SET;
    uint8_t key = 0;
    BaseType_t state = 0;
    while(1)
    {
        FlagStatus current = gpio_input_bit_get(GPIOF,GPIO_PIN_9);
        if(current == RESET && pre == SET) {
            //无逻辑
            pre = current;
        } else if(current == SET && pre == RESET) {
            state = xQueueReceive(key_queue,&key,portMAX_DELAY); //若队列为空,则阻塞等待
            if(state == pdTRUE) {
                printf("Key Num Receive Queue Succeed-->key:%d \r\n",key);
            } else {
                printf("Key Num Receive Queue Error!!\r\n");
            }
            pre = current;
        }

        vTaskDelay(50);
    }
    vTaskDelete(task2_handler);//每个子任务必须在任务末尾删除自身
}

/*PF6按下,录入大数据*/
void task3( void * arg) {

    FlagStatus pre = SET;
    BaseType_t   err = 0;//记录状态值
    char * buf = &buffer[0];

    while(1)
    {
        FlagStatus current = gpio_input_bit_get(GPIOF,GPIO_PIN_6);
        if(current == RESET && pre == SET) {
            //不执行任何逻辑
            pre = current;
        } else if(current == SET && pre == RESET) {
            err = xQueueSend( big_date_queue, &buf, portMAX_DELAY );//大数据录入的是指向大数据的指针的指针,即二级指针
            if(err == pdTRUE) {
                printf("Big Data Send Queue Succeed!!\r\n");
            } else {
                printf("Big Data Send Queue Error!!\r\n");
            }
            pre = current;
        }
        vTaskDelay(50);
    }
    vTaskDelete(task3_handler);//每个子任务必须在任务末尾删除自身
}

/*PF7按下,大数据出队*/
void task4( void * arg) {

    FlagStatus pre = SET;
    BaseType_t   state = 0;//记录状态值
    char * buf;
    while(1)
    {
        FlagStatus current = gpio_input_bit_get(GPIOF,GPIO_PIN_7);
        if(current == RESET && pre == SET) {
            //不执行逻辑
            pre = current;
        } else if(current == SET && pre == RESET) {
            state = xQueueReceive(big_date_queue,&buf,portMAX_DELAY); //若队列为空,则阻塞等待
            if(state == pdTRUE) {
                printf("Big Data Receive Queue Succeed-->key:%s \r\n",buf);
            } else {
                printf("Big Data Receive Queue Error!!\r\n");
            }
            pre = current;
        }
        vTaskDelay(50);
    }
    vTaskDelete(task4_handler);//每个子任务必须在任务末尾删除自身
}


/*这三个引脚所连接按键未按下时全部为浮空,按下时接GND,所以需要上拉引脚*/
void GPIO_config() {
    //初始化按键1 -- PF8
    rcu_periph_clock_enable(RCU_GPIOF);
    gpio_mode_set(GPIOF,GPIO_MODE_INPUT,GPIO_PUPD_PULLUP,GPIO_PIN_8);
    gpio_output_options_set(GPIOF,GPIO_OTYPE_PP,GPIO_OSPEED_MAX,GPIO_PIN_8);

    //初始化按键2 -- PF6
    rcu_periph_clock_enable(RCU_GPIOF);
    gpio_mode_set(GPIOF,GPIO_MODE_INPUT,GPIO_PUPD_PULLUP,GPIO_PIN_6);
    gpio_output_options_set(GPIOF,GPIO_OTYPE_PP,GPIO_OSPEED_MAX,GPIO_PIN_6);

    //初始化按键3 -- PF9
    rcu_periph_clock_enable(RCU_GPIOF);
    gpio_mode_set(GPIOF,GPIO_MODE_INPUT,GPIO_PUPD_PULLUP,GPIO_PIN_9);
    gpio_output_options_set(GPIOF,GPIO_OTYPE_PP,GPIO_OSPEED_MAX,GPIO_PIN_9);

    //初始化按下4 -- PF7
    rcu_periph_clock_enable(RCU_GPIOF);
    gpio_mode_set(GPIOF,GPIO_MODE_INPUT,GPIO_PUPD_PULLUP,GPIO_PIN_7);
    gpio_output_options_set(GPIOF,GPIO_OTYPE_PP,GPIO_OSPEED_MAX,GPIO_PIN_7);
}


//起始任务,只创建和运行一次,
void start_task( void * arg) {
    GPIO_config();

    taskENTER_CRITICAL();

    //任务优先级均设置为2
    xTaskCreate(task1,"task1",64,NULL,2,&task1_handler);
    xTaskCreate(task2,"task2",128,NULL,2,&task2_handler);
    xTaskCreate(task3,"task3",128,NULL,2,&task3_handler);
    xTaskCreate(task4,"task4",128,NULL,2,&task4_handler);

    vTaskDelete(start_handler);

    taskEXIT_CRITICAL();

}


int main(void) {
    nvic_priority_group_set(NVIC_PRIGROUP_PRE4_SUB0);//FreeRTOS要求全部为抢占

    Usart0_init();

    /*以动态的方式创建队列*/
    //创建小数据队列
    key_queue = xQueueCreate( 2, sizeof(uint8_t) );//存储的是实际数据
    if(key_queue == NULL) {
        printf("key_queue Queue Create Error!!\r\n");
    } else {
        printf("key_queue Queue Create Succeed!!\r\n");
    }
    //创建大数据队列
    big_date_queue = xQueueCreate( 1, sizeof(char *) );//存储的仅仅只是大数据的指针
    if(big_date_queue == NULL) {
        printf("big_date_queue Queue Create Error!!\r\n");
    } else {
        printf("big_date_queue Queue Create Succeed!!\r\n");
    }

    xTaskCreate(start_task,//指向任务函数的指针
                "start_task",//任务名字
                128,//任务堆栈大小
                NULL,//传递的任务函数的参数
                1,//任务优先级
                &start_handler);//任务句柄

    vTaskStartScheduler();

    while (1) {}
}
最后修改:2024 年 10 月 07 日
如果觉得我的文章对你有用,请随意赞赏