Skip to content

Commit

Permalink
playback: improve /list response time (#3637) (#4096)
Browse files Browse the repository at this point in the history
Response times of the /list endpoint were slow because the duration of
each segment was computed from scratch by summing the duration of each
of its parts.

This is improved by storing the duration of the overall segment in the
header and using that, if available.
  • Loading branch information
aler9 authored Jan 3, 2025
1 parent 07af42e commit ac0ddc9
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 44 deletions.
7 changes: 7 additions & 0 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,11 @@ func (pconf *Path) validate(
return fmt.Errorf("a path with a regular expression (or path 'all') and a static source" +
" must have 'sourceOnDemand' set to true")
}

if pconf.SRTPublishPassphrase != "" && pconf.Source != "publisher" {
return fmt.Errorf("'srtPublishPassphase' can only be used when source is 'publisher'")
}

if pconf.SourceOnDemand && pconf.Source == "publisher" {
return fmt.Errorf("'sourceOnDemand' is useless when source is 'publisher'")
}
Expand Down Expand Up @@ -488,6 +490,11 @@ func (pconf *Path) validate(
}
}

// avoid overflowing DurationV0 of mvhd
if pconf.RecordSegmentDuration > Duration(24*time.Hour) {
return fmt.Errorf("maximum segment duration is 1 day")
}

// Authentication (deprecated)

if deprecatedCredentialsMode {
Expand Down
4 changes: 2 additions & 2 deletions internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func seekAndMux(
}
defer f.Close()

firstInit, err = segmentFMP4ReadInit(f)
firstInit, _, err = segmentFMP4ReadHeader(f)
if err != nil {
return err
}
Expand All @@ -81,7 +81,7 @@ func seekAndMux(
defer f.Close()

var init *fmp4.Init
init, err = segmentFMP4ReadInit(f)
init, _, err = segmentFMP4ReadHeader(f)
if err != nil {
return err
}
Expand Down
26 changes: 12 additions & 14 deletions internal/playback/on_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
Expand All @@ -29,7 +28,7 @@ type listEntry struct {
URL string `json:"url"`
}

func computeDurationAndConcatenate(
func readDurationAndConcatenate(
recordFormat conf.RecordFormat,
segments []*recordstore.Segment,
) ([]listEntry, error) {
Expand All @@ -45,19 +44,18 @@ func computeDurationAndConcatenate(
}
defer f.Close()

init, err := segmentFMP4ReadInit(f)
init, duration, err := segmentFMP4ReadHeader(f)
if err != nil {
return err
}

_, err = f.Seek(0, io.SeekStart)
if err != nil {
return err
}

maxDuration, err := segmentFMP4ReadDuration(f, init)
if err != nil {
return err
// if duration is not present in the header, compute it
// by parsing each part
if duration == 0 {
duration, err = segmentFMP4ReadDurationFromParts(f, init)
if err != nil {
return err
}
}

if len(out) != 0 && segmentFMP4CanBeConcatenated(
Expand All @@ -66,12 +64,12 @@ func computeDurationAndConcatenate(
init,
seg.Start) {
prevStart := out[len(out)-1].Start
curEnd := seg.Start.Add(maxDuration)
curEnd := seg.Start.Add(duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: seg.Start,
Duration: listEntryDuration(maxDuration),
Duration: listEntryDuration(duration),
})
}

Expand Down Expand Up @@ -137,7 +135,7 @@ func (s *Server) onList(ctx *gin.Context) {
return
}

entries, err := computeDurationAndConcatenate(pathConf.RecordFormat, segments)
entries, err := readDurationAndConcatenate(pathConf.RecordFormat, segments)
if err != nil {
s.writeError(ctx, http.StatusInternalServerError, err)
return
Expand Down
149 changes: 149 additions & 0 deletions internal/playback/on_list_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package playback

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"testing"
"time"

"github.com/abema/go-mp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -206,3 +211,147 @@ func TestOnListDifferentInit(t *testing.T) {
},
}, out)
}

func writeDuration(f io.ReadWriteSeeker, d time.Duration) error {
_, err := f.Seek(0, io.SeekStart)
if err != nil {
return err
}

// check and skip ftyp header and content

buf := make([]byte, 8)
_, err = io.ReadFull(f, buf)
if err != nil {
return err
}

if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return fmt.Errorf("ftyp box not found")
}

ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])

_, err = f.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return err
}

// check and skip moov header

_, err = io.ReadFull(f, buf)
if err != nil {
return err
}

if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return fmt.Errorf("moov box not found")
}

moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])

moovPos, err := f.Seek(8, io.SeekCurrent)
if err != nil {
return err
}

var mvhd mp4.Mvhd
_, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{})
if err != nil {
return err
}

mvhd.DurationV0 = uint32(d / time.Millisecond)

_, err = f.Seek(moovPos, io.SeekStart)
if err != nil {
return err
}

_, err = mp4.Marshal(f, &mvhd, mp4.Context{})
if err != nil {
return err
}

return nil
}

func TestOnListCachedDuration(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)

err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)

func() {
var f *os.File
f, err = os.Create(filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
require.NoError(t, err)
defer f.Close()

init := fmp4.Init{
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
},
}

err = init.Marshal(f)
require.NoError(t, err)

err = writeDuration(f, 50*time.Second)
require.NoError(t, err)
}()

s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.Duration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Name: "mypath",
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
AuthManager: test.NilAuthManager,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()

u, err := url.Parse("http://myuser:mypass@localhost:9996/list")
require.NoError(t, err)

v := url.Values{}
v.Set("path", "mypath")
u.RawQuery = v.Encode()

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()

require.Equal(t, http.StatusOK, res.StatusCode)

var out interface{}
err = json.NewDecoder(res.Body).Decode(&out)
require.NoError(t, err)

require.Equal(t, []interface{}{
map[string]interface{}{
"duration": float64(50),
"start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano),
"url": "http://localhost:9996/get?duration=50&path=mypath&start=" +
url.QueryEscape(time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano)),
},
}, out)
}
Loading

0 comments on commit ac0ddc9

Please sign in to comment.