forked from bshuster-repo/logrus-logstash-hook
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhook.go
More file actions
145 lines (125 loc) · 3.43 KB
/
Copy pathhook.go
File metadata and controls
145 lines (125 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package logrustash
import (
"io"
"net"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// defaultBufSize is the entry-channel capacity that AsyncBuffer falls back to
// when it is called with a bufsize of 0.
const defaultBufSize uint = 8192
// Hook represents a logrus hook for Logstash.
// To initialize it use the `New` function.
type Hook struct {
// writer is the destination that formatted entries are written to.
writer io.Writer
// formatter converts a logrus entry into the bytes handed to writer.
formatter logrus.Formatter
// levels is the set of logrus levels reported by Levels.
levels []logrus.Level
// timeout, when positive, is used as the write deadline if writer is a net.Conn.
timeout time.Duration
// async makes Fire hand entries off instead of writing them inline.
async bool
// buf queues entries for processBuffer; it is set by AsyncBuffer.
buf chan *logrus.Entry
// wg counts entries queued on buf whose processing has not finished yet.
wg sync.WaitGroup
// mu is read-locked by Fire and write-locked by Flush.
mu sync.RWMutex
}
// New builds a Hook that formats every entry with f and writes the result to w.
// The returned hook reports all logrus levels and sends synchronously until
// Async or AsyncBuffer is called.
//
// For example, to ship logs to `tcp://logstash.corp.io:9999`:
//
//	conn, _ := net.Dial("tcp", "logstash.corp.io:9999")
//	hook := logrustash.New(conn, logrustash.DefaultFormatter())
func New(w io.Writer, f logrus.Formatter) *Hook {
	hook := new(Hook)
	hook.writer = w
	hook.formatter = f
	hook.levels = logrus.AllLevels
	hook.async = false
	return hook
}
// Fire delivers entry to Logstash: the hook's formatter encodes the entry and
// the hook's writer carries the encoded bytes.
//
// In synchronous mode the formatting or write error is returned. In
// asynchronous mode Fire always returns nil: the entry is queued on the buffer
// when one exists, otherwise it is sent from a new goroutine.
func (h *Hook) Fire(entry *logrus.Entry) error {
	// Shared lock: several goroutines may log at once, while Flush (which
	// takes the exclusive lock) is kept out until they are done.
	h.mu.RLock()
	defer h.mu.RUnlock()

	switch {
	case !h.async:
		return h.fire(entry)
	case h.buf == nil:
		// Async without a buffer: send from a goroutine; its error is dropped.
		go h.fire(entry)
	default:
		// Count the entry before queueing it so Flush can wait for it.
		h.wg.Add(1)
		h.buf <- entry
	}
	return nil
}
// Levels returns the logrus levels this hook is configured to fire for:
// logrus.AllLevels for a hook created with New, or whatever was last
// passed to SetLevels.
func (h *Hook) Levels() []logrus.Level {
return h.levels
}
// SetLevels sets the logging levels for which this hook fires, i.e. the
// value subsequently returned by Levels.
// The slice is stored as given; it is not copied.
func (h *Hook) SetLevels(levels []logrus.Level) {
h.levels = levels
}
// SetTimeout sets how long writing a message may take before it times out.
// The timeout only takes effect when it is positive and the hook's writer is
// a net.Conn, in which case it is used to set the connection's write deadline
// before each write.
func (h *Hook) SetTimeout(d time.Duration) {
h.timeout = d
}
// UsePool builds a Logstash connection pool from hosts, initialCap and maxCap
// and installs it as the hook's writer. The pool is meant to support handling
// connection failures and the use of multiple Logstash instances within a
// cluster.
//
// If the pool cannot be created, the error is returned and the hook's current
// writer is left unchanged.
func (h *Hook) UsePool(hosts []string, initialCap, maxCap int) error {
	pool, err := newPool(hosts, initialCap, maxCap)
	if err != nil {
		return err
	}

	h.writer = pool
	return nil
}
// Async sets the async flag so that logs are sent asynchronously.
// When this option is used, Fire does not return send errors.
func (h *Hook) Async() {
h.async = true
}
// AsyncBuffer switches the hook to asynchronous mode and creates a channel of
// capacity bufsize on which Fire queues entries; a background goroutine is
// started to process the queued entries.
// A bufsize of 0 selects defaultBufSize.
func (h *Hook) AsyncBuffer(bufsize uint) {
	capacity := defaultBufSize
	if bufsize != 0 {
		capacity = bufsize
	}

	h.Async()
	h.buf = make(chan *logrus.Entry, capacity)
	go h.processBuffer() // drain the queue in the background
}
// Flush blocks until every entry queued on the buffer has been processed.
// It returns immediately when the hook is not in buffered asynchronous mode.
func (h *Hook) Flush() {
	if buffered := h.async && h.buf != nil; !buffered {
		return
	}

	// Exclusive lock: no Fire call can queue further entries while we wait.
	h.mu.Lock()
	defer h.mu.Unlock()

	h.wg.Wait()
}
// processBuffer loops forever, taking one entry at a time from h.buf and
// sending it with fire. A failed send is reported through logrus.Warnf and the
// entry is dropped. It never returns.
//
// NOTE(review): logrus.Warnf logs through the logrus standard logger; if this
// hook is registered on that logger the warning would re-enter Fire from this
// goroutine — confirm how the hook is installed.
func (h *Hook) processBuffer() {
	for {
		queued := <-h.buf // blocks until Fire queues an entry
		err := h.fire(queued)
		if err != nil {
			logrus.Warnf("Error during sending message to logstash: %v\n", err)
		}
		// Balances the wg.Add(1) Fire performed when it queued this entry.
		h.wg.Done()
	}
}
// fire formats entry with the hook's formatter and writes the resulting bytes
// to the hook's writer. It returns the formatter's error, if any, otherwise
// the error from the write.
func (h *Hook) fire(entry *logrus.Entry) error {
	payload, err := h.formatter.Format(entry)
	if err != nil {
		return err
	}

	// A write deadline can only be set on a net.Conn, and only a positive
	// timeout is applied.
	if conn, isConn := h.writer.(net.Conn); isConn && h.timeout > 0 {
		// The error from setting the deadline is deliberately ignored; the
		// write below is attempted regardless.
		_ = conn.SetWriteDeadline(time.Now().Add(h.timeout))
	}

	_, err = h.writer.Write(payload)
	return err
}