{modifiedStep.type === FlowActionType.LOOP_ON_ITEMS && (
diff --git a/packages/react-ui/src/app/components/project-layout/project-dashboard-layout-header.tsx b/packages/react-ui/src/app/components/project-layout/project-dashboard-layout-header.tsx
index b7c5b16f53b..3e48884ece2 100644
--- a/packages/react-ui/src/app/components/project-layout/project-dashboard-layout-header.tsx
+++ b/packages/react-ui/src/app/components/project-layout/project-dashboard-layout-header.tsx
@@ -75,7 +75,7 @@ export const ProjectDashboardLayoutHeader = () => {
{
to: authenticationSession.appendProjectRoutePrefix('/todos'),
label: t('Todos'),
- show: platform.plan.todosEnabled,
+ show: !platform.plan.embeddingEnabled,
icon: ListTodo,
hasPermission: checkAccess(Permission.READ_TODOS),
},
diff --git a/packages/react-ui/src/app/components/request-trial.tsx b/packages/react-ui/src/app/components/request-trial.tsx
index 016ab34a85c..44a70f99ff6 100644
--- a/packages/react-ui/src/app/components/request-trial.tsx
+++ b/packages/react-ui/src/app/components/request-trial.tsx
@@ -12,6 +12,7 @@ export type FeatureKey =
| 'TEAM'
| 'GLOBAL_CONNECTIONS'
| 'USERS'
+ | 'EVENT_DESTINATIONS'
| 'API'
| 'SSO'
| 'AUDIT_LOGS'
diff --git a/packages/react-ui/src/app/components/sidebar/builder/flows-navigation.tsx b/packages/react-ui/src/app/components/sidebar/builder/flows-navigation.tsx
index a803cb799d1..72fa196a604 100644
--- a/packages/react-ui/src/app/components/sidebar/builder/flows-navigation.tsx
+++ b/packages/react-ui/src/app/components/sidebar/builder/flows-navigation.tsx
@@ -236,7 +236,7 @@ function DefaultFolder({
{t('Uncategorized')}
-
+
),
},
+ {
+ path: '/platform/infrastructure/event-destinations',
+ element: (
+
+
+
+
+
+ ),
+ },
{
path: '/platform/setup/billing',
element: (
diff --git a/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destination-actions.tsx b/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destination-actions.tsx
new file mode 100644
index 00000000000..1d10b2f9dfb
--- /dev/null
+++ b/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destination-actions.tsx
@@ -0,0 +1,78 @@
+import { t } from 'i18next';
+import { MoreVertical, Pencil, Trash } from 'lucide-react';
+import { useState } from 'react';
+
+import { ConfirmationDeleteDialog } from '@/components/delete-dialog';
+import { Button } from '@/components/ui/button';
+import {
+ DropdownMenu,
+ DropdownMenuItem,
+ DropdownMenuContent,
+ DropdownMenuTrigger,
+} from '@/components/ui/dropdown-menu';
+import { EventDestination } from '@activepieces/ee-shared';
+
+import { eventDestinationsCollectionUtils } from '../lib/event-destinations-collection';
+
+import { EventDestinationDialog } from './event-destination-dialog';
+
+const EventDestinationActions = ({
+ destination,
+}: {
+ destination: EventDestination;
+}) => {
+ const [dropdownOpen, setDropdownOpen] = useState(false);
+
+ return (
+
+
+
+
+
+
+
+ {
+ e.preventDefault();
+ }}
+ >
+
+ {t('Edit')}
+
+
+
+ {
+ if (destination) {
+ eventDestinationsCollectionUtils.delete([destination.id]);
+ }
+ }}
+ isDanger
+ >
+ {
+ e.preventDefault();
+ }}
+ >
+
+ {t('Delete')}
+
+
+
+
+
+ );
+};
+
+export default EventDestinationActions;
diff --git a/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destination-dialog.tsx b/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destination-dialog.tsx
new file mode 100644
index 00000000000..9c053798dc4
--- /dev/null
+++ b/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destination-dialog.tsx
@@ -0,0 +1,220 @@
+import { t } from 'i18next';
+import { useState } from 'react';
+import { useForm } from 'react-hook-form';
+import { toast } from 'sonner';
+
+import { Button } from '@/components/ui/button';
+import { Checkbox } from '@/components/ui/checkbox';
+import {
+ Dialog,
+ DialogContent,
+ DialogFooter,
+ DialogHeader,
+ DialogTitle,
+ DialogTrigger,
+} from '@/components/ui/dialog';
+import {
+ Form,
+ FormControl,
+ FormField,
+ FormItem,
+ FormLabel,
+ FormMessage,
+} from '@/components/ui/form';
+import { Input } from '@/components/ui/input';
+import { Label } from '@/components/ui/label';
+import {
+ ApplicationEventName,
+ EventDestination,
+ CreatePlatformEventDestinationRequestBody,
+} from '@activepieces/ee-shared';
+
+import { eventDestinationsCollectionUtils } from '../lib/event-destinations-collection';
+
+interface EventDestinationDialogProps {
+ children: React.ReactNode;
+ destination: EventDestination | null;
+}
+
+export const EventDestinationDialog = ({
+ children,
+ destination,
+}: EventDestinationDialogProps) => {
+ const [isOpen, setIsOpen] = useState(false);
+
+ const form = useForm({
+ defaultValues: {
+ url: destination?.url || '',
+ events: destination?.events || [],
+ },
+ });
+
+ const { mutate: testDestination, isPending: isTesting } =
+ eventDestinationsCollectionUtils.useTestEventDestination();
+
+ const { mutate: createDestination, isPending: isCreating } =
+ eventDestinationsCollectionUtils.useCreateEventDestination(
+ () => {
+ toast.success(t('Success'), {
+ description: t('Event destination created successfully'),
+ });
+ setIsOpen(false);
+ form.reset();
+ },
+ (error: Error) => {
+ toast.error(t('Error'), {
+ description: error.message,
+ });
+ },
+ );
+
+ const handleSubmit = (data: CreatePlatformEventDestinationRequestBody) => {
+ // Basic validation
+ if (!data.url || data.url.trim() === '') {
+ toast.error(t('Error'), {
+ description: t('Please enter a valid URL'),
+ });
+ return;
+ }
+
+ if (!data.events || data.events.length === 0) {
+ toast.error(t('Error'), {
+ description: t('Please select at least one event'),
+ });
+ return;
+ }
+
+ if (destination) {
+ // Update existing destination
+ try {
+ eventDestinationsCollectionUtils.update(destination.id, data);
+ toast.success(t('Success'), {
+ description: t('Event destination updated successfully'),
+ });
+ setIsOpen(false);
+ form.reset();
+ } catch (error) {
+ toast.error(t('Error'), {
+ description: error instanceof Error ? error.message : 'Unknown error',
+ });
+ }
+ } else {
+ // Create new destination
+ createDestination(data);
+ }
+ };
+
+ const availableEvents = Object.values(ApplicationEventName);
+
+ return (
+
+ );
+};
diff --git a/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destinations-table.tsx b/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destinations-table.tsx
new file mode 100644
index 00000000000..96223c9492c
--- /dev/null
+++ b/packages/react-ui/src/app/routes/platform/infra/event-destinations/components/event-destinations-table.tsx
@@ -0,0 +1,88 @@
+import { ColumnDef } from '@tanstack/react-table';
+import { t } from 'i18next';
+import { Globe } from 'lucide-react';
+
+import { Badge } from '@/components/ui/badge';
+import { DataTable, RowDataWithActions } from '@/components/ui/data-table';
+import { DataTableColumnHeader } from '@/components/ui/data-table/data-table-column-header';
+import { formatUtils } from '@/lib/utils';
+import { EventDestination } from '@activepieces/ee-shared';
+
+import EventDestinationActions from './event-destination-actions';
+
+interface EventDestinationsTableProps {
+ destinations: EventDestination[];
+}
+
+export const EventDestinationsTable = ({
+ destinations,
+}: EventDestinationsTableProps) => {
+ const columns: ColumnDef>[] = [
+ {
+ accessorKey: 'url',
+ header: ({ column }) => (
+
+ ),
+ cell: ({ row }) => (
+
+ {row.original.url}
+
+ ),
+ },
+ {
+ accessorKey: 'events',
+ header: ({ column }) => (
+
+ ),
+ cell: ({ row }) => (
+
+ {row.original.events.slice(0, 2).map((event) => (
+
+ {event.replace(/_/g, ' ')}
+
+ ))}
+ {row.original.events.length > 2 && (
+
+ +{row.original.events.length - 2} more
+
+ )}
+
+ ),
+ },
+ {
+ accessorKey: 'created',
+ header: ({ column }) => (
+
+ ),
+ cell: ({ row }) => (
+
+ {formatUtils.formatDate(new Date(row.original.created))}
+
+ ),
+ },
+ {
+ id: 'actions',
+ cell: ({ row }) => ,
+ },
+ ];
+
+ const page = {
+ data: destinations,
+ next: null,
+ previous: null,
+ };
+
+ return (
+ }
+ />
+ );
+};
diff --git a/packages/react-ui/src/app/routes/platform/infra/event-destinations/index.tsx b/packages/react-ui/src/app/routes/platform/infra/event-destinations/index.tsx
new file mode 100644
index 00000000000..6a092da3521
--- /dev/null
+++ b/packages/react-ui/src/app/routes/platform/infra/event-destinations/index.tsx
@@ -0,0 +1,51 @@
+import { t } from 'i18next';
+import { Plus } from 'lucide-react';
+
+import { DashboardPageHeader } from '@/app/components/dashboard-page-header';
+import LockedFeatureGuard from '@/app/components/locked-feature-guard';
+import { Button } from '@/components/ui/button';
+import { platformHooks } from '@/hooks/platform-hooks';
+
+import { EventDestinationDialog } from './components/event-destination-dialog';
+import { EventDestinationsTable } from './components/event-destinations-table';
+import { eventDestinationsCollectionUtils } from './lib/event-destinations-collection';
+
+const EventDestinationsPage = () => {
+ const { platform } = platformHooks.useCurrentPlatform();
+
+ const isEnabled = platform.plan.eventStreamingEnabled;
+ const { data: destinations } = eventDestinationsCollectionUtils.useAll(
+ platform.plan.eventStreamingEnabled,
+ );
+
+ return (
+
+
+
+
+
+
+
+
+
+
+ );
+};
+
+export default EventDestinationsPage;
diff --git a/packages/react-ui/src/app/routes/platform/infra/event-destinations/lib/event-destinations-collection.ts b/packages/react-ui/src/app/routes/platform/infra/event-destinations/lib/event-destinations-collection.ts
new file mode 100644
index 00000000000..cc9bdb74162
--- /dev/null
+++ b/packages/react-ui/src/app/routes/platform/infra/event-destinations/lib/event-destinations-collection.ts
@@ -0,0 +1,120 @@
+import { queryCollectionOptions } from '@tanstack/query-db-collection';
+import { createCollection, useLiveQuery } from '@tanstack/react-db';
+import { QueryClient, useMutation } from '@tanstack/react-query';
+
+import { api } from '@/lib/api';
+import {
+ CreatePlatformEventDestinationRequestBody,
+ EventDestination,
+ TestPlatformEventDestinationRequestBody,
+ UpdatePlatformEventDestinationRequestBody,
+} from '@activepieces/ee-shared';
+import { SeekPage } from '@activepieces/shared';
+
+const collectionQueryClient = new QueryClient();
+
+export const eventDestinationsCollection = createCollection<
+ EventDestination,
+ string
+>(
+ queryCollectionOptions({
+ queryKey: ['event-destinations'],
+ queryClient: collectionQueryClient,
+ queryFn: async () => {
+ const response = await api.get>(
+ '/v1/event-destinations',
+ );
+ return response.data;
+ },
+ getKey: (item) => item.id,
+ onUpdate: async ({ transaction }) => {
+ for (const { original, modified } of transaction.mutations) {
+ const request: UpdatePlatformEventDestinationRequestBody = {
+ url: modified.url,
+ events: modified.events,
+ };
+ await api.patch(
+ `/v1/event-destinations/${original.id}`,
+ request,
+ );
+ }
+ },
+ onInsert: async ({ transaction }) => {
+ for (const { modified } of transaction.mutations) {
+ const request: CreatePlatformEventDestinationRequestBody = {
+ url: modified.url,
+ events: modified.events,
+ };
+ await api.post('/v1/event-destinations', request);
+ }
+ },
+ onDelete: async ({ transaction }) => {
+ for (const { original } of transaction.mutations) {
+ await api.delete(`/v1/event-destinations/${original.id}`);
+ }
+ },
+ }),
+);
+
+export const eventDestinationsCollectionUtils = {
+ useAll: (enabled: boolean) => {
+ const queryResult = useLiveQuery(
+ (q) =>
+ q
+ .from({ destination: eventDestinationsCollection })
+ .select(({ destination }) => ({ ...destination })),
+ [],
+ );
+ if (!enabled) {
+ return {
+ data: [],
+ isLoading: false,
+ isError: false,
+ isSuccess: true,
+ };
+ }
+ return queryResult;
+ },
+
+ useCreateEventDestination: (
+ onSuccess: (destination: EventDestination) => void,
+ onError: (error: Error) => void,
+ ) => {
+ return useMutation({
+ mutationFn: (request: CreatePlatformEventDestinationRequestBody) =>
+ api.post('/v1/event-destinations', request),
+ onSuccess: (data) => {
+ eventDestinationsCollection.utils.writeInsert(data);
+ onSuccess(data);
+ },
+ onError: (error) => {
+ onError(error);
+ },
+ });
+ },
+
+ update: (
+ destinationId: string,
+ request: UpdatePlatformEventDestinationRequestBody,
+ ) => {
+ eventDestinationsCollection.update(destinationId, (draft) => {
+ Object.assign(
+ draft,
+ Object.fromEntries(
+ Object.entries(request).filter(([_, value]) => value !== undefined),
+ ),
+ );
+ });
+ },
+
+ delete: (destinationIds: string[]) => {
+ eventDestinationsCollection.delete(destinationIds);
+ },
+
+ useTestEventDestination: () => {
+ return useMutation({
+ mutationFn: (request: TestPlatformEventDestinationRequestBody) =>
+ api.post(`/v1/event-destinations/test`, request),
+ });
+ },
+};
diff --git a/packages/react-ui/src/app/routes/templates/index.tsx b/packages/react-ui/src/app/routes/templates/index.tsx
index 806bc9be66f..9c1d67ae275 100644
--- a/packages/react-ui/src/app/routes/templates/index.tsx
+++ b/packages/react-ui/src/app/routes/templates/index.tsx
@@ -43,10 +43,12 @@ const TemplatesPage = () => {
const handleTemplateSelect = (template: Template) => {
navigate(`/templates/${template.id}`);
- templatesTelemetryApi.sendEvent({
- eventType: TemplateTelemetryEventType.VIEW,
- templateId: template.id,
- });
+ if (template.type === TemplateType.OFFICIAL) {
+ templatesTelemetryApi.sendEvent({
+ eventType: TemplateTelemetryEventType.VIEW,
+ templateId: template.id,
+ });
+ }
};
const templatesByCategory = useMemo(() => {
diff --git a/packages/react-ui/src/features/billing/components/features-status.tsx b/packages/react-ui/src/features/billing/components/features-status.tsx
index c96e6aeb0c0..aba69f32253 100644
--- a/packages/react-ui/src/features/billing/components/features-status.tsx
+++ b/packages/react-ui/src/features/billing/components/features-status.tsx
@@ -65,6 +65,10 @@ const LICENSE_PROPS_MAP = {
label: 'Custom Roles',
description: 'Create and manage custom roles for your team',
},
+ eventStreamingEnabled: {
+ label: 'Event Streaming',
+ description: 'Stream platform events to external destinations',
+ },
};
export const FeatureStatus = ({
diff --git a/packages/react-ui/src/features/flow-runs/lib/flow-runs-api.ts b/packages/react-ui/src/features/flow-runs/lib/flow-runs-api.ts
index 05facf3e4b6..bd3802f3231 100644
--- a/packages/react-ui/src/features/flow-runs/lib/flow-runs-api.ts
+++ b/packages/react-ui/src/features/flow-runs/lib/flow-runs-api.ts
@@ -48,7 +48,20 @@ export const flowRunsApi = {
onUpdate: (response: FlowRun) => void,
): Promise {
socket.emit(WebsocketServerEvent.TEST_FLOW_RUN, request);
- const initialRun = await getInitialRun(socket, request.flowVersionId);
+ const initialRun = await getInitialRun(
+ socket,
+ request.flowVersionId,
+ false,
+ );
+ onUpdate(initialRun);
+ },
+ async startManualTrigger(
+ socket: Socket,
+ request: TestFlowRunRequestBody,
+ onUpdate: (response: FlowRun) => void,
+ ): Promise {
+ socket.emit(WebsocketServerEvent.MANUAL_TRIGGER_RUN_STARTED, request);
+ const initialRun = await getInitialRun(socket, request.flowVersionId, true);
onUpdate(initialRun);
},
async testStep(params: TestStepParams): Promise {
@@ -95,16 +108,28 @@ export const flowRunsApi = {
function getInitialRun(
socket: Socket,
flowVersionId: string,
+ forManualTrigger: boolean,
): Promise {
return new Promise((resolve) => {
const onRunStarted = (run: FlowRun) => {
if (run.flowVersionId !== flowVersionId) {
return;
}
- socket.off(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, onRunStarted);
+ if (forManualTrigger) {
+ socket.off(
+ WebsocketClientEvent.MANUAL_TRIGGER_RUN_STARTED,
+ onRunStarted,
+ );
+ } else {
+ socket.off(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, onRunStarted);
+ }
resolve(run);
};
- socket.on(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, onRunStarted);
+ if (forManualTrigger) {
+ socket.on(WebsocketClientEvent.MANUAL_TRIGGER_RUN_STARTED, onRunStarted);
+ } else {
+ socket.on(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, onRunStarted);
+ }
});
}
diff --git a/packages/react-ui/src/features/flows/lib/Import-flow-button.tsx b/packages/react-ui/src/features/flows/lib/Import-flow-button.tsx
index 02c28c18c09..be5afe43250 100644
--- a/packages/react-ui/src/features/flows/lib/Import-flow-button.tsx
+++ b/packages/react-ui/src/features/flows/lib/Import-flow-button.tsx
@@ -68,9 +68,7 @@ export const ImportFlowButton = ({
-
- {t('Import flow')}
-
+ {t('Import flow')}
);
diff --git a/packages/react-ui/src/features/flows/lib/flow-hooks.tsx b/packages/react-ui/src/features/flows/lib/flow-hooks.tsx
index 4950d1dbe4a..50e32f16c35 100644
--- a/packages/react-ui/src/features/flows/lib/flow-hooks.tsx
+++ b/packages/react-ui/src/features/flows/lib/flow-hooks.tsx
@@ -291,6 +291,25 @@ export const flowHooks = {
),
});
},
+ useStartManualTrigger: ({
+ flowVersionId,
+ onUpdateRun,
+ }: {
+ flowVersionId: string;
+ onUpdateRun: (run: FlowRun) => void;
+ }) => {
+ const socket = useSocket();
+ return useMutation({
+ mutationFn: () =>
+ flowRunsApi.startManualTrigger(
+ socket,
+ {
+ flowVersionId,
+ },
+ onUpdateRun,
+ ),
+ });
+ },
useListFlowVersions: (flowId: string) => {
return useQuery, Error>({
queryKey: ['flow-versions', flowId],
diff --git a/packages/react-ui/src/features/flows/lib/new-flow-button.tsx b/packages/react-ui/src/features/flows/lib/new-flow-button.tsx
index 5822e57bb0e..5ebaa5617e8 100644
--- a/packages/react-ui/src/features/flows/lib/new-flow-button.tsx
+++ b/packages/react-ui/src/features/flows/lib/new-flow-button.tsx
@@ -59,9 +59,7 @@ export const NewFlowButton = ({
)}
-
- {t('New flow')}
-
+ {t('New flow')}
);
diff --git a/packages/react-ui/src/features/pieces/components/piece-icon-list.tsx b/packages/react-ui/src/features/pieces/components/piece-icon-list.tsx
index 60ac39aade4..978c3640860 100644
--- a/packages/react-ui/src/features/pieces/components/piece-icon-list.tsx
+++ b/packages/react-ui/src/features/pieces/components/piece-icon-list.tsx
@@ -18,14 +18,14 @@ import { stepsHooks } from '../lib/steps-hooks';
import { PieceIcon } from './piece-icon';
const extraIconVariants = cva(
- 'flex items-center justify-center p-2 bg-background border border-solid text-xs select-none',
+ 'flex items-center justify-center bg-background border border-solid text-xs select-none',
{
variants: {
size: {
xxl: 'size-[64px]',
xl: 'size-[48px]',
lg: 'size-[40px]',
- md: 'size-[36px]',
+ md: 'size-[38px]',
sm: 'size-[25px]',
},
circle: {
diff --git a/packages/react-ui/src/features/pieces/components/piece-icon.tsx b/packages/react-ui/src/features/pieces/components/piece-icon.tsx
index 7504cad41b6..051a1261557 100644
--- a/packages/react-ui/src/features/pieces/components/piece-icon.tsx
+++ b/packages/react-ui/src/features/pieces/components/piece-icon.tsx
@@ -11,19 +11,19 @@ import {
import { cn } from '@/lib/utils';
const pieceIconVariants = cva(
- 'flex rounded-md items-center justify-center ',
+ 'flex rounded-md items-center justify-center bg-background ',
{
variants: {
circle: {
- true: 'rounded-full p-2',
+ true: 'rounded-full p-1.25',
false: '',
},
size: {
- xxl: 'size-[64px] p-4',
+ xxl: 'size-[64px]',
xl: 'size-[48px]',
lg: 'size-[40px]',
- md: 'size-[36px] p-1.5',
- sm: 'size-[25px]',
+ md: 'size-[36px]',
+ sm: 'size-[30px]',
xs: 'size-[18px]',
},
border: {
@@ -34,6 +34,19 @@ const pieceIconVariants = cva(
},
);
+const pieceIconVariantsWithPadding = cva('', {
+ variants: {
+ size: {
+ xxl: 'p-4',
+ xl: 'p-3',
+ lg: 'p-2',
+ md: 'p-0.75',
+ sm: 'p-1.25',
+ xs: '',
+ },
+ },
+});
+
interface PieceIconCircleProps extends VariantProps {
displayName?: string;
logoUrl?: string;
@@ -62,7 +75,10 @@ const PieceIcon = React.memo(
}
/>
diff --git a/packages/react-ui/src/features/pieces/components/piece-selector-tabs.tsx b/packages/react-ui/src/features/pieces/components/piece-selector-tabs.tsx
index ce2d6e71a18..da7b09e8611 100644
--- a/packages/react-ui/src/features/pieces/components/piece-selector-tabs.tsx
+++ b/packages/react-ui/src/features/pieces/components/piece-selector-tabs.tsx
@@ -20,7 +20,7 @@ export const PieceSelectorTabs = ({ tabs }: { tabs: TabType[] }) => {
className="max-w-md w-full"
>
{
svg]:size-5 [&>svg]:shrink-0`}
diff --git a/packages/react-ui/src/features/pieces/lib/piece-search-utils.ts b/packages/react-ui/src/features/pieces/lib/piece-search-utils.ts
index d9990aad02c..3e04ab60a5a 100644
--- a/packages/react-ui/src/features/pieces/lib/piece-search-utils.ts
+++ b/packages/react-ui/src/features/pieces/lib/piece-search-utils.ts
@@ -161,6 +161,7 @@ const sortByPieceNameOrder = (
const HIGHLIGHTED_PIECES_NAMES_FOR_TRIGGERS = [
'@activepieces/piece-webhook',
'@activepieces/piece-schedule',
+ '@activepieces/piece-manual-trigger',
'@activepieces/piece-forms',
'@activepieces/piece-tables',
];
diff --git a/packages/react-ui/src/features/pieces/lib/piece-selector-utils.ts b/packages/react-ui/src/features/pieces/lib/piece-selector-utils.ts
index 2c42950acdd..739acc3b711 100644
--- a/packages/react-ui/src/features/pieces/lib/piece-selector-utils.ts
+++ b/packages/react-ui/src/features/pieces/lib/piece-selector-utils.ts
@@ -356,6 +356,13 @@ const isChatTrigger = (pieceName: string, triggerName: string) => {
triggerName === 'chat_submission'
);
};
+
+const isManualTrigger = (pieceName: string, triggerName: string) => {
+ return (
+ pieceName === '@activepieces/piece-manual-trigger' &&
+ triggerName === 'manual_trigger'
+ );
+};
const getStepNameFromOperationType = (
operation: PieceSelectorOperation,
flowVersion: FlowVersion,
@@ -376,4 +383,5 @@ export const pieceSelectorUtils = {
isChatTrigger,
removeHiddenActions,
getStepNameFromOperationType,
+ isManualTrigger,
};
diff --git a/packages/react-ui/src/features/pieces/lib/step-utils.tsx b/packages/react-ui/src/features/pieces/lib/step-utils.tsx
index 694085e7eac..e2468ccc61b 100644
--- a/packages/react-ui/src/features/pieces/lib/step-utils.tsx
+++ b/packages/react-ui/src/features/pieces/lib/step-utils.tsx
@@ -32,25 +32,25 @@ export const CORE_STEP_METADATA: Record<
> = {
[FlowActionType.CODE]: {
displayName: t('Code'),
- logoUrl: 'https://cdn.activepieces.com/pieces/code.svg',
+ logoUrl: 'https://cdn.activepieces.com/pieces/new-core/code.svg',
description: t('Powerful Node.js & TypeScript code with npm'),
type: FlowActionType.CODE as const,
},
[FlowActionType.LOOP_ON_ITEMS]: {
displayName: t('Loop on Items'),
- logoUrl: 'https://cdn.activepieces.com/pieces/loop.svg',
+ logoUrl: 'https://cdn.activepieces.com/pieces/new-core/loop.svg',
description: 'Iterate over a list of items',
type: FlowActionType.LOOP_ON_ITEMS as const,
},
[FlowActionType.ROUTER]: {
displayName: t('Router'),
- logoUrl: 'https://cdn.activepieces.com/pieces/branch.svg',
+ logoUrl: 'https://cdn.activepieces.com/pieces/new-core/router.svg',
description: t('Split your flow into branches depending on condition(s)'),
type: FlowActionType.ROUTER as const,
},
[FlowTriggerType.EMPTY]: {
displayName: t('Empty Trigger'),
- logoUrl: 'https://cdn.activepieces.com/pieces/empty-trigger.svg',
+ logoUrl: 'https://cdn.activepieces.com/pieces/new-core/empty-trigger.svg',
description: t('Empty Trigger'),
type: FlowTriggerType.EMPTY as const,
},
diff --git a/packages/server/api/src/app/app-connection/app-connection.controller.ts b/packages/server/api/src/app/app-connection/app-connection.controller.ts
index 92b14eddcd5..bb81e06cbd5 100644
--- a/packages/server/api/src/app/app-connection/app-connection.controller.ts
+++ b/packages/server/api/src/app/app-connection/app-connection.controller.ts
@@ -40,7 +40,7 @@ export const appConnectionController: FastifyPluginCallbackTypebox = (app, _opts
metadata: request.body.metadata,
pieceVersion: request.body.pieceVersion,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.CONNECTION_UPSERTED,
data: {
connection: appConnection,
@@ -119,7 +119,7 @@ export const appConnectionController: FastifyPluginCallbackTypebox = (app, _opts
platformId: request.principal.platform.id,
projectId: request.projectId,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.CONNECTION_DELETED,
data: {
connection,
diff --git a/packages/server/api/src/app/app.ts b/packages/server/api/src/app/app.ts
index 59040ba7bfa..c98721a158b 100644
--- a/packages/server/api/src/app/app.ts
+++ b/packages/server/api/src/app/app.ts
@@ -42,6 +42,7 @@ import { adminPlatformModule } from './ee/platform/admin/admin-platform.controll
import { adminPlatformTemplatesCloudModule } from './ee/platform/admin/templates/admin-platform-templates-cloud.module'
import { platformAiCreditsService } from './ee/platform/platform-plan/platform-ai-credits.service'
import { platformPlanModule } from './ee/platform/platform-plan/platform-plan.module'
+import { platformWebhooksModule } from './ee/platform-webhooks/platform-webhooks.module'
import { projectEnterpriseHooks } from './ee/projects/ee-project-hooks'
import { platformProjectModule } from './ee/projects/platform-project-module'
import { projectMemberModule } from './ee/projects/project-members/project-member.module'
@@ -287,6 +288,7 @@ export const setupApp = async (app: FastifyInstance): Promise =
await app.register(apiKeyModule)
await app.register(gitRepoModule)
await app.register(auditEventModule)
+ await app.register(platformWebhooksModule)
await app.register(projectRoleModule)
await app.register(projectReleaseModule)
await app.register(globalConnectionModule)
@@ -312,6 +314,7 @@ export const setupApp = async (app: FastifyInstance): Promise =
await app.register(apiKeyModule)
await app.register(gitRepoModule)
await app.register(auditEventModule)
+ await app.register(platformWebhooksModule)
await app.register(projectRoleModule)
await app.register(projectReleaseModule)
await app.register(globalConnectionModule)
diff --git a/packages/server/api/src/app/authentication/authentication.controller.ts b/packages/server/api/src/app/authentication/authentication.controller.ts
index a4b5f1fbe97..fcc3edfa8bc 100644
--- a/packages/server/api/src/app/authentication/authentication.controller.ts
+++ b/packages/server/api/src/app/authentication/authentication.controller.ts
@@ -28,7 +28,7 @@ export const authenticationController: FastifyPluginAsyncTypebox = async (
platformId: platformId ?? null,
})
- applicationEvents.sendUserEvent({
+ applicationEvents(request.log).sendUserEvent({
platformId: signUpResponse.platformId!,
userId: signUpResponse.id,
projectId: signUpResponse.projectId,
@@ -54,7 +54,7 @@ export const authenticationController: FastifyPluginAsyncTypebox = async (
const responsePlatformId = response.platformId
assertNotNullOrUndefined(responsePlatformId, 'Platform ID is required')
- applicationEvents.sendUserEvent({
+ applicationEvents(request.log).sendUserEvent({
platformId: responsePlatformId,
userId: response.id,
projectId: response.projectId,
diff --git a/packages/server/api/src/app/database/database-connection.ts b/packages/server/api/src/app/database/database-connection.ts
index 3e266078220..aa90a4d1a60 100644
--- a/packages/server/api/src/app/database/database-connection.ts
+++ b/packages/server/api/src/app/database/database-connection.ts
@@ -24,6 +24,7 @@ import { GitRepoEntity } from '../ee/projects/project-release/git-sync/git-sync.
import { ProjectReleaseEntity } from '../ee/projects/project-release/project-release.entity'
import { ProjectRoleEntity } from '../ee/projects/project-role/project-role.entity'
import { SigningKeyEntity } from '../ee/signing-key/signing-key-entity'
+import { EventDestinationEntity } from '../event-destinations/event-destinations.entity'
import { FileEntity } from '../file/file.entity'
import { FlagEntity } from '../flags/flag.entity'
import { FlowEntity } from '../flows/flow/flow.entity'
@@ -108,6 +109,7 @@ function getEntities(): EntitySchema[] {
ConnectionKeyEntity,
AppCredentialEntity,
PlatformPlanEntity,
+ EventDestinationEntity,
]
}
diff --git a/packages/server/api/src/app/database/migration/postgres/1765757655723-add-display-name-to-ai-providers.ts b/packages/server/api/src/app/database/migration/postgres/1765757655723-add-display-name-to-ai-providers.ts
index c748fcbebdb..87b1a9017fd 100644
--- a/packages/server/api/src/app/database/migration/postgres/1765757655723-add-display-name-to-ai-providers.ts
+++ b/packages/server/api/src/app/database/migration/postgres/1765757655723-add-display-name-to-ai-providers.ts
@@ -5,7 +5,7 @@ export class AddDisplayNameToAiProviders1765757655723 implements MigrationInterf
public async up(queryRunner: QueryRunner): Promise {
await queryRunner.query(`
- DROP INDEX "public"."idx_ai_provider_platform_id_provider"
+ DROP INDEX "idx_ai_provider_platform_id_provider"
`)
await queryRunner.query(`
ALTER TABLE "ai_provider" ADD "displayName" character varying;
diff --git a/packages/server/api/src/app/database/migration/postgres/1766375959255-PlatformIdAndProviderUnique.ts b/packages/server/api/src/app/database/migration/postgres/1766375959255-PlatformIdAndProviderUnique.ts
index 390f86038dd..6e2ab77d468 100644
--- a/packages/server/api/src/app/database/migration/postgres/1766375959255-PlatformIdAndProviderUnique.ts
+++ b/packages/server/api/src/app/database/migration/postgres/1766375959255-PlatformIdAndProviderUnique.ts
@@ -11,7 +11,7 @@ export class PlatformIdAndProviderUnique1766375959255 implements MigrationInterf
public async down(queryRunner: QueryRunner): Promise {
await queryRunner.query(`
- DROP INDEX "public"."idx_ai_provider_platform_id_provider"
+ DROP INDEX "idx_ai_provider_platform_id_provider"
`)
}
diff --git a/packages/server/api/src/app/database/migration/postgres/1767141831647-AddBadges.ts b/packages/server/api/src/app/database/migration/postgres/1767141831647-AddBadges.ts
index 1fe726c4ac8..91a6ba13063 100644
--- a/packages/server/api/src/app/database/migration/postgres/1767141831647-AddBadges.ts
+++ b/packages/server/api/src/app/database/migration/postgres/1767141831647-AddBadges.ts
@@ -29,7 +29,7 @@ export class AddBadges1767141831647 implements MigrationInterface {
ALTER TABLE "user_badge" DROP CONSTRAINT "FK_dc6bb11dce7a0a591b5cae0af25"
`)
await queryRunner.query(`
- DROP INDEX "public"."idx_user_badge_user_id"
+ DROP INDEX "idx_user_badge_user_id"
`)
await queryRunner.query(`
DROP TABLE "user_badge"
diff --git a/packages/server/api/src/app/database/migration/postgres/1769084311004-AddEventStreaming.ts b/packages/server/api/src/app/database/migration/postgres/1769084311004-AddEventStreaming.ts
new file mode 100644
index 00000000000..13296a6336d
--- /dev/null
+++ b/packages/server/api/src/app/database/migration/postgres/1769084311004-AddEventStreaming.ts
@@ -0,0 +1,95 @@
+import { MigrationInterface, QueryRunner } from 'typeorm'
+
+export class AddEventStreaming1769084311004 implements MigrationInterface {
+ name = 'AddEventStreaming1769084311004'
+
+ public async up(queryRunner: QueryRunner): Promise {
+ await queryRunner.query(`
+ CREATE TABLE "event_destination" (
+ "id" character varying(21) NOT NULL,
+ "created" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
+ "updated" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
+ "platformId" character varying(21) NOT NULL,
+ "projectId" character varying(21),
+ "scope" character varying NOT NULL,
+ "events" character varying array NOT NULL,
+ "url" character varying NOT NULL,
+ CONSTRAINT "PK_e0fe710f7b5b768b59270f7ac05" PRIMARY KEY ("id")
+ )
+ `)
+ await queryRunner.query(`
+ CREATE INDEX "idx_event_destination_platform_scope" ON "event_destination" ("platformId")
+ WHERE scope = 'PLATFORM'
+ `)
+ await queryRunner.query(`
+ CREATE INDEX "idx_event_destination_project_scope" ON "event_destination" ("projectId")
+ WHERE scope = 'PROJECT'
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "todosEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "agentsEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "mcpsEnabled"
+ `)
+ // Step 1: Add the column with a nullable constraint temporarily
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "eventStreamingEnabled" boolean
+ `)
+ // Step 2: Set all current rows to false
+ await queryRunner.query(`
+ UPDATE "platform_plan"
+ SET "eventStreamingEnabled" = false
+ `)
+ // Step 3: Alter the column to not null
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ALTER COLUMN "eventStreamingEnabled" SET NOT NULL
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "event_destination"
+ ADD CONSTRAINT "fk_event_destination_platform_id" FOREIGN KEY ("platformId") REFERENCES "platform"("id") ON DELETE CASCADE ON UPDATE NO ACTION
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "event_destination"
+ ADD CONSTRAINT "fk_event_destination_project_id" FOREIGN KEY ("projectId") REFERENCES "project"("id") ON DELETE CASCADE ON UPDATE NO ACTION
+ `)
+ }
+
+ public async down(queryRunner: QueryRunner): Promise {
+ await queryRunner.query(`
+ ALTER TABLE "event_destination" DROP CONSTRAINT "fk_event_destination_project_id"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "event_destination" DROP CONSTRAINT "fk_event_destination_platform_id"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "eventStreamingEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "mcpsEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "agentsEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "todosEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ DROP INDEX "public"."idx_event_destination_project_scope"
+ `)
+ await queryRunner.query(`
+ DROP INDEX "public"."idx_event_destination_platform_scope"
+ `)
+ await queryRunner.query(`
+ DROP TABLE "event_destination"
+ `)
+ }
+
+}
diff --git a/packages/server/api/src/app/database/migration/postgres/1769084419658-AddEventStreaming.ts b/packages/server/api/src/app/database/migration/postgres/1769084419658-AddEventStreaming.ts
new file mode 100644
index 00000000000..ac78d9dade7
--- /dev/null
+++ b/packages/server/api/src/app/database/migration/postgres/1769084419658-AddEventStreaming.ts
@@ -0,0 +1,84 @@
+import { MigrationInterface, QueryRunner } from 'typeorm'
+
+export class AddEventStreaming1769084419658 implements MigrationInterface {
+ name = 'AddEventStreaming1769084419658'
+
+ public async up(queryRunner: QueryRunner): Promise {
+ await queryRunner.query(`
+ CREATE TABLE "event_destination" (
+ "id" character varying(21) NOT NULL,
+ "created" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
+ "updated" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
+ "platformId" character varying(21) NOT NULL,
+ "projectId" character varying(21),
+ "scope" character varying NOT NULL,
+ "events" character varying array NOT NULL,
+ "url" character varying NOT NULL,
+ CONSTRAINT "PK_e0fe710f7b5b768b59270f7ac05" PRIMARY KEY ("id")
+ )
+ `)
+ await queryRunner.query(`
+ CREATE INDEX "idx_event_destination_platform_scope" ON "event_destination" ("platformId")
+ WHERE scope = 'PLATFORM'
+ `)
+ await queryRunner.query(`
+ CREATE INDEX "idx_event_destination_project_scope" ON "event_destination" ("projectId")
+ WHERE scope = 'PROJECT'
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "todosEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "agentsEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "mcpsEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "eventStreamingEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "event_destination"
+ ADD CONSTRAINT "fk_event_destination_platform_id" FOREIGN KEY ("platformId") REFERENCES "platform"("id") ON DELETE CASCADE ON UPDATE NO ACTION
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "event_destination"
+ ADD CONSTRAINT "fk_event_destination_project_id" FOREIGN KEY ("projectId") REFERENCES "project"("id") ON DELETE CASCADE ON UPDATE NO ACTION
+ `)
+ }
+
+ public async down(queryRunner: QueryRunner): Promise {
+ await queryRunner.query(`
+ ALTER TABLE "event_destination" DROP CONSTRAINT "fk_event_destination_project_id"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "event_destination" DROP CONSTRAINT "fk_event_destination_platform_id"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan" DROP COLUMN "eventStreamingEnabled"
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "mcpsEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "agentsEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ ALTER TABLE "platform_plan"
+ ADD "todosEnabled" boolean NOT NULL
+ `)
+ await queryRunner.query(`
+ DROP INDEX "public"."idx_event_destination_project_scope"
+ `)
+ await queryRunner.query(`
+ DROP INDEX "public"."idx_event_destination_platform_scope"
+ `)
+ await queryRunner.query(`
+ DROP TABLE "event_destination"
+ `)
+ }
+
+}
diff --git a/packages/server/api/src/app/database/postgres-connection.ts b/packages/server/api/src/app/database/postgres-connection.ts
index 828f38f3bc4..c6b6ab9b096 100644
--- a/packages/server/api/src/app/database/postgres-connection.ts
+++ b/packages/server/api/src/app/database/postgres-connection.ts
@@ -332,6 +332,7 @@ import { AddTablesIntoTemplateEntity1768306510367 } from './migration/postgres/1
import { AddImageToUser1768502658760 } from './migration/postgres/1768502658760-ADDIMAGETOUSER'
import { RemoveUsageCountFromTemplates1768738475196 } from './migration/postgres/1768738475196-RemoveUsageCountFromTemplates'
import { AddTemplateIdToFlowEntity1768829135202 } from './migration/postgres/1768829135202-AddTemplateIdToFlowEntity'
+import { AddEventStreaming1769084311004 } from './migration/postgres/1769084311004-AddEventStreaming'
const getSslConfig = (): boolean | TlsOptions => {
const useSsl = system.get(AppSystemProp.POSTGRES_USE_SSL)
@@ -676,6 +677,7 @@ export const getMigrations = (): (new () => MigrationInterface)[] => {
AddOutdatedToReport1767994436597,
AddNotesToFlowVersion1768130030028,
MigrateOldTemplateCategoriesToDynamicOne1767624311536,
+ AddEventStreaming1769084311004,
AddImageToUser1768502658760,
RemoveUsageCountFromTemplates1768738475196,
AddTablesIntoTemplateEntity1768306510367,
diff --git a/packages/server/api/src/app/ee/audit-logs/audit-event-service.ts b/packages/server/api/src/app/ee/audit-logs/audit-event-service.ts
index 50a12c945c7..109f89e0082 100644
--- a/packages/server/api/src/app/ee/audit-logs/audit-event-service.ts
+++ b/packages/server/api/src/app/ee/audit-logs/audit-event-service.ts
@@ -3,41 +3,28 @@ import {
} from '@activepieces/ee-shared'
import { rejectedPromiseHandler } from '@activepieces/server-shared'
import {
- apId,
Cursor,
isNil,
SeekPage,
} from '@activepieces/shared'
-import { Value } from '@sinclair/typebox/value'
import { FastifyBaseLogger } from 'fastify'
import { In } from 'typeorm'
-import { userIdentityService } from '../../authentication/user-identity/user-identity-service'
import { repoFactory } from '../../core/db/repo-factory'
-import { applicationEvents, AuditEventParam, MetaInformation } from '../../helper/application-events'
+import { applicationEvents } from '../../helper/application-events'
import { buildPaginator } from '../../helper/pagination/build-paginator'
import { paginationHelper } from '../../helper/pagination/pagination-utils'
-import { platformService } from '../../platform/platform.service'
-import { projectService } from '../../project/project-service'
-import { userService } from '../../user/user-service'
import { AuditEventEntity } from './audit-event-entity'
export const auditLogRepo = repoFactory(AuditEventEntity)
export const auditLogService = (log: FastifyBaseLogger) => ({
setup(): void {
- applicationEvents.registerListeners(log, {
- userEvent: (log) => (requestInformation, params) => {
- rejectedPromiseHandler(saveEvent(requestInformation, params, log), log)
+ applicationEvents(log).registerListeners(log, {
+ userEvent: (log) => async (params) => {
+ rejectedPromiseHandler(auditLogRepo().save(params), log)
},
- workerEvent: (log) => (projectId, params) => {
- rejectedPromiseHandler(projectService.getOneOrThrow(projectId).then((project) => {
- rejectedPromiseHandler(saveEvent({
- platformId: project.platformId,
- projectId,
- userId: undefined,
- ip: undefined,
- }, params, log), log)
- }), log)
+ workerEvent: (log) => async (_projectId, params) => {
+ rejectedPromiseHandler(auditLogRepo().save(params), log)
},
})
},
@@ -60,7 +47,7 @@ export const auditLogService = (log: FastifyBaseLogger) => ({
if (!isNil(action)) {
queryBuilder.andWhere({ action: In(action) })
}
-
+
if (!isNil(projectId)) {
queryBuilder.andWhere({ projectId: In(projectId) })
}
@@ -84,47 +71,6 @@ export const auditLogService = (log: FastifyBaseLogger) => ({
},
})
-async function saveEvent(info: MetaInformation, rawEvent: AuditEventParam, log: FastifyBaseLogger): Promise {
- const platformId = info.platformId
- const platform = await platformService.getOneWithPlanOrThrow(platformId)
- if (!platform.plan.auditLogEnabled) {
- return
- }
- const user = info.userId ? await userService.getOneOrFail({
- id: info.userId,
- }) : undefined
- const identity = !isNil(user?.identityId) ? await userIdentityService(log).getOneOrFail({
- id: user.identityId,
- }) : undefined
- const project = info.projectId ? await projectService.getOne(info.projectId) : undefined
- const eventToSave: unknown = {
- id: apId(),
- created: new Date().toISOString(),
- updated: new Date().toISOString(),
- userId: info.userId,
- userEmail: identity?.email,
- projectId: info.projectId,
- projectDisplayName: project?.displayName,
- platformId: info.platformId,
- ip: info.ip,
- data: {
- ...rawEvent.data,
- project,
- user,
- },
- action: rawEvent.action,
- }
-
- // The event may contain Date objects, so we serialize it to convert dates back to strings as per the schema.
- const clonedAndSerializedDates = JSON.parse(JSON.stringify(eventToSave))
- const cleanedEvent = Value.Clean(ApplicationEvent, clonedAndSerializedDates) as ApplicationEvent
-
- await auditLogRepo().save(cleanedEvent)
- log.info({
- action: cleanedEvent.action,
- message: '[AuditEventService#saveEvent] Audit event saved',
- })
-}
type ListParams = {
platformId: string
diff --git a/packages/server/api/src/app/ee/authentication/enterprise-local-authn/enterprise-local-authn-controller.ts b/packages/server/api/src/app/ee/authentication/enterprise-local-authn/enterprise-local-authn-controller.ts
index 4e946cca772..04a02f5fd16 100644
--- a/packages/server/api/src/app/ee/authentication/enterprise-local-authn/enterprise-local-authn-controller.ts
+++ b/packages/server/api/src/app/ee/authentication/enterprise-local-authn/enterprise-local-authn-controller.ts
@@ -11,7 +11,7 @@ export const enterpriseLocalAuthnController: FastifyPluginAsyncTypebox = async (
app,
) => {
app.post('/verify-email', VerifyEmailRequest, async (req) => {
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.USER_EMAIL_VERIFIED,
data: {},
})
@@ -19,7 +19,7 @@ export const enterpriseLocalAuthnController: FastifyPluginAsyncTypebox = async (
})
app.post('/reset-password', ResetPasswordRequest, async (req) => {
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.USER_PASSWORD_RESET,
data: {},
})
diff --git a/packages/server/api/src/app/ee/authentication/federated-authn/federated-authn-module.ts b/packages/server/api/src/app/ee/authentication/federated-authn/federated-authn-module.ts
index 54d8e1bcaa2..7f5387aab91 100644
--- a/packages/server/api/src/app/ee/authentication/federated-authn/federated-authn-module.ts
+++ b/packages/server/api/src/app/ee/authentication/federated-authn/federated-authn-module.ts
@@ -35,7 +35,7 @@ const federatedAuthnController: FastifyPluginAsyncTypebox = async (app) => {
platformId: platformId ?? undefined,
code: req.body.code,
})
- applicationEvents.sendUserEvent({
+ applicationEvents(req.log).sendUserEvent({
platformId: response.platformId!,
userId: response.id,
projectId: response.projectId,
diff --git a/packages/server/api/src/app/ee/authentication/saml-authn/authn-sso-saml-controller.ts b/packages/server/api/src/app/ee/authentication/saml-authn/authn-sso-saml-controller.ts
index 6cc55f681b8..c720f302abe 100644
--- a/packages/server/api/src/app/ee/authentication/saml-authn/authn-sso-saml-controller.ts
+++ b/packages/server/api/src/app/ee/authentication/saml-authn/authn-sso-saml-controller.ts
@@ -24,7 +24,7 @@ export const authnSsoSamlController: FastifyPluginAsyncTypebox = async (app) =>
})
const url = new URL('/authenticate', `${req.protocol}://${req.hostname}`)
url.searchParams.append('response', JSON.stringify(response))
- applicationEvents.sendUserEvent({
+ applicationEvents(req.log).sendUserEvent({
platformId,
userId: response.id,
projectId: response.projectId,
diff --git a/packages/server/api/src/app/ee/global-connections/global-connection-module.ts b/packages/server/api/src/app/ee/global-connections/global-connection-module.ts
index b27f31bf809..fc05f536e51 100644
--- a/packages/server/api/src/app/ee/global-connections/global-connection-module.ts
+++ b/packages/server/api/src/app/ee/global-connections/global-connection-module.ts
@@ -38,7 +38,7 @@ const globalConnectionController: FastifyPluginAsyncTypebox = async (app) => {
scope: AppConnectionScope.PLATFORM,
pieceVersion: request.body.pieceVersion,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.CONNECTION_UPSERTED,
data: {
connection: appConnection,
@@ -96,7 +96,7 @@ const globalConnectionController: FastifyPluginAsyncTypebox = async (app) => {
scope: AppConnectionScope.PLATFORM,
projectId: null,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.CONNECTION_DELETED,
data: {
connection,
diff --git a/packages/server/api/src/app/ee/license-keys/license-keys-service.ts b/packages/server/api/src/app/ee/license-keys/license-keys-service.ts
index 16488ded27a..b49ad27ab7c 100644
--- a/packages/server/api/src/app/ee/license-keys/license-keys-service.ts
+++ b/packages/server/api/src/app/ee/license-keys/license-keys-service.ts
@@ -146,19 +146,16 @@ export const licenseKeysService = (log: FastifyBaseLogger) => ({
customRolesEnabled: key.customRolesEnabled,
teamProjectsLimit,
managePiecesEnabled: key.managePiecesEnabled,
- mcpsEnabled: key.mcpsEnabled,
- todosEnabled: key.todosEnabled,
- tablesEnabled: key.tablesEnabled,
activeFlowsLimit: undefined,
projectsLimit: undefined,
stripeSubscriptionId: undefined,
stripeSubscriptionStatus: undefined,
- agentsEnabled: key.agentsEnabled,
manageTemplatesEnabled: key.manageTemplatesEnabled,
apiKeysEnabled: key.apiKeysEnabled,
customDomainsEnabled: key.customDomainsEnabled,
projectRolesEnabled: key.projectRolesEnabled,
analyticsEnabled: key.analyticsEnabled,
+ eventStreamingEnabled: key.eventStreamingEnabled,
},
})
},
@@ -180,8 +177,5 @@ const turnedOffFeatures: Omit {
+ app.post('/', CreateEventDestinationRequest, async (req) => {
+ return eventDestinationService(req.log).create(req.body, req.principal.platform.id)
+ })
+
+ app.patch('/:id', UpdateEventDestinationRequest, async (req) => {
+ return eventDestinationService(req.log).update({
+ id: req.params.id,
+ platformId: req.principal.platform.id,
+ request: req.body,
+ })
+ })
+ app.get('/', ListEventDestinationsRequest, async (req) => {
+ return eventDestinationService(req.log).list({
+ platformId: req.principal.platform.id,
+ cursorRequest: req.query.cursor ?? null,
+ limit: req.query.limit ?? 10,
+ })
+ })
+ app.delete('/:id', DeleteEventDestinationRequest, async (req) => {
+ return eventDestinationService(req.log).delete({
+ id: req.params.id,
+ platformId: req.principal.platform.id,
+ })
+ })
+
+ app.post('/test', TestPlatformEventDestinationRequest, async (req) => {
+ return eventDestinationService(req.log).test({
+ platformId: req.principal.platform.id,
+ projectId: undefined,
+ url: req.body.url,
+ })
+ })
+}
+
+export const CreateEventDestinationRequest = {
+ schema: {
+ body: CreatePlatformEventDestinationRequestBody,
+ },
+ config: {
+ security: securityAccess.platformAdminOnly([PrincipalType.USER, PrincipalType.SERVICE]),
+ },
+}
+
+export const UpdateEventDestinationRequest = {
+ schema: {
+ body: UpdatePlatformEventDestinationRequestBody,
+ params: Type.Object({
+ id: Type.String(),
+ }),
+ },
+ config: {
+ security: securityAccess.platformAdminOnly([PrincipalType.USER, PrincipalType.SERVICE]),
+ },
+}
+
+export const ListEventDestinationsRequest = {
+ schema: {
+ querystring: ListPlatformEventDestinationsRequestBody,
+ response: {
+ [StatusCodes.OK]: SeekPage(EventDestination),
+ },
+ tags: ['event-destinations'],
+ description: 'List event destinations',
+ },
+ response: {
+ [StatusCodes.OK]: SeekPage(EventDestination),
+ },
+ config: {
+ security: securityAccess.platformAdminOnly([PrincipalType.USER, PrincipalType.SERVICE]),
+ },
+}
+
+export const DeleteEventDestinationRequest = {
+ schema: {
+ params: Type.Object({
+ id: Type.String(),
+ }),
+ },
+ config: {
+ security: securityAccess.platformAdminOnly([PrincipalType.USER, PrincipalType.SERVICE]),
+ },
+}
+
+export const TestPlatformEventDestinationRequest = {
+ schema: {
+ body: TestPlatformEventDestinationRequestBody,
+ },
+ config: {
+ security: securityAccess.platformAdminOnly([PrincipalType.USER]),
+ },
+}
+
diff --git a/packages/server/api/src/app/ee/platform-webhooks/platform-webhooks.module.ts b/packages/server/api/src/app/ee/platform-webhooks/platform-webhooks.module.ts
new file mode 100644
index 00000000000..cf261e689b2
--- /dev/null
+++ b/packages/server/api/src/app/ee/platform-webhooks/platform-webhooks.module.ts
@@ -0,0 +1,7 @@
+import { FastifyPluginAsync } from 'fastify'
+import { platformWebhooksController } from './platform-webhooks.controller'
+
+export const platformWebhooksModule: FastifyPluginAsync = async (app) => {
+ await app.register(platformWebhooksController, { prefix: '/v1/event-destinations' })
+}
+
diff --git a/packages/server/api/src/app/ee/platform/platform-plan/platform-plan.entity.ts b/packages/server/api/src/app/ee/platform/platform-plan/platform-plan.entity.ts
index 0dda7da1b03..ba2e5319b5d 100644
--- a/packages/server/api/src/app/ee/platform/platform-plan/platform-plan.entity.ts
+++ b/packages/server/api/src/app/ee/platform/platform-plan/platform-plan.entity.ts
@@ -113,24 +113,19 @@ export const PlatformPlanEntity = new EntitySchema({
type: String,
nullable: true,
},
- tablesEnabled: {
- type: Boolean,
- },
- todosEnabled: {
- type: Boolean,
- },
projectsLimit: {
type: Number,
nullable: true,
},
- agentsEnabled: {
+ tablesEnabled: {
type: Boolean,
+ nullable: true,
},
activeFlowsLimit: {
type: Number,
nullable: true,
},
- mcpsEnabled: {
+ eventStreamingEnabled: {
type: Boolean,
},
dedicatedWorkers: {
diff --git a/packages/server/api/src/app/ee/projects/project-release/project-release.controller.ts b/packages/server/api/src/app/ee/projects/project-release/project-release.controller.ts
index 495c9188ba3..af6efe60965 100644
--- a/packages/server/api/src/app/ee/projects/project-release/project-release.controller.ts
+++ b/packages/server/api/src/app/ee/projects/project-release/project-release.controller.ts
@@ -36,7 +36,7 @@ export const projectReleaseController: FastifyPluginAsyncTypebox = async (app) =
log: req.log,
})
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.PROJECT_RELEASE_CREATED,
data: {
release,
diff --git a/packages/server/api/src/app/ee/projects/project-role/project-role.controller.ts b/packages/server/api/src/app/ee/projects/project-role/project-role.controller.ts
index 5f7d4572634..cdb6cdd88fa 100644
--- a/packages/server/api/src/app/ee/projects/project-role/project-role.controller.ts
+++ b/packages/server/api/src/app/ee/projects/project-role/project-role.controller.ts
@@ -37,7 +37,7 @@ export const projectRoleController: FastifyPluginAsyncTypebox = async (app) => {
await platformMustHaveFeatureEnabled((platform) => platform.plan.customRolesEnabled).call(app, req, reply)
const projectRole = await projectRoleService.create(req.principal.platform.id, req.body)
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.PROJECT_ROLE_CREATED,
data: {
projectRole,
@@ -54,7 +54,7 @@ export const projectRoleController: FastifyPluginAsyncTypebox = async (app) => {
name: req.body.name,
permissions: req.body.permissions,
})
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.PROJECT_ROLE_UPDATED,
data: {
projectRole,
@@ -70,7 +70,7 @@ export const projectRoleController: FastifyPluginAsyncTypebox = async (app) => {
name: req.params.name,
platformId: req.principal.platform.id,
})
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.PROJECT_ROLE_DELETED,
data: {
projectRole,
diff --git a/packages/server/api/src/app/ee/signing-key/signing-key-controller.ts b/packages/server/api/src/app/ee/signing-key/signing-key-controller.ts
index 78efd24f475..c5e9f210f6a 100644
--- a/packages/server/api/src/app/ee/signing-key/signing-key-controller.ts
+++ b/packages/server/api/src/app/ee/signing-key/signing-key-controller.ts
@@ -24,7 +24,7 @@ export const signingKeyController: FastifyPluginAsyncTypebox = async (app) => {
displayName: req.body.displayName,
})
- applicationEvents.sendUserEvent(req, {
+ applicationEvents(req.log).sendUserEvent(req, {
action: ApplicationEventName.SIGNING_KEY_CREATED,
data: {
signingKey: newSigningKey,
diff --git a/packages/server/api/src/app/event-destinations/event-destinations.entity.ts b/packages/server/api/src/app/event-destinations/event-destinations.entity.ts
new file mode 100644
index 00000000000..2f2357db096
--- /dev/null
+++ b/packages/server/api/src/app/event-destinations/event-destinations.entity.ts
@@ -0,0 +1,72 @@
+import { EventDestination, EventDestinationScope } from '@activepieces/ee-shared'
+import { Platform, Project } from '@activepieces/shared'
+import { EntitySchema } from 'typeorm'
+import { ApIdSchema, BaseColumnSchemaPart } from '../database/database-common'
+
+export type EventDestinationSchema = EventDestination & {
+ platform: Platform
+ project: Project
+}
+
+export const EventDestinationEntity = new EntitySchema({
+ name: 'event_destination',
+ columns: {
+ ...BaseColumnSchemaPart,
+ platformId: {
+ ...ApIdSchema,
+ nullable: false,
+ },
+ projectId: {
+ ...ApIdSchema,
+ nullable: true,
+ },
+ scope: {
+ type: String,
+ nullable: false,
+ },
+ events: {
+ type: String,
+ array: true,
+ nullable: false,
+ },
+ url: {
+ type: String,
+ nullable: false,
+ },
+ },
+ indices: [
+ {
+ name: 'idx_event_destination_platform_scope',
+ columns: ['platformId'],
+ where: `scope = '${EventDestinationScope.PLATFORM}'`,
+ },
+ {
+ name: 'idx_event_destination_project_scope',
+ columns: ['projectId'],
+ where: `scope = '${EventDestinationScope.PROJECT}'`,
+ },
+ ],
+ relations: {
+ platform: {
+ type: 'many-to-one',
+ target: 'platform',
+ cascade: true,
+ onDelete: 'CASCADE',
+ joinColumn: {
+ name: 'platformId',
+ foreignKeyConstraintName: 'fk_event_destination_platform_id',
+ },
+ },
+ project: {
+ type: 'many-to-one',
+ target: 'project',
+ cascade: true,
+ onDelete: 'CASCADE',
+ joinColumn: {
+ name: 'projectId',
+ foreignKeyConstraintName: 'fk_event_destination_project_id',
+ },
+ },
+ },
+})
+
diff --git a/packages/server/api/src/app/event-destinations/event-destinations.service.ts b/packages/server/api/src/app/event-destinations/event-destinations.service.ts
new file mode 100644
index 00000000000..38b93b19751
--- /dev/null
+++ b/packages/server/api/src/app/event-destinations/event-destinations.service.ts
@@ -0,0 +1,214 @@
+import {
+ ApplicationEvent,
+ ApplicationEventName,
+ CreatePlatformEventDestinationRequestBody,
+ EventDestination,
+ EventDestinationScope,
+ FlowCreatedEvent,
+ UpdatePlatformEventDestinationRequestBody,
+} from '@activepieces/ee-shared'
+import { WorkerSystemProp } from '@activepieces/server-shared'
+import { ActivepiecesError, apId, assertNotNullOrUndefined, Cursor, ErrorCode, isNil, LATEST_JOB_DATA_SCHEMA_VERSION, PlatformId, ProjectId, SeekPage, WorkerJobType } from '@activepieces/shared'
+import { FastifyBaseLogger } from 'fastify'
+import { ArrayContains, FindOptionsWhere } from 'typeorm'
+import { repoFactory } from '../core/db/repo-factory'
+import { applicationEvents } from '../helper/application-events'
+import { buildPaginator } from '../helper/pagination/build-paginator'
+import { paginationHelper } from '../helper/pagination/pagination-utils'
+import { system } from '../helper/system/system'
+import { jobQueue } from '../workers/queue/job-queue'
+import { JobType } from '../workers/queue/queue-manager'
+import {
+ EventDestinationEntity,
+ EventDestinationSchema,
+} from './event-destinations.entity'
+
+const eventDestinationRepo = repoFactory(
+ EventDestinationEntity,
+)
+
+const PROJECT_SCOPE_EVENTS = [ ApplicationEventName.FLOW_RUN_FINISHED ]
+
+export const eventDestinationService = (log: FastifyBaseLogger) => ({
+ setup(): void {
+ applicationEvents(log).registerListeners(log, {
+ userEvent: () => async (event) => {
+ await eventDestinationService(log).trigger({
+ platformId: event.platformId,
+ projectId: event.projectId,
+ event,
+ })
+ },
+ workerEvent: () => async (projectId, event) => {
+ await eventDestinationService(log).trigger({
+ platformId: event.platformId,
+ projectId,
+ event,
+ })
+ },
+ })
+ },
+ create: async (
+ request: CreatePlatformEventDestinationRequestBody,
+ platformId: string,
+ ): Promise => {
+ assertUrlIsExternal(request.url)
+ const entity: EventDestination = {
+ id: apId(),
+ created: new Date().toISOString(),
+ updated: new Date().toISOString(),
+ platformId,
+ scope: EventDestinationScope.PLATFORM,
+ events: request.events,
+ url: request.url,
+ }
+ return eventDestinationRepo().save(entity)
+ },
+ update: async ({ id, platformId, request }: UpdateParams): Promise => {
+ assertUrlIsExternal(request.url)
+ await eventDestinationRepo().update({ id, platformId }, request)
+ return eventDestinationRepo().findOneByOrFail({ id, platformId })
+ },
+ delete: async ({ id, platformId }: DeleteParams): Promise => {
+ await eventDestinationRepo().delete({
+ id,
+ platformId,
+ })
+ },
+ list: async ({
+ platformId,
+ cursorRequest,
+ limit,
+ }: ListParams): Promise> => {
+ const decodedCursor = paginationHelper.decodeCursor(cursorRequest)
+ const paginator = buildPaginator({
+ entity: EventDestinationEntity,
+ query: {
+ limit,
+ afterCursor: decodedCursor.nextCursor,
+ beforeCursor: decodedCursor.previousCursor,
+ },
+ })
+
+ const queryBuilder = eventDestinationRepo()
+ .createQueryBuilder('event_destination')
+ .where({
+ platformId,
+ })
+
+ const { data, cursor } = await paginator.paginate(queryBuilder)
+
+ return paginationHelper.createPage(data, cursor)
+ },
+ trigger: async ({ platformId, projectId, event }: TriggerParams): Promise => {
+ const conditions: FindOptionsWhere[] = [{
+ platformId,
+ events: ArrayContains([event]),
+ scope: EventDestinationScope.PLATFORM,
+ }]
+ const broadcastToProject = !isNil(projectId) && PROJECT_SCOPE_EVENTS.includes(event.action)
+ if (broadcastToProject) {
+ conditions.push({
+ projectId,
+ events: ArrayContains([event]),
+ scope: EventDestinationScope.PROJECT,
+ })
+ }
+ const destinations = await eventDestinationRepo().findBy(conditions)
+ await Promise.all(destinations.map(destination =>
+ jobQueue(log).add({
+ type: JobType.ONE_TIME,
+ id: apId(),
+ data: {
+ schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION,
+ platformId,
+ projectId,
+ webhookId: destination.id,
+ webhookUrl: destination.url,
+ payload: event.data,
+ jobType: WorkerJobType.EVENT_DESTINATION,
+ },
+ }),
+ ))
+ },
+ test: async ({ platformId, projectId, url }: TestParams): Promise => {
+ assertUrlIsExternal(url)
+ const mockEvent: FlowCreatedEvent = {
+ id: apId(),
+ created: new Date().toISOString(),
+ updated: new Date().toISOString(),
+ ip: '127.0.0.1',
+ platformId,
+ data: {
+ flow: {
+ id: apId(),
+ created: new Date().toISOString(),
+ updated: new Date().toISOString(),
+ },
+ project: {
+ displayName: 'Dream Department',
+ },
+ },
+ projectId,
+ userId: apId(),
+ action: ApplicationEventName.FLOW_CREATED,
+ }
+ await jobQueue(log).add({
+ type: JobType.ONE_TIME,
+ id: apId(),
+ data: {
+ schemaVersion: LATEST_JOB_DATA_SCHEMA_VERSION,
+ platformId,
+ projectId,
+ webhookId: apId(),
+ webhookUrl: url,
+ payload: mockEvent,
+ jobType: WorkerJobType.EVENT_DESTINATION,
+ },
+ })
+ },
+})
+
+const assertUrlIsExternal = (url: string) => {
+ const frontendUrl = system.get(WorkerSystemProp.FRONTEND_URL)
+ assertNotNullOrUndefined(frontendUrl, 'frontendUrl')
+ if (new URL(url).host === new URL(frontendUrl).host) {
+ throw new ActivepiecesError({
+ code: ErrorCode.VALIDATION,
+ params: {
+ message: 'Activepieces URL is not allowed to avoid recursive calls',
+ },
+ })
+ }
+}
+
+
+type DeleteParams = {
+ id: string
+ platformId: string
+}
+
+type UpdateParams = {
+ id: string
+ platformId: string
+ request: UpdatePlatformEventDestinationRequestBody
+}
+
+type ListParams = {
+ platformId: PlatformId
+ cursorRequest: Cursor
+ limit?: number
+}
+
+type TriggerParams = {
+ platformId: PlatformId
+ projectId?: ProjectId
+ event: Pick
+}
+
+type TestParams = {
+ platformId: PlatformId
+ projectId?: ProjectId
+ url: string
+}
+
diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts
index 16c2d55e0a4..23dbda3de77 100644
--- a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts
+++ b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts
@@ -345,6 +345,31 @@ export const flowRunService = (log: FastifyBaseLogger) => ({
sampleData: !isNil(stepNameToTest) ? await sampleDataService(log).getSampleDataForFlow(projectId, flowVersion, SampleDataFileType.OUTPUT) : undefined,
}, log)
},
+ async startManualTrigger({ projectId, flowVersionId, triggeredBy }: StartManualTriggerParams): Promise {
+ const flowVersion = await flowVersionService(log).getOneOrThrow(flowVersionId)
+ const triggerPayload = {}
+ const flowRun = await queueOrCreateInstantly({
+ projectId,
+ flowId: flowVersion.flowId,
+ flowVersionId: flowVersion.id,
+ environment: RunEnvironment.PRODUCTION,
+ parentRunId: undefined,
+ failParentOnFailure: undefined,
+ stepNameToTest: undefined,
+ triggeredBy,
+ }, log)
+ return addToQueue({
+ flowRun,
+ payload: triggerPayload,
+ executionType: ExecutionType.BEGIN,
+ synchronousHandlerId: undefined,
+ httpRequestId: undefined,
+ platformId: await projectService.getPlatformId(projectId),
+ executeTrigger: false,
+ progressUpdateType: ProgressUpdateType.TEST_FLOW,
+ sampleData: undefined,
+ }, log)
+ },
async getOne(params: GetOneParams): Promise {
const flowRun = await queryBuilderForFlowRun(flowRunRepo()).where({
id: params.id,
@@ -690,6 +715,11 @@ type TestParams = {
stepNameToTest?: string
}
+type StartManualTriggerParams = {
+ projectId: ProjectId
+ flowVersionId: FlowVersionId
+ triggeredBy: string
+}
type RetryParams = {
flowRunId: FlowRunId
strategy: FlowRetryStrategy
diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts b/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts
index e298c9b1f26..2e682f7a9ad 100644
--- a/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts
+++ b/packages/server/api/src/app/flows/flow-run/flow-run-side-effects.ts
@@ -16,7 +16,7 @@ export const flowRunSideEffects = (log: FastifyBaseLogger) => ({
return
}
await flowRunHooks(log).onFinish(flowRun)
- applicationEvents.sendWorkerEvent(flowRun.projectId, {
+ applicationEvents(log).sendWorkerEvent(flowRun.projectId, {
action: ApplicationEventName.FLOW_RUN_FINISHED,
data: {
flowRun,
@@ -24,7 +24,7 @@ export const flowRunSideEffects = (log: FastifyBaseLogger) => ({
})
},
async onResume(flowRun: FlowRun): Promise {
- applicationEvents.sendWorkerEvent(flowRun.projectId, {
+ applicationEvents(log).sendWorkerEvent(flowRun.projectId, {
action: ApplicationEventName.FLOW_RUN_RESUMED,
data: {
flowRun,
@@ -33,7 +33,7 @@ export const flowRunSideEffects = (log: FastifyBaseLogger) => ({
},
async onStart(flowRun: FlowRun): Promise {
- applicationEvents.sendWorkerEvent(flowRun.projectId, {
+ applicationEvents(log).sendWorkerEvent(flowRun.projectId, {
action: ApplicationEventName.FLOW_RUN_STARTED,
data: {
flowRun,
diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts
index 2678d0df1c7..d46ecc31fe3 100644
--- a/packages/server/api/src/app/flows/flow.module.ts
+++ b/packages/server/api/src/app/flows/flow.module.ts
@@ -22,6 +22,16 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun)
}
})
+ websocketService.addListener(PrincipalType.USER, WebsocketServerEvent.MANUAL_TRIGGER_RUN_STARTED, (socket) => {
+ return async (data: TestFlowRunRequestBody, principal, projectId) => {
+ const flowRun = await flowRunService(app.log).startManualTrigger({
+ projectId,
+ flowVersionId: data.flowVersionId,
+ triggeredBy: principal.id,
+ })
+ socket.emit(WebsocketClientEvent.MANUAL_TRIGGER_RUN_STARTED, flowRun)
+ }
+ })
websocketService.addListener(PrincipalType.WORKER, WebsocketServerEvent.EMIT_TEST_STEP_PROGRESS, (socket) => {
return async (data: EmitTestStepProgressRequest, _principal, _projectId, callback?: (data?: unknown) => void): Promise => {
socket.to(data.projectId).emit(WebsocketClientEvent.TEST_STEP_PROGRESS, data)
diff --git a/packages/server/api/src/app/flows/flow/flow.controller.ts b/packages/server/api/src/app/flows/flow/flow.controller.ts
index 52b9c5df700..3c3139b3814 100644
--- a/packages/server/api/src/app/flows/flow/flow.controller.ts
+++ b/packages/server/api/src/app/flows/flow/flow.controller.ts
@@ -51,7 +51,7 @@ export const flowController: FastifyPluginAsyncTypebox = async (app) => {
templateId: request.body.templateId,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.FLOW_CREATED,
data: {
flow: newFlow,
@@ -120,7 +120,7 @@ export const flowController: FastifyPluginAsyncTypebox = async (app) => {
projectId: request.projectId,
operation: cleanOperation(request.body),
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.FLOW_UPDATED,
data: {
request: request.body,
@@ -187,7 +187,7 @@ export const flowController: FastifyPluginAsyncTypebox = async (app) => {
id: request.params.id,
projectId: request.projectId,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.FLOW_DELETED,
data: {
flow,
diff --git a/packages/server/api/src/app/flows/folder/folder.module.ts b/packages/server/api/src/app/flows/folder/folder.module.ts
index e3b45854ad6..1c6eb416169 100644
--- a/packages/server/api/src/app/flows/folder/folder.module.ts
+++ b/packages/server/api/src/app/flows/folder/folder.module.ts
@@ -30,7 +30,7 @@ const folderController: FastifyPluginAsyncTypebox = async (fastify) => {
projectId: request.projectId,
request: request.body,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.FOLDER_CREATED,
data: {
folder: createdFolder,
@@ -50,7 +50,7 @@ const folderController: FastifyPluginAsyncTypebox = async (fastify) => {
request: request.body,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.FOLDER_UPDATED,
data: {
folder: updatedFlow,
@@ -94,7 +94,7 @@ const folderController: FastifyPluginAsyncTypebox = async (fastify) => {
projectId: request.projectId,
folderId: request.params.id,
})
- applicationEvents.sendUserEvent(request, {
+ applicationEvents(request.log).sendUserEvent(request, {
action: ApplicationEventName.FOLDER_DELETED,
data: {
folder,
diff --git a/packages/server/api/src/app/helper/application-events.ts b/packages/server/api/src/app/helper/application-events.ts
index 6f4cf6251bb..4c1d72a5055 100644
--- a/packages/server/api/src/app/helper/application-events.ts
+++ b/packages/server/api/src/app/helper/application-events.ts
@@ -1,35 +1,18 @@
-import { AuthenticationEvent, ConnectionEvent, FlowCreatedEvent, FlowDeletedEvent, FlowRunEvent, FlowUpdatedEvent, FolderEvent, ProjectReleaseEvent, ProjectRoleEvent, SigningKeyEvent, SignUpEvent } from '@activepieces/ee-shared'
-import { AppSystemProp, networkUtils } from '@activepieces/server-shared'
-import { PrincipalType } from '@activepieces/shared'
+import { ApplicationEvent } from '@activepieces/ee-shared'
+import { AppSystemProp, networkUtils, rejectedPromiseHandler } from '@activepieces/server-shared'
+import { apId, isNil, PrincipalType } from '@activepieces/shared'
import { Static, Type } from '@sinclair/typebox'
+import { Clean } from '@sinclair/typebox/value'
import { FastifyBaseLogger, FastifyRequest } from 'fastify'
import { authenticationUtils } from '../authentication/authentication-utils'
+import { userIdentityService } from '../authentication/user-identity/user-identity-service'
+import { projectService } from '../project/project-service'
+import { userService } from '../user/user-service'
import { system } from './system/system'
-export const AuditEventParam = Type.Pick(Type.Union([
- ConnectionEvent,
- FlowCreatedEvent,
- FlowDeletedEvent,
- FlowUpdatedEvent,
- AuthenticationEvent,
- FolderEvent,
- SignUpEvent,
- SigningKeyEvent,
- FlowRunEvent,
- ProjectRoleEvent,
- ProjectReleaseEvent,
-]), ['data', 'action'])
-export type AuditEventParam = Static
-export type MetaInformation = {
- platformId: string
- userId?: string
- projectId?: string
- ip?: string
-}
-
-type UserEventListener = (requestInformation: MetaInformation, params: AuditEventParam) => void
-type WorkerEventListener = (projectId: string, params: AuditEventParam) => void
+type UserEventListener = (params: ApplicationEvent) => void
+type WorkerEventListener = (projectId: string, params: ApplicationEvent) => void
type ListenerRegistration = {
userEventListeners: UserEventListener[]
@@ -41,7 +24,17 @@ const listeners: ListenerRegistration = {
workerEventListeners: [],
}
-export const applicationEvents = {
+const RawAuditEventParam = Type.Pick(ApplicationEvent, ['data', 'action'])
+type RawAuditEventParam = Static
+
+type MetaInformation = {
+ platformId: string
+ userId?: string
+ projectId?: string
+ ip?: string
+}
+
+export const applicationEvents = (log: FastifyBaseLogger) => ({
registerListeners(log: FastifyBaseLogger, registration: {
userEvent: (log: FastifyBaseLogger) => UserEventListener
workerEvent: (log: FastifyBaseLogger) => WorkerEventListener
@@ -49,38 +42,82 @@ export const applicationEvents = {
listeners.userEventListeners.push(registration.userEvent(log))
listeners.workerEventListeners.push(registration.workerEvent(log))
},
- sendUserEvent(requestOrMeta: FastifyRequest | MetaInformation, params: AuditEventParam): void {
- const isRequest = 'principal' in requestOrMeta
- if (isRequest) {
- const request = requestOrMeta as FastifyRequest
- const principal = request.principal
- if (!principal || principal.type === PrincipalType.UNKNOWN || principal.type === PrincipalType.WORKER) {
- return
- }
- authenticationUtils.extractUserIdFromRequest(request).then((userId) => {
- const meta: MetaInformation = {
- platformId: principal.platform.id,
- projectId: principal.projectId,
- userId,
- ip: networkUtils.extractClientRealIp(request, system.get(AppSystemProp.CLIENT_REAL_IP_HEADER)),
- }
+ sendUserEvent(requestOrMeta: FastifyRequest | MetaInformation, params: RawAuditEventParam): void {
+ rejectedPromiseHandler(enrichAuditEventParam(requestOrMeta, params, log).then((event) => {
+ if (!isNil(event)) {
for (const listener of listeners.userEventListeners) {
- listener(meta, params)
+ listener(event)
}
- }).catch((error) => {
- request.log.error({ error }, '[ApplicationEvents#sendUserEvent] Failed to extract user ID from principal')
- })
- }
- else {
- const meta = requestOrMeta as MetaInformation
- for (const listener of listeners.userEventListeners) {
- listener(meta, params)
}
- }
+ }), log)
},
- sendWorkerEvent(projectId: string, params: AuditEventParam): void {
- for (const listener of listeners.workerEventListeners) {
- listener(projectId, params)
- }
+ sendWorkerEvent(projectId: string, params: RawAuditEventParam): void {
+ projectService.getPlatformId(projectId).then((platformId) => {
+ for (const listener of listeners.workerEventListeners) {
+ listener(projectId, {
+ ...params,
+ projectId,
+ platformId,
+ id: apId(),
+ created: new Date().toISOString(),
+ updated: new Date().toISOString(),
+ })
+ }
+ }).catch((error) => {
+ log.error(error)
+ })
},
+})
+
+
+async function enrichAuditEventParam(requestOrMeta: FastifyRequest | MetaInformation, params: RawAuditEventParam, log: FastifyBaseLogger): Promise {
+ const meta = await extractMetaInformation(requestOrMeta)
+ if (isNil(meta)) {
+ return undefined
+ }
+ const user = meta.userId ? await userService.getOneOrFail({ id: meta.userId }) : undefined
+ const identity = !isNil(user?.identityId) ? await userIdentityService(log).getOneOrFail({ id: user.identityId }) : undefined
+ const project = meta.projectId ? await projectService.getOne(meta.projectId) : undefined
+ const eventToSave: unknown = {
+ id: apId(),
+ created: new Date().toISOString(),
+ updated: new Date().toISOString(),
+ userId: meta.userId,
+ userEmail: identity?.email,
+ projectId: meta.projectId,
+ projectDisplayName: project?.displayName,
+ platformId: meta.platformId,
+ ip: meta.ip,
+ data: {
+ ...params.data,
+ project,
+ user,
+ },
+ action: params.action,
+ }
+
+ // The event may contain Date objects, so we serialize it to convert dates back to strings as per the schema.
+ const clonedAndSerializedDates = JSON.parse(JSON.stringify(eventToSave))
+ const cleanedEvent = Clean(ApplicationEvent, clonedAndSerializedDates) as ApplicationEvent
+ return cleanedEvent
}
+
+async function extractMetaInformation(requestOrMeta: FastifyRequest | MetaInformation): Promise {
+ const isRequest = 'principal' in requestOrMeta
+ if (isRequest) {
+ const request = requestOrMeta as FastifyRequest
+ const principal = request.principal
+ if (!principal || principal.type === PrincipalType.UNKNOWN || principal.type === PrincipalType.WORKER) {
+ return undefined
+ }
+ const extractedUserId = await authenticationUtils.extractUserIdFromRequest(request)
+ const meta: MetaInformation = {
+ platformId: principal.platform.id,
+ projectId: principal.projectId,
+ userId: extractedUserId,
+ ip: networkUtils.extractClientRealIp(request, system.get(AppSystemProp.CLIENT_REAL_IP_HEADER)),
+ }
+ return meta
+ }
+ return requestOrMeta as MetaInformation
+}
\ No newline at end of file
diff --git a/packages/server/api/src/app/helper/system-validator.ts b/packages/server/api/src/app/helper/system-validator.ts
index a40ec8cc190..bff1b4d1a2d 100644
--- a/packages/server/api/src/app/helper/system-validator.ts
+++ b/packages/server/api/src/app/helper/system-validator.ts
@@ -53,6 +53,7 @@ const systemPropValidators: {
[AppSystemProp.TRIGGER_TIMEOUT_SECONDS]: numberValidator,
[AppSystemProp.TRIGGER_HOOKS_TIMEOUT_SECONDS]: numberValidator,
[AppSystemProp.FLOW_TIMEOUT_SECONDS]: numberValidator,
+ [AppSystemProp.EVENT_DESTINATION_TIMEOUT_SECONDS]: numberValidator,
[AppSystemProp.PAUSED_FLOW_TIMEOUT_DAYS]: numberValidator,
[AppSystemProp.APP_WEBHOOK_SECRETS]: stringValidator,
[AppSystemProp.MAX_FILE_SIZE_MB]: numberValidator,
diff --git a/packages/server/api/src/app/helper/system/system.ts b/packages/server/api/src/app/helper/system/system.ts
index ce77b96eeda..7b68211a72f 100644
--- a/packages/server/api/src/app/helper/system/system.ts
+++ b/packages/server/api/src/app/helper/system/system.ts
@@ -46,6 +46,7 @@ const systemPropDefaultValues: Partial> = {
[AppSystemProp.TRIGGER_TIMEOUT_SECONDS]: '60',
[AppSystemProp.RUNS_METADATA_UPDATE_CONCURRENCY]: '10',
[AppSystemProp.TRIGGER_HOOKS_TIMEOUT_SECONDS]: '180',
+ [AppSystemProp.EVENT_DESTINATION_TIMEOUT_SECONDS]: '10',
[AppSystemProp.REDIS_FAILED_JOB_RETENTION_DAYS]: '30',
[AppSystemProp.REDIS_FAILED_JOB_RETENTION_MAX_COUNT]: '100000',
[AppSystemProp.TELEMETRY_ENABLED]: 'true',
diff --git a/packages/server/api/src/app/trigger/trigger-source/flow-trigger-side-effect.ts b/packages/server/api/src/app/trigger/trigger-source/flow-trigger-side-effect.ts
index e43e9b31c69..e6e1f3bf6ad 100644
--- a/packages/server/api/src/app/trigger/trigger-source/flow-trigger-side-effect.ts
+++ b/packages/server/api/src/app/trigger/trigger-source/flow-trigger-side-effect.ts
@@ -80,6 +80,11 @@ export const flowTriggerSideEffect = (log: FastifyBaseLogger) => {
...params,
})
}
+ case TriggerStrategy.MANUAL: {
+ return {
+ scheduleOptions: undefined,
+ }
+ }
}
},
async disable(params: DisableFlowTriggerParams): Promise {
@@ -121,6 +126,8 @@ export const flowTriggerSideEffect = (log: FastifyBaseLogger) => {
flowVersionId,
})
break
+ case TriggerStrategy.MANUAL:
+ break
}
},
@@ -176,7 +183,6 @@ async function handleWebhookTrigger({ flowId, flowVersionId, projectId, pieceTri
async function handlePollingTrigger({ engineHelperResponse, flowId, flowVersionId, projectId, log }: ActiveTriggerParams): Promise {
const pollingFrequencyCronExpression = `*/${system.getNumber(AppSystemProp.TRIGGER_DEFAULT_POLL_INTERVAL) ?? 5} * * * *`
-
if (isNil(engineHelperResponse.result.scheduleOptions)) {
engineHelperResponse.result.scheduleOptions = {
cronExpression: pollingFrequencyCronExpression,
diff --git a/packages/server/api/src/app/user/badges/badge-check.ts b/packages/server/api/src/app/user/badges/badge-check.ts
index b100cbe2cb2..486b42b64c5 100644
--- a/packages/server/api/src/app/user/badges/badge-check.ts
+++ b/packages/server/api/src/app/user/badges/badge-check.ts
@@ -1,5 +1,5 @@
+import { ApplicationEvent } from '@activepieces/ee-shared'
import { BADGES } from '@activepieces/shared'
-import { AuditEventParam } from '../../helper/application-events'
export type BadgeCheckResult = {
userId: string | null
@@ -7,8 +7,5 @@ export type BadgeCheckResult = {
}
export type BadgeCheck = {
- eval: (params: {
- userId?: string
- event: AuditEventParam
- }) => Promise
+ eval: (applicationEvent: ApplicationEvent) => Promise
}
diff --git a/packages/server/api/src/app/user/badges/badge-controller.ts b/packages/server/api/src/app/user/badges/badge-controller.ts
deleted file mode 100644
index a1aa8fe6e51..00000000000
--- a/packages/server/api/src/app/user/badges/badge-controller.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox'
-
-export const userBadgeController: FastifyPluginAsyncTypebox = async (_app) => {
- // TODO: Implement badge controller endpoints
-}
-
diff --git a/packages/server/api/src/app/user/badges/badge-module.ts b/packages/server/api/src/app/user/badges/badge-module.ts
index bff62dd44ef..8a540d8875e 100644
--- a/packages/server/api/src/app/user/badges/badge-module.ts
+++ b/packages/server/api/src/app/user/badges/badge-module.ts
@@ -1,8 +1,6 @@
import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox'
-import { userBadgeController } from './badge-controller'
import { userBadgeService } from './badge-service'
export const userBadgeModule: FastifyPluginAsyncTypebox = async (app) => {
userBadgeService(app.log).setup()
- await app.register(userBadgeController, { prefix: '/v1/user-badges' })
}
diff --git a/packages/server/api/src/app/user/badges/badge-service.ts b/packages/server/api/src/app/user/badges/badge-service.ts
index 69541ec336c..477391d8582 100644
--- a/packages/server/api/src/app/user/badges/badge-service.ts
+++ b/packages/server/api/src/app/user/badges/badge-service.ts
@@ -1,10 +1,11 @@
+import { ApplicationEvent } from '@activepieces/ee-shared'
import { apId, BADGES, isNil, WebsocketClientEvent } from '@activepieces/shared'
import { FastifyBaseLogger } from 'fastify'
import { In } from 'typeorm'
import { repoFactory } from '../../core/db/repo-factory'
import { websocketService } from '../../core/websockets.service'
import { emailService } from '../../ee/helper/email/email-service'
-import { applicationEvents, AuditEventParam } from '../../helper/application-events'
+import { applicationEvents } from '../../helper/application-events'
import { BadgeCheck } from './badge-check'
import { UserBadgeEntity } from './badge-entity'
import { flowsBadgesCheck } from './checks/active-flows-badges'
@@ -24,19 +25,19 @@ const workerEventsChecks: BadgeCheck[] = [
async function processBadgeChecks(
checks: BadgeCheck[],
- userId: string | undefined,
- event: AuditEventParam,
+ event: ApplicationEvent,
log: FastifyBaseLogger,
): Promise {
- const checkResults = await Promise.all(checks.map(badgeCheck => badgeCheck.eval({ userId, event })))
+ const userId = event.userId
+ const checkResults = await Promise.all(checks.map(badgeCheck => badgeCheck.eval(event)))
const badgesByUser = new Map()
for (const result of checkResults) {
- if (isNil(result.userId) || result.badges.length === 0) {
+ if (isNil(userId) || result.badges.length === 0) {
continue
}
- const existing = badgesByUser.get(result.userId) ?? []
- badgesByUser.set(result.userId, [...existing, ...result.badges])
+ const existing = badgesByUser.get(userId) ?? []
+ badgesByUser.set(userId, [...existing, ...result.badges])
}
for (const [userId, badgesToAward] of badgesByUser) {
@@ -74,12 +75,12 @@ async function processBadgeChecks(
export const userBadgeService = (log: FastifyBaseLogger) => ({
setup(): void {
- applicationEvents.registerListeners(log, {
- userEvent: () => async (requestInformation, event) => {
- await processBadgeChecks(userEventsChecks, requestInformation.userId, event, log)
+ applicationEvents(log).registerListeners(log, {
+ userEvent: () => async (event) => {
+ await processBadgeChecks(userEventsChecks, event, log)
},
workerEvent: () => async (_projectId, event) => {
- await processBadgeChecks(workerEventsChecks, undefined, event, log)
+ await processBadgeChecks(workerEventsChecks, event, log)
},
})
},
diff --git a/packages/server/api/src/app/user/badges/checks/active-flows-badges.ts b/packages/server/api/src/app/user/badges/checks/active-flows-badges.ts
index 2cce4483216..605dd0ac0f2 100644
--- a/packages/server/api/src/app/user/badges/checks/active-flows-badges.ts
+++ b/packages/server/api/src/app/user/badges/checks/active-flows-badges.ts
@@ -1,10 +1,11 @@
-import { ApplicationEventName, FlowUpdatedEvent } from '@activepieces/ee-shared'
+import { ApplicationEvent, ApplicationEventName, FlowUpdatedEvent } from '@activepieces/ee-shared'
import { BADGES, FlowOperationType, FlowStatus, isNil } from '@activepieces/shared'
import { flowRepo } from '../../../flows/flow/flow.repo'
import { BadgeCheck, BadgeCheckResult } from '../badge-check'
export const flowsBadgesCheck: BadgeCheck = {
- eval: async ({ event, userId }): Promise => {
+ eval: async (event: ApplicationEvent): Promise => {
+ const userId = event.userId
if (event.action !== ApplicationEventName.FLOW_UPDATED) {
return { userId: null, badges: [] }
}
diff --git a/packages/server/api/src/app/user/badges/checks/flow-content.ts b/packages/server/api/src/app/user/badges/checks/flow-content.ts
index 678aa78e6d6..d7e80615924 100644
--- a/packages/server/api/src/app/user/badges/checks/flow-content.ts
+++ b/packages/server/api/src/app/user/badges/checks/flow-content.ts
@@ -1,4 +1,4 @@
-import { ApplicationEventName, FlowUpdatedEvent } from '@activepieces/ee-shared'
+import { ApplicationEvent, ApplicationEventName, FlowUpdatedEvent } from '@activepieces/ee-shared'
import { BADGES, FlowActionType, FlowOperationType, flowStructureUtil, FlowTriggerType, isNil } from '@activepieces/shared'
import { flowVersionRepo } from '../../../flows/flow-version/flow-version.service'
import { BadgeCheck, BadgeCheckResult } from '../badge-check'
@@ -7,7 +7,8 @@ const WEBHOOK_PIECE_NAME = '@activepieces/piece-webhook'
const AI_PIECE_NAME = '@activepieces/piece-ai'
export const flowContentBadgesCheck: BadgeCheck = {
- eval: async ({ event, userId }): Promise => {
+ eval: async (event: ApplicationEvent): Promise => {
+ const userId = event.userId
if (isNil(userId)) {
return { userId: null, badges: [] }
}
diff --git a/packages/server/api/src/app/user/badges/checks/flow-runs-badges.ts b/packages/server/api/src/app/user/badges/checks/flow-runs-badges.ts
index 5c7f2b7910a..293f75c4696 100644
--- a/packages/server/api/src/app/user/badges/checks/flow-runs-badges.ts
+++ b/packages/server/api/src/app/user/badges/checks/flow-runs-badges.ts
@@ -1,9 +1,9 @@
-import { ApplicationEventName, FlowRunEvent } from '@activepieces/ee-shared'
+import { ApplicationEvent, ApplicationEventName, FlowRunEvent } from '@activepieces/ee-shared'
import { BADGES, FlowRunStatus, isFailedState, isNil, RunEnvironment } from '@activepieces/shared'
import { BadgeCheck, BadgeCheckResult } from '../badge-check'
export const flowRunsBadgesCheck: BadgeCheck = {
- eval: async ({ event }): Promise => {
+ eval: async (event: ApplicationEvent): Promise => {
const badges: (keyof typeof BADGES)[] = []
if (event.action !== ApplicationEventName.FLOW_RUN_FINISHED) {
diff --git a/packages/server/api/src/app/workers/machine/machine-service.ts b/packages/server/api/src/app/workers/machine/machine-service.ts
index d3106a1100b..99e81d41deb 100644
--- a/packages/server/api/src/app/workers/machine/machine-service.ts
+++ b/packages/server/api/src/app/workers/machine/machine-service.ts
@@ -80,6 +80,7 @@ export const machineService = (log: FastifyBaseLogger) => {
REDIS_SENTINEL_ROLE: system.get(AppSystemProp.REDIS_SENTINEL_ROLE),
REDIS_SENTINEL_HOSTS: system.get(AppSystemProp.REDIS_SENTINEL_HOSTS),
REDIS_SENTINEL_NAME: system.get(AppSystemProp.REDIS_SENTINEL_NAME),
+ EVENT_DESTINATION_TIMEOUT_SECONDS: system.getNumberOrThrow(AppSystemProp.EVENT_DESTINATION_TIMEOUT_SECONDS),
REDIS_FAILED_JOB_RETENTION_DAYS: system.getNumberOrThrow(AppSystemProp.REDIS_FAILED_JOB_RETENTION_DAYS),
REDIS_FAILED_JOB_RETENTION_MAX_COUNT: system.getNumberOrThrow(AppSystemProp.REDIS_FAILED_JOB_RETENTION_MAX_COUNT),
EDITION: system.getOrThrow(AppSystemProp.EDITION),
diff --git a/packages/server/api/src/app/workers/queue/job-queue.ts b/packages/server/api/src/app/workers/queue/job-queue.ts
index cc572e5d0d8..d1d0247a4ee 100644
--- a/packages/server/api/src/app/workers/queue/job-queue.ts
+++ b/packages/server/api/src/app/workers/queue/job-queue.ts
@@ -1,5 +1,5 @@
import { apDayjsDuration, AppSystemProp, getPlatformQueueName, memoryLock, QueueName } from '@activepieces/server-shared'
-import { ApId, getDefaultJobPriority, isNil, JOB_PRIORITY } from '@activepieces/shared'
+import { ApId, getDefaultJobPriority, isNil, JOB_PRIORITY, WorkerJobType } from '@activepieces/shared'
import { Queue } from 'bullmq'
import { BullMQOtel } from 'bullmq-otel'
import { FastifyBaseLogger } from 'fastify'
@@ -82,6 +82,7 @@ export const jobQueue = (log: FastifyBaseLogger) => ({
priority: JOB_PRIORITY[getDefaultJobPriority(data)],
delay: !isNil(dependOnJobId) ? apDayjsDuration(1, 'year').asMilliseconds() : params.delay,
jobId: params.id,
+ removeOnFail: data.jobType === WorkerJobType.EVENT_DESTINATION,
})
break
}
diff --git a/packages/server/api/src/app/workers/queue/queue-manager.ts b/packages/server/api/src/app/workers/queue/queue-manager.ts
index 5f047c5c540..577857b6e9a 100644
--- a/packages/server/api/src/app/workers/queue/queue-manager.ts
+++ b/packages/server/api/src/app/workers/queue/queue-manager.ts
@@ -1,4 +1,4 @@
-import { ApId, ExecuteFlowJobData, JobData, PollingJobData, RenewWebhookJobData, ScheduleOptions, UserInteractionJobData, WebhookJobData } from '@activepieces/shared'
+import { ApId, EventDestinationJobData, ExecuteFlowJobData, JobData, PollingJobData, RenewWebhookJobData, ScheduleOptions, UserInteractionJobData, WebhookJobData } from '@activepieces/shared'
export enum JobType {
@@ -16,7 +16,7 @@ type BaseAddParams, JT extends JobType>
type RepeatingJobAddParams = BaseAddParams & {
scheduleOptions: ScheduleOptions
}
-type OneTimeJobAddParams = BaseAddParams
+type OneTimeJobAddParams = BaseAddParams
export type AddJobParams = type extends JobType.REPEATING ? RepeatingJobAddParams : OneTimeJobAddParams
diff --git a/packages/server/api/test/helpers/mocks/index.ts b/packages/server/api/test/helpers/mocks/index.ts
index c23c479abcc..4c3daf907e5 100644
--- a/packages/server/api/test/helpers/mocks/index.ts
+++ b/packages/server/api/test/helpers/mocks/index.ts
@@ -227,10 +227,9 @@ export const createMockPlatformPlan = (platformPlan?: Partial): Pl
includedAiCredits: platformPlan?.includedAiCredits ?? 0,
licenseKey: platformPlan?.licenseKey ?? faker.lorem.word(),
stripeCustomerId: undefined,
- mcpsEnabled: platformPlan?.mcpsEnabled ?? false,
stripeSubscriptionId: undefined,
ssoEnabled: platformPlan?.ssoEnabled ?? false,
- agentsEnabled: platformPlan?.agentsEnabled ?? false,
+ eventStreamingEnabled: platformPlan?.eventStreamingEnabled ?? false,
aiCreditsAutoTopUpState: AiCreditsAutoTopUpState.DISABLED,
environmentsEnabled: platformPlan?.environmentsEnabled ?? false,
analyticsEnabled: platformPlan?.analyticsEnabled ?? false,
@@ -247,8 +246,6 @@ export const createMockPlatformPlan = (platformPlan?: Partial): Pl
teamProjectsLimit: platformPlan?.teamProjectsLimit ?? TeamProjectsLimit.NONE,
projectRolesEnabled: platformPlan?.projectRolesEnabled ?? false,
customDomainsEnabled: platformPlan?.customDomainsEnabled ?? false,
- tablesEnabled: platformPlan?.tablesEnabled ?? false,
- todosEnabled: platformPlan?.todosEnabled ?? false,
stripeSubscriptionEndDate: apDayjs().endOf('month').unix(),
stripeSubscriptionStartDate: apDayjs().startOf('month').unix(),
plan: platformPlan?.plan,
diff --git a/packages/server/shared/package.json b/packages/server/shared/package.json
index 40d9ea13152..351522a96d8 100644
--- a/packages/server/shared/package.json
+++ b/packages/server/shared/package.json
@@ -5,7 +5,7 @@
"main": "./src/index.js",
"typings": "./src/index.d.ts",
"dependencies": {
- "@activepieces/pieces-framework": "0.23.0",
+ "@activepieces/pieces-framework": "0.24.0",
"@activepieces/shared": "0.32.0",
"tslib": "2.6.2",
"pino": "10.1.0",
diff --git a/packages/server/shared/src/lib/system-props.ts b/packages/server/shared/src/lib/system-props.ts
index a0696d5a202..a254bac85ef 100644
--- a/packages/server/shared/src/lib/system-props.ts
+++ b/packages/server/shared/src/lib/system-props.ts
@@ -114,6 +114,7 @@ export enum AppSystemProp {
TRIGGER_TIMEOUT_SECONDS = 'TRIGGER_TIMEOUT_SECONDS',
WEBHOOK_TIMEOUT_SECONDS = 'WEBHOOK_TIMEOUT_SECONDS',
OPENROUTER_PROVISION_KEY = 'OPENROUTER_PROVISION_KEY',
+ EVENT_DESTINATION_TIMEOUT_SECONDS = 'EVENT_DESTINATION_TIMEOUT_SECONDS',
}
export enum ContainerType {
diff --git a/packages/server/worker/package.json b/packages/server/worker/package.json
index 1eb5ce05a76..80e444f94b6 100644
--- a/packages/server/worker/package.json
+++ b/packages/server/worker/package.json
@@ -5,7 +5,7 @@
"main": "./src/index.js",
"typings": "./src/index.d.ts",
"dependencies": {
- "@activepieces/pieces-framework": "0.23.0",
+ "@activepieces/pieces-framework": "0.24.0",
"@activepieces/server-shared": "0.0.2",
"@activepieces/shared": "0.32.0",
"write-file-atomic": "5.0.1",
diff --git a/packages/server/worker/src/lib/consume/executors/event-destination-job-executor.ts b/packages/server/worker/src/lib/consume/executors/event-destination-job-executor.ts
new file mode 100644
index 00000000000..a60ebbc1773
--- /dev/null
+++ b/packages/server/worker/src/lib/consume/executors/event-destination-job-executor.ts
@@ -0,0 +1,25 @@
+import { EventDestinationJobData } from '@activepieces/shared'
+import axios from 'axios'
+import { FastifyBaseLogger } from 'fastify'
+
+export const eventDestinationExecutor = (log: FastifyBaseLogger) => ({
+ async execute(jobId: string, jobData: EventDestinationJobData, timeoutInSeconds: number): Promise {
+ const { webhookUrl, payload } = jobData
+
+ log.info({
+ jobId,
+ webhookUrl,
+ }, 'Consuming event destination job')
+
+ const response = await axios.post(webhookUrl, payload, {
+ timeout: timeoutInSeconds * 1000,
+ })
+
+ log.info({
+ jobId,
+ response: {
+ status: response.status,
+ },
+ }, 'Event destination job consumed')
+ },
+})
\ No newline at end of file
diff --git a/packages/server/worker/src/lib/consume/job-consmer.ts b/packages/server/worker/src/lib/consume/job-consmer.ts
index f3bcbafa8f5..f5e02ee6174 100644
--- a/packages/server/worker/src/lib/consume/job-consmer.ts
+++ b/packages/server/worker/src/lib/consume/job-consmer.ts
@@ -5,6 +5,7 @@ import dayjs from 'dayjs'
import { FastifyBaseLogger } from 'fastify'
import { workerMachine } from '../utils/machine'
import { tokenUtls } from '../utils/token-utils'
+import { eventDestinationExecutor } from './executors/event-destination-job-executor'
import { executeTriggerExecutor } from './executors/execute-trigger-executor'
import { flowJobExecutor } from './executors/flow-job-executor'
import { renewWebhookExecutor } from './executors/renew-webhook-executor'
@@ -72,6 +73,13 @@ export const jobConsmer = (log: FastifyBaseLogger) => ({
span.setAttribute('worker.webhookExecution', true)
return await webhookExecutor(log).consumeWebhook(jobId, jobData, engineToken, timeoutInSeconds)
}
+ case WorkerJobType.EVENT_DESTINATION: {
+ await eventDestinationExecutor(log).execute(jobId, jobData, timeoutInSeconds)
+ span.setAttribute('worker.completed', true)
+ return {
+ status: ConsumeJobResponseStatus.OK,
+ }
+ }
}
}
finally {
@@ -95,5 +103,7 @@ const getTimeoutForWorkerJob = (jobType: WorkerJobType): number => {
return dayjs.duration(workerMachine.getSettings().TRIGGER_TIMEOUT_SECONDS, 'seconds').asSeconds()
case WorkerJobType.EXECUTE_FLOW:
return dayjs.duration(workerMachine.getSettings().FLOW_TIMEOUT_SECONDS, 'seconds').asSeconds()
+ case WorkerJobType.EVENT_DESTINATION:
+ return dayjs.duration(workerMachine.getSettings().EVENT_DESTINATION_TIMEOUT_SECONDS, 'seconds').asSeconds()
}
}
\ No newline at end of file
diff --git a/packages/shared/src/lib/license-keys/index.ts b/packages/shared/src/lib/license-keys/index.ts
index 9d1494cc41a..76229dc397b 100644
--- a/packages/shared/src/lib/license-keys/index.ts
+++ b/packages/shared/src/lib/license-keys/index.ts
@@ -29,10 +29,7 @@ export const LicenseKeyEntity = Type.Object({
analyticsEnabled: Type.Boolean(),
globalConnectionsEnabled: Type.Boolean(),
customRolesEnabled: Type.Boolean(),
- agentsEnabled: Type.Boolean(),
- tablesEnabled: Type.Boolean(),
- todosEnabled: Type.Boolean(),
- mcpsEnabled: Type.Boolean(),
+ eventStreamingEnabled: Type.Boolean(),
})
diff --git a/packages/shared/src/lib/platform/platform.model.ts b/packages/shared/src/lib/platform/platform.model.ts
index d226411d189..ee6595332ef 100644
--- a/packages/shared/src/lib/platform/platform.model.ts
+++ b/packages/shared/src/lib/platform/platform.model.ts
@@ -54,7 +54,9 @@ export const PlatformPlan = Type.Object({
platformId: Type.String(),
includedAiCredits: Type.Number(),
lastFreeAiCreditsRenewalDate: Type.Optional(Type.String()),
-
+
+ tablesEnabled: Type.Boolean(),
+ eventStreamingEnabled: Type.Boolean(),
aiCreditsAutoTopUpState: Type.Enum(AiCreditsAutoTopUpState),
aiCreditsAutoTopUpThreshold: Type.Optional(Type.Number()),
aiCreditsAutoTopUpCreditsToAdd: Type.Optional(Type.Number()),
@@ -63,10 +65,6 @@ export const PlatformPlan = Type.Object({
environmentsEnabled: Type.Boolean(),
analyticsEnabled: Type.Boolean(),
showPoweredBy: Type.Boolean(),
- agentsEnabled: Type.Boolean(),
- mcpsEnabled: Type.Boolean(),
- tablesEnabled: Type.Boolean(),
- todosEnabled: Type.Boolean(),
auditLogEnabled: Type.Boolean(),
embeddingEnabled: Type.Boolean(),
managePiecesEnabled: Type.Boolean(),
diff --git a/packages/shared/src/lib/trigger/index.ts b/packages/shared/src/lib/trigger/index.ts
index 73ffc1415eb..248aa88c73a 100644
--- a/packages/shared/src/lib/trigger/index.ts
+++ b/packages/shared/src/lib/trigger/index.ts
@@ -5,6 +5,7 @@ export enum TriggerStrategy {
POLLING = 'POLLING',
WEBHOOK = 'WEBHOOK',
APP_WEBHOOK = 'APP_WEBHOOK',
+ MANUAL = 'MANUAL',
}
export enum WebhookHandshakeStrategy {
diff --git a/packages/shared/src/lib/websocket/index.ts b/packages/shared/src/lib/websocket/index.ts
index d8180fc7cbd..5697fcd5508 100644
--- a/packages/shared/src/lib/websocket/index.ts
+++ b/packages/shared/src/lib/websocket/index.ts
@@ -5,6 +5,7 @@ import { StepRunResponse } from '../flows/sample-data'
export enum WebsocketClientEvent {
TEST_FLOW_RUN_STARTED = 'TEST_FLOW_RUN_STARTED',
+ MANUAL_TRIGGER_RUN_STARTED = 'MANUAL_TRIGGER_RUN_STARTED',
TEST_STEP_FINISHED = 'TEST_STEP_FINISHED',
TEST_STEP_PROGRESS = 'TEST_STEP_PROGRESS',
REFRESH_PIECE = 'REFRESH_PIECE',
@@ -60,6 +61,7 @@ export enum WebsocketServerEvent {
WORKER_HEALTHCHECK = 'WORKER_HEALTHCHECK',
EMIT_TEST_STEP_PROGRESS = 'EMIT_TEST_STEP_PROGRESS',
EMIT_TEST_STEP_FINISHED = 'EMIT_TEST_STEP_FINISHED',
+ MANUAL_TRIGGER_RUN_STARTED = 'MANUAL_TRIGGER_RUN_STARTED',
}
export * from './socket-utils'
diff --git a/packages/shared/src/lib/workers/index.ts b/packages/shared/src/lib/workers/index.ts
index ea0b36a2091..6e583962811 100644
--- a/packages/shared/src/lib/workers/index.ts
+++ b/packages/shared/src/lib/workers/index.ts
@@ -112,6 +112,7 @@ export const WorkerSettingsResponse = Type.Object({
PROJECT_RATE_LIMITER_ENABLED: Type.Boolean(),
MAX_CONCURRENT_JOBS_PER_PROJECT: Type.Number(),
JWT_SECRET: Type.String(),
+ EVENT_DESTINATION_TIMEOUT_SECONDS: Type.Number(),
PLATFORM_ID_FOR_DEDICATED_WORKER: Type.Optional(Type.String()),
EDITION: Type.String(),
})
diff --git a/packages/shared/src/lib/workers/job-data.ts b/packages/shared/src/lib/workers/job-data.ts
index ce3344d047f..89d25374439 100644
--- a/packages/shared/src/lib/workers/job-data.ts
+++ b/packages/shared/src/lib/workers/job-data.ts
@@ -40,6 +40,7 @@ export function getDefaultJobPriority(job: JobData): keyof typeof JOB_PRIORITY {
case WorkerJobType.RENEW_WEBHOOK:
return 'veryLow'
case WorkerJobType.EXECUTE_WEBHOOK:
+ case WorkerJobType.EVENT_DESTINATION:
return 'medium'
case WorkerJobType.EXECUTE_FLOW:
return getExecuteFlowPriority(job.environment, job.synchronousHandlerId)
@@ -61,6 +62,7 @@ export enum WorkerJobType {
EXECUTE_TRIGGER_HOOK = 'EXECUTE_TRIGGER_HOOK',
EXECUTE_PROPERTY = 'EXECUTE_PROPERTY',
EXECUTE_EXTRACT_PIECE_INFORMATION = 'EXECUTE_EXTRACT_PIECE_INFORMATION',
+ EVENT_DESTINATION = 'EVENT_DESTINATION',
}
export const NON_SCHEDULED_JOB_TYPES: WorkerJobType[] = [
@@ -208,11 +210,24 @@ export const UserInteractionJobDataWithoutWatchingInformation = Type.Union([
])
export type UserInteractionJobDataWithoutWatchingInformation = Static
+export const EventDestinationJobData = Type.Object({
+ schemaVersion: Type.Number(),
+ platformId: Type.String(),
+ projectId: Type.Optional(Type.String()),
+ webhookId: Type.String(),
+ webhookUrl: Type.String(),
+ payload: Type.Unknown(),
+ jobType: Type.Literal(WorkerJobType.EVENT_DESTINATION),
+})
+
+export type EventDestinationJobData = Static
+
export const JobData = Type.Union([
PollingJobData,
RenewWebhookJobData,
ExecuteFlowJobData,
WebhookJobData,
UserInteractionJobData,
+ EventDestinationJobData,
])
export type JobData = Static
diff --git a/tsconfig.base.json b/tsconfig.base.json
index 2d1675fb584..5816a51683b 100644
--- a/tsconfig.base.json
+++ b/tsconfig.base.json
@@ -526,6 +526,9 @@
],
"@activepieces/piece-week-done": [
"packages/pieces/community/week-done/src/index.ts"
+ ],
+ "@activepieces/piece-manual-trigger": [
+ "packages/pieces/community/manual-trigger/src/index.ts"
]
},
"resolveJsonModule": true