Skip to content

Commit 542a4af

Browse files
author
Steffen Siering
authored
Move http_endpoint input to v2 input API (elastic#19815)
1 parent 72cc711 commit 542a4af

8 files changed

Lines changed: 241 additions & 306 deletions

File tree

x-pack/filebeat/include/list.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/filebeat/input/default-inputs/inputs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/elastic/beats/v7/libbeat/beat"
1212
"github.com/elastic/beats/v7/libbeat/logp"
1313
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
14+
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
1415
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
1516
)
1617

@@ -23,7 +24,8 @@ func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin
2324

2425
func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
2526
return []v2.Plugin{
26-
o365audit.Plugin(log, store),
2727
cloudfoundry.Plugin(),
28+
http_endpoint.Plugin(),
29+
o365audit.Plugin(log, store),
2830
}
2931
}

x-pack/filebeat/input/http_endpoint/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,11 @@ func (c *config) Validate() error {
4444
return errors.New("response_body must be valid JSON")
4545
}
4646

47+
if c.BasicAuth {
48+
if c.Username == "" || c.Password == "" {
49+
return errors.New("Username and password required when basicauth is enabled")
50+
}
51+
}
52+
4753
return nil
4854
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package http_endpoint
6+
7+
import (
8+
"bytes"
9+
"encoding/json"
10+
"errors"
11+
"fmt"
12+
"io"
13+
"io/ioutil"
14+
"net/http"
15+
"time"
16+
17+
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
18+
"github.com/elastic/beats/v7/libbeat/beat"
19+
"github.com/elastic/beats/v7/libbeat/common"
20+
"github.com/elastic/beats/v7/libbeat/logp"
21+
)
22+
23+
type httpHandler struct {
24+
log *logp.Logger
25+
publisher stateless.Publisher
26+
27+
messageField string
28+
responseCode int
29+
responseBody string
30+
}
31+
32+
var errBodyEmpty = errors.New("Body cannot be empty")
33+
var errUnsupportedType = errors.New("Only JSON objects are accepted")
34+
35+
// Triggers if middleware validation returns successful
36+
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
37+
obj, status, err := httpReadJsonObject(r.Body)
38+
if err != nil {
39+
w.Header().Add("Content-Type", "application/json")
40+
sendErrorResponse(w, status, err)
41+
return
42+
}
43+
44+
h.publishEvent(obj)
45+
w.Header().Add("Content-Type", "application/json")
46+
h.sendResponse(w, h.responseCode, h.responseBody)
47+
}
48+
49+
func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
50+
w.WriteHeader(status)
51+
io.WriteString(w, message)
52+
}
53+
54+
func (h *httpHandler) publishEvent(obj common.MapStr) {
55+
event := beat.Event{
56+
Timestamp: time.Now().UTC(),
57+
Fields: common.MapStr{
58+
h.messageField: obj,
59+
},
60+
}
61+
62+
h.publisher.Publish(event)
63+
}
64+
65+
func withValidator(v validator, handler http.HandlerFunc) http.HandlerFunc {
66+
return func(w http.ResponseWriter, r *http.Request) {
67+
if status, err := v.ValidateHeader(r); status != 0 && err != nil {
68+
sendErrorResponse(w, status, err)
69+
} else {
70+
handler(w, r)
71+
}
72+
}
73+
}
74+
75+
func sendErrorResponse(w http.ResponseWriter, status int, err error) {
76+
w.Header().Add("Content-Type", "application/json")
77+
w.WriteHeader(status)
78+
fmt.Fprintf(w, `{"message": %q}`, err.Error())
79+
}
80+
81+
func httpReadJsonObject(body io.Reader) (obj common.MapStr, status int, err error) {
82+
if body == http.NoBody {
83+
return nil, http.StatusNotAcceptable, errBodyEmpty
84+
}
85+
86+
contents, err := ioutil.ReadAll(body)
87+
if err != nil {
88+
return nil, http.StatusInternalServerError, fmt.Errorf("failed reading body: %w", err)
89+
}
90+
91+
if !isObject(contents) {
92+
return nil, http.StatusBadRequest, errUnsupportedType
93+
}
94+
95+
obj = common.MapStr{}
96+
if err := json.Unmarshal(contents, &obj); err != nil {
97+
return nil, http.StatusBadRequest, fmt.Errorf("Malformed JSON body: %w", err)
98+
}
99+
100+
return obj, 0, nil
101+
}
102+
103+
func isObject(b []byte) bool {
104+
obj := bytes.TrimLeft(b, " \t\r\n")
105+
if len(obj) > 0 && obj[0] == '{' {
106+
return true
107+
}
108+
return false
109+
}

x-pack/filebeat/input/http_endpoint/httpserver.go

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)