Skip to content

Commit 2f47dfa

Browse files
everesioMichal Budzyn
authored andcommitted
Simple LDAP plugin
1 parent 7763933 commit 2f47dfa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+5914
-39
lines changed

Gopkg.lock

Lines changed: 25 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,8 @@ protoc.auth:
7474
plugin.auth-user:
7575
CGO_ENABLED=0 go build -o build/auth-user $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-user/main.go
7676

77+
plugin.auth-ldap:
78+
CGO_ENABLED=0 go build -o build/auth-ldap $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-ldap/main.go
79+
7780
clean:
7881
@rm -rf build

README.md

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ responses received from the brokers are replaced by local counterparts.
1414
For discovered brokers (not configured as the boostrap servers), local listeners are started on random ports.
1515
The dynamic local listeners feature can be disabled and an additional list of external server mappings can be provided.
1616

17-
The Proxy can terminate TLS traffic and authenticate users locally using SASL/PLAIN. The credentials verification method
17+
The Proxy can terminate TLS traffic and authenticate users using SASL/PLAIN. The credentials verification method
1818
is configurable and uses golang plugin system over RPC.
1919

2020
Kafka API calls can be restricted to prevent some operations e.g. topic deletion.
@@ -34,20 +34,19 @@ See:
3434

3535
build/kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" \
3636
--bootstrap-server-mapping "192.168.99.100:32401,127.0.0.1:32401" \
37-
--bootstrap-server-mapping "192.168.99.100:32402,127.0.0.1:32402"
37+
--bootstrap-server-mapping "192.168.99.100:32402,127.0.0.1:32402" \
38+
--dynamic-listeners-disable
39+
40+
build/kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" \
41+
--external-server-mapping "192.168.99.100:32401,127.0.0.1:32402" \
42+
--external-server-mapping "192.168.99.100:32402,127.0.0.1:32403" \
43+
--forbidden-api-keys 20
3844

3945
build/kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
4046
--tls-enable --tls-insecure-skip-verify \
4147
--sasl-enable -sasl-username myuser --sasl-password mysecret
4248

43-
make clean build plugin.auth-user && build/kafka-proxy server \
44-
--proxy-listener-auth-enable \
45-
--proxy-listener-auth-command build/auth-user \
46-
--proxy-listener-auth-param "--username=my-test-user" \
47-
--proxy-listener-auth-param "--password=my-test-password" \
48-
--dynamic-listeners-disable \
49-
--forbidden-api-keys 20 \
50-
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32401"
49+
### Proxy authentication example
5150

5251
make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \
5352
--proxy-listener-cert-file "server-cert.pem" \
@@ -56,12 +55,16 @@ See:
5655
--proxy-listener-auth-enable \
5756
--proxy-listener-auth-command build/auth-user \
5857
--proxy-listener-auth-param "--username=my-test-user" \
59-
--proxy-listener-auth-param "--password=my-test-password" \
60-
--dynamic-listeners-disable \
61-
--forbidden-api-keys 20 \
62-
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32401" \
63-
--external-server-mapping "192.168.99.100:32401,127.0.0.1:32402" \
64-
--external-server-mapping "192.168.99.100:32402,127.0.0.1:32403"
58+
--proxy-listener-auth-param "--password=my-test-password"
59+
60+
make clean build plugin.auth-ldap && build/kafka-proxy server \
61+
--proxy-listener-auth-enable \
62+
--proxy-listener-auth-command build/auth-ldap \
63+
--proxy-listener-auth-param "--url=ldaps://ldap.example.com:636" \
64+
--proxy-listener-auth-param "--user-dn=cn=users,dc=exemple,dc=com" \
65+
--proxy-listener-auth-param "--user-attr=uid" \
66+
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
67+
6568

6669
### What should be done
6770

@@ -77,7 +80,7 @@ See:
7780
3. counter: proxy_connections_total {broker}
7881
4. counter: proxy_requests_bytes {broker}
7982
5. counter: proxy_responses_bytes {broker}
80-
* [X] Pluggable local authentication
83+
* [X] Pluggable proxy authentication
8184
* [ ] Deploying Kafka Proxy as a sidecar container
8285
* [ ] Performance tests and tuning
8386
* [ ] Socket buffer sizing e.g. SO_RCVBUF = 32768, SO_SNDBUF = 131072

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func init() {
7878
// authentication plugin
7979
Server.Flags().BoolVar(&c.Proxy.Auth.Enable, "proxy-listener-auth-enable", false, "Enable SASL/PLAIN listener authentication")
8080
Server.Flags().StringVar(&c.Proxy.Auth.Command, "proxy-listener-auth-command", "", "Path to authentication plugin binary")
81-
Server.Flags().StringSliceVar(&c.Proxy.Auth.Parameters, "proxy-listener-auth-param", []string{}, "Authentication plugin parameter")
81+
Server.Flags().StringArrayVar(&c.Proxy.Auth.Parameters, "proxy-listener-auth-param", []string{}, "Authentication plugin parameter")
8282
Server.Flags().StringVar(&c.Proxy.Auth.LogLevel, "proxy-listener-auth-log-level", "trace", "Log level of the auth plugin")
8383

8484
// kafka

cmd/plugin-auth-ldap/main.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package main
2+
3+
import (
4+
"crypto/tls"
5+
"flag"
6+
"fmt"
7+
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
8+
"github.com/hashicorp/go-multierror"
9+
"github.com/hashicorp/go-plugin"
10+
"github.com/sirupsen/logrus"
11+
"gopkg.in/ldap.v2"
12+
"net"
13+
"net/url"
14+
"os"
15+
"strings"
16+
)
17+
18+
//TODO: connection pooling, credential caching (TTL, max number of entries), negative caching
19+
type LdapAuthenticator struct {
20+
Urls []string
21+
StartTLS bool
22+
UPNDomain string
23+
UserDN string
24+
UserAttr string
25+
}
26+
27+
func (pa LdapAuthenticator) Authenticate(username, password string) (bool, int32, error) {
28+
// logrus.Printf("Authenticate request for %s:%s,expected %s:%s ", username, password, pa.username, pa.password)
29+
l, err := pa.DialLDAP()
30+
if err != nil {
31+
logrus.Errorf("user %s ldap dial error %v", username, err)
32+
return false, 1, nil
33+
}
34+
defer l.Close()
35+
36+
err = l.Bind(pa.getUserBindDN(username), password)
37+
if err != nil {
38+
if ldapErr, ok := err.(*ldap.Error); ok && ldapErr.ResultCode == ldap.LDAPResultInvalidCredentials {
39+
logrus.Errorf("user %s credentials are invalid", username)
40+
return false, 0, nil
41+
}
42+
logrus.Errorf("user %s ldap bind error %v", username, err)
43+
return false, 2, nil
44+
}
45+
return true, 0, nil
46+
}
47+
48+
func (pa LdapAuthenticator) getUserBindDN(username string) string {
49+
50+
if pa.UPNDomain != "" {
51+
return fmt.Sprintf("%s@%s", escapeLDAPValue(username), pa.UPNDomain)
52+
}
53+
return fmt.Sprintf("%s=%s,%s", pa.UserAttr, escapeLDAPValue(username), pa.UserDN)
54+
}
55+
56+
func escapeLDAPValue(input string) string {
57+
// RFC4514 forbids un-escaped:
58+
// - leading space or hash
59+
// - trailing space
60+
// - special characters '"', '+', ',', ';', '<', '>', '\\'
61+
// - null
62+
for i := 0; i < len(input); i++ {
63+
escaped := false
64+
if input[i] == '\\' {
65+
i++
66+
escaped = true
67+
}
68+
switch input[i] {
69+
case '"', '+', ',', ';', '<', '>', '\\':
70+
if !escaped {
71+
input = input[0:i] + "\\" + input[i:]
72+
i++
73+
}
74+
continue
75+
}
76+
if escaped {
77+
input = input[0:i] + "\\" + input[i:]
78+
i++
79+
}
80+
}
81+
if input[0] == ' ' || input[0] == '#' {
82+
input = "\\" + input
83+
}
84+
if input[len(input)-1] == ' ' {
85+
input = input[0:len(input)-1] + "\\ "
86+
}
87+
return input
88+
}
89+
func (pa LdapAuthenticator) DialLDAP() (*ldap.Conn, error) {
90+
var retErr *multierror.Error
91+
var conn *ldap.Conn
92+
for _, uut := range pa.Urls {
93+
u, err := url.Parse(uut)
94+
if err != nil {
95+
retErr = multierror.Append(retErr, fmt.Errorf("error parsing url %q: %s", uut, err.Error()))
96+
continue
97+
}
98+
host, port, err := net.SplitHostPort(u.Host)
99+
if err != nil {
100+
host = u.Host
101+
}
102+
switch u.Scheme {
103+
case "ldap":
104+
if port == "" {
105+
port = "389"
106+
}
107+
conn, err = ldap.Dial("tcp", net.JoinHostPort(host, port))
108+
if err != nil {
109+
break
110+
}
111+
if conn == nil {
112+
err = fmt.Errorf("empty connection after dialing")
113+
break
114+
}
115+
if pa.StartTLS {
116+
err = conn.StartTLS(&tls.Config{InsecureSkipVerify: true})
117+
}
118+
case "ldaps":
119+
if port == "" {
120+
port = "636"
121+
}
122+
conn, err = ldap.DialTLS("tcp", net.JoinHostPort(host, port), &tls.Config{InsecureSkipVerify: true})
123+
default:
124+
retErr = multierror.Append(retErr, fmt.Errorf("invalid LDAP scheme in url %q", net.JoinHostPort(host, port)))
125+
continue
126+
}
127+
if err == nil {
128+
retErr = nil
129+
break
130+
}
131+
retErr = multierror.Append(retErr, fmt.Errorf("error connecting to host %q: %s", uut, err.Error()))
132+
}
133+
return conn, retErr.ErrorOrNil()
134+
}
135+
136+
type pluginMeta struct {
137+
url string
138+
startTLS bool
139+
upnDomain string
140+
userDN string
141+
userAttr string
142+
}
143+
144+
func (f *pluginMeta) flagSet() *flag.FlagSet {
145+
fs := flag.NewFlagSet("auth plugin settings", flag.ContinueOnError)
146+
147+
fs.StringVar(&f.url, "url", "", "LDAP URL to connect to (eg: ldaps://127.0.0.1:636). Multiple URLs can be specified by concatenating them with commas.")
148+
fs.BoolVar(&f.startTLS, "start-tls", true, "Issue a StartTLS command after establishing unencrypted connection (optional)")
149+
fs.StringVar(&f.upnDomain, "upn-domain", "", "Enables userPrincipalDomain login with [username]@UPNDomain (optional)")
150+
fs.StringVar(&f.userDN, "user-dn", "", "LDAP domain to use for users (eg: cn=users,dc=example,dc=org)")
151+
fs.StringVar(&f.userAttr, "user-attr", "uid", " Attribute used for users")
152+
return fs
153+
}
154+
155+
func (f *pluginMeta) getUrls() ([]string, error) {
156+
result := make([]string, 0)
157+
urls := strings.Split(f.url, ",")
158+
for _, uut := range urls {
159+
u, err := url.Parse(uut)
160+
if err != nil {
161+
return nil, err
162+
}
163+
host, port, err := net.SplitHostPort(u.Host)
164+
if err != nil {
165+
host = u.Host
166+
}
167+
switch u.Scheme {
168+
case "ldap", "ldaps":
169+
result = append(result, uut)
170+
default:
171+
return nil, fmt.Errorf("invalid LDAP scheme in url %q", net.JoinHostPort(host, port))
172+
}
173+
}
174+
if len(result) == 0 {
175+
return nil, fmt.Errorf("empty LDAP url list")
176+
}
177+
return result, nil
178+
}
179+
180+
func main() {
181+
182+
pluginMeta := &pluginMeta{}
183+
flags := pluginMeta.flagSet()
184+
flags.Parse(os.Args[1:])
185+
186+
urls, err := pluginMeta.getUrls()
187+
if err != nil {
188+
logrus.Error(err)
189+
os.Exit(1)
190+
}
191+
if pluginMeta.upnDomain == "" && (pluginMeta.userDN == "" || pluginMeta.userAttr == "") {
192+
logrus.Errorf("parameters user-dn and user-attr are required")
193+
os.Exit(1)
194+
}
195+
196+
plugin.Serve(&plugin.ServeConfig{
197+
HandshakeConfig: shared.Handshake,
198+
Plugins: map[string]plugin.Plugin{
199+
"passwordAuthenticator": &shared.PasswordAuthenticatorPlugin{Impl: &LdapAuthenticator{
200+
Urls: urls,
201+
StartTLS: pluginMeta.startTLS,
202+
UPNDomain: pluginMeta.upnDomain,
203+
UserDN: pluginMeta.userDN,
204+
UserAttr: pluginMeta.userAttr,
205+
}},
206+
},
207+
// A non-nil value here enables gRPC serving for this plugin...
208+
GRPCServer: plugin.DefaultGRPCServer,
209+
})
210+
}

0 commit comments

Comments
 (0)