Skip to content

Commit c0b9f93

Browse files
authored
feat: add multi output plugin, add nats metrics (#33)
* feat: add multi output plugin with configuration schema * fix: add ID for child plugins * feat: add dtap_output_nats_publish_duration_seconds metrics * doc: add comments
1 parent 5bfa593 commit c0b9f93

File tree

9 files changed

+316
-1
lines changed

9 files changed

+316
-1
lines changed

doc/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
- [stdout](https://pkg.go.dev/github.com/mimuret/dtap/v2/pkg/plugin/output/stdout#Stdout)
3535
- [tcp](https://pkg.go.dev/github.com/mimuret/dtap/v2/pkg/plugin/output/tcp#TCP)
3636
- [unix](https://pkg.go.dev/github.com/mimuret/dtap/v2/pkg/plugin/output/unix#Unix)
37+
- [multi](https://pkg.go.dev/github.com/mimuret/dtap/v2/pkg/plugin/output/multi#Unix)
3738

3839
### debug and experimental plugins
3940

pkg/core/plugins/plugins.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/fluent"
3939
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/kafka"
4040
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/loki"
41+
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/multi"
4142
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/nats"
4243
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/nop"
4344
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/otel-log"
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"$id": "/pkg/plugin/output/multi/config-schema.json",
3+
"title": "multi output plugin",
4+
"type": "object",
5+
"required": ["Name", "ID", "Concurency", "Plugin"],
6+
"properties": {
7+
"Name": {
8+
"type": "string",
9+
"const": "multi",
10+
"description": "The name of the plugin. Must be 'multi'."
11+
},
12+
"ID": {
13+
"type": "string",
14+
"description": "The unique identifier for this MultiRunner plugin instance."
15+
},
16+
"Concurency": {
17+
"type": "integer",
18+
"minimum": 1,
19+
"description": "The number of concurrent instances of the child plugin. Must be greater than or equal to 1."
20+
},
21+
"Plugin": {
22+
"type": "object",
23+
"description": "The configuration for the child plugin.",
24+
"properties": {
25+
"Name": {
26+
"type": "string",
27+
"description": "The name of the child plugin."
28+
}
29+
},
30+
"required": ["Name"],
31+
"additionalProperties": true
32+
}
33+
},
34+
"additionalProperties": false
35+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2022 Manabu Sonoda
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package multi
17+
18+
import (
19+
"testing"
20+
21+
. "github.com/onsi/ginkgo/v2"
22+
. "github.com/onsi/gomega"
23+
)
24+
25+
func TestGinkgo(t *testing.T) {
26+
RegisterFailHandler(Fail)
27+
RunSpecs(t, "multi package test suite")
28+
}

pkg/plugin/output/multi/multi.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package multi
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/goccy/go-json"
8+
9+
"github.com/mimuret/dtap/v2/pkg/plugin"
10+
"github.com/mimuret/dtap/v2/pkg/plugin/registry"
11+
"github.com/mimuret/dtap/v2/pkg/types"
12+
"github.com/pkg/errors"
13+
)
14+
15+
func init() {
16+
_ = registry.RegisterOutputPlugin("multi", Setup)
17+
}
18+
19+
func Setup(bs json.RawMessage) (types.OutputPlugin, error) {
20+
s := &MultiRunner{
21+
Concurency: 1,
22+
}
23+
if err := json.Unmarshal(bs, s); err != nil {
24+
return nil, errors.Wrap(err, "failed to decode config")
25+
}
26+
if s.Concurency == 0 {
27+
return nil, errors.New("Concurency must be greater than 0")
28+
}
29+
var pluginName string
30+
if err := json.Unmarshal(s.Plugin["Name"], &pluginName); err != nil {
31+
return nil, errors.Wrap(err, "failed to unmarshal Plugin.Name")
32+
}
33+
if pluginName == "" {
34+
return nil, errors.New("`Plugin.Name` must not be empty")
35+
}
36+
for i := 0; i < int(s.Concurency); i++ {
37+
var err error
38+
s.Plugin["ID"], err = json.Marshal(fmt.Sprintf(`%s-%d`, s.GetID(), i))
39+
if err != nil {
40+
return nil, errors.Wrapf(err, "failed to marshal Plugin.ID for concurrency %d", i)
41+
}
42+
cfg, err := json.Marshal(s.Plugin)
43+
if err != nil {
44+
return nil, errors.Wrapf(err, "failed to marshal plugin config: %s", pluginName)
45+
}
46+
p, err := registry.CreateOutputPlugin(pluginName, cfg)
47+
if err != nil {
48+
return nil, errors.Wrapf(err, "failed to create output plugin %s", s.PluginCommon.Name)
49+
}
50+
s.plugns = append(s.plugns, p)
51+
}
52+
return s, nil
53+
}
54+
55+
// MultiRunner is a plugin that creates multiple instances of an Output Plugin
56+
// defined in the `Plugin` field. It is useful for load balancing or distributing
57+
// output to multiple destinations.
58+
//
59+
// The number of instances is controlled by the `Concurency` field. Each instance
60+
// of the plugin will have a unique ID, which is derived from the MultiRunner's ID
61+
// followed by a hyphen and a sequential number starting from 0.
62+
//
63+
// For example, the following configurations are equivalent:
64+
//
65+
// Configuration 1:
66+
// - Name: multi
67+
// ID: test-multi
68+
// Concurency: 2
69+
// Plugin:
70+
// Name: dummy
71+
//
72+
// Configuration 2:
73+
// - Name: dummy
74+
// ID: test-multi-0
75+
// - Name: dummy
76+
// ID: test-multi-1
77+
//
78+
// Fields:
79+
// - `Concurency`: Specifies the number of concurrent plugin instances to create.
80+
// - `Plugin`: Defines the configuration for the child plugin to be instantiated.
81+
//
82+
// The `Start` method initializes all created plugins, and errors are returned if
83+
// any of the plugins fail to start.
84+
85+
type MultiRunner struct {
86+
plugin.PluginCommon
87+
88+
Concurency uint
89+
Plugin map[string]json.RawMessage
90+
91+
plugns []types.OutputPlugin
92+
}
93+
94+
var _ types.OutputPlugin = &MultiRunner{}
95+
96+
func (m *MultiRunner) GetName() string {
97+
return m.PluginCommon.GetName()
98+
}
99+
func (m *MultiRunner) GetID() string {
100+
return m.PluginCommon.GetID()
101+
}
102+
103+
func (m *MultiRunner) Start(ctx context.Context, oc *types.OutputContext) error {
104+
for _, p := range m.plugns {
105+
if err := p.Start(ctx, oc); err != nil {
106+
return errors.Wrapf(err, "failed to start output plugin %s", p.GetName())
107+
}
108+
}
109+
return nil
110+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package multi
2+
3+
import "github.com/mimuret/dtap/v2/pkg/types"
4+
5+
func (m *MultiRunner) Plugins() []types.OutputPlugin {
6+
return m.plugns
7+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package multi_test
2+
3+
import (
4+
"context"
5+
6+
"github.com/goccy/go-json"
7+
"github.com/mimuret/dtap/v2/pkg/plugin"
8+
"github.com/mimuret/dtap/v2/pkg/plugin/output/multi"
9+
"github.com/mimuret/dtap/v2/pkg/plugin/registry"
10+
"github.com/mimuret/dtap/v2/pkg/types"
11+
. "github.com/onsi/ginkgo/v2"
12+
. "github.com/onsi/gomega"
13+
)
14+
15+
// モックプラグインを登録
16+
type MockOutputPlugin struct {
17+
plugin.PluginCommon
18+
startCalled bool
19+
}
20+
21+
func (m *MockOutputPlugin) Start(ctx context.Context, oc *types.OutputContext) error {
22+
m.startCalled = true
23+
return nil
24+
}
25+
26+
var _ = BeforeSuite(func() {
27+
_ = registry.RegisterOutputPlugin("mock", func(config json.RawMessage) (types.OutputPlugin, error) {
28+
o := &MockOutputPlugin{}
29+
if err := json.Unmarshal(config, o); err != nil {
30+
return nil, err
31+
}
32+
return o, nil
33+
})
34+
})
35+
36+
var _ = Describe("MultiRunner", func() {
37+
Context("Setup", func() {
38+
It("should succeed with valid configuration", func() {
39+
config := `{
40+
"Name": "multi",
41+
"ID": "test-multi",
42+
"Concurency": 2,
43+
"Plugin": {
44+
"Name": "mock"
45+
}
46+
}`
47+
48+
plugin, err := multi.Setup([]byte(config))
49+
Expect(err).To(Succeed())
50+
multiRunner, ok := plugin.(*multi.MultiRunner)
51+
Expect(ok).To(BeTrue())
52+
Expect(multiRunner.Concurency).To(Equal(uint(2)))
53+
Expect(multiRunner.Plugins()).To(HaveLen(2))
54+
Expect(multiRunner.Plugins()[0].GetName()).To(Equal("mock"))
55+
Expect(multiRunner.Plugins()[0].GetID()).To(Equal("test-multi-0"))
56+
Expect(multiRunner.Plugins()[1].GetName()).To(Equal("mock"))
57+
Expect(multiRunner.Plugins()[1].GetID()).To(Equal("test-multi-1"))
58+
})
59+
60+
It("should fail when Concurency is 0", func() {
61+
config := `{
62+
"Name": "multi",
63+
"ID": "test-multi",
64+
"Concurency": 0,
65+
"Plugin": {
66+
"Name": "mock",
67+
"ID": "mock-plugin"
68+
}
69+
}`
70+
71+
_, err := multi.Setup([]byte(config))
72+
Expect(err).To(HaveOccurred())
73+
Expect(err.Error()).To(ContainSubstring("Concurency must be greater than 0"))
74+
})
75+
76+
It("should fail when Plugin is invalid", func() {
77+
config := `{
78+
"Name": "multi",
79+
"ID": "test-multi",
80+
"Concurency": 1,
81+
"Plugin": {
82+
"Name": "invalid-plugin",
83+
"ID": "mock-plugin"
84+
}
85+
}`
86+
87+
_, err := multi.Setup([]byte(config))
88+
Expect(err).To(HaveOccurred())
89+
Expect(err.Error()).To(ContainSubstring("failed to create output plugin"))
90+
})
91+
})
92+
93+
Context("Start", func() {
94+
It("should call Start on all plugins", func() {
95+
config := `{
96+
"Name": "multi",
97+
"ID": "test-multi",
98+
"Concurency": 2,
99+
"Plugin": {
100+
"Name": "mock",
101+
"ID": "mock-plugin"
102+
}
103+
}`
104+
105+
plugin, err := multi.Setup([]byte(config))
106+
Expect(err).To(Succeed())
107+
multiRunner, ok := plugin.(*multi.MultiRunner)
108+
Expect(ok).To(BeTrue())
109+
110+
ctx := context.Background()
111+
oc := &types.OutputContext{}
112+
113+
err = multiRunner.Start(ctx, oc)
114+
Expect(err).To(Succeed())
115+
116+
for _, p := range multiRunner.Plugins() {
117+
mockPlugin, ok := p.(*MockOutputPlugin)
118+
Expect(ok).To(BeTrue())
119+
Expect(mockPlugin.startCalled).To(BeTrue())
120+
}
121+
})
122+
})
123+
})

pkg/plugin/output/nats/nats.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package nats
1818

1919
import (
2020
"sync"
21+
"time"
2122

2223
json "github.com/goccy/go-json"
2324
"github.com/mimuret/dtap/v2/pkg/promauto"
@@ -101,6 +102,12 @@ func Setup(bs json.RawMessage) (types.OutputPlugin, error) {
101102
Name: "write_errors_total",
102103
ConstLabels: prometheus.Labels{"ID": s.GetID()},
103104
})
105+
s.publishDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
106+
Namespace: "dtap",
107+
Subsystem: "output_nats",
108+
Name: "publish_duration_seconds",
109+
ConstLabels: prometheus.Labels{"ID": s.GetID()},
110+
})
104111
return s, nil
105112
}
106113

@@ -148,6 +155,7 @@ type Nats struct {
148155

149156
publishCounter prometheus.Counter
150157
publishErrCounter prometheus.Counter
158+
publishDurationSeconds prometheus.Histogram
151159
writeMessageCounter prometheus.Counter
152160
writeMessageErrCounter prometheus.Counter
153161
}
@@ -185,7 +193,9 @@ func (f *Nats) Write(dm *types.DnstapMessage) error {
185193
}
186194

187195
func (f *Nats) Publish(data []byte) error {
196+
start := time.Now().Unix()
188197
err := f.conn.Publish(f.Subject, data)
198+
f.publishDurationSeconds.Observe(float64(time.Now().Unix() - start))
189199
f.publishCounter.Inc()
190200
if err != nil {
191201
f.publishErrCounter.Inc()

schemas/output_plugins.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"title":"Plugins","$id":"/schemas/output_plugins.json","oneOf":[{"$ref":"/pkg/plugin/output/dns/config-schema.json"},{"$ref":"/pkg/plugin/output/file/config-schema.json"},{"$ref":"/pkg/plugin/output/fluent/config-schema.json"},{"$ref":"/pkg/plugin/output/kafka/config-schema.json"},{"$ref":"/pkg/plugin/output/loki/config-schema.json"},{"$ref":"/pkg/plugin/output/nats/config-schema.json"},{"$ref":"/pkg/plugin/output/nop/config-schema.json"},{"$ref":"/pkg/plugin/output/stdout/config-schema.json"},{"$ref":"/pkg/plugin/output/tcp/config-schema.json"},{"$ref":"/pkg/plugin/output/unix/config-schema.json"}]}
1+
{"title":"Plugins","$id":"/schemas/output_plugins.json","oneOf":[{"$ref":"/pkg/plugin/output/dns/config-schema.json"},{"$ref":"/pkg/plugin/output/file/config-schema.json"},{"$ref":"/pkg/plugin/output/fluent/config-schema.json"},{"$ref":"/pkg/plugin/output/kafka/config-schema.json"},{"$ref":"/pkg/plugin/output/loki/config-schema.json"},{"$ref":"/pkg/plugin/output/multi/config-schema.json"},{"$ref":"/pkg/plugin/output/nats/config-schema.json"},{"$ref":"/pkg/plugin/output/nop/config-schema.json"},{"$ref":"/pkg/plugin/output/stdout/config-schema.json"},{"$ref":"/pkg/plugin/output/tcp/config-schema.json"},{"$ref":"/pkg/plugin/output/unix/config-schema.json"}]}

0 commit comments

Comments
 (0)