Skip to content

Commit

Permalink
Merge pull request #28 from superfly/gh-cache
Browse files Browse the repository at this point in the history
Run tests on GH actions + release speed improvements
  • Loading branch information
jeromegn authored Aug 21, 2023
2 parents 5db0ff7 + c265cf9 commit 9bce2fe
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 64 deletions.
47 changes: 47 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
on:
push:
branches: [ main ]
pull_request:
branches:
- main

name: CI
env:
RUSTFLAGS: -D warnings
CARGO_TERM_COLOR: always

jobs:
build:
name: Test
strategy:
fail-fast: false
matrix:
include:
- target: x86_64-apple-darwin
os: macos-latest
- target: x86_64-unknown-linux-gnu
os: ubuntu-latest

runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@v3

- uses: rui314/setup-mold@v1

- name: Install Rust stable
uses: dtolnay/rust-toolchain@stable
with:
target: ${{ matrix.target }}

- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-nextest

- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

- name: Test with latest nextest release
run: cargo nextest run --workspace --target ${{ matrix.target }}
16 changes: 13 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
tags:
- v[0-9]+.*

env:
RUSTFLAGS: -D warnings
CARGO_TERM_COLOR: always

jobs:
create-release:
runs-on: ubuntu-latest
Expand All @@ -32,12 +36,18 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Install cross-compilation tools
uses: taiki-e/setup-cross-toolchain-action@v1

- uses: rui314/setup-mold@v1

- uses: taiki-e/setup-cross-toolchain-action@v1
with:
target: ${{ matrix.target }}
if: startsWith(matrix.os, 'ubuntu')
- uses: rui314/setup-mold@v1

- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

- uses: taiki-e/upload-rust-binary-action@v1
with:
bin: corrosion
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cmp;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::ops::RangeInclusive;
use std::sync::Arc;

Expand Down Expand Up @@ -273,7 +274,12 @@ async fn build_quinn_client_config(config: &GossipConfig) -> eyre::Result<quinn:

pub async fn gossip_client_endpoint(config: &GossipConfig) -> eyre::Result<quinn::Endpoint> {
let client_config = build_quinn_client_config(config).await?;
let mut client = quinn::Endpoint::client("[::]:0".parse()?)?;

let client_bind_addr = match config.bind_addr {
SocketAddr::V4(_) => "0.0.0.0:0".parse()?,
SocketAddr::V6(_) => "[::]:0".parse()?,
};
let mut client = quinn::Endpoint::client(client_bind_addr)?;

client.set_default_client_config(client_config);
Ok(client)
Expand Down
27 changes: 27 additions & 0 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,33 @@ impl ColumnType {
_ => return None,
})
}

pub fn from_str(s: &str) -> Option<Self> {
Some(match s {
"INTEGER" => Self::Integer,
"REAL" => Self::Float,
"TEXT" => Self::Text,
"BLOB" => Self::Blob,
_ => return None,
})
}
}

impl FromSql for ColumnType {
fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
match value {
ValueRef::Text(s) => Ok(match String::from_utf8_lossy(s).as_ref() {
"INTEGER" => Self::Integer,
"REAL" => Self::Float,
"TEXT" => Self::Text,
"BLOB" => Self::Blob,
_ => {
return Err(FromSqlError::InvalidType);
}
}),
_ => Err(FromSqlError::InvalidType),
}
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
Expand Down
5 changes: 0 additions & 5 deletions crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,5 @@ pub enum LogFormat {
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct ConsulConfig {
#[serde(default)]
pub extra_services_columns: Vec<String>,
#[serde(default)]
pub extra_statements: Vec<String>,

pub client: consul_client::Config,
}
1 change: 1 addition & 0 deletions crates/corrosion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ corro-admin = { path = "../corro-admin" }
corro-agent = { path = "../corro-agent" }
corro-client = { path = "../corro-client" }
corro-types = { path = "../corro-types" }
corro-api-types = { path = "../corro-api-types" }
crc32fast = { workspace = true }
eyre = { workspace = true }
fallible-iterator = { workspace = true }
Expand Down
129 changes: 74 additions & 55 deletions crates/corrosion/src/command/consul/sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use consul_client::{AgentCheck, AgentService, Client};
use corro_api_types::ColumnType;
use corro_client::CorrosionClient;
use corro_types::{api::Statement, config::ConsulConfig};
use metrics::{histogram, increment_counter};
Expand All @@ -21,8 +22,6 @@ pub async fn run<P: AsRef<Path>>(
api_addr: SocketAddr,
db_path: P,
) -> eyre::Result<()> {
tracing_subscriber::fmt::init();

let (mut tripwire, tripwire_worker) = tripwire::Tripwire::new_signals();

let node: &'static str = Box::leak(
Expand All @@ -37,9 +36,7 @@ pub async fn run<P: AsRef<Path>>(

info!("Setting up corrosion for consul sync");
setup(
&corrosion,
config.extra_services_columns.as_slice(),
config.extra_statements.as_slice(),
&corrosion
)
.await?;

Expand Down Expand Up @@ -120,12 +117,9 @@ pub async fn run<P: AsRef<Path>>(

async fn setup(
corrosion: &CorrosionClient,
extra_services_columns: &[String],
extra_statements: &[String],
) -> eyre::Result<()> {
let mut conn = corrosion.pool().get().await?;
{
let mut conn = corrosion.pool().get().await?;

let tx = conn.transaction()?;

info!("Creating internal tables");
Expand All @@ -145,51 +139,51 @@ async fn setup(
tx.commit()?;
}
info!("Ensuring schema...");
corrosion
.schema(&build_schema(extra_services_columns, extra_statements))
.await?;
Ok(())
}

fn build_schema(extra_services_columns: &[String], extra_statements: &[String]) -> Vec<Statement> {
let extra = extra_services_columns.join(",");
let mut statements = vec![
Statement::Simple(format!("CREATE TABLE consul_services (
node TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
meta TEXT NOT NULL DEFAULT '{{}}',
port INTEGER NOT NULL DEFAULT 0,
address TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
{}
PRIMARY KEY (node, id)
) WITHOUT ROWID;", if extra.is_empty() { String::new() } else {format!("{extra},")})),
Statement::Simple("CREATE INDEX consul_services_node_id_updated_at ON consul_services (node, id, updated_at);".to_string()),

Statement::Simple("CREATE TABLE consul_checks (
node TEXT NOT NULL,
id TEXT NOT NULL,
service_id TEXT NOT NULL DEFAULT '',
service_name TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (node, id)
) WITHOUT ROWID;".to_string()),
Statement::Simple("CREATE INDEX consul_checks_node_id_updated_at ON consul_checks (node, id, updated_at);".to_string()),
Statement::Simple("CREATE INDEX consul_checks_node_service_id ON consul_checks (node, service_id);".to_string()),
struct ColumnInfo {
name: String,
kind: corro_api_types::ColumnType
}

let col_infos: Vec<ColumnInfo> = conn.prepare("PRAGMA table_info(consul_services)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_services' table_info: {e}"))?.collect::<Result<Vec<_>, _>>()?;

let expected_cols = [
("node", ColumnType::Text),
("id", ColumnType::Text),
("name", ColumnType::Text),
("tags", ColumnType::Text),
("meta", ColumnType::Text),
("port", ColumnType::Integer),
("address", ColumnType::Text),
("updated_at", ColumnType::Integer),
];

for s in extra_statements {
statements.push(Statement::Simple(s.clone()));
for (name, kind) in expected_cols {
if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() {
eyre::bail!("expected a column consul_services.{name} w/ type {kind:?}");
}
}

statements
let col_infos: Vec<ColumnInfo> = conn.prepare("PRAGMA table_info(consul_checks)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?.collect::<Result<Vec<_>, _>>()?;

let expected_cols = [
("node", ColumnType::Text),
("id", ColumnType::Text),
("service_id", ColumnType::Text),
("service_name", ColumnType::Text),
("name", ColumnType::Text),
("status", ColumnType::Text),
("output", ColumnType::Text),
("updated_at", ColumnType::Integer),
];

for (name, kind) in expected_cols {
if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() {
eyre::bail!("expected a column consul_checks.{name} w/ type {kind:?}");
}
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -584,10 +578,39 @@ mod tests {
_ = tracing_subscriber::fmt::try_init();
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();

let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let tmpdir = tempfile::TempDir::new()?;
tokio::fs::write(tmpdir.path().join("consul.sql"), b"
CREATE TABLE consul_services (
node TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
meta TEXT NOT NULL DEFAULT '{}',
port INTEGER NOT NULL DEFAULT 0,
address TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
app_id INTEGER AS (CAST(JSON_EXTRACT(meta, '$.app_id') AS INTEGER)),
PRIMARY KEY (node, id)
);
CREATE TABLE consul_checks (
node TEXT NOT NULL,
id TEXT NOT NULL,
service_id TEXT NOT NULL DEFAULT '',
service_name TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (node, id)
);
").await?;

let ta1 = launch_test_agent(|conf| conf.add_schema_path(tmpdir.path().display().to_string()).build(), tripwire.clone()).await?;
let ta2 = launch_test_agent(
|conf| {
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]).add_schema_path(tmpdir.path().display().to_string())
.build()
},
tripwire.clone(),
Expand All @@ -598,8 +621,6 @@ mod tests {

setup(
&ta1_client,
&["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()],
&[],
)
.await?;

Expand Down Expand Up @@ -655,8 +676,6 @@ mod tests {

setup(
&ta2_client,
&["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()],
&[],
)
.await?;

Expand Down

0 comments on commit 9bce2fe

Please sign in to comment.