Skip to content
Open
13 changes: 2 additions & 11 deletions cirq-google/cirq_google/engine/abstract_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,8 @@ async def run_sweep_async(
run_sweep = duet.sync(run_sweep_async)

@abc.abstractmethod
def get_sampler(self, run_name: str = "", device_config_name: str = "") -> cg.ProcessorSampler:
"""Returns a sampler backed by the processor.

Args:
run_name: A unique identifier representing an automation run for the
processor. An Automation Run contains a collection of device
configurations for the processor.
device_config_name: An identifier used to select the processor configuration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes what I would expect to be the most common use case: specify only a device config name, but not the version (run or snapshot).

@eliottrosenberg - do we really want to support a default config name ("config alias") at all? While defaults make sense to get the "latest" version, a default grid seem more risky and I think are worth forcing explicit selection.

utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
"""
def get_sampler(self) -> cg.ProcessorSampler:
"""Returns a sampler backed by the processor."""

@abc.abstractmethod
def engine(self) -> abstract_engine.AbstractEngine | None:
Expand Down
80 changes: 68 additions & 12 deletions cirq-google/cirq_google/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,14 @@ def get_processor(self, processor_id: str) -> engine_processor.EngineProcessor:
"""
return engine_processor.EngineProcessor(self.project_id, processor_id, self.context)

def get_sampler(
def get_sampler_from_run_name(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods in Engine are intended to be easy-to-remember convenience helpers and I'm concerned that over-specialization makes these less usable and more painful to maintain. e.g., if we add another feature with 2 ways to construct a sampler and follow this pattern, these 3 functions will turn into 6 new functions with very long names.

The style guide suggests that defaults are useful to avoid "lots of functions for the rare exceptions", and @eliottrosenberg made it sound like these specialized versions are exceptional.

Would it suffice to just check for mutual exclusivity of the arguments and raise an exception if that's violated? Alternatively, if you find it important to bake this check into the interface, you could alternatively do it with the type system, e.g.:

@dataclass
class Snapshot:
  id: str

@dataclass
class Run:
  id: str

type DeviceVersion = Snapshot | Run

class Engine
  ...
  def get_sampler(
     self,
     processor_id: str | list[str],
     device_config_name: str,
     device_version: DeviceVersion | None = None,
     max_concurrent_jobs: int = 100,
  ) -> cirq_google.ProcessorSampler:
  ...

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hoisinberg and I were discussing this exact pattern. Yes this is better I think.

self,
processor_id: str | list[str],
run_name: str = "",
device_config_name: str = "",
snapshot_id: str = "",
processor_id: str,
run_name: str,
device_config_name: str | None = None,
max_concurrent_jobs: int = 100,
) -> cirq_google.ProcessorSampler:
"""Returns a sampler backed by the engine.
"""Returns a sampler backed by the engine and given `run_name`.

Args:
processor_id: String identifier of which processor should be used to sample.
Expand All @@ -608,8 +607,67 @@ def get_sampler(
device_config_name: An identifier used to select the processor configuration
utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
snapshot_id: A unique identifier for an immutable snapshot reference. A
snapshot contains a collection of device configurations for the processor.
max_concurrent_jobs: The maximum number of jobs to be sent
simultaneously to the Engine. This client-side throttle can be
used to proactively reduce load to the backends and avoid quota
violations when pipelining circuit executions.

Returns:
A `cirq.Sampler` instance (specifically a `engine_sampler.ProcessorSampler`
that will send circuits to the Quantum Computing Service
when sampled.
"""
return self.get_processor(processor_id).get_sampler_from_run_name(
run_name=run_name,
device_config_name=device_config_name,
max_concurrent_jobs=max_concurrent_jobs,
)

def get_sampler_from_snapshot_id(
self,
processor_id: str,
snapshot_id: str,
device_config_name: str | None,
max_concurrent_jobs: int = 100,
) -> cirq_google.ProcessorSampler:
"""Returns a sampler backed by the engine.
Args:
processor_id: String identifier of which processor should be used to sample.
device_config_name: An identifier used to select the processor configuration
utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
snapshot_id: A unique identifier for an immutable snapshot reference.
A snapshot contains a collection of device configurations for the
processor.
max_concurrent_jobs: The maximum number of jobs to be sent
simultaneously to the Engine. This client-side throttle can be
used to proactively reduce load to the backends and avoid quota
violations when pipelining circuit executions.

Returns:
A `cirq.Sampler` instance (specifically a `engine_sampler.ProcessorSampler`
that will send circuits to the Quantum Computing Service
when sampled.
"""
return self.get_processor(processor_id).get_sampler_from_snapshot_id(
snapshot_id=snapshot_id,
device_config_name=device_config_name,
max_concurrent_jobs=max_concurrent_jobs,
)

def get_sampler(
self,
processor_id: str | list[str],
device_config_name: str | None = None,
max_concurrent_jobs: int = 100,
) -> cirq_google.ProcessorSampler:
"""Returns a sampler backed by the engine.

Args:
processor_id: String identifier of which processor should be used to sample.
device_config_name: An identifier used to select the processor configuration
utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
max_concurrent_jobs: The maximum number of jobs to be sent
concurrently to the Engine. This client-side throttle can be
used to proactively reduce load to the backends and avoid quota
Expand All @@ -629,11 +687,9 @@ def get_sampler(
'to get_sampler() no longer supported. Use Engine.run() instead if '
'you need to specify a list.'
)

return self.get_processor(processor_id).get_sampler(
run_name=run_name,
device_config_name=device_config_name,
snapshot_id=snapshot_id,
max_concurrent_jobs=max_concurrent_jobs,
device_config_name=device_config_name, max_concurrent_jobs=max_concurrent_jobs
)

async def get_processor_config_from_snapshot_async(
Expand Down
99 changes: 75 additions & 24 deletions cirq-google/cirq_google/engine/engine_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,48 @@ def engine(self) -> engine_base.Engine:

return engine_base.Engine(self.project_id, context=self.context)

def get_sampler(
self,
run_name: str = "",
device_config_name: str = "",
snapshot_id: str = "",
max_concurrent_jobs: int = 100,
def get_sampler_from_run_name(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be simplified by adding a sampler() method on ProcessorConfig? The equivalent of .get_sampler_from_run_name(...) would then be .get_config_from_run_name(...).sampler(). As a bonus, this form localizes scope of each resource configuration, e.g. expanded:

sampler = (
    engine.get_processor('willow_123')
        .get_config_from_run(run_name='current', config_name='some_cool_grid')
        .sampler()
)

This or something like it should probably the the canonical form of retrieval because a sampler should always refer to a fully-qualified config even if the user used defaults to construct it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like Will's suggestion.

self, run_name: str, device_config_name: str | None = None, max_concurrent_jobs: int = 100
) -> cg.engine.ProcessorSampler:
"""Returns a sampler backed by the engine.
"""Returns a sampler backed by the engine and given `run_name`.

Args:
run_name: A unique identifier representing an automation run for the
processor. An Automation Run contains a collection of device
configurations for the processor.
device_config_name: An identifier used to select the processor configuration
utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
max_concurrent_jobs: The maximum number of jobs to be sent
simultaneously to the Engine. This client-side throttle can be
used to proactively reduce load to the backends and avoid quota
violations when pipelining circuit executions.

Returns:
A `cirq.Sampler` instance (specifically a `engine_sampler.ProcessorSampler`
that will send circuits to the Quantum Computing Service
when sampled.
"""
processor = self._inner_processor()
return processor_sampler.ProcessorSampler(
processor=self,
run_name=run_name,
device_config_name=(
device_config_name
if device_config_name
else processor.default_device_config_key.config_alias
),
max_concurrent_jobs=max_concurrent_jobs,
)

def get_sampler_from_snapshot_id(
self,
snapshot_id: str,
device_config_name: str | None = None,
max_concurrent_jobs: int = 100,
) -> cg.engine.ProcessorSampler:
"""Returns a sampler backed by the engine.
Args:
device_config_name: An identifier used to select the processor configuration
utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
Expand All @@ -126,29 +156,50 @@ def get_sampler(
A `cirq.Sampler` instance (specifically a `engine_sampler.ProcessorSampler`
that will send circuits to the Quantum Computing Service
when sampled.
"""
processor = self._inner_processor()
return processor_sampler.ProcessorSampler(
processor=self,
snapshot_id=snapshot_id,
device_config_name=(
device_config_name
if device_config_name
else processor.default_device_config_key.config_alias
),
max_concurrent_jobs=max_concurrent_jobs,
)

Raises:
ValueError: If only one of `run_name` and `device_config_name` are specified.
ValueError: If both `run_name` and `snapshot_id` are specified.
def get_sampler(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following on from the above comment regarding the requirement of a device config to build a sampler, should we remove get_sampler from the processor and instead rely on explicitly grabbing the default configuration to generate the sampler?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Will about removing the get_sampler methods of the processor, although engine.get_sampler() is useful as a convenience function so that you don't always have to get a config first.

self, device_config_name: str | None = None, max_concurrent_jobs: int = 100
) -> cg.engine.ProcessorSampler:
"""Returns the default sampler backed by the engine.

Args:
device_config_name: An identifier used to select the processor configuration
utilized to run the job. A configuration identifies the set of
available qubits, couplers, and supported gates in the processor.
max_concurrent_jobs: The maximum number of jobs to be sent
simultaneously to the Engine. This client-side throttle can be
used to proactively reduce load to the backends and avoid quota
violations when pipelining circuit executions.

Returns:
A `cirq.Sampler` instance (specifically a `engine_sampler.ProcessorSampler`
that will send circuits to the Quantum Computing Service
when sampled.

"""
processor = self._inner_processor()
if run_name and snapshot_id:
raise ValueError('Cannot specify both `run_name` and `snapshot_id`')
if (bool(run_name) or bool(snapshot_id)) ^ bool(device_config_name):
raise ValueError(
'Cannot specify only one of top level identifier and `device_config_name`'
)
# If not provided, initialize the sampler with the Processor's default values.
if not run_name and not device_config_name and not snapshot_id:
run_name = processor.default_device_config_key.run
device_config_name = processor.default_device_config_key.config_alias
snapshot_id = processor.default_device_config_key.snapshot_id

return processor_sampler.ProcessorSampler(
processor=self,
run_name=run_name,
snapshot_id=snapshot_id,
device_config_name=device_config_name,
run_name=processor.default_device_config_key.run,
snapshot_id=processor.default_device_config_key.snapshot_id,
device_config_name=(
device_config_name
if device_config_name
else processor.default_device_config_key.config_alias
),
max_concurrent_jobs=max_concurrent_jobs,
)

Expand Down
94 changes: 65 additions & 29 deletions cirq-google/cirq_google/engine/engine_processor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def test_get_missing_device():
_ = processor.get_device()


def test_get_sampler_initializes_default_device_configuration() -> None:
def test_get_sampler_from_run_name() -> None:
processor = cg.EngineProcessor(
'a',
'p',
Expand All @@ -335,60 +335,96 @@ def test_get_sampler_initializes_default_device_configuration() -> None:
)
),
)
sampler = processor.get_sampler()
run_name = 'test_run_name'
device_config_name = 'test_device_name'

assert sampler.run_name == "run"
assert sampler.device_config_name == "config_alias"
sampler = processor.get_sampler_from_run_name(
run_name=run_name, device_config_name=device_config_name
)

assert sampler.run_name == run_name
assert sampler.device_config_name == device_config_name

def test_get_sampler_uses_custom_default_device_configuration_key() -> None:

def test_get_sampler_from_run_name_with_default_values() -> None:
default_config_alias = 'test_alias'
processor = cg.EngineProcessor(
'a',
'p',
EngineContext(),
_processor=quantum.QuantumProcessor(
default_device_config_key=quantum.DeviceConfigKey(
run="default_run", config_alias="default_config_alias"
run="run", config_alias=default_config_alias
)
),
)
sampler = processor.get_sampler(run_name="run1", device_config_name="config_alias1")
run_name = 'test_run'

sampler = processor.get_sampler_from_run_name(run_name=run_name)

assert sampler.run_name == "run1"
assert sampler.device_config_name == "config_alias1"
assert sampler.run_name == run_name
assert sampler.device_config_name == default_config_alias


@pytest.mark.parametrize(
'run, snapshot_id, config_alias, error_message',
[
('run', '', '', 'Cannot specify only one of top level identifier and `device_config_name`'),
(
'',
'',
'config',
'Cannot specify only one of top level identifier and `device_config_name`',
def test_get_sampler_from_snapshot_id() -> None:
processor = cg.EngineProcessor(
'a',
'p',
EngineContext(),
_processor=quantum.QuantumProcessor(
default_device_config_key=quantum.DeviceConfigKey(
run="run", config_alias="config_alias"
)
),
('run', 'snapshot_id', 'config', 'Cannot specify both `run_name` and `snapshot_id`'),
],
)
def test_get_sampler_with_incomplete_device_configuration_errors(
run, snapshot_id, config_alias, error_message
) -> None:
)
snapshot_id = 'test_snapshot'
device_config_name = 'test_device_name'

sampler = processor.get_sampler_from_snapshot_id(
snapshot_id=snapshot_id, device_config_name=device_config_name
)

assert sampler.snapshot_id == snapshot_id
assert sampler.device_config_name == device_config_name


def test_get_sampler_from_snapshot_id_with_default_device() -> None:
default_config_alias = 'test_alias'
processor = cg.EngineProcessor(
'a',
'p',
EngineContext(),
_processor=quantum.QuantumProcessor(
default_device_config_key=quantum.DeviceConfigKey(
run="default_run", config_alias="default_config_alias"
run="run", config_alias=default_config_alias
)
),
)
snapshot_id = 'test_snapshot'

with pytest.raises(ValueError, match=error_message):
processor.get_sampler(
run_name=run, device_config_name=config_alias, snapshot_id=snapshot_id
)
sampler = processor.get_sampler_from_snapshot_id(
snapshot_id=snapshot_id, device_config_name=default_config_alias
)

assert sampler.snapshot_id == snapshot_id
assert sampler.device_config_name == default_config_alias


def test_get_sampler_initializes_default_device_configuration() -> None:
processor = cg.EngineProcessor(
'a',
'p',
EngineContext(),
_processor=quantum.QuantumProcessor(
default_device_config_key=quantum.DeviceConfigKey(
run="run", config_alias="config_alias"
)
),
)
sampler = processor.get_sampler()

assert sampler.run_name == "run"
assert sampler.device_config_name == "config_alias"


@mock.patch('cirq_google.engine.engine_client.EngineClient.get_processor_async')
Expand Down
Loading