Skip to main content

Sample Code

This sample script illustrates integration with the Detect Scam Call endpoint of the voice scam protection service. It opens the WAV file given as a command-line argument, streams it to the service in real time, and prints the service events as they arrive. The supplied WAV file must be stereo, 16-bit, uncompressed PCM.

The script requires the websockets Python package. You can create a Python virtual environment and install the package with:

python3 -m venv venv
source venv/bin/activate
pip install websockets==13.0

Then you can run the script with:

export HIYA_APP_ID=your-app-id
export HIYA_APP_SECRET=your-app-secret
python3 demo_client.py test_file.wav

Where demo_client.py is the following script:

import argparse
import base64
import json
import os
import time
import wave

from websockets.exceptions import ConnectionClosed
import websockets.sync.client

def _require_env(name, description):
    """Return the value of environment variable ``name`` or exit with a hint.

    An unset or empty variable terminates the script: raising ``SystemExit``
    with a string prints that string to stderr and exits with status 1. This
    avoids the ``exit`` builtin, which is only injected by the ``site`` module.
    """
    value = os.environ.get(name)
    if not value:
        raise SystemExit(f'Specify your {description} in the {name} environment variable')
    return value


# Credentials used to build the HTTP Basic Authorization header.
HIYA_APP_ID = _require_env('HIYA_APP_ID', 'App ID')
HIYA_APP_SECRET = _require_env('HIYA_APP_SECRET', 'App Secret')

# Websocket endpoint of the Detect Scam Call API.
HIYA_URL = 'wss://api.hiyaapi.com/v1/voice-scam-protection/detect-scam-call'


def main():
    """Stream the WAV file named on the command line to the service in real time.

    Loads the audio, opens a session, then sends the audio in 100 ms chunks
    while printing every event the service has emitted so far. If the
    connection drops, a new session is established and streaming continues
    with the next chunk. The connection is always closed on the way out.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("file", help="Path to a stereo wav file", type=str)
    args = arg_parser.parse_args()

    audio_bytes, sample_rate = load_wav_file(args.file)
    print('Loaded test audio file')

    # The initial text frame describing the audio stream and call metadata,
    # with placeholder phone numbers and SIP headers for testing.
    # See the API documentation for the schema of this object.
    initial_frame_json = json.dumps({
        'sampleRate': sample_rate,
        'phone': '+15555550000',
        'userPhone': '+15555550001',
        'direction': 'Incoming',
        'isContact': False,
        'sipMethod': 'INVITE',
        'sipHeaders': {
            'Via': 'SIP/2.0/UDP 10.0.0.0:5060;branch=z9hG4bK000000000',
            'Max-Forwards': '70',
            'From': '"Alice" <sip:+15555550000@example.com>',
            'To': '"Bob" <sip:+15555550001@example.com>',
            'Call-ID': '4a32e052-3024-4e9b-919b-0b9b543c4730',
            'CSeq': '1 INVITE',
            'Diversion': '<sip:+15555550010@example.com>',
            'Contact': '"Alice" <sip:+15555550000@10.0.0.0:5060>',
            'P-Asserted-Identity': '"Alice" <sip:+15555550000@example.com>',
            'Content-Length': '0',
        },
    })

    # For testing, we'll send in 100 ms chunks of the test file in real time,
    # emulating a live audio stream.
    chunk_secs = 0.1
    audio_chunks_iter = audio_chunks(audio_bytes, sample_rate, chunk_secs)

    websocket = connect_to_service(initial_frame_json)
    if websocket is None:
        return

    # Pace against a monotonic deadline rather than sleeping a fixed amount,
    # so time spent sending and printing does not slow the stream down.
    next_send_at = time.monotonic()
    try:
        for audio_chunk in audio_chunks_iter:
            try:
                # Send the audio chunk as a binary Websocket frame.
                websocket.send(audio_chunk)

                # Print whatever events have arrived, then continue sending.
                _print_pending_events(websocket)
            except ConnectionClosed:
                # Reconnect if the connection was lost, then continue sending in audio.
                print('Connection lost')
                websocket = connect_to_service(initial_frame_json)
                if websocket is None:
                    return
                # Restart the schedule so the new session is not flooded with
                # a burst of chunks making up for the reconnect delay.
                next_send_at = time.monotonic()

            next_send_at += chunk_secs
            time.sleep(max(0.0, next_send_at - time.monotonic()))

        print('Reached end of audio file, exiting')
    finally:
        # websocket is None only when a reconnect attempt was rejected.
        if websocket is not None:
            websocket.close()


def _print_pending_events(websocket):
    """Print every event that has already been received, without blocking.

    ``recv(timeout=0)`` raises ``TimeoutError`` when nothing is waiting, which
    ends the drain. ``ConnectionClosed`` is left to propagate to the caller.
    """
    while True:
        try:
            raw_event = websocket.recv(timeout=0)
        except TimeoutError:
            # We have not received any further event, continuing.
            return
        _print_event(json.loads(raw_event))


def _print_event(event):
    """Print a single decoded service event; other event types are ignored."""
    event_type = event.get('type')
    if event_type == 'result':
        print(f'Received result from service: {event}')
    elif event_type == 'transcript':
        print(f'Received transcript from service: {event}')
    elif event_type == 'error':
        error = event.get('message')
        print(f'Received error from service: {error}')


def connect_to_service(initial_frame_json):
    """Connect to the service via Websocket and establish a session.

    Authenticates with HTTP Basic credentials built from HIYA_APP_ID and
    HIYA_APP_SECRET, sends ``initial_frame_json`` as the first text frame and
    waits for the service's first event.

    Returns:
        The open websocket connection when the service answers with a
        ``sessionMetadata`` event, otherwise ``None``. On every failure path
        (including an exception while sending or receiving) the connection is
        closed before leaving, so no socket is leaked.
    """
    credentials = base64.b64encode(f'{HIYA_APP_ID}:{HIYA_APP_SECRET}'.encode()).decode()
    headers = [('Authorization', f'Basic {credentials}')]

    print('Connecting...')
    websocket = websockets.sync.client.connect(HIYA_URL, additional_headers=headers)
    print('Connected')

    established = False
    try:
        websocket.send(initial_frame_json)
        event = json.loads(websocket.recv())
        event_type = event.get('type')

        if event_type == 'sessionMetadata':
            session_id = event['sessionId']
            print(f'Established session, session ID is {session_id}')
            established = True
            return websocket

        if event_type == 'error':
            error = event.get('message')
            print(f'Failed to establish session, received the following error: {error}')
        else:
            # Previously this case returned None without any explanation.
            print(f'Failed to establish session, received an unexpected event: {event}')
        return None
    finally:
        if not established:
            websocket.close()


def audio_chunks(audio_bytes, sample_rate, chunk_secs):
    """Yield consecutive slices of ``audio_bytes`` covering ``chunk_secs`` each.

    The final chunk may be shorter. A progress line is printed each time
    another 5 seconds of audio has been handed out; the reported figure is
    derived from the byte offset, so it does not drift on long files.

    Args:
        audio_bytes: Raw interleaved PCM data.
        sample_rate: Sample rate of the audio in Hz.
        chunk_secs: Duration of each chunk in seconds.

    Raises:
        ValueError: When iteration starts, if ``chunk_secs`` and
            ``sample_rate`` yield a chunk smaller than one frame. Without
            this check the loop would never advance.
    """
    # Same stereo, 16-bit assumption that seconds_to_bytes documents.
    bytes_per_frame = 2 * 2

    # Round down to whole frames so a sample is never split across two chunks.
    chunk_size = int(seconds_to_bytes(chunk_secs, sample_rate))
    chunk_size -= chunk_size % bytes_per_frame
    if chunk_size <= 0:
        raise ValueError(
            f'chunk_secs={chunk_secs!r} at sample_rate={sample_rate!r} is shorter than one audio frame'
        )

    report_every_secs = 5
    report_every_bytes = seconds_to_bytes(report_every_secs, sample_rate)
    intervals_reported = 0

    offset = 0
    while offset < len(audio_bytes):
        chunk = audio_bytes[offset:offset + chunk_size]
        offset += len(chunk)

        intervals_sent = int(offset // report_every_bytes)
        if intervals_sent > intervals_reported:
            intervals_reported = intervals_sent
            print(f'Sent in {intervals_reported * report_every_secs} seconds of audio')

        yield chunk


def seconds_to_bytes(seconds, sample_rate, channels=2, bytes_per_sample=2):
    """Return, as a float, how many bytes of PCM audio span ``seconds``.

    The defaults assume two channels and two-byte samples (16-bit PCM wave
    file); pass ``channels`` / ``bytes_per_sample`` for other layouts.

    Args:
        seconds: Duration in seconds.
        sample_rate: Sample rate in Hz.
        channels: Number of interleaved channels.
        bytes_per_sample: Width of one sample of one channel, in bytes.
    """
    return seconds * float(sample_rate * channels * bytes_per_sample)


def load_wav_file(filepath) -> "tuple[bytes, int]":
    """Read a stereo, 16-bit WAV file.

    Args:
        filepath: Path of the WAV file to open.

    Returns:
        A ``(audio_bytes, sample_rate)`` pair: the raw interleaved PCM data of
        the whole file and its sample rate in Hz.

    Raises:
        RuntimeError: If the file is not two-channel or not two bytes per sample.
    """
    with wave.open(filepath, "rb") as file:
        channels = file.getnchannels()
        bytes_per_sample = file.getsampwidth()
        sample_rate = file.getframerate()
        if channels != 2:
            raise RuntimeError(f'Unexpected channel count: {channels}')
        if bytes_per_sample != 2:
            raise RuntimeError(f'Unexpected bytes per sample: {bytes_per_sample}')
        # readframes() takes a count of frames (one sample per channel), so
        # the frame count must not be multiplied by the channel count.
        return file.readframes(file.getnframes()), sample_rate


if __name__ == '__main__':
    # Ctrl+C is the expected way to stop the demo mid-stream, so report it
    # with a short message rather than letting the traceback surface.
    try:
        main()
    except KeyboardInterrupt:
        print('Keyboard interrupt, exiting')