Kubernetes: Pod Topology Spread Constraints
Use topology spread constraints to control how Pods are spread across your cluster among failure-domains such as regions, zones, nodes, and other user-defined topology domains. This can help to achieve high availability as well as efficient resource utilization.
Kubernetes: Inter-pod affinity and anti-affinity
Inter-pod affinity and anti-affinity allow you to constrain which nodes your Pods can be scheduled on based on the labels of Pods already running on that node, instead of the node labels.
In OpenShift, the kube-scheduler binds a unit of work (Pod) to a Node. The scheduler reads the work from a scheduling queue, retrieves the current state of the cluster, scores the work based on the scheduling rules (from the policy) and the cluster’s state, and prioritizes binding the Pod to a Node.
These Pods are scheduled based on an instantaneous read of the policy and the environment, so each placement of a Pod on a Node is a best estimate at that moment. Because clusters are constantly changing shape and context, there is a need to deschedule the Pod and schedule it anew.
The Descheduler runs on a set interval and re-evaluates each scheduled Pod against the Node and the Policy, setting an eviction if the Pod should be removed based on the Descheduler Policy.
The Pod is then removed (unbound).
Thankfully, OpenShift has a Descheduler Operator that more easily facilitates the unbinding of a Pod from a Node based on a cluster-wide configuration of the KubeDescheduler Custom Resource. In a single cluster, there is at most one configured KubeDescheduler, which must be named cluster, and it configures one or more Descheduler Profiles.
TopologyAndDuplicates
Spreads pods evenly among nodes based on topology constraints and duplicate replicas on the same node. This profile cannot be used with SoftTopologyAndDuplicates.
SoftTopologyAndDuplicates
Spreads pods like the prior profile, but with soft constraints. This profile cannot be used with TopologyAndDuplicates.
LifecycleAndUtilization
Balances pods based on node resource usage. This profile cannot be used with DevPreviewLongLifecycle.
EvictPodsWithLocalStorage
Enables pods with local storage to be evicted by all other profiles.
EvictPodsWithPVC
Allows pods with PVCs to be evicted by all other profiles; by default, pods with PVCs are not evicted.
DevPreviewLongLifecycle
Lifecycle management for pods that are ‘long running’. This profile cannot be used with LifecycleAndUtilization.
At least one DeschedulerProfile must be specified, and there cannot be any duplicate entries. There are two possible mode values: Automatic and Predictive. You have to go to the descheduler Pod and check its output to see what is Predicted or Completed.
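The profile rules above (at least one profile, no duplicates, and two mutually exclusive pairs) are easy to check before applying a configuration. Here is a minimal sketch in Java; ProfileCheck and validate are hypothetical names of my own and are not part of the Descheduler Operator, which performs its own validation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: it only encodes the rules stated in the text above.
class ProfileCheck {
    // Pairs of profiles that the text says cannot be combined.
    private static final String[][] EXCLUSIVE = {
        {"TopologyAndDuplicates", "SoftTopologyAndDuplicates"},
        {"LifecycleAndUtilization", "DevPreviewLongLifecycle"}
    };

    // Returns a list of problems; an empty list means the profile set looks valid.
    static List<String> validate(List<String> profiles) {
        List<String> errors = new ArrayList<>();
        if (profiles.isEmpty()) {
            errors.add("at least one profile is required");
        }
        Set<String> seen = new HashSet<>();
        for (String profile : profiles) {
            if (!seen.add(profile)) {
                errors.add("duplicate profile: " + profile);
            }
        }
        for (String[] pair : EXCLUSIVE) {
            if (seen.contains(pair[0]) && seen.contains(pair[1])) {
                errors.add(pair[0] + " cannot be used with " + pair[1]);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(validate(List.of("LifecycleAndUtilization", "EvictPodsWithPVC")));
        System.out.println(validate(List.of("TopologyAndDuplicates", "SoftTopologyAndDuplicates")));
    }
}
```

The first call reports no problems; the second reports the conflicting pair.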
The DeschedulerOperator excludes the openshift-*, kube-system and hypershift namespaces.
2. Create a Pod that indicates it’s available for eviction using the annotation descheduler.alpha.kubernetes.io/evict: "true" and is updated for the proper node name.
4. Get the Pods in the openshift-kube-descheduler-operator namespace
oc get pods -n openshift-kube-descheduler-operator
NAME READY STATUS RESTARTS AGE
descheduler-f479c5669-5ffxl 1/1 Running 0 2m7s
descheduler-operator-85fc6666cb-5dfr7 1/1 Running 0 27h
5. Check the Logs for the descheduler pod
oc -n openshift-kube-descheduler-operator logs descheduler-f479c5669-5ffxl
I0506 19:59:10.298440 1 pod_lifetime.go:110] "Evicted pod because it exceeded its lifetime" pod="minio-operator/console-7bc65f7dd9-q57lr" maxPodLifeTime=60
I0506 19:59:10.298500 1 evictions.go:158] "Evicted pod in dry run mode" pod="default/demopod1" reason="PodLifeTime"
I0506 19:59:10.298532 1 pod_lifetime.go:110] "Evicted pod because it exceeded its lifetime" pod="default/demopod1" maxPodLifeTime=60
I0506 19:59:10.298598 1 toomanyrestarts.go:90] "Processing node" node="master-0.rdr-rhop-.sslip.io"
I0506 19:59:10.299118 1 toomanyrestarts.go:90] "Processing node" node="master-1.rdr-rhop.sslip.io"
I0506 19:59:10.299575 1 toomanyrestarts.go:90] "Processing node" node="master-2.rdr-rhop.sslip.io"
I0506 19:59:10.300385 1 toomanyrestarts.go:90] "Processing node" node="worker-0.rdr-rhop.sslip.io"
I0506 19:59:10.300701 1 toomanyrestarts.go:90] "Processing node" node="worker-1.rdr-rhop.sslip.io"
I0506 19:59:10.301097 1 descheduler.go:287] "Number of evicted pods" totalEvicted=5
This article shows a simple case for the Descheduler and you can see how it ran a dry run and showed it would evict five pods.
A brief Operator training I gave to my team resulted in these notes. Thanks to many others in the reference section.
An Operator codifies the tasks commonly associated with administrating, operating, and supporting an application. The codified tasks are event-driven responses to changes (create-update-delete-time) in the declared state relative to the actual state of an application, using domain knowledge to reconcile the state and report on the status.
Event (Anomaly) Detection and Response (Remediation)
Scheduling and Tuning
Application Specific Management
Continuous Testing and Chaos Monkey
Helm operators wrap Helm charts in a simplistic view of the operation and pass through Helm verbs, so one can install, uninstall, destroy, and upgrade using an Operator.
There are four actors in the Operator Pattern.
Initiator – The user who creates the Custom Resource
Operator – The Controller that operates on the Operand
Each Operator operates on an Operand using Managed Resources (Kubernetes and OpenShift) to reconcile states. The states are described in a domain specific language (DSL) encapsulated in a Custom Resource to describe the state of the application:
spec – The User communicates to the Operator the desired state (Operator reads)
status – The Operator communicates back to the User (Operator writes)
While not limited to writing spec and status, if we think spec is initiator specified, and if we think status is operator written, then we limit the chances of creating an unintended reconciliation loop.
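To make the spec/status split concrete, here is a minimal in-memory sketch of one reconcile step in Java. Everything in it is hypothetical (ReconcileSketch, Spec, Status, and the actualReplicas map standing in for Managed Resources); it uses no Kubernetes client and only illustrates that the operator reads spec and writes status.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory model of a reconcile step; no Kubernetes client is involved.
class ReconcileSketch {
    // spec: written by the initiator, only read by the operator.
    static final class Spec {
        final int replicas;
        Spec(int replicas) { this.replicas = replicas; }
    }

    // status: written by the operator, read by the user.
    static final class Status {
        final int readyReplicas;
        final String phase;
        Status(int readyReplicas, String phase) {
            this.readyReplicas = readyReplicas;
            this.phase = phase;
        }
    }

    // Stand-in for the actual state of the managed resources.
    static final Map<String, Integer> actualReplicas = new HashMap<>();

    static Status reconcile(String name, Spec spec) {
        int actual = actualReplicas.getOrDefault(name, 0);
        if (actual != spec.replicas) {
            // Move the actual state toward the declared state.
            actualReplicas.put(name, spec.replicas);
            return new Status(spec.replicas, "Reconciled");
        }
        // Already in the desired state: the step is idempotent and changes nothing.
        return new Status(actual, "InSync");
    }

    public static void main(String[] args) {
        Spec desired = new Spec(3);
        System.out.println(reconcile("demo", desired).phase);
        System.out.println(reconcile("demo", desired).phase);
    }
}
```

Because the step never writes to spec, running it twice in a row settles instead of triggering another change.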
The DSL is specified as a Custom Resource Definition:
$ oc get crd machinehealthchecks.machine.openshift.io -o=yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
spec:
  conversion:
    strategy: None
  group: machine.openshift.io
  names:
    kind: MachineHealthCheck
    listKind: MachineHealthCheckList
    plural: machinehealthchecks
    shortNames:
    - mhc
    - mhcs
    singular: machinehealthcheck
  scope: Namespaced
  versions:
  - name: v1beta1
    schema:
      openAPIV3Schema:
        description: 'MachineHealthCheck'
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource'
            type: string
          metadata:
            type: object
          spec:
            description: Specification of machine health check policy
            properties:
              expectedMachines:
                description: total number of machines counted by this machine health
                  check
                minimum: 0
                type: integer
              unhealthyConditions:
                description: UnhealthyConditions contains a list of the conditions.
                items:
                  description: UnhealthyCondition represents a Node.
                  properties:
                    status:
                      minLength: 1
                      type: string
                    timeout:
                      description: Expects an unsigned duration string of decimal
                        numbers each with optional fraction and a unit suffix, eg
                        "300ms", "1.5h" or "2h45m". Valid time units are "ns", "us"
                        (or "µs"), "ms", "s", "m", "h".
                      pattern: ^([0-9]+(\.[0-9]+)?(ns|us|µs|ms|s|m|h))+$
                      type: string
                    type:
                      minLength: 1
                      type: string
                  type: object
                minItems: 1
                type: array
            type: object
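The timeout field above is constrained by a regular expression, which is a nice example of the CRD schema doing validation before the Operator ever sees the value. Here is a small Java sketch that applies the same pattern to a few sample strings; TimeoutPattern and isValid are names I made up, and the API server does this check itself.

```java
import java.util.regex.Pattern;

// Hypothetical helper that reuses the pattern from the CRD's timeout field.
class TimeoutPattern {
    // \u00B5 is the micro sign used in the "µs" unit; the rest is copied from the CRD.
    static final Pattern TIMEOUT =
        Pattern.compile("^([0-9]+(\\.[0-9]+)?(ns|us|\u00B5s|ms|s|m|h))+$");

    static boolean isValid(String value) {
        return TIMEOUT.matcher(value).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"300ms", "1.5h", "2h45m", "-5s", "10", "5 m"};
        for (String sample : samples) {
            System.out.println(sample + " -> " + isValid(sample));
        }
    }
}
```

The three examples from the field description are accepted, while a negative value, a bare number without a unit, and a value with a space are rejected.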
For example, these operators manage the applications by orchestrating operations based on changes to the CustomResource (DSL):
As developers, we’re going to follow a common development pattern:
Implement the Operator Logic (Reconcile the operational state)
Bake Container Image
Create or regenerate Custom Resource Definition (CRD)
Create or regenerate Role-based Access Control (RBAC)
Role
RoleBinding
Apply Operator YAML
Note, we’re not necessarily writing business logic, rather operational logic.
There are some best practices we follow:
Develop one operator per application
One CRD per Controller. Created and Fit for Purpose. Less Contention.
No Cross Dependencies.
Use Kubernetes Primitives when Possible
Be Backwards Compatible
Compartmentalize features via multiple controllers
Scale = one controller
Backup = one controller
Use asynchronous metaphors with the synchronous reconciliation loop
Error, then immediate return, backoff and check later
Use concurrency to split the processing / state
Prune Kubernetes Resources when not used
Apps Run when Operators are stopped
Document what the operator does and how it does it
Install in a single command
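One of the practices above, "Error, then immediate return, backoff and check later", can be sketched as a capped exponential delay. This is a minimal illustration in Java with hypothetical names (RequeueBackoff, delayFor); the Operator SDK and controller-runtime ship their own rate limiters, so treat it as a picture of the idea and not as their implementation.

```java
import java.time.Duration;

// Hypothetical sketch: how long to wait before re-checking after repeated failures.
class RequeueBackoff {
    // failures = number of consecutive failed reconcile attempts for one resource.
    static Duration delayFor(int failures, Duration base, Duration max) {
        if (failures <= 0) {
            return Duration.ZERO; // no error, no extra delay
        }
        long delay = base.toMillis();
        long cap = max.toMillis();
        // Double the delay for each additional failure until the cap is reached.
        for (int i = 1; i < failures && delay < cap; i++) {
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, cap));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(1);
        Duration max = Duration.ofSeconds(30);
        for (int failures = 0; failures <= 7; failures++) {
            System.out.println(failures + " failures -> " + delayFor(failures, base, max));
        }
    }
}
```

The reconcile function itself returns right away on error; only the time until the next check grows.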
We use the Operator SDK, for one because it’s supported by Red Hat and the CNCF.
operator-sdk: Which one? Ansible and Go
Kubernetes is authored in the Go language. Currently, OpenShift uses Go 1.17 and most operators are implemented in Go. Because the community has built many Go-based operators, we have much more support on StackOverflow and a forum.
Go is ideal for concurrency and has strong memory management, and everything is baked into the executable deliverable, so it’s in memory and ready to go. There are lots of alternative languages to code in: NodeJS, Rust, Java, C#, Python. The OpenShift Operators are not necessarily built on the Operator SDK.
Summary
We’ve run through a lot of detail on Operators and learned why we should go with Go operators.
I built a demonstration using Go, JSON, bcrypt, an HTTP client, and an HTTP server to model an actual IdP. This is a demonstration only; it really helped me set up and understand what’s happening in the RequestHeader.
This document outlines the flow using the haproxy and Apache Httpd already installed on the Bastion server as part of the installation process and a local Go Test IdP to demonstrate the feature.
The rough flow between OpenShift, the User and the Test IdP is:
For those managing OpenShift clusters, the oc tool manages all the OpenShift resources with handy commands for OpenShift and Kubernetes. The OpenShift Client CLI (oc) project is built on top of kubectl adding built-in features to simplify interactions with an OpenShift cluster.
Much like kubectl, the oc CLI tool provides a feature to Extend the OpenShift CLI with plug-ins. The oc plugins feature is a client-side feature to facilitate interactions with extension commands found in the current user’s path. There is an ecosystem of plugins through the community and the Krew Plugin List.
k9s is a terminal based UI to interact with your Kubernetes clusters.
sample-cli-plugin which is a simple example to show how to switch namespaces in k8s. I’m not entirely certain that this works with OpenShift.
These plugins have a wide range of support and code. Some of the plugins are based on python, others are based on go and bash.
oc expands the plugin search prefixes in pkg/cli/kubectlwrappers/wrappers.go with plugin.ValidPluginFilenamePrefixes = []string{"oc", "kubectl"}, so whole new OpenShift specific plugins are supported. The OpenShift team has also released a number of plugins:
oc-mirror manages OpenShift release, operator catalog, helm charts, and associated container images for mirror registries that support OpenShift environments
oc-compliance facilitates using the OpenShift Compliance operator.
Many of these extensions/plugins are installed using krew; krew is a plugin manager for kubectl. Some users create a directory .kube/plugins and install their plugins in that folder. The plugins folder is then added to the user’s path.
Creating your own Extension
Check to see if any plugins exist:
$ oc plugin list
The following compatible plugins are available:
/Users/user/.kube/plugins/oc-test
If none exist, it’ll prompt you that none are found in the path, and you can install from krew.
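To show roughly what that lookup involves, here is a small Java sketch that walks the PATH and lists executables whose names start with oc- or kubectl-. It is only an approximation with hypothetical names (PluginScan, findPlugins); the real discovery logic lives in oc and kubectl and may apply more rules than this.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical approximation of plugin discovery; not the oc implementation.
class PluginScan {
    static final String[] PREFIXES = {"oc-", "kubectl-"};

    // A plugin file name starts with a known prefix followed by at least one character.
    static boolean isPluginName(String fileName) {
        for (String prefix : PREFIXES) {
            if (fileName.startsWith(prefix) && fileName.length() > prefix.length()) {
                return true;
            }
        }
        return false;
    }

    // Scans every directory in a PATH-style string for executable plugin files.
    static List<String> findPlugins(String pathValue) {
        List<String> found = new ArrayList<>();
        if (pathValue == null) {
            return found;
        }
        for (String dir : pathValue.split(File.pathSeparator)) {
            File[] files = new File(dir).listFiles();
            if (files == null) {
                continue; // not a directory, or not readable
            }
            for (File file : files) {
                if (file.isFile() && file.canExecute() && isPluginName(file.getName())) {
                    found.add(file.getPath());
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        findPlugins(System.getenv("PATH")).forEach(System.out::println);
    }
}
```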
To quote the Kubernetes website, “The Operator pattern captures how you can write code to automate a task beyond what Kubernetes itself provides.” The following is a compendium to use while Learning Operators.
The de facto SDK to use is the Operator SDK which provides Helm, Ansible and Go scaffolding to support your implementation of the Operator pattern.
The following are education classes on the OperatorSDK
When running through the CO0201EN intermediate operators course, I did hit the case where I had to create a ClusterRole and ClusterRoleBinding for the ServiceAccount; here is a snippet that might help others:
Create OpenShift Plugins – You must have a CLI plug-in file that begins with oc- or kubectl-. You create a file and put it in /usr/local/bin/
Details on running Code Ready Containers on Linux – The key hack I learned was to ssh -i ~/.crc/machines/crc/id_ecdsa core@<any host in the /etc/hosts>
I ran on VirtualBox Ubuntu 20.04 with Guest Additions Installed
Virtual Box Settings for the Machine – 6 CPU, 18G
System > Processor > Enable PAE/NX and Enable Nested VT-X/AMD-V (which is a must for it to work)
Network > Change Adapter Type to virtio-net and Set Promiscuous Mode to Allow VMs
Install openssh-server so you can login remotely
It will not install without a windowing system, so I have the default windowing environment installed.
Note, I still get a failure on startup complaining about a timeout. I waited about 15 minutes after this, and the command oc get nodes --context admin --cluster crc --kubeconfig .crc/cache/crc_libvirt_4.10.3_amd64/kubeconfig now works.
I had to watch 19 hours of slow paced videos for a training on a new software product (at least new to me). I like fast paced trainings… enter a browser hack.
In Firefox, Navigate to Tools > Browser Tools > Web Developer Tools
Click Console
Type the following snippet to find the first video on a page and change the playback rate, then press Enter.
Note, 4.0 can be unintelligible; you’ll need to tweak the speed to match what you need. I found 2.5 to 3.0 to be very comfortable (you just can’t multitask).
Until recently, I’ve been working with NDJSON files as part of the HL7 FHIR: Bulk Data Access IG to export healthcare data and the proposed Import specification to import healthcare data. These files store one JSON object per line, delimited with a \n, such as:
The following Java snippet generates a substantial set of lines that can be injected into a stream for testing with unicode (and are Newline Delimited).
StringBuilder line = new StringBuilder();
for (int codePoint = 32; codePoint <= 0x1F64F; codePoint++) {
    line.append(Character.toChars(codePoint));
    if (codePoint % 64 == 0) {
        line.append("\n");
    }
}
System.out.println(line.toString());
This data is processed asynchronously on OpenLiberty: JavaBatch as a set of jobs. These jobs process data through a Read(Source)-Checkpoint-Write(Sink) pattern. The pattern ensures enough data is read from the source before a write action on the sink.
I found that processing the variable data with an unknown unicode set needed a counting stream to keep track of the bytes. The CountingStream acted as a delegate to accumulate bytes, length of the processed values and find the end of a line or end of the file.
public static class CountingStream extends InputStream {
private static final int LF = '\n';
private static final long MAX_LENGTH_PER_LINE = 2147483648L;
// 256kb block
private ByteArrayOutputStream out = new ByteArrayOutputStream(256000);
private boolean eol = false;
private long length = 0;
private InputStream delegate;
/**
* ctor
* @param in
*/
public CountingStream(InputStream in) {
this.delegate = in;
}
/**
* reset the line
*/
public void resetLine() {
out.reset();
eol = false;
}
/**
* @return the length of the resources returned in the reader
*/
public long getLength() {
return length;
}
/**
* Gets the String representing the line of bytes.
*
* @return
* @throws UnsupportedEncodingException
*/
public String getLine() throws UnsupportedEncodingException {
String str = new String(out.toByteArray(), "UTF-8");
if (str.isEmpty()) {
str = null;
}
return str;
}
public boolean eol() {
return eol;
}
/**
* Returns the line that is aggregated up until a new line character
* @return
* @throws IOException
*/
public String readLine() throws IOException {
int r = read();
while (r != -1) {
if (eol()) {
eol = false;
return getLine();
}
r = read();
}
// End of stream: return whatever has accumulated (null when nothing is left)
return getLine();
}
@Override
public int read() throws IOException {
int r = delegate.read();
if (r == -1) {
return -1;
}
byte b = (byte) r;
if (LF == (int) b) {
length++;
eol = true;
} else {
length++;
if (length == MAX_LENGTH_PER_LINE) {
throw new IOException("Current Line in NDJSON exceeds limit " + MAX_LENGTH_PER_LINE);
}
out.write(b);
}
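// Return the unsigned value (0-255); returning the signed byte would turn 0xFF into -1 (end of stream)
return r;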
}
}
Accumulate: Don’t process a character int read() at a time, accumulate your bytes and defer to the String creation in Java to ensure it’s processed in your project’s encoding.
Set a limit: Don’t infinitely process the data, stop when it violates a set contract.
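To illustrate the Accumulate point, here is a minimal Java sketch comparing the two approaches on a line that contains multi-byte characters. AccumulateDemo and its method names are my own for this example and are not part of the CountingStream above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: why bytes should be accumulated before decoding.
class AccumulateDemo {
    // Fragile approach: treats every byte as its own character.
    static String decodePerByte(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append((char) (b & 0xFF));
        }
        return sb.toString();
    }

    // Accumulate the bytes first, then let String do the decoding once.
    static String decodeAccumulated(byte[] bytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte b : bytes) {
            out.write(b);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // An e with diaeresis (2 bytes in UTF-8) and an emoji (4 bytes in UTF-8).
        String original = "{\"name\":\"Zo\u00eb \uD83D\uDE00\"}";
        byte[] bytes = original.getBytes(StandardCharsets.UTF_8);
        System.out.println("chars=" + original.length() + " bytes=" + bytes.length);
        System.out.println("per byte ok:    " + original.equals(decodePerByte(bytes)));
        System.out.println("accumulated ok: " + original.equals(decodeAccumulated(bytes)));
    }
}
```

The byte count is larger than the character count, which is also why the stream tracks length in bytes and not in characters.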
If you are doing more complicated processing, say you are streaming from Azure Blob, Amazon S3 or HTTPS and need to process the stream as chunks, you’ll want to do something a bit more complicated.
The grit here is:
Read Blocks and not the whole stream: Read a block of bytes at a time, rather than ‘draining’ the whole stream, and stop once a sufficient block is retrieved.
Assemble Lines in multiple Block reads.
The code looks like this:
public static class CountingStream extends InputStream {
private static final int LF = '\n';
private static final long MAX_LENGTH_PER_LINE = 2147483648L;
// 256kb block
private ByteArrayOutputStream out;
private long length = 0;
private InputStream delegate;
/**
*
* @param out ByteArrayOutputStream caches the data cross reads
* @param in InputStream is generally the S3InputStream
*/
public CountingStream(ByteArrayOutputStream out, InputStream in) {
this.out = out;
this.delegate = in;
}
/**
* Gets the String representing the line of bytes.
*
* @return
* @throws UnsupportedEncodingException
*/
public String getLine() throws UnsupportedEncodingException {
String str = new String(out.toByteArray(), "UTF-8");
if (str.isEmpty()) {
str = null;
}
return str;
}
@Override
public int read() throws IOException {
return delegate.read();
}
/**
* drains the stream so we don't leave a hanging connection
* @throws IOException
*/
public void drain() throws IOException {
int l = delegate.read();
while (l != -1) {
l = delegate.read();
}
}
/**
*
* @param counter
* @return
* @throws IOException
*/
public String readLine() throws IOException {
int r = read();
if (r == -1) {
return null;
} else {
String result = null;
while (r != -1) {
byte b = (byte) r;
if (LF == (int) b) {
length++;
r = -1;
result = getLine();
out.reset();
} else {
length++;
if (length == MAX_LENGTH_PER_LINE) {
throw new IOException("Current Line in NDJSON exceeds limit " + MAX_LENGTH_PER_LINE);
}
out.write(b);
r = read();
}
}
return result;
}
}
}
Importantly, the code defers the caching to the EXTERNAL caller, and in this case assembles a window of resources:
protected void readFromObjectStoreWithLowMaxRange(AmazonS3 c, String b, String workItem) throws FHIRException {
// Don't add tempResources to resources until we're done (we do retry), it's a temporary cache of the Resources
List<Resource> tempResources = new ArrayList<>();
// number of bytes read.
long numberOfBytesRead = 0l;
int totalReads = 0;
int mux = 0;
// The cached FHIRParserException
FHIRParserException fpeDownstream = null;
// Closed when the Scope is out. The size is double the read window.
// The backing array is allocated at creation.
ByteArrayOutputStream cacheOut = new ByteArrayOutputStream(512000);
boolean complete = false;
while (!complete) {
// Condition: At the end of the file... and it should never be more than the file Size
// however, in rare circumstances the person may have 'grown' or added to the file
// while operating on the $import and we want to defensively end rather than an exact match
// Early exit from the loop...
long start = this.transientUserData.getCurrentBytes();
if (this.transientUserData.getImportFileSize() <= start) {
complete = true; // NOP
break;
}
// Condition: Window would exceed the maximum File Size
// Prune the end to -1 off the maximum.
// The following is 256K window. 256K is used so we only drain a portion of the inputstream.
// and not the whole file's input stream.
long end = start + 256000;
if (end >= this.transientUserData.getImportFileSize()) {
end = this.transientUserData.getImportFileSize() - 1;
complete = true; // We still need to process the bytes.
}
// Request the start and end of the S3ObjectInputStream that's going to be retrieved
GetObjectRequest req = new GetObjectRequest(b, workItem)
.withRange(start, end);
if (LOG.isLoggable(Level.FINE)) {
// Useful when debugging edge of the stream problems
LOG.fine("S3ObjectInputStream --- " + start + " " + end);
}
boolean parsedWithIssue = false;
try (S3Object obj = c.getObject(req);
S3ObjectInputStream in = obj.getObjectContent();
BufferedInputStream buffer = new BufferedInputStream(in);
CountingStream reader = new CountingStream(cacheOut, buffer)) {
// The interior block allows a drain operation to be executed finally.
// as a best practice we want to drain the remainder of the input
// this drain should be at worst 255K (-1 for new line character)
try {
String resourceStr = reader.readLine();
// The first line is a large resource
if (resourceStr == null) {
this.transientUserData.setCurrentBytes(this.transientUserData.getCurrentBytes() + reader.length);
reader.length = 0;
mux++;
}
while (resourceStr != null && totalReads < maxRead) {
try (StringReader stringReader = new StringReader(resourceStr)) {
tempResources.add(FHIRParser.parser(Format.JSON).parse(stringReader));
} catch (FHIRParserException fpe) {
// Log and skip the invalid FHIR resource.
parseFailures++;
parsedWithIssue = true;
fpeDownstream = fpe;
}
long priorLineLength = reader.length;
reader.length = 0;
resourceStr = reader.readLine();
if (!parsedWithIssue) {
this.transientUserData.setCurrentBytes(this.transientUserData.getCurrentBytes() + priorLineLength);
numberOfBytesRead += reader.length;
totalReads++;
} else if ((parsedWithIssue && resourceStr != null)
|| (parsedWithIssue &&
(this.transientUserData.getImportFileSize() <= this.transientUserData.getCurrentBytes() + priorLineLength))) {
// This is potentially end of bad line
// -or-
// This is the last line failing to parse
long line = this.transientUserData.getNumOfProcessedResources() + totalReads;
LOG.log(Level.SEVERE, "readResources: Failed to parse line " + totalReads + " of [" + workItem + "].", fpeDownstream);
String msg = "readResources: " + "Failed to parse line " + line + " of [" + workItem + "].";
ConfigurationAdapter adapter = ConfigurationFactory.getInstance();
String out = adapter.getOperationOutcomeProvider(source);
boolean collectImportOperationOutcomes = adapter.shouldStorageProviderCollectOperationOutcomes(source)
&& !StorageType.HTTPS.equals(adapter.getStorageProviderStorageType(out));
if (collectImportOperationOutcomes) {
FHIRGenerator.generator(Format.JSON)
.generate(generateException(line, msg),
transientUserData.getBufferStreamForImportError());
transientUserData.getBufferStreamForImportError().write(NDJSON_LINESEPERATOR);
}
}
}
} catch (Exception ex) {
LOG.warning("readFhirResourceFromObjectStore: Error processing file [" + workItem + "] - " + ex.getMessage());
// Throw exception to fail the job, the job can be continued from the current checkpoint after the
// problem is solved.
throw new FHIRException("Unable to read from S3 during processing", ex);
} finally {
try {
reader.drain();
} catch (Exception s3e) {
LOG.fine(() -> "Error while draining the stream, this is benign");
LOG.throwing("S3Provider", "readFromObjectStoreWithLowMaxRange", s3e);
}
}
// Increment if the last line fails
if (this.transientUserData.getImportFileSize() <= this.transientUserData.getCurrentBytes()) {
parseFailures++;
}
} catch (FHIRException fe) {
throw fe;
} catch (Exception e) {
throw new FHIRException("Unable to read from S3 File", e);
}
// Condition: The optimized block and the number of Resources read
// exceed the minimum thresholds or the maximum size of a single resource
if (tempResources.size() >= maxRead) {
LOG.fine("TempResourceSize " + tempResources.size());
complete = true;
}
// Condition: The optimized block is exceeded and the number of resources is
// only one so we want to threshold a maximum number of resources
// 512K * 5 segments (we don't want to repeat too much work) = 2,621,440 bytes (2.5 MiB)
if (numberOfBytesRead > 2621440 && tempResources.size() >= 1) {
complete = true;
}
// Condition: The maximum read block is exceeded and we have at least one Resource
// 2147483648 / (256*1024) = 8192 Reads
if (mux == 8193) {
throw new FHIRException("Too Long a Line");
}
// We've read more than one window
if (mux > 1 && tempResources.size() >=1) {
break;
}
}
// Condition: There is no complete resource to read.
if (totalReads == 0) {
LOG.warning("File grew since the start");
this.transientUserData.setCurrentBytes(this.transientUserData.getImportFileSize());
}
// Add the accumulated resources
this.resources.addAll(tempResources);
}
The above code was created and licensed as part of the IBM/FHIR project.
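If you want to experiment with the windowing idea without S3, here is a much smaller, self-contained Java sketch of it. It is a simplification I wrote for illustration and not the IBM/FHIR code: fetchRange is a hypothetical stand-in for a ranged GET against an object store, and the window size is passed in so it can be made tiny for testing.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: assemble NDJSON lines across fixed-size range reads.
class WindowedLines {
    // Stand-in for a ranged GET: returns the bytes from start to end, inclusive.
    static byte[] fetchRange(byte[] object, long start, long end) {
        return Arrays.copyOfRange(object, (int) start, (int) end + 1);
    }

    static List<String> readLines(byte[] object, int window) {
        List<String> lines = new ArrayList<>();
        // The cache lives outside the per-window read, so a partial line survives.
        ByteArrayOutputStream cache = new ByteArrayOutputStream();
        long size = object.length;
        long start = 0;
        while (start < size) {
            // Clamp the window so it never reaches past the last byte.
            long end = Math.min(start + window, size) - 1;
            for (byte b : fetchRange(object, start, end)) {
                if (b == '\n') {
                    // Decode only once the whole line has been accumulated.
                    lines.add(new String(cache.toByteArray(), StandardCharsets.UTF_8));
                    cache.reset();
                } else {
                    cache.write(b);
                }
            }
            start = end + 1;
        }
        if (cache.size() > 0) {
            // Last line without a trailing newline.
            lines.add(new String(cache.toByteArray(), StandardCharsets.UTF_8));
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "{\"a\":\"\u00e9\"}\n{\"b\":2}\n{\"c\":3}".getBytes(StandardCharsets.UTF_8);
        // A 4-byte window splits lines, and even a multi-byte character, across reads.
        readLines(data, 4).forEach(System.out::println);
    }
}
```

Because decoding happens only after a full line is in the cache, the result does not depend on where the window boundaries fall.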
Net: approach Unicode formats carefully, and be careful when reassembling bytes and reading windows from Channels.