200 lines
4.4 KiB
Go
200 lines
4.4 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/munnerz/kube-plex/pkg/signals"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
)
|
|
|
|
// data pvc name
|
|
var dataPVC = os.Getenv("DATA_PVC")
|
|
|
|
// config pvc name
|
|
var configPVC = os.Getenv("CONFIG_PVC")
|
|
|
|
// transcode pvc name
|
|
var transcodePVC = os.Getenv("TRANSCODE_PVC")
|
|
|
|
// pms namespace
|
|
var namespace = os.Getenv("KUBE_NAMESPACE")
|
|
|
|
// image for the plexmediaserver container containing the transcoder. This
|
|
// should be set to the same as the 'master' pms server
|
|
var pmsImage = os.Getenv("PMS_IMAGE")
|
|
var pmsInternalAddress = os.Getenv("PMS_INTERNAL_ADDRESS")
|
|
|
|
func main() {
|
|
env := os.Environ()
|
|
args := os.Args
|
|
|
|
rewriteEnv(env)
|
|
rewriteArgs(args)
|
|
cwd, err := os.Getwd()
|
|
if err != nil {
|
|
log.Fatalf("Error getting working directory: %s", err)
|
|
}
|
|
pod := generatePod(cwd, env, args)
|
|
|
|
cfg, err := clientcmd.BuildConfigFromFlags("", "")
|
|
if err != nil {
|
|
log.Fatalf("Error building kubeconfig: %s", err)
|
|
}
|
|
|
|
kubeClient, err := kubernetes.NewForConfig(cfg)
|
|
if err != nil {
|
|
log.Fatalf("Error building kubernetes clientset: %s", err)
|
|
}
|
|
|
|
pod, err = kubeClient.CoreV1().Pods(namespace).Create(pod)
|
|
if err != nil {
|
|
log.Fatalf("Error creating pod: %s", err)
|
|
}
|
|
|
|
stopCh := signals.SetupSignalHandler()
|
|
waitFn := func() <-chan error {
|
|
stopCh := make(chan error)
|
|
go func() {
|
|
stopCh <- waitForPodCompletion(kubeClient, pod)
|
|
}()
|
|
return stopCh
|
|
}
|
|
|
|
select {
|
|
case err := <-waitFn():
|
|
if err != nil {
|
|
log.Printf("Error waiting for pod to complete: %s", err)
|
|
}
|
|
case <-stopCh:
|
|
log.Printf("Exit requested.")
|
|
}
|
|
|
|
log.Printf("Cleaning up pod...")
|
|
err = kubeClient.CoreV1().Pods(namespace).Delete(pod.Name, nil)
|
|
if err != nil {
|
|
log.Fatalf("Error cleaning up pod: %s", err)
|
|
}
|
|
}
|
|
|
|
// rewriteEnv rewrites environment variables to be passed to the transcoder
|
|
func rewriteEnv(in []string) {
|
|
// no changes needed
|
|
}
|
|
|
|
func rewriteArgs(in []string) {
|
|
for i, v := range in {
|
|
switch v {
|
|
case "-progressurl", "-manifest_name", "-segment_list":
|
|
in[i+1] = strings.Replace(in[i+1], "http://127.0.0.1:32400", pmsInternalAddress, 1)
|
|
case "-loglevel", "-loglevel_plex":
|
|
in[i+1] = "debug"
|
|
}
|
|
}
|
|
}
|
|
|
|
func generatePod(cwd string, env []string, args []string) *corev1.Pod {
|
|
envVars := toCoreV1EnvVar(env)
|
|
return &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
GenerateName: "pms-elastic-transcoder-",
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
NodeSelector: map[string]string{
|
|
"beta.kubernetes.io/arch": "amd64",
|
|
},
|
|
RestartPolicy: corev1.RestartPolicyNever,
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: "plex",
|
|
Command: args,
|
|
Image: pmsImage,
|
|
Env: envVars,
|
|
WorkingDir: cwd,
|
|
VolumeMounts: []corev1.VolumeMount{
|
|
{
|
|
Name: "data",
|
|
MountPath: "/plex-library",
|
|
ReadOnly: true,
|
|
},
|
|
{
|
|
Name: "config",
|
|
MountPath: "/config",
|
|
ReadOnly: true,
|
|
},
|
|
{
|
|
Name: "transcode",
|
|
MountPath: "/transcode",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
Volumes: []corev1.Volume{
|
|
{
|
|
Name: "data",
|
|
VolumeSource: corev1.VolumeSource{
|
|
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: dataPVC,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "config",
|
|
VolumeSource: corev1.VolumeSource{
|
|
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: configPVC,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "transcode",
|
|
VolumeSource: corev1.VolumeSource{
|
|
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
|
|
ClaimName: transcodePVC,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func toCoreV1EnvVar(in []string) []corev1.EnvVar {
|
|
out := make([]corev1.EnvVar, len(in))
|
|
for i, v := range in {
|
|
splitvar := strings.SplitN(v, "=", 2)
|
|
out[i] = corev1.EnvVar{
|
|
Name: splitvar[0],
|
|
Value: splitvar[1],
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func waitForPodCompletion(cl kubernetes.Interface, pod *corev1.Pod) error {
|
|
for {
|
|
pod, err := cl.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch pod.Status.Phase {
|
|
case corev1.PodPending:
|
|
case corev1.PodRunning:
|
|
case corev1.PodUnknown:
|
|
log.Printf("Warning: pod %q is in an unknown state", pod.Name)
|
|
case corev1.PodFailed:
|
|
return fmt.Errorf("pod %q failed", pod.Name)
|
|
case corev1.PodSucceeded:
|
|
return nil
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|