@@ -42,6 +42,7 @@ import {
4242} from 'pgsql-ast-parser' ;
4343import { STREAM_FUNCTIONS } from './functions.js' ;
4444import { CompatibilityEdition } from '../compatibility.js' ;
45+ import { DetectRequestParameters } from '../validators.js' ;
4546
4647export function syncStreamFromSql (
4748 descriptorName : string ,
@@ -56,6 +57,7 @@ class SyncStreamCompiler {
5657 descriptorName : string ;
5758 sql : string ;
5859 options : StreamParseOptions ;
60+ parameterDetector : DetectRequestParameters = new DetectRequestParameters ( ) ;
5961
6062 errors : SqlRuleError [ ] ;
6163
@@ -108,6 +110,19 @@ class SyncStreamCompiler {
108110 }
109111
110112 this . errors . push ( ...tools . errors ) ;
113+ if ( this . parameterDetector . usesStreamParameters && stream . subscribedToByDefault ) {
114+ const error = new SqlRuleError (
115+ 'Clients subscribe to this stream by default, but it uses subscription parameters. Default subscriptions use ' +
116+ 'null for all parameters, which can lead to unintentional results. Try removing the parameter or not ' +
117+ 'marking the stream as auto-subscribe.' ,
118+ tools . sql ,
119+ undefined
120+ ) ;
121+ error . type = 'warning' ;
122+
123+ this . errors . push ( error ) ;
124+ }
125+
111126 return stream ;
112127 }
113128
@@ -259,7 +274,8 @@ class SyncStreamCompiler {
259274 }
260275
261276 const regularClause = tools . compileClause ( clause ) ;
262- return compiledClauseToFilter ( tools , clause ?. _location ?? null , regularClause ) ;
277+ this . parameterDetector . accept ( regularClause ) ;
278+ return this . compiledClauseToFilter ( tools , clause ?. _location ?? null , regularClause ) ;
263279 }
264280
265281 private compileInOperator ( tools : SqlTools , clause : ExprBinary ) : FilterOperator {
@@ -308,7 +324,7 @@ class SyncStreamCompiler {
308324 // left clause doesn't depend on row data however, we can push it down into the subquery where it would be
309325 // introduced as a parameter: `EXISTS (SELECT _ FROM users WHERE is_admin AND user_id = request.user_id())`.
310326 const additionalClause = subqueryTools . parameterMatchClause ( subquery . column , left ) ;
311- subquery . addFilter ( compiledClauseToFilter ( subqueryTools , null , additionalClause ) ) ;
327+ subquery . addFilter ( this . compiledClauseToFilter ( subqueryTools , null , additionalClause ) ) ;
312328 return new ExistsOperator ( location , subquery ) ;
313329 } else {
314330 // Case 1
@@ -322,7 +338,7 @@ class SyncStreamCompiler {
322338 // a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value
323339 // or a parameter-value clause which we can wrap in EvaluateSimpleCondition.
324340 const combined = tools . compileInClause ( clause . left , left , clause . right , right ) ;
325- return compiledClauseToFilter ( tools , location , combined ) ;
341+ return this . compiledClauseToFilter ( tools , location , combined ) ;
326342 }
327343
328344 private compileOverlapOperator ( tools : SqlTools , clause : ExprBinary ) : FilterOperator {
@@ -356,7 +372,7 @@ class SyncStreamCompiler {
356372 // a ParameterMatchClause, which we can translate via CompareRowValueWithStreamParameter. Case 5 is either a row-value
357373 // or a parameter-value clause which we can wrap in EvaluateSimpleCondition.
358374 const combined = tools . compileOverlapClause ( clause . left , left , clause . right , right ) ;
359- return compiledClauseToFilter ( tools , location , combined ) ;
375+ return this . compiledClauseToFilter ( tools , location , combined ) ;
360376 }
361377
362378 private compileSubquery ( stmt : SelectStatement ) : [ Subquery , SqlTools ] | undefined {
@@ -399,7 +415,10 @@ class SyncStreamCompiler {
399415 const where = tools . compileClause ( query . where ) ;
400416
401417 this . errors . push ( ...tools . errors ) ;
402- return [ new Subquery ( sourceTable , column , compiledClauseToFilter ( tools , query . where ?. _location , where ) ) , tools ] ;
418+ return [
419+ new Subquery ( sourceTable , column , this . compiledClauseToFilter ( tools , query . where ?. _location , where ) ) ,
420+ tools
421+ ] ;
403422 }
404423
405424 private checkValidSelectStatement ( stmt : Statement ) {
@@ -446,6 +465,20 @@ class SyncStreamCompiler {
446465 sourceTable
447466 } ;
448467 }
468+
469+ compiledClauseToFilter ( tools : SqlTools , location : NodeLocation | nil , regularClause : CompiledClause ) {
470+ this . parameterDetector . accept ( regularClause ) ;
471+
472+ if ( isScalarExpression ( regularClause ) ) {
473+ return new EvaluateSimpleCondition ( location ?? null , regularClause ) ;
474+ } else if ( isParameterMatchClause ( regularClause ) ) {
475+ return new CompareRowValueWithStreamParameter ( location ?? null , regularClause ) ;
476+ } else if ( isClauseError ( regularClause ) ) {
477+ return recoverErrorClause ( tools ) ;
478+ } else {
479+ throw new Error ( 'Unknown clause type' ) ;
480+ }
481+ }
449482}
450483
451484function isScalarExpression ( clause : CompiledClause ) : clause is ScalarExpression {
@@ -456,15 +489,3 @@ function recoverErrorClause(tools: SqlTools): EvaluateSimpleCondition {
456489 // An error has already been logged.
457490 return new EvaluateSimpleCondition ( null , tools . compileClause ( null ) as StaticValueClause ) ;
458491}
459-
460- function compiledClauseToFilter ( tools : SqlTools , location : NodeLocation | nil , regularClause : CompiledClause ) {
461- if ( isScalarExpression ( regularClause ) ) {
462- return new EvaluateSimpleCondition ( location ?? null , regularClause ) ;
463- } else if ( isParameterMatchClause ( regularClause ) ) {
464- return new CompareRowValueWithStreamParameter ( location ?? null , regularClause ) ;
465- } else if ( isClauseError ( regularClause ) ) {
466- return recoverErrorClause ( tools ) ;
467- } else {
468- throw new Error ( 'Unknown clause type' ) ;
469- }
470- }
0 commit comments