Source code for quantumaudio.tools.stream

# Copyright 2024 Moth Quantum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==========================================================================

from typing import Any, Callable, Union

import numpy as np
from tqdm import tqdm

# ======================
# Buffering Functions
# ======================


[docs] def get_chunks( data: np.ndarray, chunk_size: int = 256, verbose: bool = False, ) -> None: """ Splits a `numpy` array into smaller chunks of specified size. This function takes a long array and divides it into smaller chunks, which can be useful for processing large datasets in manageable pieces. Args: data: The input array to be split. The array can be one-dimensional or two-dimensional. If one-dimensional, it will be reshaped into two dimensions. chunk_size: The size of each chunk. Default is 256. verbose: If True, prints detailed information about the data and chunks. Default is False. Returns: None """ if verbose: print(f"\nShape: {data.shape}") if data.ndim == 1: data = data.reshape(1, -1) y_chunks = [] for i in range(0, data.shape[-1], chunk_size): chunk = data[:, i : i + chunk_size] y_chunks.append(chunk) if verbose: print( f"Num samples: {data.shape[-1]}, Num channels: {data.shape[0]}, Buffer size: {chunk_size}" ) print(f"Number of chunks: {len(y_chunks)}") print(f"Shape per buffer: {y_chunks[0].shape}\n") return y_chunks
[docs] def process( chunk: np.ndarray, scheme: "quantumaudio.schemes.Scheme", backend: Any = None, shots: int = 8000 ) -> np.ndarray: """Process a chunk of data according to a specified scheme by encoding it and decoding it back. Args: chunk: Data chunk to be processed. scheme: Processing scheme. backend: A valid Backend object accepted by the :ref:`execute function <execute>` at `decode`. Defaults to `qiskit_aer.AerSimulator()`. shots: Number of shots. Returns: None """ chunk = scheme.decode( scheme.encode(chunk, verbose=0), backend=backend, shots=shots ) return chunk
[docs] def process_chunks( chunks: list[np.ndarray], scheme: "quantumaudio.schemes.Scheme", process_function: Callable[[np.ndarray, Any, dict], list] = process, batch_process: bool = False, verbose: bool = True, **kwargs, ) -> list: """Process chunks of data in an iteration according to a specified scheme. Args: chunks: Data chunks to be processed. scheme: Processing scheme. process_function: Function to process each chunk (default is 'process'). verbose: If True, enables verbose logging. Defaults to False. Returns: None """ processed_chunks = [] if not batch_process: # process one by one try: for chunk in tqdm(chunks, disable=not verbose): processed_chunk = process_function(chunk, scheme, **kwargs) processed_chunks.append(processed_chunk) except (KeyboardInterrupt, Exception) as e: print(e) return processed_chunks else: # process all at once processed_chunks = process_function(chunks, scheme, **kwargs) return processed_chunks
[docs] def combine_chunks(chunks: list[np.ndarray]) -> np.ndarray: """Combine a list of `numpy` arrays along an axis based on the data dimension. Args: chunks: A list of `numpy` arrays to be combined. Returns: np.ndarray """ try: if chunks[0].ndim != 1: output = np.concatenate(chunks, axis=1) else: output = np.concatenate(chunks, axis=0) return output except: print("Warning: Chunks cannot be combined.") # if different data type return chunks
[docs] def normalize(data: np.ndarray) -> np.ndarray: """Normalize the input data to ensure it lies within the standard range [-1.0, 1.0]. Args: data: Input array containing audio data. """ if not np.all((data >= -1.0) & (data <= 1.0)): print("Warning: Values outside the digital audio range are clipped.") data = np.clip(data, -1.0, 1.0) return data
[docs] def stream_data( data: np.ndarray, scheme: "quantumaudio.schemes.Scheme", chunk_size: int = 64, process_function: Callable[[np.ndarray, Any, dict], list] = process, batch_process: bool = False, verbose: Union[int, bool] = 2, **kwargs, ) -> np.ndarray: """Processes data by dividing it into chunks, applying a Quantum Audio scheme, and combining the results. Args: data: The input data array to be processed. scheme: The quantum audio scheme to be applied to each chunk. chunk_size: The size of each chunk. Defaults to 64. process_function: Function to process each chunk. - Defaults to :func:`process` which accepts any additional `**kwargs`. batch_process: Boolean value to inidicate whether the provided `process_function` applies to a single chunk or a batch. verbose: If True, enables verbose logging. Defaults to 2. - >1: Shows progress bar. - >2: Shows additional information such as buffer size and number of qubits. Returns: np.ndarray """ data = normalize(data) if chunk_size > data.shape[-1]: chunk_size = data.shape[-1] if verbose == 2: print(f"Chunk size set to {data.shape[-1]}.") chunks = get_chunks( data=data, chunk_size=chunk_size, verbose=(verbose == 2) ) if verbose == 2: scheme.calculate(chunks[0]) processed_chunks = process_chunks( chunks=chunks, scheme=scheme, process_function=process_function, batch_process=batch_process, verbose=verbose, **kwargs, ) output = combine_chunks(processed_chunks) return output