git.openfl.eu Git - aggred.git/commitdiff
initial implementation main
author Fl_GUI <flor.guilini@hotmail.com>
Sat, 28 Mar 2026 15:09:00 +0000 (16:09 +0100)
committer Fl_GUI <flor.guilini@hotmail.com>
Sat, 28 Mar 2026 15:09:00 +0000 (16:09 +0100)
cache/cache.go [new file with mode: 0644]
cache/cachefactory.go [new file with mode: 0644]
combine/combine.go [new file with mode: 0644]
go.mod [new file with mode: 0644]
main.go [new file with mode: 0644]
pipe/pipe.go [new file with mode: 0644]
rsscollector/collector.go [new file with mode: 0644]
rsscollector/collector_test.go [new file with mode: 0644]
rsstypes/rss.go [new file with mode: 0644]
rsstypes/source_test.go [new file with mode: 0644]

diff --git a/cache/cache.go b/cache/cache.go
new file mode 100644 (file)
index 0000000..5384691
--- /dev/null
@@ -0,0 +1,82 @@
+package cache
+
+import (
+       "fmt"
+       "io"
+       "net/http"
+       "net/url"
+       "os"
+       "strings"
+       "time"
+)
+
+type Cache struct {
+       Timeout  time.Duration
+       rssUrl   string
+       root     *os.Root
+       fileName string
+}
+
+// CreateCache builds a Cache entry for rssUrl inside the package-wide
+// cache directory and primes it immediately when no fresh cached copy
+// exists.
+//
+// It returns an error when rssUrl cannot be parsed or when the initial
+// fetch fails.
+func CreateCache(rssUrl string) (Cache, error) {
+       var result Cache
+       result.Timeout = time.Hour
+       result.rssUrl = rssUrl
+       // NOTE(review): relies on the package-level cacheFactory set up in
+       // init() of cachefactory.go — implicit ordering dependency.
+       result.root = cacheFactory.root
+
+       parsedUrl, err := url.ParseRequestURI(rssUrl)
+       if err != nil {
+               return result, err
+       }
+       // e.g. https://host/a/b.xml -> "host.a.b.xml": flat, filesystem-safe
+       result.fileName = fmt.Sprintf("%s%s", parsedUrl.Hostname(), strings.ReplaceAll(parsedUrl.Path, "/", "."))
+       // a non-nil staleness means the file is missing or expired: fetch now
+       if result.staleness() != nil {
+               return result, result.Fetch()
+       }
+       return result, nil
+}
+
+func (c Cache) Fetch() error {
+       resp, err := http.Get(c.rssUrl)
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       f, err := c.root.Create(c.fileName)
+       if err != nil {
+               return err
+       }
+       _, err = io.Copy(f, resp.Body)
+       return err
+}
+
+// Source returns the feed URL this cache entry was created for.
+func (c Cache) Source() string {
+       return c.rssUrl
+}
+
+func (c Cache) Open() (io.ReadCloser, error) {
+       err := c.staleness()
+       if err != nil {
+               fmt.Printf("fetching %s because of: %s\n", c.fileName, err)
+               err := c.Fetch()
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       // file exists
+       return c.root.Open(c.fileName)
+}
+
+func (c Cache) staleness() error {
+       fileInfo, err := c.root.Stat(c.fileName)
+
+       if err != nil {
+               return err
+       }
+
+       timeSince := time.Since(fileInfo.ModTime())
+       if c.Timeout < timeSince {
+               return fmt.Errorf("timeout of %s exceeded", c.Timeout.String())
+       }
+
+       return nil
+}
diff --git a/cache/cachefactory.go b/cache/cachefactory.go
new file mode 100644 (file)
index 0000000..bfbfcd4
--- /dev/null
@@ -0,0 +1,35 @@
+package cache
+
+import (
+       "errors"
+       "io/fs"
+       "os"
+       "path"
+)
+
+// cacheFactory holds the process-wide cache directory root; it is
+// initialised exactly once by init() below.
+var cacheFactory factory
+
+// factory bundles the os.Root under which every Cache entry lives.
+type factory struct {
+       root *os.Root
+}
+
+// createFactory wraps root in a factory value.
+func createFactory(root *os.Root) factory {
+       return factory{root}
+}
+
+// init creates the user-level "aggred-cache" directory and opens it as
+// an os.Root shared by all cache entries. It panics when the directory
+// cannot be determined, created, or opened, since the package is
+// unusable without it.
+// NOTE(review): side effects and panics in init() make the package
+// hard to embed — consider an explicit, error-returning setup instead.
+func init() {
+       cacheDir, err := os.UserCacheDir()
+       if err != nil {
+               panic(err)
+       }
+       cacheDir = path.Join(cacheDir, "aggred-cache")
+       // tolerate the directory already existing from a previous run
+       err = os.Mkdir(cacheDir, fs.ModeDir|0740)
+       if err != nil && !errors.Is(err, fs.ErrExist) {
+               panic(err)
+       }
+       cacheRoot, err := os.OpenRoot(cacheDir)
+       if err != nil {
+               panic(err)
+       }
+       cacheFactory = createFactory(cacheRoot)
+}
diff --git a/combine/combine.go b/combine/combine.go
new file mode 100644 (file)
index 0000000..0f36a7f
--- /dev/null
@@ -0,0 +1,28 @@
+package combine
+
+import "aggred/rsstypes"
+
+func Combine(template rsstypes.Rss, others ...rsstypes.Rss) rsstypes.Rss {
+       var items = template.Channel.Items
+       for _, feed := range others {
+               items = append(items, itemReferences(feed)...)
+       }
+       template.Channel.Items = items
+       return template
+}
+
+func itemReferences(rss rsstypes.Rss) []rsstypes.Item {
+       title := rss.Channel.Title
+       source := rss.Source
+       if source == "" {
+               return rss.Channel.Items
+       }
+       for i := range rss.Channel.Items {
+               // edit in place
+               rss.Channel.Items[i].Source = rsstypes.Source{
+                       title,
+                       source,
+               }
+       }
+       return rss.Channel.Items
+}
diff --git a/go.mod b/go.mod
new file mode 100644 (file)
index 0000000..d6dd448
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module aggred
+
+go 1.25.0
diff --git a/main.go b/main.go
new file mode 100644 (file)
index 0000000..0def980
--- /dev/null
+++ b/main.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+       "aggred/pipe"
+       "aggred/rsscollector"
+       "aggred/rsstypes"
+       "bytes"
+       "context"
+       "encoding/xml"
+       "fmt"
+       "os"
+       "os/signal"
+)
+
+const pipeName = "pipe.io"
+
+// template builds the skeleton RSS feed the collected items are merged
+// into. It panics if the hard-coded XML fails to parse, which can only
+// happen through a programming error.
+func template() rsstypes.Rss {
+       const skeleton = `
+               <rss version="2.0">
+                       <channel>
+                               <title>Combined</title>
+                       </channel>
+               </rss>
+       `
+       var result rsstypes.Rss
+       decoder := xml.NewDecoder(bytes.NewBufferString(skeleton))
+       if err := decoder.Decode(&result); err != nil {
+               panic(err)
+       }
+       return result
+}
+
+// main serves the combined NYTimes-Europe and Hackaday feeds through a
+// named pipe until an interrupt signal is received.
+func main() {
+       nytimes := "https://rss.nytimes.com/services/xml/rss/nyt/Europe.xml"
+       hackaday := "https://hackaday.com/blog/feed/"
+       reader, err := rsscollector.NewRssCollector(template(), nytimes, hackaday)
+       if err != nil {
+               panic(err)
+       }
+       ctx, cancelCtx := context.WithCancel(context.Background())
+       defer cancelCtx()
+
+       // NOTE(review): the pipeName constant declared above is unused; the
+       // literal below is what actually names the pipe — confirm intent.
+       closePipe, err := pipe.DoPipe(ctx, reader, "combinedrss.namedpipe")
+       if err != nil {
+               panic(err)
+       }
+       // remove the FIFO on the way out so a stale pipe is not left behind
+       defer func() {
+               fmt.Println("closing named pipe")
+               if err := closePipe(); err != nil {
+                       fmt.Println(err)
+               }
+       }()
+
+       // block until Ctrl-C / SIGINT
+       signalCtx, _ := signal.NotifyContext(ctx, os.Interrupt)
+       <-signalCtx.Done()
+       fmt.Println("stopping after interrupt received")
+}
diff --git a/pipe/pipe.go b/pipe/pipe.go
new file mode 100644 (file)
index 0000000..62c1e41
--- /dev/null
@@ -0,0 +1,64 @@
+package pipe
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "io/fs"
+       "os"
+       "syscall"
+       "time"
+)
+
+// DoPipe creates (or recreates) a FIFO at pipePath and starts a
+// background goroutine that streams content to each reader that opens
+// it, until ctx is cancelled.
+//
+// The returned function removes the pipe file and should be deferred
+// by the caller; it is returned even on error so it is always safe to
+// call.
+func DoPipe(ctx context.Context, content io.ReadSeeker, pipePath string) (func() error, error) {
+       closer, err := ensurePipe(pipePath)
+       if err != nil {
+               return closer, fmt.Errorf("failed to create a fifo pipe file: %w", err)
+       }
+       go writeToPipe(ctx, content, pipePath)
+       return closer, nil
+}
+
+// ensurePipe removes any stale file at pipeName and creates a fresh
+// FIFO there. On success the returned function removes the pipe; on
+// failure it merely replays the creation error.
+// NOTE(review): returning an error-replaying closer on failure is
+// unusual — callers must still check the second return value.
+func ensurePipe(pipeName string) (func() error, error) {
+       // a missing file is fine; anything else is a real failure
+       if err := os.Remove(pipeName); err != nil {
+               if !errors.Is(err, fs.ErrNotExist) {
+                       return func() error { return err }, err
+               }
+       }
+       err := syscall.Mkfifo(pipeName, 0666)
+       if err != nil {
+               return func() error { return err }, err
+       }
+       return func() error { return os.Remove(pipeName) }, nil
+}
+
+func writeToPipe(ctx context.Context, content io.ReadSeeker, pipeName string) {
+       fileFlags := os.O_WRONLY | os.O_APPEND
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+                       file, err := os.OpenFile(pipeName, fileFlags, fs.ModeNamedPipe)
+                       if err != nil {
+                               panic(err)
+                       }
+                       file.WriteString("")
+                       bytes, err := io.Copy(file, content)
+                       fmt.Printf("Written %v bytes\n", bytes)
+                       if err != nil {
+                               fmt.Println(err)
+                               fmt.Println("seeking start of content")
+                               content.Seek(0, io.SeekStart)
+                       }
+                       err = file.Close()
+                       if err != nil {
+                               fmt.Println(err)
+                       }
+                       // allow time for reader of file to close the handle
+                       <-time.After(time.Millisecond)
+               }
+       }
+}
diff --git a/rsscollector/collector.go b/rsscollector/collector.go
new file mode 100644 (file)
index 0000000..2474e19
--- /dev/null
@@ -0,0 +1,153 @@
+package rsscollector
+
+import (
+       "aggred/cache"
+       "aggred/combine"
+       "aggred/rsstypes"
+       "bytes"
+       "encoding/xml"
+       "errors"
+       "fmt"
+       "io"
+       "slices"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+// RssCollector lazily renders a combined RSS document into an internal
+// buffer and serves it through the io.ReadSeeker and io.WriterTo
+// interfaces.
+type RssCollector struct {
+       template rsstypes.Rss  // skeleton feed the collected items are merged into
+       sources  []cache.Cache // one cached entry per subscribed feed URL
+       buff     bytes.Buffer  // rendered XML not yet consumed by Read/WriteTo
+}
+
+func NewRssCollector(template rsstypes.Rss, rsssources ...string) (io.ReadSeeker, error) {
+       rw := new(RssCollector)
+       rw.template = template
+
+       rw.sources = make([]cache.Cache, 0, len(rsssources))
+       errs := make([]error, 0)
+       for _, source := range rsssources {
+               c, err := cache.CreateCache(source)
+               if err != nil {
+                       errs = append(errs, fmt.Errorf("could not create a source for %v: %w", source, err))
+               } else {
+                       rw.sources = append(rw.sources, c)
+               }
+       }
+
+       return rw, errors.Join(errs...)
+}
+
+func (r *RssCollector) Read(p []byte) (n int, err error) {
+       err = r.ensureContent()
+       if err != nil {
+               return 0, err
+       }
+       return r.buff.Read(p)
+}
+
+func (r *RssCollector) WriteTo(w io.Writer) (n int64, err error) {
+       err = r.ensureContent()
+       if err != nil {
+               return 0, err
+       }
+       return r.buff.WriteTo(w)
+}
+
+func (r *RssCollector) Seek(offset int64, whence int) (int64, error) {
+       if whence != io.SeekStart {
+               return 0, errors.New("seeking is only supported from io.SeekStart")
+       }
+       if offset != 0 {
+               return 0, errors.New("seeking is only supported with 0 offset")
+       }
+       r.buff.Reset()
+       return 0, nil
+}
+
+func (r *RssCollector) ensureContent() error {
+       if r.buff.Len() != 0 {
+               // some content left
+               return nil
+       }
+
+       feeds := make([]rsstypes.Rss, len(r.sources))
+       errs := make([]error, len(r.sources))
+       errCount := atomic.Int32{}
+
+       wg := sync.WaitGroup{}
+       workers := make(chan struct{}, 5)
+       for i := range r.sources {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       workers <- struct{}{}
+                       defer func() { <-workers }()
+
+                       source := r.sources[i]
+                       reader, err := source.Open()
+                       if err != nil {
+                               errCount.Add(1)
+                               errs[i] = err
+                               return
+                       }
+                       decoder := xml.NewDecoder(reader)
+
+                       err = decoder.Decode(&feeds[i])
+                       if err != nil {
+                               errCount.Add(1)
+                               errs[i] = err
+                               return
+                       }
+
+                       feeds[i].Source = r.sources[i].Source()
+               }()
+       }
+
+       wg.Wait()
+
+       if errCount.Load() == int32(len(errs)) {
+               noneLoadedErr := errors.New("could not load a single source")
+               return fmt.Errorf("%w: %w", noneLoadedErr, errors.Join(errs...))
+       } else if errCount.Load() != 0 {
+               someFailedErr := errors.New("could not load some sources")
+               err := fmt.Errorf("%w: %w", someFailedErr, errors.Join(errs...))
+               fmt.Println(err)
+       }
+
+       r.buff.Reset()
+       result := combine.Combine(r.template, feeds...)
+
+       slices.SortFunc(result.Channel.Items, compareItems)
+
+       for i := range result.Channel.Items {
+               clearExtra(&result.Channel.Items[i])
+       }
+       return xml.NewEncoder(&r.buff).Encode(result)
+}
+
+func compareItems(a, b rsstypes.Item) int {
+       var aTime, bTime time.Time
+       var err error
+       defaultTime := time.Now().Add(-time.Hour * 24)
+       aPub := a.PubDate
+       bPub := b.PubDate
+
+       aTime, err = time.Parse(time.RFC1123Z, aPub)
+       if err != nil {
+               aTime = defaultTime
+       }
+
+       bTime, err = time.Parse(time.RFC1123Z, bPub)
+       if err != nil {
+               bTime = defaultTime
+       }
+
+       // negative to put newest first
+       return -aTime.Compare(bTime)
+}
+
+// clearExtra strips fields that should not be emitted separately in
+// the combined output; currently only the parsed publication date,
+// which was needed solely for sorting.
+// NOTE(review): Item.Content holds the raw inner XML and may itself
+// still contain a pubDate element — clearing this field does not
+// remove that; confirm the intended output.
+func clearExtra(item *rsstypes.Item) {
+       item.PubDate = ""
+}
diff --git a/rsscollector/collector_test.go b/rsscollector/collector_test.go
new file mode 100644 (file)
index 0000000..418d771
--- /dev/null
@@ -0,0 +1,28 @@
+package rsscollector
+
+import (
+       "bytes"
+       "io"
+       "testing"
+)
+
+// compile-time checks that RssCollector satisfies the interfaces the
+// pipe and main packages rely on
+var _ io.ReadSeeker = (*RssCollector)(nil)
+var _ io.WriterTo = (*RssCollector)(nil)
+
+// TestBufferLenZeroAfterFullWrite documents the bytes.Buffer semantics
+// the collector relies on: io.Copy drains the source completely and
+// the destination then reports the full content as unread.
+// NOTE(review): this exercises only the standard library, not
+// RssCollector itself — presumably kept as a behaviour reference.
+func TestBufferLenZeroAfterFullWrite(t *testing.T) {
+       bufferFrom := bytes.NewBufferString("foobarspam")
+       var bufferTo bytes.Buffer
+       fullLength := bufferFrom.Len()
+
+       bytes, err := io.Copy(&bufferTo, bufferFrom)
+       if err != nil {
+               t.Fatalf("expected to copy without %v error but got %v", nil, err)
+       }
+       if bytes != int64(fullLength) {
+               t.Errorf("expected to write the full %v length of buffer but only wrote %v", fullLength, bytes)
+       }
+
+       if bufferTo.Len() != fullLength {
+               t.Errorf("expected receiving buffer to have %v unread bytes but got %v", fullLength, bufferTo.Len())
+       }
+}
diff --git a/rsstypes/rss.go b/rsstypes/rss.go
new file mode 100644 (file)
index 0000000..5f70ddc
--- /dev/null
@@ -0,0 +1,29 @@
+package rsstypes
+
+import (
+       "encoding/xml"
+)
+
+// Rss is the root <rss> element of a feed document.
+type Rss struct {
+       XMLName xml.Name `xml:"rss"`
+       Version string   `xml:"version,attr"`
+       Channel Channel  `xml:"channel"`
+       Source  string   `xml:"-"` // URL the document was fetched from; never serialised
+}
+
+// Channel is the <channel> element holding feed metadata and items.
+type Channel struct {
+       XMLName xml.Name `xml:"channel"`
+       Title   string   `xml:"title"`
+       Items   []Item   `xml:"item"`
+}
+
+// Item is a single <item>; Content preserves the raw inner XML as-is.
+// NOTE(review): encoding/xml does not honour omitempty for
+// struct-typed fields such as Source — confirm the intended output.
+type Item struct {
+       Content string `xml:",innerxml"`
+       Source  Source `xml:"source,omitempty"`
+       PubDate string `xml:"pubDate,omitempty"`
+}
+
+// Source is the RSS <source> element: the originating feed's title as
+// inner text and its URL as the url attribute.
+type Source struct {
+       Title string `xml:",innerxml"`
+       Url   string `xml:"url,attr"`
+}
diff --git a/rsstypes/source_test.go b/rsstypes/source_test.go
new file mode 100644 (file)
index 0000000..a3dfd82
--- /dev/null
@@ -0,0 +1,24 @@
+package rsstypes
+
+import (
+       "bytes"
+       "encoding/xml"
+       "fmt"
+       "testing"
+)
+
+// TestSource round-trips a <source> element through the XML decoder
+// and checks that both the inner text and the url attribute survive.
+func TestSource(t *testing.T) {
+       expected := Source{"perdu", "perdu.com"}
+       xmlString := fmt.Sprintf(`<source url="%s">%s</source>`, expected.Url, expected.Title)
+
+       var decoded Source
+       if err := xml.NewDecoder(bytes.NewBufferString(xmlString)).Decode(&decoded); err != nil {
+               t.Errorf("expected %v error, but got %v", nil, err)
+       }
+       if decoded.Title != expected.Title {
+               t.Errorf("expected %v title but got %v", expected.Title, decoded.Title)
+       }
+       if decoded.Url != expected.Url {
+               t.Errorf("expected %v url but got %v", expected.Url, decoded.Url)
+       }
+}