Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

#pragma once

#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <vector>
#include <opendaq/utils/thread_ex.h>
#include <unordered_set>

Expand Down Expand Up @@ -88,6 +88,7 @@ class OpcUaServer final : public daq::utils::ThreadEx
void setClientConnectedHandler(const OnClientConnectedCallback& callback);
void setClientInfoHandler(const OnSetClientInfoCallback& callback);
void setClientDisconnectedHandler(const OnClientDisconnectedCallback& callback);
void scheduleClientInfoChainTask(std::function<void()> task);
void setAllowBrowsingNodeCallback(const OnAllowBrowsingNodeCallback& callback);
void setGetUserRightsMaskCallback(const OnGetUserRightsMaskCallback& callback);
void setGetUserAccessLevelCallback(const OnGetUserAccessLevelCallback& callback);
Expand Down Expand Up @@ -184,6 +185,8 @@ class OpcUaServer final : public daq::utils::ThreadEx
const sockaddr_storage& addr,
socklen_t addrLen);
void waitForPendingClientInfoFutures();
void processClientInfo(ClientConnectionInfo& info, const sockaddr_storage& addr, socklen_t addrLen);
void continueClientInfoChain();

static UA_StatusCode activateSession(UA_Server* server,
UA_AccessControl* ac,
Expand Down Expand Up @@ -220,8 +223,9 @@ class OpcUaServer final : public daq::utils::ThreadEx
OnClientConnectedCallback clientConnectedHandler;
OnSetClientInfoCallback clientInfoHandler;
OnClientDisconnectedCallback clientDisconnectedHandler;
std::vector<std::future<void>> pendingClientInfoFutures;
std::mutex pendingClientInfoFuturesMutex;
std::future<void> clientInfoChain;
std::deque<std::function<void()>> clientInfoChainWaiting;
std::mutex clientInfoChainMutex;
};

END_NAMESPACE_OPENDAQ_OPCUA
86 changes: 56 additions & 30 deletions shared/libraries/opcua/opcuaserver/src/opcuaserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <open62541/plugin/log_stdout.h>
#include <algorithm>
#include <cassert>
#include <chrono>
#include <future>
#include <coreobjects/authentication_provider_factory.h>
#include <coreobjects/exceptions.h>
Expand Down Expand Up @@ -133,45 +132,72 @@ void OpcUaServer::stop()

void OpcUaServer::waitForPendingClientInfoFutures()
{
std::lock_guard<std::mutex> lock(pendingClientInfoFuturesMutex);
for (auto& future : pendingClientInfoFutures)
std::future<void> chain;
{
if (future.valid())
future.wait();
std::lock_guard<std::mutex> lock(clientInfoChainMutex);
clientInfoChainWaiting.clear();
chain = std::move(clientInfoChain);
}
pendingClientInfoFutures.clear();
if (chain.valid())
chain.wait();
}

void OpcUaServer::scheduleClientInfoAsync(ClientConnectionInfo info,
const sockaddr_storage& addr,
socklen_t addrLen)
void OpcUaServer::processClientInfo(ClientConnectionInfo& info,
const sockaddr_storage& addr,
socklen_t addrLen)
{
const OnSetClientInfoCallback handler = clientInfoHandler;
if (!handler)
return;

std::lock_guard<std::mutex> lock(pendingClientInfoFuturesMutex);

pendingClientInfoFutures.erase(
std::remove_if(pendingClientInfoFutures.begin(),
pendingClientInfoFutures.end(),
[](std::future<void>& f) {
return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}),
pendingClientInfoFutures.end());

pendingClientInfoFutures.push_back(
std::async(std::launch::async, [handler, info = std::move(info), addr, addrLen]() mutable
const auto* sockAddr = reinterpret_cast<const struct sockaddr*>(&addr);
char ipBuf[NI_MAXHOST] = {};
char hostBuf[NI_MAXHOST] = {};
if (getnameinfo(sockAddr, addrLen, ipBuf, sizeof(ipBuf), nullptr, 0, NI_NUMERICHOST) == 0)
info.address = ipBuf;
if (getnameinfo(sockAddr, addrLen, hostBuf, sizeof(hostBuf), nullptr, 0, 0) == 0)
info.hostname = hostBuf;
handler(info);
}

void OpcUaServer::continueClientInfoChain()
{
while (true)
{
std::function<void()> task;
{
const auto* sockAddr = reinterpret_cast<const struct sockaddr*>(&addr);
char ipBuf[NI_MAXHOST] = {};
char hostBuf[NI_MAXHOST] = {};
if (getnameinfo(sockAddr, addrLen, ipBuf, sizeof(ipBuf), nullptr, 0, NI_NUMERICHOST) == 0)
info.address = ipBuf;
if (getnameinfo(sockAddr, addrLen, hostBuf, sizeof(hostBuf), nullptr, 0, 0) == 0)
info.hostname = hostBuf;
handler(info);
}));
std::lock_guard<std::mutex> lock(clientInfoChainMutex);
if (clientInfoChainWaiting.empty())
return;
task = std::move(clientInfoChainWaiting.front());
clientInfoChainWaiting.pop_front();
}
task();
}
}

void OpcUaServer::scheduleClientInfoChainTask(std::function<void()> task)
{
std::lock_guard<std::mutex> lock(clientInfoChainMutex);
const bool chainRunning = !clientInfoChainWaiting.empty() ||
(clientInfoChain.valid() &&
clientInfoChain.wait_for(std::chrono::seconds(0)) != std::future_status::ready);
clientInfoChainWaiting.push_back(std::move(task));
if (!chainRunning)
clientInfoChain = std::async(std::launch::async, [this]() { continueClientInfoChain(); });
}

void OpcUaServer::scheduleClientInfoAsync(ClientConnectionInfo info,
const sockaddr_storage& addr,
socklen_t addrLen)
{
if (!clientInfoHandler)
return;

scheduleClientInfoChainTask([this, info = std::move(info), addr, addrLen]() mutable
{
processClientInfo(info, addr, addrLen);
});
}

void OpcUaServer::prepare()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,27 @@ class TmsServerObject : public std::enable_shared_from_this<TmsServerObject>
virtual void createNonhierarchicalReferences();
virtual void onCoreEvent(const CoreEventArgsPtr& eventArgs);

static UA_Boolean allowBrowsingNodeCallback(UA_Server* server,
static UA_Boolean AllowBrowsingNodeCallback(UA_Server* server,
UA_AccessControl* ac,
const UA_NodeId* sessionId,
void* sessionContext,
const UA_NodeId* nodeId,
void* nodeContext);

static UA_UInt32 getUserRightsMaskCallback(UA_Server* server,
static UA_UInt32 GetUserRightsMaskCallback(UA_Server* server,
UA_AccessControl* ac,
const UA_NodeId* sessionId,
void* sessionContext,
const UA_NodeId* nodeId,
void* nodeContext);

static UA_Byte getUserAccessLevelCallback(UA_Server* server,
static UA_Byte GetUserAccessLevelCallback(UA_Server* server,
UA_AccessControl* ac,
const UA_NodeId* sessionId,
void* sessionContext,
const UA_NodeId* nodeId,
void* nodeContext);
static UA_Boolean getUserExecutableCallback(UA_Server* server,
static UA_Boolean GetUserExecutableCallback(UA_Server* server,
UA_AccessControl* ac,
const UA_NodeId* sessionId,
void* sessionContext,
Expand All @@ -110,7 +110,7 @@ class TmsServerObject : public std::enable_shared_from_this<TmsServerObject>
return ptr;
}

static bool checkPermission(const Permission permission, const UA_NodeId* const nodeId, void* const sessionContext, void* const nodeContext);
static bool CheckPermission(const Permission permission, const UA_NodeId* const nodeId, void* const sessionContext, void* const nodeContext);
virtual bool checkPermission(const Permission permission, const UA_NodeId* const nodeId, const OpcUaSession* const sessionContext);
std::string readBrowseName(const opcua::OpcUaNodeId& nodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class TmsServer
void start();
void stop();

private:
void addConnectedClientInfo(const OpcUaServer::ClientConnectionInfo& connInfo);
void removeConnectedClientInfo(const std::string& clientId);

protected:
DevicePtr device;
ContextPtr context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,43 +99,43 @@ void TmsServerObject::onCoreEvent(const CoreEventArgsPtr& /*eventArgs*/)
{
}

UA_Boolean TmsServerObject::allowBrowsingNodeCallback(UA_Server* server,
UA_Boolean TmsServerObject::AllowBrowsingNodeCallback(UA_Server* server,
UA_AccessControl* ac,
const UA_NodeId* sessionId,
void* sessionContext,
const UA_NodeId* nodeId,
void* nodeContext)
{
return checkPermission(Permission::Read, nodeId, sessionContext, nodeContext);
return CheckPermission(Permission::Read, nodeId, sessionContext, nodeContext);
}

UA_UInt32 TmsServerObject::getUserRightsMaskCallback(UA_Server *server, UA_AccessControl *ac,
UA_UInt32 TmsServerObject::GetUserRightsMaskCallback(UA_Server *server, UA_AccessControl *ac,
const UA_NodeId *sessionId, void *sessionContext,
const UA_NodeId *nodeId, void *nodeContext)
{
return checkPermission(Permission::Write, nodeId, sessionContext, nodeContext) ? 0xFFFFFFFF : 0;
return CheckPermission(Permission::Write, nodeId, sessionContext, nodeContext) ? 0xFFFFFFFF : 0;
}

UA_Byte TmsServerObject::getUserAccessLevelCallback(
UA_Byte TmsServerObject::GetUserAccessLevelCallback(
UA_Server* server, UA_AccessControl* ac, const UA_NodeId* sessionId, void* sessionContext, const UA_NodeId* nodeId, void* nodeContext)
{
constexpr UA_Byte readMask = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_HISTORYREAD;
constexpr UA_Byte writeMask = UA_ACCESSLEVELMASK_WRITE | UA_ACCESSLEVELMASK_HISTORYWRITE | UA_ACCESSLEVELMASK_SEMANTICCHANGE |
UA_ACCESSLEVELMASK_STATUSWRITE | UA_ACCESSLEVELMASK_TIMESTAMPWRITE;
UA_Byte mask = 0xFF;
mask = checkPermission(Permission::Read, nodeId, sessionContext, nodeContext) ? (mask | readMask) : (mask & ~readMask);
mask = checkPermission(Permission::Write, nodeId, sessionContext, nodeContext) ? (mask | writeMask) : (mask & ~writeMask);
mask = CheckPermission(Permission::Read, nodeId, sessionContext, nodeContext) ? (mask | readMask) : (mask & ~readMask);
mask = CheckPermission(Permission::Write, nodeId, sessionContext, nodeContext) ? (mask | writeMask) : (mask & ~writeMask);
return mask;
}

UA_Boolean TmsServerObject::getUserExecutableCallback(UA_Server* server,
UA_Boolean TmsServerObject::GetUserExecutableCallback(UA_Server* server,
UA_AccessControl* ac,
const UA_NodeId* sessionId,
void* sessionContext,
const UA_NodeId* methodId,
void* methodContext)
{
return checkPermission(Permission::Execute, methodId, sessionContext, methodContext);
return CheckPermission(Permission::Execute, methodId, sessionContext, methodContext);
}

NodeEventManagerPtr TmsServerObject::addEvent(const StringPtr& nodeName)
Expand Down Expand Up @@ -206,14 +206,14 @@ bool TmsServerObject::hasChildNode(const std::string& nodeName) const
return references.count(nodeName) != 0;
}

bool TmsServerObject::checkPermission(const Permission permission,
bool TmsServerObject::CheckPermission(const Permission permission,
const UA_NodeId* const nodeId,
void* const sessionContext,
void* const nodeContext)
{
if (nodeContext == nullptr || sessionContext == nullptr)
return true;
return static_cast<TmsServerObject*>(nodeContext)->checkPermission(permission, nodeId, static_cast<OpcUaSession*>(sessionContext));;
return static_cast<TmsServerObject*>(nodeContext)->checkPermission(permission, nodeId, static_cast<OpcUaSession*>(sessionContext));
}

bool TmsServerObject::checkPermission(const Permission permission, const UA_NodeId* const nodeId, const OpcUaSession* const sessionContext)
Expand Down
99 changes: 53 additions & 46 deletions shared/libraries/opcuatms/opcuatms_server/src/tms_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,57 +61,17 @@ void TmsServer::start()
registeredClientIds.insert({clientId, 0});
}
);
server->setClientInfoHandler(
[this](const OpcUaServer::ClientConnectionInfo& connInfo)
{
if (!running.load())
return;

std::lock_guard<std::mutex> lock(connectedClientsMutex);
const auto it = registeredClientIds.find(connInfo.clientId);
if (it == registeredClientIds.end())
return;

const auto loggerComponent = context.getLogger().getOrAddComponent("TmsServer");
LOG_I("Client address resolved, ID: {}, address: {}, hostname: {}",
connInfo.clientId,
connInfo.address,
connInfo.hostname);

SizeT clientNumber = 0;
if (device.assigned() && !device.isRemoved())
{
device.getInfo().asPtr<IDeviceInfoInternal>().addConnectedClient(
&clientNumber,
ConnectedClientInfo(connInfo.address,
ProtocolType::Configuration,
"OpenDAQOPCUA",
"Control",
connInfo.hostname));
}
it->second = clientNumber;
}
);
server->setClientInfoHandler([this](const OpcUaServer::ClientConnectionInfo& connInfo) { addConnectedClientInfo(connInfo); });
server->setClientDisconnectedHandler(
[this](const std::string& clientId)
{
std::lock_guard<std::mutex> lock(connectedClientsMutex);
if (auto it = registeredClientIds.find(clientId); it != registeredClientIds.end())
{
const auto loggerComponent = context.getLogger().getOrAddComponent("TmsServer");
LOG_I("Client disconnected, ID: {}", clientId);
if (device.assigned() && !device.isRemoved() && it->second != 0)
{
device.getInfo().asPtr<IDeviceInfoInternal>(true).removeConnectedClient(it->second);
}
registeredClientIds.erase(it);
}
server->scheduleClientInfoChainTask([this, clientId]() { removeConnectedClientInfo(clientId); });
}
);
server->setAllowBrowsingNodeCallback(TmsServerObject::allowBrowsingNodeCallback);
server->setGetUserAccessLevelCallback(TmsServerObject::getUserAccessLevelCallback);
server->setGetUserRightsMaskCallback(TmsServerObject::getUserRightsMaskCallback);
server->setGetUserExecutableCallback(TmsServerObject::getUserExecutableCallback);
server->setAllowBrowsingNodeCallback(TmsServerObject::AllowBrowsingNodeCallback);
server->setGetUserAccessLevelCallback(TmsServerObject::GetUserAccessLevelCallback);
server->setGetUserRightsMaskCallback(TmsServerObject::GetUserRightsMaskCallback);
server->setGetUserExecutableCallback(TmsServerObject::GetUserExecutableCallback);
server->prepare();

tmsContext = std::make_shared<TmsServerContext>(context, device);
Expand All @@ -131,6 +91,53 @@ void TmsServer::start()
server->start();
}

void TmsServer::addConnectedClientInfo(const OpcUaServer::ClientConnectionInfo& connInfo)
{
if (!running.load())
return;

std::lock_guard<std::mutex> lock(connectedClientsMutex);
const auto it = registeredClientIds.find(connInfo.clientId);
if (it == registeredClientIds.end())
return;

const auto loggerComponent = context.getLogger().getOrAddComponent("TmsServer");
LOG_I("Client address resolved, ID: {}, address: {}, hostname: {}",
connInfo.clientId,
connInfo.address,
connInfo.hostname);

SizeT clientNumber = 0;
if (device.assigned() && !device.isRemoved())
{
device.getInfo().asPtr<IDeviceInfoInternal>().addConnectedClient(
&clientNumber,
ConnectedClientInfo(connInfo.address,
ProtocolType::Configuration,
"OpenDAQOPCUA",
"Control",
connInfo.hostname));
}
it->second = clientNumber;
}

void TmsServer::removeConnectedClientInfo(const std::string& clientId)
{
if (!running.load())
return;

std::lock_guard<std::mutex> lock(connectedClientsMutex);
const auto it = registeredClientIds.find(clientId);
if (it == registeredClientIds.end())
return;

const auto loggerComponent = context.getLogger().getOrAddComponent("TmsServer");
LOG_I("Client disconnected, ID: {}", clientId);
if (device.assigned() && !device.isRemoved() && it->second != 0)
device.getInfo().asPtr<IDeviceInfoInternal>(true).removeConnectedClient(it->second);
registeredClientIds.erase(it);
}

void TmsServer::stop()
{
running = false;
Expand Down
Loading
Loading