Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3785319
  • 博文数量: 880
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 6155
  • 用 户 组: 普通用户
  • 注册时间: 2016-11-11 09:12
个人简介

To be a better coder

文章分类

全部博文(880)

文章存档

2022年(5)

2021年(60)

2020年(175)

2019年(207)

2018年(210)

2017年(142)

2016年(81)

分类: LINUX

2020-01-20 10:46:34

https://blog.csdn.net/jinwang3526/article/details/81537297

一个基于Mqtt的小项目,服务器采用mosquitto,客户端有Python,C,Android三种,涉及SSL加密,传输内容:文字图片。

应用消息 Application Message MQTT协议通过网络传输应用数据。应用消息通过MQTT传输时,它们有关联的服务质量(QoS)和主题(Topic)。

客户端 Client
使用MQTT的程序或设备。客户端总是通过网络连接到服务端。它可以

  • 发布应用消息给其它相关的客户端。
  • 订阅以请求接受相关的应用消息。
  • 取消订阅以移除接受应用消息的请求。
  • 从服务端断开连接。

服务端 Server
一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。服务端

  • 接受来自客户端的网络连接。
  • 接受客户端发布的应用消息。
  • 处理客户端的订阅和取消订阅请求。
  • 转发应用消息给符合条件的已订阅客户端。

工作流:
服务器先启动,然后客户端订阅相关的Topic。Client A 和C发布主题为:Question的What's the temperature?。Client B因为订阅了Question这个Topic,所以可以收到信息,Client B收到信息做判断后发布答案Topic: Temperture出去,订阅了相关Topic的Client A 和Client C能接收到37°。
C的客户端:
//mqttclient.c
#include
#include
#include
#include
#include "MQTTClient.h"
#include
#include


#define NUM_THREADS 2
#define ADDRESS "tcp://xx.xxx.xx.xxx:1883"
#define CLIENTID "ExampleClient_pub"
#define SUB_CLIENTID    "ExampleClient_sub" //更改此处客户端ID
#define TOPICPUB    "Question"  //更改发送的话题
#define TOPICSUB    "temperature"
#define QOS         1
#define TIMEOUT     10000L
#define DISCONNECT  "out"

int CONNECT = 1;
volatile MQTTClient_deliveryToken deliverytoken;
long PAYLOADLEN;
char* PAYLOAD;

void delivered(void *context, MQTTClient_deliveryToken dt)
{
  printf("Message with token value %d delivery confirmed\n", dt);
  deliverytoken = dt;
}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
  int i;
  char* payloadptr;

  printf("Message arrived\n");
  printf("    topic: %s\n", topicName);
  printf("  message: \n");

  payloadptr = message->payload;
  if (strcmp(payloadptr, DISCONNECT) == 0) {
    printf("\n out!!");
    CONNECT = 0;
  }

  for (i = 0; i < message->payloadlen; i++) {
    putchar(*payloadptr++);
  }
  printf("\n");

  MQTTClient_freeMessage(&message);
  MQTTClient_free(topicName);
  return 1;
}

void connlost(void *context, char *cause)
{
  printf("\nConnection lost\n");
  printf("     cause: %s\n", cause);
}

void *pubClient(void *threadid) {
  long tid;
  tid = (long)threadid;
  int count = 0;
  printf("Hello World! It's me, thread #%ld!\n", tid);
  //声明一个MQTTClient
  MQTTClient client;
  //初始化MQTT Client选项
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  //声明消息token
  MQTTClient_deliveryToken token;
  int rc;
  //使用参数创建一个client,并将其赋值给之前声明的client
  MQTTClient_create(&client, ADDRESS, CLIENTID,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 20;
  conn_opts.cleansession = 1;
  //使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS
  if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
  {
    printf("Failed to connect, return code %d\n", rc);
    exit(EXIT_FAILURE);
  }
  PAYLOAD = "What's the temperature";
  // printf("%s\n", PAYLOAD);
  pubmsg.payload = PAYLOAD;
  pubmsg.payloadlen = (int)strlen(PAYLOAD);
  pubmsg.qos = QOS;
  pubmsg.retained = 0;
  //循环发布
  while (CONNECT) {
    MQTTClient_publishMessage(client, TOPICPUB, &pubmsg, &token);
    printf("Waiting for up to %d seconds for publication of %s\n"
             "on topic %s for client with ClientID: %s\n",
             (int)(TIMEOUT/1000), PAYLOAD, TOPICPUB, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
    // thread sleep
    usleep(2000000L);
  }

  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);
}

void *subClient(void *threadid) {
  long tid;
  tid = (long)threadid;
  printf("Hello World! It's me, thread #%ld!\n", tid);

  MQTTClient client;
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  int rc;
  int ch;

  MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 20;
  conn_opts.cleansession = 1;
  //设置回调函数
  MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

  if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
  {
    printf("Failed to connect, return code %d\n", rc);
    exit(EXIT_FAILURE);
  }
  printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
         "Press Q to quit\n\n", TOPICSUB, SUB_CLIENTID, QOS);
  MQTTClient_subscribe(client, TOPICSUB, QOS);

  do
  {
    ch = getchar();
  } while (ch != 'Q' && ch != 'q');
  //quit
  MQTTClient_unsubscribe(client, TOPICSUB);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  pthread_exit(NULL);
}

int main(int argc, char* argv[])
{
  pthread_t threads[NUM_THREADS];
  pthread_create(&threads[0], NULL, subClient, (void *)0);
  pthread_create(&threads[1], NULL, pubClient, (void *)1);
  pthread_exit(NULL);
}


阅读(3456) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~