Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/149 disable subscriptions #259

Merged
merged 18 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"eslint.workingDirectories": ["./flow_manager", "./stalker_ui"],
"eslint.workingDirectories": [
"./packages/frontend/stalker-app",
"./packages/backend/jobs-manager/service",
"./packages/backend/cron/service"
],
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export class CronSubscription {
@Prop()
public name!: string;

@Prop()
public isEnabled!: boolean;

@Prop()
public cronExpression!: string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,33 +94,10 @@ export class CronSubscriptionsService {

try {
await this.cacheMutex.runExclusive(async () => {
const currentRunStart = Date.now();
const now = Date.now();
for (const subscription of this.cronSubscriptionsCache) {
if (!subscription) {
this.logger.error(
'Failed to run the cron subscription as it is falsy',
);
continue;
}

try {
if (
CronSubscriptionsService.cronShouldRun(
subscription.cronExpression,
this.lastJobLaunchStart,
currentRunStart,
)
) {
// Fire and forget
this.cronConnector
.notify(subscription._id.toString())
.catch((reason) => {
this.logger.error(reason);
this.logger.error(
'Error while notifying the jobs manager. Continuing...',
);
});
}
this.notifyCronJob(subscription, now);
} catch (e) {
this.logger.error(e);
this.logger.error(
Expand All @@ -129,7 +106,7 @@ export class CronSubscriptionsService {
}
}

this.lastJobLaunchStart = currentRunStart;
this.lastJobLaunchStart = now;
});
} catch (e) {
if (e === E_TIMEOUT) {
Expand All @@ -140,4 +117,37 @@ export class CronSubscriptionsService {
this.jobLaunchRunning = false;
}
}

private notifyCronJob(subscription: CronSubscriptionsDocument, now: number) {
if (!subscription) {
this.logger.error('Failed to run the cron subscription as it is falsy');
return;
}

const { _id, cronExpression, isEnabled, name } = subscription;

const cronShouldRun = CronSubscriptionsService.cronShouldRun(
cronExpression,
this.lastJobLaunchStart,
now,
);
if (!cronShouldRun) {
return;
}

if (!isEnabled) {
this.logger.debug(
`Skipping execution for "${name}" because it is disabled.`,
);
return;
}
Aboisier marked this conversation as resolved.
Show resolved Hide resolved

// Fire and forget
this.cronConnector.notify(_id.toString()).catch((reason) => {
this.logger.error(reason);
this.logger.error(
'Error while notifying the jobs manager. Continuing...',
);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class Port {
}

export const PortSchema = SchemaFactory.createForClass(Port);

// The project id is not included here as the hostId-projectId combination is already unique
PortSchema.index(
{ 'host.id': 1, port: 1, layer4Protocol: 1 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
Patch,
Post,
Put,
Query,
UseGuards,
} from '@nestjs/common';
import { DeleteResult, UpdateResult } from 'mongodb';
Expand Down Expand Up @@ -53,13 +52,23 @@ export class CronSubscriptionsController {
@UseGuards(JwtAuthGuard, RolesGuard)
@Roles(Role.User)
@Patch(':id')
async revertSubscription(
async patch(
@Param() idDto: MongoIdDto,
@Query() queryParams: PatchSubscriptionDto,
): Promise<UpdateResult> {
if (queryParams.revert)
return await this.subscriptionsService.revertToDefaults(idDto.id);
else throw new HttpBadRequestException();
@Body() body: PatchSubscriptionDto,
): Promise<void> {
if (body == null) throw new HttpBadRequestException();

const { revert, isEnabled } = body;
if (isEnabled == null && revert == null)
throw new HttpBadRequestException();

if (revert) {
await this.subscriptionsService.revertToDefaults(idDto.id);
}

if (isEnabled != null) {
await this.subscriptionsService.updateEnabled(idDto.id, isEnabled);
}
}

@UseGuards(CronApiTokenGuard)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Type } from 'class-transformer';
import {
IsArray,
IsBoolean,
IsIn,
IsMongoId,
IsNotEmpty,
Expand All @@ -18,6 +19,10 @@ export class CronSubscriptionDto {
@IsNotEmpty()
public name!: string;

@IsBoolean()
@IsOptional()
public isEnabled?: boolean;

@IsMongoId()
@IsOptional()
public projectId?: string; // if projectId is not set, the subscription is for all projects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ describe('Cron Subscriptions Controller (e2e)', () => {
expect(foundSubscription).toBe(true);
});

it('Should revert a built-in cron subscription (PATCH /cron-subscriptions/{id}?revert=true)', async () => {
it('Should revert a built-in cron subscription (PATCH /cron-subscriptions/{id})', async () => {
// Arrange
let r = await getReq(app, testData.user.token, '/cron-subscriptions');
let builtInSub: CronSubscriptionsDocument;
Expand All @@ -137,21 +137,22 @@ describe('Cron Subscriptions Controller (e2e)', () => {
r = await patchReq(
app,
testData.user.token,
`/cron-subscriptions/${builtInSub._id}?revert=true`,
`/cron-subscriptions/${builtInSub._id}`,
{
...builtInSub,
_id: null,
builtIn: null,
name: changedName,
revert: true,
},
);

// Act
r = await patchReq(
app,
testData.user.token,
`/cron-subscriptions/${builtInSub._id}?revert=true`,
{},
`/cron-subscriptions/${builtInSub._id}`,
{ revert: true },
);

// Assert
Expand Down Expand Up @@ -238,16 +239,19 @@ describe('Cron Subscriptions Controller (e2e)', () => {
expect(success).toBe(true);
});

it('Should have proper authorizations (PATCH /cron-subscriptions/{id}?revert=true)', async () => {
it('Should have proper authorizations (PATCH /cron-subscriptions/{id})', async () => {
const success = await checkAuthorizations(
testData,
Role.User,
async (givenToken: string) => {
return await patchReq(
app,
givenToken,
`/cron-subscriptions/${subscriptionId}?revert=true`,
{},
`/cron-subscriptions/${subscriptionId}`,
{
revert: true,
isEnabled: false,
},
);
},
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ export class CronSubscription {
@Prop()
public name!: string;

@Prop()
public isEnabled!: boolean;

@Prop()
public projectId?: Types.ObjectId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export class CronSubscriptionsService {
public async create(dto: CronSubscriptionDto) {
const sub: CronSubscription = {
projectId: dto.projectId ? new Types.ObjectId(dto.projectId) : null,
isEnabled: dto.isEnabled == null ? dto.isEnabled : true,
name: dto.name,
input: dto.input ? dto.input : null,
cronExpression: dto.cronExpression,
Expand All @@ -66,6 +67,17 @@ export class CronSubscriptionsService {
return await this.subscriptionModel.create(sub);
}

public async updateEnabled(id: string, enabled: boolean) {
const subUpdate: Partial<CronSubscription> = {
isEnabled: enabled,
};

return await this.subscriptionModel.updateOne<CronSubscription>(
{ _id: { $eq: new Types.ObjectId(id) } },
subUpdate,
);
}

public async getAll() {
return await this.subscriptionModel.find({}, '-file');
}
Aboisier marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -306,8 +318,18 @@ export class CronSubscriptionsService {
finding: Finding,
projectId: string,
) {
if (!SubscriptionsUtils.shouldExecuteFromFinding(sub.conditions, finding))
if (
!SubscriptionsUtils.shouldExecuteFromFinding(
sub.isEnabled,
sub.conditions,
finding,
)
) {
this.logger.debug(
`Skipping job publication for ${sub.name}; conditions not met or subscription is disabled.`,
);
return;
Aboisier marked this conversation as resolved.
Show resolved Hide resolved
}

const parametersCopy: JobParameter[] = JSON.parse(
JSON.stringify(sub.jobParameters),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ describe('Cron Subscriptions Service', () => {
name: 'test subscription all domains',
input: 'ALL_DOMAINS',
builtIn: true,
isEnabled: true,
cronExpression: '* * * * *',
jobName: 'CustomJob',
projectId: c._id,
Expand Down Expand Up @@ -372,6 +373,7 @@ describe('Cron Subscriptions Service', () => {
name: 'test subscription all domains',
input: 'ALL_DOMAINS',
builtIn: true,
isEnabled: true,
cronExpression: '* * * * *',
jobName: 'CustomJob',
projectId: c._id,
Expand Down Expand Up @@ -530,6 +532,7 @@ describe('Cron Subscriptions Service', () => {
name: 'independent cron subscription',
input: 'ALL_DOMAINS',
builtIn: true,
isEnabled: true,
projectId: c._id,
cronExpression: '* * * * *',
jobName: 'DomainNameResolvingJob',
Expand Down Expand Up @@ -828,6 +831,7 @@ describe('Cron Subscriptions Service', () => {
name: 'test subscription all hosts',
input: 'ALL_HOSTS',
builtIn: true,
isEnabled: true,
cronExpression: '* * * * *',
jobName: 'CustomJob',
projectId: c._id,
Expand Down Expand Up @@ -889,6 +893,7 @@ describe('Cron Subscriptions Service', () => {
name: 'test subscription all hosts',
input: 'ALL_HOSTS',
builtIn: true,
isEnabled: true,
cronExpression: '* * * * *',
jobName: 'CustomJob',
projectId: c._id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
Patch,
Post,
Put,
Query,
UseGuards,
} from '@nestjs/common';
import { DeleteResult, UpdateResult } from 'mongodb';
Expand Down Expand Up @@ -46,20 +45,29 @@ export class EventSubscriptionsController {
async getSubscription(
@Param() IdDto: MongoIdDto,
): Promise<EventSubscriptionsDocument> {
// TODO 162: TEST
return await this.subscriptionsService.get(IdDto.id);
}

@UseGuards(JwtAuthGuard, RolesGuard)
@Roles(Role.User)
@Patch(':id')
async revertSubscription(
@Param() IdDto: MongoIdDto,
@Query() queryParams: PatchSubscriptionDto,
): Promise<UpdateResult> {
if (queryParams.revert)
return await this.subscriptionsService.revertToDefaults(IdDto.id);
else throw new HttpBadRequestException();
async patch(
@Param() idDto: MongoIdDto,
@Body() body: PatchSubscriptionDto,
): Promise<void> {
if (body == null) throw new HttpBadRequestException();

const { revert, isEnabled } = body;
if (isEnabled == null && revert == null)
throw new HttpBadRequestException();

if (revert) {
await this.subscriptionsService.revertToDefaults(idDto.id);
}

if (isEnabled != null) {
await this.subscriptionsService.updateEnabled(idDto.id, isEnabled);
}
}

@UseGuards(JwtAuthGuard, RolesGuard)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Type } from 'class-transformer';
import {
IsArray,
IsBoolean,
IsInt,
IsMongoId,
IsNotEmpty,
Expand All @@ -17,6 +18,10 @@ export class EventSubscriptionDto {
@IsNotEmpty()
public name!: string;

@IsBoolean()
@IsOptional()
public isEnabled?: boolean;

@IsMongoId()
@IsOptional()
public projectId?: string; // if projectId is not set, the subscription is for all projects
Expand Down
Loading
Loading