Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed compilation issue #2

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
8 changes: 6 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -16,9 +16,13 @@ RUN apt-get -q update \
libpcre3-dev \
valgrind

# Zyre 2.0.1 is not yet available.
# But consider at least 2.0.0, containing hash # ee1452a.
# And then ... How to setup this in the Dockerfile ???

ENV LIBSODIUM_VERSION=1.0.12 \
LIBZMQ_VERSION=v4.2.2 \
CZMQ_VERSION=v4.0.2 \
LIBZMQ_VERSION=v4.2.5 \
CZMQ_VERSION=v4.1.1 \
ZYRE_VERSION=v2.0.0

RUN declare -A _deps=( \
10 changes: 9 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
@@ -97,7 +97,15 @@ func (ev *Event) Address() string { return C.GoString(C.zyre_event_peer_addr(ev.
// XXX: Do we need to lock the headers? The underlying implementation uses a
// cursor to iterate through the items and may introduce race conditions.
func (ev *Event) Headers() map[string]string {
return zHashToMap(C.zyre_event_headers(ev.czyreEvent), false)
czev := ev.czyreEvent
if (czev == nil) {
return nil
}
czhdr := C.zyre_event_headers(czev)
if (czhdr == nil) {
return nil
}
return zHashToMap(czhdr, false)
}

// Header returns the header value from the header name
9 changes: 9 additions & 0 deletions gozyre.go
Original file line number Diff line number Diff line change
@@ -56,4 +56,13 @@ var (

// ErrAddingFrame is returned when constructing message frames for sending
ErrAddingFrame = errors.New("error adding message frame")

// ErrPollTerminated is returned when POll received SIGINT
ErrPollInterrupted = errors.New("polling interrupted")

// ErrPollExpired is returned when POll is timed-out
ErrPollExpired = errors.New("polling expired")

// ErrPollUnknown is returned when unkwown error occured
ErrPollUnknown = errors.New("polling unknown error")
)
33 changes: 29 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
@@ -2,21 +2,22 @@ package gozyre

/*
#include "zyre.h"
#include "czmq.h"

int wrap_set_endpoint(zyre_t *self, const char *ep) {
return zyre_set_endpoint(self, ep);
return zyre_set_endpoint(self, "%s", ep);
}

void wrap_gossip_bind(zyre_t *self, const char *bind) {
zyre_gossip_bind(self, bind);
zyre_gossip_bind(self, "%s", bind);
}

void wrap_gossip_connect(zyre_t *self, const char *conn) {
zyre_gossip_connect(self, conn);
zyre_gossip_connect(self, "%s", conn);
}

void wrap_set_header(zyre_t *self, const char *name, const char *val) {
zyre_set_header(self, name, val);
zyre_set_header(self, name, "%s", val);
}
*/
import "C"
@@ -31,6 +32,16 @@ type Zyre struct {
czyre *C.struct__zyre_t
}

// Force ZMQ init
// Ensure ZMQ won't interact with GO signal handling mechanism.
func init() {
C.zsys_init()
C.zsys_handler_set(nil)
}

// Instruct ZMQ to exit and free all its resources.
func Exit() { C.zsys_shutdown() }

// New constructs a new node for peer-to-peer discovery
// Constructor, creates a new Zyre node. Note that until you start the
// node, it is silent and invisible to other nodes on the network.
@@ -91,6 +102,16 @@ func (n *Zyre) Name() string { return C.GoString(C.zyre_name(n.czyre)) }
// UUID returns our node UUID string, after successful initialization
func (n *Zyre) UUID() string { return C.GoString(C.zyre_uuid(n.czyre)) }

// Set beacon TCP ephemeral port to a well known value.
func (n *Zyre) SetBeaconPeerPort(port uint16) {
C.zyre_set_beacon_peer_port(n.czyre, C.int(port))
}

// Old name of the above, deprecated.
func (n *Zyre) SetEphemeralPort(port uint16) {
C.zyre_set_beacon_peer_port(n.czyre, C.int(port))
}

// SetEvasive sets the node evasiveness timeout. Default is 5 * time.Millisecond.
func (n *Zyre) SetEvasive(timeout time.Duration) {
ctimeout := C.int(float64(timeout.Seconds()) * 1000)
@@ -211,3 +232,7 @@ func (n *Zyre) Gossip(endpoint, hub string) error {

// Stop signals to other nodes that this node will go away
func (n *Zyre) Stop() { C.zyre_stop(n.czyre) }

// Socket returns the socket, used by ZYRE.
func (n *Zyre) Socket() *C.struct__zsock_t { return C.zyre_socket(n.czyre) }

88 changes: 88 additions & 0 deletions poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Package gozyre provides ...
package gozyre

/*
#include "czmq.h"
#include "czmq_library.h"

zpoller_t *wrap_zpoller_new() {
return zpoller_new(NULL);
}
*/
import "C"

import (
"unsafe"
)

type ZyrePoller struct {
czpoller *C.struct__zpoller_t
czsocket *C.struct__zsock_t
}

// New Zyre-socket POLLER.
func NewPoller(node *Zyre) *ZyrePoller {
sk := node.Socket()
if (sk == nil) {
return nil
}

zp := C.wrap_zpoller_new()
if (zp == nil) {
return nil
}
if (C.zpoller_add(zp, unsafe.Pointer(sk)) < 0) {
C.zpoller_destroy(&zp)
return nil
}

pol := &ZyrePoller{}
pol.czpoller = zp
pol.czsocket = sk

return pol
}

// Zyre-Poller destruction
func (poller *ZyrePoller) Destroy() {
if (poller == nil) {
return
}
if (poller.czpoller == nil) {
return
}
if (poller.czsocket != nil) {
C.zpoller_remove(poller.czpoller, unsafe.Pointer(poller.czsocket))
poller.czsocket = nil
}
C.zpoller_destroy(&poller.czpoller)
}

// Poll ZYRE socket, until timeout or message is received.
// <timeout> is given in MILISECOND.
// - 0 : no wait.
// - -1 : wait indefinitely.
//
// Note:
// Polling is not stopped by any SIGNAL (thx to GO signal handler).
// Hence, timeout has to be rather small if response time must be short.
// Good compromise should be around 200 (ms).
func (poller *ZyrePoller) Poll(timeout int) (bool, error) {
socket := C.zpoller_wait(poller.czpoller, C.int(timeout))
if (socket == nil) {
if C.zpoller_terminated(poller.czpoller) {
// We've been interrupted by SIGINT.
return false, ErrPollInterrupted
}
if C.zpoller_expired(poller.czpoller) {
return false, ErrPollExpired
}

// Wall... not EXPIRED & not INTERRUPTED...
// What is this strange state ?
return false, ErrPollUnknown
}

return true, nil
}

10 changes: 7 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
@@ -29,8 +29,12 @@ func zHashToMap(zhash *C.struct__zhash_t, free bool) map[string]string {
keyVal := make(map[string]string)
for item := C.zhash_first(zhash); item != nil; item = C.zhash_next(zhash) {
ckey := C.zhash_cursor(zhash)
cval := C.GoString((*C.char)(C.zhash_lookup(zhash, ckey)))
keyVal[C.GoString(ckey)] = cval
key := C.GoString((*C.char)(ckey))

cval := C.zhash_lookup(zhash, ckey)
val := C.GoString((*C.char)(cval))

keyVal[key] = val
}
return keyVal
}
@@ -58,7 +62,7 @@ func bytesToZmsg(ch <-chan []byte) (*C.struct__zmsg_t, error) {
// gc collects it too early
ret := C.zmsg_addmem(zmsg, unsafe.Pointer(&frame[0]), C.size_t(len(frame)))
if ret != 0 {
C.zmsg_destroy(unsafe.Pointer(zmsg))
C.zmsg_destroy(&zmsg)
err = ErrAddingFrame
break
}