Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@ Upgrading to Graylog 6.0.x

## Breaking Changes

### Changed default number of process-buffer and output-buffer processors

The default values for the configuration settings `processbuffer_processors` and `outputbuffer_processors` have been
changed. The values will now be calculated based on the number of CPU cores available to the JVM. If you have not
explicitly set values for these settings in your configuration file, the new defaults apply.

The new defaults should improve performance of your system, however, if you want to continue running your system with
the previous defaults, please add the following settings to your configuration file:

```
processbuffer_processors = 5
outputbuffer_processors = 3
```

### Prometheus metrics

The name of the `jvm_classes_loaded` metric [has been changed](https://github.com/prometheus/client_java/pull/681).
Expand Down
19 changes: 19 additions & 0 deletions changelog/unreleased/issue-17450.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
type = "changed"
message = "Automatically choose default number of process-buffer and output-buffer processors based on available CPU cores."

issues = ["17450"]
pulls = ["17737"]

details.user = """
The default values for the configuration settings `processbuffer_processors` and `outputbuffer_processors` have been
changed. The values will now be calculated based on the number of CPU cores available to the JVM. If you have not
explicitly set values for these settings in your configuration file, the new defaults apply.

The new defaults should improve performance of your system, however, if you want to continue running your system with
the previous defaults, please add the following settings to your configuration file:

```
processbuffer_processors = 5
outputbuffer_processors = 3
```
"""
33 changes: 32 additions & 1 deletion graylog2-server/src/main/java/org/graylog2/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.graylog2.cluster.lock.MongoLockService;
import org.graylog2.configuration.converters.JavaDurationConverter;
import org.graylog2.notifications.Notification;
import org.graylog2.plugin.Tools;
import org.graylog2.security.realm.RootAccountRealm;
import org.graylog2.utilities.IPSubnetConverter;
import org.graylog2.utilities.IpSubnet;
Expand Down Expand Up @@ -79,7 +80,7 @@ public class Configuration extends CaConfiguration {
private int outputFlushInterval = 1;

@Parameter(value = "outputbuffer_processors", required = true, validators = PositiveIntegerValidator.class)
private int outputBufferProcessors = 3;
private int outputBufferProcessors = defaultNumberOfOutputBufferProcessors();

@Parameter(value = "outputbuffer_processor_threads_core_pool_size", required = true, validators = PositiveIntegerValidator.class)
private int outputBufferProcessorThreadsCorePoolSize = 3;
Expand Down Expand Up @@ -562,4 +563,34 @@ public void validate(String name, String path) throws ValidationException {
throw new ValidationException("Node ID file at path " + path + " isn't " + b + ". Please specify the correct path or change the permissions");
}
}

/**
* Calculate the default number of output buffer processors as a linear function of available CPU cores.
* The function is designed to yield predetermined values for the following select numbers of CPU cores that
* have proven to work well in real-world production settings:
* <table>
* <tr>
* <th># CPU cores</th><th># buffer processors</th>
* </tr>
* <tr>
* <td>2</td><td>1</td>
* </tr>
* <tr>
* <td>4</td><td>1</td>
* </tr>
* <tr>
* <td>8</td><td>2</td>
* </tr>
* <tr>
* <td>12</td><td>3</td>
* </tr>
* <tr>
* <td>16</td><td>3</td>
* </tr>
* </table>
*/
private static int defaultNumberOfOutputBufferProcessors() {
return Math.round(Tools.availableProcessors() * 0.162f + 0.625f);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ public Long getValue() {
);
disruptor.setDefaultExceptionHandler(new LoggingExceptionHandler(LOG));

LOG.info("Initialized OutputBuffer with ring size <{}> and wait strategy <{}>.",
ringBufferSize, waitStrategy.getClass().getSimpleName());

final OutputBufferProcessor[] processors = new OutputBufferProcessor[processorCount];

for (int i = 0; i < processorCount; i++) {
Expand All @@ -89,6 +86,10 @@ public Long getValue() {
disruptor.handleEventsWithWorkerPool(processors);

ringBuffer = disruptor.start();

LOG.info("Initialized OutputBuffer with ring size <{}> and wait strategy <{}>, " +
"running {} parallel buffer processors.",
ringBufferSize, waitStrategy.getClass().getSimpleName(), processorCount);
}

private ThreadFactory threadFactory(final MetricRegistry metricRegistry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class BaseConfiguration extends PathConfiguration {
protected int shutdownTimeout = 30000;

@Parameter(value = "processbuffer_processors", required = true, validator = PositiveIntegerValidator.class)
private int processBufferProcessors = 5;
private int processBufferProcessors = defaultNumberOfProcessBufferProcessors();

@Parameter(value = "processor_wait_strategy", required = true)
private String processorWaitStrategy = "blocking";
Expand Down Expand Up @@ -148,7 +148,7 @@ public WaitStrategy getInputBufferWaitStrategy() {
public int getAsyncEventbusProcessors() {
return asyncEventbusProcessors;
}

public boolean isMessageJournalEnabled() {
return messageJournalEnabled;
}
Expand Down Expand Up @@ -226,4 +226,33 @@ public void validateJournalMode() throws ValidationException {
"provided when the journal is enabled.");
}
}

/**
* Calculate the default number of process buffer processors as a linear function of available CPU cores.
* The function is designed to yield predetermined values for the following select numbers of CPU cores that
* have proven to work well in real-world production settings:
* <table>
* <tr>
* <th># CPU cores</th><th># buffer processors</th>
* </tr>
* <tr>
* <td>2</td><td>1</td>
* </tr>
* <tr>
* <td>4</td><td>2</td>
* </tr>
* <tr>
* <td>8</td><td>4</td>
* </tr>
* <tr>
* <td>12</td><td>5</td>
* </tr>
* <tr>
* <td>16</td><td>6</td>
* </tr>
* </table>
*/
private static int defaultNumberOfProcessBufferProcessors() {
return Math.round(Tools.availableProcessors() * 0.36f + 0.625f);
}
}
13 changes: 9 additions & 4 deletions graylog2-server/src/main/java/org/graylog2/plugin/Tools.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
* Utility class for various tool/helper functions.
*/
public final class Tools {
private static final byte[] EMPTY_BYTE_ARRAY_4 = {0,0,0,0};
private static final byte[] EMPTY_BYTE_ARRAY_16 = {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
private static final byte[] EMPTY_BYTE_ARRAY_4 = {0, 0, 0, 0};
private static final byte[] EMPTY_BYTE_ARRAY_16 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

private static final String ES_DATE_FORMAT_JODA = "yyyy-MM-dd HH:mm:ss.SSS";
private static final String ES_DATE_FORMAT_NO_MS = "yyyy-MM-dd HH:mm:ss";
Expand All @@ -77,6 +77,7 @@ public final class Tools {
public static final DateTimeFormatter ES_DATE_FORMAT_NO_MS_FORMATTER = DateTimeFormat.forPattern(Tools.ES_DATE_FORMAT_NO_MS).withZoneUTC();
public static final DateTimeFormatter ISO_DATE_FORMAT_FORMATTER = ISODateTimeFormat.dateTime().withZoneUTC();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

private Tools() {
}
Expand Down Expand Up @@ -447,7 +448,7 @@ public static Double getDouble(Object x) {

public static Number getNumber(Object o, Number defaultValue) {
if (o instanceof Number) {
return (Number)o;
return (Number) o;
}

try {
Expand Down Expand Up @@ -574,7 +575,7 @@ public static URI uriWithTrailingSlash(@Nullable final URI uri) {
}

final String path = firstNonNull(uri.getPath(), "/");
if(path.endsWith("/")) {
if (path.endsWith("/")) {
return uri;
} else {
try {
Expand Down Expand Up @@ -661,4 +662,8 @@ public static Optional<AbsoluteRange> extractHistogramBoundaries(final String qu
return Optional.empty();
}
}

public static int availableProcessors() {
return AVAILABLE_PROCESSORS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,18 @@ public Long getValue() {
);
disruptor.setDefaultExceptionHandler(new LoggingExceptionHandler(LOG));

LOG.info("Initialized ProcessBuffer with ring size <{}> and wait strategy <{}>.",
ringBufferSize, waitStrategy.getClass().getSimpleName());

processors = new ProcessBufferProcessor[processorCount];
for (int i = 0; i < processorCount; i++) {
processors[i] = bufferProcessorFactory.create(decodingProcessorFactory.create(decodeTime, parseTime));
}
disruptor.handleEventsWithWorkerPool(processors);

ringBuffer = disruptor.start();

LOG.info("Initialized ProcessBuffer with ring size <{}> and wait strategy <{}>, " +
"running {} parallel buffer processors.",
ringBufferSize, waitStrategy.getClass().getSimpleName(), processorCount);

}

private ThreadFactory threadFactory(MetricRegistry metricRegistry) {
Expand All @@ -116,7 +118,7 @@ protected void afterInsert(int n) {
incomingMessages.mark(n);
}

public ImmutableMap<String,String> getDump() {
public ImmutableMap<String, String> getDump() {
final ImmutableMap.Builder<String, String> processBufferDump = ImmutableMap.builder();
for (int i = 0, processorsLength = processors.length; i < processorsLength; i++) {
final ProcessBufferProcessor proc = processors[i];
Expand Down
37 changes: 37 additions & 0 deletions graylog2-server/src/test/java/org/graylog2/ConfigurationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import com.github.joschi.jadconfig.RepositoryException;
import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.repositories.InMemoryRepository;
import org.graylog2.plugin.Tools;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -298,6 +300,41 @@ public void staleLeaderTimeoutAndStaleMasterTimeoutSet() throws Exception {
assertThat(configuration.getStaleLeaderTimeout()).isEqualTo(3000);
}

@Test
public void defaultProcessorNumbers() throws Exception {
// number of buffer processors:
// process, output
final int[][] baseline = {
{1, 1}, // 1 available processor
{1, 1}, // 2 available processors
{2, 1}, // 3 available processors
{2, 1}, // 4 available processors
{2, 1}, // 5 available processors
{3, 2}, // 6 available processors
{3, 2}, // 7 available processors
{4, 2}, // 8 available processors
{4, 2}, // 9 available processors
{4, 2}, // 10 available processors
{5, 2}, // 11 available processors
{5, 3}, // 12 available processors
{5, 3}, // 13 available processors
{6, 3}, // 14 available processors
{6, 3}, // 15 available processors
{6, 3}, // 16 available processors
};

final int[][] actual = new int[baseline.length][2];
for (int i = 0; i < actual.length; i++) {
try (final var tools = Mockito.mockStatic(Tools.class)) {
tools.when(Tools::availableProcessors).thenReturn(i + 1);
final Configuration config = processValidProperties();
actual[i][0] = config.getProcessBufferProcessors();
actual[i][1] = config.getOutputBufferProcessors();
}
}
assertThat(actual).isEqualTo(baseline);
}

/**
* Run the NodeIDFileValidator on a file with the given permissions.
*
Expand Down
21 changes: 16 additions & 5 deletions misc/graylog.conf
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,19 @@ output_flush_interval = 1
output_fault_count_threshold = 5
output_fault_penalty_seconds = 30

# The number of parallel running processors.
# Raise this number if your buffers are filling up.
processbuffer_processors = 5
outputbuffer_processors = 3
# Number of process buffer processors running in parallel.
# By default, the value will be determined automatically based on the number of CPU cores available to the JVM, using
# the formula (<#cores> * 0.36 + 0.625) rounded to the nearest integer.
# Set this value explicitly to override the dynamically calculated value. Try raising the number if your buffers are
# filling up.
#processbuffer_processors = 5

# Number of output buffer processors running in parallel.
# By default, the value will be determined automatically based on the number of CPU cores available to the JVM, using
# the formula (<#cores> * 0.162 + 0.625) rounded to the nearest integer.
# Set this value explicitly to override the dynamically calculated value. Try raising the number if your buffers are
# filling up.
#outputbuffer_processors = 3

# The size of the thread pool in the output buffer processor.
# Default: 3
Expand All @@ -491,9 +500,11 @@ processor_wait_strategy = blocking
ring_size = 65536

inputbuffer_ring_size = 65536
inputbuffer_processors = 2
inputbuffer_wait_strategy = blocking

# Number of input buffer processors running in parallel.
#inputbuffer_processors = 2

# Manually stopped inputs are no longer auto-restarted. To re-enable the previous behavior, set auto_restart_inputs to true.
#auto_restart_inputs = true

Expand Down