diff --git a/CMakeLists.txt b/CMakeLists.txt index 5acd01c1..26c70bde 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,6 +185,15 @@ endif() include_directories(SYSTEM ${RapidJSON_INCLUDE_DIRS}) add_definitions("-DRAPIDJSON_ASSERT=BOOST_ASSERT") +# msgpack-cxx +find_package(msgpack-cxx 6.0.0) +if (NOT msgpack-cxx_FOUND) + message(FATAL_ERROR "could not find msgpack-cxx") +endif() + +include_directories(${msgpack-cxx_INCLUDE_DIRS}) +set(MUJINCLIENT_LINK_DIRS ${MUJINCLIENT_LINK_DIRS} ${msgpack-cxx_LIBDIR}) + if( Boost_FOUND ) include_directories(${Boost_INCLUDE_DIRS}) set(MUJINCLIENT_LINK_DIRS ${MUJINCLIENT_LINK_DIRS} ${Boost_LIBRARY_DIRS}) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d975da3f..7a95e5ae 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -29,6 +29,7 @@ set(SOURCE_FILES mujincontrollerclient.cpp mujindefinitions.cpp mujinjson.cpp + mujinmsgpack.cpp utf8.h ) if (libzmq_FOUND) @@ -70,7 +71,7 @@ if( OPT_BUILD_STATIC ) CLEAN_DIRECT_OUTPUT 1 COMPILE_FLAGS "${EXTRA_COMPILE_FLAGS} ${Boost_CFLAGS} -DMUJINCLIENT_DLL_EXPORTS -DMUJINCLIENT_DLL" LINK_FLAGS "") - + target_link_libraries(libmujincontrollerclient_static ${CURL_LIBRARIES} ${EXTRA_LIBRARIES} ${libzmq_LIBRARIES} ${libzmq_LIBRARIES} ${LOG4CXX_LIBRARIES}) install(TARGETS libmujincontrollerclient_static DESTINATION lib${LIB_SUFFIX}) endif() diff --git a/src/binpickingtaskzmq.cpp b/src/binpickingtaskzmq.cpp index 2c97fb6b..8d062b83 100644 --- a/src/binpickingtaskzmq.cpp +++ b/src/binpickingtaskzmq.cpp @@ -15,6 +15,7 @@ #include "common.h" #include "controllerclientimpl.h" #include "binpickingtaskzmq.h" +#include "mujinmsgpack.h" #include "mujincontrollerclient/mujinzmq.h" #include // find @@ -257,11 +258,9 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ if (pollitem.revents & ZMQ_POLLIN) { zmq::message_t reply; socket->recv(&reply); - std::string replystring((char *)reply.data (), (size_t)reply.size()); rapidjson::Document pt(rapidjson::kObjectType); try{ - std::stringstream replystring_ss(replystring); - ParseJson(pt, replystring_ss.str()); + mujinmsgpack::ParseMsgPack(pt, reply.data(), reply.size()); heartbeat.Parse(pt); { boost::mutex::scoped_lock lock(_mutexTaskState); @@ -275,7 +274,7 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ } catch (std::exception const &e) { MUJIN_LOG_ERROR("HeartBeat reply is not JSON"); - MUJIN_LOG_ERROR(replystring); + MUJIN_LOG_ERROR(std::string((char *)reply.data (), (size_t)reply.size())); MUJIN_LOG_ERROR(e.what()); continue; } diff --git a/src/mujinmsgpack.cpp b/src/mujinmsgpack.cpp new file mode 100644 index 00000000..5ad63cda --- /dev/null +++ b/src/mujinmsgpack.cpp @@ -0,0 +1,337 @@ +// -*- coding: utf-8 -*- +// Copyright (C) 2012-2026 MUJIN Inc. +#include "mujinmsgpack.h" +#include "logging.h" + +#include +#include + +namespace msgpack { + +MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) { + +namespace adaptor { + +template +struct convert< rapidjson::GenericDocument > { + msgpack::object const& operator()(msgpack::object const& o, rapidjson::GenericDocument& v) const { + switch (o.type) + { + case msgpack::type::BOOLEAN: v.SetBool(o.via.boolean); break;; + case msgpack::type::POSITIVE_INTEGER: v.SetUint64(o.via.u64); break; + case msgpack::type::NEGATIVE_INTEGER: v.SetInt64(o.via.i64); break; + case msgpack::type::FLOAT: v.SetDouble(o.via.f64); break; + case msgpack::type::BIN: // fall through + case msgpack::type::STR: v.SetString(o.via.str.ptr, o.via.str.size, v.GetAllocator()); break; + case msgpack::type::ARRAY:{ + v.SetArray(); + v.Reserve(o.via.array.size, v.GetAllocator()); + msgpack::object* ptr = o.via.array.ptr; + msgpack::object* END = ptr + o.via.array.size; + for (; ptr < END; ++ptr) + { + rapidjson::GenericDocument element(&v.GetAllocator()); + ptr->convert(element); + v.PushBack(static_cast&>(element), v.GetAllocator()); + } + } + break; + case msgpack::type::MAP: { + v.SetObject(); + msgpack::object_kv* ptr = o.via.map.ptr; + msgpack::object_kv* END = ptr + o.via.map.size; + for (; ptr < END; ++ptr) + { + rapidjson::GenericValue key(ptr->key.via.str.ptr, ptr->key.via.str.size, v.GetAllocator()); + rapidjson::GenericDocument val(&v.GetAllocator()); + ptr->val.convert(val); + + v.AddMember(key, val, v.GetAllocator()); + } + } + break; + case msgpack::type::EXT: { + if (o.via.ext.type() == -1) { + const std::chrono::system_clock::time_point tp = o.as(); + const std::time_t parsedTime = std::chrono::system_clock::to_time_t(tp); + + // RFC 3339 Nano format + char formatted[sizeof("2006-01-02T15:04:05.999999999Z07:00")]; + + // The extension does not include timezone information. By convention, we format to local time. + struct tm datetime = {0}; + std::size_t size = std::strftime(formatted, sizeof(formatted), "%FT%T", localtime_r(&parsedTime, &datetime)); + + // Add nanoseconds portion if present + const long nanoseconds = (std::chrono::duration_cast(tp.time_since_epoch()).count() % 1000000000 + 1000000000) % 1000000000; + if (nanoseconds != 0) { + size += sprintf(formatted + size, ".%09lu", nanoseconds); + // remove trailing zeros + while (formatted[size - 1] == '0') { + --size; + } + } + if (datetime.tm_gmtoff == 0) { + formatted[size] = 'Z'; + } else { + size += std::strftime(formatted + size, sizeof(formatted) - size, "%z", &datetime); + // fix timezone format (0000 -> 00:00) + formatted[size] = formatted[size - 1]; + formatted[size - 1] = formatted[size - 2]; + formatted[size - 2] = ':'; + } + formatted[++size] = '\0'; + + v.SetString(formatted, size, v.GetAllocator()); + } else { + MUJIN_LOG_WARN("Unrecognized msgpack extension type."); + } + break; + } + case msgpack::type::NIL: + default: + v.SetNull(); break; + + } + return o; + } +}; + +template +struct convert< rapidjson::GenericValue > { + msgpack::object const& operator()(msgpack::object const& o, rapidjson::GenericValue& v) const { + rapidjson::GenericDocument d; + o >> d; + return v = d; + } +}; + +template +struct pack< rapidjson::GenericValue > { + template + msgpack::packer& operator()(msgpack::packer& o, rapidjson::GenericValue const& v) const { + switch (v.GetType()) + { + case rapidjson::kNullType: + return o.pack_nil(); + case rapidjson::kFalseType: + return o.pack_false(); + case rapidjson::kTrueType: + return o.pack_true(); + case rapidjson::kObjectType: + { + o.pack_map(v.MemberCount()); + typename rapidjson::GenericValue::ConstMemberIterator i = v.MemberBegin(), END = v.MemberEnd(); + for (; i != END; ++i) + { + o.pack_str(i->name.GetStringLength()).pack_str_body(i->name.GetString(), i->name.GetStringLength()); + o.pack(i->value); + } + return o; + } + case rapidjson::kArrayType: + { + o.pack_array(v.Size()); + typename rapidjson::GenericValue::ConstValueIterator i = v.Begin(), END = v.End(); + for (;i < END; ++i) + o.pack(*i); + return o; + } + case rapidjson::kStringType: + return o.pack_str(v.GetStringLength()).pack_str_body(v.GetString(), v.GetStringLength()); + case rapidjson::kNumberType: + if (v.IsInt()) + return o.pack_int(v.GetInt()); + if (v.IsUint()) + return o.pack_unsigned_int(v.GetUint()); + if (v.IsInt64()) + return o.pack_int64(v.GetInt64()); + if (v.IsUint64()) + return o.pack_uint64(v.GetUint64()); + if (v.IsDouble()||v.IsNumber()) + return o.pack_double(v.GetDouble()); + default: + return o; + } + } +}; + +template +struct pack< rapidjson::GenericDocument > { + template + msgpack::packer& operator()(msgpack::packer& o, rapidjson::GenericDocument const& v) const { + o << static_cast&>(v); + return o; + } +}; + +template +struct object_with_zone< rapidjson::GenericValue > { + void operator()(msgpack::object::with_zone& o, rapidjson::GenericValue const& v) const { + switch (v.GetType()) + { + case rapidjson::kNullType: + o.type = type::NIL; + break; + case rapidjson::kFalseType: + o.type = type::BOOLEAN; + o.via.boolean = false; + break; + case rapidjson::kTrueType: + o.type = type::BOOLEAN; + o.via.boolean = true; + break; + case rapidjson::kObjectType: + { + o.type = type::MAP; + if (v.ObjectEmpty()) { + o.via.map.ptr = NULL; + o.via.map.size = 0; + } + else { + size_t sz = v.MemberCount(); + object_kv* p = (object_kv*)o.zone.allocate_align(sizeof(object_kv)*sz); + object_kv* const pend = p + sz; + o.via.map.ptr = p; + o.via.map.size = sz; + typename rapidjson::GenericValue::ConstMemberIterator it(v.MemberBegin()); + do { + p->key = msgpack::object(it->name, o.zone); + p->val = msgpack::object(it->value, o.zone); + ++p; + ++it; + } while (p < pend); + } + break; + } + case rapidjson::kArrayType: + { + o.type = type::ARRAY; + if (v.Empty()) { + o.via.array.ptr = NULL; + o.via.array.size = 0; + } + else { + msgpack::object* p = (msgpack::object*)o.zone.allocate_align(sizeof(msgpack::object)*v.Size()); + msgpack::object* const pend = p + v.Size(); + o.via.array.ptr = p; + o.via.array.size = v.Size(); + typename rapidjson::GenericValue::ConstValueIterator it(v.Begin()); + do { + *p = msgpack::object(*it, o.zone); + ++p; + ++it; + } while (p < pend); + } + break; + } + case rapidjson::kStringType: + { + o.type = type::STR; + size_t size = v.GetStringLength(); + char* ptr = (char*)o.zone.allocate_align(size); + memcpy(ptr, v.GetString(), size); + o.via.str.ptr = ptr; + o.via.str.size = size; + break; + } + case rapidjson::kNumberType: + if (v.IsInt()) + { + o.type = type::NEGATIVE_INTEGER; + o.via.i64 = v.GetInt(); + } + else if (v.IsUint()) + { + o.type = type::POSITIVE_INTEGER; + o.via.u64 = v.GetUint(); + } + else if (v.IsInt64()) + { + o.type = type::NEGATIVE_INTEGER; + o.via.i64 = v.GetInt64(); + } + else if (v.IsUint64()) + { + o.type = type::POSITIVE_INTEGER; + o.via.u64 = v.GetUint64(); + } + else if (v.IsDouble()) + { + o.type = type::FLOAT; + o.via.f64 = v.GetDouble(); + } + break; + default: + break; + + } + } +}; + +template +struct object_with_zone< rapidjson::GenericDocument > { + void operator()(msgpack::object::with_zone& o, rapidjson::GenericDocument const& v) const { + o << static_cast const&>(v); + } +}; + +} // namespace adaptor + +class vbuffer { +public: + vbuffer(std::vector& v) : _v(v) { } + void write(const char* buf, size_t len) { + std::copy_n(buf, len, std::back_inserter(_v)); + } +private: + std::vector& _v; +}; + + +class osbuffer { +public: + osbuffer(std::ostream& os) : _os(os) { } + void write(const char* buf, size_t len) { + _os.write(buf, len); + } +private: + std::ostream& _os; +}; + +} // MSGPACK_API_VERSION_NAMESPACE(MSGPACK_DEFAULT_API_NS) + +} // namespace msgpack + +void mujinmsgpack::DumpMsgPack(const rapidjson::Value& value, std::ostream& os) +{ + msgpack::osbuffer buf(os); + msgpack::pack(buf, value); +} + +void mujinmsgpack::DumpMsgPack(const rapidjson::Value& value, std::vector& output) +{ + msgpack::vbuffer buf(output); + msgpack::pack(buf, value); +} + +void mujinmsgpack::ParseMsgPack(rapidjson::Document& d, const std::string& str) +{ + msgpack::unpacked unpacked; + msgpack::unpack(unpacked, str.data(), str.size()); + unpacked.get().convert(d); +} + +void mujinmsgpack::ParseMsgPack(rapidjson::Document& d, const void* data, size_t size) +{ + msgpack::unpacked unpacked; + msgpack::unpack(unpacked, (const char*) data, size); + unpacked.get().convert(d); +} + +void mujinmsgpack::ParseMsgPack(rapidjson::Document& d, std::istream& is) +{ + std::string str; + str.assign(std::istreambuf_iterator(is), std::istreambuf_iterator()); + mujinmsgpack::ParseMsgPack(d, str); +} diff --git a/src/mujinmsgpack.h b/src/mujinmsgpack.h new file mode 100644 index 00000000..ca698e0b --- /dev/null +++ b/src/mujinmsgpack.h @@ -0,0 +1,37 @@ +// -*- coding: utf-8 -*- +// Copyright (C) 2012-2026 MUJIN Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +/** \file mujinmsgpack.h + \brief Wrapper for msgpack. + */ +#ifndef MUJIN_CONTROLLERCLIENT_MSGPACK_H +#define MUJIN_CONTROLLERCLIENT_MSGPACK_H + +#include +#include +#include + +#include +#include + +namespace mujinmsgpack { + +void DumpMsgPack(const rapidjson::Value& value, std::ostream& os); +void DumpMsgPack(const rapidjson::Value& value, std::vector& output); +void ParseMsgPack(rapidjson::Document& d, const std::string& str); +void ParseMsgPack(rapidjson::Document& d, std::istream& is); +void ParseMsgPack(rapidjson::Document& d, const void* data, size_t size); + +} // namespace mujinmsgpack + +#endif // MUJIN_CONTROLLERCLIENT_MSGPACK_H