Skip to content

Commit

Permalink
Add plugin package for embedding nix-snapshotter in containerd
Browse files Browse the repository at this point in the history
  • Loading branch information
elpdt852 committed Feb 14, 2024
1 parent e9daec1 commit 34be8ea
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 21 deletions.
2 changes: 1 addition & 1 deletion package.nix
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ let
".tar"
];
};
vendorHash = "sha256-QBLePOnfsr6I19ddyZNSFDih6mCaZ/NV2Qz1B1pSHxs=";
vendorHash = "sha256-tTAp3bwbtJP+3Jr9N2ULEuaC+P8tXUeiiHXCtL54BGc=";
passthru = { inherit buildImage; };
};

Expand Down
74 changes: 61 additions & 13 deletions pkg/nix/image_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"os"
"strings"
"sync"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content/local"
Expand All @@ -14,6 +16,10 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

var (
ErrNotInitialized = errors.New("Nix-snapshotter Image Service not yet initialized")
)

// ImageServiceConfig is used to configure the image service instance.
type ImageServiceConfig struct {
Config
Expand All @@ -25,6 +31,7 @@ type ImageServiceOpt interface {
}

type imageService struct {
mu sync.Mutex
client *containerd.Client
imageServiceClient runtime.ImageServiceClient
nixBuilder NixBuilder
Expand All @@ -40,36 +47,69 @@ func NewImageService(ctx context.Context, containerdAddr string, opts ...ImageSe
opt.SetImageServiceOpt(&cfg)
}

client, err := containerd.New(containerdAddr)
if err != nil {
return nil, err
service := &imageService{
nixBuilder: cfg.nixBuilder,
}

return &imageService{
client: client,
imageServiceClient: runtime.NewImageServiceClient(client.Conn()),
nixBuilder: cfg.nixBuilder,
}, nil
go func() {
log.G(ctx).Debugf("Waiting for CRI service is started...")
for i := 0; i < 100; i++ {
client, err := containerd.New(containerdAddr)
if err == nil {
service.mu.Lock()
service.client = client
service.imageServiceClient = runtime.NewImageServiceClient(client.Conn())
service.mu.Unlock()
log.G(ctx).Info("Connected to backend CRI service")
return
}
log.G(ctx).WithError(err).Warnf("Failed to connect to CRI")
time.Sleep(10 * time.Second)
}
log.G(ctx).Warnf("No connection is available to CRI")
}()

return service, nil
}

func (is *imageService) getClient() runtime.ImageServiceClient {
is.mu.Lock()
client := is.imageServiceClient
is.mu.Unlock()
return client
}

// ListImages lists existing images.
func (is *imageService) ListImages(ctx context.Context, req *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) {
return is.imageServiceClient.ListImages(ctx, req)
client := is.getClient()
if client == nil {
return nil, ErrNotInitialized
}
return client.ListImages(ctx, req)
}

// ImageStatus returns the status of the image. If the image is not
// present, returns a response with ImageStatusResponse.Image set to
// nil.
func (is *imageService) ImageStatus(ctx context.Context, req *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
return is.imageServiceClient.ImageStatus(ctx, req)
client := is.getClient()
if client == nil {
return nil, ErrNotInitialized
}
return client.ImageStatus(ctx, req)
}

// PullImage pulls an image with authentication config.
func (is *imageService) PullImage(ctx context.Context, req *runtime.PullImageRequest) (*runtime.PullImageResponse, error) {
client := is.getClient()
if client == nil {
return nil, ErrNotInitialized
}

ref := req.Image.Image
if !strings.HasPrefix(ref, "nix:0") {
log.G(ctx).WithField("ref", ref).Info("[image-service] Falling back to CRI pull image")
resp, err := is.imageServiceClient.PullImage(ctx, req)
resp, err := client.PullImage(ctx, req)
return resp, err
}
archivePath := strings.TrimSuffix(
Expand Down Expand Up @@ -121,10 +161,18 @@ func (is *imageService) PullImage(ctx context.Context, req *runtime.PullImageReq
// This call is idempotent, and must not return an error if the image has
// already been removed.
func (is *imageService) RemoveImage(ctx context.Context, req *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
return is.imageServiceClient.RemoveImage(ctx, req)
client := is.getClient()
if client == nil {
return nil, ErrNotInitialized
}
return client.RemoveImage(ctx, req)
}

// ImageFSInfo returns information of the filesystem that is used to store images.
func (is *imageService) ImageFsInfo(ctx context.Context, req *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
return is.imageServiceClient.ImageFsInfo(ctx, req)
client := is.getClient()
if client == nil {
return nil, ErrNotInitialized
}
return client.ImageFsInfo(ctx, req)
}
18 changes: 13 additions & 5 deletions pkg/nix/nix.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ package nix

import (
"context"
"errors"
"os/exec"
"strings"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshots/overlay/overlayutils"
)

// Supported returns nil when the remote snapshotter is functional on the system with the root directory.
// Supported is not called during plugin initialization, but exposed for downstream projects which uses
// this snapshotter as a library.
func Supported(root string) error {
return overlayutils.Supported(root)
}

// Config is used to configure common options.
type Config struct {
nixBuilder NixBuilder
Expand Down Expand Up @@ -57,12 +65,12 @@ func defaultNixBuilder(ctx context.Context, outLink, nixStorePath string) error
}
args = append(args, nixStorePath)

_, err := exec.Command("nix", args...).Output()
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
log.G(ctx).Infof("[nix-snapshotter] Calling nix %s", strings.Join(args, " "))
out, err := exec.Command("nix", args...).CombinedOutput()
if err != nil {
log.G(ctx).
WithField("nixStorePath", nixStorePath).
Debugf("Failed to create gc root:\n%s", string(exitErr.Stderr))
Errorf("Failed to create gc root: %s\n%s", err, string(out))
}
return err
}
4 changes: 2 additions & 2 deletions pkg/nix/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (o *nixSnapshotter) prepareNixGCRoots(ctx context.Context, key string, labe
sort.Strings(sortedLabels)

gcRootsDir := filepath.Join(o.root, "gcroots", id)
log.G(ctx).Infof("Preparing nix gc roots at %s", gcRootsDir)
log.G(ctx).Infof("[nix-snapshotter] Preparing %d nix gc roots at %s", len(sortedLabels), gcRootsDir)
for _, labelKey := range sortedLabels {
if !strings.HasPrefix(labelKey, nix2container.NixStorePrefixAnnotation) {
continue
Expand All @@ -166,7 +166,6 @@ func (o *nixSnapshotter) prepareNixGCRoots(ctx context.Context, key string, labe
}

func (o *nixSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {

mounts, err := o.Snapshotter.View(ctx, key, parent, opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -342,6 +341,7 @@ func (o *nixSnapshotter) withNixBindMounts(ctx context.Context, key string, moun
}
pathsSeen[nixStorePath] = struct{}{}

log.G(ctx).Debugf("[nix-snapshotter] Bind mounting nix store path %s", nixStorePath)
mounts = append(mounts, mount.Mount{
Type: "bind",
Source: nixStorePath,
Expand Down
83 changes: 83 additions & 0 deletions pkg/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package plugin

import (
"errors"
"net"
"os"
"path/filepath"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/pdtpartners/nix-snapshotter/pkg/config"
"github.com/pdtpartners/nix-snapshotter/pkg/nix"
"google.golang.org/grpc"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func init() {
plugin.Register(&plugin.Registration{
Type: plugin.SnapshotPlugin,
ID: "nix",
Config: &config.Config{},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())

cfg, ok := ic.Config.(*config.Config)
if !ok {
return nil, errors.New("invalid nix configuration")
}

root := ic.Root
if cfg.Root != "" {
root = cfg.Root
}

if cfg.ImageService.Enable {
criAddr := ic.Address
if containerdAddr := cfg.ImageService.ContainerdAddress; containerdAddr != "" {
criAddr = containerdAddr
}
if criAddr == "" {
return nil, errors.New("backend CRI service address is not specified")
}

ctx := ic.Context
imageService, err := nix.NewImageService(ctx, criAddr)
if err != nil {
return nil, err
}

rpc := grpc.NewServer()
runtime.RegisterImageServiceServer(rpc, imageService)

// Prepare the directory for the socket.
err = os.MkdirAll(filepath.Dir(cfg.Address), 0o700)
if err != nil {
return nil, err
}

// Try to remove the socket file to avoid EADDRINUSE.
err = os.RemoveAll(cfg.Address)
if err != nil {
return nil, err
}

l, err := net.Listen("unix", cfg.Address)
if err != nil {
return nil, err
}

go func() {
err := rpc.Serve(l)
if err != nil {
log.G(ctx).WithError(err).Warnf("error on serving nix-snapshotter image service via socket %q", cfg.Address)
}
}()
}

ic.Meta.Exports["root"] = root
return nix.NewSnapshotter(root)
},
})
}

0 comments on commit 34be8ea

Please sign in to comment.