Audio Automation Scripting: Streamline Your Workflow with Code
2025/09/12
17 min read

Learn how to automate audio processing tasks using Python, FFmpeg, and other scripting tools to boost productivity and ensure consistent results.

Automation keeps audio production efficient and consistent. This guide shows how to use scripting and automation tools to streamline audio workflows, from simple batch processing to complex production pipelines.

Introduction to Audio Automation

Why Automate Audio Tasks?

Efficiency Benefits:

  • Time Savings: Process hundreds of files in minutes instead of hours
  • Consistency: Ensure identical processing across all files
  • Scalability: Handle large-scale projects with ease
  • Error Reduction: Minimize human mistakes in repetitive tasks
  • 24/7 Processing: Run tasks overnight or during downtime

Common Automation Scenarios:

  • Batch audio format conversion
  • Automatic loudness normalization
  • Metadata tagging and organization
  • Quality control and validation
  • Backup and archival workflows
  • Real-time processing pipelines

Automation Tools Overview

Command-Line Tools:

  • FFmpeg: Swiss army knife for audio/video processing
  • SoX: Sound processing library and utilities
  • LAME: High-quality MP3 encoder
  • FLAC: Free lossless audio codec tools
  • MediaInfo: Media file analysis tool

Programming Languages:

  • Python: Excellent libraries and ease of use
  • JavaScript/Node.js: Web-based automation and APIs
  • Bash/Shell: System-level automation on Unix systems
  • PowerShell: Windows automation and scripting
  • Go: High-performance concurrent processing

Specialized Software:

  • Adobe Audition: Batch processing capabilities
  • Reaper: Extensive scripting and automation features
  • Pro Tools: Batch processing and workflow automation
  • Logic Pro: Scripter plugin and automation tools

FFmpeg Mastery for Audio Automation

FFmpeg Fundamentals

Basic Syntax Structure:

ffmpeg [global_options] [input_options] -i input_file [output_options] output_file
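
When FFmpeg is driven from a script, the ordering above maps naturally onto an argument list. The sketch below is one possible way to assemble it in Python; `build_ffmpeg_command` and its parameter names are hypothetical helpers for illustration, not part of FFmpeg or any library.

```python
def build_ffmpeg_command(input_file, output_file,
                         global_options=(), input_options=(), output_options=()):
    """Assemble an ffmpeg argument list in the order the CLI expects.

    Hypothetical helper: options placed before -i apply to the input,
    options placed after it apply to the output file that follows.
    """
    return [
        "ffmpeg",
        *global_options,
        *input_options,
        "-i", str(input_file),
        *output_options,
        str(output_file),
    ]


command = build_ffmpeg_command(
    "input.wav", "output.mp3",
    global_options=["-y"],  # overwrite the output without asking
    output_options=["-codec:a", "libmp3lame", "-b:a", "320k"],
)
print(" ".join(command))
```

A list like this can be passed directly to `subprocess.run(command, check=True)`, which avoids shell quoting problems with file names that contain spaces.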

Essential Audio Operations:

Format Conversion:

# Convert WAV to MP3 with high quality
ffmpeg -i input.wav -codec:a libmp3lame -b:a 320k output.mp3
 
# Convert to FLAC with maximum compression
ffmpeg -i input.wav -codec:a flac -compression_level 12 output.flac
 
# Convert to AAC with variable bitrate
ffmpeg -i input.wav -codec:a aac -q:a 2 output.m4a

Audio Processing:

# Normalize audio to -23 LUFS
ffmpeg -i input.wav -af loudnorm=I=-23:LRA=7:TP=-2 output.wav
 
# Apply high-pass filter at 80Hz
ffmpeg -i input.wav -af highpass=f=80 output.wav
 
# Fade in over the first 3 s and out over the last 3 s (st=57 assumes a 60 s file)
ffmpeg -i input.wav -af "afade=t=in:st=0:d=3,afade=t=out:st=57:d=3" output.wav
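
The loudnorm command above runs in a single pass. The filter can also print its measurements as JSON (`print_format=json`), and a second pass can reuse those values for a more accurate result. The sketch below shows one way to pull that JSON out of FFmpeg's stderr and build the second-pass filter string. The function names are hypothetical, and the sample stderr text contains made-up numbers purely for illustration.

```python
import json


def parse_loudnorm_stats(stderr_text):
    """Return the JSON measurements that loudnorm prints at the end of stderr."""
    start = stderr_text.rfind("{")
    end = stderr_text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no loudnorm JSON block found")
    return json.loads(stderr_text[start:end + 1])


def second_pass_filter(stats, target_i=-23, target_lra=7, target_tp=-2):
    """Build the loudnorm filter string for the second (linear) pass."""
    return (
        f"loudnorm=I={target_i}:LRA={target_lra}:TP={target_tp}"
        f":measured_I={stats['input_i']}"
        f":measured_TP={stats['input_tp']}"
        f":measured_LRA={stats['input_lra']}"
        f":measured_thresh={stats['input_thresh']}"
        f":offset={stats['target_offset']}"
        ":linear=true"
    )


# Illustrative stderr tail with invented values, shaped like loudnorm's JSON output
SAMPLE_STDERR = """[Parsed_loudnorm_0 @ 0x0]
{
    "input_i" : "-27.61",
    "input_tp" : "-4.47",
    "input_lra" : "18.06",
    "input_thresh" : "-39.20",
    "output_i" : "-23.03",
    "output_tp" : "-2.00",
    "output_lra" : "6.90",
    "output_thresh" : "-33.20",
    "normalization_type" : "dynamic",
    "target_offset" : "0.03"
}
"""

stats = parse_loudnorm_stats(SAMPLE_STDERR)
print(second_pass_filter(stats))
```

In practice the first pass would be something like `ffmpeg -i input.wav -af loudnorm=I=-23:LRA=7:TP=-2:print_format=json -f null -`, with its stderr captured and handed to `parse_loudnorm_stats`.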

Advanced FFmpeg Techniques

Batch Processing Scripts:

Bash Script for Format Conversion:

#!/bin/bash
 
# Convert all WAV files to MP3
for file in *.wav; do
    if [ -f "$file" ]; then
        output="${file%.wav}.mp3"
        echo "Converting $file to $output"
        ffmpeg -i "$file" -codec:a libmp3lame -b:a 320k "$output"
    fi
done
 
echo "Batch conversion completed!"
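
The same loop can be written in Python when the script also needs to run on Windows or match upper-case extensions. This is a minimal sketch under those assumptions; `plan_conversions` and `conversion_command` are hypothetical names, and the planning step is kept separate from execution so it can be checked without FFmpeg installed.

```python
from pathlib import Path


def plan_conversions(directory, source_ext=".wav", target_ext=".mp3"):
    """Return (source, target) path pairs for every matching file in directory."""
    pairs = []
    for source in sorted(Path(directory).iterdir()):
        # Compare in lower case so "TAKE.WAV" is matched as well as "take.wav"
        if source.is_file() and source.suffix.lower() == source_ext:
            pairs.append((source, source.with_suffix(target_ext)))
    return pairs


def conversion_command(source, target, bitrate="320k"):
    """Build the same ffmpeg call the Bash loop runs for one file."""
    return ["ffmpeg", "-i", str(source), "-codec:a", "libmp3lame",
            "-b:a", bitrate, str(target)]
```

To actually convert the files, each pair can be passed through `subprocess.run(conversion_command(src, dst), check=True)`.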

PowerShell Script for Windows:

# Convert all audio files to normalized MP3
Get-ChildItem -Path "." -Include *.wav,*.flac,*.aiff -Recurse | ForEach-Object {
    $outputFile = $_.BaseName + ".mp3"
    $outputPath = Join-Path $_.DirectoryName $outputFile
    
    Write-Host "Processing: $($_.Name)"
    & ffmpeg -i $_.FullName -af loudnorm=I=-16:LRA=11:TP=-1.5 -codec:a libmp3lame -b:a 320k $outputPath
}

Complex Audio Processing Pipeline:

#!/bin/bash
 
# Professional audio processing pipeline
process_audio() {
    local input_file="$1"
    local output_file="$2"
    
    ffmpeg -i "$input_file" \
        -af "highpass=f=20,lowpass=f=20000,loudnorm=I=-23:LRA=7:TP=-2" \
        -codec:a libmp3lame \
        -b:a 320k \
        -metadata title="Processed Audio" \
        -metadata artist="Auto Processor" \
        "$output_file"
}
 
# Process all files in directory
for file in *.wav; do
    if [ -f "$file" ]; then
        output="processed_${file%.wav}.mp3"
        process_audio "$file" "$output"
        echo "Processed: $file -> $output"
    fi
done
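
The `-af` value in the pipeline above follows a simple grammar: filters are separated by commas, and each filter's options are `key=value` pairs separated by colons. A small helper can generate that string from structured data, which is easier to maintain than hand-edited strings. `build_filter_chain` is a hypothetical name, and this sketch does not escape special characters in option values.

```python
def build_filter_chain(filters):
    """Join (name, options) pairs into an ffmpeg -af filter string.

    Filters are separated by commas; a filter's options are written as
    key=value pairs separated by colons.
    """
    parts = []
    for name, options in filters:
        if options:
            args = ":".join(f"{key}={value}" for key, value in options.items())
            parts.append(f"{name}={args}")
        else:
            parts.append(name)
    return ",".join(parts)


chain = build_filter_chain([
    ("highpass", {"f": 20}),
    ("lowpass", {"f": 20000}),
    ("loudnorm", {"I": -23, "LRA": 7, "TP": -2}),
])
print(chain)  # highpass=f=20,lowpass=f=20000,loudnorm=I=-23:LRA=7:TP=-2
```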

Python Audio Automation

Essential Python Libraries

Core Audio Libraries:

# Install required packages
pip install pydub librosa soundfile mutagen pyaudio

Library Overview:

  • pydub: Simple audio manipulation and conversion
  • librosa: Advanced audio analysis and processing
  • soundfile: Reading and writing audio files
  • mutagen: Metadata manipulation
  • pyaudio: Real-time audio I/O

Pydub for Audio Processing

Basic Operations:

from pydub import AudioSegment
from pydub.utils import which
import os
 
# Set FFmpeg path (if needed)
AudioSegment.converter = which("ffmpeg")
AudioSegment.ffmpeg = which("ffmpeg")
AudioSegment.ffprobe = which("ffprobe")
 
def convert_audio_format(input_file, output_file, format="mp3", bitrate="320k"):
    """Convert audio file to specified format"""
    try:
        audio = AudioSegment.from_file(input_file)
        audio.export(output_file, format=format, bitrate=bitrate)
        print(f"Converted: {input_file} -> {output_file}")
        return True
    except Exception as e:
        print(f"Error converting {input_file}: {e}")
        return False
 
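def normalize_loudness(input_file, output_file, target_lufs=-23):
    """Approximate loudness normalization based on average level (dBFS)"""
    audio = AudioSegment.from_file(input_file)
    
    # pydub reports the average RMS level in dBFS. This only approximates
    # integrated loudness (LUFS), which requires an ITU-R BS.1770 meter.
    current_db = audio.dBFS
    target_db = target_lufs
    
    # Silent audio has a level of -inf dBFS and cannot be gain-matched
    if current_db == float("-inf"):
        print(f"Skipped silent file: {input_file}")
        return
    
    # Adjust gain
    gain_adjustment = target_db - current_db
    normalized_audio = audio + gain_adjustment
    
    normalized_audio.export(output_file, format="wav")
    print(f"Normalized: {input_file} (gain: {gain_adjustment:.2f}dB)")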
 
def batch_process_directory(input_dir, output_dir, operation="convert"):
    """Process all audio files in directory"""
    os.makedirs(output_dir, exist_ok=True)
    
    audio_extensions = ['.wav', '.mp3', '.flac', '.aiff', '.m4a']
    
    for filename in os.listdir(input_dir):
        if any(filename.lower().endswith(ext) for ext in audio_extensions):
            input_path = os.path.join(input_dir, filename)
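            # normalize_loudness() exports WAV, so match the extension to the operation
            extension = ".wav" if operation == "normalize" else ".mp3"
            output_filename = os.path.splitext(filename)[0] + extension
            output_path = os.path.join(output_dir, output_filename)
            
            if operation == "convert":
                convert_audio_format(input_path, output_path)
            elif operation == "normalize":
                normalize_loudness(input_path, output_path)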
 
# Usage example
if __name__ == "__main__":
    batch_process_directory("./input", "./output", "convert")
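
The `normalize_loudness` function above derives its gain from pydub's `dBFS` value, which is an average (RMS) level rather than a true LUFS measurement. The arithmetic behind that gain can be sketched in plain Python. The helper names below are hypothetical, and the samples are assumed to be floats in the range -1.0 to 1.0.

```python
import math


def rms_dbfs(samples):
    """Average level of float samples (-1.0..1.0) in dB relative to full scale."""
    if not samples:
        raise ValueError("no samples")
    mean_square = sum(s * s for s in samples) / len(samples)
    if mean_square == 0:
        return float("-inf")  # digital silence
    return 10 * math.log10(mean_square)


def gain_to_target(current_db, target_db):
    """Gain in dB that moves current_db to target_db."""
    return target_db - current_db


def apply_gain(samples, gain_db):
    """Scale samples by a gain given in dB (20 * log10 of the amplitude ratio)."""
    factor = 10 ** (gain_db / 20)
    return [s * factor for s in samples]


square_wave = [0.5, -0.5, 0.5, -0.5]
level = rms_dbfs(square_wave)            # about -6.02 dBFS
gain = gain_to_target(level, -23)        # about -16.98 dB
quieter = apply_gain(square_wave, gain)  # now measures -23 dBFS
```

Because this measures level and not perceived loudness, two files normalized this way can still sound different in volume. FFmpeg's `loudnorm` filter is the safer choice when a LUFS target matters.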

Advanced Audio Manipulation:

import os

from pydub import AudioSegment
from pydub.effects import normalize
 
class AudioProcessor:
    def __init__(self):
        self.processed_files = []
    
    def apply_fade(self, audio, fade_in_ms=1000, fade_out_ms=1000):
        """Apply fade in/out effects"""
        return audio.fade_in(fade_in_ms).fade_out(fade_out_ms)
    
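    def apply_eq(self, audio, low_gain=0, mid_gain=0, high_gain=0):
        """Simplified two-band tone control split at 500 Hz"""
        # Not a true 3-band EQ: pydub only provides basic low/high-pass
        # filters, so mid_gain is accepted but not applied here.
        if low_gain == 0 and high_gain == 0:
            return audio
        
        # Apply gain to each band, then mix the bands back together.
        # overlay() mixes; "+" between two segments would concatenate them.
        low_band = audio.low_pass_filter(500) + low_gain
        high_band = audio.high_pass_filter(500) + high_gain
        return low_band.overlay(high_band)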
    
    def create_crossfade(self, audio1, audio2, crossfade_duration=2000):
        """Join two audio segments with a crossfade (duration in ms)"""
        # The crossfade cannot be longer than either segment
        crossfade_duration = min(crossfade_duration, len(audio1), len(audio2))
        
        # append() fades audio1 out while fading audio2 in, then continues
        # with the rest of audio2. overlay() alone would cut audio2 off at
        # the end of audio1.
        return audio1.append(audio2, crossfade=crossfade_duration)
    
    def batch_enhance(self, input_dir, output_dir):
        """Apply enhancement to all files"""
        os.makedirs(output_dir, exist_ok=True)
        
        for filename in os.listdir(input_dir):
            if filename.lower().endswith(('.wav', '.mp3', '.flac')):
                input_path = os.path.join(input_dir, filename)
                # Output is exported as WAV, so give it a .wav extension
                stem = os.path.splitext(filename)[0]
                output_path = os.path.join(output_dir, f"enhanced_{stem}.wav")
                
                try:
                    audio = AudioSegment.from_file(input_path)
                    
                    # Apply processing chain
                    enhanced = self.apply_fade(audio)
                    enhanced = normalize(enhanced)
                    
                    enhanced.export(output_path, format="wav")
                    self.processed_files.append(output_path)
                    print(f"Enhanced: {filename}")
                    
                except Exception as e:
                    print(f"Error processing {filename}: {e}")
 
# Usage
processor = AudioProcessor()
processor.batch_enhance("./input", "./enhanced")

Librosa for Advanced Analysis

Audio Analysis and Feature Extraction:

import os

import librosa
import numpy as np
 
class AudioAnalyzer:
    def __init__(self, sample_rate=22050):
        self.sr = sample_rate
    
    def load_audio(self, file_path):
        """Load audio file with librosa"""
        y, sr = librosa.load(file_path, sr=self.sr)
        return y, sr
    
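    def analyze_tempo(self, y, sr):
        """Extract tempo and beat information"""
        tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
        # Depending on the librosa version, tempo may be a NumPy array;
        # convert it to a plain float so it can be formatted and serialized
        tempo = float(np.atleast_1d(tempo)[0])
        return tempo, beats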
    
    def extract_features(self, y, sr):
        """Extract comprehensive audio features"""
        features = {}
        
        # Spectral features
        features['spectral_centroid'] = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
        features['spectral_rolloff'] = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
        features['zero_crossing_rate'] = librosa.feature.zero_crossing_rate(y)[0]
        
        # MFCC features
        features['mfcc'] = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
        
        # Chroma features
        features['chroma'] = librosa.feature.chroma_stft(y=y, sr=sr)
        
        # Tempo and rhythm
        tempo, beats = self.analyze_tempo(y, sr)
        features['tempo'] = tempo
        features['beats'] = beats
        
        return features
    
    def detect_onset(self, y, sr):
        """Detect onset times in audio"""
        onset_frames = librosa.onset.onset_detect(y=y, sr=sr)
        onset_times = librosa.frames_to_time(onset_frames, sr=sr)
        return onset_times
    
    def pitch_shift(self, y, sr, n_steps):
        """Pitch shift audio by n semitones"""
        return librosa.effects.pitch_shift(y, sr=sr, n_steps=n_steps)
    
    def time_stretch(self, y, rate):
        """Time stretch audio by rate factor"""
        return librosa.effects.time_stretch(y, rate=rate)
    
    def generate_report(self, file_path):
        """Generate comprehensive audio analysis report"""
        y, sr = self.load_audio(file_path)
        features = self.extract_features(y, sr)
        
        report = {
            'file': file_path,
            'duration': len(y) / sr,
            'sample_rate': sr,
            'tempo': features['tempo'],
            'avg_spectral_centroid': np.mean(features['spectral_centroid']),
            'avg_zero_crossing_rate': np.mean(features['zero_crossing_rate']),
            'onset_count': len(self.detect_onset(y, sr))
        }
        
        return report
 
# Usage example
analyzer = AudioAnalyzer()
 
def batch_analyze(directory):
    """Analyze all audio files in directory"""
    reports = []
    
    for filename in os.listdir(directory):
        if filename.lower().endswith(('.wav', '.mp3', '.flac')):
            file_path = os.path.join(directory, filename)
            try:
                report = analyzer.generate_report(file_path)
                reports.append(report)
                print(f"Analyzed: {filename}")
            except Exception as e:
                print(f"Error analyzing {filename}: {e}")
    
    return reports
 
# Generate analysis reports
reports = batch_analyze("./audio_files")
for report in reports:
    print(f"File: {report['file']}")
    print(f"Duration: {report['duration']:.2f}s")
    print(f"Tempo: {report['tempo']:.1f} BPM")
    print("---")
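
Printing works for a handful of files, but larger batches are easier to review in a spreadsheet. The sketch below writes the report dictionaries produced by `generate_report` to CSV using only the standard library. `write_reports_csv` is a hypothetical helper, and the sample values are invented for illustration.

```python
import csv
import io

REPORT_FIELDS = ["file", "duration", "sample_rate", "tempo",
                 "avg_spectral_centroid", "avg_zero_crossing_rate", "onset_count"]
FLOAT_FIELDS = ["duration", "tempo", "avg_spectral_centroid", "avg_zero_crossing_rate"]


def write_reports_csv(reports, stream):
    """Write analysis reports to an open text stream as CSV rows."""
    writer = csv.DictWriter(stream, fieldnames=REPORT_FIELDS, extrasaction="ignore")
    writer.writeheader()
    for report in reports:
        row = dict(report)
        for key in FLOAT_FIELDS:
            if key in row:
                # float() also unwraps NumPy scalars so the CSV holds plain numbers
                row[key] = round(float(row[key]), 3)
        writer.writerow(row)


# Invented sample report, shaped like the output of generate_report()
sample_reports = [{
    "file": "a.wav", "duration": 12.3456, "sample_rate": 22050,
    "tempo": 120.18, "avg_spectral_centroid": 1850.0,
    "avg_zero_crossing_rate": 0.0712, "onset_count": 42,
}]

buffer = io.StringIO()
write_reports_csv(sample_reports, buffer)
print(buffer.getvalue())
```

To write to disk instead, open the target with `open(path, "w", newline="", encoding="utf-8")` and pass the file object as `stream`.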

Metadata Management Automation

Mutagen for Tag Manipulation

Comprehensive Metadata Management:

import csv
import os

import mutagen.flac  # needed for mutagen.flac.Picture in add_artwork_flac
from mutagen.flac import FLAC
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TCON, APIC
from mutagen.mp3 import MP3
from mutagen.mp4 import MP4
from PIL import Image
 
class MetadataManager:
    def __init__(self):
        self.supported_formats = {
            '.mp3': self.process_mp3,
            '.flac': self.process_flac,
            '.m4a': self.process_mp4,
            '.mp4': self.process_mp4
        }
    
    def process_mp3(self, file_path, metadata):
        """Process MP3 metadata"""
        try:
            audio = MP3(file_path, ID3=ID3)
            
            # Add ID3 tag if it doesn't exist
            if audio.tags is None:
                audio.add_tags()
            
            # Set metadata
            if 'title' in metadata:
                audio.tags.add(TIT2(encoding=3, text=metadata['title']))
            if 'artist' in metadata:
                audio.tags.add(TPE1(encoding=3, text=metadata['artist']))
            if 'album' in metadata:
                audio.tags.add(TALB(encoding=3, text=metadata['album']))
            if 'year' in metadata:
                audio.tags.add(TDRC(encoding=3, text=str(metadata['year'])))
            if 'genre' in metadata:
                audio.tags.add(TCON(encoding=3, text=metadata['genre']))
            
            # Add album art if provided
            if 'artwork' in metadata:
                self.add_artwork_mp3(audio, metadata['artwork'])
            
            audio.save()
            return True
            
        except Exception as e:
            print(f"Error processing MP3 {file_path}: {e}")
            return False
    
    def process_flac(self, file_path, metadata):
        """Process FLAC metadata"""
        try:
            audio = FLAC(file_path)
            
            # Set metadata
            for key, value in metadata.items():
                if key != 'artwork':
                    audio[key.upper()] = str(value)
            
            # Add album art if provided
            if 'artwork' in metadata:
                self.add_artwork_flac(audio, metadata['artwork'])
            
            audio.save()
            return True
            
        except Exception as e:
            print(f"Error processing FLAC {file_path}: {e}")
            return False
    
    def process_mp4(self, file_path, metadata):
        """Process MP4/M4A metadata"""
        try:
            audio = MP4(file_path)
            
            # MP4 tag mapping
            tag_mapping = {
                'title': '\xa9nam',
                'artist': '\xa9ART',
                'album': '\xa9alb',
                'year': '\xa9day',
                'genre': '\xa9gen'
            }
            
            for key, value in metadata.items():
                if key in tag_mapping:
                    audio[tag_mapping[key]] = [str(value)]
            
            # Add album art if provided
            if 'artwork' in metadata:
                self.add_artwork_mp4(audio, metadata['artwork'])
            
            audio.save()
            return True
            
        except Exception as e:
            print(f"Error processing MP4 {file_path}: {e}")
            return False
    
    def add_artwork_mp3(self, audio, artwork_path):
        """Add artwork to MP3 file"""
        try:
            with open(artwork_path, 'rb') as img_file:
                audio.tags.add(APIC(
                    encoding=3,
                    mime='image/jpeg',
                    type=3,  # Cover (front)
                    desc='Cover',
                    data=img_file.read()
                ))
        except Exception as e:
            print(f"Error adding artwork: {e}")
    
    def add_artwork_flac(self, audio, artwork_path):
        """Add artwork to FLAC file"""
        try:
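            # Use Pillow to detect the real image type instead of assuming JPEG
            with Image.open(artwork_path) as image:
                mime_type = Image.MIME.get(image.format, 'image/jpeg')
            audio.clear_pictures()
            
            pic = mutagen.flac.Picture()
            pic.type = 3  # Cover (front)
            pic.mime = mime_type
            pic.desc = 'Cover'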
            
            with open(artwork_path, 'rb') as img_file:
                pic.data = img_file.read()
            
            audio.add_picture(pic)
        except Exception as e:
            print(f"Error adding FLAC artwork: {e}")
    
    def add_artwork_mp4(self, audio, artwork_path):
        """Add artwork to MP4 file"""
        try:
            with open(artwork_path, 'rb') as img_file:
                audio['covr'] = [img_file.read()]
        except Exception as e:
            print(f"Error adding MP4 artwork: {e}")
    
    def batch_tag_from_csv(self, csv_file, audio_directory):
        """Batch tag files from CSV metadata"""
        try:
            with open(csv_file, 'r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                
                for row in reader:
                    filename = row.get('filename')
                    if not filename:
                        continue
                    
                    file_path = os.path.join(audio_directory, filename)
                    if not os.path.exists(file_path):
                        print(f"File not found: {file_path}")
                        continue
                    
                    # Extract file extension
                    _, ext = os.path.splitext(filename.lower())
                    
                    if ext in self.supported_formats:
                        # Prepare metadata dictionary
                        metadata = {k: v for k, v in row.items() if k != 'filename' and v}
                        
                        # Process the file
                        success = self.supported_formats[ext](file_path, metadata)
                        if success:
                            print(f"Tagged: {filename}")
                        else:
                            print(f"Failed to tag: {filename}")
                    else:
                        print(f"Unsupported format: {ext}")
                        
        except Exception as e:
            print(f"Error processing CSV: {e}")
    
    def extract_metadata_to_csv(self, directory, output_csv):
        """Extract metadata from all files to CSV"""
        metadata_list = []
        
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            _, ext = os.path.splitext(filename.lower())
            
            if ext == '.mp3':
                try:
                    audio = MP3(file_path)
                    metadata = {
                        'filename': filename,
                        'title': str(audio.get('TIT2', [''])[0]),
                        'artist': str(audio.get('TPE1', [''])[0]),
                        'album': str(audio.get('TALB', [''])[0]),
                        'year': str(audio.get('TDRC', [''])[0]),
                        'genre': str(audio.get('TCON', [''])[0])
                    }
                    metadata_list.append(metadata)
                except Exception as e:
                    print(f"Error reading {filename}: {e}")
        
        # Write to CSV
        if metadata_list:
            with open(output_csv, 'w', newline='', encoding='utf-8') as file:
                fieldnames = ['filename', 'title', 'artist', 'album', 'year', 'genre']
                writer = csv.DictWriter(file, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(metadata_list)
            
            print(f"Metadata extracted to {output_csv}")
 
# Usage example
metadata_manager = MetadataManager()
 
# Batch tag from CSV
metadata_manager.batch_tag_from_csv('metadata.csv', './audio_files')
 
# Extract metadata to CSV
metadata_manager.extract_metadata_to_csv('./audio_files', 'extracted_metadata.csv')
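
`batch_tag_from_csv` expects a `filename` column plus one column per tag, and it drops empty values before tagging. The sketch below reproduces that row-filtering step on its own so the CSV layout is easy to see and check. `rows_to_tag_jobs` is a hypothetical helper, and the CSV content is made-up sample data.

```python
import csv
import io


def rows_to_tag_jobs(csv_text):
    """Parse CSV text into (filename, metadata) pairs, skipping empty values."""
    jobs = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        filename = (row.get("filename") or "").strip()
        if not filename:
            continue  # a row without a filename cannot be matched to a file
        metadata = {
            key: value.strip()
            for key, value in row.items()
            if key != "filename" and isinstance(value, str) and value.strip()
        }
        jobs.append((filename, metadata))
    return jobs


# Invented example: the third row has no filename and is skipped
EXAMPLE_CSV = """filename,title,artist,album,year,genre
track01.mp3,Opening,Example Artist,Demo Album,2024,Ambient
track02.flac,Second Piece,Example Artist,Demo Album,,
,Orphan Row,Nobody,,,
"""

for name, tags in rows_to_tag_jobs(EXAMPLE_CSV):
    print(name, tags)
```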

Workflow Automation Systems

Complete Processing Pipeline

Professional Audio Processing Pipeline:

import os
import json
import logging
from datetime import datetime
from pathlib import Path
import shutil
 
class AudioPipeline:
    def __init__(self, config_file='pipeline_config.json'):
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.processed_files = []
        self.failed_files = []
    
    def load_config(self, config_file):
        """Load pipeline configuration"""
        default_config = {
            "input_directory": "./input",
            "output_directory": "./output",
            "backup_directory": "./backup",
            "temp_directory": "./temp",
            "processing_steps": [
                "validate",
                "normalize",
                "convert",
                "tag",
                "quality_check"
            ],
            "output_format": "mp3",
            "target_loudness": -16,
            "quality_settings": {
                "mp3_bitrate": "320k",
                "flac_compression": 8
            }
        }
        
        try:
            with open(config_file, 'r') as f:
                user_config = json.load(f)
                default_config.update(user_config)
        except FileNotFoundError:
            print(f"Config file {config_file} not found, using defaults")
        
        return default_config
    
    def setup_logging(self):
        """Setup logging for pipeline"""
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        
        log_file = log_dir / f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        
        self.logger = logging.getLogger(__name__)
    
    def validate_file(self, file_path):
        """Validate audio file integrity"""
        try:
            # Use FFprobe to validate file
            import subprocess
            result = subprocess.run([
                'ffprobe', '-v', 'error', '-show_entries',
                'format=duration', '-of', 'csv=p=0', file_path
            ], capture_output=True, text=True)
            
            if result.returncode == 0 and result.stdout.strip():
                duration = float(result.stdout.strip())
                if duration > 0:
                    self.logger.info(f"Validated: {file_path} (duration: {duration:.2f}s)")
                    return True
            
            self.logger.error(f"Invalid file: {file_path}")
            return False
            
        except Exception as e:
            self.logger.error(f"Validation error for {file_path}: {e}")
            return False
    
    def normalize_audio(self, input_path, output_path):
        """Normalize audio to target loudness"""
        try:
            import subprocess
            
            cmd = [
                'ffmpeg', '-i', input_path,
                '-af', f'loudnorm=I={self.config["target_loudness"]}:LRA=7:TP=-2',
                '-y', output_path
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"Normalized: {input_path}")
                return True
            else:
                self.logger.error(f"Normalization failed: {result.stderr}")
                return False
                
        except Exception as e:
            self.logger.error(f"Normalization error: {e}")
            return False
    
    def convert_format(self, input_path, output_path):
        """Convert to target format"""
        try:
            import subprocess
            
            format_settings = self.config["quality_settings"]
            output_format = self.config["output_format"]
            
            if output_format == "mp3":
                cmd = [
                    'ffmpeg', '-i', input_path,
                    '-codec:a', 'libmp3lame',
                    '-b:a', format_settings["mp3_bitrate"],
                    '-y', output_path
                ]
            elif output_format == "flac":
                cmd = [
                    'ffmpeg', '-i', input_path,
                    '-codec:a', 'flac',
                    '-compression_level', str(format_settings["flac_compression"]),
                    '-y', output_path
                ]
            else:
                self.logger.error(f"Unsupported output format: {output_format}")
                return False
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"Converted: {input_path} to {output_format}")
                return True
            else:
                self.logger.error(f"Conversion failed: {result.stderr}")
                return False
                
        except Exception as e:
            self.logger.error(f"Conversion error: {e}")
            return False
    
    def quality_check(self, file_path):
        """Perform quality checks on processed file"""
        try:
            # Check file size
            file_size = os.path.getsize(file_path)
            if file_size < 1000:  # Less than 1KB
                self.logger.warning(f"Suspiciously small file: {file_path}")
                return False
            
            # Validate with FFprobe
            if not self.validate_file(file_path):
                return False
            
            self.logger.info(f"Quality check passed: {file_path}")
            return True
            
        except Exception as e:
            self.logger.error(f"Quality check error: {e}")
            return False
    
    def backup_original(self, file_path):
        """Backup original file"""
        try:
            backup_dir = Path(self.config["backup_directory"])
            backup_dir.mkdir(exist_ok=True)
            
            backup_path = backup_dir / Path(file_path).name
            shutil.copy2(file_path, backup_path)
            
            self.logger.info(f"Backed up: {file_path}")
            return True
            
        except Exception as e:
            self.logger.error(f"Backup error: {e}")
            return False
    
    def process_file(self, file_path):
        """Process single audio file through pipeline"""
        self.logger.info(f"Starting processing: {file_path}")
        
        # Create temp and output paths
        temp_dir = Path(self.config["temp_directory"])
        output_dir = Path(self.config["output_directory"])
        temp_dir.mkdir(exist_ok=True)
        output_dir.mkdir(exist_ok=True)
        
        file_stem = Path(file_path).stem
        temp_path = temp_dir / f"{file_stem}_temp.wav"
        output_path = output_dir / f"{file_stem}.{self.config['output_format']}"
        
        try:
            # Backup original
            if not self.backup_original(file_path):
                raise Exception("Backup failed")
            
            # Validation
            if "validate" in self.config["processing_steps"]:
                if not self.validate_file(file_path):
                    raise Exception("Validation failed")
            
            # Normalization
            if "normalize" in self.config["processing_steps"]:
                if not self.normalize_audio(file_path, str(temp_path)):
                    raise Exception("Normalization failed")
                current_file = str(temp_path)
            else:
                current_file = file_path
            
            # Format conversion
            if "convert" in self.config["processing_steps"]:
                if not self.convert_format(current_file, str(output_path)):
                    raise Exception("Conversion failed")
            
            # Quality check
            if "quality_check" in self.config["processing_steps"]:
                if not self.quality_check(str(output_path)):
                    raise Exception("Quality check failed")
            
            # Cleanup temp files
            if temp_path.exists():
                temp_path.unlink()
            
            self.processed_files.append(str(output_path))
            self.logger.info(f"Successfully processed: {file_path}")
            return True
            
        except Exception as e:
            self.failed_files.append(file_path)
            self.logger.error(f"Processing failed for {file_path}: {e}")
            
            # Cleanup on failure
            if temp_path.exists():
                temp_path.unlink()
            if output_path.exists():
                output_path.unlink()
            
            return False
    
    def run_pipeline(self):
        """Run complete processing pipeline"""
        input_dir = Path(self.config["input_directory"])
        
        if not input_dir.exists():
            self.logger.error(f"Input directory does not exist: {input_dir}")
            return
        
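        # Find all audio files. Matching on the lower-cased suffix avoids the
        # duplicates that separate "*.wav" and "*.WAV" globs produce on
        # case-insensitive filesystems.
        audio_extensions = {'.wav', '.mp3', '.flac', '.aiff', '.m4a'}
        audio_files = sorted(
            path for path in input_dir.iterdir()
            if path.is_file() and path.suffix.lower() in audio_extensions
        )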
        
        if not audio_files:
            self.logger.warning("No audio files found in input directory")
            return
        
        self.logger.info(f"Found {len(audio_files)} audio files to process")
        
        # Process each file
        for file_path in audio_files:
            self.process_file(str(file_path))
        
        # Generate summary report
        self.generate_report()
    
    def generate_report(self):
        """Generate processing summary report"""
        total_files = len(self.processed_files) + len(self.failed_files)
        success_rate = (len(self.processed_files) / total_files * 100) if total_files > 0 else 0
        
        # Counts and file lists need distinct keys; a repeated key would
        # silently overwrite the earlier value
        report = {
            "timestamp": datetime.now().isoformat(),
            "total_files": total_files,
            "processed_count": len(self.processed_files),
            "failed_count": len(self.failed_files),
            "success_rate": f"{success_rate:.1f}%",
            "processed_files": self.processed_files,
            "failed_files": self.failed_files,
            "configuration": self.config
        }
        
        # Save report
        report_path = f"processing_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=2)
        
        # Log summary
        self.logger.info(f"Processing complete: {len(self.processed_files)}/{total_files} files successful")
        self.logger.info(f"Report saved: {report_path}")
        
        print(f"\nProcessing Summary:")
        print(f"Total files: {total_files}")
        print(f"Successful: {len(self.processed_files)}")
        print(f"Failed: {len(self.failed_files)}")
        print(f"Success rate: {success_rate:.1f}%")
 
# Usage example
if __name__ == "__main__":
    # Create pipeline configuration
    config = {
        "input_directory": "./input",
        "output_directory": "./output",
        "target_loudness": -16,
        "output_format": "mp3",
        "processing_steps": ["validate", "normalize", "convert", "quality_check"]
    }
    
    with open('pipeline_config.json', 'w') as f:
        json.dump(config, f, indent=2)
    
    # Run pipeline
    pipeline = AudioPipeline()
    pipeline.run_pipeline()
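
File collection in the pipeline globs each extension in lower and upper case, which skips mixed-case names such as Track.Mp3. An alternative is to compare lowercased suffixes. The sketch below is only an illustration: the helper name collect_audio_files and the extension set are assumptions, not part of the pipeline class above.

```python
from pathlib import Path

# Illustrative extension set (an assumption); the pipeline defines its own list elsewhere.
AUDIO_EXTENSIONS = {".wav", ".mp3", ".flac", ".m4a", ".ogg"}


def collect_audio_files(input_dir, extensions=AUDIO_EXTENSIONS):
    """Return a sorted list of files in input_dir whose suffix matches, ignoring case."""
    wanted = {ext.lower() for ext in extensions}
    return sorted(
        path
        for path in Path(input_dir).iterdir()
        if path.is_file() and path.suffix.lower() in wanted
    )
```

Because each directory entry is visited exactly once, a file cannot appear twice in the result, regardless of how the platform treats case.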

Real-Time Audio Processing

Live Audio Monitoring

Real-Time Audio Analysis:

import pyaudio
import numpy as np
import time
from collections import deque
 
class RealTimeAudioMonitor:
    def __init__(self, sample_rate=44100, chunk_size=1024):
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.is_monitoring = False
        
        # Data storage
        self.audio_buffer = deque(maxlen=100)  # Store last 100 chunks
        self.level_history = deque(maxlen=200)  # RMS level history
        self.peak_history = deque(maxlen=200)   # Peak level history
        
        # Analysis parameters
        self.rms_threshold = 0.01
        self.peak_threshold = 0.8
        
    def start_monitoring(self, device_index=None):
        """Start real-time audio monitoring"""
        try:
            self.stream = self.audio.open(
                format=pyaudio.paFloat32,
                channels=1,
                rate=self.sample_rate,
                input=True,
                input_device_index=device_index,
                frames_per_buffer=self.chunk_size,
                stream_callback=self.audio_callback
            )
            
            self.is_monitoring = True
            self.stream.start_stream()
            print("Audio monitoring started")
            
        except Exception as e:
            print(f"Error starting monitoring: {e}")
    
    def stop_monitoring(self):
        """Stop audio monitoring"""
        self.is_monitoring = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.audio.terminate()
        print("Audio monitoring stopped")
    
    def audio_callback(self, in_data, frame_count, time_info, status):
        """Process incoming audio data"""
        if status:
            print(f"Audio callback status: {status}")
        
        # Convert bytes to numpy array
        audio_data = np.frombuffer(in_data, dtype=np.float32)
        
        # Store in buffer
        self.audio_buffer.append(audio_data)
        
        # Calculate levels
        rms_level = np.sqrt(np.mean(audio_data**2))
        peak_level = np.max(np.abs(audio_data))
        
        # Store level history
        self.level_history.append(rms_level)
        self.peak_history.append(peak_level)
        
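        # Warn when the peak exceeds the threshold (0.8 by default, i.e. approaching full scale)
        if peak_level > self.peak_threshold:
            print(f"WARNING: Peak level near clipping: {peak_level:.3f}")
        
        # Flag chunks whose RMS level falls below the silence threshold
        if rms_level < self.rms_threshold:
            print(f"Low signal level: {rms_level:.6f}")
        
        # Input-only stream: there is no output data to return
        return (None, pyaudio.paContinue)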
    
    def get_current_levels(self):
        """Get current audio levels"""
        if self.level_history and self.peak_history:
            return {
                'rms': self.level_history[-1],
                'peak': self.peak_history[-1],
                'rms_avg': np.mean(list(self.level_history)[-10:]),  # Mean RMS over the last 10 chunks
                'peak_max': np.max(list(self.peak_history)[-10:])    # Highest peak over the last 10 chunks
            }
        return None
    
    def analyze_frequency_spectrum(self):
        """Analyze frequency spectrum of recent audio"""
        if not self.audio_buffer:
            return None
        
        # Concatenate recent audio data
        recent_audio = np.concatenate(list(self.audio_buffer)[-10:])
        
        # Perform a real-input FFT, which returns only the non-negative frequencies
        magnitude = np.abs(np.fft.rfft(recent_audio))
        freqs = np.fft.rfftfreq(len(recent_audio), 1/self.sample_rate)
        
        return freqs, magnitude
    
    def detect_silence(self, duration_threshold=2.0):
        """Return True if every chunk in the last duration_threshold seconds is below rms_threshold"""
        # Number of chunks covering the requested duration (one RMS value is stored per chunk)
        required_chunks = int(duration_threshold * self.sample_rate / self.chunk_size)
        
        # Not enough history yet (level_history keeps at most 200 chunks)
        if required_chunks < 1 or len(self.level_history) < required_chunks:
            return False
        
        recent_levels = list(self.level_history)[-required_chunks:]
        return all(level < self.rms_threshold for level in recent_levels)
 
# Usage example for real-time monitoring
def run_audio_monitor():
    monitor = RealTimeAudioMonitor()
    
    try:
        monitor.start_monitoring()
        
        # Monitor for 30 seconds, reporting levels once per second
        for _ in range(30):
            time.sleep(1)
            levels = monitor.get_current_levels()
            if levels:
                print(f"RMS: {levels['rms']:.4f}, Peak: {levels['peak']:.4f}")
            
            # Check for silence
            if monitor.detect_silence():
                print("Silence detected!")
        
    except KeyboardInterrupt:
        print("Monitoring interrupted by user")
    finally:
        monitor.stop_monitoring()
 
# Uncomment to run
# run_audio_monitor()
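
The monitor reports RMS and peak levels as linear values between 0 and 1, while meters in audio tools usually show decibels relative to full scale (dBFS). A minimal conversion sketch follows; the helper name to_dbfs and the -120 dB floor are assumptions chosen for illustration, not part of the monitor class.

```python
import math


def to_dbfs(level, floor_db=-120.0):
    """Convert a linear level (1.0 = full scale) to dBFS, clamped at floor_db."""
    if level <= 0:
        return floor_db  # log10 is undefined for zero, so report the floor instead
    return max(20.0 * math.log10(level), floor_db)


# The monitor's default thresholds expressed in dBFS
for linear in (1.0, 0.8, 0.01):
    print(f"{linear:.2f} -> {to_dbfs(linear):.1f} dBFS")
```

Seen this way, the default peak_threshold of 0.8 sits roughly 1.9 dB below full scale, and the default rms_threshold of 0.01 corresponds to -40 dBFS.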

Conclusion

Audio automation scripting streamlines workflows, keeps processing consistent, and lets audio production scale. The techniques covered in this guide, from batch pipelines with summary reports to real-time level monitoring, provide a foundation for building custom automation solutions.

Key Benefits of Audio Automation:

  • Efficiency: Process large volumes of audio quickly and consistently
  • Quality Control: Implement standardized processing and validation
  • Scalability: Handle projects of any size with automated workflows
  • Reliability: Reduce human error in repetitive tasks
  • Flexibility: Customize processing chains for specific requirements

Best Practices:

  • Start Simple: Begin with basic scripts and gradually add complexity
  • Test Thoroughly: Validate automation with small batches before full deployment
  • Monitor Quality: Implement quality checks and validation steps
  • Document Workflows: Maintain clear documentation for automation processes
  • Backup Originals: Always preserve original files before processing
  • Log Everything: Implement comprehensive logging for troubleshooting
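
Two of these practices, backing up originals and logging, can be combined in a small step that runs before any processing. The sketch below is one possible approach under stated assumptions: the function name backup_originals, the logger name, and the flat backup folder layout are all hypothetical.

```python
import logging
import shutil
from pathlib import Path

logger = logging.getLogger("audio_backup")  # hypothetical logger name


def backup_originals(files, backup_dir):
    """Copy each file into backup_dir unless a backup already exists; return the backup paths."""
    backup_dir = Path(backup_dir)
    backup_dir.mkdir(parents=True, exist_ok=True)
    backups = []
    for src in map(Path, files):
        dest = backup_dir / src.name
        if dest.exists():
            # Never overwrite an existing backup with a possibly modified file
            logger.info("Backup already exists, skipping: %s", dest)
        else:
            shutil.copy2(src, dest)  # copy2 also preserves timestamps
            logger.info("Backed up %s -> %s", src, dest)
        backups.append(dest)
    return backups
```

Because the layout is flat, two source files with the same name in different folders would share one backup path; a real workflow would likely mirror the source directory structure instead.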

Future Considerations:

  • Cloud Processing: Leverage cloud services for large-scale automation
  • Machine Learning: Integrate AI for intelligent audio processing decisions
  • Real-Time Processing: Develop live audio processing and monitoring systems
  • API Integration: Connect with streaming platforms and distribution services
  • Collaborative Workflows: Build systems for team-based audio production

By mastering these automation techniques, audio professionals can focus more on creative decisions while ensuring technical excellence through reliable, repeatable processes. The investment in learning scripting and automation pays dividends in productivity and consistency across all audio production workflows.

Remember that automation should enhance creativity, not replace it. Use these tools to handle the technical heavy lifting while you focus on the artistic and creative aspects of audio production.

Author

Mp3To Team
