Program Listing for File subscriber.h

Return to documentation for file (ecal/core/include/ecal/msg/subscriber.h)

/* ========================= eCAL LICENSE =================================
 *
 * Copyright (C) 2016 - 2019 Continental Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * ========================= eCAL LICENSE =================================
*/

#pragma once

#include <ecal/ecal_publisher.h>
#include <ecal/ecal_subscriber.h>

#include <cassert>
#include <cstring>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

namespace eCAL
{
  template <typename T>
  class CMsgSubscriber : public CSubscriber
  {
  public:
    CMsgSubscriber() : CSubscriber()
    {
    }

    CMsgSubscriber(const std::string& topic_name_, const std::string& topic_type_ = "", const std::string& topic_desc_ = "") : CSubscriber(topic_name_, topic_type_, topic_desc_ )
    {
    }

    CMsgSubscriber(const CMsgSubscriber&) = delete;

    CMsgSubscriber& operator=(const CMsgSubscriber&) = delete;

    CMsgSubscriber(CMsgSubscriber&& rhs)
      : CSubscriber(std::move(rhs))
      , m_cb_callback(std::move(rhs.m_cb_callback))
    {
      bool has_callback = (m_cb_callback != nullptr);

      if (has_callback)
      {
        // the callback bound to the CSubscriber belongs to rhs, bind to this callback instead
        CSubscriber::RemReceiveCallback();
        auto callback = std::bind(&CMsgSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2);
        CSubscriber::AddReceiveCallback(callback);
      }
    }

    CMsgSubscriber& operator=(CMsgSubscriber&& rhs)
    {
      CSubscriber::operator=(std::move(rhs));

      m_cb_callback = std::move(rhs.m_cb_callback);
      bool has_callback(m_cb_callback != nullptr);

      if (has_callback)
      {
        // the callback bound to the CSubscriber belongs to rhs, bind to this callback instead;
        CSubscriber::RemReceiveCallback();
        auto callback = std::bind(&CMsgSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2);
        CSubscriber::AddReceiveCallback(callback);
      }

      return *this;
    }

    virtual ~CMsgSubscriber() {}

    bool Create(const std::string& topic_name_, const std::string& topic_type_ = "", const std::string& topic_desc_ = "")
    {
      return(CSubscriber::Create(topic_name_, topic_type_, topic_desc_));
    }

    bool Destroy()
    {
      RemReceiveCallback();
      return(CSubscriber::Destroy());
    }

    bool Receive(T& msg_, long long* time_ = nullptr, int rcv_timeout_ = 0) const
    {
      assert(IsCreated());
      std::string rec_buf;
      bool success = CSubscriber::ReceiveBuffer(rec_buf, time_, rcv_timeout_);
      if(!success) return(false);
      return(Deserialize(msg_, rec_buf.c_str(), rec_buf.size()));
    }

    typedef std::function<void(const char* topic_name_, const T& msg_, long long time_, long long clock_, long long id_)> MsgReceiveCallbackT;

    bool AddReceiveCallback(MsgReceiveCallbackT callback_)
    {
      assert(IsCreated());
      RemReceiveCallback();

      m_cb_callback = callback_;
      auto callback = std::bind(&CMsgSubscriber::ReceiveCallback, this, std::placeholders::_1, std::placeholders::_2);
      return(CSubscriber::AddReceiveCallback(callback));
    }

    bool RemReceiveCallback()
    {
      if(m_cb_callback == nullptr) return(false);
      m_cb_callback = nullptr;
      return(CSubscriber::RemReceiveCallback());
    }

  private:
    virtual std::string GetTypeName() const = 0;
    virtual std::string GetDescription() const = 0;
    virtual bool Deserialize(T& msg_, const void* buffer_, size_t size_) const = 0;

    void ReceiveCallback(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_)
    {
      MsgReceiveCallbackT fn_callback(m_cb_callback);

      if(fn_callback == nullptr) return;

      T msg;
      if(Deserialize(msg, data_->buf, data_->size))
      {
        (fn_callback)(topic_name_, msg, data_->time, data_->clock, data_->id);
      }
    }

    MsgReceiveCallbackT m_cb_callback;
  };
}