Restructure move to single repo. Project now includes pipesdl, pipespool, pipescli, examples, plugins, and documentation
cbergoon committed Sep 9, 2019
1 parent 72add60 commit 99b8348
Showing 44 changed files with 1,816 additions and 37 deletions.
Empty file added .github/ISSUE_TEMPLATE.md
Empty file.
79 changes: 59 additions & 20 deletions README.md
@@ -5,27 +5,30 @@
<a href="#"><img src="https://img.shields.io/badge/version-0.1.0-brightgreen.svg" alt="Version"></a>
</p>

Pipes provides the ability to rapidly define an application using prebuilt components (processes) that are dynamically
defined.
Pipes provides the ability to rapidly define an application using prebuilt components (processes) that are linked to form pipelines. Processes and pipelines can
be defined using a definition language called pipesdl. Pipes is a "pluggable" system, meaning you can create your own process components to accomplish specialized tasks.

For now Pipes is a proof of concept and should not be used in production yet.
The goal of pipes is to provide a black-box programming model that enables developers and non-developers to efficiently build processes tailored to a specific use case.

For now Pipes is a proof of concept and should not be used in production.

#### Features

* Concurrent execution of pipeline paths.
* Dynamic JavaScript process.
* Prebuilt start shapes: HTTP, JSON FILE, Static Generator.
* Prebuilt entrypoint components: HTTP, JSON FILE, CSV (TODO), FILE (TODO), DIRECTORY (TODO), ...
* Pipeline definable with JSON.
* State tracking of process blocks.
* Customizable state-changed handler.
* Process-level error reporting.
* Worker pool to enable concurrent execution of pipelines.
* FUTURE: Plugin system to create custom process components.
* FUTURE: Robust daemon with CLI tooling.
* FUTURE: External plugin system for process blocks running as their own process in any language.
* FUTURE: Built-in database processes.

NEW: A definition language for pipes called [pipes-dl](https://github.com/cbergoon/pipes-dl) is also available. This provides a simple
DSL that can be used to define a pipeline.
* FUTURE: Server application providing visual composition of processes and pipelines, execution management, scheduling, deployment, etc.
* FUTURE: Connections that provide control structures.

![flow](/resources/pipes-diagram.png)
![flow](/docs/images/pipes-diagram.png)

#### Installation

@@ -38,7 +41,9 @@ $ go get github.com/cbergoon/pipes
Then import the library in your project:

```go
import "github.com/cbergoon/pipes"
import "github.com/cbergoon/pipes/pkg/pipeline"
import "github.com/cbergoon/pipes/pkg/dl"
import "github.com/cbergoon/pipes/pkg/pool"
```

#### Documentation
@@ -47,28 +52,62 @@ A Pipes pipeline consists of two main concepts: processes and connections. As yo
are linked and communicate via connections to form a pipeline. These connections also define the process graph, which
defines the flow of messages and execution through the pipeline.

##### Processes
Also see the [godocs](https://godoc.org/github.com/cbergoon/pipes) and the [docs](/docs/README.md) directory.

#### Pipelines

Pipelines represent the entire flow through the application, formed by a group of processes and connections. Pipelines always
start with a generator type (a process with only outputs) and always end with a 'sink' type (a process with only inputs).
Process components that make up the middle have both inputs and outputs with varying degrees of branching.

Processes are the main parts of a pipeline. These are similar to functions in a traditional program and define the logic
of the pipeline. Processes consist of a name, type, input ports, output ports, and a state.
Pipelines may be completely sequential, or branch off into concurrently executed components. All pipelines have at most one
'generator' and one 'sink'.
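
As a rough, conceptual sketch of the topology rule above (plain Go, not the pipes API — the `proc` type and `validateTopology` helper are hypothetical names invented for illustration):

```go
package main

import "fmt"

// proc is a stand-in for a process definition; the real pipes types differ.
type proc struct {
	name    string
	inputs  []string
	outputs []string
}

// validateTopology enforces the rule described above: a pipeline has at most
// one generator (only outputs) and at most one sink (only inputs).
func validateTopology(ps []proc) error {
	generators, sinks := 0, 0
	for _, p := range ps {
		switch {
		case len(p.inputs) == 0 && len(p.outputs) > 0:
			generators++
		case len(p.outputs) == 0 && len(p.inputs) > 0:
			sinks++
		}
	}
	if generators > 1 || sinks > 1 {
		return fmt.Errorf("invalid pipeline: %d generators, %d sinks", generators, sinks)
	}
	return nil
}

func main() {
	pipeline := []proc{
		{name: "Alfa", outputs: []string{"Out1", "Out2"}},                        // generator
		{name: "Beta", inputs: []string{"In1", "In2"}, outputs: []string{"Out"}}, // middle
		{name: "Charlie", inputs: []string{"In"}},                                // sink
	}
	fmt.Println(validateTopology(pipeline)) // <nil>
}
```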

#### Processes

In pipes, processes are components that handle specific operations or tasks. These building blocks are composable and
customizable. Processes behave much like functions and can be connected to and invoked by other components
in the pipeline. Processes are the core pieces of a pipeline and consist of a name, a type, input ports, output ports,
and a state.

The type of the process specifies which of the built-in types the process should use. An example of a type is an HTTP
process, which makes HTTP requests.
process, which makes HTTP requests. The process type is also used to identify process plugins.

Inputs and outputs are named "ports" that the processes use to communicate.
Inputs and outputs in pipes process components are often referred to as "ports". Process components use ports to communicate
and indicate stage completion.

The state of a process is a set of initial data that is defined specifically for each process instance.

There are currently four built-in process types: HTTP, JSON, DYNAMICJS, and GENERATOR.
There are currently four built-in process types: HTTP, JSON, DYNAMICJS, and GENERATOR. Additional official plugins can be
found in the /plugins directory at the root of the project. In the future, we hope to have a community plugin repository.
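
For a concrete picture of those pieces, each process in the compiled example at /examples/pdl-compiled/example.cpdl is serialized with exactly these fields. The sketch below mirrors that JSON shape; the `ProcessEntry` struct name is an assumption for illustration, not necessarily a type exported by pipes:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ProcessEntry sketches a process as it appears in the compiled definition
// JSON; field names are taken from examples/pdl-compiled/example.cpdl, but
// the struct name itself is illustrative.
type ProcessEntry struct {
	TypeName    string                 `json:"typeName"`    // e.g. "Generator", "DynamicJs"
	ProcessName string                 `json:"processName"` // e.g. "Alfa"
	Sink        bool                   `json:"sink"`        // true for a terminal process
	Inputs      []string               `json:"inputs"`      // input port names (null for a generator)
	Outputs     []string               `json:"outputs"`     // output port names (null for a sink)
	State       map[string]interface{} `json:"state"`       // per-instance initial data, e.g. {"src": "..."}
}

func main() {
	raw := `{"typeName":"Generator","processName":"Alfa","sink":false,"inputs":null,"outputs":["Out1","Out2"],"state":null}`
	var p ProcessEntry
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		panic(err)
	}
	fmt.Printf("%s (%s) outputs=%v\n", p.ProcessName, p.TypeName, p.Outputs)
}
```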

##### Connections
#### Connections

Connections define the flow of the pipeline. A complete pipeline's connections form a subset of a p-graph in which only
one start and one end vertex exist. Connections pass JSON data.
one start and one end vertex exist. Connections pass JSON data among process blocks.

Connections are defined in pipesdl by enumerating which input port the output port of a preceding process should connect to.

Connections have a secondary purpose beyond communication: they define the pipeline flow and, in the future, will provide control-flow
primitives.

Process ports are blocking, which means that if a process has four input ports, all four MUST receive a message in order for the process
to continue.
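
A minimal conceptual sketch of that blocking behaviour, using plain Go channels rather than the pipes implementation:

```go
package main

import "fmt"

func main() {
	// Two input "ports" and one output "port", modelled as unbuffered channels.
	in1 := make(chan string)
	in2 := make(chan string)
	out := make(chan string)

	// A two-input process: it cannot run until BOTH In1 and In2 deliver a message.
	go func() {
		a := <-in1 // blocks until port In1 receives
		b := <-in2 // blocks until port In2 receives
		out <- a + " " + b
	}()

	in1 <- "hello"
	in2 <- "world"
	fmt.Println(<-out) // hello world
}
```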

#### Plugins

Custom processes can be built by creating a plugin that implements the process API. More information about using and creating custom
processes can be found [here](/docs/plugins.md).

#### Definition Language

The definition language allows entire pipelines to be scripted. This provides a human-readable representation and a way to preserve
and replicate pipelines. A readable language also simplifies reuse of process components.
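
A minimal sketch of compiling a pipesdl source into its JSON definition with the pkg/dl package, mirroring what the cmd/pdlc tool in this commit does (flag handling and output writing omitted):

```go
package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"

	"github.com/cbergoon/pipes/pkg/dl"
)

func main() {
	// Read a pipesdl source file (see examples/pdl-source/example.pdl).
	source, err := ioutil.ReadFile("examples/pdl-source/example.pdl")
	if err != nil {
		log.Fatal(err)
	}

	// Lex and parse the definition language into a pipeline definition.
	l := dl.NewLexer(string(source))
	p := dl.NewParser(l)
	pd, err := p.ParseProgram()
	if err != nil {
		log.Fatal(err)
	}

	// The parsed definition marshals to the compiled JSON form (.cpdl).
	definition, err := json.MarshalIndent(pd, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(definition))
}
```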

##### Pipelines
#### Worker Pool

Pipelines represent the entire flow through the application.
The worker pool provides a safe way to execute many pipelines at once, with varying workloads, while preserving and extending the
existing pipeline API and state.

#### Example Usage

79 changes: 79 additions & 0 deletions cmd/pdlc/main.go
@@ -0,0 +1,79 @@
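// Command pdlc compiles a pipesdl (.pdl) source file into its JSON pipeline
// definition and writes it to a .cpdl file (or to the file named by -out-file).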
package main

import (
"encoding/json"
"flag"
"fmt"
"github.com/cbergoon/pipes/pkg/dl"
"github.com/pkg/errors"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
)

func main() {

flag.Usage = func() {
fmt.Printf("[USAGE]: pdlc [-out-file|-minify-output] <in-file-name>\n")
flag.PrintDefaults()
}

var optOutputFile string
flag.StringVar(&optOutputFile, "out-file", "", "name of output file with extension")

var optMinifyOutput bool
flag.BoolVar(&optMinifyOutput, "minify-output", true, "minifies output file json contents")

flag.Parse()

args := flag.Args()

if len(args) != 1 {
flag.Usage()
os.Exit(-1)
}

inFileName := args[0]

source, err := ioutil.ReadFile(inFileName)
if err != nil {
log.Fatal(errors.Wrapf(err, "compile failed: could not read input file %s", inFileName))
}

l := dl.NewLexer(string(source))
p := dl.NewParser(l)

pd, err := p.ParseProgram()
if err != nil {
log.Fatal(err)
}

var definition []byte
if optMinifyOutput {
definition, err = json.Marshal(pd)
if err != nil {
log.Fatal(errors.Wrap(err, "compile failed: could not marshal definition"))
}
} else {
definition, err = json.MarshalIndent(pd, "", " ")
if err != nil {
log.Fatal(errors.Wrap(err, "compile failed: could not marshal definition"))
}
}

outFileName := ""
if optOutputFile != "" {
outFileName = optOutputFile
} else {
outFileName = strings.TrimSuffix(inFileName, filepath.Ext(inFileName)) + ".cpdl"
}

err = ioutil.WriteFile(outFileName, definition, 0644)
if err != nil {
log.Fatal(errors.Wrapf(err, "compile failed: could not write output file %s", outFileName))
}

}
59 changes: 59 additions & 0 deletions cmd/pdlrc/main.go
@@ -0,0 +1,59 @@
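// Command pdlrc reverse-compiles a JSON pipeline definition back into pipesdl
// source and writes it to a .pdl file (or to the file named by -out-file).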
package main

import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"

"github.com/cbergoon/pipes/pkg/dl"
"github.com/pkg/errors"
)

func main() {

flag.Usage = func() {
fmt.Printf("[USAGE]: pdlrc [-out-file] <in-file-name>\n")
flag.PrintDefaults()
}

var optOutputFile string
flag.StringVar(&optOutputFile, "out-file", "", "name of output file with extension")

flag.Parse()

args := flag.Args()

if len(args) != 1 {
flag.Usage()
os.Exit(-1)
}

inFileName := args[0]

source, err := ioutil.ReadFile(inFileName)
if err != nil {
log.Fatal(errors.Wrapf(err, "reverse compile failed: could not read input file %s", inFileName))
}

definition, err := dl.GenerateDLFromPipelineDefinitionJSON(source)
if err != nil {
log.Fatal(errors.Wrap(err, "reverse compile failed: failed to generate DL"))
}

outFileName := ""
if optOutputFile != "" {
outFileName = optOutputFile
} else {
outFileName = strings.TrimSuffix(inFileName, filepath.Ext(inFileName)) + ".pdl"
}

err = ioutil.WriteFile(outFileName, []byte(definition), 0644)
if err != nil {
log.Fatal(errors.Wrapf(err, "reverse compile failed: could not write output file %s", outFileName))
}

}
77 changes: 77 additions & 0 deletions cmd/pipescli/main.go
@@ -0,0 +1,77 @@
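// Command pipescli is the command-line interface for pipes. The commands below
// are urfave/cli scaffolding and do not yet drive any pipes functionality.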
package main

import (
"fmt"
"log"
"os"

"github.com/urfave/cli"
)

func main() {
app := cli.NewApp()

app.Version = "19.99.0"
app.Name = "kənˈtrīv"
app.HelpName = "contrive"

app.Commands = []cli.Command{
{
Name: "add",
Aliases: []string{"a"},
Usage: "add a task to the list",
Flags: []cli.Flag{
cli.BoolFlag{Name: "forever, forevvarr"},
},
Action: func(c *cli.Context) error {
fmt.Println("added task: ", c.Args().First(), c.Bool("forever"))
return nil
},
Before: func(c *cli.Context) error {
fmt.Fprintf(c.App.Writer, "brace for impact\n")
return nil
},
After: func(c *cli.Context) error {
fmt.Fprintf(c.App.Writer, "did we lose anyone?\n")
return nil
},
},
{
Name: "complete",
Aliases: []string{"c"},
Usage: "complete a task on the list",
Action: func(c *cli.Context) error {
fmt.Println("completed task: ", c.Args().First())
return nil
},
},
{
Name: "template",
Aliases: []string{"t"},
Usage: "options for task templates",
Subcommands: []cli.Command{
{
Name: "add",
Usage: "add a new template",
Action: func(c *cli.Context) error {
fmt.Println("new task template: ", c.Args().First())
return nil
},
},
{
Name: "remove",
Usage: "remove an existing template",
Action: func(c *cli.Context) error {
fmt.Println("removed task template: ", c.Args().First())
return nil
},
},
},
},
}

err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}
Empty file added cmd/pipesserver/main.go
Empty file.
5 changes: 0 additions & 5 deletions cmd/scheduled-pipeline/main.go

This file was deleted.

10 changes: 10 additions & 0 deletions docs/README.md
@@ -0,0 +1,10 @@
### Pipes Documentation

1. [Getting Started](http://github.com/cbergoon/pipes/docs/getting_started.md)
2. [Overview](http://github.com/cbergoon/pipes/docs/overview.md)
3. [Pipeline](http://github.com/cbergoon/pipes/docs/pipeline.md)
4. [Definition Language](http://github.com/cbergoon/pipes/docs/definition_language.md)
5. [Worker Pool](http://github.com/cbergoon/pipes/docs/worker_pool.md)
6. [CLI](http://github.com/cbergoon/pipes/docs/cli.md)
7. [Daemon](http://github.com/cbergoon/pipes/docs/daemon.md)
8. [Plugins](http://github.com/cbergoon/pipes/docs/plugins.md)
Empty file added docs/cli.md
Empty file.
Empty file added docs/daemon.md
Empty file.
Empty file added docs/definition_language.md
Empty file.
Empty file added docs/getting_started.md
Empty file.
File renamed without changes: resources/pipes-diagram.png → docs/images/pipes-diagram.png
Empty file added docs/overview.md
Empty file.
Empty file added docs/pipeline.md
Empty file.
Empty file added docs/plugins.md
Empty file.
Empty file added docs/worker_pool.md
Empty file.
1 change: 1 addition & 0 deletions examples/pdl-compiled/example.cpdl
@@ -0,0 +1 @@
{"pipeline":{"name":"MyPipeline"},"processes":[{"typeName":"Generator","processName":"Alfa","sink":false,"inputs":null,"outputs":["Out1","Out2"],"state":null},{"typeName":"DynamicJs","processName":"Beta","sink":false,"inputs":["In"],"outputs":["Out"],"state":{"gg":"kk","src":"o = {\n \"MyVal\": In1 + \"hello\" + In2\n };\n console.log(\"hellofrom js\");\n Out = JSON.stringify(o);"}}],"connections":[{"originProcessName":"Alfa","originPortName":"Out1","destinationProcessName":"Beta","destinationPortName":"In1"},{"originProcessName":"Alfa","originPortName":"Out2","destinationProcessName":"Beta","destinationPortName":"In2"},{"originProcessName":"Beta","originPortName":"Out","destinationProcessName":"Charlie","destinationPortName":"In"}]}
17 changes: 17 additions & 0 deletions examples/pdl-source/example.pdl
@@ -0,0 +1,17 @@
CREATE PIPELINE "MyPipeline";

ADD "Alfa" OF "Generator" OUTPUTS = ("Out1", "Out2");
ADD "Beta" OF "DynamicJs"
INPUTS = ("In1", "In2")
OUTPUTS = ("Out")
SET "src" = 'o = {
"MyVal": In1 + "hello" + In2
};
console.log("hellofrom js");
Out = JSON.stringify(o);',
"gg" = "kk";
ADD SINK "Charlie" OF "Printer" INPUTS = ("In");

CONNECT "Alfa":"Out1" TO "Beta":"In1";
CONNECT "Alfa":"Out2" TO "Beta":"In2";
CONNECT "Beta":"Out" TO "Charlie":"In";
