Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: SkynetLabs/pinner
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.5.0
Choose a base ref
...
head repository: SkynetLabs/pinner
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: main
Choose a head ref
Loading
1 change: 0 additions & 1 deletion .github/CODEOWNERS

This file was deleted.

4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18.3 as builder
FROM golang:1.19.0 as builder
LABEL maintainer="SkynetLabs <devs@skynetlabs.com>"

WORKDIR /root
@@ -9,7 +9,7 @@ COPY . .

RUN go mod download && make release

FROM alpine:3.16.0
FROM alpine:3.16.2
LABEL maintainer="SkynetLabs <devs@skynetlabs.com>"

COPY --from=builder /go/bin/pinner /usr/bin/pinner
21 changes: 9 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -15,10 +15,8 @@ all: release
count = 1
# pkgs changes which packages the makefile calls operate on. run changes which
# tests are run during testing.
pkgs = ./ ./api ./conf ./database ./logger ./skyd ./test ./workers

# integration-pkgs defines the packages which contain integration tests
integration-pkgs = ./test ./test/api ./test/database
pkgs = ./ ./api ./conf ./database ./lib ./logger ./scanner ./skyd ./sweeper \
./test ./test/api ./test/conf ./test/database ./test/sweeper

# run determines which tests run when running any variation of 'make test'.
run = .
@@ -52,7 +50,7 @@ lint-ci:
# revive is skipped on Windows.
ifneq ("$(OS)","Windows_NT")
# Linux
go install github.com/mgechev/revive@latest
go install github.com/mgechev/revive@v1.2.0
revive -set_exit_status $(pkgs)
go mod tidy
endif
@@ -86,7 +84,7 @@ start-mongo:
-e MONGO_INITDB_ROOT_USERNAME=$(MONGO_USER) \
-e MONGO_INITDB_ROOT_PASSWORD=$(MONGO_PASSWORD) \
-v $(shell pwd)/test/fixtures/mongo_keyfile:/data/mgkey \
mongo:4.4.1 mongod --port=$(MONGO_PORT) --replSet=skynet --keyFile=/data/mgkey 1>/dev/null 2>&1
mongo:4.4.2 mongod --port=$(MONGO_PORT) --replSet=skynet --keyFile=/data/mgkey 1>/dev/null 2>&1
# wait for mongo to start before we try to configure it
status=1 ; while [[ $$status -gt 0 ]]; do \
sleep 1 ; \
@@ -130,14 +128,13 @@ bench: fmt
test:
go test -short -tags='debug testing netgo' -timeout=5s $(pkgs) -run=. -count=$(count)

# Tests in this group may rely on external services (such as MongoDB).
test-long: lint lint-ci start-mongo
test-long: lint lint-ci start-mongo test-long-ci stop-mongo

test-long-ci:
@mkdir -p cover
GORACE='$(racevars)' go test -race --coverprofile='./cover/cover.out' -v -failfast -tags='testing debug netgo' -timeout=60s $(pkgs) -run=$(run) -count=$(count)
GORACE='$(racevars)' go test -race --coverprofile='./cover/cover.out' -v -tags='testing debug netgo' -timeout=600s $(integration-pkgs) -run=$(run) -count=$(count)
-make stop-mongo
GORACE='$(racevars)' go test -race --coverprofile='./cover/cover.out' -v -tags='testing debug netgo' -timeout=300s $(pkgs) -run=$(run) -count=$(count)

run-dev:
go run -tags="dev" .

.PHONY: all fmt install release check test test-long start-mongo stop-mongo run-dev
.PHONY: all fmt install release check bench test test-long test-long-ci start-mongo stop-mongo run-dev
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,30 @@
# pinner

Ensures that relevant skyfiles will be properly pinned by the portal even when individual servers are removed

## Environment variables

### Required

- `SERVER_DOMAIN`: current server name, e.g. `eu-ger-1.siasky.net`
- `SKYNET_DB_USER`: database user
- `SKYNET_DB_PASS`: database password
- `SKYNET_DB_HOST`: database host, e.g. `mongo`
- `SKYNET_DB_PORT`: database port, e.g. `27017`
- `SIA_API_PASSWORD`: API password for the local `skyd` instance. Typically available in
the `/home/user/skynet-webportal/docker/data/sia/apipassword` file.

### Optional

- `SKYNET_ACCOUNTS_HOST`: the host where `accounts` service is running, e.g. `10.10.10.70`
- `SKYNET_ACCOUNTS_PORT`: the port on which `accounts` service is running, e.g. `3000`
- `PINNER_LOG_FILE`: path to `pinner`'s log file, relative to `/home/user/skynet-webportal/docker/data/pinner/logs/`
directory. Directory traversal (e.g. `../`) is not allowed. If this value is empty, `pinner` won't log to disk!
- `PINNER_LOG_LEVEL`: log [level](https://github.com/sirupsen/logrus#level-logging), defaults to `info`
- `PINNER_SCANNER_THREADS`: number of parallel threads pinning files, defaults to 5
- `PINNER_SLEEP_BETWEEN_SCANS`: defines the time to wait between *initiating* new scans, e.g. if this value is 10 hours
and a scan was triggered at 10:00, the next scan will be triggered at 20:00 regardless of how long the first scan
took. If there is a scan running when the next scan is scheduled to start the new scan doesn't start (we don't overlap
scans). This value is given in seconds and it defaults to 19 hours.
- `API_HOST`: host on which `skyd` is running, e.g. `10.10.10.10`
- `API_PORT`: port on which `skyd` is running, e.g. `9980`
7 changes: 5 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"fmt"
"github.com/skynetlabs/pinner/scanner"
"net/http"

"github.com/julienschmidt/httprouter"
@@ -19,8 +20,9 @@ type (
API struct {
staticServerName string
staticDB *database.DB
staticLogger logger.ExtFieldLogger
staticLogger logger.Logger
staticRouter *httprouter.Router
staticScanner *scanner.Scanner
staticSkydClient skyd.Client
staticSweeper *sweeper.Sweeper
}
@@ -32,7 +34,7 @@ type (
)

// New returns a new initialised API.
func New(serverName string, db *database.DB, logger logger.ExtFieldLogger, skydClient skyd.Client, sweeper *sweeper.Sweeper) (*API, error) {
func New(serverName string, db *database.DB, logger logger.Logger, skydClient skyd.Client, scanner *scanner.Scanner, sweeper *sweeper.Sweeper) (*API, error) {
if db == nil {
return nil, errors.New("no DB provided")
}
@@ -47,6 +49,7 @@ func New(serverName string, db *database.DB, logger logger.ExtFieldLogger, skydC
staticDB: db,
staticLogger: logger,
staticRouter: router,
staticScanner: scanner,
staticSkydClient: skydClient,
staticSweeper: sweeper,
}
182 changes: 176 additions & 6 deletions api/handlers.go
Original file line number Diff line number Diff line change
@@ -3,19 +3,43 @@ package api
import (
"encoding/json"
"net/http"
"strconv"

"github.com/julienschmidt/httprouter"
"github.com/skynetlabs/pinner/conf"
"github.com/skynetlabs/pinner/database"
"github.com/skynetlabs/pinner/lib"
"gitlab.com/NebulousLabs/errors"
"gitlab.com/SkynetLabs/skyd/skymodules"
"go.mongodb.org/mongo-driver/mongo"
)

type (
// HealthGET is the response type of GET /health
// ServerRemoveRequest describes a payload that marks a server as dead.
ServerRemoveRequest struct {
Server string `json:"server"`
}
// ServerRemoveResponse returns the removed server and the number of
// skylinks it was pinning.
ServerRemoveResponse struct {
Server string `json:"server"`
NumSkylinks int64 `json:"numSkylinks"`
}
// HealthGET is the response type of GET /health.
// Primary field is only populated on error.
HealthGET struct {
DBAlive bool `json:"dbAlive"`
MinPinners int `json:"minPinners"`
DBAlive bool `json:"dbAlive"`
Error error `json:"error,omitempty"`
MinPinners int `json:"minPinners"`
Primary string `json:"primary,omitempty"`
}
// ExtendedHealth is a comprehensive set of information about the health
// of the DB node which includes some sensitive information. That's why we
// only log that data and we don't return it to callers.
ExtendedHealth struct {
Health *HealthGET `json:"health"`
Hello *database.Hello `json:"hello"`
NumberSessionsInProgress int `json:"numberSessionsInProgress"`
}
// SkylinkRequest describes a request that only provides a skylink.
SkylinkRequest struct {
@@ -29,13 +53,93 @@ type (

// healthGET returns the status of the service
func (api *API) healthGET(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
// The public status that we'll return as response to this call.
status := &HealthGET{
DBAlive: true,
}
// Extended health status that we'll log for the benefit of the service's
// administrators.
extHealth := ExtendedHealth{
Health: status,
NumberSessionsInProgress: api.staticDB.NumberSessionsInProgress(),
}
// Ensure that we log the extended health information after we gather as
// much of it as possible.
defer func() {
b, err := json.Marshal(extHealth)
if err != nil {
api.staticLogger.Warnf("failed to serialize extended health information. Error: %v", err)
}
api.staticLogger.Info(string(b))
}()

err := api.staticDB.Ping(req.Context())
if err != nil {
status.DBAlive = false
status.Error = errors.Compose(status.Error, err)
}
hello, err := api.staticDB.Hello(req.Context())
if err != nil {
status.Error = errors.Compose(status.Error, err)
} else {
extHealth.Hello = hello
}

mp, err := conf.MinPinners(req.Context(), api.staticDB)
var status HealthGET
status.DBAlive = err == nil
status.MinPinners = mp
if err != nil {
status.Error = errors.Compose(status.Error, err)
// Since an error has occurred, we want to add MongoDB's primary to
// the health status. We do not include this information with each call
// to health because it gives important information to potential
// attackers.
if hello != nil {
status.Primary = hello.Primary
}
} else {
status.MinPinners = mp
}

api.WriteJSON(w, status)
}

// listServersGET returns the list of servers pinning a given skylink.
func (api *API) listServersGET(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
sl, err := database.SkylinkFromString(ps.ByName("skylink"))
if err != nil {
api.WriteError(w, errors.AddContext(err, database.ErrInvalidSkylink.Error()), http.StatusBadRequest)
return
}
servers, err := api.staticDB.ServersForSkylink(req.Context(), sl)
if errors.Contains(err, database.ErrSkylinkNotExist) {
api.WriteError(w, err, http.StatusNotFound)
return
}
if err != nil {
api.WriteError(w, err, http.StatusInternalServerError)
return
}
api.WriteJSON(w, servers)
}

// listSkylinksGET returns a list of skylinks pinned by the given server.
func (api *API) listSkylinksGET(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
server := ps.ByName("server")
if server == "" {
api.WriteError(w, errors.New("invalid server value"), http.StatusBadRequest)
return
}
sls, err := api.staticDB.SkylinksForServer(req.Context(), server)
if errors.Contains(err, mongo.ErrNoDocuments) {
api.WriteError(w, err, http.StatusNotFound)
return
}
if err != nil {
api.WriteError(w, err, http.StatusInternalServerError)
return
}
api.WriteJSON(w, sls)
}

// pinPOST informs pinner that a given skylink is pinned on the current server.
// If the skylink already exists and it's marked for unpinning, this method will
// unmark it.
@@ -95,6 +199,56 @@ func (api *API) unpinPOST(w http.ResponseWriter, req *http.Request, _ httprouter
api.WriteSuccess(w)
}

// serverRemovePOST informs pinner that a given server is dead and should be removed as
// pinner from all skylinks it's marked as pinning.
func (api *API) serverRemovePOST(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
var body ServerRemoveRequest
err := json.NewDecoder(req.Body).Decode(&body)
if err != nil {
api.WriteError(w, err, http.StatusBadRequest)
return
}
if body.Server == "" {
api.WriteError(w, errors.New("no server found in request body"), http.StatusBadRequest)
return
}
ctx := req.Context()
// Schedule a scan for underpinned skylinks in an hour (unless one is
// already pending), so all of them can be repinned ASAP but also all
// servers in the cluster will have enough time to get the memo for the scan.
tNew := lib.Now().Add(conf.DefaultNextScanOffset)
tOld, err := conf.NextScan(ctx, api.staticDB, api.staticLogger)
// We just set it when we encounter an error because we can get such an
// error in two cases - there is no next scan scheduled or there is a
// problem with the DB. In the first case we want to schedule one and in the
// second we'll get the error again with the next operation.
if err != nil || tNew.Before(tOld) {
err1 := conf.SetNextScan(ctx, api.staticDB, tNew)
if err1 != nil {
err = errors.Compose(err1, errors.AddContext(err, "failed to fetch next scan"))
api.WriteError(w, errors.AddContext(err, "failed to schedule a scan"), http.StatusInternalServerError)
return
}
}
// Remove the server as pinner.
n, err := api.staticDB.RemoveServer(ctx, body.Server)
if err != nil {
api.WriteError(w, errors.AddContext(err, "failed to remove server"), http.StatusInternalServerError)
return
}
// Remove the server's load.
err = api.staticDB.DeleteServerLoad(ctx, body.Server)
if err != nil && !errors.Contains(err, database.ErrServerLoadNotFound) {
api.WriteError(w, errors.AddContext(err, "failed to clean up server's load records, please retry"), http.StatusInternalServerError)
return
}
resp := ServerRemoveResponse{
Server: body.Server,
NumSkylinks: n,
}
api.WriteJSON(w, resp)
}

// sweepPOST instructs pinner to scan the list of skylinks pinned by skyd and
// update its database. This call is non-blocking, i.e. it will immediately
// return with a success and it will only start a new sweep if there isn't one
@@ -110,6 +264,22 @@ func (api *API) sweepStatusGET(w http.ResponseWriter, _ *http.Request, _ httprou
api.WriteJSON(w, api.staticSweeper.Status())
}

// scannerStatusGET responds with the status of the latest scan.
func (api *API) scannerStatusGET(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
vStr := r.FormValue("verbose")
verbose, err := strconv.ParseBool(vStr)
if err != nil && vStr != "" {
api.WriteError(w, errors.AddContext(err, "invalid 'verbose' value"), http.StatusBadRequest)
return
}
st := api.staticScanner.Status()
if !verbose {
// Do not list all failed skylinks.
st.Failed = nil
}
api.WriteJSON(w, st)
}

// parseAndResolve parses the given string representation of a skylink and
// resolves it to a V1 skylink, in case it's a V2.
func (api *API) parseAndResolve(skylink string) (skymodules.Skylink, error) {
8 changes: 8 additions & 0 deletions api/routes.go
Original file line number Diff line number Diff line change
@@ -4,8 +4,16 @@ package api
func (api *API) buildHTTPRoutes() {
api.staticRouter.GET("/health", api.healthGET)

api.staticRouter.GET("/list/servers/:skylink", api.listServersGET)
api.staticRouter.GET("/list/skylinks/:server", api.listSkylinksGET)

api.staticRouter.POST("/pin", api.pinPOST)
api.staticRouter.POST("/unpin", api.unpinPOST)

api.staticRouter.GET("/scan/status", api.scannerStatusGET)

api.staticRouter.POST("/server/remove", api.serverRemovePOST)

api.staticRouter.POST("/sweep", api.sweepPOST)
api.staticRouter.GET("/sweep/status", api.sweepStatusGET)
}
Loading