VAPS XT入门46:接收OpenDDS数据测试

系列索引:VAPS XT入门系列索引

上一篇:Windows下OpenDDS编译测试

甲方爸爸要求VAPS XT的程序运行在天脉操作系统上,根据现有信息判断,天脉应该和VxWorks差不多。VAPS XT提供的通信机制nCom只能运行在Windows上,在VxWorks上应该是使用DDS。

DDS

本文测试VAPS XT与OpenDDS的数据通信能力。其实就是使用上一篇的UserIntegration方式。信息格式就是上一篇文章使用的Messenger.Minimal中的格式。格式定义为

1
2
3
4
5
6
7
8
9
10
11
12
13
module Messenger {

#pragma DCPS_DATA_TYPE "Messenger::Message"
#pragma DCPS_DATA_KEY "Messenger::Message subject_id"

struct Message {
string from;
string subject;
long subject_id;
string text;
long count;
};
};

工程

新建一个工程,命名为DDS。

将工程自带的Float50数据格式复制一份,修改为上面的格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="UTF-8"?>

<!DOCTYPE dataDescription>

<dataDescription name="Messenger">

<field>
<name>from</name>
<type>string</type>
<cardinality>50</cardinality>
</field>
<field>
<name>subject</name>
<type>string</type>
<cardinality>50</cardinality>
</field>
<field>
<name>subject_id</name>
<type>uint32</type>
<cardinality>1</cardinality>
</field>
<field>
<name>text</name>
<type>string</type>
<cardinality>50</cardinality>
</field>
<field>
<name>count</name>
<type>uint32</type>
<cardinality>1</cardinality>
</field>

</dataDescription>

并将其添加到工程中。其中string只表示一个字符,所以字符串长度设置为50,这样一个字符串就可以显示最多50个字符。

根据此数据结构创建Data Sender/Receiver,I/O Buffer名称为MessengerBuf

然后创建一个Format(只是测试,就简单一点),名称为DDSFmt,并添加控件。

默认界面为

default

然后将VAPS XT安装位置下的UserIntegration的C示例复制过来。

自测试

先测试一下能否正常显示。

全局变量

1
static const vxtChar VXT_BUFFER_NAME_MESSENGER[] = "MessengerBuf";

构建一个和数据结构一样的结构体

1
2
3
4
5
6
7
typedef struct Messenger{
vxtChar from[50];
vxtChar subject[50];
vxtInt subject_id;
vxtChar text[50];
vxtInt count;
} Messenger;

创建变量

1
Messenger      *g_pMessenge;

初始化函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void vInitIOBuffersFunction()
{
vxtUInt Size = 0;

/*
** Initialize the global pointers to buffers.
*/
g_pMessenge = (Messenger*)(vxtRTCUI_pGetDataAddress(
vxtRTCUI_pGetIOBuffer(VXT_BUFFER_NAME_MESSENGER), &Size));

/*
** Initialize the callback functions
*/
vxtRTCUI_vSetDataSendHandler(vxtRTCUI_pGetIOBuffer(VXT_BUFFER_NAME_MESSENGER),
vMessengerDataSendHandlerFunction);
}

初始化回调函数

1
2
3
4
5
6
7
8
9
void vMessengerDataSendHandlerFunction
(
const vxtChar *a_pIOBufferName,
vxtRTIOBufferI *a_pIOBuffer
)
{
vxtUInt Size;
void *pBufferData = vxtRTCUI_pGetDataAddress(a_pIOBuffer, &Size);
}

预更新函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void vPreUpdateFunction
(
#if VXT_CFG_TIME_CRITICAL_OBJECTS_SUPPORT
vxtBool a_IsCriticalUpdate
#endif
)
{
#if VXT_CFG_TIME_CRITICAL_OBJECTS_SUPPORT
if(VXT_FALSE == a_IsCriticalUpdate)
#endif
{
char a[]="Test text";
memcpy(g_pMessenge->from,a,sizeof(a));
memcpy(g_pMessenge->text,a,sizeof(a));
memcpy(g_pMessenge->subject,a,sizeof(a));
g_pMessenge->subject_id++;
g_pMessenge->count++;

vxtRTCUI_vNotifyDataUpdated(vxtRTCUI_pGetIOBuffer(VXT_BUFFER_NAME_MESSENGER));
}
}

更新后函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void vPostUpdateFunction
(
#if VXT_CFG_TIME_CRITICAL_OBJECTS_SUPPORT
vxtBool a_IsCriticalUpdate
#endif
)
{
#if VXT_CFG_TIME_CRITICAL_OBJECTS_SUPPORT
if(VXT_FALSE == a_IsCriticalUpdate)
#endif
{
vxtUInt Size = 0;
}
}

然后编译运行

1
./CApplication.exe -center -width 800 -height 600

执行效果为

demo

接收

将Messenger.Minimal的部分文件复制过来

1
2
3
4
5
6
7
8
9
10
11
Boilerplate.h/cpp
DataReaderListenerImpl.h/cpp
MessengerC.h/cpp
MessengerS.h/cpp
MessengerTypeSupportC.h/cpp
MessengerTypeSupportImpl.h/cpp
MessengerTyperSupport.h/cpp
Messenger.idl
MessengerTypeSupport.idl
MessengerTypeSupportC.inl
MessengerC.inl

把h/cpp/inl文件添加到工程,将.idl放到代码目录中。

添加头文件位置

1
2
3
4
D:\OpenDDS-DDS-3.8
D:\OpenDDS-DDS-3.8\ACE_wrappers
D:\OpenDDS-DDS-3.8\ACE_wrappers\TAO
D:\OpenDDS-DDS-3.8\tools\modeling\codegen

添加链接库

1
2
3
4
5
6
7
8
9
10
Ws2_32.lib
ACEd.lib
TAOd.lib
TAO_AnyTypeCoded.lib
TAO_PortableServerd.lib
OpenDDS_Dcpsd.lib
OpenDDS_InfoRepoDiscoveryd.lib
OpenDDS_Tcpd.lib
OpenDDS_Modeld.lib
iphlpapi.lib

在CSampleMain.cpp中添加

1
2
3
4
5
6
7
8
9
#include "DataReaderListenerImpl.h"
#include "Boilerplate.h"
#include <dds/DCPS/Service_Participant.h>
#include <model/Sync.h>
#include <stdexcept>

#include "dds/DCPS/StaticIncludes.h"

using namespace examples::boilerplate;

修改结构体

1
2
3
4
5
6
7
typedef struct _Messenger{
vxtChar from[50];
vxtChar subject[50];
vxtInt subject_id;
vxtChar text[50];
vxtInt count;
}_Messenger;

在main函数中添加一行

1
CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ThreadFunc,0,0,NULL);

创建一个线程运行其他函数。

设置回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int ThreadFunc(){
int argc = 4;
char *argv[]={"-DCPSInfoRepo","corbaloc::192.168.50.20:6004/DCPSInfoRepo","-DCPSDebugLevel","1"};
while(1){
try {
// Initialize DomainParticipantFactory, handling command line args
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);

// Create domain participant
DDS::DomainParticipant_var participant = createParticipant(dpf);

// Register type support and create topic
DDS::Topic_var topic = createTopic(participant);

// Create subscriber
DDS::Subscriber_var subscriber = createSubscriber(participant);

// Create Listener
DataReaderListenerImpl* listener_impl = new DataReaderListenerImpl;
DDS::DataReaderListener_var listener(listener_impl);

// Create DataReader with the listener attached
DDS::DataReader_var reader = createDataReader(subscriber,
topic,
listener);

{
// Block until reader has associated with a writer
// but is no longer associated with any writer
OpenDDS::Model::ReaderSync rs(reader);
}

// Output the sample count
std::cout << "Subscriber received " << listener_impl->sample_count
<< " samples" << std::endl;

// Clean-up!
cleanup(participant, dpf);

// Listener will be cleaned up when reader goes out of scope
} catch (const CORBA::Exception& e) {
e._tao_print_exception("Exception caught in main():");
return -1;
} catch (std::runtime_error& err) {
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("ERROR: main() - %s\n"),
err.what()), -1);
} catch (std::string& msg) {
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("ERROR: main() - %s\n"),
msg.c_str()), -1);
}
}
}

我把运行程序时传入的参数写死,因为VAPS XT程序也有传入的参数,可能会冲突,待测试。

可以正常收到数据

receive

要想把接收的数据显示到界面上,就得简单修改一下源码。

在DataReaderListenImpl类中,有官方实现的参考代码,再次简单修改测试。

在类中添加

1
2
3
Messenger::Message getMsgData();

bool isValid;

isValid用于判断是否开始接收数据

getMsgData函数用于获取接收的数据

在实现中,如果获取到有效数据isValid就为TRUE,然后保存数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

void
DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader)
{
// Safely downcast data reader to type-specific data reader
Messenger::MessageDataReader_var reader_i = narrow(reader);

Messenger::Message msg;
DDS::SampleInfo info;

// Remove (take) the next sample from the data reader
DDS::ReturnCode_t error = reader_i->take_next_sample(msg, info);

// Make sure take was successful
if (error == DDS::RETCODE_OK) {
// Make sure this is not a sample dispose message
if (info.valid_data) {
isValid = true;
_msg = msg;
++sample_count;
std::cout << "Msg: subject " << msg.subject.in() << std::endl
<< " subject_id " << msg.subject_id << std::endl
<< " from " << msg.from.in() << std::endl
<< " count " << msg.count << std::endl
<< " text " << msg.text.in() << std::endl;
}
} else {
ACE_ERROR((LM_ERROR,
ACE_TEXT("ERROR: %N:%l: on_data_available() -")
ACE_TEXT(" take_next_sample failed!\n")));
}
}

Messenger::Message DataReaderListenerImpl::getMsgData(){
return _msg;
}

在获取线程中添加

1
2
3
4
5
6
7
8
if(listener_impl->isValid){
Messenger::Message msg = listener_impl->getMsgData();
g_pMessenge->count = msg.count;
memcpy(g_pMessenge->from,msg.from.in(),sizeof(msg.from.in()));
memcpy(g_pMessenge->subject,msg.subject.in(),sizeof(msg.subject.in()));
memcpy(g_pMessenge->text,msg.text.in(),sizeof(msg.text.in()));
g_pMessenge->subject_id = msg.subject_id;
}

效果为

display

完整工程地址:VAPS_XT下的DDS中

说明

  • 数据会阻塞,只显示最后一个数据包
  • 开发代码不要偷懒,C/C++不要混用。
  • 要合理设计代码架构

如果你有问题,如果是简单的问题可以发邮件给免费解惑,如果涉及难问题或者需要提供附加的服务(比如授权、大工程集成编译、多分区相关,或者作为中间商联系Presagis)可以联系上海亥伯智能科技有限公司 邮箱


VAPS XT入门46:接收OpenDDS数据测试
https://feater.top/vapsxt/vapsxt-communicate-with-opendds
作者
JackeyLea
发布于
2021年10月27日
许可协议