[dss-commits] r8745 - in dss/trunk: core unix

dss-commits at forum.digitalstrom.org dss-commits at forum.digitalstrom.org
Tue Sep 8 16:27:06 CEST 2009


Author: pstaehlin
Date: 2009-09-08 16:27:06 +0200 (Tue, 08 Sep 2009)
New Revision: 8745

Modified:
   dss/trunk/core/ds485client.cpp
   dss/trunk/core/ds485client.h
   dss/trunk/unix/ds485proxy.cpp
   dss/trunk/unix/ds485proxy.h
Log:
- Implemented client interface for DS485
- Added abstract baseclass for FrameBucket

Closes #54


Modified: dss/trunk/core/ds485client.cpp
===================================================================
--- dss/trunk/core/ds485client.cpp	2009-09-08 14:20:45 UTC (rev 8744)
+++ dss/trunk/core/ds485client.cpp	2009-09-08 14:27:06 UTC (rev 8745)
@@ -21,7 +21,99 @@
 
 #include "ds485client.h"
 
+#include "core/dss.h"
+#include "core/foreach.h"
+#include "unix/ds485proxy.h"
+
 namespace dss {
 
+  //================================================== DS485ClientImpl
 
+  class FrameBucketCallback;
+
+  class DS485ClientImpl {
+  public:
+    std::vector<boost::shared_ptr<FrameBucketBase> > buckets;
+  }; // DS485ClientImpl
+
+
+  //================================================== FrameBucketCallback
+
+  class FrameBucketCallback : public FrameBucketBase {
+  public:
+    FrameBucketCallback(DS485Proxy* _proxy, int _functionID, int _sourceID, DS485Client::FrameCallback_t _callback)
+    : FrameBucketBase(_proxy, _functionID, _sourceID),
+      m_callBack(_callback)
+    {
+      assert(_callback != NULL);
+    } // ctor
+
+    virtual bool addFrame(boost::shared_ptr<ReceivedFrame> _frame) {
+      m_callBack(_frame->getFrame());
+      return false;
+    } // addFrame
+  private:
+    DS485Client::FrameCallback_t m_callBack;
+  }; // FrameBucketCallback
+
+
+  //================================================== DS485Client
+
+  DS485Client::DS485Client()
+  : m_pImpl(new DS485ClientImpl)
+  { } // ctor
+
+  void DS485Client::sendFrameDiscardResult(DS485CommandFrame& _frame) {
+    DSS::getInstance()->getDS485Interface().sendFrame(_frame);
+  } // sendFrameDiscardResult
+
+  boost::shared_ptr<DS485CommandFrame> DS485Client::sendFrameSingleResult(DS485CommandFrame& _frame, int _functionID, int _timeoutMS) {
+    DS485Proxy* proxy = dynamic_cast<DS485Proxy*>(&DSS::getInstance()->getDS485Interface());
+    assert(proxy != NULL);
+
+    boost::shared_ptr<DS485CommandFrame> result;
+    boost::shared_ptr<FrameBucketCollector> bucket = proxy->sendFrameAndInstallBucket(_frame, _functionID);
+    if(bucket->waitForFrame(_timeoutMS)) {
+      result = bucket->popFrame()->getFrame();
+    }
+    return result;
+  } // sendFrameSingleResult
+
+  std::vector<boost::shared_ptr<DS485CommandFrame> > DS485Client::sendFrameMultipleResults(DS485CommandFrame& _frame, int _functionID, int _timeoutMS) {
+    DS485Proxy* proxy = dynamic_cast<DS485Proxy*>(&DSS::getInstance()->getDS485Interface());
+    assert(proxy != NULL);
+
+    boost::shared_ptr<FrameBucketCollector> bucket = proxy->sendFrameAndInstallBucket(_frame, _functionID);
+    bucket->waitForFrame(_timeoutMS);
+
+    std::vector<boost::shared_ptr<DS485CommandFrame> > result;
+    boost::shared_ptr<ReceivedFrame> frame;
+    while((frame = bucket->popFrame()) != NULL) {
+      result.push_back(frame->getFrame());
+    }
+    return result;
+  } // sendFrameMultipleResults
+
+  void DS485Client::subscribeTo(int _functionID, int _source, FrameCallback_t _callback) {
+    assert(_callback != NULL);
+
+    DS485Proxy* proxy = dynamic_cast<DS485Proxy*>(&DSS::getInstance()->getDS485Interface());
+    assert(proxy != NULL);
+
+    boost::shared_ptr<FrameBucketBase> bucket(new FrameBucketCallback(proxy, _functionID, _source, _callback));
+    m_pImpl->buckets.push_back(bucket);
+  } // subscribeTo
+
+  void DS485Client::unsubscribeFrom(int _functionID, int _source) {
+    for(std::vector<boost::shared_ptr<FrameBucketBase> >::iterator ipBucket = m_pImpl->buckets.begin(), e = m_pImpl->buckets.end();
+        ipBucket != e; ++ipBucket) {
+      if(((*ipBucket)->getFunctionID() == _functionID) &&
+          ((*ipBucket)->getSourceID() == _source)) {
+        m_pImpl->buckets.erase(ipBucket);
+        break;
+      }
+    }
+   } // unsubscribeFrom
+
+
 } // namespace dss

Modified: dss/trunk/core/ds485client.h
===================================================================
--- dss/trunk/core/ds485client.h	2009-09-08 14:20:45 UTC (rev 8744)
+++ dss/trunk/core/ds485client.h	2009-09-08 14:27:06 UTC (rev 8745)
@@ -25,25 +25,31 @@
 #include <vector>
 
 #include <boost/shared_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
 
 namespace dss {
 
   class DS485CommandFrame;
+  class DS485ClientImpl;
 
   class __attribute__ ((visibility("default"))) DS485Client {
   public:
+    DS485Client();
+
     typedef void (*FrameCallback_t)(boost::shared_ptr<DS485CommandFrame>);
 
     /** Sends a frame and discards incoming frames */
-    void SendFrameDiscardResult(DS485CommandFrame& _frame);
+    void sendFrameDiscardResult(DS485CommandFrame& _frame);
     /** Sends a frame and receives one response frame with the same functionID as \a _functionID within _timeoutMS */
-    boost::shared_ptr<DS485CommandFrame> SendFrameSingleResult(DS485CommandFrame& _frame, int _functionID, int _timeoutMS);
+    boost::shared_ptr<DS485CommandFrame> sendFrameSingleResult(DS485CommandFrame& _frame, int _functionID, int _timeoutMS);
     /** Sends a frame and receives all frames with the same functionID as \a _functionID within _timeoutMS */
-    std::vector<boost::shared_ptr<DS485CommandFrame> > SendFrameMultipleResults(DS485CommandFrame& _frame, int _functionID, int _timeoutMS);
+    std::vector<boost::shared_ptr<DS485CommandFrame> > sendFrameMultipleResults(DS485CommandFrame& _frame, int _functionID, int _timeoutMS);
     /** Subscribes to all frames with the functionID \a _functionID and source \a _source. If \a _source is -1 all Frames will captured. \a _callback is called once per received frame. */
-    void SubscribeTo(int _functionID, int _source, FrameCallback_t _callback);
+    void subscribeTo(int _functionID, int _source, FrameCallback_t _callback);
     /** Unsubscribes the first description that matches \a _functionID and \a _source. */
-    void UnsubscribeFrom(int _functionID, int _source);
+    void unsubscribeFrom(int _functionID, int _source);
+  private:
+    boost::scoped_ptr<DS485ClientImpl> m_pImpl;
   }; // DS485Client
 }
 

Modified: dss/trunk/unix/ds485proxy.cpp
===================================================================
--- dss/trunk/unix/ds485proxy.cpp	2009-09-08 14:20:45 UTC (rev 8744)
+++ dss/trunk/unix/ds485proxy.cpp	2009-09-08 14:27:06 UTC (rev 8745)
@@ -481,9 +481,9 @@
     collectFrame(pFrame);
   } // sendFrame
 
-  boost::shared_ptr<FrameBucket> DS485Proxy::sendFrameAndInstallBucket(DS485CommandFrame& _frame, const int _functionID) {
+  boost::shared_ptr<FrameBucketCollector> DS485Proxy::sendFrameAndInstallBucket(DS485CommandFrame& _frame, const int _functionID) {
     int sourceID = _frame.getHeader().isBroadcast() ? -1 :  _frame.getHeader().getDestination();
-    boost::shared_ptr<FrameBucket> result(new FrameBucket(this, _functionID, sourceID));
+    boost::shared_ptr<FrameBucketCollector> result(new FrameBucketCollector(this, _functionID, sourceID));
     sendFrame(_frame);
     return result;
   }
@@ -552,7 +552,7 @@
     log("Proxy: GetModulators");
 
     cmdFrame.getPayload().add<uint8_t>(FunctionGetTypeRequest);
-    boost::shared_ptr<FrameBucket> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionGetTypeRequest);
+    boost::shared_ptr<FrameBucketCollector> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionGetTypeRequest);
     bucket->waitForFrames(1000);
 
     map<int, bool> resultFrom;
@@ -684,7 +684,7 @@
     cmdFrame.getPayload().add<uint8_t>(FunctionDeviceGetGroups);
     cmdFrame.getPayload().add<uint16_t>(_deviceID);
 
-    boost::shared_ptr<FrameBucket> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionDeviceGetGroups);
+    boost::shared_ptr<FrameBucketCollector> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionDeviceGetGroups);
 
     bucket->waitForFrame(1000);
 
@@ -913,7 +913,7 @@
     cmdFrame.setCommand(CommandRequest);
     cmdFrame.getPayload().add<uint8_t>(FunctionModulatorGetEnergyBorder);
 
-    boost::shared_ptr<FrameBucket> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionModulatorGetEnergyBorder);
+    boost::shared_ptr<FrameBucketCollector> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionModulatorGetEnergyBorder);
 
     bucket->waitForFrame(1000);
 
@@ -939,7 +939,7 @@
     cmdFrame.getPayload().add<uint16_t>(_flags);
 
     if((_flags & DSLinkSendWriteOnly) == 0) {
-      boost::shared_ptr<FrameBucket> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionDSLinkReceive);
+      boost::shared_ptr<FrameBucketCollector> bucket = sendFrameAndInstallBucket(cmdFrame, FunctionDSLinkReceive);
       bucket->waitForFrame(10000);
       boost::shared_ptr<ReceivedFrame> recFrame = bucket->popFrame();
       if(recFrame.get() == NULL) {
@@ -972,7 +972,7 @@
   } // removeUserGroup
 
   boost::shared_ptr<ReceivedFrame> DS485Proxy::receiveSingleFrame(DS485CommandFrame& _frame, uint8_t _functionID) {
-    boost::shared_ptr<FrameBucket> bucket = sendFrameAndInstallBucket(_frame, _functionID);
+    boost::shared_ptr<FrameBucketCollector> bucket = sendFrameAndInstallBucket(_frame, _functionID);
     bucket->waitForFrame(1000);
 
     if(bucket->isEmpty()) {
@@ -1273,7 +1273,7 @@
             bool bucketFound = false;
             // search for a bucket to put the frame in
             m_FrameBucketsGuard.lock();
-            foreach(FrameBucket* bucket, m_FrameBuckets) {
+            foreach(FrameBucketBase* bucket, m_FrameBuckets) {
               if(bucket->getFunctionID() == functionID) {
                 if((bucket->getSourceID() == -1) || (bucket->getSourceID() == frame->getHeader().getSource())) {
                   if(bucket->addFrame(rf)) {
@@ -1306,15 +1306,15 @@
     }
   } // collectFrame
 
-  void DS485Proxy::addFrameBucket(FrameBucket* _bucket) {
+  void DS485Proxy::addFrameBucket(FrameBucketBase* _bucket) {
     m_FrameBucketsGuard.lock();
     m_FrameBuckets.push_back(_bucket);
     m_FrameBucketsGuard.unlock();
   } // addFrameBucket
 
-  void DS485Proxy::removeFrameBucket(FrameBucket* _bucket) {
+  void DS485Proxy::removeFrameBucket(FrameBucketBase* _bucket) {
     m_FrameBucketsGuard.lock();
-    std::vector<FrameBucket*>::iterator pos = find(m_FrameBuckets.begin(), m_FrameBuckets.end(), _bucket);
+    std::vector<FrameBucketBase*>::iterator pos = find(m_FrameBuckets.begin(), m_FrameBuckets.end(), _bucket);
     if(pos != m_FrameBuckets.end()) {
       m_FrameBuckets.erase(pos);
     }
@@ -1330,24 +1330,31 @@
   } // ctor
 
 
-  //================================================== FrameBucket
+  //================================================== FrameBucketBase
 
-  FrameBucket::FrameBucket(DS485Proxy* _proxy, int _functionID, int _sourceID)
+  FrameBucketBase::FrameBucketBase(DS485Proxy* _proxy, int _functionID, int _sourceID)
   : m_pProxy(_proxy),
     m_FunctionID(_functionID),
-    m_SourceID(_sourceID),
-    m_SingleFrame(false)
+    m_SourceID(_sourceID)
   {
     Logger::getInstance()->log("Bucket: Registering for fid: " + intToString(_functionID) + " sid: " + intToString(_sourceID));
     m_pProxy->addFrameBucket(this);
   } // ctor
 
-  FrameBucket::~FrameBucket() {
+  FrameBucketBase::~FrameBucketBase() {
     Logger::getInstance()->log("Bucket: Removing for fid: " + intToString(m_FunctionID) + " sid: " + intToString(m_SourceID));
     m_pProxy->removeFrameBucket(this);
   } // dtor
 
-  bool FrameBucket::addFrame(boost::shared_ptr<ReceivedFrame> _frame) {
+
+  //================================================== FrameBucket
+
+  FrameBucketCollector::FrameBucketCollector(DS485Proxy* _proxy, int _functionID, int _sourceID)
+  : FrameBucketBase(_proxy, _functionID, _sourceID),
+    m_SingleFrame(false)
+  { } // ctor
+
+  bool FrameBucketCollector::addFrame(boost::shared_ptr<ReceivedFrame> _frame) {
     bool result = false;
     m_FramesMutex.lock();
     if(!m_SingleFrame || m_Frames.empty()) {
@@ -1362,7 +1369,7 @@
     return result;
   } // addFrame
 
-  boost::shared_ptr<ReceivedFrame> FrameBucket::popFrame() {
+  boost::shared_ptr<ReceivedFrame> FrameBucketCollector::popFrame() {
     boost::shared_ptr<ReceivedFrame> result;
 
     m_FramesMutex.lock();
@@ -1374,27 +1381,29 @@
     return result;
   } // popFrame
 
-  void FrameBucket::waitForFrames(int _timeoutMS) {
+  void FrameBucketCollector::waitForFrames(int _timeoutMS) {
     sleepMS(_timeoutMS);
   } // waitForFrames
 
-  void FrameBucket::waitForFrame(int _timeoutMS) {
+  bool FrameBucketCollector::waitForFrame(int _timeoutMS) {
     m_SingleFrame = true;
     if(m_Frames.empty()) {
-      Logger::getInstance()->log("*** Waiting");
+      Logger::getInstance()->log("FrameBucket::waitForFrame: Waiting for frame");
       if(m_PacketHere.waitFor(_timeoutMS)) {
-        Logger::getInstance()->log("*** Got Frame");
+        Logger::getInstance()->log("FrameBucket::waitForFrame: Got frame");
       } else {
-        Logger::getInstance()->log("*** No Frame");
+        Logger::getInstance()->log("FrameBucket::waitForFrame: No frame received");
+        return false;
       }
     }
+    return true;
   } // waitForFrame
 
-  int FrameBucket::getFrameCount() const {
+  int FrameBucketCollector::getFrameCount() const {
     return m_Frames.size();
   } // getFrameCount
 
-  bool FrameBucket::isEmpty() const {
+  bool FrameBucketCollector::isEmpty() const {
     return m_Frames.empty();
   } // isEmpty
 

Modified: dss/trunk/unix/ds485proxy.h
===================================================================
--- dss/trunk/unix/ds485proxy.h	2009-09-08 14:20:45 UTC (rev 8744)
+++ dss/trunk/unix/ds485proxy.h	2009-09-08 14:27:06 UTC (rev 8745)
@@ -75,26 +75,40 @@
     int getReceivedAt() const { return m_ReceivedAtToken; };
   }; // ReceivedFrame
 
-  /** A frame bucket holds response-frames for any given function id/source id pair.
-   *  If m_SourceID is -1 every source matches. */
-  class FrameBucket {
+
+  /** A frame bucket gets notified on every frame that matches any given
+   *  function-/source-id pair.
+   *  If \a m_SourceID is -1 every source matches. */
+  class FrameBucketBase {
+  public:
+    FrameBucketBase(DS485Proxy* _proxy, int _functionID, int _sourceID);
+    virtual ~FrameBucketBase();
+
+    int getFunctionID() const { return m_FunctionID; }
+    int getSourceID() const { return m_SourceID; }
+
+    virtual bool addFrame(boost::shared_ptr<ReceivedFrame> _frame) = 0;
   private:
-    deque<boost::shared_ptr<ReceivedFrame> > m_Frames;
     DS485Proxy* m_pProxy;
     int m_FunctionID;
     int m_SourceID;
+  }; // FrameBucketBase
+
+
+  /** FrameBucketCollector holds its received frames in a queue.
+    */
+  class FrameBucketCollector : public FrameBucketBase {
+  private:
+    deque<boost::shared_ptr<ReceivedFrame> > m_Frames;
     SyncEvent m_PacketHere;
     Mutex m_FramesMutex;
     bool m_SingleFrame;
   public:
-    FrameBucket(DS485Proxy* _proxy, int _functionID, int _sourceID);
-    ~FrameBucket();
+    FrameBucketCollector(DS485Proxy* _proxy, int _functionID, int _sourceID);
+    virtual ~FrameBucketCollector() { }
 
-    int getFunctionID() const { return m_FunctionID; }
-    int getSourceID() const { return m_SourceID; }
-
     /** Adds a ReceivedFrame to the frames queue */
-    bool addFrame(boost::shared_ptr<ReceivedFrame> _frame);
+    virtual bool addFrame(boost::shared_ptr<ReceivedFrame> _frame);
     /** Returns the least recently received item int the queue.
      * The pointer will contain NULL if isEmpty() returns true. */
     boost::shared_ptr<ReceivedFrame> popFrame();
@@ -103,12 +117,13 @@
     void waitForFrames(int _timeoutMS);
     /** Waits for a frame to arrive in \a _timeoutMS.
      * If a frame arrives earlier, the function returns */
-    void waitForFrame(int _timeoutMS);
+    bool waitForFrame(int _timeoutMS);
 
     int getFrameCount() const;
     bool isEmpty() const;
-  }; // FrameBucket
+  }; // FrameBucketCollector
 
+
   typedef std::vector<boost::shared_ptr<DS485CommandFrame> > CommandFrameSharedPtrVector;
 
   class DS485Proxy : protected Thread,
@@ -124,7 +139,7 @@
     uint8_t receiveSingleResult(DS485CommandFrame& _frame, const uint8_t _functionID);
     uint16_t receiveSingleResult16(DS485CommandFrame& _frame, const uint8_t _functionID);
 
-    std::vector<FrameBucket*> m_FrameBuckets;
+    std::vector<FrameBucketBase*> m_FrameBuckets;
 
     void signalEvent();
 
@@ -147,7 +162,8 @@
     virtual bool isReady();
 
     virtual void sendFrame(DS485CommandFrame& _frame);
-    boost::shared_ptr<FrameBucket> sendFrameAndInstallBucket(DS485CommandFrame& _frame, const int _functionID);
+    boost::shared_ptr<FrameBucketCollector> sendFrameAndInstallBucket(DS485CommandFrame& _frame, const int _functionID);
+    void installBucket(boost::shared_ptr<FrameBucketBase> _bucket);
 
     //------------------------------------------------ Handling
     virtual void initialize();
@@ -155,8 +171,8 @@
 
     virtual void collectFrame(boost::shared_ptr<DS485CommandFrame> _frame);
 
-    void addFrameBucket(FrameBucket* _bucket);
-    void removeFrameBucket(FrameBucket* _bucket);
+    void addFrameBucket(FrameBucketBase* _bucket);
+    void removeFrameBucket(FrameBucketBase* _bucket);
 
     //------------------------------------------------ Specialized Commands (system)
     virtual std::vector<ModulatorSpec_t> getModulators();
@@ -206,7 +222,8 @@
     void setValueDevice(const Device& _device, const uint16_t _value, const uint16_t _parameterID, const int _size);
     //------------------------------------------------ Helpers
     DS485Controller& getController() { return m_DS485Controller; }
-  };
+  }; // DS485Proxy
+
 }
 
 #endif



More information about the dss-commits mailing list