Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion behave_framework/src/minifi_behave/steps/checking_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def verify_minifi_logs_match_regex(context, regex, duration):
@step('no errors were generated on the http-proxy regarding "{url}"')
def verify_no_errors_on_http_proxy(context: MinifiTestContext, url: str):
http_proxy_container = next(container for container in context.containers.values() if isinstance(container, HttpProxy))
assert http_proxy_container.check_http_proxy_access(url) or http_proxy_container.log_app_output()
assert http_proxy_container.check_http_proxy_access(url) or log_due_to_failure(context)


@then('in the "{container}" container no files are placed in the "{directory}" directory in {duration} of running time')
Expand Down
22 changes: 21 additions & 1 deletion core-framework/common/include/core/PropertyDefinitionBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@ namespace org::apache::nifi::minifi::core {
namespace detail {
template<typename... Types>
inline constexpr auto TypeNames = std::array<std::string_view, sizeof...(Types)>{core::className<Types>()...};
}

template <size_t N>
struct StringLiteral {
char value[N];
constexpr StringLiteral(const char (&str)[N]) { // NOLINT(runtime/explicit)
for (size_t i = 0; i < N; ++i) {
value[i] = str[i];
}
}
};

// A variable template that creates permanent static memory for the span to point to
template <StringLiteral str>
inline constexpr auto StaticAllowedType = std::array<std::string_view, 1>{std::string_view{str.value, sizeof(str.value) - 1}};
} // namespace detail
Comment on lines +30 to +44
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


template<size_t NumAllowedValues = 0>
struct PropertyDefinitionBuilder {
Expand Down Expand Up @@ -81,6 +95,12 @@ struct PropertyDefinitionBuilder {
return *this;
}

template <detail::StringLiteral TypeName>
constexpr PropertyDefinitionBuilder<NumAllowedValues> withAllowedType() {
property.allowed_types = detail::StaticAllowedType<TypeName>;
return *this;
}

constexpr PropertyDefinitionBuilder<NumAllowedValues> withValidator(const PropertyValidator& property_validator) {
#if defined(__clang__) && (__clang_major__ <= 18)
property.validator = &property_validator;
Expand Down
2 changes: 1 addition & 1 deletion extension-framework/cpp-extension-lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ target_include_directories(minifi-cpp-extension-lib PUBLIC include)
target_link_libraries(minifi-cpp-extension-lib PUBLIC minifi-core-framework-common minifi-c-api)

add_subdirectory(libtest)

add_subdirectory(mocklib)
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
#pragma once

#include <exception>
#include <memory>
#include <stdexcept>

#include "minifi-c.h"

namespace org::apache::nifi::minifi::api::core {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
Expand All @@ -17,26 +17,52 @@

#pragma once

#include <string>
#include <expected>
#include <string>

#include "api/core/FlowFile.h"
#include "api/utils/Proxy.h"
#include "api/utils/Ssl.h"
#include "minifi-c.h"
#include "minifi-cpp/core/PropertyDefinition.h"
#include "api/core/FlowFile.h"

namespace org::apache::nifi::minifi::api::core {

class ProcessContext {
public:
explicit ProcessContext(MinifiProcessContext* impl): impl_(impl) {}
virtual ~ProcessContext() noexcept = default;

ProcessContext() = default;
ProcessContext(const ProcessContext&) = delete;
ProcessContext(ProcessContext&&) = delete;
ProcessContext& operator=(const ProcessContext&) = delete;
ProcessContext& operator=(ProcessContext&&) = delete;

[[nodiscard]] virtual std::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& prop,
const FlowFile* ff) const = 0;
[[nodiscard]] virtual std::expected<MinifiControllerService*, std::error_code> getControllerService(const minifi::core::PropertyReference& prop) const = 0;
[[nodiscard]] virtual bool hasNonEmptyProperty(std::string_view name) const = 0;
[[nodiscard]] virtual std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const = 0;

[[nodiscard]] virtual std::expected<std::optional<utils::net::SslData>, std::error_code> getSslData(const minifi::core::PropertyReference& prop) const = 0;
[[nodiscard]] virtual std::expected<std::optional<utils::ProxyData>, std::error_code> getProxyData(const minifi::core::PropertyReference& prop) const = 0;
};

class CffiProcessContext : public ProcessContext {
public:
explicit CffiProcessContext(MinifiProcessContext* impl) : impl_(impl) {}

[[nodiscard]] std::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference,
const FlowFile* flow_file) const override;
[[nodiscard]] std::expected<MinifiControllerService*, std::error_code> getControllerService(const minifi::core::PropertyReference& prop) const override;
[[nodiscard]] std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const override;
[[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const override;

std::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file = nullptr) const;
std::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const {
return getProperty(property_reference.name, flow_file);
}
[[nodiscard]] std::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const;
[[nodiscard]] std::expected<std::optional<utils::net::SslData>, std::error_code> getSslData(const minifi::core::PropertyReference& prop) const override;
[[nodiscard]] std::expected<std::optional<utils::ProxyData>, std::error_code> getProxyData(const minifi::core::PropertyReference& prop) const override;

[[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const;
private:
[[nodiscard]] std::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file) const;

private:
MinifiProcessContext* impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,54 @@ namespace org::apache::nifi::minifi::api::core {

class ProcessSession {
public:
explicit ProcessSession(MinifiProcessSession* impl): impl_(impl) {}
virtual ~ProcessSession() = default;

FlowFile create(const FlowFile* parent = nullptr);
FlowFile get();
void transfer(FlowFile ff, const minifi::core::Relationship& relationship);
void remove(FlowFile ff);
void write(FlowFile& flow, const io::OutputStreamCallback& callback);
void read(FlowFile& flow, const io::InputStreamCallback& callback);
ProcessSession() = default;

void setAttribute(FlowFile& ff, std::string_view key, std::string value);
void removeAttribute(FlowFile& ff, std::string_view key);
std::optional<std::string> getAttribute(FlowFile& ff, std::string_view key);
std::map<std::string, std::string> getAttributes(FlowFile& ff);
ProcessSession(const ProcessSession&) = delete;
ProcessSession(ProcessSession&&) = delete;
ProcessSession& operator=(const ProcessSession&) = delete;
ProcessSession& operator=(ProcessSession&&) = delete;

virtual FlowFile create(const FlowFile* parent = nullptr) = 0;
virtual FlowFile get() = 0;

virtual void penalize(FlowFile& ff) = 0;
virtual void transfer(FlowFile ff, const minifi::core::Relationship& relationship) = 0;
virtual void remove(FlowFile ff) = 0;
virtual void write(FlowFile& flow, const io::OutputStreamCallback& callback) = 0;
virtual void read(FlowFile& flow, const io::InputStreamCallback& callback) = 0;

virtual void setAttribute(FlowFile& ff, std::string_view key, std::string value) = 0;
virtual void removeAttribute(FlowFile& ff, std::string_view key) = 0;
[[nodiscard]] virtual std::optional<std::string> getAttribute(FlowFile& ff, std::string_view key) = 0;
[[nodiscard]] virtual std::map<std::string, std::string> getAttributes(const FlowFile& ff) const = 0;
[[nodiscard]] virtual std::string getFlowFileId(const FlowFile& ff) const = 0;
[[nodiscard]] virtual uint64_t getFlowFileSize(const FlowFile& ff) const = 0;

void writeBuffer(FlowFile& flow_file, std::span<const char> buffer);
void writeBuffer(FlowFile& flow_file, std::span<const std::byte> buffer);
std::vector<std::byte> readBuffer(FlowFile& flow_file);
[[nodiscard]] std::vector<std::byte> readBuffer(FlowFile& flow_file);
};

class CffiProcessSession : public ProcessSession {
public:
explicit CffiProcessSession(MinifiProcessSession* impl): impl_(impl) {}

FlowFile create(const FlowFile* parent = nullptr) override;
FlowFile get() override;
void penalize(FlowFile& ff) override;
void transfer(FlowFile ff, const minifi::core::Relationship& relationship) override;
void remove(FlowFile ff) override;
void write(FlowFile& flow, const io::OutputStreamCallback& callback) override;
void read(FlowFile& flow, const io::InputStreamCallback& callback) override;

void setAttribute(FlowFile& ff, std::string_view key, std::string value) override;
void removeAttribute(FlowFile& ff, std::string_view key) override;
[[nodiscard]] std::optional<std::string> getAttribute(FlowFile& ff, std::string_view key) override;
[[nodiscard]] std::map<std::string, std::string> getAttributes(const FlowFile& ff) const override;
[[nodiscard]] std::string getFlowFileId(const FlowFile& ff) const override;
[[nodiscard]] uint64_t getFlowFileSize(const FlowFile& ff) const override;

private:
MinifiProcessSession* impl_;
Expand Down
50 changes: 32 additions & 18 deletions extension-framework/cpp-extension-lib/include/api/core/Resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,38 @@ void useProcessorClassDefinition(Fn&& fn) {
std::vector<MinifiDynamicPropertyDefinition> dynamic_properties;
for (auto& prop : Class::DynamicProperties) {
dynamic_properties.push_back(MinifiDynamicPropertyDefinition {
.name = utils::toStringView(prop.name),
.value = utils::toStringView(prop.value),
.description = utils::toStringView(prop.description),
.name = utils::minifiStringView(prop.name),
.value = utils::minifiStringView(prop.value),
.description = utils::minifiStringView(prop.description),
.supports_expression_language = prop.supports_expression_language
});
}
std::vector<MinifiRelationshipDefinition> relationships;
for (auto& rel : Class::Relationships) {
relationships.push_back(MinifiRelationshipDefinition{
.name = utils::toStringView(rel.name),
.description = utils::toStringView(rel.description)
.name = utils::minifiStringView(rel.name),
.description = utils::minifiStringView(rel.description)
});
}
std::vector<std::vector<MinifiStringView>> attribute_relationships_cache;
std::vector<MinifiOutputAttributeDefinition> output_attributes;
for (auto& attr : Class::OutputAttributes) {
std::vector<MinifiStringView> rel_cache;
for (auto& rel : attr.relationships) {
rel_cache.push_back(utils::toStringView(rel.name));
rel_cache.push_back(utils::minifiStringView(rel.name));
}
output_attributes.push_back(MinifiOutputAttributeDefinition {
.name = utils::toStringView(attr.name),
.name = utils::minifiStringView(attr.name),
.relationships_count = gsl::narrow<uint32_t>(attr.relationships.size()),
.relationships_ptr = rel_cache.data(),
.description = utils::toStringView(attr.description)
.description = utils::minifiStringView(attr.description)
});
attribute_relationships_cache.push_back(std::move(rel_cache));
}

MinifiProcessorClassDefinition definition{
.full_name = utils::toStringView(full_name),
.description = utils::toStringView(Class::Description),
.full_name = utils::minifiStringView(full_name),
.description = utils::minifiStringView(Class::Description),
.class_properties_count = gsl::narrow<uint32_t>(class_properties.size()),
.class_properties_ptr = class_properties.data(),
.dynamic_properties_count = gsl::narrow<uint32_t>(dynamic_properties.size()),
Expand All @@ -100,7 +100,7 @@ void useProcessorClassDefinition(Fn&& fn) {
return new Class{minifi::core::ProcessorMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
.logger = std::make_shared<logging::CffiLogger>(metadata.logger)}};
} catch (...) { return nullptr; }
},
.destroy = [] (MINIFI_OWNED void* self) -> void {
Expand All @@ -110,16 +110,16 @@ void useProcessorClassDefinition(Fn&& fn) {
return static_cast<Class*>(self)->getTriggerWhenEmpty();
},
.onTrigger = [] (void* self, MinifiProcessContext* context, MinifiProcessSession* session) -> MinifiStatus {
ProcessContext context_wrapper(context);
ProcessSession session_wrapper(session);
CffiProcessContext context_wrapper(context);
CffiProcessSession session_wrapper(session);
try {
return static_cast<Class*>(self)->onTrigger(context_wrapper, session_wrapper);
} catch (...) {
return MINIFI_STATUS_UNKNOWN_ERROR;
}
},
.onSchedule = [] (void* self, MinifiProcessContext* context) -> MinifiStatus {
ProcessContext context_wrapper(context);
CffiProcessContext context_wrapper(context);
try {
return static_cast<Class*>(self)->onSchedule(context_wrapper);
} catch (...) {
Expand All @@ -136,7 +136,7 @@ void useProcessorClassDefinition(Fn&& fn) {
std::vector<MinifiStringView> names;
std::vector<double> values;
for (auto& [name, val] : metrics) {
names.push_back(utils::toStringView(name));
names.push_back(utils::minifiStringView(name));
values.push_back(val);
}
return MinifiPublishedMetricsCreate(gsl::narrow<uint32_t>(metrics.size()), names.data(), values.data());
Expand All @@ -155,8 +155,8 @@ void useControllerServiceClassDefinition(Fn&& fn) {

std::vector<MinifiPropertyDefinition> class_properties = utils::toProperties(Class::Properties, string_vector_cache);

MinifiControllerServiceClassDefinition definition{.full_name = utils::toStringView(full_name),
.description = utils::toStringView(Class::Description),
MinifiControllerServiceClassDefinition definition{.full_name = utils::minifiStringView(full_name),
.description = utils::minifiStringView(Class::Description),
.class_properties_count = gsl::narrow<uint32_t>(class_properties.size()),
.class_properties_ptr = class_properties.data(),

Expand All @@ -166,7 +166,7 @@ void useControllerServiceClassDefinition(Fn&& fn) {
return new Class{minifi::core::ControllerServiceMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
.logger = std::make_shared<logging::CffiLogger>(metadata.logger)}};
} catch (...) { return nullptr; }
},
.destroy = [](MINIFI_OWNED void* self) -> void { delete static_cast<Class*>(self); },
Expand All @@ -186,4 +186,18 @@ void useControllerServiceClassDefinition(Fn&& fn) {
fn(definition);
}

template <typename... Processors>
void registerProcessors(MinifiExtension* extension) {
(core::useProcessorClassDefinition<Processors>([&](const MinifiProcessorClassDefinition& definition) {
MinifiRegisterProcessor(extension, &definition);
}), ...);
}

template <typename... ControllerServices>
void registerControllerServices(MinifiExtension* extension) {
(core::useControllerServiceClassDefinition<ControllerServices>([&](const MinifiControllerServiceClassDefinition& definition) {
MinifiRegisterControllerService(extension, &definition);
}), ...);
}

} // namespace org::apache::nifi::minifi::api::core
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
#include <iostream>
#include <string>

#include "fmt/chrono.h"
#include "minifi-c.h"
#include "minifi-cpp/core/logging/Logger.h"

namespace org::apache::nifi::minifi::api::core::logging {

class Logger : public minifi::core::logging::Logger {
class CffiLogger : public minifi::core::logging::Logger {
public:
explicit Logger(MinifiLogger* impl): impl_(impl) {}
explicit CffiLogger(MinifiLogger* impl): impl_(impl) {}

void set_max_log_size(int size) override;
void log_string(minifi::core::logging::LOG_LEVEL level, std::string str) override;
Expand Down
Loading
Loading