Skip to content

Branch Scheduling #226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 177 additions & 32 deletions distsys/archetypeinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,85 +4,237 @@ import (
"fmt"
"github.com/UBC-NSS/pgo/distsys/tla"
"github.com/benbjohnson/immutable"
"sync/atomic"
)

// ArchetypeInterface provides an archetype-centric interface to an MPCalContext.
// While just an opaque wrapper for an MPCalContext, it provides a separation of concerns between:
// (1) how to configure and run and MPCal archetype (available via a plain MPCalContext)
// (2) how the MPCal archetype's code accesses its configuration and internal state while running (available via ArchetypeInterface)
type ArchetypeInterface struct {
ctx *MPCalContext
ctx *MPCalContext
resourceStates map[ArchetypeResourceHandle]ArchetypeResourceState
killCh chan struct{} // TODO: not used yet, but intended for resource implementations to detect preemption
}

// Self returns the associated archetype's self binding. Requires a configured archetype.
func (iface ArchetypeInterface) Self() tla.TLAValue {
func (iface *ArchetypeInterface) Self() tla.TLAValue {
iface.ctx.requireRunnable()
return iface.ctx.self
}

func (iface ArchetypeInterface) ensureCriticalSectionWith(handle ArchetypeResourceHandle) {
iface.ctx.dirtyResourceHandles[handle] = true
func (iface *ArchetypeInterface) ensureCriticalSectionWith(handle ArchetypeResourceHandle) {
_ = iface.getResourceStateByHandle(handle)
}

// Write models the MPCal statement resourceFromHandle[indices...] := value.
// It is expected to be called only from PGo-generated code.
func (iface ArchetypeInterface) Write(handle ArchetypeResourceHandle, indices []tla.TLAValue, value tla.TLAValue) (err error) {
func (iface *ArchetypeInterface) Write(handle ArchetypeResourceHandle, indices []tla.TLAValue, value tla.TLAValue) (err error) {
iface.ensureCriticalSectionWith(handle)
res := iface.ctx.getResourceByHandle(handle)
state := iface.getResourceStateByHandle(handle)
var component ArchetypeResourceComponent = state
for _, index := range indices {
res, err = res.Index(index)
component, err = component.Index(index)
if err != nil {
return
}
}
err = res.WriteValue(value)
err = component.WriteValue(iface, value)
return
}

// Read models the MPCal expression resourceFromHandle[indices...].
// If is expected to be called only from PGo-generated code.
func (iface ArchetypeInterface) Read(handle ArchetypeResourceHandle, indices []tla.TLAValue) (value tla.TLAValue, err error) {
func (iface *ArchetypeInterface) Read(handle ArchetypeResourceHandle, indices []tla.TLAValue) (value tla.TLAValue, err error) {
iface.ensureCriticalSectionWith(handle)
res := iface.ctx.getResourceByHandle(handle)
state := iface.getResourceStateByHandle(handle)
var component ArchetypeResourceComponent = state
for _, index := range indices {
res, err = res.Index(index)
component, err = component.Index(index)
if err != nil {
return
}
}
value, err = res.ReadValue()
value, err = component.ReadValue(iface)
return
}

// NextFairnessCounter returns number [0,ceiling) indicating a non-deterministic branching decision which,
// given an MPCal critical section being retried indefinitely with no other changes, will try to guarantee that all
// possible non-deterministic branch choices will be attempted.
func (iface ArchetypeInterface) NextFairnessCounter(id string, ceiling uint) uint {
func (iface *ArchetypeInterface) NextFairnessCounter(id string, ceiling uint) uint {
return iface.ctx.fairnessCounter.NextFairnessCounter(id, ceiling)
}

type BranchResourceMap map[ArchetypeResourceHandle]ArchetypeResource

//type branch func(branchResources BranchResourceMap) error
type branch func(iface *ArchetypeInterface) error

func (iface *ArchetypeInterface) forkIFace() *ArchetypeInterface {
copiedStates := make(map[ArchetypeResourceHandle]ArchetypeResourceState, len(iface.resourceStates))
for handle, state := range iface.resourceStates {
copiedStates[handle] = state.ForkState()
}
return &ArchetypeInterface{
ctx: iface.ctx,
resourceStates: copiedStates,
killCh: make(chan struct{}),
}
}

func (iface *ArchetypeInterface) abort() {
resourceStates := iface.resourceStates
var nonTrivialAborts []chan struct{}
for _, res := range resourceStates {
nonTrivialAborts = append(nonTrivialAborts, res.Abort()...)
}
for _, ch := range nonTrivialAborts {
<-ch
}

// the go compiler optimizes this to a map clear operation
// secretly important: this also means repeated aborts will become no-ops,
// which simplifies non-deterministic cases that end up behaving like that
for resHandle := range resourceStates {
delete(resourceStates, resHandle)
}
}

func (iface *ArchetypeInterface) commit() (err error) {
resourceStates := iface.resourceStates
// dispatch all parts of the pre-commit phase asynchronously, so we only wait as long as the slowest resource
var nonTrivialPreCommits []chan error
for _, res := range resourceStates {
nonTrivialPreCommits = append(nonTrivialPreCommits, res.PreCommit()...)
}
for _, ch := range nonTrivialPreCommits {
localErr := <-ch
if localErr != nil {
err = localErr
}
}

// if there was an error, stop now, and expect either (1) total crash, or (2) Abort to be called
if err != nil {
return
}

// same as above, run all the commit processes async
var nonTrivialCommits []chan struct{}
for _, res := range resourceStates {
nonTrivialCommits = append(nonTrivialCommits, res.Commit()...)
}
for _, ch := range nonTrivialCommits {
<-ch
}

// the go compiler optimizes this to a map clear operation
for resHandle := range resourceStates {
delete(resourceStates, resHandle)
}
return
}

func (iface *ArchetypeInterface) RunBranchConcurrently(branches ...branch) error {

if len(branches) == 0 {
return ErrCriticalSectionAborted // no branches => no success
}

// Create set of forked ifaces
childIfaces := []*ArchetypeInterface{&ArchetypeInterface{
ctx: iface.ctx,
resourceStates: iface.resourceStates,
killCh: nil, // TODO
}}
iface.resourceStates = nil
for range branches[1:] {
childIfaces = append(childIfaces, iface.forkIFace())
}

type branchResult struct {
idx int32
err error
}

doneSignal := make(chan struct{})
result := branchResult{
idx: -1,
}
var abortCount int32 = 0

for i := range branches {
index := i
branch := branches[index]
iface := childIfaces[index]
go func() {
// Run branch
err := branch(iface)
if err == ErrCriticalSectionAborted {
currentAbortCount := atomic.AddInt32(&abortCount, 1)
if currentAbortCount == int32(len(branches)) {
result.err = ErrCriticalSectionAborted // ok to write, we know we're last
close(doneSignal) // we all aborted, give up
}
iface.abort() // abort on-thread, because abort might block a little

return // we aborted, so, unless we were the last, let someone else maybe succeed
}
// something happened that wasn't an abort. notify the waiting goroutine it was us
// (but only if we were the first to see something; ensure this with atomic CAS)
amIFirst := atomic.CompareAndSwapInt32(&result.idx, -1, int32(index))
if amIFirst {
// write before signal, so the waiting goroutine sees err
result.err = err
close(doneSignal)
} else {
iface.abort() // abort on-thread, because abort might block a little
}
}()
}

<-doneSignal // this counts as a memory barrier! that is, it's safe to read err and idx without atomics
if result.idx != -1 && result.err == nil {
// steal resources of successful child to continue, if there is one
iface.resourceStates = childIfaces[result.idx].resourceStates
}
return result.err
}

// GetConstant returns the constant operator bound to the given name as a variadic Go function.
// The function is generated in DefineConstantOperator, and is expected to check its own arguments.
func (iface ArchetypeInterface) GetConstant(name string) func(args ...tla.TLAValue) tla.TLAValue {
func (iface *ArchetypeInterface) GetConstant(name string) func(args ...tla.TLAValue) tla.TLAValue {
fn, wasFound := iface.ctx.constantDefns[name]
if !wasFound {
panic(fmt.Errorf("could not find constant definition %s", name))
}
return fn
}

func (iface *ArchetypeInterface) getResourceStateByHandle(handle ArchetypeResourceHandle) ArchetypeResourceState {
resourceStates := iface.resourceStates
if state, ok := resourceStates[handle]; ok {
return state
}
res := iface.ctx.getResourceByHandle(handle)
state := res.FreshState()
resourceStates[handle] = state
return state
}

// RequireArchetypeResource returns a handle to the archetype resource with the given name. It panics if this resource
// does not exist.
func (iface ArchetypeInterface) RequireArchetypeResource(name string) ArchetypeResourceHandle {
func (iface *ArchetypeInterface) RequireArchetypeResource(name string) ArchetypeResourceHandle {
handle := ArchetypeResourceHandle(name)
_ = iface.ctx.getResourceByHandle(handle)
_ = iface.getResourceStateByHandle(handle)
return handle
}

// RequireArchetypeResourceRef returns a handle to the archetype resource with the given name, when the name refers
// to a resource that was passed by ref in MPCal (in Go, ref-passing has an extra indirection that must be followed).
// If the resource does not exist, or an invalid indirection is used, this method will panic.
func (iface ArchetypeInterface) RequireArchetypeResourceRef(name string) (ArchetypeResourceHandle, error) {
func (iface *ArchetypeInterface) RequireArchetypeResourceRef(name string) (ArchetypeResourceHandle, error) {
ptr := iface.RequireArchetypeResource(name)
ptrVal, err := iface.Read(ptr, nil)
if err != nil {
Expand All @@ -93,25 +245,18 @@ func (iface ArchetypeInterface) RequireArchetypeResourceRef(name string) (Archet

// EnsureArchetypeResourceLocal ensures that a local state variable exists (local to an archetype or procedure), creating
// it with the given default value if not.
func (iface ArchetypeInterface) EnsureArchetypeResourceLocal(name string, value tla.TLAValue) {
func (iface *ArchetypeInterface) EnsureArchetypeResourceLocal(name string, value tla.TLAValue) {
_ = iface.ctx.ensureArchetypeResource(name, LocalArchetypeResourceMaker(value))
}

// ReadArchetypeResourceLocal is a short-cut to reading a local state variable, which, unlike other resources, is
// statically known to not require any critical section management. It will return the resource's value as-is, and
// will crash if the named resource isn't exactly a local state variable.
func (iface ArchetypeInterface) ReadArchetypeResourceLocal(name string) tla.TLAValue {
return iface.ctx.getResourceByHandle(ArchetypeResourceHandle(name)).(*LocalArchetypeResource).value
}

func (iface ArchetypeInterface) getCriticalSection(name string) MPCalCriticalSection {
func (iface *ArchetypeInterface) getCriticalSection(name string) MPCalCriticalSection {
if criticalSection, ok := iface.ctx.jumpTable[name]; ok {
return criticalSection
}
panic(fmt.Errorf("could not find critical section %s", name))
}

func (iface ArchetypeInterface) getProc(name string) MPCalProc {
func (iface *ArchetypeInterface) getProc(name string) MPCalProc {
if proc, ok := iface.ctx.procTable[name]; ok {
return proc
}
Expand All @@ -120,14 +265,14 @@ func (iface ArchetypeInterface) getProc(name string) MPCalProc {

var defaultLocalArchetypeResourceMaker = LocalArchetypeResourceMaker(tla.TLAValue{})

func (iface ArchetypeInterface) ensureArchetypeResourceLocalWithDefault(name string) ArchetypeResourceHandle {
func (iface *ArchetypeInterface) ensureArchetypeResourceLocalWithDefault(name string) ArchetypeResourceHandle {
return iface.ctx.ensureArchetypeResource(name, defaultLocalArchetypeResourceMaker)
}

// Goto sets the running archetype's program counter to the target value.
// It will panic if the target is not a valid label name.
// This method should be called at the end of a critical section.
func (iface ArchetypeInterface) Goto(target string) error {
func (iface *ArchetypeInterface) Goto(target string) error {
_ = iface.getCriticalSection(target) // crash now if the new pc isn't in the jump table
pc := iface.RequireArchetypeResource(".pc")
return iface.Write(pc, nil, tla.MakeTLAString(target))
Expand All @@ -142,7 +287,7 @@ func (iface ArchetypeInterface) Goto(target string) error {
// - jump to the callee's first label via Goto
//
// This method should be called at the end of a critical section.
func (iface ArchetypeInterface) Call(procName string, returnPC string, argVals ...tla.TLAValue) error {
func (iface *ArchetypeInterface) Call(procName string, returnPC string, argVals ...tla.TLAValue) error {
proc := iface.getProc(procName)
stack := iface.RequireArchetypeResource(".stack")
stackVal, err := iface.Read(stack, nil)
Expand Down Expand Up @@ -210,7 +355,7 @@ func (iface ArchetypeInterface) Call(procName string, returnPC string, argVals .
// Note: like Return, this should never be called outside a procedure, as it relies on an existing stack frame.
//
// This method, like those it wraps, should be called at the end of a critical section.
func (iface ArchetypeInterface) TailCall(procName string, argVals ...tla.TLAValue) error {
func (iface *ArchetypeInterface) TailCall(procName string, argVals ...tla.TLAValue) error {
// pull the top-of-stack return address from the initial stack, so we can use it in the tail-call process below
stack := iface.RequireArchetypeResource(".stack")
stackVal, err := iface.Read(stack, nil)
Expand Down Expand Up @@ -243,7 +388,7 @@ func (iface ArchetypeInterface) TailCall(procName string, argVals ...tla.TLAValu
// is needed for that.
//
// This method should be called at the end of a critical section.
func (iface ArchetypeInterface) Return() error {
func (iface *ArchetypeInterface) Return() error {
stack := iface.RequireArchetypeResource(".stack")
// rewrite the stack, "popping" one the head element
stackVal, err := iface.Read(stack, nil)
Expand Down
Loading