Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions splunklib/searchcommands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
from splunklib.searchcommands.eventing_command import EventingCommand
from splunklib.searchcommands.external_search_command import ExternalSearchCommand, execute
from splunklib.searchcommands.generating_command import GeneratingCommand
from splunklib.searchcommands.internals import DiskBufferSettings
from splunklib.searchcommands.reporting_command import ReportingCommand
from splunklib.searchcommands.search_command import SearchMetric, dispatch
from splunklib.searchcommands.streaming_command import StreamingCommand
Expand All @@ -173,6 +174,7 @@
"Boolean",
"Code",
"Configuration",
"DiskBufferSettings",
"Duration",
"EventingCommand",
"ExternalSearchCommand",
Expand Down
4 changes: 2 additions & 2 deletions splunklib/searchcommands/generating_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ def _execute_chunk_v2(self, process, chunk):
for row in process:
records.append(row)
count += 1
if count == self._record_writer._maxresultrows:
if count == self._record_writer.maxresultrows:
break

for row in records:
self._record_writer.write_record(row)

if count == self._record_writer._maxresultrows:
if count == self._record_writer.maxresultrows:
self._finished = False
else:
self._finished = True
Expand Down
105 changes: 105 additions & 0 deletions splunklib/searchcommands/internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import csv
import gzip
import re
import shutil
import sys
import tempfile
import urllib.parse
import warnings
from collections import OrderedDict, deque, namedtuple
from dataclasses import dataclass
from io import StringIO, TextIOWrapper
from itertools import chain
from json import JSONDecoder, JSONEncoder
Expand Down Expand Up @@ -229,6 +232,23 @@ def replace(match):
# endregion


@dataclass(frozen=True, kw_only=True)
class DiskBufferSettings:
"""Controls disk-spill buffering for RecordWriterV3.

When set on a command via ``@Configuration(disk_buffer=DiskBufferSettings())``,
the CSV reply buffer spills to a temp file instead of accumulating entirely in
RAM. This trades some I/O overhead for a bounded memory footprint regardless
of result set size.

Args:
spool_size: Bytes kept in RAM before spilling to disk. Defaults to 4 MB.
Set to 0 to always write directly to disk.
"""

spool_size: int = 4 * 1024 * 1024


class ConfigurationSettingsType(type):
"""Metaclass for constructing ConfigurationSettings classes.

Expand Down Expand Up @@ -306,6 +326,12 @@ def validate_configuration_setting(specification, name, value):
constraint=lambda value: value in ("events", "reporting", "streaming"),
supporting_protocols=[2],
),
# SDK-only: never sent to Splunk. supporting_protocols=[] keeps it out of iteritems().
"disk_buffer": specification(
type=DiskBufferSettings,
constraint=None,
supporting_protocols=[],
),
}


Expand Down Expand Up @@ -462,6 +488,10 @@ def __init__(self, ofile, maxresultrows=None):
self._committed_record_count = 0
self.custom_fields = set()

@property
def maxresultrows(self):
    """Read-only view of the private ``_maxresultrows`` attribute.

    Lets callers outside the writer (such as the generating command's chunk
    loop) compare against the row limit without reaching into a private name.
    """
    return self._maxresultrows

@property
def is_flushed(self):
    """Return the private ``_flushed`` flag.

    The chunk-writing code sets the flag to ``True`` once a chunk has been
    written to the output file and that file has been flushed.
    NOTE(review): where the flag is reset is not visible here — confirm.
    """
    return self._flushed
Expand Down Expand Up @@ -797,3 +827,78 @@ def _write_chunk(self, metadata, body):
self.write(body)
self._ofile.flush()
self._flushed = True


class RecordWriterV3(RecordWriterV2):
    """RecordWriterV2 with disk-spill buffering via SpooledTemporaryFile.

    Used when a command is configured with
    ``@Configuration(disk_buffer=DiskBufferSettings())``. CSV rows are written
    through a UTF-8 ``TextIOWrapper`` into a binary ``SpooledTemporaryFile``
    instead of a ``StringIO``, so the reply body is held in RAM only up to
    ``spool_size`` bytes and is streamed to the output file in 64 KiB pieces.

    NOTE(review): wrapping ``SpooledTemporaryFile`` in ``TextIOWrapper`` relies
    on it implementing the ``io.BufferedIOBase`` API, which the standard
    library only guarantees from Python 3.11 on — confirm the supported
    interpreter range.
    """

    def __init__(self, ofile, maxresultrows=None, disk_buffer=None):
        """Create the writer and swap the in-memory buffer for a spool file.

        Args:
            ofile: Binary output stream that chunks are written to.
            maxresultrows: Forwarded unchanged to ``RecordWriterV2.__init__``.
            disk_buffer: Settings object providing ``spool_size``; required.

        Raises:
            ValueError: If ``disk_buffer`` is ``None``.
        """
        if disk_buffer is None:
            raise ValueError("RecordWriterV3 requires a DiskBufferSettings instance")
        self._disk_buffer = disk_buffer
        super().__init__(ofile, maxresultrows)

        # Replace the StringIO created by the base initializer with a spool file.
        spool_size = self._disk_buffer.spool_size
        raw = tempfile.SpooledTemporaryFile(max_size=spool_size, mode="w+b")
        if spool_size == 0:
            # SpooledTemporaryFile interprets max_size=0 as "no limit" and
            # would keep everything in memory. A spool_size of 0 is documented
            # as "always write directly to disk", so force the rollover now.
            raw.rollover()
        self._buffer_raw = raw
        self._buffer = TextIOWrapper(raw, encoding="utf-8", newline="")
        self._writer = csv.writer(self._buffer, dialect=CsvDialect)
        self._writerow = self._writer.writerow

    def write_chunk(self, finished=None):
        """Emit one ``chunked 1.0`` reply: start line, JSON metadata, CSV body.

        The body is copied straight from the spool file to the output stream,
        so it is never materialized as a single bytes object.

        Args:
            finished: Value for the ``finished`` metadata key; omitted from the
                metadata when ``None``.
        """
        self._committed_record_count += self.pending_record_count
        self._chunk_count += 1

        # An empty inspector is dropped from the metadata rather than sent.
        inspector = self._inspector if len(self._inspector) > 0 else None

        metadata = {
            name: value
            for name, value in (("inspector", inspector), ("finished", finished))
            if value is not None
        }
        metadata_bytes = "".join(self._iterencode_json(metadata, 0)).encode("utf-8")
        metadata_length = len(metadata_bytes)

        # Flush TextIOWrapper so all pending CSV data lands in the binary spool file.
        self._buffer.flush()

        # Measure the body in bytes (whence=2 seeks to end of file), then
        # rewind so copyfileobj reads from the start.
        self._buffer_raw.seek(0, 2)
        body_length = self._buffer_raw.tell()
        self._buffer_raw.seek(0)

        if metadata_length > 0 or body_length > 0:
            start_line = f"chunked 1.0,{metadata_length},{body_length}\n".encode("utf-8")
            self._ofile.write(start_line)
            self._ofile.write(metadata_bytes)
            shutil.copyfileobj(self._buffer_raw, self._ofile, length=65536)
            self._ofile.flush()
            self._flushed = True

        self._clear()

    def _clear(self):
        """Empty the spool file and reset per-chunk state."""
        # Flush the wrapper first so no buffered text is written after the
        # truncate, then reset the raw spool.
        self._buffer.flush()
        self._buffer_raw.seek(0)
        self._buffer_raw.truncate()
        # Seek the wrapper too so its cached position matches the raw file.
        self._buffer.seek(0)
        self._inspector.clear()
        self._pending_record_count = 0
        self._fieldnames = None
35 changes: 31 additions & 4 deletions splunklib/searchcommands/search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@

import splunklib.searchcommands.environment as environment
from splunklib.client import Service
from splunklib.searchcommands.decorators import Option
from splunklib.searchcommands.decorators import ConfigurationSetting, Option
from splunklib.searchcommands.internals import (
CommandLineParser,
CsvDialect,
DiskBufferSettings,
InputHeader,
Message,
MetadataDecoder,
Expand All @@ -46,6 +47,7 @@
Recorder,
RecordWriterV1,
RecordWriterV2,
RecordWriterV3,
)
from splunklib.searchcommands.validators import Boolean
from splunklib.utils import ensure_str
Expand Down Expand Up @@ -746,9 +748,12 @@ def _process_protocol_v2(self, argv, ifile, ofile):
# Write search command configuration for consumption by splunkd
# noinspection PyBroadException
try:
self._record_writer = RecordWriterV2(
ofile, getattr(self._metadata.searchinfo, "maxresultrows", None)
)
_disk_buffer = getattr(self._configuration, "disk_buffer", None)
_maxresultrows = getattr(self._metadata.searchinfo, "maxresultrows", None)
if _disk_buffer is not None:
self._record_writer = RecordWriterV3(ofile, _maxresultrows, disk_buffer=_disk_buffer)
else:
self._record_writer = RecordWriterV2(ofile, _maxresultrows)
self.fieldnames = []
self.options.reset()

Expand Down Expand Up @@ -1135,6 +1140,28 @@ def iteritems(self):

# endregion

# region SDK-only settings (not sent to Splunk)

disk_buffer = ConfigurationSetting(
doc="""
Enable disk-spill buffering for the CSV reply buffer.

Set to a :class:`DiskBufferSettings` instance to have the SDK write the
CEXC reply payload to a :mod:`tempfile.SpooledTemporaryFile` instead of
a ``StringIO``. The spool file stays in RAM up to ``spool_size`` bytes,
then spills to a temp directory.

This trades I/O overhead for bounded peak memory usage — useful for
commands that generate or pass through very large result sets.

Default: :const:`None` (StringIO, original behaviour)

Supported by: SDK only (not sent to Splunk)
"""
)

# endregion

# endregion


Expand Down
Loading
Loading