Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions components/ambient-api-server/plugins/sessions/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (

type sessionGRPCHandler struct {
pb.UnimplementedSessionServiceServer
service SessionService
generic services.GenericService
brokerFunc func() *server.EventBroker
msgService MessageService
service SessionService
generic services.GenericService
brokerFunc func() *server.EventBroker
msgService MessageService
grpcServiceAccount string
}

func NewSessionGRPCHandler(service SessionService, generic services.GenericService, brokerFunc func() *server.EventBroker, msgService MessageService) pb.SessionServiceServer {
func NewSessionGRPCHandler(service SessionService, generic services.GenericService, brokerFunc func() *server.EventBroker, msgService MessageService, grpcServiceAccount string) pb.SessionServiceServer {
return &sessionGRPCHandler{
service: service,
generic: generic,
brokerFunc: brokerFunc,
msgService: msgService,
service: service,
generic: generic,
brokerFunc: brokerFunc,
msgService: msgService,
grpcServiceAccount: grpcServiceAccount,
}
}

Expand Down Expand Up @@ -286,7 +288,7 @@ func (h *sessionGRPCHandler) WatchSessionMessages(req *pb.WatchSessionMessagesRe

if !middleware.IsServiceCaller(ctx) {
username := auth.GetUsernameFromContext(ctx)
if username != "" {
if username != "" && (h.grpcServiceAccount == "" || username != h.grpcServiceAccount) {
session, svcErr := h.service.Get(ctx, req.GetSessionId())
if svcErr != nil {
return grpcutil.ServiceErrorToGRPC(svcErr)
Expand Down
6 changes: 5 additions & 1 deletion components/ambient-api-server/plugins/sessions/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package sessions

import (
"net/http"
"os"
"sync"


pb "github.com/ambient-code/platform/components/ambient-api-server/pkg/api/grpc/ambient/v1"
pkgrbac "github.com/ambient-code/platform/components/ambient-api-server/plugins/rbac"
"github.com/gorilla/mux"
Expand All @@ -22,6 +24,8 @@ import (

const EventSource = "Sessions"

var grpcServiceAccount = os.Getenv("GRPC_SERVICE_ACCOUNT")

type ServiceLocator func() SessionService

func NewServiceLocator(env *environments.Env) ServiceLocator {
Expand Down Expand Up @@ -135,7 +139,7 @@ func init() {
}
return nil
}
pb.RegisterSessionServiceServer(grpcServer, NewSessionGRPCHandler(sessionService, genericService, brokerFunc, msgService))
pb.RegisterSessionServiceServer(grpcServer, NewSessionGRPCHandler(sessionService, genericService, brokerFunc, msgService, grpcServiceAccount))
})

db.RegisterMigration(migration())
Expand Down
22 changes: 17 additions & 5 deletions components/ambient-cli/cmd/acpctl/apply/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,24 +294,24 @@ func buildCredentialPatch(existing *sdktypes.Credential, doc resource) (map[stri
patch = patch.Description(doc.Description)
changed = true
}
if doc.URL != "" {
if doc.URL != "" && doc.URL != existing.Url {
patch = patch.Url(doc.URL)
changed = true
}
if doc.Email != "" {
if doc.Email != "" && doc.Email != existing.Email {
patch = patch.Email(doc.Email)
changed = true
}
token := os.ExpandEnv(doc.Token)
if token != "" {
if token != "" && token != existing.Token {
patch = patch.Token(token)
changed = true
}
if len(doc.Labels) > 0 {
if len(doc.Labels) > 0 && marshalStringMap(doc.Labels) != existing.Labels {
patch = patch.Labels(marshalStringMap(doc.Labels))
changed = true
}
if len(doc.Annotations) > 0 {
if len(doc.Annotations) > 0 && marshalStringMap(doc.Annotations) != existing.Annotations {
patch = patch.Annotations(marshalStringMap(doc.Annotations))
changed = true
}
Expand Down Expand Up @@ -649,6 +649,18 @@ func strategicMerge(base, patch resource) resource {
if patch.Prompt != "" {
base.Prompt = patch.Prompt
}
if patch.Provider != "" {
base.Provider = patch.Provider
}
if patch.Token != "" {
base.Token = patch.Token
}
if patch.URL != "" {
base.URL = patch.URL
}
if patch.Email != "" {
base.Email = patch.Email
}
for k, v := range patch.Labels {
if base.Labels == nil {
base.Labels = make(map[string]string)
Expand Down
49 changes: 38 additions & 11 deletions components/ambient-cli/demo-github.sh
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ CREATED_PROJECT=""
CREATED_CREDENTIAL_ID=""

cleanup() {
if [[ -n "${NO_CLEANUP:-}" ]]; then
echo
yellow " NO_CLEANUP set — skipping cleanup"
dim " session: ${CREATED_SESSION_ID}"
dim " credential: ${CREATED_CREDENTIAL_ID}"
dim " project: ${CREATED_PROJECT}"
return
fi
echo
announce "Cleanup"
if [[ -n "${CREATED_SESSION_ID}" ]]; then
Expand Down Expand Up @@ -270,13 +278,32 @@ echo
announce "4 · Create GitHub credential"

sep; bold "▶ Create credential: ${CRED_NAME}"; sleep "$PAUSE"
_CRED_MANIFEST=$(mktemp --suffix=.yaml)
cat > "${_CRED_MANIFEST}" <<'CRED_EOF'
kind: Credential
name: CRED_NAME_PLACEHOLDER
provider: github
token: $DEMO_GITHUB_PAT
description: CRED_DESC_PLACEHOLDER
CRED_EOF
sed -i \
-e "s/CRED_NAME_PLACEHOLDER/${CRED_NAME}/" \
-e "s/CRED_DESC_PLACEHOLDER/GitHub PAT for demo ${RUN_ID}/" \
"${_CRED_MANIFEST}"
DEMO_GITHUB_PAT="${GITHUB_TOKEN_VALUE}" \
"$ACPCTL" apply -f "${_CRED_MANIFEST}" 2>/dev/null
rm -f "${_CRED_MANIFEST}"
CRED_JSON=$(
"$ACPCTL" credential create \
--name "${CRED_NAME}" \
--provider github \
--token "${GITHUB_TOKEN_VALUE}" \
--description "GitHub PAT for demo ${RUN_ID}" \
-o json 2>/dev/null
"$ACPCTL" get credentials -o json 2>/dev/null \
| python3 -c "
import sys, json
data = json.load(sys.stdin)
items = data.get('items', []) if isinstance(data, dict) else data
for c in items:
if c.get('name') == '${CRED_NAME}':
print(json.dumps(c))
break
" 2>/dev/null
)
CREDENTIAL_ID=$(json_field "$CRED_JSON" "id")
[[ -z "${CREDENTIAL_ID}" ]] && die "Failed to parse credential ID"
Expand All @@ -291,15 +318,15 @@ step "Verify credential visible" \

announce "5 · Bind credential to agent"

sep; bold "▶ Look up credential:reader role ID"; sleep "$PAUSE"
sep; bold "▶ Look up credential:token-reader role ID"; sleep "$PAUSE"
ROLES_JSON=$("$ACPCTL" get roles -o json 2>/dev/null)
READER_ROLE_ID=$(
echo "$ROLES_JSON" | python3 -c "
import sys, json
data = json.load(sys.stdin)
items = data.get('items', []) if isinstance(data, dict) else data
for r in items:
if r.get('name') == 'credential:reader':
if r.get('name') == 'credential:token-reader':
print(r['id'])
break
" 2>/dev/null
Expand All @@ -311,13 +338,13 @@ MY_USER_ID=$(
)

if [[ -z "${READER_ROLE_ID}" ]]; then
yellow " credential:reader role not in this deployment — skipping role binding"
yellow " credential:token-reader role not in this deployment — skipping role binding"
dim " (credential roles are seeded by the api-server migration; redeploy may be needed)"
else
dim " credential:reader role ID: ${READER_ROLE_ID}"
dim " credential:token-reader role ID: ${READER_ROLE_ID}"
dim " my user ID: ${MY_USER_ID}"

sep; bold "▶ Create role-binding: credential:reader scope=agent"; sleep "$PAUSE"
sep; bold "▶ Create role-binding: credential:token-reader scope=agent"; sleep "$PAUSE"
"$ACPCTL" create role-binding \
--user-id "${MY_USER_ID}" \
--role-id "${READER_ROLE_ID}" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func runKubeMode(ctx context.Context, cfg *config.ControlPlaneConfig) error {
sessionReconcilers := createSessionReconcilers(cfg.Reconcilers, factory, kube, projectKube, provisioner, kubeReconcilerCfg, log.Logger)
for _, sessionRec := range sessionReconcilers {
inf.RegisterHandler("sessions", sessionRec.Reconcile)
if kr, ok := sessionRec.(*reconciler.SimpleKubeReconciler); ok {
kr.StartTokenRefreshLoop(ctx)
}
}

return inf.Run(ctx)
Expand Down
84 changes: 53 additions & 31 deletions components/ambient-control-plane/internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ const (
)

type retryEvent struct {
event ResourceEvent
attempt int
fireAt time.Time
event ResourceEvent
handlerIndex int
attempt int
fireAt time.Time
}

type Informer struct {
Expand Down Expand Up @@ -162,13 +163,15 @@ func (inf *Informer) retryLoop(ctx context.Context) {
case re := <-inf.retryCh:
wait := time.Until(re.fireAt)
if wait > 0 {
timer := time.NewTimer(wait)
select {
case <-time.After(wait):
case <-timer.C:
case <-ctx.Done():
timer.Stop()
return
}
}
inf.dispatchEvent(ctx, re.event, re.attempt)
inf.dispatchHandler(ctx, re.event, re.handlerIndex, re.attempt)
}
}
}
Expand All @@ -178,34 +181,53 @@ func (inf *Informer) dispatchEvent(ctx context.Context, event ResourceEvent, att
handlers := inf.handlers[event.Resource]
inf.mu.RUnlock()

for _, handler := range handlers {
for i, handler := range handlers {
if err := handler(ctx, event); err != nil {
if attempt < retryMaxAttempts {
delay := retryBaseDelay * (1 << attempt)
if delay > retryMaxDelay {
delay = retryMaxDelay
}
inf.logger.Warn().
Err(err).
Str("resource", event.Resource).
Str("event_type", string(event.Type)).
Int("attempt", attempt+1).
Int("max_attempts", retryMaxAttempts).
Dur("retry_in", delay).
Msg("handler failed, will retry")
select {
case inf.retryCh <- retryEvent{event: event, attempt: attempt + 1, fireAt: time.Now().Add(delay)}:
case <-ctx.Done():
}
} else {
inf.logger.Error().
Err(err).
Str("resource", event.Resource).
Str("event_type", string(event.Type)).
Int("attempts", attempt+1).
Msg("handler failed after max retries")
}
inf.scheduleRetry(ctx, event, i, attempt, err)
}
}
}

func (inf *Informer) dispatchHandler(ctx context.Context, event ResourceEvent, handlerIndex, attempt int) {
inf.mu.RLock()
handlers := inf.handlers[event.Resource]
inf.mu.RUnlock()

if handlerIndex >= len(handlers) {
return
}
if err := handlers[handlerIndex](ctx, event); err != nil {
inf.scheduleRetry(ctx, event, handlerIndex, attempt, err)
}
}

func (inf *Informer) scheduleRetry(ctx context.Context, event ResourceEvent, handlerIndex, attempt int, err error) {
if attempt < retryMaxAttempts {
delay := retryBaseDelay * (1 << attempt)
if delay > retryMaxDelay {
delay = retryMaxDelay
}
inf.logger.Warn().
Err(err).
Str("resource", event.Resource).
Str("event_type", string(event.Type)).
Int("handler", handlerIndex).
Int("attempt", attempt+1).
Int("max_attempts", retryMaxAttempts).
Dur("retry_in", delay).
Msg("handler failed, will retry")
select {
case inf.retryCh <- retryEvent{event: event, handlerIndex: handlerIndex, attempt: attempt + 1, fireAt: time.Now().Add(delay)}:
case <-ctx.Done():
}
} else {
inf.logger.Error().
Err(err).
Str("resource", event.Resource).
Str("event_type", string(event.Type)).
Int("handler", handlerIndex).
Int("attempts", attempt+1).
Msg("handler failed after max retries")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func (kc *KubeClient) CreateSecret(ctx context.Context, obj *unstructured.Unstru
return kc.dynamic.Resource(SecretGVR).Namespace(obj.GetNamespace()).Create(ctx, obj, metav1.CreateOptions{})
}

func (kc *KubeClient) UpdateSecret(ctx context.Context, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
return kc.dynamic.Resource(SecretGVR).Namespace(obj.GetNamespace()).Update(ctx, obj, metav1.UpdateOptions{})
}

func (kc *KubeClient) DeleteSecretsByLabel(ctx context.Context, namespace, labelSelector string) error {
return kc.dynamic.Resource(SecretGVR).Namespace(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: labelSelector})
}
Expand Down
Loading
Loading